Часто возникают задачи, которые можно значительно ускорить, если выполнять их в несколько потоков. PHP в отличие от Perl, имеет слабые возможности по организации реальной многопоточности, поэтому ее приходится “симулировать” с помощью пула неблокирующих сокетов.
config.php
// путь, где расположены скрипты
$path = http://domain.com/path/;
// число потоков
$max_threads = 20;
launcher.php
require_once(”config.php”);
// инициализация
$sockets = array();
$done = false;
// будем работать, к примеру с набором
// ключевых слов
$keywords = fopen(”keywords.txt”, “r”);
// приступаем к многопоточной работе
while (!$done)
{
// если обнаружен файл,
// то прекращаем выполнение скрипта
if (file_exists(”stop-file”))
die;
// если число запущенных потоков меньше
// разрешенного максимума
// то запускаем потоки еще
if ($max_threads > count($sockets))
{
if (!feof($keywords))
{
$buffer = array();
// читаем ключевое слово
// в реальности в этот массив можно
// положить очень много всего
// а не только одно ключевое слово…
$buffer[] = trim(fgets($keywords));
// задаем данные для запуска сокета
// request.php - это тот файл,
// которые делает “дело”
$query_url = $path . “request.php”;
$url_info = parse_url($query_url);
$url_info[port] = ($url_info[port]) ? $url_info[port] : 80;
$url_info[path] = ($url_info[path]) ? $url_info[path] : “/”;
$url_info[query] = ($url_info[query]) ? “?” . $url_info[query] : “”;
// пакуем данные для передачи
// в генерирующий скрипт
// использование serialize очень удобно,
// так как позволяет
// залить в request.php мегабайты данных
$request = serialize($buffer);
// формируем запрос для передачи по сокету
$query = “POST ” . $url_info[path] . ” HTTP/1.1\r\n”;
$query = $query . “Content-Type: text/xml\r\n”;
$query = $query . “Host: ” . $url_info[host] . “\r\n”;
$query = $query . “Content-length: ” . (strlen($request)) . “\r\n\r\n”;
$query = $query . $request;
// создаем сокет, переводим его
// в неблокирующий режим и запускаем
// обработчик запросов
$errno = 0;
$error = “”;
$socket = fsockopen($url_info[host], $url_info[port], $errno, $error, 30);
stream_set_blocking($socket, 0);
stream_set_timeout($socket, 3600);
fputs($socket, $query);
// запоминаем запущенный сокет
$sockets[md5(time())] = $socket;
}
}
// читаем данные из сокета. формально они нам
// не нужны, но это позволяет
// отработать обработчкику запросов
reset($sockets);
while ($socket = current($sockets))
{
if (feof($socket))
{
// убиваем сокет, который отработал
unset($sockets[key($sockets)]);
}
else
{
// читаем данные из сокета
$temp = fgets($socket, 1000);
}
// обрабатываем следующий сокет
next($sockets);
}
// делаем небольщую задержку,
// иначе загруженность сервера
// приближается к 100 процентам
sleep(1);
// если нет активных сокетов, то можно выходить
if (count($sockets) == 0)
$done = true;
}
fclose($keywords);
die;>
-------------------------------------------------------------------------
[Эту мини-статью я когда-то написал для журнала “International PHP Magazine”, как часть колонки “Спроси гуру”. Я перепечатываю ее здесь, потому что она полезная и потому что люди просили меня об этом дважды за последние два дня]
Вопрос:
Существует ли в PHP хоть какая-то многопоточность?
Скажем, вы написали PHP-приложения для мониторинга служб на некотором количестве серверов и было бы неплохо запрашивать несколько серверов одновременно, а не один за одним.
Это возможно?
Ответ:
Люди часто предполагают, что необходимо разветвлять или порождать потоки, когда понадобится выполнять несколько действий одновременно, и если к тому же приложение реализовано на PHP (а этот язык не поддерживает многопоточность), то они должны перейти на что-то другое более подходящее, например perl.
У меня для вас хорошая новость - в большинстве случаев вам не нужно порождать и создавать новых потоков вообще и можно получить отличную производительность и без этого.
Скажем, Вам нужно проверять веб-серверы, действительно ли они рабочие в данный момент. Вы можете написать следующий скрипт:
< ?php
$hosts = array("host1.sample.com", "host2.sample.com");
$timeout = 15;
$status = array();
foreach ($hosts as $host) {
$errno = 0;
$errstr = "";
$s = fsockopen($host, 80, $errno, $errstr, $timeout);
if ($s) {
$status[$host] = "Соединение установленоn";
fwrite($s, "HEAD / HTTP/1.0rnHost: $hostrnrn");
do {
$data = fread($s, 8192);
if (strlen($data) == 0) {
break;
}
$status[$host] .= $data;
} while (true);
fclose($s);
} else {
$status[$host] = "Соединение прервано: $errno $errstrn";
}
}
print_r($status);
?>
И этот скрипт будет работать отлично, но так как функция fsockopen() не возвращает управление до тех пор, пока не получит имя хоста и не установит соединение (или она будет ждать таймаут в $timeout секунд), то использовать этот сценарий для мониторинга большого количества хостов не получится в виду его медленности.
Нет никакой причины, почему мы должны делать это последовательно; мы можем открывать асинхронные соединения - то есть, соединения, где мы не должны ждать возврата из функции fsockopen(). PHP все еще будет определять имя хоста (так что лучше использовать IP-адреса), но управление будет возвращено в программу как только будет запущено открытие соединения, таким образом мы сможеи перейти к следующему хосту.
Есть два способа сделать это; в PHP 5, вы можете использовать функцию stream_socket_client() в качестве замены fsockopen(). В более ранних версиях PHP, Вам придется поработать ручками и воспользоваться расширением по работе с сокетами.
Вот как это делается в PHP 5:
< ?php
$hosts = array("host1.sample.com", "host2.sample.com");
$timeout = 15;
$status = array();
$sockets = array();
/* Инициируем соединения ко всем хостам одновременно */
foreach ($hosts as $id => $host) {
$s = stream_socket_client("$host:80", $errno, $errstr, $timeout,
STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
if ($s) {
$sockets[$id] = $s;
$status[$id] = "in progress";
} else {
$status[$id] = "failed, $errno $errstr";
}
}
/* Теперь ожидаем результат */
while (count($sockets)) {
$read = $write = $sockets;
/* Вот она - магическая функция - пояснения ниже */
$n = stream_select($read, $write, $e = null, $timeout);
if ($n > 0) {
/* доступные для чтения сокеты готовы отдать нам данные
или попытка провалилась
*/
foreach ($read as $r) {
$id = array_search($r, $sockets);
$data = fread($r, 8192);
if (strlen($data) == 0) {
if ($status[$id] == "in progress") {
$status[$id] = "failed to connect";
}
fclose($r);
unset($sockets[$id]);
} else {
$status[$id] .= $data;
}
}
/* доступные для записи сокеты могут принимать
HTTP-запросы
*/
foreach ($write as $w) {
$id = array_search($w, $sockets);
fwrite($w, "HEAD / HTTP/1.0rnHost: "
. $hosts[$id] . "rnrn");
$status[$id] = "waiting for response";
}
} else {
/* ожидаем таймаут; подразумевается, что все сокеты,
ассоциированные с массивом $sockets не сработали
*/
foreach ($sockets as $id => $s) {
$status[$id] = "timed out " . $status[$id];
}
break;
}
}
foreach ($hosts as $id => $host) {
echo "Host: $hostn";
echo "Status: " . $status[$id] . "nn";
}
?>
Мы используем функцию stream_select() для ожидания возникновения событий на открытых сокетах. stream_select() вызывает системную функцию select(2), а она работает так: первые три параметра - это массивы потоков, с которыми Вы хотите работать; Вы можете ожидать готовности чтения, записи или исключительных событий (параметр первый, второй и третий соответственно). stream_select() будет ждать $timeout секунд пока событие не появится - когда же эот случится, функция будет модифицировать массиы, которые Вы ей передали, так что они будут содержать идентификаторы сокетов, удовлетворябщих Вашему критерию.
Теперь, используя PHP 4.1.0 или более позднюю версию, если она скомпилирована с поддержкой расширения для работы с сокетами (sockets extension), Вы сможете использовать скрипт, который упомянут выше, но Вы должны заменить вызовы функций для работы с обычными потоками/файловой системой их эквивалентами из расширения sockets. Главная разница в способе открытия соединения; вместо stream_socket_client(), Вам необходимо использовать эту функцию:
< ?php
// Это значение верно для Linux,
// для других систем используйте другие значения
define('EINPROGRESS', 115);
function non_blocking_connect($host,$port,&$errno,&$errstr,$timeout) {
$ip = gethostbyname($host);
$s = socket_create(AF_INET, SOCK_STREAM, 0);
if (socket_set_nonblock($s)) {
$r = @socket_connect($s, $ip, $port);
if ($r || socket_last_error() == EINPROGRESS) {
$errno = EINPROGRESS;
return $s;
}
}
$errno = socket_last_error($s);
$errstr = socket_strerror($errno);
socket_close($s);
return false;
}
?>
Теперь, замените stream_select() на socket_select(), fread() на socket_read(), fwrite() на socket_write() и fclose() на socket_close() и вы готовы запускать сценарий.
Преимущество PHP 5 в том, что вы сможете использовать функцию stream_select() для ожидания данных (почти!) из любого типа потока - вы даже сможете использовать ее для ожидания ввода с клавиатуры терминала, включив STDIN в массив для чтения, или ожидать данные из каналов, созданных с помощью proc_open().
Если Вы пользуете PHP 4.3.x и хотите воспользоваться native streams approach, я приготовил патч, который позволяет работать функции fsockopen() асинхронно. Патч не поддерживается и не будет поставляться в официальном релизе PHP, однако, я написал оболочку, которая реализует функцию stream_socket_client() наряду с патчем, поэтому ваш код будет совместим с PHP 5.
Ресурсы:
Документация по stream_select
Документация по socket_select
Патч для PHP 4.3.2 и скрипт для эмуляции stream_socket_client() (должен работать и с более поздними версиями PHP)
вторник, 17 апреля 2007 г.
Подписаться на:
Комментарии к сообщению (Atom)
Комментариев нет:
Отправить комментарий