Gearman tasks are delegated to more than one worker (PHP)
I have a problem that in a PHP application, Gearman jobs are sometimes passed to multiple workers. I could minify the code to reproduce it in one file. Now I am not sure if this is a bug in Gearman, or a bug in the pecl library, or perhaps in my code.
Here's the code to reproduce the error:
#!/usr/bin/php
<?php
// Try 'standard', 'exception' or 'exception-sleep'.
$sWorker = 'exception';
// Detect run mode "client" or "worker".
if (!isset($argv[1]))
$sMode = 'client';
else
$sMode = 'worker-' . $sWorker;
$sLogFilePath = __DIR__ . '/log.txt';
switch ($sMode) {
case 'client':
// Remove all queued test jobs and quit if there are test workers running.
prepare();
// Init the greaman client.
$Client= new GearmanClient;
$Client->addServer();
// Empty the log file.
file_put_contents($sLogFilePath, '');
// Start some worker processes.
$aPids = array();
for ($i = 0; $i < 100; $i++)
$aPids[] = exec('php ' . __FILE__ . ' worker > /dev/null 2>&1 & echo $!');
// Start some jobs. Also try doHigh(), doBackground() and
// doBackgroundHigh();
for ($i = 0; $i < 50; $i++)
$Client->doNormal('test', $i);
// Wait a second (when running jobs in background).
// sleep(1);
// Prepare the log file entries.
$aJobs = array();
$aLines = file($sLogFilePath);
foreach ($aLines as $sLine) {
list($sTime, $sPid, $sHandle, $sWorkload) = $aAttributes = explode("\t", $sLine);
$sWorkload = trim($sWorkload);
if (!isset($aJobs[$sWorkload]))
$aJobs[$sWorkload] = array();
$aJobs[$sWorkload][] = $aAttributes;
}
// Remove all jobs that have been passed to only one worker as expected.
foreach ($aJobs as $sWorkload => $aJob) {
if (count($aJob) === 1)
unset($aJobs[$sWorkload]);
}
echo "\n\n";
if (empty($aJobs))
echo "No job has been passed to more than one worker.";
else {
echo "Those jobs has been passed more than one times to a worker:\n";
foreach ($aJobs as $sWorload => $aJob) {
echo "\nJob #" . $sWorload . ":\n";
foreach ($aJob as $aAttributes)
echo " $aAttributes[2] (Worker PID: $aAttributes[1])\n";
}
}
echo "\n\n";
// Kill all started workers.
foreach ($aPids as $sPid)
exec('kill ' . $sPid . ' > /dev/null 2>&1');
break;
case 'worker-standard':
$Worker = new GearmanWorker;
$Worker->addServer();
$Worker->addFunction('test', 'logJob');
$bShutdown = false;
while ($Worker->work())
if ($bShutdown)
continue;
break;
case 'worker-exception':
try {
$Worker = new GearmanWorker;
$Worker->addServer();
$Worker->addFunction('test', 'logJob');
$bShutdown = false;
while ($Worker->work())
if ($bShutdown)
throw new \Exception;
} catch (\Exception $E) {
}
break;
case 'worker-exception-sleep':
try {
$Worker = new GearmanWorker;
$Worker->addServer();
$Worker->addFunction('test', 'logJob');
$bShutdown = false;
while ($Worker->work())
{
if ($bShutdown) {
sleep(1);
throw new \Exception;
}
}
} catch (\Exception $E) {
}
break;
}
function logJob(\GearmanJob $Job)
{
global $bShutdown, $sLogFilePath;
$sLine = microtime(true) . "\t" . getmypid() . "\t" . $Job->handle() . "\t" . $Job->workload() . "\n";
file_put_contents($sLogFilePath, $sLine, FILE_APPEND);
$bShutdown = true;
}
function prepare()
{
$rGearman = fsockopen('127.0.0.1', '4730', $iErrno, $sErrstr, 3);
$aBuffer = array();
fputs ($rGearman, 'STATUS' . PHP_EOL);
stream_set_timeout($rGearman, 1);
while (!feof($rGearman))
if ('.' . PHP_EOL !== $sLine = fgets($rGearman, 128))
$aBuffer[] = $sLine;
else
break;
fclose($rGearman);
$bJobsInQueue = false;
$bWorkersRunning = false;
foreach ($aBuffer as $sFunctionLine) {
list($sFunctionName, $iQueuedJobs, $iRunningJobs, $iWorkers) = explode("\t", $sFunctionLine);
if ('test' === $sFunctionName) {
if (0 != $iQueuedJobs)
$bJobsInQueue = true;
if (0 != $iWorkers)
$bWorkersRunning = true;;
}
}
// Exit if there are workers running.
if ($bWorkersRunning)
die("There are some Gearman workers running that have registered a 'test' function. Please stop these workers and run again.\n\n");
// If there are test jobs in the queue start a worker that eat up the jobs.
if ($bJobsInQueue) {
$sPid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!');
sleep(1);
exec ("kill $sPid > /dev/null 2>&1");
// Repeat this method to make sure all jobs are removed.
prepare();
}
}
When you run this code on the command line, it should output , but it always lists some jobs that have been submitted to multiple workers. The error does not appear if you have installed or . "No job
has been passed to more than one worker."
$sWorker = 'standard';
'exception-sleep'
It would help me a lot if you could run the code and tell me if you can reproduce the error if I have a bug in the code.
source to share