23 марта 2023

🛍️ Пакетный API: объединение запросов с помощью asyncio и Batch API

iOS-developer, ИТ-переводчица, пишу статьи и гайды.
Разбираемся, как объединить множество отдельных вызовов функций в меньшее число запросов с помощью паттерна пакетной обработки (Batching Pattern) и питоновской библиотеки asyncio.
🛍️ Пакетный API: объединение запросов с помощью asyncio и Batch API
Данная статья является переводом. Ссылка на оригинал.

Введение в Batch API

В современных приложениях Python обычно применяется доступ к удаленному API с использованием REST или других веб-технологий. Использование Batch API (или пакетных API) позволяет уменьшить количество сетевых вызовов к удаленной службе путем объединения в один запрос большого числа вызовов.

Предположим, у вас есть REST API, который возвращает текущую цену акции. Если простой API подойдет для случая, когда у нас один идентификатор акции, то для получения информации о стоимости тысячи акций, вам потребуется сделать тысячу вызовов API. Batch API, в свою очередь, будет принимать набор идентификаторов акций в запросе и возвращать текущую цену для всех запрошенных идентификаторов. Используя Batch API, вы сможете получить все цены, которые вам нужны, в одном запросе. Это снижает нагрузку на сеть, тем самым сокращая время задержки вашего приложения. Это также потенциально снижает нагрузку на удаленный сервер.

🛍️ Пакетный API: объединение запросов с помощью asyncio и Batch API

Источник изображения.

Асинхронные функции Python в Excel

Эта статья написана пользователем надстройки Python Excel PyXLL, который задался вопрос о том, как использовать пакетный API для оптимизации своей электронной таблицы Excel.

PyXLL встраивает Python в Excel и позволяет вызывать функции Python непосредственно в электронных таблицах Excel. Каждый раз, когда ячейка выполняет вычисления с использованием функции Python, происходит вызов этой функции. В данном случае это была асинхронная функция, которая отправляет запрос на сервер REST.

Отдельные запросы к REST API от тысячи ячеек одного листа занимали слишком много времени. Решение нашлось в использовании паттерна пакетной обработки!

🛍️ Пакетный API: объединение запросов с помощью asyncio и Batch API

Источник изображения.

Вводная информация: AsyncIO и параллелизм

При выполнении нескольких запросов к удаленному серверу зачастую вы не хотите отправлять запрос и ждать ответа, прежде чем отправлять следующий запрос. Обычно отправка нескольких запросов параллельно (одновременно) и ожидание всех ответов происходит намного быстрее. Вы можете добиться этого в Python, используя многопоточность или асинхронное программирование, обзор которых приводится в этом разделе. Вы также поймете, когда стоит выбрать тот или иной вариант.

Многопоточность

Многопоточность — это способ одновременного выполнения нескольких задач. В потоковой модели вы запускаете несколько потоков, и каждый поток выполняет свой код одновременно. Если ваша проблема связана с ЦП, вам может помочь ее разбивка на задачи, которые будут выполняться параллельно с использованием многопоточности. Говорят, что программа привязана к ЦП, когда проблемной зоной производительности является время обработки ЦП.

Существуют некоторые тонкости многопоточности, характерные для Python, которые мы не будем рассматривать в этой статье, но в теории все работает именно так!

Операционная система компьютера управляет всеми потоками и гарантирует, что каждый из них получит долю процессорного времени. Это создает дополнительные сложности, поскольку каждое переключение контекста требует времени, которое можно было бы потратить на что-то другое. Эта сложность масштабируется количеством потоков. Когда проблемным местом является ожидание операций ввода-вывода (например, сетевых запросов), то выполнение нескольких потоков для каждого запроса и ожидание ответа сети – далеко не идеальный вариант. Он также не масштабируется до тысяч запросов. Вот где на помощь приходит асинхронное программирование.

Больше полезных материалов вы найдете на нашем телеграм-канале «Библиотека питониста»

Асинхронное программирование с помощью asyncio

Асинхронное программирование в Python — это другая модель параллелизма, которая не использует несколько потоков. Вместо этого все выполняется в одном потоке, а Python управляет переключением между активными задачами. Он идеально подходит для программ, которые используют много сетевых запросов или других задач, связанных с вводом-выводом, таких как доступ к диску или базе данных.

