Running multiple processes in PHP

Sometimes you need to run multiple commands in parallel to cut the processing time of repetitive tasks.
In the “Building a Video sharing site” project that I will present here soon, I needed a script to run multiple video processing jobs. PHP wasn’t designed for multitasking, but with a few tricks we can emulate a multitasking environment.

During my research I found a couple of solutions that, although they seemed viable, had some weaknesses:

  • Using curl to open multiple instances of the script at once
    The problem is that the script then has to be accessible via the Web, and exposing a script that runs external commands is a big security issue.
  • Using pcntl functions
    This could have been the best approach, but it requires PHP to be compiled with the --enable-pcntl option, and many of us don’t have the access needed to recompile PHP on the server. Besides that, it only works on Unix-like systems such as Linux, so it’s platform dependent.
  • Finally I found a nice class that simulates multi-threading, and after some customisation I managed to set up multitasking the way I wanted:

threads.php

<?php
class Thread {
	var $pref ; // process reference
	var $pipes; // stdio
	var $buffer; // output buffer
	var $output;
	var $error;
	var $timeout;
	var $start_time;
 
 
	//Constructor (a method named after the class is no longer a constructor in PHP 8)
	function __construct() {
		$this->pref = 0;
		$this->buffer = "";
		$this->pipes = (array)NULL;
		$this->output = "";
		$this->error="";
 
		$this->start_time = time();
		$this->timeout = 0;
	}
 
	//Static factory, called as Thread::Create($command)
	static function Create ($command) {
		$t = new Thread;
		$descriptor = array (0 => array ("pipe", "r"), 1 => array ("pipe", "w"), 2 => array ("pipe", "w"));
		//Open the resource to execute $command
		$t->pref = proc_open($command,$descriptor,$t->pipes);
		//Set STDOUT and STDERR to non-blocking 
		stream_set_blocking ($t->pipes[1], 0);
		stream_set_blocking ($t->pipes[2], 0);
		return $t;
	}
 
	//See if the command is still active
	function isActive () {
		$this->buffer .= $this->listen();
		$f = stream_get_meta_data ($this->pipes[1]);
		return !$f["eof"];
	}
 
	//Close the process
	function close () {
		$r = proc_close ($this->pref);
		$this->pref = NULL;
		return $r;
	}
 
	//Send a message to the command running
	function tell ($thought) {
		fwrite ($this->pipes[0], $thought);
	}
 
	//Get the command output produced so far
	function listen () {
		$buffer = $this->buffer;
		$this->buffer = "";
		while ($r = fgets ($this->pipes[1], 1024)) {
			$buffer .= $r;
			$this->output.=$r;
		}
		return $buffer;
	}
 
	//Get the status of the current runing process
	function getStatus(){
		return proc_get_status($this->pref);
	}
 
	//See if the command is taking too long to run (more than $this->timeout seconds)
	function isBusy(){
		//A timeout of 0 means "never time out"
		return ($this->timeout>0) && ($this->start_time+$this->timeout<time());
	}
 
	//What command wrote to STDERR
	function getError () {
		$buffer = "";
		while ($r = fgets ($this->pipes[2], 1024)) {
			$buffer .= $r;
		}
		return $buffer;
	}
}
 
 
//Wrapper for Thread class
class Multithread{
	var $output;
	var $error;
	var $thread;
	var $commands = array();
 
	function __construct($commands){
		$this->commands = $commands;
 
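		foreach ($this->commands as $key=>$command){
			$this->thread[$key]=Thread::create($command);
			//Start with empty strings so the .= in run() does not raise notices
			$this->output[$key]="";
			$this->error[$key]="";
		}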
	}
 
 
	function run(){
		$commands = $this->commands;
		//Cycle through commands
		while (count($commands)>0){
			foreach ($commands as $key=>$command){
				//Get the output and the errors
				$this->output[$key].=$this->thread[$key]->listen();
				$this->error[$key].=$this->thread[$key]->getError();
				//Check if command is still active
				if ($this->thread[$key]->isActive()){
					$this->output[$key].=$this->thread[$key]->listen();
					//Check if command is busy
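					if ($this->thread[$key]->isBusy()){
						//proc_close() waits for the process to exit, so stop it first
						proc_terminate($this->thread[$key]->pref);
						$this->thread[$key]->close();
						unset($commands[$key]);
					}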
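				} else {
					//Collect what isActive() buffered, then close the command and free resources
					$this->output[$key].=$this->thread[$key]->listen();
					$this->error[$key].=$this->thread[$key]->getError();
					$this->thread[$key]->close();
					unset($commands[$key]);
				}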
			}
			//Pause briefly so the polling loop does not use a full CPU core
			usleep(10000);
		}
		return $this->output;
	}
}
?>

I call the class using the following code:
Example.php

<?php
set_time_limit(0);
include "threads.php";
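//$inputFile and $outputFile are arrays of file paths, defined elsewhere
$commands = array(
	'ffmpeg -i '.escapeshellarg($inputFile[0]).' '.escapeshellarg($outputFile[0]).' 2>&1',
	'ffmpeg -i '.escapeshellarg($inputFile[1]).' '.escapeshellarg($outputFile[1]).' 2>&1'
);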
$threads = new Multithread($commands);
$threads->run();
foreach ($threads->commands as $key=>$command){
	echo "Command ".$command.":<br>";
	echo "Output ".$threads->output[$key]."<br>";
	echo "Error ".$threads->error[$key]."<br><br>";
}
?>

Let’s talk about what the script does:

First, the Create method defines the descriptors to use with proc_open. The standard descriptors for shell commands are:
0 for stdin (what the process reads)
1 for stdout (the output of the running process)
2 for stderr (the errors the process reports).
You can customize these settings; for example, you can log all the errors to a file like this:

2 => array("file", $path_to_error_log_file, "a")
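
As a minimal sketch of that setting, the snippet below starts a child process whose STDERR is appended to a file. It assumes PHP is run from the command line (so PHP_BINARY points at the CLI executable). The temporary log path and the one-line child script are placeholders for the illustration and are not part of the class above. Note that a "file" descriptor gets no entry in $pipes.

```php
<?php
// Placeholder log file; in a real script this would be your own log path.
$errorLog = tempnam(sys_get_temp_dir(), 'err');

$descriptors = array(
    0 => array('pipe', 'r'),            // the child reads its STDIN from us
    1 => array('pipe', 'w'),            // the child writes its STDOUT to us
    2 => array('file', $errorLog, 'a'), // the child appends its STDERR to the log
);

// Placeholder child: a one-line PHP script that writes to both streams.
$child = "fwrite(STDOUT, 'out'); fwrite(STDERR, 'oops');";
$command = escapeshellarg(PHP_BINARY) . ' -r ' . escapeshellarg($child);

$process = proc_open($command, $descriptors, $pipes);
if (!is_resource($process)) {
    exit("Could not start the command\n");
}

fclose($pipes[0]);                        // nothing to send to the child
$stdout = stream_get_contents($pipes[1]); // waits until the child closes STDOUT
fclose($pipes[1]);
$exitCode = proc_close($process);         // waits for the child and returns its exit code

// Only indexes 0 and 1 existed in $pipes; the errors are in the file.
$stderr = file_get_contents($errorLog);
```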

By calling stream_set_blocking($t->pipes[1], 0) and stream_set_blocking($t->pipes[2], 0) we tell PHP not to wait for output: a read on these pipes returns immediately with whatever is available, so the script can move on to another command instead of waiting for the process to finish. This is an important step in achieving multitasking.
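
To see the effect in isolation, here is a small sketch, assuming a Unix-like system and PHP run from the command line. The child script (which sleeps for one second before printing) is made up for the example. The first read comes back at once with nothing, even though the child is still running.

```php
<?php
// Placeholder child: waits one second, then prints a word.
$child = "sleep(1); echo 'done';";
$command = escapeshellarg(PHP_BINARY) . ' -r ' . escapeshellarg($child);

$process = proc_open($command, array(1 => array('pipe', 'w')), $pipes);
if (!is_resource($process)) {
    exit("Could not start the command\n");
}

// Non-blocking: reads return immediately, with an empty result if nothing is ready.
stream_set_blocking($pipes[1], false);

$before = microtime(true);
$first = fread($pipes[1], 8192);         // the child is still sleeping
$waited = microtime(true) - $before;

// ... a real script would start or poll other commands here ...

// Switch back to blocking mode just to collect the rest in one call.
stream_set_blocking($pipes[1], true);
$rest = stream_get_contents($pipes[1]);  // waits until the child finishes

fclose($pipes[1]);
proc_close($process);
```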
The isActive method reports the current state of the running process: it reads from stdout and then checks stream_get_meta_data($this->pipes[1]) to see whether the stream has reached its end, which means the process is no longer active.
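
Checking for the end of stdout works as long as the command keeps stdout open until it exits. A different way to ask the same question, shown here only as a sketch and not as what the class does, is proc_get_status, which reports a "running" flag and the exit code. The helper name waitForExit and the child script are hypothetical. On older PHP versions the exit code is only reliable on the first call made after the process has ended, which is why the helper returns it right away.

```php
<?php
// Hypothetical helper: poll a process until it stops, then return its exit code.
function waitForExit($process, $pollMicroseconds = 50000)
{
    while (true) {
        $status = proc_get_status($process);
        if (!$status['running']) {
            // Return the exit code from the first status that reports "not running".
            return $status['exitcode'];
        }
        usleep($pollMicroseconds);
    }
}

// Placeholder child: a one-line PHP script that exits with code 3.
$child = 'exit(3);';
$command = escapeshellarg(PHP_BINARY) . ' -r ' . escapeshellarg($child);

$process = proc_open($command, array(1 => array('pipe', 'w')), $pipes);
if (!is_resource($process)) {
    exit("Could not start the command\n");
}

$exitCode = waitForExit($process);

fclose($pipes[1]);
proc_close($process);
```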
The command output is collected by the listen method. Additionally, you can capture the errors by calling the getError method.
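
The class collects output by polling each pipe in a loop. As an alternative sketch (a swapped-in technique, not what the class does), stream_select can put the script to sleep until at least one pipe has data or has been closed. This assumes a Unix-like system and PHP run from the command line; the two child scripts and their labels are invented for the example.

```php
<?php
// Two hypothetical one-line child scripts, keyed by a label.
$children = array(
    'slow' => "usleep(300000); echo 'A';",
    'fast' => "echo 'B';",
);

$processes = array();
$streams = array();
$output = array();
foreach ($children as $key => $code) {
    $command = escapeshellarg(PHP_BINARY) . ' -r ' . escapeshellarg($code);
    $processes[$key] = proc_open($command, array(1 => array('pipe', 'w')), $pipes);
    stream_set_blocking($pipes[1], false);
    $streams[$key] = $pipes[1];
    $output[$key] = '';
}

while (count($streams) > 0) {
    $read = array_values($streams);
    $write = null;
    $except = null;
    // Sleep until a pipe is readable or closed, for at most one second.
    if (stream_select($read, $write, $except, 1) === false) {
        break;
    }
    foreach ($read as $stream) {
        // Find which command this pipe belongs to.
        $key = array_search($stream, $streams, true);
        $chunk = fread($stream, 8192);
        if (is_string($chunk)) {
            $output[$key] .= $chunk;
        }
        if (feof($stream)) {
            // The child closed its STDOUT: release it and stop watching it.
            fclose($stream);
            proc_close($processes[$key]);
            unset($streams[$key]);
        }
    }
}
```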
The close method closes the process and frees the associated resources.
The isBusy method checks whether the $timeout period has passed. I recommend setting a high timeout if you know that your scripts take a long time to run. If the timeout is set to 0, no script will time out.
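
One detail worth knowing when enforcing a timeout: proc_close waits for the process to exit, so on its own it does not cut a slow command short. The sketch below, with a hypothetical helper named runWithTimeout, calls proc_terminate first. It assumes a Unix-like system, where the command is started through a shell; prefixing the command with exec is one way to make the signal reach the program itself rather than the shell.

```php
<?php
// Hypothetical helper: run a command, stopping it after $timeoutSeconds (0 = no limit).
function runWithTimeout($command, $timeoutSeconds)
{
    $process = proc_open($command, array(1 => array('pipe', 'w')), $pipes);
    if (!is_resource($process)) {
        return null;
    }
    stream_set_blocking($pipes[1], false);

    $deadline = microtime(true) + $timeoutSeconds;
    $output = '';
    $timedOut = false;
    while (true) {
        $chunk = fread($pipes[1], 8192);
        if (is_string($chunk)) {
            $output .= $chunk;
        }
        if (feof($pipes[1])) {
            break; // the command finished on its own
        }
        if ($timeoutSeconds > 0 && microtime(true) > $deadline) {
            proc_terminate($process); // ask the process to stop
            $timedOut = true;
            break;
        }
        usleep(50000);
    }

    fclose($pipes[1]);
    proc_close($process); // returns quickly now that the process has ended
    return array('output' => $output, 'timedOut' => $timedOut);
}

// Placeholder children: one finishes at once, the other would run for 30 seconds.
$php = 'exec ' . escapeshellarg(PHP_BINARY) . ' -r ';
$quick = runWithTimeout($php . escapeshellarg("echo 'ok';"), 5);
$slow = runWithTimeout($php . escapeshellarg('sleep(30);'), 1);
```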

In the attached file you will find an example implementation of this technique. Three scripts (one.php, two.php, three.php) write data to a log file with a one-second delay between writes, so you can see how the multitasking works; err.php simulates a script that produces an error.
Have fun experimenting with the script, and if you have any questions, just leave a comment here.

Download code for this post


7 Responses to “Running multiple processes in PHP”

  1. masterb says:

    Great script, works perfect!

  2. Nickolas Wood says:

    Hi,
    I have modified your class a little bit and fixed a couple of bugs for you; figured I would contribute back. First off, lines 107 and 108 of your threads.php file raise undefined reference notices when ini_set('display_errors', 'On'); and error_reporting(E_ALL); are used. Secondly, the thread timeout of 0 on line 20 of the threads.php file (with no dynamic way of changing it) causes weird behavior when you start checking each thread for null output and resubmitting the command to a new multithread object on such null output. If a thread takes longer than 0 seconds to complete, the isBusy routine kills it. Most commands take longer than 0 seconds to return anything, so I kept getting null output. This threw me for a loop for a while.

    These three problems have been fixed in the below code. I have also made a couple of minor changes to better handle output. Look for lines with #MODIFIED in them, those I changed. I hope someone else finds it useful.

    Great work on your multithread class, by the way. It helped me a lot.

    START CODE BLOCK:

    pref = 0;
    $this->buffer = "";
    $this->pipes = (array)NULL;
    $this->output = "";
    $this->error="";

    $this->start_time = time();
    $this->timeout = $timeout; #MODIFIED, used $timeout
    }

    function Create ($command, $timeout) { #MODIFIED, added $timeout
    $t = new Thread($timeout); #MODIFIED, used timeout
    $descriptor = array (0 => array ("pipe", "r"), 1 => array ("pipe", "w"), 2 => array ("pipe", "w"));
    //Open the resource to execute $command
    $t->pref = proc_open($command,$descriptor,$t->pipes);
    //Set STDOUT and STDERR to non-blocking
    stream_set_blocking ($t->pipes[1], 0);
    stream_set_blocking ($t->pipes[2], 0);
    return $t;
    }

    //See if the command is still active
    function isActive () {
    $this->buffer .= $this->listen();
    $f = stream_get_meta_data ($this->pipes[1]);
    return !$f["eof"];
    }

    //Close the process
    function close () {
    $r = proc_close ($this->pref);
    $this->pref = NULL;
    return $r;
    }

    //Send a message to the command running
    function tell ($thought) {
    fwrite ($this->pipes[0], $thought);
    }

    //Get the command output produced so far
    function listen () {
    $buffer = $this->buffer;
    $this->buffer = "";
    while ($r = stream_get_contents($this->pipes[1])) { #MODIFIED, replaced fgets with stream_get_contents
    #MODIFIED, removed $this->output appending (unneeded)
    $buffer .= $r;
    }
    return $buffer;
    }

    //Get the status of the current runing process
    function getStatus(){
    return proc_get_status($this->pref);
    }

    //See if the command is taking too long to run (more than $this->timeout seconds)
    function isBusy(){
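    return ($this->start_time>0) && ($this->start_time+$this->timeout<time());
    }

    //What command wrote to STDERR
    function getError () {
    $buffer = "";
    while ($r = stream_get_contents($this->pipes[2])) { #MODIFIED, replaced fgets with stream_get_contents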
    $buffer .= $r;
    }
    return $buffer;
    }
    }

    //Wrapper for Thread class
    class Multithread{
    var $output;
    var $error;
    var $thread;
    var $commands = array();

    function __construct($commands, $timeout){ #MODIFIED, added $timeout
    $this->commands = $commands;

    foreach ($this->commands as $key=>$command){
    $this->thread[$key]=Thread::create($command, $timeout); #MODIFIED, used $timeout
    $this->output[$key] = ""; #MODIFIED, added this line to remove undefined reference notices
    $this->error[$key] = ""; #MODIFIED, added this line to remove undefined reference notices
    }
    }

    function run(){
    $commands = $this->commands;
    //Cycle through commands
    while (count($commands)>0){
    foreach ($commands as $key=>$command){
    //Check if command is still active
    if ($this->thread[$key]->isActive()){
    //Get the output and the errors
    $this->output[$key].=$this->thread[$key]->listen(); #I removed this line on my server as I don’t need it, I want all output or nothing. I could see where it is useful however
    $this->error[$key].=$this->thread[$key]->getError(); #I removed this line on my server as I don’t need it, I want all output or nothing. I could see where it is useful however
    //Check if command is busy
    if ($this->thread[$key]->isBusy()){
    #MODIFIED, removed $this-output (unneeded, if we kill a process then its output is unusable and there is no need to update it further)
    $this->thread[$key]->close();
    unset($commands[$key]);
    }
    } else {
    //Close the command, gather results and free resources #MODIFIED, added, gather results comment
    $this->output[$key].=$this->thread[$key]->listen(); #MODIFIED, added this line (want to make sure we have all output before we close a thread)
    $this->error[$key].=$this->thread[$key]->getError(); #MODIFIED, added this line (want to make sure we have all output before we close a thread)
    $this->thread[$key]->close();
    unset($commands[$key]);
    }
    }
    }
    return $this->output;
    }
    }
    ?>

    END CODE BLOCK:

    I know, the formatting sucks but I couldn’t find a nice way of doing it. Now, I call this class like so:

    START CODE BLOCK:

    $threads = new Multithread($command_list_exec, $execution_time);
    $threads->run();

    END CODE BLOCK:

    As I mentioned earlier, I check each process’s output against several strpos instances for known conditions. The most common problem I saw was no output at all. If this was the case then it was killed before anything had a chance to be reported and it needs to be run again.

    It is vitally important that set_time_limit($execution_time); is used in PHP pages that call the multithread class so that a rogue process doesn’t cripple a server with endless thread creations/kills.

    Hope this helps.

    Thanks again!!

  3. Nickolas Wood says:

    I apologize, I missed a couple of lines of code at the very top, at the first START CODE BLOCK. This code should be inserted there:

    START CODE BLOCK:

    END CODE BLOCK:

  4. Nickolas Wood says:

    Umm, let me try this again, removing the initial php tag:

    START CODE BLOCK:

    class Thread {
    var $pref ; // process reference
    var $pipes; // stdio
    var $buffer; // output buffer
    var $output;
    var $error;
    var $timeout;
    var $start_time;

    function Thread($timeout) {
    $this->

    END CODE BLOCK:

    This will be my last attempt so that I don’t over post here.

  5. Silviu says:

    Hello. I’ve tried your code and I get the following error. Please help :)

    “Error ‘tasks’ is not recognized as an internal or external command, operable program or batch file.”

  6. carlos Pimentel says:

    Hi Nickolas Wood,
    could you please post the code that you’ve created on pastebin.com or a similar website?
    This code isn’t working for me: sometimes I receive the output from the process and sometimes I don’t, and I am going almost crazy debugging this…
    Thanks !

  7. bee says:

    Hey, cool threads.php class, works great. I have a question though: what is the maximum number of scripts that can be multitasked?

    Thanks,
    bee.
