Страницы

Поиск по вопросам

пятница, 27 декабря 2019 г.

Celery накапливает периодические задания и выполняет пачкой

#python #django #celery


Celery контролируется by superviser. Есть файл для celery и для beat, но запускаются
3 процесса (worker дублируется). В логах всегда фигурирует Worker-1.
Есть выдержки из логов тут.

Проблема заключается в том, что таски, выполняющиеся каждые 2 минуты, запускаются
вовремя только первые 20-40 минут. Дальше все начинает идти не так. По прошествии очередных
2-х минут, задания перестают выполняться вовремя. Остаются только сообщение отправки

Scheduler: Sending due task contestsapp.tasks.apply_votes

без получения 

Received task: contestsapp.tasks.apply_votes[786b5cc6-5b08-40b0-9638-970a5ce6990f] 

Таким образом они накапливаются (неясно где, но celery процессы держат 90MB в памяти).
Через некоторое время воркер хватает все невыполненные задания и мгновенно (они примитивные)
выполняет. 

В итоге задания с периодом 2 мин выполняются каждые 10 минут по 5 раз (показатели
варьируются). Больше 5-ти одинаковых тасков не скапливается.

В остальном система работает. Отказов и исключений нет, брокер сбрасывался, база
синхронизировалась, PeriodicTask.objects.update(last_run_at=None) не помог, TZ везде
(даже ОС) стоит UTC.

PS
Иногда приходит пара десятков отложенных на дни-недели тасков. Такой "нагрузки" хрупкий
баланс двухминуток обычно не выдерживает, и проявляется эта проблема (но и без них
сценарий всегда один). 

Компоненты системы:


Rabbitmq - celery 3
Apache2 - wsgi - Django 1.9.5
Supervisor for celery and beat (but it runs one additional worker )


Файл настроек:

settings.py

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERY_TIMEZONE = 'UTC'
TIME_ZONE = 'UTC'


Пример задачи:

@periodic_task(ignore_result=True, run_every=crontab(minute='*/2'))
def apply_denial():
    print('apply_denial')
    denialDicts =  {id:cache.get(id) for id in cache.keys("denial:*") if  cache.ttl(id)<=290}
    for k, v in denialDicts.items():
        user = Profile.objects.get(id=k[(k.index('denial:') + 7):])
        user.denial = (list(set(v) - set(user.denial)) + user.denial)[:300]
        user.save()
        cache.delete(k)


Копал уже во все стороны...
    


Ответы

Ответ 1



А вы урл брокера задали??? Типа BROKER_URL = "amqp://user:pass@localhost/queue", где user и pass это логин пароль от раббита, а queue - очередь куда пуляются ваши сообщения P.S если так запускаете воркеры -A project worker --loglevel=INFO -A project worker --loglevel=INFO -A project beat --loglevel=INFO попробуйте заменить на: -A app_celery worker -l info -B -Q "наименование очереди" если никакую очередь явно не используете то -Q не нужно указывать

Комментариев нет:

Отправить комментарий