Цикл событий управляет набором запущенных задач. Когда задача ожидает выполнения чего-то вроде сетевого запроса, она находится в состоянии await (ожидание). Пока задача ожидает, цикл событий может запланировать выполнение других задач. Это позволяет другой задаче отправить другой сетевой запрос, а затем ожидать, разрешая выполнение другой задачи и так далее и тому подобное. Когда сетевой запрос готов, цикл обработки событий может возобновить выполнение задачи. Это позволяет нам иметь несколько одновременных запросов в процессе выполнения одного потока на запрос без потери скорости.

Ключевое слово Python async говорит о том, что функция является асинхронной и будет выполняться в цикле событий. Ключевое слово await приводит к циклу обработки событий и ожидает завершения другой асинхронной функции или задачи. Пакет Python asyncio предоставляет примитивы, необходимые для управления асинхронными задачами.

Преимущества Batch API

Выше вы узнали, что можете делать несколько запросов одновременно. Это может быть намного быстрее, чем ожидание возвращения каждого запроса перед отправкой следующего запроса. Если вы можете отправлять все необходимые запросы одновременно, зачем вам пакетный API?

Для отправки нескольких запросов требуется больше сетевого трафика, чем для отправки одного. Более эффективно с точки зрения передачи данных запросить все необходимые данные с помощью одного запроса.

Кроме этого, есть и другие преимущества. Например, если удаленный сервер может сократить объем работы, которую ему необходимо выполнить, извлекая все за один раз, то время, необходимое для обслуживания одного пакетного запроса, может быть фактически меньше, чем общее время, необходимое для обслуживания эквивалентных отдельных запросов.

Паттерн пакетной обработки в Python

Теперь вы понимаете, что такое Batch API и что вы можете выполнять несколько запросов одновременно, используя асинхронное программирование на Python, и что представляет собой паттерн пакетной обработки (Batching Pattern) и зачем он вам нужен?

Проще говоря, паттерн пакетной обработки собирает несколько запросов в один и отправляет его зараз. Далее вы увидите, как можно с его помощью превратить реализацию, использующую множество отдельных запросов, в реализацию, которая объединяет запросы вместе, чтобы отправлять меньшее число запросов на удаленный сервер.

Пример: получение местоположений для адресов улиц

В качестве примера вы будете использовать получение местоположений уличных адресов. Для этого вы можете использовать REST API с https://www.geoapify.com/. Существует бесплатный тариф, на который вы можете подписаться для тестирования, и он поддерживает получение нескольких местоположений. Чтобы использовать приведенный ниже код, вам необходимо зарегистрироваться и получить ключ API.

Вот первая попытка кода для получения местоположений по нескольким уличным адресам:

        # Import the modules you're going to use.
# You may need to 'pip install aiohttp'
from urllib.parse import urlencode
import asyncio
import aiohttp
import json

# Test data
STREET_ADDRESSES = [
    "1600 Pennsylvania Avenue, Washington DC, USA",
    "11 Wall Street New York, NY",
    "350 Fifth Avenue New York, NY 10118",
    "221B Baker St, London, England",
    "Tour Eiffel Champ de Mars, Paris",
    "4059 Mt Lee Dr.Hollywood, CA 90068",
    "Buckingham Palace, London, England",
    "Statue of Liberty, Liberty Island New York, NY 10004",
    "Manger Square, Bethlehem, West Bank",
    "2 Macquarie Street, Sydney"
]

# Constants for accessing the Geoapify API
GEOCODING_API = "https://api.geoapify.com/v1/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"

async def get_location(address):
    """Return (latitude, longitude) from an address."""
    # Construct the URL to do the lookup for a single address
    query_string = urlencode({
        "apiKey": YOUR_API_KEY,
        "text": address,
        "limit": 1,
        "format": "json"
    })
    url = f"{GEOCODING_API}?{query_string}"

    # Make the request to the API
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.read()

            # Read the json string and return the latitude and longitude
            # from the first result (there will only be one)
            results = json.loads(data.decode())["results"]
            return results[0]["lat"], results[0]["lon"]

