Sign In or Create Account

Huge image

PHP + PCNTL

Posted on Jan 28, 2010 by Andrei

На днях на работе мне было поручено написать очень простенький crawler который будет находить ссылки в разных источниках (к примеру Twitter) и будет эти ссылки нам сохранять. Само собой всякие укоротители ссылок вроде bit.ly нас не интересуют, нам нужны конечные ссылки и желательно titles страниц на которые ссылки указывают.

Сначала казалось что никаких проблем возникнуть не должно. У меня был тестовый list на твиттере где появлялось в среднем до 10 ссылок в минуту. Логично было написать скрипт, запускаемый cron-ом каждые 10-15 минут, который будует получать новые tweets в этом листе и будет их сканировать на наличие ссылок. Сказано - сделано, скрипт написан, cron подправлен.

class Crawler
{
	public function crawl(array $lists)
	{
		foreach ($lists as $list) {
			$tweets = $twitterClient->getLatestTweets( $list );
			
			// Collect urls first
			$urls = array();
			foreach ($tweets as $tweet) {
				$urls += $this->_extractUrls($tweet['text']);
			}
			
			// Now analyze them
			foreach ($urls as $url) {
				list($final_url, $title) = $this->_analyzeUrl($url);
			}
		}
	}
	
	private function _analyzeUrl($url)
	{
		list($url, $body) = $http_client->_analyze($url);
		$title = $this->_extractPageTitle($body);
		return array($url, $title);
	}
}

$crawler = new Crawler;
$crawler->crawl($list);

Однако когда я получил настоящий список twitter аккаунтов (а их оказалось 50) подобный метод оказался слишком медленным. В среднем анализ одной ссылки занимал 1-1.5 секунды. Итого 50 аккунтов * в среднем 30 ссылок в каждом (если запускать скрипт каждые 10 минут) - получаем 1500 ссылок которые надо проверить. Итого ~30-35 минут. Абсолютно неприемлемо.

Используя pcntl extension мы получаем возможность fork наш процесс столько раз сколько мы хотим параллельно работающих процессов. Однако есть одна проблема, между старшим и дочерним процессом нет никакой связи. Первая версия этого класса использовала временные файлы.  Поскольку старший процесс знает PID дочерних процессов мы можем периодически проверять появился ли файл допустим с названием /tmp/crawler_result.[PID]. Как только файл появился и его содержимое валидно (я использовал набор символов в конце файла) мы знаем что дочерний процесс закончил работу и мы можем его освободить и запустить следующий.

Однако следующий вариант мне нравится еще больше поскольку операции чтения-записи файлов будут помедленнее чем прямой доступ к памяти. Итак здесь мы спользуем еще два extension - sysvshm и sysvsem. Немного теории. Нам нужен кусок памяти доступный из старшего и дочерних процессов. Однако чтобы избежать повреждения информации нам нужно сделать так чтобы только один процесс мог записывать в определённое время. Для этого мы будем использовать семафор. Смотрим код и постигаем! :)

class Crawler
{
	private $_runningThreads = array();
	
	private $_concurrentThreadsNum = 10;

	public function __construct()
	{
        // Setup semaphore and shared memory
        $token = ftok(__FILE__, 'c');
        $this->_semaphore = sem_get($token);
        if (! $this->_shm = @shm_attach($token, 1 * 1024 * 1024, 0644)) {
			echo 'Unable to allocate memory (1 megabyte)';
            exit;
        }
	}
	
	public function __destruct()
	{
		if ($this->_parentProcess) {
            shm_remove($this->_shm);
            shm_detach($this->_shm);
		}
	}

	public function crawl(array $lists)
	{
		foreach ($lists as $list) {
			$tweets = $twitterClient->getLatestTweets( $list );
			
			// Collect urls first
			$urls = array();
			foreach ($tweets as $tweet) {
				$urls += $this->_extractUrls($tweet['text']);
			}

			// Analyze forked
			$this->_analyzeForked($urls);
		}
	}
	
	private function _analyzeForked(array $urls)
	{
		$this->_urls = $urls;
		$this->_parentProcess = false;
		
		$current_index = 0;
		$urls_count = count($urls);
	
		while (true) {
			// Harvest zombies
            foreach ($this->_runningThreads as $i => $threadId) {
                if (pcntl_waitpid($threadId, $status, WNOHANG) > 0) {
                    unset($this->_runningThreads[$i]);
                }
            }
			
			// Run some more threads
            if ($current_index < $urls_count) {
                // Run more threads if there are empty slots
                if (count($this->_runningThreads) < $this->_concurrentThreads) {
                    // Spawn thread
                    if ($pid = $this->_fork($current_index)) {
                        $this->_runningThreads[] = $pid;
                    }
                    // Fallback in case spawning thread has failed
                    else {
                        $analysed_data = $this->_analyzeUrl($urls[$current_index]);
                        $this->_updateData($current_index, $analysed_data);
                    }
 
                    $current_index++;
                }
            }		
		
			// Are all threads done?
            if (!$this->_runningThreads) {
                $this->_parentProcess = true;
                break;
            }
		}

        // Final data
        return $this->_getData();
	}
	
	/**
	 * Fork this process
	 *
	 * @param int $index
	 */
	private function _fork($index)
    {
        $thread_id = pcntl_fork();
        
        // Couldn't spawn child process
        if ($thread_id == -1) {
            return false;
        }
        // Child spawned, save the process id
        elseif ($thread_id) {
            return $thread_id;
        }
        
        // Update data
        $analysed_data = $this->_analyzeUrl($this->_urls[$index]);
        $this->_updateData($index, $analysed_data);

        exit;
	}
	
	/**
	 * Analyze url
	 *
	 * @param string $url
	 * @return array
	 */
	private function _analyzeUrl($url)
	{
		list($url, $body) = $http_client->_analyze($url);
		$title = $this->_extractPageTitle($body);
		return array($url, $title);
	}
	
	/**
	 * Safely updated data in the shared memory block
	 *
	 * @param int $index
	 * @param mixed $_data
	 */
    private function _updateData($index, $_data)
    {
       sem_acquire($this->_semaphore);
       $data = shm_has_var($this->_shm, $this->_shmId) ?
           shm_get_var($this->_shm, $this->_shmId) :
           array();
       $data[$index] = $_data;
       shm_put_var($this->_shm, $this->_shmId, $data);
       sem_release($this->_semaphore);
    }
    
	/**
	 * Return data currently stored in shared memory block
	 *
	 * @return array
	 */
    private function _getData()
    {
        if (shm_has_var($this->_shm, $this->_shmId)) {
            sem_acquire($this->_semaphore);
            $data = shm_get_var($this->_shm, $this->_shmId);
            sem_release($this->_semaphore);
            return $data;
        }
        return array();
    }
}

$crawler = new Crawler;
$crawler->crawl($list);

Не стесняемся, задаём вопросы.

Материалы по теме:

http://pleac.sourceforge.net/pleac_php/processmanagementetc.html




Tags:

Oops. No tags.


Comments:


Name:
Email: (won't be dispayed anywhere on the site)
URL:
Comment: