Страницы

Поиск по вопросам

среда, 5 февраля 2020 г.

Условие закрытие пула потоков

#java #многопоточность #concurrency


Есть вот такой вот участок кода:

    BlockingQueue queue = new ArrayBlockingQueue(links.size());
    for (String link : links) {
        queue.add(link);
    }

    ExecutorService executorService = Executors.newCachedThreadPool();
    int threadsNumber = Integer.valueOf(arguments.get("-n"));

    for (int i = 0; i < threadsNumber; i++) {
        executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l"))));
    }

    while (!queue.isEmpty()) {
        Thread.sleep(1000);
    }

    executorService.shutdownNow();
    System.out.println("Все файлы были успешно скачены");


Последние строчки означают, что я собираюсь закрыть пул с потоками когда все сообщения
из очереди будут прочитаны. Все отлично, но возникает проблема в момент, когда один
из потоков забирает последнее сообщение из очереди. Условие !queue.isEmpty() перестает
выполняться и программа завершается до того как это сообщение обработалось. 

Как мне сделать так, чтобы программа завершалась только после обработки всех сообщений
в очереде ?

Метод run в Consumer

@Override
public void run() {
    try {
        while (true) {
            if (!queue.isEmpty()) {
                String stringURL = queue.take();
                String inputFileName = Paths.get(stringURL).getFileName().toString();
                String outputFileName = new File(new File(storageFilesDirectory),
inputFileName).toString();

                Downloader.download(stringURL, outputFileName, speedLimit);
                System.out.println(String.format("Файл %s был успешно скачен", inputFileName));
            }
            Thread.sleep(500);
        }
    } catch (InterruptedException e) {

    } catch (FileCouldNotBeDownloaded e) {
        System.out.println(String.format("Произошла ошибка скачивания файла."));
    }
}


В пуле у меня всего два потока, а сообщений в очереди может быть неограничено.
    


Ответы

Ответ 1



Если нужно выполнить N заранее известных задач в K потоков, то проще использовать Executor с фиксированным числом потоков и скормить ему N Runnable выполняющих одну задачу: final int NUMBER_OF_WORKERS = 2; List urls = IntStream.range( 0, 20 ).mapToObj( String::valueOf ).collect( Collectors.toList() ); ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS ); for ( String url : urls ) { executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc... System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url ); try { Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) ); } catch (Exception e) {} System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url ); }); } System.out.println( "All tasks queued." ); executor.shutdown(); // пул перестает принимать новые задачи, // уже поставленные в очередь задачи будут выполнены рано или поздно executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS ); System.out.println( "All tasks done.");

Ответ 2



Обычно пулы потоков завершаются только при завершении самого приложения: автор приложения знает, что у него есть некоторая фоновая обработка задач, пул слушателей базы данных, поэтому можно их один раз создать и не беспокоиться о них - в худшем случае потоки просто будут простаивать, занимая какое-то небольшое количество оперативки. Если вы оставите сам пул потоков живым до полного завершения приложения, на работе самого приложения это не скажется, а вот работать с ним будет легче, потому что не придется иметь в виду его жизненный цикл. В вашем случае вы ищете не момент завершения пула потоков, а некоторый синхронизатор, который вам скажет о том, что все потребители отработали. Проще всего завести CountDownLatch, инициализировав его с помощью размера очереди, и снимая по единице при каждой обработке сущности из очереди - в этом случае вы можете с помощью него дождаться конца обработки. Можно обойтись и более низкоуровневыми примитивами (никогда с ними не работал, поэтому могу ошибиться): public class Consumer implements Runnable { private final BlockingQueue queue; private final Object synchronizer; public Consumer(BlockingQueue queue, Object synchronizer) { this.queue = queue; this.synchronizer = synchronizer; } public void run() { String value; while ((value = queue.poll(0, TimeUnit.Milliseconds)) !== null) { // обработка } // в эту область код попадет только тогда, когда в очереди кончатся элементы synchronizer.notify(); } } // код главного потока for (Consumer consumer : consumers) { executor.submit(consumer); } synchronizer.wait();

Ответ 3



Используйте java.util.concurrent.ThreadPoolExecutor, он гибче в использовании. Пример для вашей задачи: ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 1_000_000; i++) poolExecutor.execute(()->{ //задача выполняемая в другом потоке }); while (poolExecutor.getActiveCount() != 0) LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); poolExecutor.shutdownNow();

Комментариев нет:

Отправить комментарий