Баннер мобильный (3) Пройти тест

Как ускорить программу на Python с помощью процессов и потоков

В каких задачах использовать процессы, а в каких — потоки

Инструкция

26 сентября 2025

Поделиться

Скопировано
Как ускорить программу на Python с помощью процессов и потоков

Содержание

    Процессы и потоки в Python помогают запускать несколько задач одновременно, чтобы программа работала быстрее и эффективнее. Разбираем, как это работает, на примерах.

    Что такое процессы и потоки в Python

    Прежде чем перейти к задачам, разберем, что такое процесс и поток.

    Процесс

    Процесс (модуль multiprocessing) — это отдельный экземпляр программы, который выполняется в своей области памяти, где хранятся ее данные, код и переменные. Это означает, что каждый процесс имеет свое адресное пространство, копии данных, кода и переменных, а также собственный экземпляр интерпретатора Python (CPython).

    Это позволяет обойти ограничение глобальной блокировки интерпретатора (GIL), так как каждый интерпретатор процесса имеет собственный GIL, который управляет только потоками внутри этого процесса и не влияет на другие процессы (не конкурирует с ними).

    • Каждый процесс независим и владеет собственными ресурсами. Типичный Python-скрипт — это один процесс, внутри которого по умолчанию есть главный поток выполнения. 
    • Процессы полностью изолированы, а обмен данными между ними требует явной передачи (например, через Queue, Pipe, Value или Array из модуля multiprocessing). 
    • Подходят для CPU-bound-задач (скорость выполнения которых ограничена производительностью CPU), где требуется интенсивная обработка данных, например: 
      • математика/алгоритмы, преобразование изображений, подготовка фичей в ML;
      • параллельная обработка больших массивов или файлов.

    Важно: в некоторых окружениях (например, на Windows) используется способ запуска spawn, поэтому обязательно защищайте точку входа конструкцией if __name__ == «__main__», иначе дочерние процессы попытаются повторно импортировать модуль и стартовать заново.

    Поток

    Поток (модуль threading) — это часть процесса, которая выполняет задачу внутри него. Потоки внутри одного процесса разделяют адресное пространство памяти процесса, включая глобальные переменные, объекты и кучу (heap), что делает их легче и быстрее для создания, но требует осторожности: например, важно не допускать, чтобы один поток изменял данные, пока второй их читает или записывает.

    • Потоки выполняются в рамках одного процесса и делят общее адресное пространство, но из-за GIL в CPython только один поток может выполнять Python-код в определенный момент времени. 
    • Потоки легче создавать и управлять ими, так как они используют меньше ресурсов по сравнению с процессами. 
    • Подходят для I/O-bound-задач (задачи без большой нагрузки на процессор), где программа ожидает внешних операций, таких как: 
      • загрузка файлов из интернета,
      • чтение/запись на диск,
      • обращение к базам данных или API.

    Процессы: как создавать и использовать в CPU-bound-задачах

    Для создания процесса используют класс multiprocessing.Process. Вместо последовательной обработки миллиона записей в цикле он разделяет задачу на несколько процессов, каждый из которых обрабатывает свой кусок данных.

    Пример с вычислением суммы факториалов

    Рассмотрим задачу вычисления суммы факториалов чисел в большом диапазоне, например, от 0 до 2000. Последовательное выполнение этой задачи может занять много времени, особенно на больших числах, так как факториалы требуют интенсивных вычислений. 

    Мы разделим диапазон на части и поручим каждому процессу обработать свою часть, а затем соберем результаты через Queue для безопасного обмена данными. Это позволит задействовать несколько ядер процессора и сократить тем самым общее время выполнения программы.

    1. Сначала импортируем необходимые библиотеки:
    from multiprocessing import Process, Queue
    import math

    Process для создания независимых процессов. 

    Queue для безопасной передачи результатов из изолированных процессов в главный процесс.

    math с функцией factorial для вычисления факториалов.

    1. Затем объявим функцию compute_factorial, которая вычисляет сумму факториалов чисел в заданном диапазоне и помещает результат в очередь (queue.put).
    def compute_factorial(start, end, queue):
        result = sum(math.factorial(i) for i in range(start, end))
        queue.put(result)

    Каждый процесс выполняет свою часть вычислений независимо, что позволяет распараллелить задачу. Queue используем для передачи результата в главный процесс, так как процессы изолированы и не разделяют память.

    1. Создадим и запустим процессы:
    if __name__ == "__main__":
        queue = Queue()
        processes = [
            Process(target=compute_factorial, args=(0, 1000, queue)),
            Process(target=compute_factorial, args=(1000, 2000, queue))
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        total = sum(queue.get() for _ in processes)
        print(f"Сумма факториалов: {total}")

    Создаем очередь для сбора результатов и два процесса, каждый из которых вычисляет сумму факториалов для своего диапазона (0–999 и 1000–1999). Методом start() запускаем процессы, а с join() ожидаем их завершения. Затем извлекаем результаты из очереди и суммируем их.

    Разделение задачи на два процесса позволяет задействовать два ядра процессора.

    Важно: не создавайте больше процессов, чем ядер процессора (узнать можно через multiprocessing.cpu_count()). Например, на четырехъядерной системе оптимально использовать четыре процесса.

    Альтернатива с пулом процессов (Pool)

    Для задач с множеством одинаковых вычислений используем multiprocessing.Pool, который автоматически делит входные данные на части, распределяет их по процессам и собирает результаты.

    Создаем пул из четырех процессов, который автоматически делит список чисел (от 0 до 999999) на части и распределяет задачу вычисления квадратов между процессами.

    from multiprocessing import Pool
    def compute_square(n):
        return n * n
    
    if __name__ == "__main__":
        with Pool(processes=4) as pool:
            results = pool.map(compute_square, range(1000000))
        print(f"Сумма квадратов: {sum(results)}")

    Метод pool.map применяет функцию compute_square к каждому числу и возвращает список результатов.

    Для больших наборов данных можно использовать imap/imap_unordered и параметр chunksize.

    Оптимизация с разделяемой памятью через Array/Value

    Так как Queue использует механизм сериализации (через модуль pickle) для передачи объектов между процессами, это требует дополнительных вычислительных ресурсов и времени, особенно при больших объемах данных.

    Если процессы обмениваются небольшими числами/флагами или работают с фиксированным набором примитивов, выгоднее использовать разделяемую память: multiprocessing.Array (массив) и multiprocessing.Value (одиночное значение). Тогда данные читаются/пишутся напрямую, без сериализации.

    Пример с Array (каждый процесс меняет свой индекс)

    Создадим функцию add_to_array, в которую передадим два аргумента: arr (разделяемый массив, созданный через Array) и index (индекс элемента в массиве). Функция увеличивает значение элемента массива по указанному индексу на 1.

    from multiprocessing import Process, Array
    
    def add_to_array(arr, index):
        arr[index] += 1

    Каждый процесс будет вызывать эту функцию, чтобы изменить свой элемент в общем массиве. Поскольку Array использует разделяемую память, изменения происходят напрямую, без необходимости сериализации данных.

    Создадим объект Array с типом данных ‘i’ (целочисленный) и начальным значением [0, 0, 0]. Это выделяет блок разделяемой памяти, доступный всем процессам.

    if __name__ == "__main__":
        shared_array = Array('i', [0, 0, 0])

    Создаем список из трех процессов, каждый из которых выполнит функцию add_to_array. Аргументы: shared_array (общий массив) и i (индекс: 0, 1 или 2).

    Каждый процесс будет работать с отдельным элементом массива (0-й, 1-й или 2-й), увеличивая его значение. Это распределяет задачу между процессами.

        processes = [Process(target=add_to_array, args=(shared_array, i)) for i in range(3)]

    Запускаем каждый процесс с помощью метода start(). Это инициирует выполнение функции add_to_array в отдельном экземпляре интерпретатора Python для каждого процесса. И используем join(), чтобы главный процесс ждал завершения всех дочерних.

        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f"Массив: {list(shared_array)}")

    После завершения всех процессов массив shared_array будет содержать обновленные значения.

    Когда нужна синхронизация

    Обратите внимание, что в примере выше мы используем операцию +=, а она неатомарна, то есть выполняется в несколько шагов, и между ними другой поток/процесс может «вклиниться».

    Если несколько процессов пишут в один и тот же элемент, используйте синхронизацию: Lock/Semaphore или структуры multiprocessing со встроенным локом (Array, рассмотренный выше, или Value), чтобы защитить участок и не потерять инкременты.

    Value — общий скаляр (одно число/флаг) в разделяемой памяти.

    Пример с Value (общий счетчик)

    1. Импортируем Process и Value и создаем метод do_work.
       from multiprocessing import Process, Value
    
    def do_work(counter, n=100_000):
        for _ in range(n):
    1. Атомарно инкрементируем счетчик и получаем значение.
       with counter.get_lock():
                counter.value += 1
    if __name__ == "__main__":
        counter = Value('i', 0)
        ps = [Process(target=do_work, args=(counter,)) for _ in range(3)]
        [p.start() for p in ps]; [p.join() for p in ps]
        print(counter.value)

    Пример с Lock (несколько процессов пишут в один элемент Array)

    1. Создаем функцию add_to_same_slot:
    from multiprocessing import Process, Array
    
    def add_to_same_slot(arr, times=100_000):
    1. Используем встроенный лок у synchronized Array:
      lock = arr.get_lock()
        for _ in range(times):
            with lock:
                arr[0] += 1
    
    if __name__ == "__main__":
        arr = Array('i', [0, 0, 0])
        ps = [Process(target=add_to_same_slot, args=(arr,)) for _ in range(4)]
        [p.start() for p in ps]; [p.join() for p in ps]
        print(arr[0])

    Пример с Semaphore (ограничиваем параллелизм и атомарность через лок)

    Семафор сам по себе не делает += атомарным, он лишь ограничивает параллелизм. То есть по-прежнему нужен лок (или семафор с value=1), чтобы количество процессов, которые могут одновременно «зайти», не превышало указанного.

    1. Создаем функцию add_limited:
     from multiprocessing import Process, Array, Semaphore, Lock
    
    def add_limited(arr, sem, arr_lock, times=100_000):
        for _ in range(times):
    1. Ограничиваем количество процессов внутри до 2:
            with sem:
                with arr_lock:
                    arr[0] += 1
    
    if __name__ == "__main__":
    1. Отключаем общий лок, синхронизируем сами:
     arr = Array('i', [0, 0, 0], lock=False)
        sem = Semaphore(2)
        arr_lock = Lock()
        ps = [Process(target=add_limited, args=(arr, sem, arr_lock)) for _ in range(4)]
        for p in ps: p.start()
        for p in ps: p.join()
        print(arr[0])

    Работает это так:

    • Процесс просит у семафора «пропуск». Если уже 2 внутри — ждет.
    • Оказавшись внутри, берет лок на массив и делает arr[0] += 1 в одиночку.
    • Освобождает лок и возвращает «пропуск» семафору.

    Пример с ProcessPoolExecutor

    Для реализации параллельного выполнения задач в отдельных процессах также можно использовать высокоуровневый интерфейс concurrent.futures.ProcessPoolExecutor

    • Он автоматически создает пул процессов (обычно по числу ядер процессора) и управляет их жизненным циклом (запуск, завершение, освобождение ресурсов). 
    • Возвращает итерируемые результаты, которые можно легко обработать (например, с помощью цикла или list()).
    • Использует контекстный менеджер (with ProcessPoolExecutor() as executor), так что не нужно явно закрывать пул процессов.

    Создаем пул из 4 процессов и распределяем задачу вычисления кубов чисел от 0 до 999 между ними.

    from concurrent.futures import ProcessPoolExecutor
    
    def compute_cube(n):
        return n ** 3
    
    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4) as executor:
            results = executor.map(compute_cube, range(1000))
        print(f"Сумма кубов: {sum(results)}")

    Метод map автоматически делит входной диапазон и возвращает результаты.

    Таким образом, с ProcessPoolExecutor получилось написать компактный код. Если бы мы использовали Process, то пришлось бы вручную создавать процессы, запускать их, ожидать завершения и собирать результаты через Queue.

    I/O-bound-задачи с потоками

    В отличие от процессов, потоки не реализуют истинный параллелизм, так как его ограничивает GIL: одновременно байткод выполняет ровно один поток.

    Пока один поток ждет сеть/диск, другой может работать. Поэтому сетевые вызовы, файловый I/O, ожидание БД и т. п. отлично масштабируются десятками потоков.

    Инструменты работы с потоками

    • Низкоуровневые потоки: threading.Thread из threading.
    • Высокоуровневый пул потоков: ThreadPoolExecutor из concurrent.futures.
    • Безопасный обмен между потоками: queue.Queue из queue.

    Массовая загрузка веб-страниц (сетевой I/O)

    Используем ThreadPoolExecutor и библиотеку Requests для параллельных HTTP-запросов.

    from concurrent.futures import ThreadPoolExecutor, as_completed
    import requests
    import time
    
    URLS = [
        "https://example.com/",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/bytes/65536"
    ]
    
    def fetch(url: str) -> tuple[str, int]:
        r = requests.get(url, timeout=10)
        r.raise_for_status()
        return url, len(r.content)
    
    if __name__ == "__main__":
        started = time.perf_counter()
        with ThreadPoolExecutor(max_workers=20) as pool:
            futures = [pool.submit(fetch, url) for url in URLS]
            for f in as_completed(futures):
                url, size = f.result()
                print(f"{url} -> {size} байт")
        print(f"Завершено за {time.perf_counter() - started:.2f} сек.")

    Пока один поток ждет сеть, другие выполняют работу. Поэтому I/O-bound-задачи масштабируются десятками потоков.

    Подбирайте max_workers эмпирически: для типичных HTTP-запросов 10–100+ для медленной сети. Учитывайте лимиты внешних сервисов и ставьте timeout.

    Конвейер «производитель-потребитель» для файлового I/O

    Когда есть каталог из множества небольших файлов, логично читать их параллельно рабочими потоками, а результаты собирать в основной нити.

    Важно: изменять общий dict из нескольких потоков нужно с использованием Lock.

    from threading import Thread, Lock
    from queue import Queue
    from pathlib import Path
    from typing import Dict, Optional
    
    def worker(tasks: "Queue[Optional[Path]]",
               results: Dict[str, int],
               results_lock: Lock) -> None:
        while True:
            path = tasks.get()

    Ставим сигнал завершения, если path является None: 

            if path is None:
                tasks.task_done()
                break
            try:
                with path.open("rb") as f:

    Имитация «долгого» файлового I/O:

                         data = f.read()
                size = len(data)

    Потокобезопасная запись в общий dict:

                with results_lock:
                    results[str(path)] = size
            finally:
                tasks.task_done()
    
    def scan_dir(root: Path, workers: int = 8) -> Dict[str, int]:
        tasks: "Queue[Optional[Path]]" = Queue()
        results: Dict[str, int] = {}
        results_lock = Lock()
    
        threads = [Thread(target=worker, args=(tasks, results, results_lock), daemon=True)
                   for _ in range(workers)]
        for t in threads: t.start()
    
        for p in root.rglob("*"):
            if p.is_file():
                tasks.put(p)

    После постановки всех файлов в очередь мы отправляем в очередь по одному служебному значению None на каждый рабочий поток. 

        for _ in threads:
            tasks.put(None)

    Получив None, поток понимает, что новых задач больше нет, корректно завершает цикл и выходит. Это гарантирует, что tasks.join() дождется обработки всех элементов очереди (включая сигналы остановки), и функция вернет результат после полного завершения работы потоков.

         tasks.join()
        return results
    
    if __name__ == "__main__":
        sizes = scan_dir(Path("./data"), workers=8)
        print(f"Обработано файлов: {len(sizes)}")

    Несколько практических советов по потокам

    • Следите за потокобезопасностью используемых библиотек (например, клиенты БД, логгеры). 

    Некоторые клиенты БД и другие библиотеки не рассчитаны на совместное использование одним и тем же объектом из разных потоков.

    Правило: один поток — один независимый объект подключения/клиент.

    Создавайте отдельное подключение к БД в каждом потоке (или используйте пул), а не делитесь одним соединением между потоками.

    • Избегайте «внутренних дедлоков».

    Не вызывайте ожидание результата внутри рабочего потока того же пула.

    Правило: ждем результаты в основном потоке, используя as_completed(…) или executor.map(…). Все submit(…) — снаружи, а внутри рабочих функций — только работа без ожиданий других задач.

    Типичная ошибка: внутри функции, которую вы отправили в ThreadPoolExecutor, вы снова делаете executor.submit(…) и тут же вызываете .result(). Пул может «закончить» потоки и зависнуть.

    • Если подключений/запросов очень много — рассмотрите асинхронные библиотеки.

    Для сетевого и высококонкурентного I/O альтернативой потокам может быть asyncio (кооперативная многозадачность на async/await). Оно особенно эффективно, когда нужно управлять тысячами одновременных соединений.

    Инструкция

    Поделиться

    Скопировано
    0 комментариев
    Комментарии