Потерели потерянные или неудачные задачи (сельдерея, Django и Rabbitmq)

StackOverflow https://stackoverflow.com/questions/5336645

  •  26-10-2019
  •  | 
  •  

Вопрос

Есть ли способ определить, потеряна ли какая -либо задача, и повторно ее?


Я думаю, что причиной потерях может быть ошибка диспетчера или сбой потока рабочей нити.

Я планировал их повторно, но я не уверен, как определить, какие задачи должны быть в отставке?

И как сделать этот процесс автоматически? Могу ли я использовать свой собственный планировщик, который создаст новые задачи?

РЕДАКТИРОВАТЬ: Я обнаружил из документации, что RabbitMQ никогда не теряет задачи, но что происходит, когда в середине выполнения задачи сбивается с ударом в середине выполнения задачи?

Это было полезно?

Решение

Вам нужно установить

Celery_acks_late = true

Поздний ACK означает, что сообщения задачи будут подтверждены после выполнения задачи, а не только раньше, что является поведением по умолчанию. Таким образом, если у работника сбои кролика MQ все еще будет иметь сообщение.

Очевидно, что общая авария (работники Rabbit +) В то же время нет способа восстановить задачу, за исключением случаев, когда вы реализуете ведение журнала при запуске задачи и завершении задачи. Лично я пишу в MongoDB линию каждый раз, когда начинается задача, и другая, когда задача завершается (независимо от результата), таким образом я могу знать, какая задача была прервана путем анализа журналов монго.

Вы можете сделать это легко, переопределив методы __call__ а также after_return базового класса базовых целей.

Следуя, вы видите часть моего кода, который использует класс TaskLogger в качестве диспетчера контекста (с точкой входа и выхода). Класс TaskLogger просто записывает строку, содержащую информацию о задаче в экземпляре MongoDB.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

Я надеюсь, что это может помочь

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top