#cpp #qt #udp
Пытаюсь написать быструю кросплатформенную систему обмена сообщениями с гарантированной доставкой на Qt, через QUdpSocket. Конечная цель - шина, промежуточная - издатель-подписчик, начальная - "точка в точку". UDP - чтобы не тратить время на connect'ы. Структура такая: издатель ведёт список подписчиков, по каждому храня номер последнего отправленного сообщения из кольцевой очереди. "Долбится" с этим сообщением, пока подписчик не подтвердит, что успешно его принял, после чего издатель переходит к следующему. Поскольку предполагается небольшое количество подписчиков, на каждого можно выделить отдельный поток и работать блокирующими функциями. Поэтому издателей сделал перегрузкой QThread, всю работу перенеся в run(). Подписчики ждут сигнала readyRead(), соответственно, нуждаются в EventLoop. Поэтому подписчика унаследовал от QObject и перенёс в новый QThread через moveToThread(). Вот основные куски кода: void GQ::Subscriber::read() { while(socket_.hasPendingDatagrams()) { arr_.resize(socket_.pendingDatagramSize()); socket_.readDatagram(arr_.data(),arr_.size(),&addr_,&senderPort_); QDataStream is(arr_); is>>number_; //qDebug()<<"<<"<itemByID(firstToSend); // Нет такого элемента while (!item.isNull()) { firstToSend=item->ID; arr=GQ::toByteArray(*item.data()); uint timeout=minTimeout; quint64 startTime=QDateTime::currentMSecsSinceEpoch(); socket.write(arr); bool receivedOk=false; do { // Если за таймаут ничего не пришло - посылаем снова while (!socket.waitForReadyRead(timeout)) { qDebug()<<"Timeout, writing again"; socket.write(arr); quint64 elapsed=QDateTime::currentMSecsSinceEpoch()-startTime; qDebug()<<"Elapsed: "< (timeout*toutMultiplier))&&(timeout itemByID(++firstToSend); } updateSubscriber_=false;//*/ quint64 elapsed1=time1.elapsed(); if (elapsed1>10) { qDebug()<<"Total elapsed:"< >number_; // qDebug()<<"<<"< Ответы
Ответ 1
При вашей схеме все обработчики событий QEvent связанные с GQ::Publisher будут выполнятся в контексте потока в котором вы создали GQ::Publisher то есть, видимо, в главном потоке. Следовательно все они будут выполнятся синхронно. Вы скажете что вам они и не нужны, что вы специально от них уходили. А я вам отвечу, а кто вам сказал что события не используются внутри QUdpSocket к примеру? Вообще вы не совсем правильно используете QThread. Он не то чтобы поток сам по себе который исполняется асинхронно с другими, он скорее апартамент в понятиях COM. QThread нужен для того чтобы пробрасывать QEvent'ы так чтобы они обрабатывались синхронно но в контексте этого потока. Поэтому правильнее было бы GQ::Publisher отнаследовать от QObject, работу с сокетами и таймерами построить на сигналах/слотах. Ну и не забыть посадить паблишер в отдельный апартамент. То есть сделать GQ::Publisher так же как вы сделали подписчиков. Напомню что такое партаменты COM. В COM'е есть три потоковые модели: однопоточная, мультипоточная и апартаментная. Каждый объект COM обязан сообщать о модели которую он поддерживает. Потоковая модель это стандартизированный контракт который обязуются выполнять клиент и сервер. В однопоточной модели все вызовы к серверу синхронные потому что должны выполнятся из одного потока, в мультипоточной вызовы могут выполнятся полностью асинхронно из разных потоков, в апартаментной модели вызовы выполняются синхронно но из разных потоков. На самом деле там посложней все, но принцип думаю понятен.Ответ 2
Решение найдено! Такая низкая скорость получается тогда, когда одна сторона bind'ит порт, а другая подключается через connect. Сообщения в сторону узла, не открывшего порт, идут почему-то очень медленно. Когда же я сделал bind на обеих сторонах и перешёл на простые readDatagram/writeDatagram, удалось добиться скорости порядка 70-80К сообщений на одном узле и 30-35К между узлами. То есть примерно 30% от ширины канала (Gigabit Ethernet), что для режима "запрос - ответ" я считаю приемлемым результатом. К сожалению, необходимость открывать порт на обеих сторонах усложняет архитектуру, ну да что-нибудь придумаем.
Комментариев нет:
Отправить комментарий