Потерели потерянные или неудачные задачи (сельдерея, Django и Rabbitmq)
-
26-10-2019 - |
Вопрос
Есть ли способ определить, потеряна ли какая -либо задача, и повторно ее?
Я думаю, что причиной потерях может быть ошибка диспетчера или сбой потока рабочей нити.
Я планировал их повторно, но я не уверен, как определить, какие задачи должны быть в отставке?
И как сделать этот процесс автоматически? Могу ли я использовать свой собственный планировщик, который создаст новые задачи?
РЕДАКТИРОВАТЬ: Я обнаружил из документации, что RabbitMQ никогда не теряет задачи, но что происходит, когда в середине выполнения задачи сбивается с ударом в середине выполнения задачи?
Решение
Вам нужно установить
Celery_acks_late = true
Поздний ACK означает, что сообщения задачи будут подтверждены после выполнения задачи, а не только раньше, что является поведением по умолчанию. Таким образом, если у работника сбои кролика MQ все еще будет иметь сообщение.
Очевидно, что общая авария (работники Rabbit +) В то же время нет способа восстановить задачу, за исключением случаев, когда вы реализуете ведение журнала при запуске задачи и завершении задачи. Лично я пишу в MongoDB линию каждый раз, когда начинается задача, и другая, когда задача завершается (независимо от результата), таким образом я могу знать, какая задача была прервана путем анализа журналов монго.
Вы можете сделать это легко, переопределив методы __call__
а также after_return
базового класса базовых целей.
Следуя, вы видите часть моего кода, который использует класс TaskLogger в качестве диспетчера контекста (с точкой входа и выхода). Класс TaskLogger просто записывает строку, содержащую информацию о задаче в экземпляре MongoDB.
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
#Inizialize context managers
self.taskLogger = TaskLogger(args, kwargs)
self.taskLogger.__enter__()
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
Я надеюсь, что это может помочь