Есть вот такой вот участок кода:
BlockingQueue
ExecutorService executorService = Executors.newCachedThreadPool();
int threadsNumber = Integer.valueOf(arguments.get("-n"));
for (int i = 0; i < threadsNumber; i++) {
executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l"))));
}
while (!queue.isEmpty()) {
Thread.sleep(1000);
}
executorService.shutdownNow();
System.out.println("Все файлы были успешно скачены");
Последние строчки означают, что я собираюсь закрыть пул с потоками когда все сообщения из очереди будут прочитаны. Все отлично, но возникает проблема в момент, когда один из потоков забирает последнее сообщение из очереди. Условие !queue.isEmpty() перестает выполняться и программа завершается до того как это сообщение обработалось.
Как мне сделать так, чтобы программа завершалась только после обработки всех сообщений в очереде ?
Метод run в Consumer
@Override
public void run() {
try {
while (true) {
if (!queue.isEmpty()) {
String stringURL = queue.take();
String inputFileName = Paths.get(stringURL).getFileName().toString();
String outputFileName = new File(new File(storageFilesDirectory), inputFileName).toString();
Downloader.download(stringURL, outputFileName, speedLimit);
System.out.println(String.format("Файл %s был успешно скачен", inputFileName));
}
Thread.sleep(500);
}
} catch (InterruptedException e) {
} catch (FileCouldNotBeDownloaded e) {
System.out.println(String.format("Произошла ошибка скачивания файла."));
}
}
В пуле у меня всего два потока, а сообщений в очереди может быть неограничено.
Ответ
Если нужно выполнить N заранее известных задач в K потоков, то проще использовать Executor с фиксированным числом потоков и скормить ему N Runnable выполняющих одну задачу:
final int NUMBER_OF_WORKERS = 2;
List
ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS );
for ( String url : urls ) {
executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc...
System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url );
try {
Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) );
} catch (Exception e) {}
System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url );
});
}
System.out.println( "All tasks queued." );
executor.shutdown(); // пул перестает принимать новые задачи,
// уже поставленные в очередь задачи будут выполнены рано или поздно
executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS );
System.out.println( "All tasks done.");
Комментариев нет:
Отправить комментарий