В django_workers встроить защиту от блокирования очереди одной поломанной задачей
Проблема стала заметна при использовании django_workers на одном из проектов. Очередь там выглядит так:
class AccrualNotificationQueue(AbstractTaskQueue):
def get_pending_tasks_queryset(self) -> models.QuerySet:
return (
ChatNotification.objects.filter(was_processed=False)
)
def handle_task(self, queryset_item: ChatNotification):
...
def process_task_error(self, queryset_item: models.Model, error: TaskError) -> None:
queryset_item.was_processed = False
queryset_item.save()
Что произошло в ходе эксплуатации: отправка уведомлений ломается на задаче номер 4, отправщик заходит на второй круг и снова ломается на той же задаче. При этом он не может продвинуться дальше по очереди, снова и снова терзает одну поломанную задачу и не добирается до следующих задач. Поломка отправки одного сообщения ломает отправку всех сообщений.
Тут была допущена ошибка разработчиком очереди AccrualNotificationQueue. Но сама ошибка не очевидна, её трудно заметить при отладке в локальном окружении, и, по недосмотру, легко пропустить такой код в production. А там всё может взорваться в любой момент.
Хорошо бы подстраховаться на стороне библиотеки django_workers, чтобы невнимательность прикладного программиста не приводила к таким серьёзным последствиям. Хочется безопасного в использовании инструмента.
Идеи что предпринять
Остроту проблемы можно снизить, если на стороне django_workers добавить временную память и механизм автоматического пропуска недавно зафейленных задач. Например, если задача зафейленна, то новая попытка запуска будет зафейлена не раньше чем через час или два. Но тут есть возражения:
- Для некоторых задач час ожидания -- это слишком много, а для других -- мало. Здесь понадобится точка кастомизации
- Такая подстраховка приведёт к плавной деградации и к отсрочке блокировки, но не решит проблему целиком. Блокировка всё равно случится. Со временем поломанных задач накопится так много, что их хватит как раз на час работы. Остальные задачи останутся без внимания.
Можно включить в абстрактный интерфейс AccrualNotificationQueue обязательное требование указывать поле с датой последнего сбоя, чтобы отфильтровывать их ещё при запросе к БД. Но и тут не всё гладко:
- Дата может лежать в другой модели данных, поэтому код
AccrualNotificationQueueвряд ли сможет сам записывать туда дату сбоя. А если записывать факт сбоя будет программист, но риск ошибки по невнимательности останется, хотя вероятность и снизится.
В идеале -- было бы здорово покрывать код очереди AccrualNotificationQueue автотестами, но непонятно как это сделать на стороне библиотеки django-workers. Если оставить эту задачу на стороне прикладного программиста, то это похоже будет на перекладывание проблемы. Инструмент django-workers останется опасным в использовании.
Можно написать туториал к django-workers с подробным описанием проблемы. Это снизит остроту проблемы, но django-workers как инструмент останется опасным в использовании.
Можно изменить интерфейс AccrualNotificationQueue, чтобы заставить программиста исключать из очереди зафейленные задачи. Для этого можно в дополнение к методу get_pending_tasks_queryset добавить обязательный метод exclude_recently_failed_tasks.
Можно вынести код реакции на ошибку и код фильтрации недавних ошибок в отдельную абстракцию "Контроллер ошибок". Тогда вместе с библиотекой можно будет поставлять готовые простые контроллеры, аля TaskFailureDateTimeFieldController.