Gearman tasks are delegated to more than one worker (PHP)
I have a problem in a PHP application: Gearman jobs are sometimes passed to multiple workers. I was able to reduce the code to a single file that reproduces it. I am not sure whether this is a bug in Gearman, a bug in the PECL library, or a bug in my own code.
Here's the code to reproduce the error:
#!/usr/bin/php
<?php
// Try 'standard', 'exception' or 'exception-sleep'.
$sWorker = 'exception';
// Detect run mode "client" or "worker".
if (!isset($argv[1]))
    $sMode = 'client';
else
    $sMode = 'worker-' . $sWorker;
$sLogFilePath = __DIR__ . '/log.txt';
switch ($sMode) {
    case 'client':
        // Remove all queued test jobs and quit if there are test workers running.
        prepare();
        // Init the Gearman client.
        $Client = new GearmanClient;
        $Client->addServer();
        // Empty the log file.
        file_put_contents($sLogFilePath, '');
        // Start some worker processes.
        $aPids = array();
        for ($i = 0; $i < 100; $i++)
            $aPids[] = exec('php ' . __FILE__ . ' worker > /dev/null 2>&1 & echo $!');
        // Start some jobs. Also try doHigh(), doBackground() and
        // doBackgroundHigh();
        for ($i = 0; $i < 50; $i++)
            $Client->doNormal('test', $i);
        // Wait a second (when running jobs in background).
        // sleep(1);
        // Prepare the log file entries.
        $aJobs = array();
        $aLines = file($sLogFilePath);
        foreach ($aLines as $sLine) {
            list($sTime, $sPid, $sHandle, $sWorkload) = $aAttributes = explode("\t", $sLine);
            $sWorkload = trim($sWorkload);
            if (!isset($aJobs[$sWorkload]))
                $aJobs[$sWorkload] = array();
            $aJobs[$sWorkload][] = $aAttributes;
        }
        // Remove all jobs that have been passed to only one worker as expected.
        foreach ($aJobs as $sWorkload => $aJob) {
            if (count($aJob) === 1)
                unset($aJobs[$sWorkload]);
        }
        echo "\n\n";
        if (empty($aJobs))
            echo "No job has been passed to more than one worker.";
        else {
            echo "These jobs have been passed to more than one worker:\n";
            foreach ($aJobs as $sWorkload => $aJob) {
                echo "\nJob #" . $sWorkload . ":\n";
                foreach ($aJob as $aAttributes)
                    echo " $aAttributes[2] (Worker PID: $aAttributes[1])\n";
            }
        }
        echo "\n\n";
        // Kill all started workers.
        foreach ($aPids as $sPid)
            exec('kill ' . $sPid . ' > /dev/null 2>&1');
        break;
    case 'worker-standard':
        $Worker = new GearmanWorker;
        $Worker->addServer();
        $Worker->addFunction('test', 'logJob');
        $bShutdown = false;
        while ($Worker->work())
            if ($bShutdown)
                continue;
        break;
    case 'worker-exception':
        try {
            $Worker = new GearmanWorker;
            $Worker->addServer();
            $Worker->addFunction('test', 'logJob');
            $bShutdown = false;
            while ($Worker->work())
                if ($bShutdown)
                    throw new \Exception;
        } catch (\Exception $E) {
        }
        break;
    case 'worker-exception-sleep':
        try {
            $Worker = new GearmanWorker;
            $Worker->addServer();
            $Worker->addFunction('test', 'logJob');
            $bShutdown = false;
            while ($Worker->work())
            {
                if ($bShutdown) {
                    sleep(1);
                    throw new \Exception;
                }
            }
        } catch (\Exception $E) {
        }
        break;
}
function logJob(\GearmanJob $Job)
{
    global $bShutdown, $sLogFilePath;
    $sLine = microtime(true) . "\t" . getmypid() . "\t" . $Job->handle() . "\t" . $Job->workload() . "\n";
    file_put_contents($sLogFilePath, $sLine, FILE_APPEND);
    $bShutdown = true;
}
function prepare()
{
    $rGearman = fsockopen('127.0.0.1', '4730', $iErrno, $sErrstr, 3);
    $aBuffer = array();
    fputs($rGearman, 'STATUS' . PHP_EOL);
    stream_set_timeout($rGearman, 1);
    // Read the STATUS response until the terminating "." line.
    while (!feof($rGearman))
        if ('.' . PHP_EOL !== ($sLine = fgets($rGearman, 128)))
            $aBuffer[] = $sLine;
        else
            break;
    fclose($rGearman);
    $bJobsInQueue = false;
    $bWorkersRunning = false;
    foreach ($aBuffer as $sFunctionLine) {
        list($sFunctionName, $iQueuedJobs, $iRunningJobs, $iWorkers) = explode("\t", $sFunctionLine);
        if ('test' === $sFunctionName) {
            if (0 != $iQueuedJobs)
                $bJobsInQueue = true;
            if (0 != $iWorkers)
                $bWorkersRunning = true;
        }
    }
    // Exit if there are workers running.
    if ($bWorkersRunning)
        die("There are some Gearman workers running that have registered a 'test' function. Please stop these workers and run again.\n\n");
    // If there are test jobs in the queue, start a worker that eats up the jobs.
    if ($bJobsInQueue) {
        $sPid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!');
        sleep(1);
        exec("kill $sPid > /dev/null 2>&1");
        // Repeat this method to make sure all jobs are removed.
        prepare();
    }
}
When you run this code on the command line, it should output "No job has been passed to more than one worker.", but it always lists some jobs that have been passed to multiple workers. The error does not appear if you set $sWorker = 'standard'; or 'exception-sleep'.
It would help me a lot if you could run the code and tell me whether you can reproduce the error or whether I have a bug in the code.
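For anyone who would rather inspect an existing log file than run the whole script, the duplicate check can be isolated roughly as follows. This is only a sketch: `findDuplicateJobs()` is a hypothetical helper name that is not part of the script above, and the sample lines are made up to match the tab-separated format that `logJob()` writes.

```php
<?php
/**
 * Group log lines by workload and keep only the workloads that were
 * logged more than once, i.e. handled by more than one worker.
 * Assumed line format (as written by logJob()): "time\tpid\thandle\tworkload".
 * findDuplicateJobs() is a hypothetical helper, not part of the original script.
 */
function findDuplicateJobs(array $aLines)
{
    $aJobs = array();
    foreach ($aLines as $sLine) {
        $aAttributes = explode("\t", trim($sLine));
        if (count($aAttributes) !== 4)
            continue; // Skip blank or malformed lines.
        $aJobs[$aAttributes[3]][] = $aAttributes;
    }
    // Keep only workloads that appear in more than one log entry.
    return array_filter($aJobs, function ($aJob) {
        return count($aJob) > 1;
    });
}

// Made-up sample data: workload 0 was logged by two different PIDs.
$aSample = array(
    "1.0\t100\tH:host:1\t0\n",
    "1.1\t101\tH:host:1\t0\n",
    "1.2\t102\tH:host:2\t1\n",
);
foreach (findDuplicateJobs($aSample) as $sWorkload => $aJob)
    echo "Job #$sWorkload was handled by " . count($aJob) . " workers\n";
```

To check a real run, pass `file(__DIR__ . '/log.txt')` instead of the sample array.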
I had exactly the same problem with Gearman 0.24 and PECL lib 1.0.2. I was able to reproduce the error with your script every time.
An early version of Gearman (0.14 I think) worked fine.
Updating Gearman to 0.33 fixed the issue.
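To see which side of that fix a server is on, something like the sketch below might help. It rests on assumptions: that gearmand answers the admin command `version` on port 4730 with a single line containing the version number (the exact response format may differ between releases), and the function names are made up for illustration. The only data points behind the comparison are the ones above: 0.24 showed the problem and 0.33 did not.

```php
<?php
// Extract a dotted version number from an admin "version" response line.
// Assumption: the line looks like "OK 0.33" or just "0.33".
function parseGearmanVersion($sResponse)
{
    if (preg_match('/(\d+(?:\.\d+)+)/', $sResponse, $aMatch))
        return $aMatch[1];
    return null;
}

// True if the version is at least 0.33, the release reported to fix the issue.
// Older releases are not necessarily affected (0.14 reportedly worked).
function hasReportedFix($sVersion)
{
    return version_compare($sVersion, '0.33', '>=');
}

// Ask a running gearmand for its version; returns null if that fails.
function queryGearmanVersion($sHost = '127.0.0.1', $iPort = 4730)
{
    $rGearman = @fsockopen($sHost, $iPort, $iErrno, $sErrstr, 3);
    if (!$rGearman)
        return null;
    fputs($rGearman, "version\n");
    stream_set_timeout($rGearman, 1);
    $sLine = fgets($rGearman, 128);
    fclose($rGearman);
    return false === $sLine ? null : parseGearmanVersion($sLine);
}
```

A call such as `hasReportedFix(queryGearmanVersion())` should be guarded against a `null` result when no server is reachable.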