Aby przyśpieszyć wydajność naszych aplikacji bardzo często decydujemy się na zrównoleglenie pracy. Przy zmianie modelu pracy dostarczamy pulę wątków, która będzie odpowiedzialna za równoległe wykonywanie zadań. Czasami jednak przy pracy w wielowątkowym środowisku może zdarzyć się sytuacja, w której wysycimy całą pulę wątków. Java dostarcza mechanizmy (polityki), które instruują pulę jak ma się zachować w przypadku wysycenia zasobów.
Tworzenie puli wątków
Najczęstszym sposobem tworzenia puli wątków jest wykorzystanie metod fabrykujących dostarczonych przez Executors. Przykładowo jeśli chcemy stworzyć pulę z 10 wątkami to wywołujemy metodę:
1
Executors.newFixedThreadPool(10);
Jednakże pod spodem wywoływana jest klasa ThreadPoolExecutor:
publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
Jak widzicie powyżej, przekazaliśmy tylko wartość 10 mimo to dostaliśmy wiele domyślnych wartości w tym kolejkę, thread factory oraz rejected execution handler.
Kolejka
Nie każdy o tym wie, ale większość puli wątków ma wbudowaną kolejkę. Jej zadaniem jest kolejkowanie zadań, które aktualnie nie mogą być wykonywane, ponieważ cała pula jest aktualnie zajęta. Jednakże należy się zastanowić co się stanie jeśli również kolejka jest już pełna?
Zlecanie zadania
Spójrzmy więc do implementacji w poszukiwaniu odpowiedzi na to pytanie. Zlecenie wykonania zadania wykonuje się za pomocą metody execute:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); intc= ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) addWorker(null, false); } elseif (!addWorker(command, false)) reject(command); }
Jak widzicie, jeśli pula wątków jest zamknięta (metoda isRunning) lub nie udało dodać się zadania (metoda addWorker) wywoływana jest metoda reject:
Metoda ta na typie implementującym RejectedExecutionHandler wywołuje metodę rejectedExecution.
RejectedExecutionHandler
Wiemy już, że podczas zlecenia pracy może zdarzyć się sytuacja, w której nasze zadanie zostanie odrzucone. Obsługą tego zadania zajmuje się RejectedExecutionHandler:
publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
Jak napisałem powyżej AbortPolicy to polityka, która stosowana jest jako domyślna polityka w przypadku tworzenia puli z wykorzystaniem klasy Executors. Jej jedynem zadaniem jest rzucenie wyjątku typu RejectedExecutionException:
Kolejną wbudowaną polityką (którą można ustawić w konstruktorze ThreadPoolExecutor) jest polityka DiscardPolicy. Jest ona przeciwieństwem AbortPolicy tzn. w przypadku, gdy nowe zadanie nie może być wykonane nie dzieje się nic:
Polityka ta jest bardzo podobna do DiscardPolicy z tą różnicą, że najstarsze zlecone zadanie jest przerywane i w jego miejsce “wskakuje” nasze najnowsze:
Jest to dobra polityka jeśli zależy nam na tym, aby wykonywały się tylko najnowsze zadania i możemy sobie pozwolić na przerwanie tych starszych. Testy dla tej polityki:
// then assertThat(results).containsExactlyInAnyOrder("Job", "Newest") .doesNotContain("Oldest"); }
Caller-Runs Policy
Kolejną wbudowaną polityką (którą można ustawić w konstruktorze ThreadPoolExecutor) jest polityka CallerRunsPolicy. Jest to bardzo ciekawe rozwiązanie, które może pozwolić nam na realizację throttlingu. Jest to technika, która pozwala na uniknięcie zbyt intensywnego zużycia zasobów, co może mieć swoje konsekwencje jak spowolnienie aplikacji lub jej całkowite wyłączenie. W przypadku, gdy zlecane jest nowe zadanie i pula jest przepełniona to zadanie wykonywane jest w tym wątku, który to zadanie zlecił. Tym samym blokujemy wątek zlecający zadanie, dzięki temu nowe zadania nie będą dorzucane do puli. W tym czasie pula wątków skończy część swoich zadań i zacznie przyjmować nowe:
Wiedza o tym jak odrzucane są zadania pozwala nam napisać swoją implementację RejectedExecutionHandler. Możemy przykładowo zapisać do bazy odrzucone zadania: