#cpp #qt #udp
Пытаюсь написать быструю кросплатформенную систему обмена сообщениями с гарантированной
доставкой на Qt, через QUdpSocket. Конечная цель - шина, промежуточная - издатель-подписчик,
начальная - "точка в точку". UDP - чтобы не тратить время на connect'ы.
Структура такая: издатель ведёт список подписчиков, по каждому храня номер последнего
отправленного сообщения из кольцевой очереди. "Долбится" с этим сообщением, пока подписчик
не подтвердит, что успешно его принял, после чего издатель переходит к следующему.
Поскольку предполагается небольшое количество подписчиков, на каждого можно выделить
отдельный поток и работать блокирующими функциями. Поэтому издателей сделал перегрузкой
QThread, всю работу перенеся в run(). Подписчики ждут сигнала readyRead(), соответственно,
нуждаются в EventLoop. Поэтому подписчика унаследовал от QObject и перенёс в новый
QThread через moveToThread(). Вот основные куски кода:
void GQ::Subscriber::read()
{
while(socket_.hasPendingDatagrams())
{
arr_.resize(socket_.pendingDatagramSize());
socket_.readDatagram(arr_.data(),arr_.size(),&addr_,&senderPort_);
QDataStream is(arr_);
is>>number_;
//qDebug()<<"<<"<itemByID(firstToSend);
// Нет такого элемента
while (!item.isNull())
{
firstToSend=item->ID;
arr=GQ::toByteArray(*item.data());
uint timeout=minTimeout;
quint64 startTime=QDateTime::currentMSecsSinceEpoch();
socket.write(arr);
bool receivedOk=false;
do
{
// Если за таймаут ничего не пришло - посылаем снова
while (!socket.waitForReadyRead(timeout))
{
qDebug()<<"Timeout, writing again";
socket.write(arr);
quint64 elapsed=QDateTime::currentMSecsSinceEpoch()-startTime;
qDebug()<<"Elapsed: "<(timeout*toutMultiplier))&&(timeoutitemByID(++firstToSend);
}
updateSubscriber_=false;//*/
quint64 elapsed1=time1.elapsed();
if (elapsed1>10)
{
qDebug()<<"Total elapsed:"<>number_;
// qDebug()<<"<<"<
Ответы
Ответ 1
При вашей схеме все обработчики событий QEvent связанные с GQ::Publisher будут выполнятся
в контексте потока в котором вы создали GQ::Publisher то есть, видимо, в главном потоке.
Следовательно все они будут выполнятся синхронно. Вы скажете что вам они и не нужны,
что вы специально от них уходили. А я вам отвечу, а кто вам сказал что события не используются
внутри QUdpSocket к примеру?
Вообще вы не совсем правильно используете QThread. Он не то чтобы поток сам по себе
который исполняется асинхронно с другими, он скорее апартамент в понятиях COM. QThread
нужен для того чтобы пробрасывать QEvent'ы так чтобы они обрабатывались синхронно но
в контексте этого потока.
Поэтому правильнее было бы GQ::Publisher отнаследовать от QObject, работу с сокетами
и таймерами построить на сигналах/слотах. Ну и не забыть посадить паблишер в отдельный
апартамент. То есть сделать GQ::Publisher так же как вы сделали подписчиков.
Напомню что такое партаменты COM. В COM'е есть три потоковые модели: однопоточная,
мультипоточная и апартаментная. Каждый объект COM обязан сообщать о модели которую
он поддерживает. Потоковая модель это стандартизированный контракт который обязуются
выполнять клиент и сервер. В однопоточной модели все вызовы к серверу синхронные потому
что должны выполнятся из одного потока, в мультипоточной вызовы могут выполнятся полностью
асинхронно из разных потоков, в апартаментной модели вызовы выполняются синхронно но
из разных потоков. На самом деле там посложней все, но принцип думаю понятен.
Ответ 2
Решение найдено! Такая низкая скорость получается тогда, когда одна сторона bind'ит
порт, а другая подключается через connect. Сообщения в сторону узла, не открывшего
порт, идут почему-то очень медленно. Когда же я сделал bind на обеих сторонах и перешёл
на простые readDatagram/writeDatagram, удалось добиться скорости порядка 70-80К сообщений
на одном узле и 30-35К между узлами. То есть примерно 30% от ширины канала (Gigabit
Ethernet), что для режима "запрос - ответ" я считаю приемлемым результатом. К сожалению,
необходимость открывать порт на обеих сторонах усложняет архитектуру, ну да что-нибудь
придумаем.
Комментариев нет:
Отправить комментарий