22 августа 2022

🐍🚀⌛ Django, Celery и Redis: гайд по работе с асинхронными задачами

iOS-developer, ИТ-переводчица, пишу статьи и гайды.
Подробная инструкция по интеграции Celery и Redis в проект Django для асинхронной обработки длительных и ресурсоемких задач в фоновом режиме.
🐍🚀⌛ Django, Celery и Redis: гайд по работе с асинхронными задачами
Данная статья является переводом. Ссылка на оригинал.

Вы создали превосходное приложение с помощью Django и хотите представить его широкой публике, но вас беспокоит наличие затратных по времени задач, которые являются частью рабочего процесса приложения. Очевидно, что вы не хотите, чтобы у ваших пользователей был негативный опыт использования вашего приложения. Для решения данных проблем вы можете интегрировать Celery.

Celery — это распределенная очередь задач для систем UNIX. Она позволяет вам выгрузить работу из вашего приложения на Python. Как только вы интегрируете Celery в свое приложение, вы можете отправлять трудоемкие задачи в очередь задач Celery. Таким образом, ваше веб-приложение может продолжать быстро реагировать на запросы пользователей, в то время как Celery асинхронно выполняет операции, требующие больших затрат в фоновом режиме.

В этом туториале вы узнаете, как:

  1. Распознать эффективные варианты использования Celery.
  2. Различать Celery beat и Celery worker.
  3. Интегрировать Celery и Redis в проект Django.
  4. Настроить асинхронные задачи, которые выполняются независимо от вашего приложения Django.
  5. Провести рефакторинг кода Django для запуска задачи с помощью Celery.

Исходный код: кликните здесь, чтобы загрузить исходный код, который вы будете использовать для интеграции Celery в свое приложение Django.

Основы Python Celery

Celery — это распределенная очередь задач, которая может собирать, записывать, планировать и выполнять задачи вне вашей основной программы.

Примечание
Celery прекратил поддержку Windows в версии 4, поэтому, хотя вы все еще можете заставить его работать в Windows, вместо этого лучше использовать другую очередь задач, например, huey или Dramatiq.

В данном туториале вы сосредоточитесь на использовании Celery в системах UNIX, но если вы пытаетесь настроить распределенную очередь задач в Windows, возможно, это руководство вам не подходит.

Чтобы получать задачи от вашей программы и отправлять результаты в серверную часть, Celery требуется брокер сообщений для связи. Redis и RabbitMQ — это два брокера сообщений, которые разработчики часто используют вместе с Celery.

В этом туториале вы будете использовать Redis в качестве брокера сообщений. Однако вы можете использовать RabbitMQ в качестве брокера сообщений.

Если вы хотите отслеживать результаты выполнения вашей задачи, вам также необходимо настроить серверную базу данных результатов.

Примечание
Подключать Celery к серверу результатов необязательно. Как только вы поручите Celery запустить задачу, она будет выполнять свои обязанности независимо от того, отслеживаете вы результат задачи или нет. Однако часто бывает полезно вести учет всех результатов задачи, если вы распределяете задачи по нескольким очередям. Чтобы сохранить информацию о результатах задачи, вам нужна серверная часть базы данных.

Вы можете использовать множество различных баз данных для отслеживания результатов задачи Celery. В этом руководстве вы будете работать с Redis как с брокером сообщений, так и с серверной частью результатов. Используя Redis, вы ограничиваете зависимости, которые необходимо установить, поскольку он может выполнять обе роли.

Вы не будете выполнять какую-либо работу с записанными результатами задачи в рамках этого туториала. Однако в качестве следующего шага вы можете проверить результаты с помощью интерфейса командной строки (CLI) Redis или вывести информацию на специальную страницу в вашем проекте Django.

🐍🎓 Библиотека собеса по Python
Подтянуть свои знания по Python вы можете на нашем телеграм-канале «Библиотека собеса по Python»
🐍🧩 Библиотека задач по Python
Интересные задачи по Python для практики можно найти на нашем телеграм-канале «Библиотека задач по Python»

Зачем использовать Celery?

Есть две основные причины, по которым большинство разработчиков хотят начать использовать Celery:

  1. Перенос работы из вашего приложения в распределенные процессы, которые могут работать независимо от вашего приложения.
  2. Планирование выполнения задач в определенное время, иногда как повторяющиеся события.
Celery — отличный выбор для обоих этих вариантов использования. Он определяет себя как «очередь задач, ориентированных на обработку в реальном времени, а также поддерживающих планирование задач». (Источник)

