kapo4ka 06 октября 2019
1
11632

Упрощаем разработку: асинхронные функции Python

Хотите программировать на Python асинхронно? Испытываете проблемы с многопоточностью? Расскажем, как использовать асинхронные функции в Python.

Разбираемся с асинхронностью

При синхронном подходе выполнение происходит поэтапно. Несмотря на ветвления и вызовы, за раз совершается одно действие. Следующий шаг запускается при завершении предыдущего.

Например:

  1. Приложения с пакетной обработкой. Берёте данные, обрабатываете и создаёте новые – вот последовательность этапов. Внимание здесь уделяется шагам и очерёдности. 
  2. Программы для командной строки. Создают и преобразуют информацию, генерируют отчёт или печатают вывод. Они тоже пошаговые. 

Асинхронная программа не дожидается завершения предыдущего шага для перехода к следующему. Значит, текущие и будущие действия происходят одновременно.

Создаём синхронный веб-сервер

По сути напоминает пакетную обработку. В синхронном варианте веб-сервер получится работающий, но жуткий. Потому что главная миссия – максимально быстро делать тысячи задач, одновременных или неодновременных.

Оптимизация здесь ограничена. Веб-сервер отвечает с недостаточной скоростью, не обрабатывает требуемые объёмы либо уходит в аут при накоплении задач.

Примечание. Добавьте ограничения в виде быстроты сети, запросов к сервисам и базам, ввода-вывода (IO). Это работает медленнее процессора в несколько раз. В синхронном приложении при отправке в базу данных запроса ЦП бездействует, пока не вернётся ответ. Цель пакетных программ – обрабатывать результаты IO, что занимает больше времени.

В асинхронном программировании во время IO-операций процессор выполняет другие задачи.

Переосмыслим разработку

Когда вы начнёте знакомство с асинхронностью, встретите массу обсуждений блокировки и неблокирующего кода. Это требует переосмысления.

Вообразите: вы родитель и пытаетесь одновременно поддерживать денежный баланс, стирать бельё и приглядывать за ребёнком.

Разберёмся:

  1. Поддержание баланса – задача синхронная. Выполняете сами.
  2. Вы отвлеклись от платёжных документов, чтобы постирать. Освобождаете сушилку, из стиральной машины перекладываете вещи в сушилку и повторно загружаете стиральную машину. Операция синхронная, но главное начинается после старта обеих. И вы возвращаетесь к балансу. Теперь стирка и сушка — асинхронные. Работа выполняется независимо до звукового сигнала.
  3. Приглядывать за ребёнком — тоже асинхронный таск. Он играет самостоятельно. Если ребёнок голоден или ушибся, поднимается крик, и вы реагируете. Ребёнок – долговременная и приоритетная задача, которая заменяет остальные: баланс или стирку.

Вы, как процессор: когда разгребаете бельё, заблокированы для других заданий. Но всё окей, ведь дело в таком случае двигается быстрее.

Стирка и сушка в машинке не помеха выполнению остального. Значит, задача асинхронная. После её старта вы возвращаетесь к первому делу. То есть переключается контекст, а машинка просто просигналит по завершении.

Проектируем программу-родителя. Это непросто!

Вариант 1: Синхронный родитель

Как напишете программу для последовательного выполнения указанных задач? Раз на первом месте забота о ребёнке, родитель делает это в ожидании запросов. Ни баланс, ни стирка не совершаются.

Переназначаются приоритеты задач по желанию, но происходит только одна. Это вполне неплохо, но дела стоят, пока ребёнок не уснёт. Спустя пару таких недель родитель выйдет в окно!

Вариант 2: Проверяющий родитель

При использовании опроса выполняется несколько тасков.

При интервале опроса продолжительностью 15 минут родитель отвлекается каждые 15 минут от платёжного документа и смотрит, требует ли внимания машинка или ребёнок. Если нет, возвращается к балансу, иначе – заботится о другом, прежде чем продолжить. И так повторяется до следующего перерыва.

Этот подход также работает, но порождает проблемы:

  1. Родитель тратит массу времени, чтобы проверить задачи, которые не требуют внимания: машинка ещё работает, а ребёнок играет, пока не произойдёт форс-мажор.
  2. Родитель так же пропускает завершённые задачи, требующие внимания: когда машинка постирала в момент начала установленного интервала, она ждёт целых 15 минут! Приглядывать за ребёнком – дело с высоким приоритетом. Он не выдержит 15 минут без родителя, если что-то пойдёт не по плану.

Хотите сократить интервал? Это поможет, но понадобится дополнительное время, чтобы переключить контекст. И снова в окно!

Вариант 3: Потоковый родитель

«Вот бы себя клонировать...». Виртуально это достигается с помощью многопоточности. В итоге параллельно выполняется много частей одного приложения. Такая независимая секция – поток. У потоков разделяемая память.

Вы создаёте «клонов» родителя на каждое задание: приглядывать за ребёнком, контролировать машинку и подводить баланс. Копии действуют автономно.

Выглядит хорошо, но не без проблем. Во-первых, придётся задать всем копиям объём работы. Осложнения возникают из-за совместного использования памяти.

Так, родитель 1 занимается сушилкой. Увидел сухие вещи — начинает выгружать. А клон 2 заметил остановку стирки и теперь вынимает бельё. Ему нужен контроль над сушилкой, чтобы поместить туда мокрую одежду. Это невозможно, ведь сушилка под контролем родителя 1.

Когда родитель 1 закончил выгрузку, он требует контроля над стиральной машиной, чтобы переносить вещи в свободную сушилку. И сделать этого не выйдет, ведь клон 2 занимает стиральную машинку!

Они пришли к взаимной блокировке. Клоны занимают свой ресурс и требуют контроля над чужим. Ожидание длиною в вечность.

Примечание. Совместное использование потоков памяти часто приводит к повреждению информации и чтению неадекватных данных.  Многопоточная разработка предполагает, что переключением контекста управляет система, а не программист Python. Возникающие ошибки трудно понять.

Есть ещё одна сложность. Допустим, ребёнок травмировался и нуждается в медицинской помощи. Родитель 3 решает вопрос и выписывает огромный чек за посещение врача. А клон 4 занимается платёжным документом и не в курсе событий, поэтому в шоке, когда видит баланс!

Помните, денежный счёт – разделяемый ресурс, значит, следует проинформировать о тратах родителя 4. Иначе потребуется механизм блокировки.

Применяем асинхронные функции Python

Приложения тестировались на версии 3.7.2. В requirements.txt вы найдёте требуемые модули для установки.

Синхронное программирование

Первый пример извлекает задания из очереди. Язык программирования Python предоставляет методы для создания очереди и вывода данных в порядке вставки.

Вот код example_1.py:

            import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [
        (task, "One", work_queue),
        (task, "Two", work_queue)
    ]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main() 
        

Разберём, что происходит в строках:

  1. 1 делает импорт модуля. 
  2. 3–13 извлекают работу из work_queue и выполняют её. 
  3. 15 объявляет main() для старта приложения. 
  4. 20 создаёт work_queue – разделяемый ресурс для получения задач. 
  5. 23–24 ставят задачу в очередь – здесь рандомное количество для обработки. 
  6. 27–29 формируют кортежи тасков и параметров. 
  7. 33–34 в цикле вызывают каждое задание из списка и передают необходимые параметры. 
  8. 36 запускает программу.

Получаем такой результат:

            Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do 
        

Видно, что Task One выполняет все задачи, когда оказывается в while. После завершения цикла Task Two обнаруживает пустую очередь и пишет об отсутствии работы. Код не меняет контексты и не даёт работать одновременно.

Кооперативная многозадачность

Для совместной работы добавим yield, который вернёт управление в заданный момент при сохранении контекста. Таск станет возобновляемый.

yield делает task() генератором: при выполнении инструкции управление получает вызывающая функция. Короче говоря, мы переключаем контекст.

С помощью next() возвращаем управление генератору, который запускается с точки остановки с предыдущими переменными.

example_2.py иллюстрирует такую многозадачность:

            import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1
            yield
        print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [
        task("One", work_queue),
        task("Two", work_queue)
    ]

    # Run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main() 
        

Смотрите, что выполняется в строках:

  1. 3–11 по-прежнему объявляют task(). Строка 10 с yield делает из него генератор, чтобы переключать контекст и возвращать управление в цикл main()
  2. 25–28 формируют список задач по-другому: вызывают каждую с параметрами, чтобы запустить генератор впервые. 
  3. 34–39 модифицируют цикл для возврата управления таскам, продолжения и запуска следующей задачи. 
  4. 35 отдаёт управление обратно в task() и возобновляет работу с момента yield
  5. 39 задаёт done. Цикл завершается при удалении из tasks заданий. 

Вот результат запуска программы:

            Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2 
        

Заметили, что Task Two печатает итог первым? Здесь нет асинхронности. Task One считает до 15, а второй – до 10, поэтому быстрее справляется.

Кооперативная многозадачность с блокирующими вызовами

К предыдущей версии добавляем time.sleep(delay), чтобы сымитировать блокировку – вызов, останавливающий процессор на время. Это тот момент, когда родитель без отрыва от процесса сводит баланс.

elapsed_time покажет время, которое прошло с инстанциирования до вызова.

И код example_3.py:

            import time
import queue
from lib.elapsed_time import ET

def task(name, queue):
    while not queue.empty():
        delay = queue.get()
        et = ET()
        print(f"Task {name} running")
        time.sleep(delay)
        print(f"Task {name} total elapsed time: {et():.1f}")
        yield

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    tasks = [
        task("One", work_queue),
        task("Two", work_queue)
    ]

    # Run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    main() 
        

Вот строки с отличиями:

  1. 1 обеспечивает доступ к time.sleep()
  2. 11 добавляет time.sleep(delay) для имитации IO-приостановки и заменяет for из первого примера. 

После запуска вы увидите:

            Task One running
Task One total elapsed time: 15.0
Task Two running
Task Two total elapsed time: 10.0
Task One running
Task One total elapsed time: 5.0
Task Two running
Task Two total elapsed time: 2.0

Total elapsed time: 32.01021909713745 
        

Приостановка показала, что кооперативная многозадачность не помогла. ЦП ждёт, когда закончится ввод-вывод. Вещь, которую в Python-статьях называют блокирующим кодом.

Кооперативная многозадачность с отсутствием блокировки

Перейдём на асинхронную разработку Python.

Заменим блокирующее засыпание на пакет asyncio. Сделаем task асинхронным.

Вместо метода time и генератора напишите await asyncio.sleep(delay).

while и task_array нет, вызываем await asyncio.gather(...), чтобы дать asyncio такие инструкции:

  1. Сделать два task и начать выполнение. 
  2. Подождать завершения обоих. 

Запись asyncio.run(main()) вызывает main() и создаёт event loop – цикл событий.

Этот цикл запускает код. Выполнением задачи занимается процессор. При достижении await переключается контекст и возобновляется управление. Event loop мониторит ожидающие событий задачи, и передаёт контроль нужной.

await asyncio.sleep(delay) не блокирует процессор. ЦП фиксирует ожидание в очереди и переключает контекст, возвращая управление в событийный цикл. Event loop бесконечно ищет завершённые события и передаёт контроль ожидающему таску. Поэтому процессор окажется занят при доступных задачах, пока цикл отслеживает будущие события.

Примечание. Асинхронное приложение однопоточное. Вы контролируете переключение контекста и облегчаете решение проблемы разделяемой памяти при многопоточном подходе.

Код четвёртого примера:

            import asyncio
from lib.elapsed_time import ET

async def task(name, work_queue):
    while not work_queue.empty():
        delay = await work_queue.get()
        et = ET()
        print(f"Task {name} running")
        await asyncio.sleep(delay)
        print(f"Task {name} total elapsed time: {et():.1f}")

async def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = asyncio.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    et = ET()
    await asyncio.gather(
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
    )
    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    asyncio.run(main()) 
        

Отличия от третьей версии найдёте в таких строках:

  1. 1 вместо time импортирует asyncio
  2. 4 добавляет async для асинхронной работы. 
  3. 9 – на смену блокирующему sleep приходит asyncio.sleep(delay) для возврата управления в событийный цикл. 
  4. 17 создаёт асинхронную очередь без блокировки. 
  5. 20–21 с помощью await добавляют задачи в очередь асинхронно. 
  6. 25–28 создают два таска и группируют их для ожидания выполнения обоих. 
  7. 32 делает запуск приложения асинхронным. 

Взгляните, как обе задачи стартуют одновременно и ждут при ложном IO-вызове:

            Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0

Total elapsed time: 17.0 
        

Вот и подтверждение отсутствия блокировки.

Отметили, что приложение затратило половину времени работы предыдущей версии? Таков профит от применения асинхронных функций Python! Программа выполняется быстрее суммы времени своих составляющих. Избавились от синхронности!

Вызываем HTTP синхронно

Приложение взаимодействует с реальным IO: посылает HTTP-запросы на URL-адреса из списка и принимает ответы с контентом. Только с блокировкой.

Импортируем requests для работы с HTTP. В очереди будут URL-адреса, а не числа. Вместо увеличения счётчика task() принимает контент из URL-адреса и выводит затраченное время.

Код пятого примера:

            import queue
import requests
from lib.elapsed_time import ET

def task(name, work_queue):
    with requests.Session() as session:
        while not work_queue.empty():
            url = work_queue.get()
            print(f"Task {name} getting URL: {url}")
            et = ET()
            session.get(url)
            print(f"Task {name} total elapsed time: {et():.1f}")
            yield

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com"
    ]:
        work_queue.put(url)

    tasks = [
        task("One", work_queue),
        task("Two", work_queue)
    ]

    # Run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    main() 
        

Вот, что делают строки:

  1. 2 импортируют requests, чтобы работать с HTTP. 
  2. 10–11 делают приостановку, как в example_3.py, только с вызовом session.get(url) для получения контента страницы из URL. 
  3. 23–32 наполняют очередь списком URL.

После запуска увидите:

            Task One getting URL: http://google.com
Task One total elapsed time: 0.3
Task Two getting URL: http://yahoo.com
Task Two total elapsed time: 0.8
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.4
Task Two getting URL: http://apple.com
Task Two total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task One total elapsed time: 0.5
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.5
Task One getting URL: http://twitter.com
Task One total elapsed time: 0.4

Total elapsed time: 3.2 
        

Все таски берут из очереди адрес, получают контент страницы и печатают затраченное время.

Как прежде, из-за yield оба задания выполняются совместно. Но все session.get() блокируют процессор до получения ответа. Засеките время, потраченное на запуск приложения, чтобы сравнить со следующим примером.

Делаем HTTP-запросы асинхронными

В этой версии нужна Python-библиотека aiohttp, чтобы работать с HTTP асинхронно.

Удалим yield, так как запрос HTTP GET уже не блокирующий.

Так выглядит шестой пример:

            import asyncio
import aiohttp
from lib.elapsed_time import ET

async def task(name, work_queue):
    async with aiohttp.ClientSession() as session:
        while not work_queue.empty():
            url = await work_queue.get()
            print(f"Task {name} getting URL: {url}")
            et = ET()
            async with session.get(url) as response:
                await response.text()
            print(f"Task {name} total elapsed time: {et():.1f}")

async def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = asyncio.Queue()

    # Put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com",
    ]:
        await work_queue.put(url)

    # Run the tasks
    et = ET()
    await asyncio.gather(
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
    )
    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    asyncio.run(main()) 
        

Вот, за что отвечают строки:

  1. 2 для асинхронности HTTP-запросов импортирует aiohttp
  2. 5 делает функцию асинхронной. 
  3. 6 создаёт менеджер контекста сессии aiohttp
  4. 11 создаёт такой менеджер для ответа aiohttp и отправляет HTTP GET на URL-адрес из очереди. 
  5. 12 асинхронно получает текст из ответа.

Запустите и получите:

            Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One total elapsed time: 0.3
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.3
Task One getting URL: http://apple.com
Task One total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task Two total elapsed time: 0.9
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.4
Task Two getting URL: http://twitter.com
Task One total elapsed time: 0.5
Task Two total elapsed time: 0.3

Total elapsed time: 1.7 
        

Видите, что общее время – половина времени всех GET-запросов? Дело в асинхронном выполнении. Вы используете процессор с умом: запросы отправляются одновременно.

Заключение

Теперь вы знаете, как применять асинхронные функции Python. Значит, получаете контроль при переключении контекста и облегчаете решение проблем многопоточного программирования.

Мы что-то упустили? Напишите, что бы вы еще добавили по данной теме.

РУБРИКИ В СТАТЬЕ

Комментарии 1

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ

BUG