Современные программы редко выполняют только одну задачу за раз. Они отправляют запросы в сеть, работают с файлами, обрабатывают данные и одновременно ждут ответа от внешних сервисов. Чтобы не тратить время на простои и эффективнее использовать ресурсы системы, в Python применяется асинхронное программирование.
Разберемся, как работает асинхронность, какие задачи она помогает решать и как использовать модуль asyncio на практике.
Что такое асинхронность в программировании и зачем она нужна
Представим себе простую домохозяйку, которой нужно испечь пирог, сварить суп и погладить вещи. Если никуда не торопиться, то все эти операции можно сделать последовательно:
- Сначала замесить тесто, подождать, пока оно подойдет (где-то 1 час), потом испечь пирог (40 минут). Все это время остальные дела не делаются.
- Затем сварить суп от начала до конца (1,5 часа).
- Наконец, погладить вещи (30 минут).
Это синхронный подход и он потребует более трех с половиной часов. А вот так может выглядеть асинхронный подход:
- Замесить тесто и поставить его подходить.
- Пока тесто подходит, почистить овощи и поставить варить бульон для супа.
- Проверить тесто — оно подошло. сформировать пирог, поставить его в духовку.
- Пока пирог печется, нарезать овощи для супа и добавить их в бульон.
- Поставить таймер на 30 минут (до готовности пирога), взять утюг и начинать гладить вещи.
- Когда таймер сработает, вынуть пирог из духовки. В это время суп почти готов — добавить специи.
- Догладить оставшиеся вещи, пока суп настаивается.
В итоге все дела сделаны за пару часов! Как видим, асинхронный подход позволяет наиболее эффективно распределять драгоценное время, в частности, использовать с наибольшей пользой возникающие периоды ожидания и в эти периоды выполнять какие-то параллельные задачи.
Асинхронность нужна для того, чтобы программа не «зависала» во время длительных операций, позволяя выполнять другой код, пока ожидается ответ. Это обеспечивает высокую производительность и отзывчивость приложений.
В языке программирования Python применение асинхронности особенно полезно для таких операций, как сетевые запросы, работа с файлами и базами данных. Например, при обработке тысячи HTTP-запросов синхронный код может выполняться целых 10-15 секунд, а асинхронный — всего 1-2 секунды.
Как работает асинхронное программирование
Для асинхронного программирования в Python применяются специальные функции, которые называются корутины. Они объявляются через ключевые слова async def и могут приостанавливать свое выполнение, сохраняя свое состояние, чтобы уступить место другим задачам, и возобновить выполнение позже.
Для прерывания выполнения функции используется ключевое слово await. При его использовании корутина передает поток управления в цикл событий, также известный как event loop. Этот цикл управляет выполнением корутин. Именно он решает, когда и какую корутину запускать, а когда останавливать.
Event loop — это механизм асинхронного программирования в Python, работающий в рамках одного потока. Он управляет выполнением множества асинхронных операций, например, сетевых запросов или операций с файлами, при этом не блокируя поток ожиданиями. Основа его работы — кооперативная многозадачность: корутины приостанавливаются на операторе await и передают управление обратно в event loop, который в это время может переключиться на другие задачи. При этом цикл отслеживает готовность I/O‑операций и возобновляет соответствующие корутины, когда данные становятся доступны.
Рассмотрим вот такую простенькую корутину:
import asyncio
async def greet():
print(f"Привет, Мир!")
await asyncio.sleep(2)
print(f"Пока, Мир!")
asyncio.run(greet())
В самом начале импортируем модуль asyncio. В этом модуле содержится все необходимое для работы асинхронных функций. Дальше идет объявление функции с именем greet, которая является асинхронной. С методами print все понятно — они выводят в консоль соответствующие сообщения.
А что за команда содержится между ними? А эта команда как раз и приостанавливает выполнение функции на 2 секунды. Как видим, в ней используется метод sleep из модуля asyncio. Чтобы запустить функцию используется метод run из того же модуля. После запуска получим в консоли такую строчку:
Привет, Мир!
А через две секунды увидим уже это:
Привет, Мир! Пока, Мир!
Значит, наша функция отработала, как положено. Для планирования выполнения корутин можно использовать задачи (Task). Тут все довольно просто: создаем задачу, передаем ей корутину, а потом выполняем. Простой пример:
import asyncio
async def fetch_data(url):
await asyncio.sleep(1)
return f"Данные с сайта {url}"
async def main():
task1 = asyncio.create_task(fetch_data("site1.ru"))
task2 = asyncio.create_task(fetch_data("site2.ru"))
result1, result2 = await asyncio.gather(task1, task2)
print(result1)
print(result2)
asyncio.run(main())
Функция fetch_data якобы получает какие-то данные с сайта. В функции main создаются две задачи task1 и task2. Задачи создаются посредством метода create_task из модуля asyncio. На вход метода подается функция fetch_data с нужными аргументами. Далее метод gather из того же модуля asyncio принимает на вход обе задачи и возвращает результат их выполнения. Остается только вывести результаты в консоль при помощи print. Запускается код также через метод run, на вход которого подается функция main. После запуска программы в консоли получим следующее:
Данные с сайта site1.ru Данные с сайта site2.ru
Обе записи появляются после небольшой задержки, которая прописана в функции fetch_data. Она имитирует время необходимое для получения данных с сайта. Также для запуска асинхронных функций можно создать новый цикл событий event loop, установить его и запустить в нем необходимую функцию. Пример:
import asyncio
async def spaceship_flight(ship_number):
print(f'{ship_number}: Старт ...')
await asyncio.sleep(3)
print(f'{ship_number}: ... Полет нормальный!')
async def main():
task1 = asyncio.create_task(spaceship_flight('Космический корабль №1'))
task2 = asyncio.create_task(spaceship_flight('Космический корабль №2'))
task3 = asyncio.create_task(spaceship_flight('Космический корабль №3'))
await asyncio.wait([task1,task2,task3])
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
except :
pass
Этот пример в целом похож на предыдущий, только задачи здесь запускаются при помощи метода wait, на вход которого подается список из трех задач. Также в конце видим конструкцию для обработки исключений try – except. В ней как раз и создается, устанавливается и запускается новый цикл событий. Сразу после запуска в консоли получим:
Космический корабль №1: Старт ... Космический корабль №2: Старт ... Космический корабль №3: Старт ...
А через 3 секунды увидим уже следующее:
Космический корабль №1: Старт ... Космический корабль №2: Старт ... Космический корабль №3: Старт ... Космический корабль №1: ... Полет нормальный! Космический корабль №2: ... Полет нормальный! Космический корабль №3: ... Полет нормальный!
Еще в асинхронном программировании на Python есть очереди (Queue). Они нужны для организации обработки задач по порядку и для разделения процессов. Таким образом, очереди помогают распределять нагрузку и предотвращают перегрузку систем. Очередь — это такая структура данных, которая работает по принципу «первый пришел — первый вышел» (FIFO: First In, First Out). То есть, новые элементы добавляются в конец очереди, а извлекаются из начала очереди. Рассмотрим простой пример:
import asyncio
async def basic_queue_example():
queue = asyncio.Queue()
await queue.put("Первый элемент")
await queue.put("Второй элемент")
await queue.put("Третий элемент")
await queue.put("Четвертый элемент")
await queue.put("Пятый элемент")
await queue.put("Шестой элемент")
print(f"Размер очереди: {queue.qsize()}")
while not queue.empty():
item = await queue.get()
print(f"Получено: {item}")
asyncio.run(basic_queue_example())
Здесь мы сначала создаем очередь queue, а затем добавляем в нее 6 элементов. Далее выводим в консоль сообщение о размере очереди, а потом при помощи цикла while извлекаем из нее элементы. Как видно из условия цикла, извлечение будет происходить только, если очередь не является пустой. В консоли получим:
Размер очереди: 6 Получено: Первый элемент Получено: Второй элемент Получено: Третий элемент Получено: Четвертый элемент Получено: Пятый элемент Получено: Шестой элемент
Примеры асинхронных функций: разбираемся на практике
Рассмотрим несколько практических примеров асинхронного программирования на языке Python.
Асинхронный таймер
Вот такой простенький таймер позволяет производить отсчет времени от заданного значения до одной секунды:
import asyncio
async def timer(seconds):
for i in range(seconds, 0, -1):
print(f"Осталось {i} сек.")
await asyncio.sleep(1)
print("Время вышло!")
asyncio.run(timer(10))
В функции timer запускается цикл for, в теле которого вызывается метод sleep с аргументом 1. Это как раз и создает секундную задержку, благодаря которой происходит отсчет времени. В нашем случае отсчет будет начинаться с 10-и секунд. Сразу же после запуска в консоли получим следующий результат:
Осталось 10 сек.
А через секунду увидим:
Осталось 10 сек. Осталось 9 сек.
И так далее, пока отсчет не завершится. В конце работы таймера в консоли будет выведено:
Осталось 10 сек. Осталось 9 сек. Осталось 8 сек. Осталось 7 сек. Осталось 6 сек. Осталось 5 сек. Осталось 4 сек. Осталось 3 сек. Осталось 2 сек. Осталось 1 сек. Время вышло!
Производитель и потребитель
Здесь мы столкнемся и с задачами и с очередями. В примере, который приводится ниже применяется паттерн Producer‑Consumer (производитель‑потребитель), в согласии с которым в коде одни задачи генерируют (производят) данные, а другие их обрабатывают (потребляют). Пример:
import asyncio
async def producer(queue):
for i in range(7):
await queue.put(f"Элемент {i}")
print(f"Произведено: Элемент {i}")
await asyncio.sleep(1)
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Потреблено: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
await producer_task
await queue.join() # Ждем, пока все элементы будут обработаны
queue.put_nowait(None) # Сигнал завершения для потребителя
await consumer_task
asyncio.run(main())
Итак, у нас здесь 3 асинхронных функции. Функция producer производит элементы, а функция consumer эти элементы потребляет. Третья функция main является главной, и в ней сначала создается очередь queue, а потом эта очередь подается на вход первым двум функциям, которые являются аргументами для метода create_task, создающему две задачи producer_task и consumer_task.
Далее выполняется задача producer_task, производящая элементы. Затем надо подождать, пока все элементы будут обработаны и получить сигнал завершения. И только после этого можно запускать вторую задачу. Сразу после запуска в консоли увидим такие строчки:
Произведено: Элемент 0 Потреблено: Элемент 0
Через секунду вывод уже будет таким:
Произведено: Элемент 0 Потреблено: Элемент 0 Произведено: Элемент 1 Потреблено: Элемент 1
И так будет продолжаться до шестого элемента включительно. Весь вывод имеет следующий вид:
Произведено: Элемент 0 Потреблено: Элемент 0 Произведено: Элемент 1 Потреблено: Элемент 1 Произведено: Элемент 2 Потреблено: Элемент 2 Произведено: Элемент 3 Потреблено: Элемент 3 Произведено: Элемент 4 Потреблено: Элемент 4 Произведено: Элемент 5 Потреблено: Элемент 5 Произведено: Элемент 6 Потреблено: Элемент 6
Периодическое выполнение задачи
Если нужно, чтобы какая-то задача выполнялась периодически и в течение определенного времени, то можно использовать следующий код:
import asyncio
from datetime import datetime
async def periodic_task():
while True:
print(f"Выполняется задача в {datetime.now().strftime("%H:%M:%S")}")
await asyncio.sleep(2)
async def main():
task = asyncio.create_task(periodic_task())
await asyncio.sleep(20)
task.cancel()
asyncio.run(main())
В функции periodic_task содержится цикл while, который каждые 2 секунды выводит в консоль сообщение о том, в какое время выполняется задача. В главной функции main создается эта задача и следующей командой устанавливается время, в течение которого она будет выполняться. Когда время истечет, третья команда отменяет задачу. Запустив код, через некоторое время увидим в консоли примерно следующее:
Выполняется задача в 11:30:12 Выполняется задача в 11:30:14 Выполняется задача в 11:30:16 Выполняется задача в 11:30:18 Выполняется задача в 11:30:20 Выполняется задача в 11:30:22 Выполняется задача в 11:30:24 Выполняется задача в 11:30:26 Выполняется задача в 11:30:28 Выполняется задача в 11:30:30
Асинхронный генератор
Также как и обычный генератор в Python, асинхронный генератор должен содержать хотя бы одно ключевое слово yield. Подробнее о генераторах в языке Python можно прочитать в этой статье. Рассмотрим такой пример:
import asyncio
async def async_generator():
for i in range(10):
await asyncio.sleep(1)
yield i
async def main():
async for value in async_generator():
print(value)
asyncio.run(main())
Функция async_generator создает при помощи цикла for последовательность чисел от 0 до 9. Выдает она эти числа с секундной задержкой. В главной функции main мы видим цикл for, специально предназначенный для перебора значений из асинхронных источников. От обычного for он отличается тем, что может вызывать await на каждой итерации. В результате выполнения программы в консоли поочередно будет выведена следующая последовательность чисел:
0 1 2 3 4 5 6 7 8 9
Асинхронный HTTP-запрос
Допустим, у нас есть адреса 4-х веб-страниц и нужно получить количество символов, содержащихся на каждой из этих страниц. Сделать это можно таким образом:
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
data = await response.text()
print(f"Получено {len(data)} символов с {url}")
return data
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/stream/3",
"https://httpbin.org/status/200"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
Получение данных со страниц и подсчет числа символов происходит в функции fetch_url. В главной функции main сначала создаем список адресов страниц. Затем, с помощью генератора, подаем на вход функции fetch_url каждый адрес из списка и, таким образом, получаем список задач. Далее выполняем эти задачи. В консоли получим следующий результат:
Получено 852 символов с https://httpbin.org/stream/3 Получено 0 символов с https://httpbin.org/status/200 Получено 361 символов с https://httpbin.org/delay/1 Получено 361 символов с https://httpbin.org/delay/2
Загрузка файлов
Пример ниже показывает, как асинхронно загрузить парочку файлов и не требует импорта модуля asyncio. Код:
async def download_file(url):
print(f"Загружен файл по ссылке {url}")
image_downloader = download_file('https://www.best-images.com/space/image1.jpg')
music_downloader = download_file('https://www.music-box.com/artists/artist/album3/song6.mp3')
coroutines = [music_downloader, image_downloader]
while True:
for coroutine in coroutines.copy():
try:
coroutine.send(None)
except StopIteration:
coroutines.remove(coroutine)
if len(coroutines) == 0:
break
Функция download_file имитирует загрузку файла с некоторого url, о чем и выводит сообщение в консоли. Далее создаем две корутины: image_downloader и music_downloader. Первая качает изображение, а вторая — музыку. На следующем этапе создаем список из этих двух корутин. После этого осталось только запустить корутины по очереди в бесконечном цикле while. Корутины запускаются при помощи метода send.
Если срабатывает исключение StopIteration, то это значит, что корутина истощилась, то есть файл был успешно скачан, и корутину можно убирать из списка. Делается это методом remove. Если длина списка корутин окажется равной нулю (все файлы скачаны и корутины удалены из списка), то происходит выход из цикла. В консоли получим:
Загружен файл по ссылке https://www.music-box.com/artists/artist/album3/song6.mp3 Загружен файл по ссылке https://www.best-images.com/space/image1.jpg
Как видим, в некоторых случаях можно обойтись и без модуля asyncio.