Несмотря на то, что обе эти функции являются частью Celery, к ним часто обращаются отдельно:

  1. Рабочие процессы Celery — это рабочие процессы, которые выполняют задачи независимо друг от друга и вне контекста вашего основного сервиса.
  2. Celery beat — это планировщик, который определяет, когда запускать задачи. Вы также можете использовать его для планирования периодических задач.

Рабочие процессы составляют основу Celery. Даже если вы хотите запланировать повторяющиеся задачи с помощью Celery beat, процесс Celery примет ваши инструкции и выполнит их в запланированное время. Celery beat добавляет к этому миксу планировщик для рабочих процессов Celery.

В этом туториале вы также узнаете, как интегрировать Celery с Django для асинхронного выполнения операций из основного потока выполнения вашего приложения с помощью рабочих процессов Celery.

Как вы можете использовать Celery для своего приложения Django?

Celery полезен для работы веб-приложений и пользуется заслуженной популярностью, так как вы можете эффективно справляться с некоторыми повседневными ситуациями в веб-разработке, например:

  1. Отправление по электронной почте: Вы можете отправить подтверждение по электронной почте, электронное письмо для сброса пароля или подтверждение отправки формы. Отправка электронных писем может занять некоторое время и замедлить работу вашего приложения, если пользователей много.
  2. Обработка изображений. Возможно, вы захотите изменить размер изображений аватаров, загружаемых пользователями, или применить какую-нибудь кодировку ко всем изображениям, которыми пользователи могут делиться на вашей платформе. Обработка изображений часто является ресурсоемкой задачей, которая может замедлить работу вашего веб-приложения, если вы обслуживаете большое сообщество пользователей.
  3. Обработка текста. Если вы разрешаете пользователям добавлять данные в ваше приложение, вы можете отслеживать то, что они вводят. Например, вы можете проверить наличие ненормативной лексики в комментариях или перевести пользовательский текст на другой язык. Обработка всей этой работы в контексте вашего веб-приложения может значительно снизить производительность.
  4. Вызовы API и другие веб-запросы. Если вам нужно сделать веб-запросы для предоставления услуги, предлагаемой вашим приложением, вы можете быстро столкнуться с непредвиденным временем ожидания. Это верно как для запросов API с ограниченной скоростью, так и для других задач, таких как парсинг веб-страниц. Часто лучше передать эти запросы другому процессу.
  5. Анализ данных. Обработка данных, как известно, требует больших ресурсов. Если ваше веб-приложение анализирует данные для ваших пользователей, вы быстро увидите, что ваше приложение перестает отвечать на запросы, если вы выполняете всю работу прямо в Django.
  6. Запуск модели машинного обучения. Как и в случае с анализом данных, ожидание результатов операций машинного обучения может занять некоторое время. Вместо того чтобы позволить вашим пользователям ждать завершения вычислений, вы можете переложить эту работу на Celery, чтобы они могли продолжать пользоваться вашим веб-приложением, пока результаты не будут получены.
  7. Генерация отчетов. Если вы используете приложение, которое позволяет пользователям создавать отчеты на основе предоставленных ими данных, вы заметите, что создание PDF-файлов не происходит мгновенно. Будет лучше, если вы позволите Celery обрабатывать это в фоновом режиме вместо того, чтобы замораживать работу своего веб-приложения, пока отчет не будет готов для загрузки.

Основная настройка для всех этих различных вариантов использования будет одинаковой. Как только вы поймете, как передать процессы, требующие больших вычислительных или временных ресурсов в распределенную очередь задач, вы освободите Django для обработки цикла HTTP-запрос-ответ.

В этом руководстве вы разберетесь со сценарием отправки электронной почты. Вы начнете с проекта, в котором Django обрабатывает отправку электронной почты синхронно. Вы увидите, как это «заморозит» ваше приложение Django. Затем вы узнаете, как перенести задачу в Celery, чтобы вы могли проверить, насколько это ускорить работу вашего веб-приложения.

Интеграция Celery с Django

Теперь, когда вы знаете, что такое Celery и как он может помочь вам повысить производительность вашего веб-приложения, пришло время интегрировать его, чтобы вы могли выполнять асинхронные задачи с помощью Celery.

Сосредоточьтесь на интеграции Celery в существующий проект Django. Начните с упрощенной версии приложения Django с минимальным вариантом использования: сбор отзывов пользователей и отправка электронного письма в качестве ответа.

Настройте приложение для обратной связи

Начните с загрузки исходного кода предоставленного приложения обратной связи:

Разархивируйте загруженный файл и используйте терминал, чтобы перейти в source_code_initial/directory, где вы должны увидеть стандартную структуру папок проекта Django :

Структура папок

        source_code_initial/
