AzaThread – многопоточность для PHP с блэкджеком

Бэкап статьи с хабра.

В сети гуляет довольно много решений для эмуляции многопоточности в php. Чаще всего они основываются на форках, но есть и вариации на тему с использованием curl, proc_open и т.п.

Все встреченные варианты по тем или иным причинам меня не устроили, и пришлось написать своё решение. Набор требований у меня был следующий:

  • Использование форков;
  • Синхронный режим с сохранением интерфейса при отсутствии необходимых расширений;
  • Многократное использование дочерних процессов;
  • Полноценный обмен данными между процессами. Т.е. запуск с аргументами и получение результата по завершении;
  • Возможность обмена событиями между дочерним процессом-«потоком» и основным процессом во время работы;
  • Работа с пулом потоков с сохранением многократного использования, передачи аргументов и получения результатов;
  • Обработка ошибок выполнения;
  • Таймауты на выполнение работы, ожидание работы потоком, инициализацию;
  • Максимум производительности.

В результате получилась библиотека AzaThread (старое название — CThread).

Для нетерпеливых сразу ссылка на исходники:
github.com/Anizoptera/AzaThread

Описание

AzaThread предоставляет простой интерфейс для создания классов-потоков. Которые на самом деле используют отдельные процессы для асинхронной работы, но вас это не должно волновать. Вы можете посылать события из потока, возвращать результаты, использовать один поток множество раз, передавая ему аргументы запуска, или создать пул из 16 потоков разгребающих ваши задачи как горячие пирожки, не обращая никакого внимания на то, что работа происходит в разных процессах.

Кроме этого вы можете легко протестировать производительность библиотеки в разных режимах, выбрав оптимальное число потоков и вариант передачи данных между процессами специально для вашей конфигурации.

Для полноценной работы требуются следующие расширения: libevent, posix и pcntl.

Библиотека использует LibEvent и парные сокеты для общения между процессами. Поддерживает 5 вариантов передачи данных (аргументов, результатов и данных событий)!

Варианты привожу сразу с данными производительности. Тестировалось с пулом из восьми потоков на Intel Core i7 2600K 3.40 Ghz (Ubuntu 11.04 на виртуалке VMware). Приведены средние результаты за 10 повторов теста в jps (jobs per second — кол-во задач просто получающих аргументы и отдающих данные в секунду).

jps Описание
1 6501 Передача данных в сериализованном виде через те же сокеты. Вариант по умолчанию.
2 6625 То же самое, но с igbinary сериализацией (наиболее производительный вариант). Используется по умолчанию если igbinary установлен.
3 6194 System V Memory queue (sysvmsg)
4 6008 System V Shared memory (sysvshm)
5 6052 Shared memory (shmop)

Автоматически выбирается расширение для работы с сокетами. Если доступно, то используется расширение sockets, что дает улучшение производительности. В ином случае задействуется stream.

В дочернем процессе слушаются все доступные сигналы. По умолчанию на все из них (кроме SIGWINCH и SIGINFO) следует завершение работы. Но это легко можно переопределить создав в классе потока метод с именем сигнала. Например sigWinch.

В родительском процессе по умолчанию тоже перхватываются все сигналы. Это можно изменить, выставив у класса параметр listenMasterSignals в false. В этом случае обрабатываться будет только SIGCHLD. Свои собственные обработчики можно легко добавить, создав статический метод с названием m<имя сигнала>. Например, mSigTerm.

Если дочерний процесс умрёт по каким-либо причинам, класс автоматически форкнется при запуске новой задачи. Это происходит незаметно и можно вообще об этом не задумываться. Просто инстанс не нужно пересоздавать в случае какой-либо ошибки.

Дочерний процесс время от времени проверяет существование родительского. Если он вдруг помрёт, то дочерний автоматический завершится.

Все ресурсы, используемые потоком или пулом потоков, автоматически очищаются при вызове деструктора. Но их можно очистить принудительно, если вызывать метод cleanup. В этом случае поток/пул больше нельзя использовать.

При стандартных настройках поток инициализируется заранее, сразу при создании класса. Если установить параметр prefork в false, то форк будет происходить только в момент запуска задачи.

Вообще, настраиваемых параметров довольно много. Смена имени дочернего процесса после форка (параметр pName конструктора), таймаут на время выполнения задачи (timeoutWork), таймаут на максимальное время ожидания задач дочерним процессом (timeoutMaxWait), таймаут на время пре-инициализации (timeoutInit), размеры буферов для чтения сокетов (pipeReadSize, pipeMasterReadSize). Можно отключить режим мультизадачности для потоков (multitask). В этом случае каждый раз по завершении задачи дочерний процесс будет умирать и форкаться заново для следующего запуска. Это заметно уменьшит производительность.

