Страницы

Поиск по вопросам

вторник, 9 апреля 2019 г.

Условие закрытие пула потоков

Есть вот такой вот участок кода:
BlockingQueue queue = new ArrayBlockingQueue(links.size()); for (String link : links) { queue.add(link); }
ExecutorService executorService = Executors.newCachedThreadPool(); int threadsNumber = Integer.valueOf(arguments.get("-n"));
for (int i = 0; i < threadsNumber; i++) { executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l")))); }
while (!queue.isEmpty()) { Thread.sleep(1000); }
executorService.shutdownNow(); System.out.println("Все файлы были успешно скачены");
Последние строчки означают, что я собираюсь закрыть пул с потоками когда все сообщения из очереди будут прочитаны. Все отлично, но возникает проблема в момент, когда один из потоков забирает последнее сообщение из очереди. Условие !queue.isEmpty() перестает выполняться и программа завершается до того как это сообщение обработалось.
Как мне сделать так, чтобы программа завершалась только после обработки всех сообщений в очереде ?
Метод run в Consumer
@Override public void run() { try { while (true) { if (!queue.isEmpty()) { String stringURL = queue.take(); String inputFileName = Paths.get(stringURL).getFileName().toString(); String outputFileName = new File(new File(storageFilesDirectory), inputFileName).toString();
Downloader.download(stringURL, outputFileName, speedLimit); System.out.println(String.format("Файл %s был успешно скачен", inputFileName)); } Thread.sleep(500); } } catch (InterruptedException e) {
} catch (FileCouldNotBeDownloaded e) { System.out.println(String.format("Произошла ошибка скачивания файла.")); } }
В пуле у меня всего два потока, а сообщений в очереди может быть неограничено.


Ответ

Если нужно выполнить N заранее известных задач в K потоков, то проще использовать Executor с фиксированным числом потоков и скормить ему N Runnable выполняющих одну задачу:
final int NUMBER_OF_WORKERS = 2; List urls = IntStream.range( 0, 20 ).mapToObj( String::valueOf ).collect( Collectors.toList() );
ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS ); for ( String url : urls ) { executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc... System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url ); try { Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) ); } catch (Exception e) {} System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url ); }); }
System.out.println( "All tasks queued." );
executor.shutdown(); // пул перестает принимать новые задачи, // уже поставленные в очередь задачи будут выполнены рано или поздно executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS );
System.out.println( "All tasks done.");

Комментариев нет:

Отправить комментарий