│
├── django_celery/
│   ├── __init__.py
│   ├── asgi.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
│
├── feedback/
│   │
│   ├── migrations/
│   │   └── __init__.py
│   │
│   ├── templates/
│   │   │
│   │   └── feedback/
│   │       ├── base.html
│   │       ├── feedback.html
│   │       └── success.html
│   │
│   │
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── forms.py
│   ├── models.py
│   ├── tests.py
│   ├── urls.py
│   └── views.py
│
├── .gitignore
├── manage.py
└── requirements.txt
    

Убедитесь что вы находитесь внутри source_code_initial/, затем создайте и активируйте виртуальную среду:

        $ python -m venv venv
$ source venv/bin/activate
(venv) $
    

Как только ваша виртуальная среда станет активной, вы можете установить Django:

        (venv) $ python -m pip install django
    

Завершите локальную настройку приложения Django, запустив миграцию и сервер разработки:

        (venv) $ python manage.py migrate
(venv) $ python manage.py runserver
    

Теперь вы можете открыть браузер и перейти на домашнюю страницу приложения по адресу https://localhost:8000, где вас встретит удобная форма обратной связи:

🐍🚀⌛ Django, Celery и Redis: гайд по работе с асинхронными задачами

Однако эта форма обратной связи в настоящее время только выглядит красиво. Заполните форму и отправьте отзыв. Представьте, что один из пользователей вашего веб-приложения столкнулся с такой ситуацией:

После нажатия кнопки Submit приложение зависает. Вы можете видеть, как вращается маленький символ счетчика на вкладке браузера, но страница не отвечает, и вы по-прежнему можете видеть всю информацию, которую вы ввели в форму.

Django требуется слишком много времени, чтобы обработать форму и перенаправить вас на нужную страницу! Django зависает, потому что ему необходимо синхронно обработать запрос на отправку электронной почты, прежде чем приступить к следующей задаче, которая должна перенаправить пользователя на нужную страницу.

Причина, по которой он так долго зависает, заключается в скрытом вызове time.sleep() в .send_email(), который симулирует трудоемкую задачу, которая может быть связана с отправкой электронной почты.

Конечно, в реальном приложении вы не добавите еще больше временной задержки в свой код, заставив Django спать. Однако, какую бы службу электронной почты вы ни использовали, к сожалению, у вас возникнут некоторые задержки. Когда ваше приложение начнет обслуживать множество пользователей, вы быстро столкнетесь с ограничениями.

Ваше приложение Django не должно синхронно обрабатывать задачи с длительным временем выполнения, потому что это ухудшает взаимодействие с пользователем и общую полезность вашего приложения.

Вместо этого, вы узнаете, как передать эту задачу процессу Celery. Рабочие процессы Celery могут заниматься вычислениями в качестве фоновой задачи и позволять вашим пользователям продолжать пользоваться вашим эффектным веб-приложением.

Установка Celery в качестве очереди задач

Теперь, когда вы настроили приложение обратной связи и заметили задержку, связанную с отправкой электронной почты, вы наверняка решили улучшить взаимодействие с пользователем.

Первым шагом в интеграции Celery в ваше приложение Django является установка пакета Celery в вашу виртуальную среду:

        (venv) $ python -m pip install celery
    

Однако просто установить Celery недостаточно. Если вы попытаетесь запустить очередь задач, вы заметите, что сначала Celery запускается нормально, но затем отображает сообщение об ошибке, указывающее, что Celery не может найти брокера сообщений:

        (venv) $ python -m celery worker
[ERROR/MainProcess] consumer: Cannot connect to
amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 2.00 seconds... (1/100)
    

Celery нуждается в брокере сообщений для связи с программами, которые отправляют задачи в очередь задач. Без брокера Celery не может получать инструкции, поэтому постоянно пытается восстановить соединение.

Примечание
Вы можете заметить URL-подобный синтаксис в цели, к которой Celery пытается подключиться. Имя протокола amqp расшифровывается как Advanced Message Queuing Protocol и является протоколом обмена сообщениями, который использует Celery. Самый известный проект, реализующий AMQP изначально, — это RabbitMQ, но Redis также может взаимодействовать с использованием протокола.

Прежде чем использовать Celery, вам необходимо установить брокер сообщений и определить проект в качестве производителя сообщений. В вашем случае производителем является ваше приложение Django, а брокером сообщений будет Redis.

Установите Redis в качестве брокера Celery и серверной части базы данных

Вам нужен брокер сообщений, чтобы Celery мог общаться с вашим производителем задач. Вы будете использовать Redis, потому что Redis может одновременно служить и брокером сообщений, и серверной частью базы данных.

