74 lines
1.8 KiB

<?php
class DeferredTask {
public static function run() {
global $pcntl_continue;
$tube = 'switchboard-worker';
echo "PID " . posix_getpid() . " watching tube: " . $tube . "\n";
bs()->watch($tube)->ignore('default');
if(isset($pcntl_continue)) {
while($pcntl_continue)
{
if(($job=bs()->reserve(2)) == FALSE)
continue;
self::process($job);
} // while true
echo "\nBye from pid " . posix_getpid() . "!\n";
} else {
if(($job=bs()->reserve())) {
self::process($job);
}
}
}
public static function run_once() {
$tube = 'switchboard-worker';
echo "PID " . posix_getpid() . " watching tube: " . $tube . "\n";
bs()->watch($tube)->ignore('default');
if(($job=bs()->reserve())) {
self::process($job);
}
echo "\nBye from pid " . posix_getpid() . "!\n";
}
public static function queue($class, $method, $args=array(), $delay=0) {
if(!is_array($args))
$args = array($args);
bs()->putInTube('switchboard-worker',
json_encode(array('class'=>$class, 'method'=>$method, 'args'=>$args)),
1024, // priority
$delay, // delay
300); // time to run
}
private static function process(&$jobData) {
$data = json_decode($jobData->getData());
if(!is_object($data) || !property_exists($data, 'class')) {
echo "Found bad job:\n";
print_r($data);
echo "\n";
bs()->delete($jobData);
return;
}
echo "===============================================\n";
echo "# Beginning job: " . $data->class . '::' . $data->method . "\n";
call_user_func_array(array($data->class, $data->method), $data->args);
echo "\n# Job Complete\n-----------------------------------------------\n\n";
bs()->delete($jobData);
}
}