Страницы

Поиск по вопросам

понедельник, 16 декабря 2019 г.

(Java) Как правильно использовать BufferedReader одновременно несколькими потоками (Thread)

#java #многопоточность #input #synchronized


При изучении потоков (Threads) столкнулся с задачей. Нужно, используя один BufferedReader
и три потока (Thread), считать строки с клавиатуры и сохранить в ArrayList значения
считанных потоком строк. (3 экземпляра класса-наследника Thread и также 3 экземпляра
переменной класса Array ArrayList)

Методом проб и ошибок вывел рабочий код:

public class Solution {
    public static volatile AtomicInteger countReadStrings = new AtomicInteger(0);
    public static volatile BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

    public static void main(String[] args) throws IOException {
        //read count of strings
        int count = Integer.parseInt(reader.readLine());

        //init threads
        ReaderThread consolReader1 = new ReaderThread();
        ReaderThread consolReader2 = new ReaderThread();
        ReaderThread consolReader3 = new ReaderThread();

        consolReader1.start();
        consolReader2.start();
        consolReader3.start();

        while (count > countReadStrings.get()) {
        }

        consolReader1.interrupt();
        consolReader2.interrupt();
        consolReader3.interrupt();
        System.out.println("#1:" + consolReader1);
        System.out.println("#2:" + consolReader2);
        System.out.println("#3:" + consolReader3);

        reader.close();
    }

    public static class ReaderThread extends Thread {
        private List result = new ArrayList();

        public void run() {
            while ( !Thread.currentThread().isInterrupted()){
                    try {
                        if (reader.ready()){ // <- корректно отрабатывает
                            result.add(reader.readLine());
                            Solution.countReadStrings.incrementAndGet();
                        }
                    }catch (IOException e){
                        System.out.println("*************error");
                }
            }
        }

        @Override
        public String toString() {
            return result.toString();
        }
    }
}


Но ранее пробовал такой метод run() класса ReaderThread

            public void run() {
            //add your code here - добавьте код тут
            while ( !Thread.currentThread().isInterrupted()){
                    try {
                        synchronized (reader){ // <- НЕкорректно отрабатывает
                            result.add(reader.readLine());
                            Solution.countReadStrings.incrementAndGet();
                        }
                    }catch (IOException e){
                        System.out.println("*************error");
                }
            }
        }


и несмотря на synchronized почему-то некоторые потоки проходили до result.add(reader.readLine());
и, соответственно, при их прерывании с помощью  interrupt() отрабатывали не так, как
предполагалось (для полной остановки программы требовалось ввести еще 1 строку и возникали
2 IOException (видимо от оставшихся двух потоков))


  Т.е. у меня вопрос: Почему реализация с synchronized (reader)
  отрабатывает некорректно, а с if (reader.ready()) - корректно ... P.S.
  в задаче требуется реализовать метод run() класса ReaderThread,
  остальное по условию задачи трогать нельзя ...


//пример входных данных и результата

10  //сколько строк считать
1  //первая строка для считывания потоками
2
3
4
5
6
7
8
9
10       //ввод заканчивается здесь
//вывод начинается отсюда
#1:[2, 4, 5, 8, 10]  // то что удалось считать 1-ому потоку
#2:[6] // то что удалось считать 2-ому потоку
#3:[1, 3, 7, 9] // то что удалось считать 3-ему потоку

    


Ответы

Ответ 1



На самом деле сейчас у вас в коде есть проблемы. В 9 случаях из 10 ваш код сработает корректно, но смотрите, что будет в последнем: while ( !Thread.currentThread().isInterrupted()){ try { if (reader.ready()){ // 1 result.add(reader.readLine()); //2 Solution.countReadStrings.incrementAndGet(); } }catch (IOException e){ System.out.println("*************error"); } Метод reader.ready() проверяет, можно ли прочесть что-то из буффера ввода, и если там что-то есть, запускается блокирующий метод reader.readLie(), т.е. он блокирует выполнение потока, пока не увидит в буфере символ окончания ввода (\n, \r). Теперь смотрите, у вас три потока используют один BufferedReader, это означает, что если будет долгая задержка от пользователя (такая, что все три потока успеют отработать и пройдут //1), то все три потока зависнут на блокирующем методе //2. Как только пользователь нажмет энтер, первый проснувшийся поток сможет прочитать строку и сохранит её себе. Заметьте, что оставшиеся два потока так и будут висеть на //2. Это неправильно с той точки зрения, что свою проверку они уже прошли, но строку так и не прочитали. Поэтому возможна нередкая ситуация, когда вы вводите 10 строку из 10, вам распечатывается результат, но программа не завершается, а ждет еще ввода. Это всё потому, что есть висящие потоки на строке //2. После этого главный поток вызывает reader.close(); и висящие потоки ловят IOException, после чего выполнение программы завершается. Теперь посмотрим на ваш изначальный вариант: while ( !Thread.currentThread().isInterrupted()){ try { synchronized (reader){ // 1 result.add(reader.readLine()); // 2 Solution.countReadStrings.incrementAndGet(); } }catch (IOException e){ System.out.println("*************error"); } } Этот вариант использует синхронизацию, таким образом никто не может войти внутрь блока кода //1, если его уже занял другой поток. И на первый взгляд это правильно, только почему-то это не работает. И вот почему. Потоки, которые не могут войти в синхронизированный блок, ожидают освобождения монитора. Теперь представьте ситуацию, вы ввели 9 строк и хотите ввести последнюю. У нас три потока: один стоит на блокирующем методе //2, два остальных стоят перед синхронизированным блоком //1. Вы вводите 10 строку, поток выходит из блока и освобождает его другим. Главный поток выходит из цикла while и начинает интерраптить ваши потоки. Теперь просыпается любой блокированный поток, у него выставлен статус interrupted, но он стоит на //1 и НЕ проверяет свой статус. Таким образом, проснувшись, он заходит в свободный уже блок кода и повисает на блокирующем методе //2. Главный поток закрывает буффер, вы что-то вводите, получаете эксепшн, поток закрывается и просыпается последний поток, который точно так же стоит на строчке //1 и НЕ проверяя свой статус, заходит в освободившийся блок кода. Там повторяется то же самое. Этим объясняется то, что вам нужно еще два раза ввести данные, чтобы программа завершилась. Решение, которое мне здесь видится, это скомбинировать два этих способа: synchronized (reader){ if (reader.ready()) { result.add(reader.readLine()); Test.countReadStrings.incrementAndGet(); } } Таким образом, проснувшиеся потоки зайдя в синхронизированный блок кода не будут зависать на вводе, поскольку условие if (reader.ready()) должно будет возвращать false.

Комментариев нет:

Отправить комментарий