Вернитесь к своему терминалу и установите Redis в своей системе:

Linux

        (venv) $ sudo apt update
(venv) $ sudo apt install redis
    

macOS

        (venv) $ brew install redis
    

После завершения установки вы можете запустить сервер Redis, чтобы убедиться, что все работает. Откройте новое окно терминала, чтобы запустить сервер:

        $ redis-server
    

Это окно будет вашим выделенным окном терминала для Redis. Держите его открытым для остальных задач этого урока.

Примечание
Redis-server запускает сервер Redis. Вы запускаете Redis как процесс, независимый от Python, поэтому вам не нужно активировать виртуальную среду при ее запуске.

После запуска redis-server ваш терминал отобразит логотип Redis в виде изображения ASCII вместе с несколькими сообщениями журнала запуска. Самое последнее сообщение журнала сообщит вам, что Redis готов принимать подключения.

Чтобы проверить, работает ли связь с сервером Redis, запустите интерфейс командной строки Redis в новом окне терминала:

        $ redis-cli
    

Как только приглашение изменилось, вы можете ввести ping и нажать Enter, а затем дождаться ответа от Redis:

        127.0.0.1:6379> ping
PONG
127.0.0.1:6379>

    

После запуска интерфейса командной строки Redis с помощью redis-cli вы отправляете слово ping на сервер Redis, который отвечает авторитетным PONG. Если вы получили этот ответ, значит, ваша установка Redis прошла успешно, и Celery сможет взаимодействовать с Redis.

Выйдите из интерфейса командной строки Redis, нажав Ctrl+C, прежде чем перейти к следующему шагу.

Далее вам понадобится клиент Python для взаимодействия с Redis. Убедитесь, что вы находитесь в окне терминала, где ваша виртуальная среда все еще активна, а затем установите redis-py:

        (venv) $ python -m pip install redis
    

Эта команда не устанавливает Redis в вашей системе, а только предоставляет интерфейс Python для подключения к Redis.

Примечание
Вам потребуется установить Redis в вашей системе и выполнить redis-py в виртуальной среде Python, чтобы вы могли работать с Redis из своих программ Python.

После завершения обеих установок вы успешно настроили брокер сообщений. Однако вы еще не подключили своего производителя к Celery.

Если вы попытаетесь запустить Celery сейчас и включить имя приложения-производителя, передав А-параметр вместе с именем вашего приложения Django (django_celery), вы столкнетесь с еще одной ошибкой:

        (venv) $ python -m celery -A django_celery worker
...
Error: Invalid value for '-A' / '--app':
Unable to load celery application.
Module 'django_celery' has no attribute 'celery'

    

Пока что ваша распределенная очередь задач не может получать сообщения от вашего приложения Django, потому что в вашем проекте Django не настроено приложение Celery.

В следующем разделе вы добавите необходимый код в свое приложение Django, чтобы оно могло служить производителем задач для Celery.

Добавляем Celery в проект Django

Последняя часть головоломки — подключение приложения Django в качестве производителя сообщений к вашей очереди задач. Вы начнете с предоставленного кода проекта, поэтому загрузите его, если вы еще этого не сделали.

Получив код проекта на своем компьютере, перейдите в папку django_celery приложения управления и создайте новый файл с именем celery.py:

        django_celery/
├── __init__.py
├── asgi.py
├── celery.py
├── settings.py
├── urls.py
└── wsgi.py
    

Celery рекомендует использовать этот модуль для определения экземпляра приложения Celery. Откройте файл в вашем любимом текстовом редакторе или IDE и добавьте необходимый код:

        # django_celery/celery.py

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_celery.settings")
app = Celery("django_celery")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
    

Вам нужно только добавить эти несколько строк кода в файл. Сейчас расскажем, что делает каждый из них:

  • Строка 3: вы импортируете встроенный os модуль, который вы можете знать благодаря работе с файлами. Вы будете использовать его в строке 6 для установки переменной среды.
  • Строка 4: вы импортируете Celery из пакета celery. Вы будете использовать его в строке 7 для создания экземпляра приложения Celery.
  • Строка 6: вы используете .setdefault() , os.environ, чтобы убедиться, что модуль settings.py вашего проекта Django доступен через ключ "DJANGO_SETTINGS_MODULE".
  • Строка 7: вы создаете экземпляр приложения Celery и указываете имя основного модуля в качестве аргумента. В контексте вашего приложения Django основным модулем является приложение Django, содержащее celery.py, поэтому вы передаете "django_celery".
  • Строка 8: вы определяете файл настроек Django в качестве файла конфигурации для Celery и предоставляете пространство имен "CELERY". Вам нужно будет предварительно указать значение пространства имен, за которым следует символ подчеркивания (_), для каждой переменной конфигурации, связанной с Celery. Вы можете определить другой файл настроек, но сохранение конфигурации Celery в файле настроек Django позволяет вам придерживаться единого центрального места для конфигураций.
  • Строка 9: вы говорите своему экземпляру приложения Celery автоматически находить все задачи в каждом приложении вашего проекта Django. Это работает до тех пор, пока вы придерживаетесь структуры многократно используемых приложений и определяете все задачи Celery для приложения в специальном модуле tasks.py. Вы создадите и заполните этот файл для своего django_celery приложения позже , когда будете рефакторить код отправки электронной почты.

