#python #rabbitmq
Как в RabbitMQ на python сделать отправку сообщений в очередь так, чтобы оно пришло в очередь только через определенное время, например, 4 часа ?
Ответы
Ответ 1
RabbitMQ не поддерживает такую "родную" функцию. Предложение приведенное выше проблематично так как требует базы данных и постоянную обработку сообщений. Такое решение намного более "рискованно". Не говоря уже о том что просто дополнительное обрабатывающее средство + база данных ну просто ни к чему. Но функцию возможно эмулировать. При этом сообщение будет отослано и будет находиться на сервере до тех пор пока не наступит время сообщение доставить. Следующая версия NServiceBus.RabbitMQ транспорта будет поддерживать Native Delays при помощи Topic Exchanges. Имплементация не зависит от языка посылающей аппликации. Идея заключается в создании header exchanges где каждый уровень представляет бит из задержки. Если бит 0, exchange направляет сообщение на следующий уровень. Если бит 1, то в queue с TTL соответствующий задержке на этом уровне. Каждый queue с DLQ binding к следующему exchange. Схематически, это будет выглядеть так: Заметка: можно также имплементировать с Header Exchanges С 32-ух битной задержкой (32 уровня exchanges) задержка получится от 1 до 2^31 секунд.Ответ 2
Сообщение попадёт в очередь как только вы его туда отправите.. Что то вы намудрить хотите. Тогда уже добавляйте дату в сообщение и при чтении из очереди если 4 часа не прошло - отправляйте клон задачи в конец очереди, текущую завершайте. Или, пишите в какую-нибудь промежуточную таблицу в базе данных. Читайте её постоянно (например, по крону). Когда выгребутся сообщения с нужной датой - - создавайте задачу в очередь а строку из базы удаляйте:)Ответ 3
Есть отличный плагин, который позволяет это реализовать - Scheduling Messages with RabbitMQ. Пример использования (Java) Определим точку обмена: // ... Mapargs = new HashMap (); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... Добавим пару сообщений: byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map headers = new HashMap (); headers.put("x-delay", 5000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes); byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8"); Map headers2 = new HashMap (); headers2.put("x-delay", 1000); AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2); channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2); Задержка указывается в заголовке сообщения (x-delay) в секундах, соответственно. Больше примеров и информации можно найти на странице самого проекта - Scheduling Messages with RabbitMQ.
Комментариев нет:
Отправить комментарий