async def main():
    # Print the city for each IP address
    tasks = []
    for address in STREET_ADDRESSES:
        location = await get_location(address)
        print(f"{address} -> {location}")

# Because it's an async function you need to run it using the asyncio event loop
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
    

Вы могли заметить, что приведенный выше код по-прежнему последовательно вызывает API для каждого адреса. Несмотря на использование асинхронности, цикл for в настоящее время ожидает завершения каждого запроса, прежде чем перейти к следующему адресу. Чтобы исправить это, вы можете использовать асинхронную функцию gather. Собрав задачи вместе и ожидая их все в конце, вам не нужно ждать их по отдельности.

Ваша обновленная основная функция теперь выглядит так:

        async def main():
   # Get the location for each address
   tasks = []
   for address in STREET_ADDRESSES:
       tasks.append(get_location(address))

   # Wait for all tasks to complete
   locations = await asyncio.gather(*tasks)

   # Print them all once all requests have completed
   for address, location in zip(STREET_ADDRESSES, locations):
       print(f"{address} -> {location}")
    

Вы по-прежнему отправляете несколько запросов на сервер. Далее вы увидите, как паттерн пакетной обработки группирует эти запросы вместе, чтобы уменьшить количество запросов, не изменяя основную функцию.

Пример: получение нескольких местоположений с помощью Batch API

Используя пакетный API, вы можете отправить несколько запросов за один раз. Если сервер обрабатывает пакет более эффективно, чем отдельные запросы, то, возможно, намного быстрее будет использовать пакетный запрос, чем обрабатывать несколько.

Вы будете использовать пакетную версию API геокодирования, описанную выше. Это немного сложнее. Вместо того чтобы отправлять один адрес как часть URL-адреса, вы должны сделать запрос POST. Поскольку обработка пакета может занять некоторое время, вместо немедленного возврата результатов сервер сначала ответит идентификатором запроса, который вы затем запросите, чтобы проверить, готовы ли результаты или нет. Это распространенный шаблон, используемый при реализации пакетного API.

Следующая функция запрашивает у API расположение списка адресов. Это делается с помощью одного запроса к пакетному API.

        # Constants for accessing the Geoapify batch API
GEOCODING_BATCH_API = "https://api.geoapify.com/v1/batch/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"

async def get_locations(addresses):
    """Return a dictionary of address -> (lat, lon)."""
    # Construct the URL to do the batch request
    query_string = urlencode({"apiKey": YOUR_API_KEY})
    url = f"{GEOCODING_BATCH_API}?{query_string}"

    # Build the JSON payload for the batch POST request
    data = json.dumps(addresses)

    # And use Content-Type: application/json in the headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    # Make the POST request to the API
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=data, headers=headers) as response:
            response_json = await response.read()
            response_data = json.loads(response_json)

    # The API can return a dict with a pending status if it needs more
    # time to complete. Poll the API until the result is ready.
    while isinstance(response_data, dict) \
    and response_data.get("status") == "pending":
        # Wait a bit before calling the API
        await asyncio.sleep(0.1)

        # Query the result to see if it's ready yet
        request_id = response_data.get("id")
        async with aiohttp.ClientSession() as session:
            async with session.get(url + f"&id={request_id}") as response:
                response_json = await response.read()
                response_data = json.loads(response_json)

    # Gather the results into a dictionary of address -> (lat, lon)
    locations = {}
    for result in response_data:
        address = result["query"]["text"]
        coords = result["lat"], result["lon"]
        locations[address] = coords

    return locations
    

Собираем вместе: шаблон пакетной обработки

Теперь у вас есть функция, которая может вызывать пакетный API для массового поиска расположения списка адресов. Ваша следующая задача — реорганизовать get_location, чтобы он мог использовать пакетный API без необходимости изменять «основную» функцию.

Почему бы не изменить «основную» функцию? В этом простом примере достаточно легко изменить функцию main для вызова get_locations. В реальных проектах такой рефакторинг часто не так прост. В других случаях даже нежелательно изменять входные данные, которые принимает функция, и часто требуется скрыть от пользователя детали реализации.