Настроив celery.py и попытавшись получить необходимые настройки Celery из вашего файла settings.py, переходите к settings.py для добавления этих записей настроек в конец файла:

        # django_celery/settings.py

# ...

# Celery settings
CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"
    

Эти две записи дают вашему экземпляру приложения Celery достаточно информации, чтобы знать, куда отправлять сообщения и куда записывать результаты. Поскольку вы используете Redis как в качестве брокера сообщений, так и в качестве серверной части базы данных, оба URL-адреса указывают на один и тот же адрес.

Примечание
Эти URL-адреса также могут указывать на разные серверы и службы. Например, вы можете использовать RabbitMQ в качестве брокера сообщений и Redis в качестве серверной части результатов:
        CELERY_BROKER_URL = "amqp://myuser:mypassword@localhost:5672/myvhost"
CELERY_RESULT_BACKEND = "redis://localhost:6379"
    

Когда вы запустите свое приложение в рабочей среде, вы замените эти URL-адреса рабочими местоположениями каждой службы.

Обратите внимание на CELERY_namespace в начале переменных настройки. Вам нужно добавить его, потому что аргумент namespace="CELERY" вы передали app.config_from_object() в строке 8 файла celery.py.

На данный момент вы почти закончили интеграцию Celery в свое веб-приложение. Последнее дополнение входит в __init__.py приложение управления:

        django_celery/
├── __init__.py
├── asgi.py
├── celery.py
├── settings.py
├── urls.py
└── wsgi.py
    

Откройте файл в текстовом редакторе. В проекте Django по умолчанию в каждой папке приложения есть __init__.py файл, который помогает пометить его как модуль. По умолчанию файл пуст, но вы можете добавить код, влияющий на поведение импорта.

Чтобы убедиться, что ваше приложение Celery загружается при запуске Django, вы должны добавить его в __all__:

        # django_celery/__init__.py

from .celery import app as celery_app

__all__ = ("celery_app",)
    

Загрузка приложения Celery при запуске Django гарантирует, что декоратор @shared_task будет использовать его правильно. Вы узнаете больше об @shared_task в следующем разделе.

Время проверить вашу установку! Помните, что процесс, который вы настраиваете, требует одновременного запуска как минимум трех служб:

  1. Производитель: ваше приложение Django
  2. Брокер сообщений: сервер Redis
  3. Потребитель: ваше приложение Celery

Используя Redis, вы получите серверную часть базы данных в качестве бонуса без запуска другого сервиса.

Откройте три отдельных окна терминала и запустите все программы, если они еще не запущены.

Отправьте свое веб-приложение на сервер разработки Django в первом окне:

        (venv) $ python manage.py runserver

    

Затем запустите сервер Redis во втором окне терминала, если вы остановили его ранее:

        $ redis-server
    

Команда redis-server является единственной из трех команд, которую вы можете запускать за пределами вашей виртуальной среды, поэтому убедитесь, что ваша виртуальная среда активна в двух других окнах терминала.

Примечание
Вы можете получить сообщение об ошибке, если сервер Redis все еще работает в фоновом режиме. Если это так, вам нужно вызвать SHUTDOWN перед выполнением redis-server.

Наконец, теперь вы можете правильно запускать Celery, не сталкиваясь с сообщением об ошибке:

        (venv) $ python -m celery -A django_celery worker
    

При запуске Celery с помощью этой команды вы указываете имя модуля, содержащего экземпляр вашего приложения Celery "django_celery" в -A.

Примечание
Хотя сообщение об ошибке, которое вы видели при запуске Celery до установки Redis, исчезло, вы все равно можете увидеть предупреждение, связанное с DEBUG настройкой Django. Вы можете проигнорировать это предупреждение для этого примера, но вы всегда должны устанавливать DEBUGегоFalse перед развертыванием сайта в рабочей среде.

Таким образом, вам нужно всего лишь добавить код в три упомянутых файла, чтобы интегрировать Celery в ваше приложение Django и подготовить его для обработки асинхронных задач. Когда эта базовая настройка завершена, вы готовы написать задачу, которую вы можете передать Celery.

В следующем разделе вы проведете рефакторинг .send_email() для вызова асинхронной задачи Celery вместо синхронной обработки отправки электронной почты в Django.

Асинхронная обработка нагрузок с помощью Celery

Вы успешно собрали кусочки головоломки, необходимые для запуска асинхронных задач с помощью Django, Redis и Celery. Но на данный момент вы еще не определили какие-либо задачи для передачи в Celery.

Ваш последний шаг для интеграции Celery с Django и переноса работы в распределенную очередь задач Celery — реорганизация функции отправки электронной почты в задачу Celery.

Вернемся к синхронному коду

На данный момент ваш код определяет функциональность отправки электронной почты в .send_email() из FeedbackFormin в forms.py:

        # feedback/forms.py

from time import sleep
from django.core.mail import send_mail
from django import forms

class FeedbackForm(forms.Form):
    email = forms.EmailField(label="Email Address")
    message = forms.CharField(
        label="Message", widget=forms.Textarea(attrs={"rows": 5})
    )

    def send_email(self):
        """Sends an email when the feedback form has been submitted."""
        sleep(20)  # Simulate expensive operation(s) that freeze Django
        send_mail(
            "Your Feedback",
            f"\t{self.cleaned_data['message']}\n\nThank you!",
            "support@example.com",
            [self.cleaned_data["email_address"]],
            fail_silently=False,
        )
    

Вы определяете .send_email() в строке 13. Метод имитирует затратную операцию, которая замораживает ваше приложение на двадцать секунд с помощью вызова sleep() в строке 15. В строках с 16 по 22 вы составляете электронное письмо, которое вы отправляете с помощью удобного send_mail(), которое, в свою очередь, импортируется в строке 4.

Вам также нужно вызвать .send_email() для успешной отправки формы, вы уже это настроили благодаря .form_valid() в views.py:

        # feedback/views.py

from feedback.forms import FeedbackForm
from django.views.generic.edit import FormView
from django.views.generic.base import TemplateView

class FeedbackFormView(FormView):
    template_name = "feedback/feedback.html"
    form_class = FeedbackForm
    success_url = "/success/"

    def form_valid(self, form):
        form.send_email()
        return super().form_valid(form)

class SuccessView(TemplateView):
    template_name = "feedback/success.html"
    

В строке 12 происходит определение .form_valid(), которую FeedbackFormView автоматически вызывает при успешной отправке формы. В строке 13 вы, наконец, вызываете .send_email().

Ваша настройка работает, но из-за симуляции затратной операции требуется слишком много времени, прежде чем ваше приложение снова станет отвечать и позволит пользователям продолжить использование. Пришло время изменить это, позволив Celery отправлять электронную почту по собственному расписанию!

Рефакторинг кода как задача Celery

Чтобы app.autodiscover_tasks() работал как было задумано, вам нужно определить свои задачи Celery в отдельном tasks.py модуле внутри каждого приложения вашего проекта Django.

Примечание. В этом примере у вас есть только одно приложение. Более крупные проекты Django, скорее всего, будут иметь больше приложений. Если вы придерживаетесь стандартной настройки, вы создадите файл tasks.py для каждого приложения и сохраните задачи приложения Celery в этом файле.

Создайте новый файл с именем tasks.py в вашем feedback/app:

        feedback/
│
├── migrations/
│   └── __init__.py
│
├── templates/
│   │
│   └── feedback/
│       ├── base.html
│       ├── feedback.html
│       └── success.html
│
├── __init__.py
├── admin.py
├── apps.py
├── forms.py
├── models.py
├── tasks.py
├── tests.py
├── urls.py
└── views.py
    

В этом файле вы задаете новую функцию, которая будет обрабатывать логику отправки электронной почты. Получив код из .send_mail() в forms.py, используйте его в качестве основы для создания send_feedback_email_task() в tasks.py:

        # feedback/tasks.py

from time import sleep
from django.core.mail import send_mail

def send_feedback_email_task(email_address, message):
    """Sends an email when the feedback form has been submitted."""
    sleep(20)  # Simulate expensive operation(s) that freeze Django
    send_mail(
        "Your Feedback",
        f"\t{message}\n\nThank you!",
        "support@example.com",
        [email_address],
        fail_silently=False,
    )
    

Не забудьте добавить необходимые импорты, как показано в строках 3 и 4.

