
Klasa CompletableFuture jest z nami od dłuższego czasu, jednakże ostatnimi czasy stosowałem ją bardzo często. Podczas swojej pracy postanowiłem spisać między innymi na co warto zwrócić szczególną uwagę stosując CompletableFuture, jak działają poszczególne transformacje oraz jak obsłużyć sytuacje wyjątkow, zapraszam!
CompletableFuture
* @author Doug Lea
* @since 1.8
*/
public class CompletableFuture
CompletableFuture jest to operacja, która zaczęła się gdzieś w tle, ale jeszcze się potencjalnie nie skończyła. Podobnie jak Callable operacja ta ma określony typ zwracany, na przykład String lub Integer. Ponadto implementuje dwa interfejsy Future i CompletionStage.
CompletableFuture
Operacja ta, może zakończyć się poprzez zwrócenie wartości lub wyjątku.
CompletableFuture
return “Value”; //zwraca wartość
});
CompletableFuture
throw new IllegalArgumentException(); //zwraca wyjątek
});
W przeciwieństwie do rozwiązania Future, CompletableFuture pozwala nam składać transformacje w nieblokujący sposób.
Callable
String resultOfCall = blocking.call(); //blokujące
resultOfCall.toUpperCase();
CompletableFuture
nonBlocking.thenApply(String::toUpperCase); //nieblokujące
nonBlocking.get();
Jeśli nie określimy, aby nowa transformacja odbyła się asynchronicznie, odbędzie się ona na tym samym wątku, który wykonywał pierwszą transformację. Dzięki takiemu rozwiązaniu zyskujemy optymalizację czasową związaną z brakiem potrzeby zmiany wątku.
Tworzenie
Klasa CompletableFuture dostarcza statyczne metody do tworzenia samej siebie:
supplyAsync(suppiler)- zwracaCompletableFuture, który zostanie wywołany wForkJoinPoolsupplyAsync(suppiler, executors)- zwracaCompletableFuture, który zostanie wywołany w podanej puli wątków executorsrunAsync(runnable)- zwracaCompletableFuture, który zostanie wywołany wForkJoinPoolrunAsync(runnable, executors)- zwracaCompletableFuture, który zostanie wywołany w podanej puli wątków executorscompletedFuture(value)- zwracaCompletableFuture, który jest już zakończony i posiada wartośćallOf(CompletableFuture<?>… cfs)- zwracaCompletableFutureUWAGA! typu<Void>po zakończeniu wszystkich przekazanychCompletableFutureanyOf(CompletableFuture<?>… cfs)- zwracaCompletableFutureUWAGA! typu<Object>po zakończeniu dowolnego przekazanegoCompletableFuture
Ponadto CompletableFuture można tworzyć przez wykonywanie tranformacji.
Metody
CompletableFuture oferuje trzy podstawowe metody:
complete- oznaczamyCompletableFuturejako skończonycompletedFuture- zwracam wartość od razu (na przykład z cache)completeExceptionally- opakowujemy wyjątek wCompletableFuture
String getStringFromWebService() {
String stringFromCache = getStringFromCache();
if (StringUtils.isNotBlank(stringFromCache)) {
return stringFromCache;
}
try {
return WebService.readString();
} catch (Exception e) {
return "Default Value";
}
}
//Asynchroniczna metoda
CompletableFuture
String stringFromCache = getStringFromCache();
if (StringUtils.isNotBlank(stringFromCache)) {
//oznaczamy \`CompletableFuture\` jako skończony
return CompletableFuture.completedFuture(stringFromCache);
}
try {
CompletableFuture<String> async = new CompletableFuture<>();
String stringFromWebService = WebService.readString();
//oznaczamy \`CompletableFuture\` jako skończony
async.complete(stringFromWebService);
return async;
} catch (Exception e) {
CompletableFuture<String> async = new CompletableFuture<>();
//opakowujemy wyjątek w \`CompletableFuture\`
async.completeExceptionally(e);
return async;
}
}
Oczywiście nie korzystamy z CompletableFuture w sposób jaki jest pokazany powyżej. Wykorzystujemy do tego celu tranformacje.
Transformacje
Klasa CompletableFuture dostarcza około 50 metod. Pomimo tak dużej ilości większość metod stworzonych jest według wzorców:
- apply metoda, która przyjmuje
Function - accept metoda, która przyjmuje
Consumer - run metoda, która przyjmuje
Runnable
Nowa pula wątków:
- async oznacza uruchomienie wykonywania w
ForkJoinPool - async(Executors) oznacza uruchomienie wykonywania na innej puli wątków
Więcej wzorców:
- then - służy do łączenia tranformacji - na przykład
thenApplylubthenAccept - either - wybierz pierwszy wynik - na przykład z dwóch
CompletableFuture - both - wykonaj tranformację jeśli skończą się oba wcześniejsze
CompletableFuture - combine - złącz pierwszy wynik z drugim, aby powstał kolejny wynik, działa jak operator
zip - compose - działa jak operator
flatMap
thenApply i thenApplyAsync
Są to metody, które służą do dodania kolejnych tranformacji. Transformacje te są typu Function, czyli przyjmują i zwracają wartość. Wersja async przyjmuje własną pulę wątków.
CompletableFuture
task.thenApply((String someString) -> “Value returned”);
task.thenApplyAsync((String someString) -> “ForkJoinPool”);
task.thenApplyAsync((String someString) -> “Value returned”, executors);
thenAccept i thenAcceptAsync
Są to metody, które służą do dodania kolejnych tranformacji. Transformacje te są typu Consumer, czyli przyjmują wartość, natomiast nic nie zwracają. Wersja async przyjmuje własną pulę wątków.
CompletableFuture
task.thenAccept((String someString) -> System.out.println(“Value consumed”));
task.thenAcceptAsync((String someString) -> System.out.println(“ForkJoinPool”));
task.thenAcceptAsync((String someString) -> System.out.println(“Value consumed”), executors);
thenRun i thenRunAsync
Są to metody, które służą do dodania kolejnych tranformacji. Transformacje te są typu Runnable, czyli nie przyjmują wartość oraz nie zwracają wartości. Wersja async przyjmuje własną pulę wątków.
CompletableFuture
task.thenRun(()->{
String threadName = Thread.currentThread().getName();
System.out.println(“Thread name: “ + threadName);
});
task.thenRunAsync(()->{
String threadName = Thread.currentThread().getName();
System.out.println(“ForkJoinPool: “ + threadName);
});
task.thenRunAsync(()->{
String threadName = Thread.currentThread().getName();
System.out.println(“Thread name: “ + threadName);
}, executors);
either
Transformacja zawarta w *Either wykonywana jest na pierwszym zakończonym wyniku. Metoda ta dostępna jest dla wszystkich wzorców: acceptEither, applyToEither oraz runAfterEither.
CompletableFuture
CompletableFuture
task.acceptEither(otherTask, x -> {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + “ “ + x);
});
both
Transformacja zawarta w *Both wykonywana jest na wynikach dwóch wcześniejszych CompletableFuture. Metoda ta dostępna jest dla wzorców: thenAcceptBoth oraz runAfterBoth.
CompletableFuture
CompletableFuture
task.thenAcceptBoth(otherTask, (x, y) -> System.out.println(x + “ “ + y));
combine
Transformacja zawarta w thenCombine podobnie jak *Both wykonywana jest na wynikach dwóch wcześniejszych CompletableFuture. Różnicą w stosunku do *Both jest to, iż zwracana jest wartość, ponieważ drugim paramterem jest BiFunction.
CompletableFuture
CompletableFuture
CompletableFuture
compose
Transformacja zawarta w thenCompose działa na zasadzie operatora flatMap. Potrafi ona “spłaszczyć” wywołanie innego CompletableFuture. Dzięki tej tranformacji unikamy Callback Hell.
…
CompletableFuture
task.thenCompose(otherString -> doOtherTask(otherString));
task.join();
…
}
CompletableFuture
return CompletableFuture.completedFuture(string);
}
Pobranie wartości
Pobranie wartości odbywa się w sposób blokujący, tak samo jak w Future przy wykorzystaniu metody get.
get()- blokujące wywołanie, które rzuca checked exceptions takie jakExecutionExceptionorazInterruptedExceptionget(timeout, timeunit)- blokujące wywołanie, które rzuca checked exceptions takie jakExecutionException,InterruptedExceptionorazTimeoutException, jeśli zostanie przekroczony czas zadeklarowany w metodziegetNow(valueIfAbset)- blokujące wywołanie, które rzuca unchecked exceptions takie jakCompletionExceptionorazCancellationException. Zwraca wartość domyślną jeśli danyCompletableFuturejeszcze się nie zakończyłjoin()- działa jakget(), jednakże rzuca unchecked exceptionCompletionException
Przerwanie pracy
Jeśli chcemy przerwać działanie CompletableFuture możemy zakończyć pulę wątków. Innym rozwiązaniem, bardziej związanym z CompletableFuture jest wywołanie metody cancel. Przerywa ona działanie aktualnego CompletableFuture, który w następstwie zwróci CancellationException. Jeśli istnieją inne zależne tranformacje kończą się one z wyjątkiem CompletionException spowodowanym poprzednim CancellationException.
CompletableFuture
task.cancel(true);
Wyjątki
Jeśli podczas wykonywania transformacji wystąpi wyjątek, podobnie jak w kodzie imperatywnym jest on propagowany do “samej góry” wywołania. Obsługa wyjątków odbywa się poprzez tranformację exceptionally:
CompletableFuture
task.exceptionally((throwable -> “default value”));
Wyjątki możemy także obsłużyć przy pomocy metody handle:
CompletableFuture
task.handle((x, throwable) -> {
if (throwable != null) {
return “default value”;
}
return x + “some value”;
});
Jeśli w przeciwieństwie do metody handle nie chcemy zwracać żadnej wartości możemy wykorzystać whenComplete:
CompletableFuture
task.whenComplete((x, throwable) -> {
if (throwable != null) {
System.out.println(“Exception occurs”);
}
System.out.println(x);
});
Więcej metod
Istnieje też kilka metod pomocniczych:
obtrudeException(exception)- ustawia wyjątek dlaCompletableFutureniezależnie od tego, czy się zakończył, czy nieobtrudeValue(value)- ustawia wartość dlaCompletableFutureniezależnie od tego, czy się zakończył, czy nietoCompletableFuture()- zwracaCompletableFuturegetNumberOfDependents()- oblicza liczbę niezakończonych jeszcze tranformacji naCompletableFutureisCancelled()- sprawdza, czyCompletableFuturezostał przerwanyisCompletedExceptionally()- sprawdza, czyCompletableFuturezostał przerwany przez wyjątekisDone()- sprawdza, czyCompletableFuturezostał zakończony