Вернемся к первоначальному вопросу, который послужил причиной написания этого поста, касаемо вызова функций Python из Excel с помощью надстройки Excel PyXLL. В данном случае конечным пользователем является пользователь Excel, который может ничего не знать о Python. Наличие одной функции, которая принимает одну переменную на вход и возвращает один результат, соответствует их ожиданиям как пользователя Excel. Знакомство с концепцией пакетов привело бы к ненужной путанице. Это также означало бы, что им пришлось бы структурировать свою электронную таблицу, чтобы вызывать ее эффективным способом. Пакетная обработка запросов «за кулисами» при сохранении интерфейса, который видит конечный пользователь, в этом случае определенно является преимуществом.

Как это работает

В псевдокоде то, что мы хотим написать, выглядит примерно так:

Пакетные запросы

Вы можете добиться этого в Python, используя asyncio. Вы можете запустить функцию get_location() в фоновом режиме для обработки любых запросов в очереди. Он будет ждать, пока данная фоновая задача не обработает пакет, содержащий ваш запрос, а затем вернет его. Фоновую задачу следует запускать только один раз, поэтому вам нужно будет проверить перед запуском, не запущена ли она в данный момент. Если get_location вызывается несколько раз, то, поскольку это асинхронная функция, она может выполняться, пока другие ожидают. Каждый последующий вызов будет добавлять запрос в текущую очередь.

Для возврата результата из фоновой задачи в ожидающие функции get_location вы можете использовать асинхронный примитив Future. Future — это так называемый ожидаемый объект, который при ожидании блокируется до тех пор, пока не будет установлен результат.

Ваша функция get_location(), переписанная для группировки запросов с использованием будущих для передачи результата, выглядит так:

        # State for batching requests
ADDRESSES_BATCH = []
BATCH_LOOP_RUNNING = False

async def get_location(address):
    """Return (latitude, longitude) from an address."""
    global BATCH_LOOP_RUNNING

    # Create a Future that will be set with the location once the
    # request has completed.
    loop = asyncio.get_event_loop()
    future = loop.create_future()

    # Add the ip address and future to the batch
    ADDRESSES_BATCH.append((address, future))

    # Start 'process_batches' running on the asyncio event loop if it's
    # not already running.
    # We've not written 'process_batches_loop' yet!
    if not BATCH_LOOP_RUNNING:
        BATCH_LOOP_RUNNING = True
        asyncio.create_task(process_batches_loop())

    # Wait for the batch your address is in to return
    await future

    # And return the result
    return future.result()
    

Приведенный выше код создает объект asyncio.Future и добавляет и его, и адрес в список, который будет обрабатываться как пакет. Если цикл обработки пакетов не запущен, то он будет запущен с помощью «asyncio.create_task». Функция asyncio.create_task планирует ваш processes_batched_loop в цикле событий asyncio, который будет вызываться, когда другие запущенные задачи ожидают. Вы еще не определили свою функцию process_batches_loop, но сделаете это чуть позже. Вы ожидаете будущих задач, при этом позволяя другим задачам, работающим в цикле событий asyncio, выполняться, и как только результат будет получен, вы вернете его.

Обработка пакета

Функция process_batches_loop некоторое время ждет, чтобы позволить другим функциям добавить запросы в список ADDRESSES_BATCH. Затем он отправляет все запросы в очереди одним вызовом REST API. Как только результаты возвращаются из REST API, он распаковывает результаты и устанавливает результаты для фьючерсов, позволяя завершить каждую ожидающую функцию get_location.

        async def process_batches_loop():
    global ADDRESSES_BATCH, BATCH_LOOP_RUNNING

    # Loop while BATCH_LOOP_RUNNING is True
    while BATCH_LOOP_RUNNING:
        # Wait for more to be added to the batch
        await asyncio.sleep(0.1)

        # If nothing has been added to the batch then continue
        # to the start of the loop as there's nothing to do.
        if not ADDRESSES_BATCH:
            continue

        # Get the current items from the batch and reset the batch
        batch = ADDRESSES_BATCH
        ADDRESSES_BATCH = []

        # Get the locations of the current batch
        addresses = [address for (address, future) in batch]
        locations = await get_locations(addresses)

        # Set the results on the futures from this batch.
        # This allows each awaiting 'get_location' function to continue.
        for address, future in batch:
            coords = locations.get(address)
            future.set_result(coords)
    

