Хочешь уверенно проходить IT-интервью?

Мы понимаем, как сложно подготовиться: стресс, алгоритмы, вопросы, от которых голова идёт кругом. Но с AI тренажёром всё гораздо проще.
💡 Почему Т1 тренажёр — это мастхэв?
- Получишь настоящую обратную связь: где затык, что подтянуть и как стать лучше
- Научишься не только решать задачи, но и объяснять своё решение так, чтобы интервьюер сказал: "Вау!".
- Освоишь все этапы собеседования, от вопросов по алгоритмам до диалога о твоих целях.
Зачем листать миллион туториалов? Просто зайди в Т1 тренажёр, потренируйся и уверенно удиви интервьюеров. Мы не обещаем лёгкой прогулки, но обещаем, что будешь готов!
Реклама. ООО «Смарт Гико», ИНН 7743264341. Erid 2VtzqwP8vqy
Разбираемся с асинхронностью
При синхронном подходе выполнение происходит поэтапно. Несмотря на ветвления и вызовы, за раз совершается одно действие. Следующий шаг запускается при завершении предыдущего.
Например:
- Приложения с пакетной обработкой. Берёте данные, обрабатываете и создаёте новые – вот последовательность этапов. Внимание здесь уделяется шагам и очерёдности.
- Программы для командной строки. Создают и преобразуют информацию, генерируют отчёт или печатают вывод. Они тоже пошаговые.
Асинхронная программа не дожидается завершения предыдущего шага для перехода к следующему. Значит, текущие и будущие действия происходят одновременно.
Создаём синхронный веб-сервер
По сути напоминает пакетную обработку. В синхронном варианте веб-сервер получится работающий, но жуткий. Потому что главная миссия – максимально быстро делать тысячи задач, одновременных или неодновременных.
Оптимизация здесь ограничена. Веб-сервер отвечает с недостаточной скоростью, не обрабатывает требуемые объёмы либо уходит в аут при накоплении задач.
Примечание. Добавьте ограничения в виде быстроты сети, запросов к сервисам и базам, ввода-вывода (IO). Это работает медленнее процессора в несколько раз. В синхронном приложении при отправке в базу данных запроса ЦП бездействует, пока не вернётся ответ. Цель пакетных программ – обрабатывать результаты IO, что занимает больше времени.
В асинхронном программировании во время IO-операций процессор выполняет другие задачи.
Переосмыслим разработку
Когда вы начнёте знакомство с асинхронностью, встретите массу обсуждений блокировки и неблокирующего кода. Это требует переосмысления.
Вообразите: вы родитель и пытаетесь одновременно поддерживать денежный баланс, стирать бельё и приглядывать за ребёнком.
Разберёмся:
- Поддержание баланса – задача синхронная. Выполняете сами.
- Вы отвлеклись от платёжных документов, чтобы постирать. Освобождаете сушилку, из стиральной машины перекладываете вещи в сушилку и повторно загружаете стиральную машину. Операция синхронная, но главное начинается после старта обеих. И вы возвращаетесь к балансу. Теперь стирка и сушка — асинхронные. Работа выполняется независимо до звукового сигнала.
- Приглядывать за ребёнком — тоже асинхронный таск. Он играет самостоятельно. Если ребёнок голоден или ушибся, поднимается крик, и вы реагируете. Ребёнок – долговременная и приоритетная задача, которая заменяет остальные: баланс или стирку.
Вы, как процессор: когда разгребаете бельё, заблокированы для других заданий. Но всё окей, ведь дело в таком случае двигается быстрее.
Стирка и сушка в машинке не помеха выполнению остального. Значит, задача асинхронная. После её старта вы возвращаетесь к первому делу. То есть переключается контекст, а машинка просто просигналит по завершении.
Проектируем программу-родителя. Это непросто!
Вариант 1: Синхронный родитель
Как напишете программу для последовательного выполнения указанных задач? Раз на первом месте забота о ребёнке, родитель делает это в ожидании запросов. Ни баланс, ни стирка не совершаются.
Переназначаются приоритеты задач по желанию, но происходит только одна. Это вполне неплохо, но дела стоят, пока ребёнок не уснёт. Спустя пару таких недель родитель выйдет в окно!
Вариант 2: Проверяющий родитель
При использовании опроса выполняется несколько тасков.
При интервале опроса продолжительностью 15 минут родитель отвлекается каждые 15 минут от платёжного документа и смотрит, требует ли внимания машинка или ребёнок. Если нет, возвращается к балансу, иначе – заботится о другом, прежде чем продолжить. И так повторяется до следующего перерыва.
Этот подход также работает, но порождает проблемы:
- Родитель тратит массу времени, чтобы проверить задачи, которые не требуют внимания: машинка ещё работает, а ребёнок играет, пока не произойдёт форс-мажор.
- Родитель так же пропускает завершённые задачи, требующие внимания: когда машинка постирала в момент начала установленного интервала, она ждёт целых 15 минут! Приглядывать за ребёнком – дело с высоким приоритетом. Он не выдержит 15 минут без родителя, если что-то пойдёт не по плану.
Хотите сократить интервал? Это поможет, но понадобится дополнительное время, чтобы переключить контекст. И снова в окно!
Вариант 3: Потоковый родитель
«Вот бы себя клонировать...». Виртуально это достигается с помощью многопоточности. В итоге параллельно выполняется много частей одного приложения. Такая независимая секция – поток. У потоков разделяемая память.
Вы создаёте «клонов» родителя на каждое задание: приглядывать за ребёнком, контролировать машинку и подводить баланс. Копии действуют автономно.
Выглядит хорошо, но не без проблем. Во-первых, придётся задать всем копиям объём работы. Осложнения возникают из-за совместного использования памяти.
Так, родитель 1 занимается сушилкой. Увидел сухие вещи — начинает выгружать. А клон 2 заметил остановку стирки и теперь вынимает бельё. Ему нужен контроль над сушилкой, чтобы поместить туда мокрую одежду. Это невозможно, ведь сушилка под контролем родителя 1.
Когда родитель 1 закончил выгрузку, он требует контроля над стиральной машиной, чтобы переносить вещи в свободную сушилку. И сделать этого не выйдет, ведь клон 2 занимает стиральную машинку!
Они пришли к взаимной блокировке. Клоны занимают свой ресурс и требуют контроля над чужим. Ожидание длиною в вечность.
Примечание. Совместное использование потоков памяти часто приводит к повреждению информации и чтению неадекватных данных. Многопоточная разработка предполагает, что переключением контекста управляет система, а не программист Python. Возникающие ошибки трудно понять.
Есть ещё одна сложность. Допустим, ребёнок травмировался и нуждается в медицинской помощи. Родитель 3 решает вопрос и выписывает огромный чек за посещение врача. А клон 4 занимается платёжным документом и не в курсе событий, поэтому в шоке, когда видит баланс!
Помните, денежный счёт – разделяемый ресурс, значит, следует проинформировать о тратах родителя 4. Иначе потребуется механизм блокировки.
Применяем асинхронные функции Python
Приложения тестировались на версии 3.7.2. В requirements.txt
вы найдёте требуемые модули для установки.
Синхронное программирование
Первый пример извлекает задания из очереди. Язык программирования Python предоставляет методы для создания очереди и вывода данных в порядке вставки.
Вот код example_1.py
:
import queue
def task(name, work_queue):
if work_queue.empty():
print(f"Task {name} nothing to do")
else:
while not work_queue.empty():
count = work_queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
print(f"Task {name} total: {total}")
def main():
"""
This is the main entry point for the program.
"""
# Create the queue of 'work'
work_queue = queue.Queue()
# Put some 'work' in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some synchronous tasks
tasks = [
(task, "One", work_queue),
(task, "Two", work_queue)
]
# Run the tasks
for t, n, q in tasks:
t(n, q)
if __name__ == "__main__":
main()
Разберём, что происходит в строках:
- 1 делает импорт модуля.
- 3–13 извлекают работу из
work_queue
и выполняют её. - 15 объявляет
main()
для старта приложения. - 20 создаёт
work_queue
– разделяемый ресурс для получения задач. - 23–24 ставят задачу в очередь – здесь рандомное количество для обработки.
- 27–29 формируют кортежи тасков и параметров.
- 33–34 в цикле вызывают каждое задание из списка и передают необходимые параметры.
- 36 запускает программу.
Получаем такой результат:
Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do
Видно, что Task One
выполняет все задачи, когда оказывается в while
. После завершения цикла Task Two
обнаруживает пустую очередь и пишет об отсутствии работы. Код не меняет контексты и не даёт работать одновременно.
Кооперативная многозадачность
Для совместной работы добавим yield
, который вернёт управление в заданный момент при сохранении контекста. Таск станет возобновляемый.
yield
делает task()
генератором: при выполнении инструкции управление получает вызывающая функция. Короче говоря, мы переключаем контекст.
С помощью next()
возвращаем управление генератору, который запускается с точки остановки с предыдущими переменными.
example_2.py
иллюстрирует такую многозадачность:
import queue
def task(name, queue):
while not queue.empty():
count = queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
yield
print(f"Task {name} total: {total}")
def main():
"""
This is the main entry point for the program.
"""
# Create the queue of 'work'
work_queue = queue.Queue()
# Put some 'work' in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some tasks
tasks = [
task("One", work_queue),
task("Two", work_queue)
]
# Run the tasks
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
if __name__ == "__main__":
main()
Смотрите, что выполняется в строках:
- 3–11 по-прежнему объявляют
task()
. Строка 10 сyield
делает из него генератор, чтобы переключать контекст и возвращать управление в циклmain()
. - 25–28 формируют список задач по-другому: вызывают каждую с параметрами, чтобы запустить генератор впервые.
- 34–39 модифицируют цикл для возврата управления таскам, продолжения и запуска следующей задачи.
- 35 отдаёт управление обратно в
task()
и возобновляет работу с моментаyield
. - 39 задаёт
done
. Цикл завершается при удалении изtasks
заданий.
Вот результат запуска программы:
Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2
Заметили, что Task Two
печатает итог первым? Здесь нет асинхронности. Task One
считает до 15, а второй – до 10, поэтому быстрее справляется.
Кооперативная многозадачность с блокирующими вызовами
К предыдущей версии добавляем time.sleep(delay)
, чтобы сымитировать блокировку – вызов, останавливающий процессор на время. Это тот момент, когда родитель без отрыва от процесса сводит баланс.
elapsed_time
покажет время, которое прошло с инстанциирования до вызова.
И код example_3.py
:
import time
import queue
from lib.elapsed_time import ET
def task(name, queue):
while not queue.empty():
delay = queue.get()
et = ET()
print(f"Task {name} running")
time.sleep(delay)
print(f"Task {name} total elapsed time: {et():.1f}")
yield
def main():
"""
This is the main entry point for the program.
"""
# Create the queue of 'work'
work_queue = queue.Queue()
# Put some 'work' in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
tasks = [
task("One", work_queue),
task("Two", work_queue)
]
# Run the tasks
et = ET()
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
print(f"\nTotal elapsed time: {et():.1f}")
if __name__ == "__main__":
main()
Вот строки с отличиями:
- 1 обеспечивает доступ к
time.sleep()
. - 11 добавляет
time.sleep(delay)
для имитации IO-приостановки и заменяетfor
из первого примера.
После запуска вы увидите:
Task One running
Task One total elapsed time: 15.0
Task Two running
Task Two total elapsed time: 10.0
Task One running
Task One total elapsed time: 5.0
Task Two running
Task Two total elapsed time: 2.0
Total elapsed time: 32.01021909713745
Приостановка показала, что кооперативная многозадачность не помогла. ЦП ждёт, когда закончится ввод-вывод. Вещь, которую в Python-статьях называют блокирующим кодом.
Кооперативная многозадачность с отсутствием блокировки
Перейдём на асинхронную разработку Python.
Заменим блокирующее засыпание на пакет asyncio
. Сделаем task
асинхронным.
Вместо метода time
и генератора напишите await asyncio.sleep(delay
).
while
и task_array
нет, вызываем await asyncio.gather(...)
, чтобы дать asyncio
такие инструкции:
- Сделать два
task
и начать выполнение. - Подождать завершения обоих.
Запись asyncio.run(main())
вызывает main()
и создаёт event loop – цикл событий.
Этот цикл запускает код. Выполнением задачи занимается процессор. При достижении await
переключается контекст и возобновляется управление. Event loop мониторит ожидающие событий задачи, и передаёт контроль нужной.
await asyncio.sleep(delay)
не блокирует процессор. ЦП фиксирует ожидание в очереди и переключает контекст, возвращая управление в событийный цикл. Event loop бесконечно ищет завершённые события и передаёт контроль ожидающему таску. Поэтому процессор окажется занят при доступных задачах, пока цикл отслеживает будущие события.
Примечание. Асинхронное приложение однопоточное. Вы контролируете переключение контекста и облегчаете решение проблемы разделяемой памяти при многопоточном подходе.
Код четвёртого примера:
import asyncio
from lib.elapsed_time import ET
async def task(name, work_queue):
while not work_queue.empty():
delay = await work_queue.get()
et = ET()
print(f"Task {name} running")
await asyncio.sleep(delay)
print(f"Task {name} total elapsed time: {et():.1f}")
async def main():
"""
This is the main entry point for the program.
"""
# Create the queue of 'work'
work_queue = asyncio.Queue()
# Put some 'work' in the queue
for work in [15, 10, 5, 2]:
await work_queue.put(work)
# Run the tasks
et = ET()
await asyncio.gather(
asyncio.create_task(task("One", work_queue)),
asyncio.create_task(task("Two", work_queue)),
)
print(f"\nTotal elapsed time: {et():.1f}")
if __name__ == "__main__":
asyncio.run(main())
Отличия от третьей версии найдёте в таких строках:
- 1 вместо
time
импортируетasyncio
. - 4 добавляет
async
для асинхронной работы. - 9 – на смену блокирующему
sleep
приходитasyncio.sleep(delay)
для возврата управления в событийный цикл. - 17 создаёт асинхронную очередь без блокировки.
- 20–21 с помощью
await
добавляют задачи в очередь асинхронно. - 25–28 создают два таска и группируют их для ожидания выполнения обоих.
- 32 делает запуск приложения асинхронным.
Взгляните, как обе задачи стартуют одновременно и ждут при ложном IO-вызове:
Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0
Total elapsed time: 17.0
Вот и подтверждение отсутствия блокировки.
Отметили, что приложение затратило половину времени работы предыдущей версии? Таков профит от применения асинхронных функций Python! Программа выполняется быстрее суммы времени своих составляющих. Избавились от синхронности!
Вызываем HTTP синхронно
Приложение взаимодействует с реальным IO: посылает HTTP-запросы на URL-адреса из списка и принимает ответы с контентом. Только с блокировкой.
Импортируем requests для работы с HTTP. В очереди будут URL-адреса, а не числа. Вместо увеличения счётчика task()
принимает контент из URL-адреса и выводит затраченное время.
Код пятого примера:
import queue
import requests
from lib.elapsed_time import ET
def task(name, work_queue):
with requests.Session() as session:
while not work_queue.empty():
url = work_queue.get()
print(f"Task {name} getting URL: {url}")
et = ET()
session.get(url)
print(f"Task {name} total elapsed time: {et():.1f}")
yield
def main():
"""
This is the main entry point for the program.
"""
# Create the queue of 'work'
work_queue = queue.Queue()
# Put some 'work' in the queue
for url in [
"http://google.com",
"http://yahoo.com",
"http://linkedin.com",
"http://apple.com",
"http://microsoft.com",
"http://facebook.com",
"http://twitter.com"
]:
work_queue.put(url)
tasks = [
task("One", work_queue),
task("Two", work_queue)
]
# Run the tasks
et = ET()
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
print(f"\nTotal elapsed time: {et():.1f}")
if __name__ == "__main__":
main()
Вот, что делают строки:
- 2 импортируют
requests
, чтобы работать с HTTP. - 10–11 делают приостановку, как в
example_3.py
, только с вызовомsession.get(url)
для получения контента страницы из URL. - 23–32 наполняют очередь списком URL.
После запуска увидите:
Task One getting URL: http://google.com
Task One total elapsed time: 0.3
Task Two getting URL: http://yahoo.com
Task Two total elapsed time: 0.8
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.4
Task Two getting URL: http://apple.com
Task Two total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task One total elapsed time: 0.5
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.5
Task One getting URL: http://twitter.com
Task One total elapsed time: 0.4
Total elapsed time: 3.2
Все таски берут из очереди адрес, получают контент страницы и печатают затраченное время.
Как прежде, из-за yield
оба задания выполняются совместно. Но все session.get()
блокируют процессор до получения ответа. Засеките время, потраченное на запуск приложения, чтобы сравнить со следующим примером.
Делаем HTTP-запросы асинхронными
В этой версии нужна Python-библиотека aiohttp, чтобы работать с HTTP асинхронно.
Удалим yield
, так как запрос HTTP GET
уже не блокирующий.
Так выглядит шестой пример:
import asyncio
import aiohttp
from lib.elapsed_time import ET
async def task(name, work_queue):
async with aiohttp.ClientSession() as session:
while not work_queue.empty():
url = await work_queue.get()
print(f"Task {name} getting URL: {url}")
et = ET()
async with session.get(url) as response:
await response.text()
print(f"Task {name} total elapsed time: {et():.1f}")
async def main():
"""
This is the main entry point for the program.
"""
# Create the queue of 'work'
work_queue = asyncio.Queue()
# Put some 'work' in the queue
for url in [
"http://google.com",
"http://yahoo.com",
"http://linkedin.com",
"http://apple.com",
"http://microsoft.com",
"http://facebook.com",
"http://twitter.com",
]:
await work_queue.put(url)
# Run the tasks
et = ET()
await asyncio.gather(
asyncio.create_task(task("One", work_queue)),
asyncio.create_task(task("Two", work_queue)),
)
print(f"\nTotal elapsed time: {et():.1f}")
if __name__ == "__main__":
asyncio.run(main())
Вот, за что отвечают строки:
- 2 для асинхронности HTTP-запросов импортирует
aiohttp
. - 5 делает функцию асинхронной.
- 6 создаёт менеджер контекста сессии
aiohttp
. - 11 создаёт такой менеджер для ответа
aiohttp
и отправляетHTTP GET
на URL-адрес из очереди. - 12 асинхронно получает текст из ответа.
Запустите и получите:
Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One total elapsed time: 0.3
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.3
Task One getting URL: http://apple.com
Task One total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task Two total elapsed time: 0.9
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.4
Task Two getting URL: http://twitter.com
Task One total elapsed time: 0.5
Task Two total elapsed time: 0.3
Total elapsed time: 1.7
Видите, что общее время – половина времени всех GET-запросов? Дело в асинхронном выполнении. Вы используете процессор с умом: запросы отправляются одновременно.
Заключение
Теперь вы знаете, как применять асинхронные функции Python. Значит, получаете контроль при переключении контекста и облегчаете решение проблем многопоточного программирования.
Комментарии