Gearman tasks are delegated to more than one worker (PHP)

I have a problem in a PHP application: Gearman jobs are sometimes passed to more than one worker. I was able to reduce the code that reproduces it to a single file. I am not sure whether this is a bug in Gearman, a bug in the PECL library, or a bug in my own code.

Here's the code to reproduce the error:

#!/usr/bin/php
<?php

// Try 'standard', 'exception' or 'exception-sleep'.
$sWorker = 'exception';



// Detect run mode "client" or "worker".
if (!isset($argv[1]))
    $sMode = 'client';
else
    $sMode = 'worker-' . $sWorker;

$sLogFilePath = __DIR__ . '/log.txt';

switch ($sMode) {

    case 'client':

        // Remove all queued test jobs and quit if there are test workers running.
        prepare();

        // Init the Gearman client.
        $Client = new GearmanClient;
        $Client->addServer();

        // Empty the log file.
        file_put_contents($sLogFilePath, '');

        // Start some worker processes.
        $aPids = array();       
        for ($i = 0; $i < 100; $i++)
            $aPids[] = exec('php ' . __FILE__ . ' worker > /dev/null 2>&1 & echo $!');

        // Start some jobs. Also try doHigh(), doBackground() and
        // doBackgroundHigh();
        for ($i = 0; $i < 50; $i++)
            $Client->doNormal('test', $i);

        // Wait a second (when running jobs in background).
        // sleep(1);

        // Prepare the log file entries.
        $aJobs = array();
        $aLines = file($sLogFilePath);
        foreach ($aLines as $sLine) {
            list($sTime, $sPid, $sHandle, $sWorkload) = $aAttributes = explode("\t", $sLine);
            $sWorkload = trim($sWorkload);
            if (!isset($aJobs[$sWorkload]))
                $aJobs[$sWorkload] = array();
            $aJobs[$sWorkload][] = $aAttributes;
        }

        // Remove all jobs that have been passed to only one worker as expected.
        foreach ($aJobs as $sWorkload => $aJob) {
            if (count($aJob) === 1)
                unset($aJobs[$sWorkload]);
        }

        echo "\n\n";

        if (empty($aJobs))
            echo "No job has been passed to more than one worker.";
        else {
            echo "These jobs have been passed to more than one worker:\n";
            foreach ($aJobs as $sWorkload => $aJob) {

                echo "\nJob #" . $sWorkload . ":\n";
                foreach ($aJob as $aAttributes)
                    echo "  $aAttributes[2] (Worker PID: $aAttributes[1])\n";
            }
        }

        echo "\n\n";

        // Kill all started workers.
        foreach ($aPids as $sPid)
            exec('kill ' . $sPid . ' > /dev/null 2>&1');

    break;

    case 'worker-standard':
        $Worker = new GearmanWorker;
        $Worker->addServer();
        $Worker->addFunction('test', 'logJob');
        $bShutdown = false;
        while ($Worker->work())
            if ($bShutdown)
                continue;
        break;

    case 'worker-exception':
        try {
            $Worker = new GearmanWorker;
            $Worker->addServer();
            $Worker->addFunction('test', 'logJob');
            $bShutdown = false;
            while ($Worker->work())
                if ($bShutdown)
                    throw new \Exception;

        } catch (\Exception $E) {
        }
    break;

    case 'worker-exception-sleep':
        try {
            $Worker = new GearmanWorker;
            $Worker->addServer();
            $Worker->addFunction('test', 'logJob');
            $bShutdown = false;
            while ($Worker->work())
            {
                if ($bShutdown) {
                    sleep(1);
                    throw new \Exception;
                }
            }
        } catch (\Exception $E) {
        }
    break;
}

function logJob(\GearmanJob $Job)
{
    global $bShutdown, $sLogFilePath;
    $sLine = microtime(true) . "\t" . getmypid() . "\t" . $Job->handle() . "\t" . $Job->workload() . "\n";
    file_put_contents($sLogFilePath, $sLine, FILE_APPEND);
    $bShutdown = true;
}


function prepare()
{
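    // Connect to gearmand's admin interface (the port is passed as an integer).
    $rGearman = fsockopen('127.0.0.1', 4730, $iErrno, $sErrstr, 3);
    if (false === $rGearman)
        die("Could not connect to gearmand: $sErrstr ($iErrno)\n");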
    $aBuffer = array();
    fputs ($rGearman, 'STATUS' . PHP_EOL);
    stream_set_timeout($rGearman, 1);
    while (!feof($rGearman))
        if ('.' . PHP_EOL !== $sLine = fgets($rGearman, 128))
            $aBuffer[] = $sLine;
        else
            break;
    fclose($rGearman);

    $bJobsInQueue = false;
    $bWorkersRunning = false;
    foreach ($aBuffer as $sFunctionLine) {
        list($sFunctionName, $iQueuedJobs, $iRunningJobs, $iWorkers) = explode("\t", $sFunctionLine);
        if ('test' === $sFunctionName) {
            if (0 != $iQueuedJobs)
                $bJobsInQueue = true;
            if (0 != $iWorkers)
                $bWorkersRunning = true;
        }
    }

    // Exit if there are workers running.
    if ($bWorkersRunning)
        die("There are some Gearman workers running that have registered a 'test' function. Please stop these workers and run again.\n\n");

    // If there are test jobs in the queue start a worker that eat up the jobs.
    if ($bJobsInQueue) {
        $sPid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!');
        sleep(1);
        exec ("kill $sPid > /dev/null 2>&1");
        // Repeat this method to make sure all jobs are removed.
        prepare();
    }
}

      

When you run this code on the command line, it should output "No job has been passed to more than one worker.", but it always lists some jobs that have been submitted to multiple workers. The error does not appear if you set $sWorker = 'standard'; or 'exception-sleep'.

It would help me a lot if you could run the code and tell me whether you can reproduce the error or whether I have a bug in my code.
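
For anyone who wants to check the log-analysis step on its own, without a running Gearman server, here is a minimal sketch of the duplicate detection the client performs. The function name findDuplicateJobs and the sample log lines are made up for illustration; only the tab-separated line format (time, PID, job handle, workload) is taken from the script above.

```php
<?php

/**
 * Group log lines by workload and return only the workloads that were
 * logged more than once, i.e. handed to more than one worker.
 *
 * Each line is expected to be "time\tpid\thandle\tworkload".
 * (Hypothetical helper, not part of the original script.)
 */
function findDuplicateJobs(array $aLines): array
{
    $aJobs = [];
    foreach ($aLines as $sLine) {
        $aAttributes = explode("\t", rtrim($sLine, "\r\n"));
        if (count($aAttributes) !== 4) {
            continue; // Skip blank or malformed lines.
        }
        // The workload is the fourth field; collect every log entry for it.
        $aJobs[$aAttributes[3]][] = $aAttributes;
    }

    // Keep only the workloads that appear in more than one log entry.
    return array_filter($aJobs, function (array $aJob): bool {
        return count($aJob) > 1;
    });
}

// Made-up sample data: workload "7" was logged by two different PIDs.
$aSampleLines = [
    "1350000000.1\t4101\tH:host:1\t6\n",
    "1350000000.2\t4102\tH:host:2\t7\n",
    "1350000000.3\t4103\tH:host:2\t7\n",
];

$aDuplicates = findDuplicateJobs($aSampleLines);
foreach ($aDuplicates as $sWorkload => $aJob) {
    echo "Job #$sWorkload was logged " . count($aJob) . " times\n";
}
```

Running the function over the real log.txt (for example with file($sLogFilePath)) should give the same grouping the client prints, which makes it easier to rule out the reporting code as the source of the duplicates.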



1 answer


I had exactly the same problem with Gearman 0.24 and PECL lib 1.0.2. I can reproduce the error with your script every time.

An earlier version of Gearman (0.14, I think) worked fine.



Updating Gearman to 0.33 fixed the issue.
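
If you want to compare a Gearman version string against 0.33, PHP's built-in version_compare() is enough. This is only a sketch: the helper name isAtLeastVersion is hypothetical, the version string is passed in by hand rather than read from the server, and a result of false does not by itself mean the version is affected (0.14 reportedly worked fine).

```php
<?php

/**
 * Return true if $sVersion is at or above $sMinimum.
 * The default of 0.33 is the version reported above as fixing the issue.
 * (Hypothetical helper for illustration.)
 */
function isAtLeastVersion(string $sVersion, string $sMinimum = '0.33'): bool
{
    return version_compare($sVersion, $sMinimum, '>=');
}

// The three versions mentioned in this answer.
foreach (['0.14', '0.24', '0.33'] as $sVersion) {
    $sResult = isAtLeastVersion($sVersion) ? 'at or above 0.33' : 'below 0.33';
    echo $sVersion . ': ' . $sResult . "\n";
}
```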
