001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035 036import static java.util.logging.Level.FINER; 037import static java.util.logging.Level.SEVERE; 038import static java.util.logging.Level.WARNING; 039 040import com.google.common.annotations.Beta; 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.base.Function; 043import com.google.common.collect.FluentIterable; 044import com.google.common.collect.ImmutableList; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 046import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 047import com.google.common.util.concurrent.Futures.FutureCombiner; 048 049 050 051import java.io.Closeable; 052import java.util.IdentityHashMap; 053import java.util.Map; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CancellationException; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.ExecutionException; 058import java.util.concurrent.Executor; 059import java.util.concurrent.Future; 060import java.util.concurrent.RejectedExecutionException; 061import java.util.concurrent.atomic.AtomicReference; 062import java.util.logging.Logger; 063 064 065/** 066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 067 * complete, some objects captured during the computation are closed. 068 * 069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 071 * cancellation of the operation so far. The only way to extract the value or exception from a step 072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 073 * "value" of a successful step or the "result" (value or exception) of any step. 074 * 075 * <ol> 076 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 077 * block or a {@link ListenableFuture}. 078 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 079 * can be captured for later closing. 080 * <li>There is one last step (the root of the tree), from which you can extract the final result 081 * of the computation. After that result is available (or the computation fails), all objects 082 * captured by any of the steps in the pipeline are closed. 083 * </ol> 084 * 085 * <h3>Starting a pipeline</h3> 086 * 087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 090 * #from(ListenableFuture)} instead. 091 * 092 * <h3>Derived steps</h3> 093 * 094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 095 * ways similar to {@link FluentFuture}s: 096 * 097 * <ul> 098 * <li>by transforming the value from a successful input step, 099 * <li>by catching the exception from a failed input step, or 100 * <li>by combining the results of several input steps. 101 * </ul> 102 * 103 * Each derivation can capture the next value or any intermediate objects for later closing. 104 * 105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 106 * exception, or combine it with others, you cannot do anything else with it, including declare it 107 * to be the last step of the pipeline. 108 * 109 * <h4>Transforming</h4> 110 * 111 * To derive the next step by asynchronously applying a function to an input step's value, call 112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 113 * Executor)} on the input step. 114 * 115 * <h4>Catching</h4> 116 * 117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 119 * 120 * <h4>Combining</h4> 121 * 122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 124 * 125 * <h3>Cancelling</h3> 126 * 127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 129 * successfully cancelled step will immediately start closing all objects captured for later closing 130 * by it and by its input steps. 131 * 132 * <h3>Ending a pipeline</h3> 133 * 134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 135 * close the captured objects automatically or manually. 136 * 137 * <h4>Automatically closing</h4> 138 * 139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 140 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin 141 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future 142 * completes before closing starts, rather than once it has finished. 143 * 144 * <pre>{@code 145 * FluentFuture<UserName> userName = 146 * ClosingFuture.submit( 147 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 148 * executor) 149 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 150 * .transform((closer, result) -> result.get("userName"), directExecutor()) 151 * .catching(DBException.class, e -> "no user", directExecutor()) 152 * .finishToFuture(); 153 * }</pre> 154 * 155 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 156 * result cursor will both be closed, even if the operation is cancelled or fails. 157 * 158 * <h4>Manually closing</h4> 159 * 160 * If you want to close the captured objects manually, after you've used the final result, call 161 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 162 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 163 * 164 * <pre>{@code 165 * ClosingFuture.submit( 166 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 167 * executor) 168 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 169 * .transform((closer, result) -> result.get("userName"), directExecutor()) 170 * .catching(DBException.class, e -> "no user", directExecutor()) 171 * .finishToValueAndCloser( 172 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 173 * 174 * // later 175 * try { // get() will throw if the operation failed or was cancelled. 176 * UserName userName = userNameValueAndCloser.get(); 177 * // do something with userName 178 * } finally { 179 * userNameValueAndCloser.closeAsync(); 180 * } 181 * }</pre> 182 * 183 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 184 * the query result cursor will both be closed, even if the operation is cancelled or fails. 185 * 186 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 187 * automatic-closing approach described above is safer. 188 * 189 * @param <V> the type of the value of this step 190 * @since 30.0 191 */ 192// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 193@Beta // @Beta for one release. 194 195// TODO(dpb): GWT compatibility. 196public final class ClosingFuture<V> { 197 198 private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 199 200 /** 201 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 202 * done. 203 */ 204 public static final class DeferredCloser { 205 private final CloseableList list; 206 207 DeferredCloser(CloseableList list) { 208 this.list = list; 209 } 210 211 /** 212 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 213 * 214 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 215 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 216 * Closeable}. (For more about the flavors, see <a 217 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 218 * build</a>.) 219 * 220 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 221 * building for Android): Ensure that any object you pass implements the interface not just in 222 * your current SDK version but also at the oldest version you support. For example, <a 223 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 224 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 225 * {@code Closeable} with a method reference like {@code cursor::close}. 226 * 227 * <p>Note that this method is still binary-compatible between flavors because the erasure of 228 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 229 * 230 * @param closeable the object to be closed (see notes above) 231 * @param closingExecutor the object will be closed on this executor 232 * @return the first argument 233 */ 234 235 public <C extends Object & AutoCloseable> C eventuallyClose( 236 C closeable, Executor closingExecutor) { 237 checkNotNull(closingExecutor); 238 if (closeable != null) { 239 list.add(closeable, closingExecutor); 240 } 241 return closeable; 242 } 243 } 244 245 /** 246 * An operation that computes a result. 247 * 248 * @param <V> the type of the result 249 */ 250 @FunctionalInterface 251 public interface ClosingCallable<V extends Object> { 252 /** 253 * Computes a result, or throws an exception if unable to do so. 254 * 255 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 256 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 257 * not before this method completes), even if this method throws or the pipeline is cancelled. 258 */ 259 V call(DeferredCloser closer) throws Exception; 260 } 261 262 /** 263 * An operation that computes a {@link ClosingFuture} of a result. 264 * 265 * @param <V> the type of the result 266 * @since 30.1 267 */ 268 @FunctionalInterface 269 public interface AsyncClosingCallable<V extends Object> { 270 /** 271 * Computes a result, or throws an exception if unable to do so. 272 * 273 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 274 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 275 * not before this method completes), even if this method throws or the pipeline is cancelled. 276 */ 277 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 278 } 279 280 /** 281 * A function from an input to a result. 282 * 283 * @param <T> the type of the input to the function 284 * @param <U> the type of the result of the function 285 */ 286 @FunctionalInterface 287 public interface ClosingFunction<T extends Object, U extends Object> { 288 289 /** 290 * Applies this function to an input, or throws an exception if unable to do so. 291 * 292 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 293 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 294 * not before this method completes), even if this method throws or the pipeline is cancelled. 295 */ 296 U apply(DeferredCloser closer, T input) throws Exception; 297 } 298 299 /** 300 * A function from an input to a {@link ClosingFuture} of a result. 301 * 302 * @param <T> the type of the input to the function 303 * @param <U> the type of the result of the function 304 */ 305 @FunctionalInterface 306 public interface AsyncClosingFunction<T extends Object, U extends Object> { 307 /** 308 * Applies this function to an input, or throws an exception if unable to do so. 309 * 310 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 311 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 312 * not before this method completes), even if this method throws or the pipeline is cancelled. 313 */ 314 ClosingFuture<U> apply(DeferredCloser closer, T input) throws Exception; 315 } 316 317 /** 318 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 319 * allows the user to close all the closeable objects that were captured during it for later 320 * closing. 321 * 322 * <p>The asynchronous operation will have completed before this object is created. 323 * 324 * @param <V> the type of the value of a successful operation 325 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 326 */ 327 public static final class ValueAndCloser<V> { 328 329 private final ClosingFuture<? extends V> closingFuture; 330 331 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 332 this.closingFuture = checkNotNull(closingFuture); 333 } 334 335 /** 336 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 337 * {@link Future#get()} would. 338 * 339 * <p>Because the asynchronous operation has already completed, this method is synchronous and 340 * returns immediately. 341 * 342 * @throws CancellationException if the computation was cancelled 343 * @throws ExecutionException if the computation threw an exception 344 */ 345 346 public V get() throws ExecutionException { 347 return getDone(closingFuture.future); 348 } 349 350 /** 351 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 352 * operation on the {@link Executor}s specified by calls to {@link 353 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 354 * 355 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 356 * closed synchronously. 357 * 358 * <p>Idempotent: objects will be closed at most once. 359 */ 360 public void closeAsync() { 361 closingFuture.close(); 362 } 363 } 364 365 /** 366 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 367 * ClosingFuture} pipeline. 368 * 369 * @param <V> the type of the final value of a successful pipeline 370 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 371 */ 372 @FunctionalInterface 373 public interface ValueAndCloserConsumer<V> { 374 375 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 376 void accept(ValueAndCloser<V> valueAndCloser); 377 } 378 379 /** 380 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 381 * 382 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 383 * execution 384 */ 385 public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) { 386 return new ClosingFuture<>(callable, executor); 387 } 388 389 /** 390 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 391 * 392 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 393 * execution 394 * @since 30.1 395 */ 396 public static <V> ClosingFuture<V> submitAsync( 397 AsyncClosingCallable<V> callable, Executor executor) { 398 return new ClosingFuture<>(callable, executor); 399 } 400 401 /** 402 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 403 * 404 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 405 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 406 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 407 */ 408 public static <V> ClosingFuture<V> from(ListenableFuture<V> future) { 409 return new ClosingFuture<V>(future); 410 } 411 412 /** 413 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 414 * 415 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when 416 * the pipeline is done, even if the pipeline is canceled or fails. 417 * 418 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 419 * value in order to close it. 420 * 421 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 422 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable, 423 * Executor)}. 424 * @param closingExecutor the future's result will be closed on this executor 425 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 426 * underlying value may never be closed if the {@link Future} is canceled after its operation 427 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 428 * including those that pass them to this method, with {@link #submit(ClosingCallable, 429 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 430 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 431 * ClosingFuture#from}. 432 */ 433 @Deprecated 434 public static <C extends Object & AutoCloseable> 435 ClosingFuture<C> eventuallyClosing( 436 ListenableFuture<C> future, final Executor closingExecutor) { 437 checkNotNull(closingExecutor); 438 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 439 Futures.addCallback( 440 future, 441 new FutureCallback<AutoCloseable>() { 442 @Override 443 public void onSuccess(AutoCloseable result) { 444 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 445 } 446 447 @Override 448 public void onFailure(Throwable t) {} 449 }, 450 directExecutor()); 451 return closingFuture; 452 } 453 454 /** 455 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 456 * 457 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 458 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 459 */ 460 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 461 return new Combiner(false, futures); 462 } 463 464 /** 465 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 466 * 467 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 468 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 469 */ 470 public static Combiner whenAllComplete( 471 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 472 return whenAllComplete(asList(future1, moreFutures)); 473 } 474 475 /** 476 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 477 * all succeed. If any fail, the resulting pipeline will fail. 478 * 479 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 480 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 481 */ 482 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 483 return new Combiner(true, futures); 484 } 485 486 /** 487 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 488 * they all succeed. If any fail, the resulting pipeline will fail. 489 * 490 * <p>Calling this method allows you to use lambdas or method references typed with the types of 491 * the input {@link ClosingFuture}s. 492 * 493 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 494 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 495 */ 496 public static <V1, V2> Combiner2<V1, V2> whenAllSucceed( 497 ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 498 return new Combiner2<>(future1, future2); 499 } 500 501 /** 502 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 503 * they all succeed. If any fail, the resulting pipeline will fail. 504 * 505 * <p>Calling this method allows you to use lambdas or method references typed with the types of 506 * the input {@link ClosingFuture}s. 507 * 508 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 509 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 510 */ 511 public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed( 512 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 513 return new Combiner3<>(future1, future2, future3); 514 } 515 516 /** 517 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 518 * they all succeed. If any fail, the resulting pipeline will fail. 519 * 520 * <p>Calling this method allows you to use lambdas or method references typed with the types of 521 * the input {@link ClosingFuture}s. 522 * 523 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 524 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 525 */ 526 public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed( 527 ClosingFuture<V1> future1, 528 ClosingFuture<V2> future2, 529 ClosingFuture<V3> future3, 530 ClosingFuture<V4> future4) { 531 return new Combiner4<>(future1, future2, future3, future4); 532 } 533 534 /** 535 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 536 * they all succeed. If any fail, the resulting pipeline will fail. 537 * 538 * <p>Calling this method allows you to use lambdas or method references typed with the types of 539 * the input {@link ClosingFuture}s. 540 * 541 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 542 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 543 */ 544 public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 545 ClosingFuture<V1> future1, 546 ClosingFuture<V2> future2, 547 ClosingFuture<V3> future3, 548 ClosingFuture<V4> future4, 549 ClosingFuture<V5> future5) { 550 return new Combiner5<>(future1, future2, future3, future4, future5); 551 } 552 553 /** 554 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 555 * all succeed. If any fail, the resulting pipeline will fail. 556 * 557 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 558 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 559 */ 560 public static Combiner whenAllSucceed( 561 ClosingFuture<?> future1, 562 ClosingFuture<?> future2, 563 ClosingFuture<?> future3, 564 ClosingFuture<?> future4, 565 ClosingFuture<?> future5, 566 ClosingFuture<?> future6, 567 ClosingFuture<?>... moreFutures) { 568 return whenAllSucceed( 569 FluentIterable.of(future1, future2, future3, future4, future5, future6) 570 .append(moreFutures)); 571 } 572 573 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 574 private final CloseableList closeables = new CloseableList(); 575 private final FluentFuture<V> future; 576 577 private ClosingFuture(ListenableFuture<V> future) { 578 this.future = FluentFuture.from(future); 579 } 580 581 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 582 checkNotNull(callable); 583 TrustedListenableFutureTask<V> task = 584 TrustedListenableFutureTask.create( 585 new Callable<V>() { 586 @Override 587 public V call() throws Exception { 588 return callable.call(closeables.closer); 589 } 590 591 @Override 592 public String toString() { 593 return callable.toString(); 594 } 595 }); 596 executor.execute(task); 597 this.future = task; 598 } 599 600 private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 601 checkNotNull(callable); 602 TrustedListenableFutureTask<V> task = 603 TrustedListenableFutureTask.create( 604 new AsyncCallable<V>() { 605 @Override 606 public ListenableFuture<V> call() throws Exception { 607 CloseableList newCloseables = new CloseableList(); 608 try { 609 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 610 closingFuture.becomeSubsumedInto(closeables); 611 return closingFuture.future; 612 } finally { 613 closeables.add(newCloseables, directExecutor()); 614 } 615 } 616 617 @Override 618 public String toString() { 619 return callable.toString(); 620 } 621 }); 622 executor.execute(task); 623 this.future = task; 624 } 625 626 /** 627 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 628 * future returns {@code null} if the step is successful or throws the same exception that would 629 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 630 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 631 * 632 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 633 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 634 * a derivation method <i>on the same instance</i>. This is important because calling {@code 635 * statusFuture} alone does not provide a way to close the pipeline. 636 */ 637 public ListenableFuture<?> statusFuture() { 638 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 639 } 640 641 /** 642 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 643 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 644 * when the pipeline is done. 645 * 646 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 647 * ClosingFuture} will be equivalent to this one. 648 * 649 * <p>If the function throws an exception, that exception is used as the result of the derived 650 * {@code ClosingFuture}. 651 * 652 * <p>Example usage: 653 * 654 * <pre>{@code 655 * ClosingFuture<List<Row>> rowsFuture = 656 * queryFuture.transform((closer, result) -> result.getRows(), executor); 657 * }</pre> 658 * 659 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 660 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 661 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 662 * 663 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 664 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 665 * this {@code ClosingFuture}. 666 * 667 * @param function transforms the value of this step to the value of the derived step 668 * @param executor executor to run the function in 669 * @return the derived step 670 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 671 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 672 * finished} 673 */ 674 public <U> ClosingFuture<U> transform( 675 final ClosingFunction<? super V, U> function, Executor executor) { 676 checkNotNull(function); 677 AsyncFunction<V, U> applyFunction = 678 new AsyncFunction<V, U>() { 679 @Override 680 public ListenableFuture<U> apply(V input) throws Exception { 681 return closeables.applyClosingFunction(function, input); 682 } 683 684 @Override 685 public String toString() { 686 return function.toString(); 687 } 688 }; 689 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 690 return derive(future.transformAsync(applyFunction, executor)); 691 } 692 693 /** 694 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 695 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 696 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 697 * captured by the returned {@link ClosingFuture}). 698 * 699 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 700 * returned by the function. 701 * 702 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 703 * ClosingFuture} will be equivalent to this one. 704 * 705 * <p>If the function throws an exception, that exception is used as the result of the derived 706 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 707 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 708 * closed. 709 * 710 * <p>Usage guidelines for this method: 711 * 712 * <ul> 713 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 714 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 715 * Executor)} instead, with a function that returns the next value directly. 716 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 717 * for every closeable object this step creates in order to capture it for later closing. 718 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 719 * ClosingFuture} call {@link #from(ListenableFuture)}. 720 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 721 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 722 * {@link #withoutCloser(AsyncFunction)} 723 * </ul> 724 * 725 * <p>Example usage: 726 * 727 * <pre>{@code 728 * // Result.getRowsClosingFuture() returns a ClosingFuture. 729 * ClosingFuture<List<Row>> rowsFuture = 730 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 731 * 732 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 733 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 734 * // Closeable). 735 * ClosingFuture<Integer> rowsFuture2 = 736 * queryFuture.transformAsync( 737 * (closer, result) -> { 738 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 739 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 740 * }, 741 * executor); 742 * 743 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 744 * ClosingFuture<List<Row>> rowsFuture3 = 745 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 746 * 747 * }</pre> 748 * 749 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 750 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 751 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 752 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 753 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 754 * responsible for completing the returned {@code ClosingFuture}.) 755 * 756 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 757 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 758 * this {@code ClosingFuture}. 759 * 760 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 761 * the derived step 762 * @param executor executor to run the function in 763 * @return the derived step 764 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 765 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 766 * finished} 767 */ 768 public <U> ClosingFuture<U> transformAsync( 769 final AsyncClosingFunction<? super V, U> function, Executor executor) { 770 checkNotNull(function); 771 AsyncFunction<V, U> applyFunction = 772 new AsyncFunction<V, U>() { 773 @Override 774 public ListenableFuture<U> apply(V input) throws Exception { 775 return closeables.applyAsyncClosingFunction(function, input); 776 } 777 778 @Override 779 public String toString() { 780 return function.toString(); 781 } 782 }; 783 return derive(future.transformAsync(applyFunction, executor)); 784 } 785 786 /** 787 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 788 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 789 * {@link ListenableFuture}. 790 * 791 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 792 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 793 * meets these conditions: 794 * 795 * <ul> 796 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 797 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 798 * <li>It returns a {@link ListenableFuture}. 799 * </ul> 800 * 801 * <p>Example usage: 802 * 803 * <pre>{@code 804 * // Result.getRowsFuture() returns a ListenableFuture. 805 * ClosingFuture<List<Row>> rowsFuture = 806 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 807 * }</pre> 808 * 809 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 810 * ListenableFuture} with the value of a derived step 811 */ 812 public static <V, U> AsyncClosingFunction<V, U> withoutCloser( 813 final AsyncFunction<V, U> function) { 814 checkNotNull(function); 815 return new AsyncClosingFunction<V, U>() { 816 @Override 817 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 818 return ClosingFuture.from(function.apply(input)); 819 } 820 }; 821 } 822 823 /** 824 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 825 * to its exception if it is an instance of a given exception type. The function can use a {@link 826 * DeferredCloser} to capture objects to be closed when the pipeline is done. 827 * 828 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 829 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 830 * one. 831 * 832 * <p>If the function throws an exception, that exception is used as the result of the derived 833 * {@code ClosingFuture}. 834 * 835 * <p>Example usage: 836 * 837 * <pre>{@code 838 * ClosingFuture<QueryResult> queryFuture = 839 * queryFuture.catching( 840 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 841 * }</pre> 842 * 843 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 844 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 845 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 846 * 847 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 848 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 849 * this {@code ClosingFuture}. 850 * 851 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 852 * type is matched against this step's exception. "This step's exception" means the cause of 853 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 854 * underlying this step or, if {@code get()} throws a different kind of exception, that 855 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 856 * prefer more specific types, avoiding {@code Throwable.class} in particular. 857 * @param fallback the function to be called if this step fails with the expected exception type. 858 * The function's argument is this step's exception. "This step's exception" means the cause 859 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 860 * underlying this step or, if {@code get()} throws a different kind of exception, that 861 * exception itself. 862 * @param executor the executor that runs {@code fallback} if the input fails 863 */ 864 public <X extends Throwable> ClosingFuture<V> catching( 865 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 866 return catchingMoreGeneric(exceptionType, fallback, executor); 867 } 868 869 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 870 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 871 Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 872 checkNotNull(fallback); 873 AsyncFunction<X, W> applyFallback = 874 new AsyncFunction<X, W>() { 875 @Override 876 public ListenableFuture<W> apply(X exception) throws Exception { 877 return closeables.applyClosingFunction(fallback, exception); 878 } 879 880 @Override 881 public String toString() { 882 return fallback.toString(); 883 } 884 }; 885 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 886 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 887 } 888 889 /** 890 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 891 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 892 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 893 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 894 * 895 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 896 * ClosingFuture} will be equivalent to the one returned by the function. 897 * 898 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 899 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 900 * one. 901 * 902 * <p>If the function throws an exception, that exception is used as the result of the derived 903 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 904 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 905 * closed. 906 * 907 * <p>Usage guidelines for this method: 908 * 909 * <ul> 910 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 911 * {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class, 912 * ClosingFunction, Executor)} instead, with a function that returns the next value 913 * directly. 914 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 915 * for every closeable object this step creates in order to capture it for later closing. 916 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 917 * ClosingFuture} call {@link #from(ListenableFuture)}. 918 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 919 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 920 * {@link #withoutCloser(AsyncFunction)} 921 * </ul> 922 * 923 * <p>Example usage: 924 * 925 * <pre>{@code 926 * // Fall back to a secondary input stream in case of IOException. 927 * ClosingFuture<InputStream> inputFuture = 928 * firstInputFuture.catchingAsync( 929 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 930 * } 931 * }</pre> 932 * 933 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 934 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 935 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 936 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 937 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 938 * responsible for completing the returned {@code ClosingFuture}.) 939 * 940 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 941 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 942 * this {@code ClosingFuture}. 943 * 944 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 945 * type is matched against this step's exception. "This step's exception" means the cause of 946 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 947 * underlying this step or, if {@code get()} throws a different kind of exception, that 948 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 949 * prefer more specific types, avoiding {@code Throwable.class} in particular. 950 * @param fallback the function to be called if this step fails with the expected exception type. 951 * The function's argument is this step's exception. "This step's exception" means the cause 952 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 953 * underlying this step or, if {@code get()} throws a different kind of exception, that 954 * exception itself. 955 * @param executor the executor that runs {@code fallback} if the input fails 956 */ 957 // TODO(dpb): Should this do something special if the function throws CancellationException or 958 // ExecutionException? 959 public <X extends Throwable> ClosingFuture<V> catchingAsync( 960 Class<X> exceptionType, 961 AsyncClosingFunction<? super X, ? extends V> fallback, 962 Executor executor) { 963 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 964 } 965 966 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 967 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 968 Class<X> exceptionType, 969 final AsyncClosingFunction<? super X, W> fallback, 970 Executor executor) { 971 checkNotNull(fallback); 972 AsyncFunction<X, W> asyncFunction = 973 new AsyncFunction<X, W>() { 974 @Override 975 public ListenableFuture<W> apply(X exception) throws Exception { 976 return closeables.applyAsyncClosingFunction(fallback, exception); 977 } 978 979 @Override 980 public String toString() { 981 return fallback.toString(); 982 } 983 }; 984 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 985 } 986 987 /** 988 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 989 * 990 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 991 * the pipeline is cancelled. 992 * 993 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 994 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 995 * rather than once it has finished. 996 * 997 * <p>After calling this method, you may not call {@link 998 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 999 * derivation method on this {@code ClosingFuture}. 1000 * 1001 * @return a {@link Future} that represents the final value or exception of the pipeline 1002 */ 1003 public FluentFuture<V> finishToFuture() { 1004 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1005 logger.log(FINER, "will close {0}", this); 1006 future.addListener( 1007 new Runnable() { 1008 @Override 1009 public void run() { 1010 checkAndUpdateState(WILL_CLOSE, CLOSING); 1011 close(); 1012 checkAndUpdateState(CLOSING, CLOSED); 1013 } 1014 }, 1015 directExecutor()); 1016 } else { 1017 switch (state.get()) { 1018 case SUBSUMED: 1019 throw new IllegalStateException( 1020 "Cannot call finishToFuture() after deriving another step"); 1021 1022 case WILL_CREATE_VALUE_AND_CLOSER: 1023 throw new IllegalStateException( 1024 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1025 1026 case WILL_CLOSE: 1027 case CLOSING: 1028 case CLOSED: 1029 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1030 1031 case OPEN: 1032 throw new AssertionError(); 1033 } 1034 } 1035 return future; 1036 } 1037 1038 /** 1039 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1040 * {@code receiver} will be called with an object that contains the result of the operation. The 1041 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1042 * 1043 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1044 * any other derivation method on this {@code ClosingFuture}. 1045 * 1046 * @param consumer a callback whose method will be called (using {@code executor}) when this 1047 * operation is done 1048 */ 1049 public void finishToValueAndCloser( 1050 final ValueAndCloserConsumer<? super V> consumer, Executor executor) { 1051 checkNotNull(consumer); 1052 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1053 switch (state.get()) { 1054 case SUBSUMED: 1055 throw new IllegalStateException( 1056 "Cannot call finishToValueAndCloser() after deriving another step"); 1057 1058 case WILL_CLOSE: 1059 case CLOSING: 1060 case CLOSED: 1061 throw new IllegalStateException( 1062 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1063 1064 case WILL_CREATE_VALUE_AND_CLOSER: 1065 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1066 1067 case OPEN: 1068 break; 1069 } 1070 throw new AssertionError(state); 1071 } 1072 future.addListener( 1073 new Runnable() { 1074 @Override 1075 public void run() { 1076 provideValueAndCloser(consumer, ClosingFuture.this); 1077 } 1078 }, 1079 executor); 1080 } 1081 1082 private static <C, V extends C> void provideValueAndCloser( 1083 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1084 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1085 } 1086 1087 /** 1088 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1089 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1090 * successful, and this step has not started when {@code cancel} is called, this step should never 1091 * run. 1092 * 1093 * <p>If successful, causes the objects captured by this step (if already started) and its input 1094 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1095 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 1096 * 1097 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1098 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1099 * cancelled regardless 1100 * @return {@code false} if the step could not be cancelled, typically because it has already 1101 * completed normally; {@code true} otherwise 1102 */ 1103 1104 public boolean cancel(boolean mayInterruptIfRunning) { 1105 logger.log(FINER, "cancelling {0}", this); 1106 boolean cancelled = future.cancel(mayInterruptIfRunning); 1107 if (cancelled) { 1108 close(); 1109 } 1110 return cancelled; 1111 } 1112 1113 private void close() { 1114 logger.log(FINER, "closing {0}", this); 1115 closeables.close(); 1116 } 1117 1118 private <U> ClosingFuture<U> derive(FluentFuture<U> future) { 1119 ClosingFuture<U> derived = new ClosingFuture<>(future); 1120 becomeSubsumedInto(derived.closeables); 1121 return derived; 1122 } 1123 1124 private void becomeSubsumedInto(CloseableList otherCloseables) { 1125 checkAndUpdateState(OPEN, SUBSUMED); 1126 otherCloseables.add(closeables, directExecutor()); 1127 } 1128 1129 /** 1130 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1131 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1132 * 1133 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1134 */ 1135 public static final class Peeker { 1136 private final ImmutableList<ClosingFuture<?>> futures; 1137 private volatile boolean beingCalled; 1138 1139 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1140 this.futures = checkNotNull(futures); 1141 } 1142 1143 /** 1144 * Returns the value of {@code closingFuture}. 1145 * 1146 * @throws ExecutionException if {@code closingFuture} is a failed step 1147 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1148 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1149 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1150 * @throws IllegalStateException if called outside of a call to {@link 1151 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1152 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1153 */ 1154 public final <D extends Object> D getDone(ClosingFuture<D> closingFuture) 1155 throws ExecutionException { 1156 checkState(beingCalled); 1157 checkArgument(futures.contains(closingFuture)); 1158 return Futures.getDone(closingFuture.future); 1159 } 1160 1161 private <V extends Object> V call( 1162 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1163 beingCalled = true; 1164 CloseableList newCloseables = new CloseableList(); 1165 try { 1166 return combiner.call(newCloseables.closer, this); 1167 } finally { 1168 closeables.add(newCloseables, directExecutor()); 1169 beingCalled = false; 1170 } 1171 } 1172 1173 private <V extends Object> FluentFuture<V> callAsync( 1174 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1175 beingCalled = true; 1176 CloseableList newCloseables = new CloseableList(); 1177 try { 1178 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1179 closingFuture.becomeSubsumedInto(closeables); 1180 return closingFuture.future; 1181 } finally { 1182 closeables.add(newCloseables, directExecutor()); 1183 beingCalled = false; 1184 } 1185 } 1186 } 1187 1188 /** 1189 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 1190 * 1191 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1192 * instantiate this class. 1193 * 1194 * <p>Example: 1195 * 1196 * <pre>{@code 1197 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1198 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1199 * ListenableFuture<Integer> numberOfDifferentLines = 1200 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1201 * .call( 1202 * (closer, peeker) -> { 1203 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1204 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1205 * return countDifferentLines(file1Reader, file2Reader); 1206 * }, 1207 * executor) 1208 * .closing(executor); 1209 * }</pre> 1210 */ 1211 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1212 1213 public static class Combiner { 1214 1215 private final CloseableList closeables = new CloseableList(); 1216 1217 /** 1218 * An operation that returns a result and may throw an exception. 1219 * 1220 * @param <V> the type of the result 1221 */ 1222 @FunctionalInterface 1223 public interface CombiningCallable<V extends Object> { 1224 /** 1225 * Computes a result, or throws an exception if unable to do so. 1226 * 1227 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1228 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1229 * is done (but not before this method completes), even if this method throws or the pipeline 1230 * is cancelled. 1231 * 1232 * @param peeker used to get the value of any of the input futures 1233 */ 1234 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1235 } 1236 1237 /** 1238 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1239 * 1240 * @param <V> the type of the result 1241 */ 1242 @FunctionalInterface 1243 public interface AsyncCombiningCallable<V extends Object> { 1244 /** 1245 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1246 * 1247 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1248 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1249 * is done (but not before this method completes), even if this method throws or the pipeline 1250 * is cancelled. 1251 * 1252 * @param peeker used to get the value of any of the input futures 1253 */ 1254 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1255 } 1256 1257 private final boolean allMustSucceed; 1258 protected final ImmutableList<ClosingFuture<?>> inputs; 1259 1260 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1261 this.allMustSucceed = allMustSucceed; 1262 this.inputs = ImmutableList.copyOf(inputs); 1263 for (ClosingFuture<?> input : inputs) { 1264 input.becomeSubsumedInto(closeables); 1265 } 1266 } 1267 1268 /** 1269 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1270 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1271 * objects to be closed when the pipeline is done. 1272 * 1273 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1274 * fail, so will the returned step. 1275 * 1276 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1277 * cancelled. 1278 * 1279 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1280 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1281 */ 1282 public <V> ClosingFuture<V> call( 1283 final CombiningCallable<V> combiningCallable, Executor executor) { 1284 Callable<V> callable = 1285 new Callable<V>() { 1286 @Override 1287 public V call() throws Exception { 1288 return new Peeker(inputs).call(combiningCallable, closeables); 1289 } 1290 1291 @Override 1292 public String toString() { 1293 return combiningCallable.toString(); 1294 } 1295 }; 1296 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1297 derived.closeables.add(closeables, directExecutor()); 1298 return derived; 1299 } 1300 1301 /** 1302 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1303 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1304 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1305 * captured by the returned {@link ClosingFuture}). 1306 * 1307 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1308 * fail, so will the returned step. 1309 * 1310 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1311 * cancelled. 1312 * 1313 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1314 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1315 * 1316 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1317 * derived step. 1318 * 1319 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1320 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1321 * 1322 * <p>Usage guidelines for this method: 1323 * 1324 * <ul> 1325 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1326 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1327 * Executor)} instead, with a function that returns the next value directly. 1328 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1329 * closer.eventuallyClose()} for every closeable object this step creates in order to 1330 * capture it for later closing. 1331 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1332 * ClosingFuture} call {@link #from(ListenableFuture)}. 1333 * </ul> 1334 * 1335 * <p>The same warnings about doing heavyweight operations within {@link 1336 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1337 */ 1338 public <V> ClosingFuture<V> callAsync( 1339 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1340 AsyncCallable<V> asyncCallable = 1341 new AsyncCallable<V>() { 1342 @Override 1343 public ListenableFuture<V> call() throws Exception { 1344 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1345 } 1346 1347 @Override 1348 public String toString() { 1349 return combiningCallable.toString(); 1350 } 1351 }; 1352 ClosingFuture<V> derived = 1353 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1354 derived.closeables.add(closeables, directExecutor()); 1355 return derived; 1356 } 1357 1358 private FutureCombiner<Object> futureCombiner() { 1359 return allMustSucceed 1360 ? Futures.whenAllSucceed(inputFutures()) 1361 : Futures.whenAllComplete(inputFutures()); 1362 } 1363 1364 private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE = 1365 new Function<ClosingFuture<?>, FluentFuture<?>>() { 1366 @Override 1367 public FluentFuture<?> apply(ClosingFuture<?> future) { 1368 return future.future; 1369 } 1370 }; 1371 1372 private ImmutableList<FluentFuture<?>> inputFutures() { 1373 return FluentIterable.from(inputs).transform(INNER_FUTURE).toList(); 1374 } 1375 } 1376 1377 /** 1378 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1379 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1380 * combination. 1381 * 1382 * @param <V1> the type returned by the first future 1383 * @param <V2> the type returned by the second future 1384 */ 1385 public static final class Combiner2<V1 extends Object, V2 extends Object> 1386 extends Combiner { 1387 1388 /** 1389 * A function that returns a value when applied to the values of the two futures passed to 1390 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1391 * 1392 * @param <V1> the type returned by the first future 1393 * @param <V2> the type returned by the second future 1394 * @param <U> the type returned by the function 1395 */ 1396 @FunctionalInterface 1397 public interface ClosingFunction2< 1398 V1 extends Object, V2 extends Object, U extends Object> { 1399 1400 /** 1401 * Applies this function to two inputs, or throws an exception if unable to do so. 1402 * 1403 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1404 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1405 * is done (but not before this method completes), even if this method throws or the pipeline 1406 * is cancelled. 1407 */ 1408 U apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception; 1409 } 1410 1411 /** 1412 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1413 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1414 * 1415 * @param <V1> the type returned by the first future 1416 * @param <V2> the type returned by the second future 1417 * @param <U> the type returned by the function 1418 */ 1419 @FunctionalInterface 1420 public interface AsyncClosingFunction2< 1421 V1 extends Object, V2 extends Object, U extends Object> { 1422 1423 /** 1424 * Applies this function to two inputs, or throws an exception if unable to do so. 1425 * 1426 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1427 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1428 * is done (but not before this method completes), even if this method throws or the pipeline 1429 * is cancelled. 1430 */ 1431 ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception; 1432 } 1433 1434 private final ClosingFuture<V1> future1; 1435 private final ClosingFuture<V2> future2; 1436 1437 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1438 super(true, ImmutableList.of(future1, future2)); 1439 this.future1 = future1; 1440 this.future2 = future2; 1441 } 1442 1443 /** 1444 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1445 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1446 * objects to be closed when the pipeline is done. 1447 * 1448 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1449 * any of the inputs fail, so will the returned step. 1450 * 1451 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1452 * 1453 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1454 * ExecutionException} will be extracted and used as the failure of the derived step. 1455 */ 1456 public <U extends Object> ClosingFuture<U> call( 1457 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1458 return call( 1459 new CombiningCallable<U>() { 1460 @Override 1461 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1462 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1463 } 1464 1465 @Override 1466 public String toString() { 1467 return function.toString(); 1468 } 1469 }, 1470 executor); 1471 } 1472 1473 /** 1474 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1475 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1476 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1477 * captured by the returned {@link ClosingFuture}). 1478 * 1479 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1480 * any of the inputs fail, so will the returned step. 1481 * 1482 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1483 * 1484 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1485 * ExecutionException} will be extracted and used as the failure of the derived step. 1486 * 1487 * <p>If the function throws any other exception, it will be used as the failure of the derived 1488 * step. 1489 * 1490 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1491 * the closeable objects in that {@code ClosingFuture} will be closed. 1492 * 1493 * <p>Usage guidelines for this method: 1494 * 1495 * <ul> 1496 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1497 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1498 * Executor)} instead, with a function that returns the next value directly. 1499 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1500 * closer.eventuallyClose()} for every closeable object this step creates in order to 1501 * capture it for later closing. 1502 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1503 * ClosingFuture} call {@link #from(ListenableFuture)}. 1504 * </ul> 1505 * 1506 * <p>The same warnings about doing heavyweight operations within {@link 1507 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1508 */ 1509 public <U extends Object> ClosingFuture<U> callAsync( 1510 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1511 return callAsync( 1512 new AsyncCombiningCallable<U>() { 1513 @Override 1514 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1515 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1516 } 1517 1518 @Override 1519 public String toString() { 1520 return function.toString(); 1521 } 1522 }, 1523 executor); 1524 } 1525 } 1526 1527 /** 1528 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1529 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1530 * ClosingFuture)} to start this combination. 1531 * 1532 * @param <V1> the type returned by the first future 1533 * @param <V2> the type returned by the second future 1534 * @param <V3> the type returned by the third future 1535 */ 1536 public static final class Combiner3< 1537 V1 extends Object, V2 extends Object, V3 extends Object> 1538 extends Combiner { 1539 /** 1540 * A function that returns a value when applied to the values of the three futures passed to 1541 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1542 * 1543 * @param <V1> the type returned by the first future 1544 * @param <V2> the type returned by the second future 1545 * @param <V3> the type returned by the third future 1546 * @param <U> the type returned by the function 1547 */ 1548 @FunctionalInterface 1549 public interface ClosingFunction3< 1550 V1 extends Object, 1551 V2 extends Object, 1552 V3 extends Object, 1553 U extends Object> { 1554 /** 1555 * Applies this function to three inputs, or throws an exception if unable to do so. 1556 * 1557 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1558 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1559 * is done (but not before this method completes), even if this method throws or the pipeline 1560 * is cancelled. 1561 */ 1562 U apply(DeferredCloser closer, V1 value1, V2 value2, V3 v3) throws Exception; 1563 } 1564 1565 /** 1566 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1567 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1568 * 1569 * @param <V1> the type returned by the first future 1570 * @param <V2> the type returned by the second future 1571 * @param <V3> the type returned by the third future 1572 * @param <U> the type returned by the function 1573 */ 1574 @FunctionalInterface 1575 public interface AsyncClosingFunction3< 1576 V1 extends Object, 1577 V2 extends Object, 1578 V3 extends Object, 1579 U extends Object> { 1580 /** 1581 * Applies this function to three inputs, or throws an exception if unable to do so. 1582 * 1583 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1584 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1585 * is done (but not before this method completes), even if this method throws or the pipeline 1586 * is cancelled. 1587 */ 1588 ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3) 1589 throws Exception; 1590 } 1591 1592 private final ClosingFuture<V1> future1; 1593 private final ClosingFuture<V2> future2; 1594 private final ClosingFuture<V3> future3; 1595 1596 private Combiner3( 1597 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1598 super(true, ImmutableList.of(future1, future2, future3)); 1599 this.future1 = future1; 1600 this.future2 = future2; 1601 this.future3 = future3; 1602 } 1603 1604 /** 1605 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1606 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1607 * objects to be closed when the pipeline is done. 1608 * 1609 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1610 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1611 * 1612 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1613 * 1614 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1615 * ExecutionException} will be extracted and used as the failure of the derived step. 1616 */ 1617 public <U extends Object> ClosingFuture<U> call( 1618 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1619 return call( 1620 new CombiningCallable<U>() { 1621 @Override 1622 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1623 return function.apply( 1624 closer, 1625 peeker.getDone(future1), 1626 peeker.getDone(future2), 1627 peeker.getDone(future3)); 1628 } 1629 1630 @Override 1631 public String toString() { 1632 return function.toString(); 1633 } 1634 }, 1635 executor); 1636 } 1637 1638 /** 1639 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1640 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1641 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1642 * captured by the returned {@link ClosingFuture}). 1643 * 1644 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1645 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1646 * 1647 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1648 * 1649 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1650 * ExecutionException} will be extracted and used as the failure of the derived step. 1651 * 1652 * <p>If the function throws any other exception, it will be used as the failure of the derived 1653 * step. 1654 * 1655 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1656 * the closeable objects in that {@code ClosingFuture} will be closed. 1657 * 1658 * <p>Usage guidelines for this method: 1659 * 1660 * <ul> 1661 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1662 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1663 * Executor)} instead, with a function that returns the next value directly. 1664 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1665 * closer.eventuallyClose()} for every closeable object this step creates in order to 1666 * capture it for later closing. 1667 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1668 * ClosingFuture} call {@link #from(ListenableFuture)}. 1669 * </ul> 1670 * 1671 * <p>The same warnings about doing heavyweight operations within {@link 1672 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1673 */ 1674 public <U extends Object> ClosingFuture<U> callAsync( 1675 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1676 return callAsync( 1677 new AsyncCombiningCallable<U>() { 1678 @Override 1679 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1680 return function.apply( 1681 closer, 1682 peeker.getDone(future1), 1683 peeker.getDone(future2), 1684 peeker.getDone(future3)); 1685 } 1686 1687 @Override 1688 public String toString() { 1689 return function.toString(); 1690 } 1691 }, 1692 executor); 1693 } 1694 } 1695 1696 /** 1697 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1698 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1699 * ClosingFuture)} to start this combination. 1700 * 1701 * @param <V1> the type returned by the first future 1702 * @param <V2> the type returned by the second future 1703 * @param <V3> the type returned by the third future 1704 * @param <V4> the type returned by the fourth future 1705 */ 1706 public static final class Combiner4< 1707 V1 extends Object, 1708 V2 extends Object, 1709 V3 extends Object, 1710 V4 extends Object> 1711 extends Combiner { 1712 /** 1713 * A function that returns a value when applied to the values of the four futures passed to 1714 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1715 * 1716 * @param <V1> the type returned by the first future 1717 * @param <V2> the type returned by the second future 1718 * @param <V3> the type returned by the third future 1719 * @param <V4> the type returned by the fourth future 1720 * @param <U> the type returned by the function 1721 */ 1722 @FunctionalInterface 1723 public interface ClosingFunction4< 1724 V1 extends Object, 1725 V2 extends Object, 1726 V3 extends Object, 1727 V4 extends Object, 1728 U extends Object> { 1729 /** 1730 * Applies this function to four inputs, or throws an exception if unable to do so. 1731 * 1732 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1733 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1734 * is done (but not before this method completes), even if this method throws or the pipeline 1735 * is cancelled. 1736 */ 1737 U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) throws Exception; 1738 } 1739 1740 /** 1741 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1742 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1743 * ClosingFuture)}. 1744 * 1745 * @param <V1> the type returned by the first future 1746 * @param <V2> the type returned by the second future 1747 * @param <V3> the type returned by the third future 1748 * @param <V4> the type returned by the fourth future 1749 * @param <U> the type returned by the function 1750 */ 1751 @FunctionalInterface 1752 public interface AsyncClosingFunction4< 1753 V1 extends Object, 1754 V2 extends Object, 1755 V3 extends Object, 1756 V4 extends Object, 1757 U extends Object> { 1758 /** 1759 * Applies this function to four inputs, or throws an exception if unable to do so. 1760 * 1761 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1762 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1763 * is done (but not before this method completes), even if this method throws or the pipeline 1764 * is cancelled. 1765 */ 1766 ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) 1767 throws Exception; 1768 } 1769 1770 private final ClosingFuture<V1> future1; 1771 private final ClosingFuture<V2> future2; 1772 private final ClosingFuture<V3> future3; 1773 private final ClosingFuture<V4> future4; 1774 1775 private Combiner4( 1776 ClosingFuture<V1> future1, 1777 ClosingFuture<V2> future2, 1778 ClosingFuture<V3> future3, 1779 ClosingFuture<V4> future4) { 1780 super(true, ImmutableList.of(future1, future2, future3, future4)); 1781 this.future1 = future1; 1782 this.future2 = future2; 1783 this.future3 = future3; 1784 this.future4 = future4; 1785 } 1786 1787 /** 1788 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1789 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1790 * objects to be closed when the pipeline is done. 1791 * 1792 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1793 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1794 * 1795 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1796 * 1797 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1798 * ExecutionException} will be extracted and used as the failure of the derived step. 1799 */ 1800 public <U extends Object> ClosingFuture<U> call( 1801 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1802 return call( 1803 new CombiningCallable<U>() { 1804 @Override 1805 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1806 return function.apply( 1807 closer, 1808 peeker.getDone(future1), 1809 peeker.getDone(future2), 1810 peeker.getDone(future3), 1811 peeker.getDone(future4)); 1812 } 1813 1814 @Override 1815 public String toString() { 1816 return function.toString(); 1817 } 1818 }, 1819 executor); 1820 } 1821 1822 /** 1823 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1824 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1825 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1826 * captured by the returned {@link ClosingFuture}). 1827 * 1828 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1829 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1830 * 1831 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1832 * 1833 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1834 * ExecutionException} will be extracted and used as the failure of the derived step. 1835 * 1836 * <p>If the function throws any other exception, it will be used as the failure of the derived 1837 * step. 1838 * 1839 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1840 * the closeable objects in that {@code ClosingFuture} will be closed. 1841 * 1842 * <p>Usage guidelines for this method: 1843 * 1844 * <ul> 1845 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1846 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1847 * Executor)} instead, with a function that returns the next value directly. 1848 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1849 * closer.eventuallyClose()} for every closeable object this step creates in order to 1850 * capture it for later closing. 1851 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1852 * ClosingFuture} call {@link #from(ListenableFuture)}. 1853 * </ul> 1854 * 1855 * <p>The same warnings about doing heavyweight operations within {@link 1856 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1857 */ 1858 public <U extends Object> ClosingFuture<U> callAsync( 1859 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1860 return callAsync( 1861 new AsyncCombiningCallable<U>() { 1862 @Override 1863 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1864 return function.apply( 1865 closer, 1866 peeker.getDone(future1), 1867 peeker.getDone(future2), 1868 peeker.getDone(future3), 1869 peeker.getDone(future4)); 1870 } 1871 1872 @Override 1873 public String toString() { 1874 return function.toString(); 1875 } 1876 }, 1877 executor); 1878 } 1879 } 1880 1881 /** 1882 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1883 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1884 * ClosingFuture, ClosingFuture)} to start this combination. 1885 * 1886 * @param <V1> the type returned by the first future 1887 * @param <V2> the type returned by the second future 1888 * @param <V3> the type returned by the third future 1889 * @param <V4> the type returned by the fourth future 1890 * @param <V5> the type returned by the fifth future 1891 */ 1892 public static final class Combiner5< 1893 V1 extends Object, 1894 V2 extends Object, 1895 V3 extends Object, 1896 V4 extends Object, 1897 V5 extends Object> 1898 extends Combiner { 1899 /** 1900 * A function that returns a value when applied to the values of the five futures passed to 1901 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1902 * ClosingFuture)}. 1903 * 1904 * @param <V1> the type returned by the first future 1905 * @param <V2> the type returned by the second future 1906 * @param <V3> the type returned by the third future 1907 * @param <V4> the type returned by the fourth future 1908 * @param <V5> the type returned by the fifth future 1909 * @param <U> the type returned by the function 1910 */ 1911 @FunctionalInterface 1912 public interface ClosingFunction5< 1913 V1 extends Object, 1914 V2 extends Object, 1915 V3 extends Object, 1916 V4 extends Object, 1917 V5 extends Object, 1918 U extends Object> { 1919 /** 1920 * Applies this function to five inputs, or throws an exception if unable to do so. 1921 * 1922 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1923 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1924 * is done (but not before this method completes), even if this method throws or the pipeline 1925 * is cancelled. 1926 */ 1927 U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5) 1928 throws Exception; 1929 } 1930 1931 /** 1932 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1933 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1934 * ClosingFuture, ClosingFuture)}. 1935 * 1936 * @param <V1> the type returned by the first future 1937 * @param <V2> the type returned by the second future 1938 * @param <V3> the type returned by the third future 1939 * @param <V4> the type returned by the fourth future 1940 * @param <V5> the type returned by the fifth future 1941 * @param <U> the type returned by the function 1942 */ 1943 @FunctionalInterface 1944 public interface AsyncClosingFunction5< 1945 V1 extends Object, 1946 V2 extends Object, 1947 V3 extends Object, 1948 V4 extends Object, 1949 V5 extends Object, 1950 U extends Object> { 1951 /** 1952 * Applies this function to five inputs, or throws an exception if unable to do so. 1953 * 1954 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1955 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1956 * is done (but not before this method completes), even if this method throws or the pipeline 1957 * is cancelled. 1958 */ 1959 ClosingFuture<U> apply( 1960 DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5) 1961 throws Exception; 1962 } 1963 1964 private final ClosingFuture<V1> future1; 1965 private final ClosingFuture<V2> future2; 1966 private final ClosingFuture<V3> future3; 1967 private final ClosingFuture<V4> future4; 1968 private final ClosingFuture<V5> future5; 1969 1970 private Combiner5( 1971 ClosingFuture<V1> future1, 1972 ClosingFuture<V2> future2, 1973 ClosingFuture<V3> future3, 1974 ClosingFuture<V4> future4, 1975 ClosingFuture<V5> future5) { 1976 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1977 this.future1 = future1; 1978 this.future2 = future2; 1979 this.future3 = future3; 1980 this.future4 = future4; 1981 this.future5 = future5; 1982 } 1983 1984 /** 1985 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1986 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1987 * objects to be closed when the pipeline is done. 1988 * 1989 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1990 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 1991 * returned step. 1992 * 1993 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1994 * 1995 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1996 * ExecutionException} will be extracted and used as the failure of the derived step. 1997 */ 1998 public <U extends Object> ClosingFuture<U> call( 1999 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2000 return call( 2001 new CombiningCallable<U>() { 2002 @Override 2003 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2004 return function.apply( 2005 closer, 2006 peeker.getDone(future1), 2007 peeker.getDone(future2), 2008 peeker.getDone(future3), 2009 peeker.getDone(future4), 2010 peeker.getDone(future5)); 2011 } 2012 2013 @Override 2014 public String toString() { 2015 return function.toString(); 2016 } 2017 }, 2018 executor); 2019 } 2020 2021 /** 2022 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2023 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2024 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2025 * captured by the returned {@link ClosingFuture}). 2026 * 2027 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2028 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2029 * returned step. 2030 * 2031 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2032 * 2033 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2034 * ExecutionException} will be extracted and used as the failure of the derived step. 2035 * 2036 * <p>If the function throws any other exception, it will be used as the failure of the derived 2037 * step. 2038 * 2039 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2040 * the closeable objects in that {@code ClosingFuture} will be closed. 2041 * 2042 * <p>Usage guidelines for this method: 2043 * 2044 * <ul> 2045 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2046 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2047 * Executor)} instead, with a function that returns the next value directly. 2048 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 2049 * closer.eventuallyClose()} for every closeable object this step creates in order to 2050 * capture it for later closing. 2051 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2052 * ClosingFuture} call {@link #from(ListenableFuture)}. 2053 * </ul> 2054 * 2055 * <p>The same warnings about doing heavyweight operations within {@link 2056 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2057 */ 2058 public <U extends Object> ClosingFuture<U> callAsync( 2059 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2060 return callAsync( 2061 new AsyncCombiningCallable<U>() { 2062 @Override 2063 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2064 return function.apply( 2065 closer, 2066 peeker.getDone(future1), 2067 peeker.getDone(future2), 2068 peeker.getDone(future3), 2069 peeker.getDone(future4), 2070 peeker.getDone(future5)); 2071 } 2072 2073 @Override 2074 public String toString() { 2075 return function.toString(); 2076 } 2077 }, 2078 executor); 2079 } 2080 } 2081 2082 @Override 2083 public String toString() { 2084 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2085 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2086 } 2087 2088 @Override 2089 protected void finalize() { 2090 if (state.get().equals(OPEN)) { 2091 logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this); 2092 FluentFuture<V> unused = finishToFuture(); 2093 } 2094 } 2095 2096 private static void closeQuietly(final AutoCloseable closeable, Executor executor) { 2097 if (closeable == null) { 2098 return; 2099 } 2100 try { 2101 executor.execute( 2102 new Runnable() { 2103 @Override 2104 public void run() { 2105 try { 2106 closeable.close(); 2107 } catch (Exception e) { 2108 logger.log(WARNING, "thrown by close()", e); 2109 } 2110 } 2111 }); 2112 } catch (RejectedExecutionException e) { 2113 if (logger.isLoggable(WARNING)) { 2114 logger.log( 2115 WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2116 } 2117 closeQuietly(closeable, directExecutor()); 2118 } 2119 } 2120 2121 private void checkAndUpdateState(State oldState, State newState) { 2122 checkState( 2123 compareAndUpdateState(oldState, newState), 2124 "Expected state to be %s, but it was %s", 2125 oldState, 2126 newState); 2127 } 2128 2129 private boolean compareAndUpdateState(State oldState, State newState) { 2130 return state.compareAndSet(oldState, newState); 2131 } 2132 2133 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2134 private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2135 implements Closeable { 2136 private final DeferredCloser closer = new DeferredCloser(this); 2137 private volatile boolean closed; 2138 private volatile CountDownLatch whenClosed; 2139 2140 <V, U> ListenableFuture<U> applyClosingFunction( 2141 ClosingFunction<? super V, U> transformation, V input) throws Exception { 2142 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2143 CloseableList newCloseables = new CloseableList(); 2144 try { 2145 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2146 } finally { 2147 add(newCloseables, directExecutor()); 2148 } 2149 } 2150 2151 <V, U> FluentFuture<U> applyAsyncClosingFunction( 2152 AsyncClosingFunction<V, U> transformation, V input) throws Exception { 2153 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2154 CloseableList newCloseables = new CloseableList(); 2155 try { 2156 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2157 closingFuture.becomeSubsumedInto(newCloseables); 2158 return closingFuture.future; 2159 } finally { 2160 add(newCloseables, directExecutor()); 2161 } 2162 } 2163 2164 @Override 2165 public void close() { 2166 if (closed) { 2167 return; 2168 } 2169 synchronized (this) { 2170 if (closed) { 2171 return; 2172 } 2173 closed = true; 2174 } 2175 for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2176 closeQuietly(entry.getKey(), entry.getValue()); 2177 } 2178 clear(); 2179 if (whenClosed != null) { 2180 whenClosed.countDown(); 2181 } 2182 } 2183 2184 void add(AutoCloseable closeable, Executor executor) { 2185 checkNotNull(executor); 2186 if (closeable == null) { 2187 return; 2188 } 2189 synchronized (this) { 2190 if (!closed) { 2191 put(closeable, executor); 2192 return; 2193 } 2194 } 2195 closeQuietly(closeable, executor); 2196 } 2197 2198 /** 2199 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2200 */ 2201 CountDownLatch whenClosedCountDown() { 2202 if (closed) { 2203 return new CountDownLatch(0); 2204 } 2205 synchronized (this) { 2206 if (closed) { 2207 return new CountDownLatch(0); 2208 } 2209 checkState(whenClosed == null); 2210 return whenClosed = new CountDownLatch(1); 2211 } 2212 } 2213 } 2214 2215 /** 2216 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2217 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2218 */ 2219 @VisibleForTesting 2220 CountDownLatch whenClosedCountDown() { 2221 return closeables.whenClosedCountDown(); 2222 } 2223 2224 /** The state of a {@link CloseableList}. */ 2225 enum State { 2226 /** The {@link CloseableList} has not been subsumed or closed. */ 2227 OPEN, 2228 2229 /** 2230 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2231 * into any other. 2232 */ 2233 SUBSUMED, 2234 2235 /** 2236 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2237 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2238 */ 2239 WILL_CLOSE, 2240 2241 /** 2242 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2243 * {@link CloseableList} may not be subsumed. 2244 */ 2245 CLOSING, 2246 2247 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2248 CLOSED, 2249 2250 /** 2251 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2252 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2253 */ 2254 WILL_CREATE_VALUE_AND_CLOSER, 2255 } 2256}