Aby przyśpieszyć wydajność naszych aplikacji bardzo często decydujemy się na zrównoleglenie pracy. Przy zmianie modelu pracy dostarczamy pulę wątków, która będzie odpowiedzialna za równoległe wykonywanie zadań. Czasami jednak przy pracy w wielowątkowym środowisku może zdarzyć się sytuacja, w której wysycimy całą pulę wątków. Java dostarcza mechanizmy (polityki), które instruują pulę jak ma się zachować w przypadku wysycenia zasobów.
Tworzenie puli wątków
Najczęstszym sposobem tworzenia puli wątków jest wykorzystanie metod fabrykujących dostarczonych przez Executors
. Przykładowo jeśli chcemy stworzyć pulę z 10 wątkami to wywołujemy metodę:
Executors.newFixedThreadPool(10);
Jednakże pod spodem wywoływana jest klasa ThreadPoolExecutor
:
Executors.newFixedThreadPool(10); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
Jak widzicie powyżej, przekazaliśmy tylko wartość 10 mimo to dostaliśmy wiele domyślnych wartości w tym kolejkę, thread factory oraz rejected execution handler.
Kolejka
Nie każdy o tym wie, ale większość puli wątków ma wbudowaną kolejkę. Jej zadaniem jest kolejkowanie zadań, które aktualnie nie mogą być wykonywane, ponieważ cała pula jest aktualnie zajęta. Jednakże należy się zastanowić co się stanie jeśli również kolejka jest już pełna?
Zlecanie zadania
Spójrzmy więc do implementacji w poszukiwaniu odpowiedzi na to pytanie. Zlecenie wykonania zadania wykonuje się za pomocą metody execute
:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
Jak widzicie, jeśli pula wątków jest zamknięta (metoda isRunning
) lub nie udało dodać się zadania (metoda addWorker
) wywoływana jest metoda reject
:
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
Metoda ta na typie implementującym RejectedExecutionHandler
wywołuje metodę rejectedExecution
.
RejectedExecutionHandler
Wiemy już, że podczas zlecenia pracy może zdarzyć się sytuacja, w której nasze zadanie zostanie odrzucone. Obsługą tego zadania zajmuje się RejectedExecutionHandler
:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
W przypadku puli wątków tworzonych przez klasę Executors
stosowana jest polityka AbortPolicy
:
Executors.newFixedThreadPool(10); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
AbortPolicy
Jak napisałem powyżej AbortPolicy
to polityka, która stosowana jest jako domyślna polityka w przypadku tworzenia puli z wykorzystaniem klasy Executors
. Jej jedynem zadaniem jest rzucenie wyjątku typu RejectedExecutionException
:
public static class AbortPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
Można bardzo łatwo to przetestować:
@Test void shouldThrowRejectedExecutionExceptionWithAbortPolicy() { // given final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); // when threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); // then assertThatExceptionOfType(RejectedExecutionException.class) .isThrownBy(() -> threadPool.submit(() -> sleep(10_000))); }
DiscardPolicy
Kolejną wbudowaną polityką (którą można ustawić w konstruktorze ThreadPoolExecutor
) jest polityka DiscardPolicy
. Jest ona przeciwieństwem AbortPolicy
tzn. w przypadku, gdy nowe zadanie nie może być wykonane nie dzieje się nic:
public static class DiscardPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
A test do tej polityki mógłby wyglądać tak:
@Test void shouldNotThrowRejectedExecutionExceptionWithDiscardPolicy() { // given final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); // when threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); // then assertThatCode(() -> threadPool.submit(() -> sleep(10_000))) .doesNotThrowAnyException(); }
DiscardOldestPolicy
Polityka ta jest bardzo podobna do DiscardPolicy
z tą różnicą, że najstarsze zlecone zadanie jest przerywane i w jego miejsce “wskakuje” nasze najnowsze:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
Jest to dobra polityka jeśli zależy nam na tym, aby wykonywały się tylko najnowsze zadania i możemy sobie pozwolić na przerwanie tych starszych. Testy dla tej polityki:
@Test void shouldReturnNewestElementsWithDiscardOldestPolicy() throws InterruptedException { // given final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy() ); // when threadPool.execute(() -> sleep(100)); BlockingQueue<String> queue = new LinkedBlockingDeque<>(); threadPool.execute(() -> queue.offer("Oldest")); threadPool.execute(() -> queue.offer("Job")); threadPool.execute(() -> queue.offer("Newest")); threadPool.awaitTermination(100, TimeUnit.MILLISECONDS); List<String> results = new ArrayList<>(); queue.drainTo(results); // then assertThat(results).containsExactlyInAnyOrder("Job", "Newest") .doesNotContain("Oldest"); }
Caller-Runs Policy
Kolejną wbudowaną polityką (którą można ustawić w konstruktorze ThreadPoolExecutor
) jest polityka CallerRunsPolicy
. Jest to bardzo ciekawe rozwiązanie, które może pozwolić nam na realizację throttlingu. Jest to technika, która pozwala na uniknięcie zbyt intensywnego zużycia zasobów, co może mieć swoje konsekwencje jak spowolnienie aplikacji lub jej całkowite wyłączenie. W przypadku, gdy zlecane jest nowe zadanie i pula jest przepełniona to zadanie wykonywane jest w tym wątku, który to zadanie zlecił. Tym samym blokujemy wątek zlecający zadanie, dzięki temu nowe zadania nie będą dorzucane do puli. W tym czasie pula wątków skończy część swoich zadań i zacznie przyjmować nowe:
public static class CallerRunsPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
Spróbujmy napisać do tego testy:
@Test void shouldBlockCallerThread() { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy()); threadPool.submit(() -> sleep(1_000)); threadPool.submit(() -> sleep(1_000)); long startTime = System.currentTimeMillis(); threadPool.submit(() -> sleep(1_000)); long blockedDuration = System.currentTimeMillis() - startTime; assertThat(blockedDuration).isGreaterThanOrEqualTo(1_000); }
Własna polityka
Wiedza o tym jak odrzucane są zadania pozwala nam napisać swoją implementację RejectedExecutionHandler
. Możemy przykładowo zapisać do bazy odrzucone zadania:
class CustomRejectedExecutionHandler implements RejectedExecutionHandler { private final RejectedTasksRepository rejectedTasksRepository; CustomRejectedExecutionHandler(final RejectedTasksRepository rejectedTasksRepository) { this.rejectedTasksRepository = rejectedTasksRepository; } @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { rejectedTasksRepository.save(r.toString()); } }
Ostatni test dotyczy naszej polityki:
@Test void shouldSaveRejectedTasksWithCustomPolicy() { // given RejectedTasksRepository repository = Mockito.mock(RejectedTasksRepository.class); final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new CustomRejectedExecutionHandler(repository) ); // when threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); // then verify(repository).save(anyString()); }
Github
Całość jak zawsze na Githubie.