Процессы и потоки в Python помогают запускать несколько задач одновременно, чтобы программа работала быстрее и эффективнее. Разбираем, как это работает, на примерах.
Что такое процессы и потоки в Python
Прежде чем перейти к задачам, разберем, что такое процесс и поток.
Процесс
Процесс (модуль multiprocessing) — это отдельный экземпляр программы, который выполняется в своей области памяти, где хранятся ее данные, код и переменные. Это означает, что каждый процесс имеет свое адресное пространство, копии данных, кода и переменных, а также собственный экземпляр интерпретатора Python (CPython).
Это позволяет обойти ограничение глобальной блокировки интерпретатора (GIL), так как каждый интерпретатор процесса имеет собственный GIL, который управляет только потоками внутри этого процесса и не влияет на другие процессы (не конкурирует с ними).
- Каждый процесс независим и владеет собственными ресурсами. Типичный Python-скрипт — это один процесс, внутри которого по умолчанию есть главный поток выполнения.
- Процессы полностью изолированы, а обмен данными между ними требует явной передачи (например, через Queue, Pipe, Value или Array из модуля multiprocessing).
- Подходят для CPU-bound-задач (скорость выполнения которых ограничена производительностью CPU), где требуется интенсивная обработка данных, например:
- математика/алгоритмы, преобразование изображений, подготовка фичей в ML;
- параллельная обработка больших массивов или файлов.
Важно: в некоторых окружениях (например, на Windows) используется способ запуска spawn, поэтому обязательно защищайте точку входа конструкцией if __name__ == «__main__», иначе дочерние процессы попытаются повторно импортировать модуль и стартовать заново.
Поток
Поток (модуль threading) — это часть процесса, которая выполняет задачу внутри него. Потоки внутри одного процесса разделяют адресное пространство памяти процесса, включая глобальные переменные, объекты и кучу (heap), что делает их легче и быстрее для создания, но требует осторожности: например, важно не допускать, чтобы один поток изменял данные, пока второй их читает или записывает.
- Потоки выполняются в рамках одного процесса и делят общее адресное пространство, но из-за GIL в CPython только один поток может выполнять Python-код в определенный момент времени.
- Потоки легче создавать и управлять ими, так как они используют меньше ресурсов по сравнению с процессами.
- Подходят для I/O-bound-задач (задачи без большой нагрузки на процессор), где программа ожидает внешних операций, таких как:
- загрузка файлов из интернета,
- чтение/запись на диск,
- обращение к базам данных или API.
Процессы: как создавать и использовать в CPU-bound-задачах
Для создания процесса используют класс multiprocessing.Process. Вместо последовательной обработки миллиона записей в цикле он разделяет задачу на несколько процессов, каждый из которых обрабатывает свой кусок данных.
Пример с вычислением суммы факториалов
Рассмотрим задачу вычисления суммы факториалов чисел в большом диапазоне, например, от 0 до 2000. Последовательное выполнение этой задачи может занять много времени, особенно на больших числах, так как факториалы требуют интенсивных вычислений.
Мы разделим диапазон на части и поручим каждому процессу обработать свою часть, а затем соберем результаты через Queue для безопасного обмена данными. Это позволит задействовать несколько ядер процессора и сократить тем самым общее время выполнения программы.
- Сначала импортируем необходимые библиотеки:
from multiprocessing import Process, Queue import math
Process для создания независимых процессов.
Queue для безопасной передачи результатов из изолированных процессов в главный процесс.
math с функцией factorial для вычисления факториалов.
- Затем объявим функцию compute_factorial, которая вычисляет сумму факториалов чисел в заданном диапазоне и помещает результат в очередь (queue.put).
def compute_factorial(start, end, queue): result = sum(math.factorial(i) for i in range(start, end)) queue.put(result)
Каждый процесс выполняет свою часть вычислений независимо, что позволяет распараллелить задачу. Queue используем для передачи результата в главный процесс, так как процессы изолированы и не разделяют память.
- Создадим и запустим процессы:
if __name__ == "__main__": queue = Queue() processes = [ Process(target=compute_factorial, args=(0, 1000, queue)), Process(target=compute_factorial, args=(1000, 2000, queue)) ] for p in processes: p.start() for p in processes: p.join() total = sum(queue.get() for _ in processes) print(f"Сумма факториалов: {total}")
Создаем очередь для сбора результатов и два процесса, каждый из которых вычисляет сумму факториалов для своего диапазона (0–999 и 1000–1999). Методом start() запускаем процессы, а с join() ожидаем их завершения. Затем извлекаем результаты из очереди и суммируем их.
Разделение задачи на два процесса позволяет задействовать два ядра процессора.
Важно: не создавайте больше процессов, чем ядер процессора (узнать можно через multiprocessing.cpu_count()). Например, на четырехъядерной системе оптимально использовать четыре процесса.
Альтернатива с пулом процессов (Pool)
Для задач с множеством одинаковых вычислений используем multiprocessing.Pool, который автоматически делит входные данные на части, распределяет их по процессам и собирает результаты.
Создаем пул из четырех процессов, который автоматически делит список чисел (от 0 до 999999) на части и распределяет задачу вычисления квадратов между процессами.
from multiprocessing import Pool def compute_square(n): return n * n if __name__ == "__main__": with Pool(processes=4) as pool: results = pool.map(compute_square, range(1000000)) print(f"Сумма квадратов: {sum(results)}")
Метод pool.map применяет функцию compute_square к каждому числу и возвращает список результатов.
Для больших наборов данных можно использовать imap/imap_unordered и параметр chunksize.
Оптимизация с разделяемой памятью через Array/Value
Так как Queue использует механизм сериализации (через модуль pickle) для передачи объектов между процессами, это требует дополнительных вычислительных ресурсов и времени, особенно при больших объемах данных.
Если процессы обмениваются небольшими числами/флагами или работают с фиксированным набором примитивов, выгоднее использовать разделяемую память: multiprocessing.Array (массив) и multiprocessing.Value (одиночное значение). Тогда данные читаются/пишутся напрямую, без сериализации.
Пример с Array (каждый процесс меняет свой индекс)
Создадим функцию add_to_array, в которую передадим два аргумента: arr (разделяемый массив, созданный через Array) и index (индекс элемента в массиве). Функция увеличивает значение элемента массива по указанному индексу на 1.
from multiprocessing import Process, Array def add_to_array(arr, index): arr[index] += 1
Каждый процесс будет вызывать эту функцию, чтобы изменить свой элемент в общем массиве. Поскольку Array использует разделяемую память, изменения происходят напрямую, без необходимости сериализации данных.
Создадим объект Array с типом данных ‘i’ (целочисленный) и начальным значением [0, 0, 0]. Это выделяет блок разделяемой памяти, доступный всем процессам.
if __name__ == "__main__": shared_array = Array('i', [0, 0, 0])
Создаем список из трех процессов, каждый из которых выполнит функцию add_to_array. Аргументы: shared_array (общий массив) и i (индекс: 0, 1 или 2).
Каждый процесс будет работать с отдельным элементом массива (0-й, 1-й или 2-й), увеличивая его значение. Это распределяет задачу между процессами.
processes = [Process(target=add_to_array, args=(shared_array, i)) for i in range(3)]
Запускаем каждый процесс с помощью метода start(). Это инициирует выполнение функции add_to_array в отдельном экземпляре интерпретатора Python для каждого процесса. И используем join(), чтобы главный процесс ждал завершения всех дочерних.
for p in processes: p.start() for p in processes: p.join() print(f"Массив: {list(shared_array)}")
После завершения всех процессов массив shared_array будет содержать обновленные значения.
Когда нужна синхронизация
Обратите внимание, что в примере выше мы используем операцию +=, а она неатомарна, то есть выполняется в несколько шагов, и между ними другой поток/процесс может «вклиниться».
Если несколько процессов пишут в один и тот же элемент, используйте синхронизацию: Lock/Semaphore или структуры multiprocessing со встроенным локом (Array, рассмотренный выше, или Value), чтобы защитить участок и не потерять инкременты.
Value — общий скаляр (одно число/флаг) в разделяемой памяти.
Пример с Value (общий счетчик)
- Импортируем Process и Value и создаем метод do_work.
from multiprocessing import Process, Value def do_work(counter, n=100_000): for _ in range(n):
- Атомарно инкрементируем счетчик и получаем значение.
with counter.get_lock(): counter.value += 1 if __name__ == "__main__": counter = Value('i', 0) ps = [Process(target=do_work, args=(counter,)) for _ in range(3)] [p.start() for p in ps]; [p.join() for p in ps] print(counter.value)
Пример с Lock (несколько процессов пишут в один элемент Array)
- Создаем функцию add_to_same_slot:
from multiprocessing import Process, Array def add_to_same_slot(arr, times=100_000):
- Используем встроенный лок у synchronized Array:
lock = arr.get_lock() for _ in range(times): with lock: arr[0] += 1 if __name__ == "__main__": arr = Array('i', [0, 0, 0]) ps = [Process(target=add_to_same_slot, args=(arr,)) for _ in range(4)] [p.start() for p in ps]; [p.join() for p in ps] print(arr[0])
Пример с Semaphore (ограничиваем параллелизм и атомарность через лок)
Семафор сам по себе не делает += атомарным, он лишь ограничивает параллелизм. То есть по-прежнему нужен лок (или семафор с value=1), чтобы количество процессов, которые могут одновременно «зайти», не превышало указанного.
- Создаем функцию add_limited:
from multiprocessing import Process, Array, Semaphore, Lock def add_limited(arr, sem, arr_lock, times=100_000): for _ in range(times):
- Ограничиваем количество процессов внутри до 2:
with sem: with arr_lock: arr[0] += 1 if __name__ == "__main__":
- Отключаем общий лок, синхронизируем сами:
arr = Array('i', [0, 0, 0], lock=False) sem = Semaphore(2) arr_lock = Lock() ps = [Process(target=add_limited, args=(arr, sem, arr_lock)) for _ in range(4)] for p in ps: p.start() for p in ps: p.join() print(arr[0])
Работает это так:
- Процесс просит у семафора «пропуск». Если уже 2 внутри — ждет.
- Оказавшись внутри, берет лок на массив и делает arr[0] += 1 в одиночку.
- Освобождает лок и возвращает «пропуск» семафору.
Пример с ProcessPoolExecutor
Для реализации параллельного выполнения задач в отдельных процессах также можно использовать высокоуровневый интерфейс concurrent.futures.ProcessPoolExecutor.
- Он автоматически создает пул процессов (обычно по числу ядер процессора) и управляет их жизненным циклом (запуск, завершение, освобождение ресурсов).
- Возвращает итерируемые результаты, которые можно легко обработать (например, с помощью цикла или list()).
- Использует контекстный менеджер (with ProcessPoolExecutor() as executor), так что не нужно явно закрывать пул процессов.
Создаем пул из 4 процессов и распределяем задачу вычисления кубов чисел от 0 до 999 между ними.
from concurrent.futures import ProcessPoolExecutor def compute_cube(n): return n ** 3 if __name__ == "__main__": with ProcessPoolExecutor(max_workers=4) as executor: results = executor.map(compute_cube, range(1000)) print(f"Сумма кубов: {sum(results)}")
Метод map автоматически делит входной диапазон и возвращает результаты.
Таким образом, с ProcessPoolExecutor получилось написать компактный код. Если бы мы использовали Process, то пришлось бы вручную создавать процессы, запускать их, ожидать завершения и собирать результаты через Queue.
I/O-bound-задачи с потоками
В отличие от процессов, потоки не реализуют истинный параллелизм, так как его ограничивает GIL: одновременно байткод выполняет ровно один поток.
Пока один поток ждет сеть/диск, другой может работать. Поэтому сетевые вызовы, файловый I/O, ожидание БД и т. п. отлично масштабируются десятками потоков.
Инструменты работы с потоками
- Низкоуровневые потоки: threading.Thread из threading.
- Высокоуровневый пул потоков: ThreadPoolExecutor из concurrent.futures.
- Безопасный обмен между потоками: queue.Queue из queue.
Массовая загрузка веб-страниц (сетевой I/O)
Используем ThreadPoolExecutor и библиотеку Requests для параллельных HTTP-запросов.
from concurrent.futures import ThreadPoolExecutor, as_completed import requests import time URLS = [ "https://example.com/", "https://httpbin.org/delay/2", "https://httpbin.org/bytes/65536" ] def fetch(url: str) -> tuple[str, int]: r = requests.get(url, timeout=10) r.raise_for_status() return url, len(r.content) if __name__ == "__main__": started = time.perf_counter() with ThreadPoolExecutor(max_workers=20) as pool: futures = [pool.submit(fetch, url) for url in URLS] for f in as_completed(futures): url, size = f.result() print(f"{url} -> {size} байт") print(f"Завершено за {time.perf_counter() - started:.2f} сек.")
Пока один поток ждет сеть, другие выполняют работу. Поэтому I/O-bound-задачи масштабируются десятками потоков.
Подбирайте max_workers эмпирически: для типичных HTTP-запросов 10–100+ для медленной сети. Учитывайте лимиты внешних сервисов и ставьте timeout.
Конвейер «производитель-потребитель» для файлового I/O
Когда есть каталог из множества небольших файлов, логично читать их параллельно рабочими потоками, а результаты собирать в основной нити.
Важно: изменять общий dict из нескольких потоков нужно с использованием Lock.
from threading import Thread, Lock from queue import Queue from pathlib import Path from typing import Dict, Optional def worker(tasks: "Queue[Optional[Path]]", results: Dict[str, int], results_lock: Lock) -> None: while True: path = tasks.get()
Ставим сигнал завершения, если path является None:
if path is None: tasks.task_done() break try: with path.open("rb") as f:
Имитация «долгого» файлового I/O:
data = f.read() size = len(data)
Потокобезопасная запись в общий dict:
with results_lock: results[str(path)] = size finally: tasks.task_done() def scan_dir(root: Path, workers: int = 8) -> Dict[str, int]: tasks: "Queue[Optional[Path]]" = Queue() results: Dict[str, int] = {} results_lock = Lock() threads = [Thread(target=worker, args=(tasks, results, results_lock), daemon=True) for _ in range(workers)] for t in threads: t.start() for p in root.rglob("*"): if p.is_file(): tasks.put(p)
После постановки всех файлов в очередь мы отправляем в очередь по одному служебному значению None на каждый рабочий поток.
for _ in threads: tasks.put(None)
Получив None, поток понимает, что новых задач больше нет, корректно завершает цикл и выходит. Это гарантирует, что tasks.join() дождется обработки всех элементов очереди (включая сигналы остановки), и функция вернет результат после полного завершения работы потоков.
tasks.join() return results if __name__ == "__main__": sizes = scan_dir(Path("./data"), workers=8) print(f"Обработано файлов: {len(sizes)}")
Несколько практических советов по потокам
- Следите за потокобезопасностью используемых библиотек (например, клиенты БД, логгеры).
Некоторые клиенты БД и другие библиотеки не рассчитаны на совместное использование одним и тем же объектом из разных потоков.
Правило: один поток — один независимый объект подключения/клиент.
Создавайте отдельное подключение к БД в каждом потоке (или используйте пул), а не делитесь одним соединением между потоками.
- Избегайте «внутренних дедлоков».
Не вызывайте ожидание результата внутри рабочего потока того же пула.
Правило: ждем результаты в основном потоке, используя as_completed(…) или executor.map(…). Все submit(…) — снаружи, а внутри рабочих функций — только работа без ожиданий других задач.
Типичная ошибка: внутри функции, которую вы отправили в ThreadPoolExecutor, вы снова делаете executor.submit(…) и тут же вызываете .result(). Пул может «закончить» потоки и зависнуть.
- Если подключений/запросов очень много — рассмотрите асинхронные библиотеки.
Для сетевого и высококонкурентного I/O альтернативой потокам может быть asyncio (кооперативная многозадачность на async/await). Оно особенно эффективно, когда нужно управлять тысячами одновременных соединений.