На днях на работе мне было поручено написать очень простенький crawler который будет находить ссылки в разных источниках (к примеру Twitter) и будет эти ссылки нам сохранять. Само собой всякие укоротители ссылок вроде bit.ly нас не интересуют, нам нужны конечные ссылки и желательно titles страниц на которые ссылки указывают.
Сначала казалось что никаких проблем возникнуть не должно. У меня был тестовый list на твиттере где появлялось в среднем до 10 ссылок в минуту. Логично было написать скрипт, запускаемый cron-ом каждые 10-15 минут, который будует получать новые tweets в этом листе и будет их сканировать на наличие ссылок. Сказано - сделано, скрипт написан, cron подправлен.
class Crawler
{
public function crawl(array $lists)
{
foreach ($lists as $list) {
$tweets = $twitterClient->getLatestTweets( $list );
// Collect urls first
$urls = array();
foreach ($tweets as $tweet) {
$urls += $this->_extractUrls($tweet['text']);
}
// Now analyze them
foreach ($urls as $url) {
list($final_url, $title) = $this->_analyzeUrl($url);
}
}
}
private function _analyzeUrl($url)
{
list($url, $body) = $http_client->_analyze($url);
$title = $this->_extractPageTitle($body);
return array($url, $title);
}
}
$crawler = new Crawler;
$crawler->crawl($list);
Однако когда я получил настоящий список twitter аккаунтов (а их оказалось 50) подобный метод оказался слишком медленным. В среднем анализ одной ссылки занимал 1-1.5 секунды. Итого 50 аккунтов * в среднем 30 ссылок в каждом (если запускать скрипт каждые 10 минут) - получаем 1500 ссылок которые надо проверить. Итого ~30-35 минут. Абсолютно неприемлемо.
Используя pcntl extension мы получаем возможность fork наш процесс столько раз сколько мы хотим параллельно работающих процессов. Однако есть одна проблема, между старшим и дочерним процессом нет никакой связи. Первая версия этого класса использовала временные файлы. Поскольку старший процесс знает PID дочерних процессов мы можем периодически проверять появился ли файл допустим с названием /tmp/crawler_result.[PID]. Как только файл появился и его содержимое валидно (я использовал набор символов в конце файла) мы знаем что дочерний процесс закончил работу и мы можем его освободить и запустить следующий.
Однако следующий вариант мне нравится еще больше поскольку операции чтения-записи файлов будут помедленнее чем прямой доступ к памяти. Итак здесь мы спользуем еще два extension - sysvshm и sysvsem. Немного теории. Нам нужен кусок памяти доступный из старшего и дочерних процессов. Однако чтобы избежать повреждения информации нам нужно сделать так чтобы только один процесс мог записывать в определённое время. Для этого мы будем использовать семафор. Смотрим код и постигаем! :)
class Crawler
{
private $_runningThreads = array();
private $_concurrentThreadsNum = 10;
public function __construct()
{
// Setup semaphore and shared memory
$token = ftok(__FILE__, 'c');
$this->_semaphore = sem_get($token);
if (! $this->_shm = @shm_attach($token, 1 * 1024 * 1024, 0644)) {
echo 'Unable to allocate memory (1 megabyte)';
exit;
}
}
public function __destruct()
{
if ($this->_parentProcess) {
shm_remove($this->_shm);
shm_detach($this->_shm);
}
}
public function crawl(array $lists)
{
foreach ($lists as $list) {
$tweets = $twitterClient->getLatestTweets( $list );
// Collect urls first
$urls = array();
foreach ($tweets as $tweet) {
$urls += $this->_extractUrls($tweet['text']);
}
// Analyze forked
$this->_analyzeForked($urls);
}
}
private function _analyzeForked(array $urls)
{
$this->_urls = $urls;
$this->_parentProcess = false;
$current_index = 0;
$urls_count = count($urls);
while (true) {
// Harvest zombies
foreach ($this->_runningThreads as $i => $threadId) {
if (pcntl_waitpid($threadId, $status, WNOHANG) > 0) {
unset($this->_runningThreads[$i]);
}
}
// Run some more threads
if ($current_index < $urls_count) {
// Run more threads if there are empty slots
if (count($this->_runningThreads) < $this->_concurrentThreads) {
// Spawn thread
if ($pid = $this->_fork($current_index)) {
$this->_runningThreads[] = $pid;
}
// Fallback in case spawning thread has failed
else {
$analysed_data = $this->_analyzeUrl($urls[$current_index]);
$this->_updateData($current_index, $analysed_data);
}
$current_index++;
}
}
// Are all threads done?
if (!$this->_runningThreads) {
$this->_parentProcess = true;
break;
}
}
// Final data
return $this->_getData();
}
/**
* Fork this process
*
* @param int $index
*/
private function _fork($index)
{
$thread_id = pcntl_fork();
// Couldn't spawn child process
if ($thread_id == -1) {
return false;
}
// Child spawned, save the process id
elseif ($thread_id) {
return $thread_id;
}
// Update data
$analysed_data = $this->_analyzeUrl($this->_urls[$index]);
$this->_updateData($index, $analysed_data);
exit;
}
/**
* Analyze url
*
* @param string $url
* @return array
*/
private function _analyzeUrl($url)
{
list($url, $body) = $http_client->_analyze($url);
$title = $this->_extractPageTitle($body);
return array($url, $title);
}
/**
* Safely updated data in the shared memory block
*
* @param int $index
* @param mixed $_data
*/
private function _updateData($index, $_data)
{
sem_acquire($this->_semaphore);
$data = shm_has_var($this->_shm, $this->_shmId) ?
shm_get_var($this->_shm, $this->_shmId) :
array();
$data[$index] = $_data;
shm_put_var($this->_shm, $this->_shmId, $data);
sem_release($this->_semaphore);
}
/**
* Return data currently stored in shared memory block
*
* @return array
*/
private function _getData()
{
if (shm_has_var($this->_shm, $this->_shmId)) {
sem_acquire($this->_semaphore);
$data = shm_get_var($this->_shm, $this->_shmId);
sem_release($this->_semaphore);
return $data;
}
return array();
}
}
$crawler = new Crawler;
$crawler->crawl($list);
Не стесняемся, задаём вопросы.
http://pleac.sourceforge.net/pleac_php/processmanagementetc.html