До сих пор вы в основном копировали код из .send_mail() в send_feedback_email_task(). Вы также немного отредактировали определение функции, добавив два параметра в строку 6. Вы используете эти параметры в строках 11 и 13 для замены значений, которые вы ранее извлекли из .cleaned_data в .send_mail(). Это изменение необходимо, потому что у вас нет доступа к этому атрибуту экземпляра в вашей новой функции.

Кроме того, send_feedback_email_task() выглядит так же, как .send_email(). Celery еще даже не вмешивался!

Чтобы преобразовать эту функцию в задачу Celery, все, что вам нужно сделать, это добавить @shared_task, который вы импортируете из celery:

        # feedback/tasks.py

from time import sleep
from django.core.mail import send_mail
from celery import shared_task

@shared_task()
def send_feedback_email_task(email_address, message):
    """Sends an email when the feedback form has been submitted."""
    sleep(20)  # Simulate expensive operation(s) that freeze Django
    send_mail(
        "Your Feedback",
        f"\t{message}\n\nThank you!",
        "support@example.com",
        [email_address],
        fail_silently=False,
    )
    

После импорта shared_task() из celery и добавления в него функции send_feedback_email_task(), работа с необходимыми изменениями кода в этом файле будет закончена.

Передача задачи Celery вращается вокруг класса Celery – Task, и вы можете создавать задачи, добавляя декораторы к своим определениям функций.

Если вашим производителем является приложение Django, вам нужно использовать декоратор @shared_task для настройки задачи, которая обеспечивает возможность повторного использования ваших приложений.

С этими дополнениями вы закончили настройку асинхронной задачи с Celery. Вам нужно будет только реорганизовать, где и как вы вызываете его в коде веб-приложения.

Вернитесь к forms.py, откуда вы взяли код отправки электронной почты, и реорганизуйте .send_email() так, чтобы он вызывал send_feedback_email_task():

        # feedback/forms.py

# Removed: from time import sleep
# Removed: from django.core.mail import send_mail
from django import forms
from feedback.tasks import send_feedback_email_task

class FeedbackForm(forms.Form):
    email = forms.EmailField(label="Email Address")
    message = forms.CharField(
        label="Message", widget=forms.Textarea(attrs={"rows": 5})
    )

    def send_email(self):
        send_feedback_email_task.delay(
            self.cleaned_data["email"], self.cleaned_data["message"]
        )
    

Вместо того чтобы обрабатывать логику кода отправки электронной почты в .send_email(), вы переместили ее в send_feedback_email_task() в tasks.py. Это изменение означает, что вы также можете удалить устаревшие инструкции импорта в строках 3 и 4.

Теперь вы импортируете send_feedback_email_task() из feedback.tasks строки 6.

В строке 15 вы вызываете .delay() для send_feedback_email_task() и передаете отправленные данные формы, полученные из .cleaned_data в качестве аргументов в строке 16.

Примечание
Вызов .delay() — это самый быстрый способ отправить сообщение о задаче в Celery. Этот метод является ярлыком для более мощного метода .apply_async(), который дополнительно поддерживает параметры выполнения для точной настройки вашего сообщения о задаче.

Используя .apply_async(), вызов для тех же целей будет немного более подробным:

        send_feedback_email_task.apply_async(args=[
    self.cleaned_data["email"], self.cleaned_data["message"]
    ]
)
    

Хотя .delay()это лучший выбор в простом сообщении о задаче, вы получите преимущества использования множества вариантов выполнения с помощью .apply_async(), таких как countdown и retry.

Применив эти изменения в tasks.py и forms.py, рефакторинг завершен! Основная часть работы по запуску асинхронных задач с Django и Celery заключается в настройке, а не в фактическом коде, который вам нужно написать.

Но работает ли это? Рассылаются ли письма по-прежнему, и остается ли ваше приложение Django таким же работоспособным?

Протестируем асинхронную задачу

Когда вы запускаете рабочий процесс Celery, он загружает ваш код в память. Когда он получит задачу через вашего брокера сообщений, он выполнит этот код. Из-за этого вам нужно перезапускать рабочий процесс Celery каждый раз, когда вы меняете свой код.

Примечание
Чтобы избежать ручного перезапуска вашего рабочего процесса Celery при каждом изменении кода во время разработки, вы можете настроить автоматическую перезагрузку с помощью watchdog или написав пользовательскую команду управления.

Вы создали задачу, о которой ранее запущенный рабочий процесс не знает, поэтому вам необходимо перезапустить его. Откройте окно терминала, в котором вы запускаете рабочий процесс Celery, и остановите выполнение, нажав Ctrl+C .

Затем перезапустите процесс с помощью той же команды, которую вы использовали ранее, и добавьте -l info, чтобы установить уровень лога в сведения:

        (venv) $ python -m celery -A django_celery worker -l info

    

