#java #многопоточность #concurrency
Есть вот такой вот участок кода: BlockingQueuequeue = new ArrayBlockingQueue(links.size()); for (String link : links) { queue.add(link); } ExecutorService executorService = Executors.newCachedThreadPool(); int threadsNumber = Integer.valueOf(arguments.get("-n")); for (int i = 0; i < threadsNumber; i++) { executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l")))); } while (!queue.isEmpty()) { Thread.sleep(1000); } executorService.shutdownNow(); System.out.println("Все файлы были успешно скачены"); Последние строчки означают, что я собираюсь закрыть пул с потоками когда все сообщения из очереди будут прочитаны. Все отлично, но возникает проблема в момент, когда один из потоков забирает последнее сообщение из очереди. Условие !queue.isEmpty() перестает выполняться и программа завершается до того как это сообщение обработалось. Как мне сделать так, чтобы программа завершалась только после обработки всех сообщений в очереде ? Метод run в Consumer @Override public void run() { try { while (true) { if (!queue.isEmpty()) { String stringURL = queue.take(); String inputFileName = Paths.get(stringURL).getFileName().toString(); String outputFileName = new File(new File(storageFilesDirectory), inputFileName).toString(); Downloader.download(stringURL, outputFileName, speedLimit); System.out.println(String.format("Файл %s был успешно скачен", inputFileName)); } Thread.sleep(500); } } catch (InterruptedException e) { } catch (FileCouldNotBeDownloaded e) { System.out.println(String.format("Произошла ошибка скачивания файла.")); } } В пуле у меня всего два потока, а сообщений в очереди может быть неограничено.
Ответы
Ответ 1
Если нужно выполнить N заранее известных задач в K потоков, то проще использовать Executor с фиксированным числом потоков и скормить ему N Runnable выполняющих одну задачу: final int NUMBER_OF_WORKERS = 2; Listurls = IntStream.range( 0, 20 ).mapToObj( String::valueOf ).collect( Collectors.toList() ); ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS ); for ( String url : urls ) { executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc... System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url ); try { Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) ); } catch (Exception e) {} System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url ); }); } System.out.println( "All tasks queued." ); executor.shutdown(); // пул перестает принимать новые задачи, // уже поставленные в очередь задачи будут выполнены рано или поздно executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS ); System.out.println( "All tasks done."); Ответ 2
Обычно пулы потоков завершаются только при завершении самого приложения: автор приложения знает, что у него есть некоторая фоновая обработка задач, пул слушателей базы данных, поэтому можно их один раз создать и не беспокоиться о них - в худшем случае потоки просто будут простаивать, занимая какое-то небольшое количество оперативки. Если вы оставите сам пул потоков живым до полного завершения приложения, на работе самого приложения это не скажется, а вот работать с ним будет легче, потому что не придется иметь в виду его жизненный цикл. В вашем случае вы ищете не момент завершения пула потоков, а некоторый синхронизатор, который вам скажет о том, что все потребители отработали. Проще всего завести CountDownLatch, инициализировав его с помощью размера очереди, и снимая по единице при каждой обработке сущности из очереди - в этом случае вы можете с помощью него дождаться конца обработки. Можно обойтись и более низкоуровневыми примитивами (никогда с ними не работал, поэтому могу ошибиться): public class Consumer implements Runnable { private final BlockingQueuequeue; private final Object synchronizer; public Consumer(BlockingQueue queue, Object synchronizer) { this.queue = queue; this.synchronizer = synchronizer; } public void run() { String value; while ((value = queue.poll(0, TimeUnit.Milliseconds)) !== null) { // обработка } // в эту область код попадет только тогда, когда в очереди кончатся элементы synchronizer.notify(); } } // код главного потока for (Consumer consumer : consumers) { executor.submit(consumer); } synchronizer.wait(); Ответ 3
Используйте java.util.concurrent.ThreadPoolExecutor, он гибче в использовании. Пример для вашей задачи: ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 1_000_000; i++) poolExecutor.execute(()->{ //задача выполняемая в другом потоке }); while (poolExecutor.getActiveCount() != 0) LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); poolExecutor.shutdownNow();
Комментариев нет:
Отправить комментарий