Страницы

Поиск по вопросам

среда, 18 декабря 2019 г.

Производительный обмен сообщениями на QUdpSocket - возможно ли?

#cpp #qt #udp


Пытаюсь написать быструю кросплатформенную систему обмена сообщениями с гарантированной
доставкой на Qt, через QUdpSocket. Конечная цель - шина, промежуточная - издатель-подписчик,
начальная - "точка в точку". UDP - чтобы не тратить время на connect'ы. 

Структура такая: издатель ведёт список подписчиков, по каждому храня номер последнего
отправленного сообщения из кольцевой очереди. "Долбится" с этим сообщением, пока подписчик
не подтвердит, что успешно его принял, после чего издатель переходит к следующему.
Поскольку предполагается небольшое количество подписчиков, на каждого можно выделить
отдельный поток и работать блокирующими функциями. Поэтому издателей сделал перегрузкой
QThread, всю работу перенеся в run(). Подписчики ждут сигнала readyRead(), соответственно,
нуждаются в EventLoop. Поэтому подписчика унаследовал от QObject и перенёс в новый
QThread через moveToThread(). Вот основные куски кода:

void GQ::Subscriber::read()
{
    while(socket_.hasPendingDatagrams())
    {
        arr_.resize(socket_.pendingDatagramSize());
        socket_.readDatagram(arr_.data(),arr_.size(),&addr_,&senderPort_);
        QDataStream is(arr_);
        is>>number_;
        //qDebug()<<"<<"<itemByID(firstToSend);
        // Нет такого элемента
        while (!item.isNull())
        {
            firstToSend=item->ID;
            arr=GQ::toByteArray(*item.data());

            uint timeout=minTimeout;
            quint64 startTime=QDateTime::currentMSecsSinceEpoch();
            socket.write(arr);
            bool receivedOk=false;
            do
            {
                // Если за таймаут ничего не пришло - посылаем снова
                while (!socket.waitForReadyRead(timeout))
                {
                    qDebug()<<"Timeout, writing again";
                    socket.write(arr);
                    quint64 elapsed=QDateTime::currentMSecsSinceEpoch()-startTime;
                    qDebug()<<"Elapsed: "<(timeout*toutMultiplier))&&(timeout>number_;
//      qDebug()<<"<<"<


Ответы

Ответ 1



При вашей схеме все обработчики событий QEvent связанные с GQ::Publisher будут выполнятся в контексте потока в котором вы создали GQ::Publisher то есть, видимо, в главном потоке. Следовательно все они будут выполнятся синхронно. Вы скажете что вам они и не нужны, что вы специально от них уходили. А я вам отвечу, а кто вам сказал что события не используются внутри QUdpSocket к примеру? Вообще вы не совсем правильно используете QThread. Он не то чтобы поток сам по себе который исполняется асинхронно с другими, он скорее апартамент в понятиях COM. QThread нужен для того чтобы пробрасывать QEvent'ы так чтобы они обрабатывались синхронно но в контексте этого потока. Поэтому правильнее было бы GQ::Publisher отнаследовать от QObject, работу с сокетами и таймерами построить на сигналах/слотах. Ну и не забыть посадить паблишер в отдельный апартамент. То есть сделать GQ::Publisher так же как вы сделали подписчиков. Напомню что такое партаменты COM. В COM'е есть три потоковые модели: однопоточная, мультипоточная и апартаментная. Каждый объект COM обязан сообщать о модели которую он поддерживает. Потоковая модель это стандартизированный контракт который обязуются выполнять клиент и сервер. В однопоточной модели все вызовы к серверу синхронные потому что должны выполнятся из одного потока, в мультипоточной вызовы могут выполнятся полностью асинхронно из разных потоков, в апартаментной модели вызовы выполняются синхронно но из разных потоков. На самом деле там посложней все, но принцип думаю понятен.

Ответ 2



Решение найдено! Такая низкая скорость получается тогда, когда одна сторона bind'ит порт, а другая подключается через connect. Сообщения в сторону узла, не открывшего порт, идут почему-то очень медленно. Когда же я сделал bind на обеих сторонах и перешёл на простые readDatagram/writeDatagram, удалось добиться скорости порядка 70-80К сообщений на одном узле и 30-35К между узлами. То есть примерно 30% от ширины канала (Gigabit Ethernet), что для режима "запрос - ответ" я считаю приемлемым результатом. К сожалению, необходимость открывать порт на обеих сторонах усложняет архитектуру, ну да что-нибудь придумаем.

Комментариев нет:

Отправить комментарий