Теперь вы достигли первоначальной цели. У вас есть функция get_location, которая выглядит для вызывающей стороны как ваша исходная функция. Он принимает один адрес и возвращает одно местоположение. В фоновом режиме он объединяет эти отдельные запросы и отправляет их в пакетный API. Пакетные API могут обеспечить более высокую производительность по сравнению с API, которые обрабатывают только отдельные запросы, и теперь ваша функция может воспользоваться этим преимуществом без каких-либо изменений в способе вызова функции.

Время, затрачиваемое на ожидание добавления запросов в пакет, должно быть отлажено в соответствии с тем, как используется функция. Если функция может вызываться много раз почти в одно и то же время, например, несколько ячеек вычисляются одновременно в Excel, тогда можно использовать короткую задержку. В других ситуациях, например, если вызов является результатом некоторого пользовательского ввода, который может занять несколько секунд, тогда потребуется более длительная задержка. Регистрация времени добавления каждого элемента в пакет вместе со временем обработки каждого пакета поможет нам определить оптимальное время ожидания.

Нет предела совершенству

Представленный здесь код можно улучшить множеством способов. Я надеюсь, что это дало вам некоторые идеи для развития и использования в ваших собственных проектах! Код был написан относительно простым способом, чтобы попытаться прояснить замысел, стоящий за ним, но, прежде чем использовать его в реальном приложении, вам необходимо обратить внимание на некоторые вещи.

  1. Проверка ошибок. Это, пожалуй, самое важное, что нужно добавить. Что произойдет, если цикл обработки пакетов завершится сбоем? Ваш код должен корректно обрабатывать любые ошибки, которые могут возникнуть, или, по крайней мере, регистрировать их, чтобы вы могли отслеживать, что произошло.
  2. Ненужный цикл. Цикл для обработки пакетов продолжает цикл, даже если делать нечего. Вы можете изменить это, чтобы ожидать объект asyncio.Event до тех пор, пока вы не поставите в очередь хотя бы один элемент. В качестве альтернативы вы можете выйти из цикла, когда больше нет элементов для обработки, и перезапустить его при необходимости.
  3. Остановка цикла, когда ваша программа заканчивается. Цикл будет продолжаться до тех пор, пока BATCH_LOOP_RUNNING имеет значение True. Когда ваша программа заканчивает работать, вы должны подумать о том, как аккуратно завершить цикл. Это может быть просто установка значения False для BATCH_LOOP_RUNNING и последующее ожидание завершения задачи. Функция asyncio.create_task возвращает объект Task, который вы можете сохранить как глобальную переменную.

Подведение итогов

В этой статье вы узнали, что такое пакетный API и почему его использование может быть выгодным. Вы рассмотрели параллелизм в Python, сравнив многопоточность с асинхронным программированием. Наконец, вы продемонстрировали, как с помощью шаблона пакетной обработки можно превратить функцию, обрабатывающую отдельные запросы, в функцию, использующую пакетный API.

Полный код, используемый в этой статье, доступен здесь.

REST API — это один из примеров пакетного API. Вы можете применить тот же механизм к запросам к базе данных или любому другому типу функций, где эффективнее будет поставить в очередь данные в большом количестве.

Пакетирование запросов «за кулисами» и сокрытие деталей от пользовательской функции или API полезно, когда вы хотите, чтобы все было просто для пользователя функции. Это также может быть способом модифицировать пакетный API для существующей базы кода, где рефакторинг будет затруднен.

Мотивацией в примере использования, на котором основана эта статья, был вызов функции Python из Excel, не подвергая пользователя Excel деталям управления пакетными вызовами. Пользователь вызывает простую функцию для выполнения отдельного запроса. Если они создают лист, который делает несколько запросов в разных ячейках, то данное решение автоматически объединяет все вместе за кулисами. Надстройка Excel PyXLL позволяет интегрировать Python в Excel, позволяя вызывать функции Python, как функции рабочего листа Excel.

Для получения дополнительной информации о надстройке PyXLL посетите https://www.pyxll.com.

Источники

МЕРОПРИЯТИЯ

Комментарии

ВАКАНСИИ

Добавить вакансию
Разработчик С#
от 200000 RUB до 400000 RUB
Java Team Lead
Москва, по итогам собеседования

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