Установка параметра -l в качестве дополнения к info означает, что вы увидите больше информации, распечатываемой на вашем терминале. При запуске Celery отображает все задачи, которые обнаружил в [tasks] разделе:

        [tasks]
  . feedback.tasks.send_feedback_email_task
    

Выходные данные подтверждают, что Celery зарегистрировал send_feedback_email_task() и готов обрабатывать входящие сообщения, связанные с этой задачей.

Когда все сервисы запущены и произведен рефакторинг кода для Celery, вы готовы протестировать улучшенную программу с точки зрения пользователей:

Если вы сейчас отправите форму обратной связи на главной странице приложения, вы будете быстро перенаправлены на нужную страницу. Ура! Не нужно ждать и накапливать разочарование. Вы даже можете вернуться к форме обратной связи и немедленно отправить другой ответ.

Но что происходит в бэкенде? В вашем синхронном примере вы увидели сообщение электронной почты, появившееся в окне терминала, где вы запустили сервер разработки Django. На этот раз его там нет — даже по прошествии двадцати секунд.

Вместо этого, вы увидите, что текст электронной почты появляется в окне терминала, где вы запускаете Celery, наряду с другими журналами обработки задачи:

        [INFO/MainProcess] celery@Martins-MBP.home ready.
[INFO/MainProcess] Task feedback.tasks.send_feedback_email_task
⮑ [a5054d64-5592-4347-be77-cefab994c2bd] received
[WARNING/ForkPoolWorker-7] Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: Your Feedback
From: support@example.com
To: martin@realpython.com
Date: Tue, 12 Jul 2025 14:49:23 -0000
Message-ID: <165763736314.3405.4812564479387463177@martins-mbp.home>

        Great!

Thank you!
[WARNING/ForkPoolWorker-7] -----------------------------------------
[INFO/ForkPoolWorker-7] Task feedback.tasks.send_feedback_email_task
⮑ [a5054d64-5592-4347-be77-cefab994c2bd] succeeded in 20.078754458052572s:
⮑ None
    

Поскольку вы запускали свой рабочий процесс Celery с информацией об уровне журнала (-l info), вы можете прочитать подробное описание того, что происходит на стороне Celery.

Во-первых, вы можете заметить, что журналы информируют вас о получении файлов send_feedback_email_task. Если вы посмотрите на это окно терминала сразу после отправки ответа на отзыв, вы увидите, что эта строка журнала печатается сразу же.

После этого Celery переходит в фазу ожидания, вызванную вызовом sleep(), которая ранее зависала в вашем приложении Django. Хотя вы можете сразу же продолжить использовать свое приложение Django, Celery выполняет для вас затратные вычисления в фоновом режиме.

По истечении двадцати секунд Celery выводит фиктивное электронное письмо, которое Django создает с помощью send_mail(), в окно терминала. Затем он добавляет еще одну запись в журнал, в которой сообщается об успешном выполнении send_feedback_email_task, сколько времени это заняло ( 20.078754458052572s) и каково было возвращаемое значение (None).

Примечание
Имейте в виду, что ваше приложение Django не будет знать, успешно ли выполнена задача Celery в этом примере. Сообщение «Thank you!», которое видит ваш читатель, не обязательно означает успешность выполнения задачи для вас. Поскольку вы настраиваете серверную часть базы данных с помощью Redis, вы можете, однако, сделать запрос для серверной части, чтобы понять, был запуск задачи успешным или нет.Из-за того, как работает HTTP, информация для пользователя во фронтенде об успешном завершении фоновой задачи не является тривиальной задачей. Для этого вам нужно настроить опрос AJAX или WebSockets через каналы Django.

Все прошло хорошо! Ваш отзыв, кажется, был отправлен быстро, и вам не пришлось разочаровываться из-за времени ожидания.

Отличная работа! Вы успешно интегрировали Celery в свое приложение Django и настроили его для обработки асинхронной задачи. Celery теперь обрабатывает отправку электронной почты и все ее накладные расходы как фоновую задачу. Задача отправки электронной почты не должна касаться вашего веб-приложения после того, как оно передало инструкции по задаче в распределенную очередь задач Celery.

Вывод

Вжух! Отзыв отправлен!

После того как вы интегрировали Celery и выполнили рефакторинг кода Django, отправка отзывов в вашем приложении станет таким приятным опытом, что вы не захотите прекращать отправлять положительные отзывы!

***
Больше полезных материалов вы найдете на нашем телеграм-канале «Библиотека питониста»

Материалы по теме

Источники

МЕРОПРИЯТИЯ

Комментарии

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