Код покрыт тестами и подробно документирован, примеры использования можно посмотреть и запустить в файле example.php. Более сложные примеры с обработкой ошибок можно увидеть в коде юнит-теста.

Есть режим отладки, в котором выводится очень подробная информация о том, что именно и где творится.

Примеры использования

Основная фича — максимальная простота. Если вы хотите просто запустить что-либо в отдельном «потоке» достаточно следующего кода:

class ExampleThread extends Thread
{
    protected function process()
    {
        // Some work here
    }
}

$thread = new ExampleThread();
$thread->wait()->run();

Если есть всё необходимое для полноценной работы, то задача будет выполнена асинхронно. Если нет, то всё будет по-прежнему работать, но в синхронном режиме.

С передачей параметра и получением результата код будет выглядеть лишь чуть-чуть сложнее:

class ExampleThread extends Thread
{
    protected function process()
    {
        return $this->getParam(0);
    }
}

$thread = new ExampleThread();
$thread->wait()->run(123);
$result = $thread->wait()->getResult();

Аналогично, лёгким мановением руки, добавляем обработку событий из потока:

class ExampleThread extends Thread
{
    const EV_PROCESS = 'process';

    protected function process()
    {
        $events = $this->getParam(0);
        for ($i = 0; $i < $events; $i++) {
            $event_data = $i;
            $this->trigger(self::EV_PROCESS, $event_data);
        }
    }
}

// Дополнительный аргумент.
$additionalArgument = 123;

$thread->bind(ExampleThread::EV_PROCESS, function($event_name, $event_data, $additional_arg)  {
    // обработка события
}, $additionalArgument);

$events = 10; // число событий, которое сгенерирует поток

// Чтобы не вызывать вручную ожидание потока перед первым вызовом,
// можно переопределить свойство preforkWait в TRUE в классе-наследнике
$thread->wait();

$thread = new ExampleThread();
$thread->run($events)->wait();

И, наконец, использование пула из восьми потоков с обработкой ошибок выполнения:

$threads = 8  // Число потоков
$pool = new ThreadPool('ExampleThread', $threads);

$num = 25;    // Количество задач
$left = $num; // Количество оставшихся задач

do {
    // Если в пуле есть свободные потоки
    // И у нас остались задачи для выполнения
    while ($pool->hasWaiting() && $left > 0) {
        // При запуске получаем id потока
        $threadId = $pool->run();
        $left--;
    }
    if ($results = $pool->wait($failed)) {
        foreach ($results as $threadId => $result) {
            // Успешно выполненная задача
            // Результат можно идентифицировать
            // по id потока ($threadId)
            $num--;
        }
    }
    if ($failed) {
        // Обработка ошибок выполнения.
        // Работа считается завершенной неуспешно
        // если дочерний процесс умер во время выполнения или
        // истек таймаут на выполнение задачи
        foreach ($failed as $threadId) {
            $left++;
        }
    }
} while ($num > 0);

// Завершаем все дочерние процессы. Очищаем ресурсы используемые пулом.
$pool->cleanup();

Результаты тестирования производительности

Тесты запускал на двух машинах с Ubuntu 11.04.
Первая — Intel Core i3 540 3.07 Ghz.
Вторая — Intel Core i7 2600K 3.40 Ghz (убунту стоит на VMware виртуалке).

Результаты привожу просто чтобы можно было оценить рост производительности. Опять же, это средние результаты за серию из 10 повторов теста в jps (jobs per second — кол-во задач в секунду).

В качестве задачи потоки выполняют следующую фигню:

for ($i = 0; $i < 1000; $i++) {
    $r = mt_rand(0, PHP_INT_MAX) * mt_rand(0, PHP_INT_MAX);
}

Первый результат указан для синхронного режима работы (без форков). 18 и 20 потоков на первой конфигурации я не пробовал, так как уже для 12 началось падение производительности.

Число потоков Первая конфигурация Вторая
0 553 763
1 330 669
2 580 1254
4 1015 2188
8 1040 2618
10 1027 2719
12 970 2739
16 958 2904
18 - 2830
20 - 2730

AzaThread performance test

То есть производительность поднимается в 2-4 раза и более в зависимости от процессора!

Код, выполняющий серию тестов с нужными параметрами, лежит в файле examples/speed_test.php. Так что вы легко можете потестировать производительность и выбрать оптимальное число потоков у себя.

 

Буду очень рад если библиотека кому-либо пригодится. Любые фич-реквесты или обнаруженные баги можно оставлять на гитхабе, буду оперативно фиксить и улучшать библиотеку.

Ссылки