Multithreading does not handle the whole task

I am using the code below to run a task with threads. I am trying to process all the records in the DataTable dtTable with at most two threads (i.e. only two threads/executions are allowed at a time). The problem is that it doesn't process all the records in the DataTable, and the rows are processed in an irregular order. What could be the problem? Thanks in advance.

 public class Generator : IDisposable
 {
    public static int maxThreadCount = 2;
    public static int runningThreadCount = 0;

    public RunTask()
    {
       for (int ro = 0; ro <= dtTable.Rows.Count - 1; ro++)
       {
          if (maxThreadCount > runningThreadCount)
          {
            Thread atpthread = new Thread(delegate()
            {
            DoOperationMethod(dtTable.Rows[ro], Task, startDate, EndDate, dtTemplate);
            });
            atpthread.Start();
            runningThreadCount = runningThreadCount + 1;
            Mainthreads.Add(atpthread);
          }
          else
          {
            ro--;
          }
       }
  }

  public void DoOperationMethod(DataRow drAttachpoint, System.StrTaskItem Task, DateTime startDate, DateTime EndDate, DataTable dtTemplate)
    {
     //doing my Operation
     runningThreadCount = runningThreadCount-1; //Once Task done count will get reduce
    }

      

}

I am using .net 3.5 (FYI).

+3




3 answers


The problem is that you keep looping over the rows of the DataTable while waiting for a thread to become free. In any case, this style of multithreading, even if it were actually correct (it is not), is rather inefficient: most of the time is probably spent starting new threads.

Try something like this:

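// Requires .NET 4.0 or later; the degree of parallelism is set through ParallelOptions
Parallel.ForEach(
    dtTable.Rows.OfType<DataRow>(),
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    row => DoOperationMethod(...));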

      

EDIT:



To see where the problem comes from, you need to understand how variables are captured by anonymous methods. Your call to DoOperationMethod is not passed the data row you want, because the variable ro is not copied but captured by reference. So when ro changes in the loop, it also changes in the threads you created.
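The capture behaviour described above can be reproduced without any threads. The following is a minimal sketch; the class and method names (ClosureCaptureDemo, CaptureLoopVariable, CaptureLocalCopy) are made up for illustration and are not part of the question's code. The delegates here are invoked after the loop has finished, which is the extreme case of a thread starting late.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class, not part of the original code.
public static class ClosureCaptureDemo
{
    // Every delegate captures the loop variable i itself, not its value
    // at the moment the delegate was created.
    public static List<int> CaptureLoopVariable(int count)
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            actions.Add(() => i); // all delegates share the single variable i
        }

        var seen = new List<int>();
        foreach (var action in actions)
        {
            seen.Add(action()); // the loop is over, so i == count for every call
        }
        return seen;
    }

    // Copying the value into a variable declared inside the loop body
    // gives each delegate its own private variable.
    public static List<int> CaptureLocalCopy(int count)
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            int copy = i;
            actions.Add(() => copy);
        }

        var seen = new List<int>();
        foreach (var action in actions)
        {
            seen.Add(action());
        }
        return seen;
    }
}
```

With count = 3, CaptureLoopVariable returns 3, 3, 3 while CaptureLocalCopy returns 0, 1, 2. In the question's code the same effect means a thread may read dtTable.Rows[ro] after ro has already moved on.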

Apart from that, your code is horribly unsafe and inefficient:

  • You actually spend three threads on the work: there is no blocking, your loop just keeps incrementing and decrementing ro, which saturates a CPU core almost completely. This wastes far more than simply blocking while waiting for a result.
  • You can't just read and write static fields from multiple threads and expect everything to work correctly. It is quite easy to end up running more threads in parallel than you intended, or even to stall the whole thing when runningThreadCount ends up at 2 while no threads are actually running.
  • You keep spawning new threads to perform a seemingly trivial operation. I would imagine most of your time goes either to I/O or to the cost of creating new threads over and over again.
  • I am assuming that Mainthreads is a list of some kind, and that you also modify it in DoOperationMethod. This can lead to random exceptions and unexpected results.
  • In theory, it is even possible that the check maxThreadCount > runningThreadCount will never be re-evaluated due to various optimizations and caching. In practice, on current .NET on an x86 processor, this is not very likely for a method as complex as this one, but it is something that might bite you when upgrading to .NET 7.0 or whatever :)
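The point in the list above about unsynchronized static fields can be illustrated with a small sketch. The names CounterDemo and Run are hypothetical. The plain increment is a separate read, add, and write, so concurrent updates can be lost; Interlocked.Increment performs the same update atomically.

```csharp
using System.Threading;

// Hypothetical demo class, not part of the original code.
public static class CounterDemo
{
    private static int unsafeCount;
    private static int safeCount;

    // Returns { unsafeCount, safeCount } after all threads have finished.
    public static int[] Run(int threadCount, int incrementsPerThread)
    {
        unsafeCount = 0;
        safeCount = 0;

        var threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            threads[t] = new Thread(() =>
            {
                for (int n = 0; n < incrementsPerThread; n++)
                {
                    unsafeCount = unsafeCount + 1;        // read-modify-write, updates can be lost
                    Interlocked.Increment(ref safeCount); // atomic increment
                }
            });
            threads[t].Start();
        }

        foreach (var thread in threads)
        {
            thread.Join(); // wait for every worker before reading the totals
        }
        return new[] { unsafeCount, safeCount };
    }
}
```

Calling CounterDemo.Run(4, 100000) always yields 400000 for the second value. The first value may come out lower on some runs, depending on timing.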

Multithreading is tough. You really don't want to guess your way through it. At least try to understand the basics first: http://www.albahari.com/threading/ .

+4




It seems to me that the biggest problem with your code is that, even if you fix the thread-unsafe use of runningThreadCount, your code "spins" while waiting for some thread to finish. This completely ties up a CPU core while you are trying to get real work done.

The solution suggested by Luaan is fine, although I would use Cast<DataRow>() instead of OfType<DataRow>() (since all the elements in the enumeration should actually be of type DataRow). One big advantage, other than its brevity, is that it uses the thread pool, which significantly reduces thread management overhead (since it reuses threads instead of creating and destroying them over and over).

If you prefer a more explicit approach, you can change the code you posted to use a semaphore:

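SemaphoreSlim semaphore = new SemaphoreSlim(2);

for (int ro = 0; ro <= dtTable.Rows.Count - 1; ro++)
{
    // Blocks while two worker threads are already running
    semaphore.Wait();

    // Copy the row into a loop-local variable so each thread captures its own row
    DataRow row = dtTable.Rows[ro];

    Thread atpthread = new Thread(delegate()
    {
        try
        {
            DoOperationMethod(row, Task, startDate, EndDate, dtTemplate);
        }
        finally
        {
            // Free the slot even if the operation throws
            semaphore.Release();
        }
    });
    atpthread.Start();
    Mainthreads.Add(atpthread);
}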

      



This blocks the main thread in the call to Wait() when the semaphore count reaches 0, and continues as soon as the count is positive again (i.e. after a thread calls Release()).
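Since the question mentions .NET 3.5 and SemaphoreSlim only arrived with .NET 4.0, here is a rough sketch of the same throttling idea using System.Threading.Semaphore, which is available in earlier framework versions. ThrottledRunner and its Run method are hypothetical names, and the work delegate stands in for DoOperationMethod. The sketch also records the highest number of workers seen running at once, only to make the limit observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the original code.
public static class ThrottledRunner
{
    // Runs work(item) on a dedicated thread per item, with at most
    // maxConcurrency items in flight. Returns the peak concurrency observed.
    public static int Run<T>(IList<T> items, int maxConcurrency, Action<T> work)
    {
        var threads = new List<Thread>();
        object gate = new object();
        int running = 0;
        int peak = 0;

        using (var semaphore = new Semaphore(maxConcurrency, maxConcurrency))
        {
            for (int i = 0; i < items.Count; i++)
            {
                semaphore.WaitOne();  // blocks while maxConcurrency workers are busy
                T item = items[i];    // private copy for this iteration's closure

                var thread = new Thread(() =>
                {
                    lock (gate)
                    {
                        running++;
                        if (running > peak) peak = running;
                    }
                    try
                    {
                        work(item);
                    }
                    finally
                    {
                        lock (gate) { running--; }
                        semaphore.Release(); // free the slot even if work throws
                    }
                });
                thread.Start();
                threads.Add(thread);
            }

            // Wait for the remaining workers before disposing the semaphore.
            foreach (var thread in threads)
            {
                thread.Join();
            }
        }
        return peak;
    }
}
```

For example, ThrottledRunner.Run(rows, 2, row => Process(row)), where rows and Process are placeholders, never has more than two workers inside Process at the same time.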

I note the commenter's cryptic point about whether dtTable is used safely or not. I am assuming the object does not change during this processing. With that assumption in place, using it without synchronization should be fine. But if the assumption is false, then I agree with them that this is a bug in the code.

Finally, I will point out that the reason you are seeing missing rows is that you are using the variable ro in an anonymous method. This variable can easily be incremented before the anonymous method is invoked for a given loop iteration, causing that thread to process the wrong row. Some rows may be processed multiple times, while other rows are skipped. I addressed this in the code example above by fetching the DataRow object into a variable inside the loop block, so that each thread gets its own private copy of the variable, holding exactly the DataRow object it needs to process.

+2




From a threading perspective this code is completely inappropriate: you mutate variables from multiple threads in a way that is not thread-safe, which creates problems. The biggest problem is in this code:

runningThreadCount = runningThreadCount-1;

      

It can be mutated by more than one thread at a time, leading to a race condition and a corrupted count. The thread-safe way to do the same is to use Interlocked:

Interlocked.Decrement(ref runningThreadCount);

      

That handles just this one problem. As mentioned, you are accessing the DataRow from a thread, and this data structure is not thread-safe, so ideally you should parallelize over the data, preferably using the Parallel.ForEach API, where each data item is processed separately on a .NET thread-pool thread. It takes care of this for you, and you don't need to access rows by index like dtTable.Rows[ro]. The first solution from @Luaan explains it well, although I would prefer:

// The degree of parallelism is set through ParallelOptions
Parallel.ForEach(
    dtTable.AsEnumerable(),
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    row => DoOperationMethod(...));

      

Don't forget that to use AsEnumerable on the DataTable you need a reference to the System.Data.DataSetExtensions assembly (the extension method itself lives in the System.Data namespace).

In fact, the degree of parallelism should ideally not be a fixed number. To work efficiently on all servers, you should use the value Environment.ProcessorCount, which ensures that the number of concurrent calls is never more than the number of logical cores. Scheduling is adjusted internally, so requests do not pile up. Otherwise, consider that on a system with 50 cores you would still use only 2, which is unlikely to make much of an impact.
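As a sketch of the idea above, the following caps Parallel.ForEach through ParallelOptions.MaxDegreeOfParallelism. It needs .NET 4.0 or later. ParallelRowsDemo, BuildSampleTable, SumIds and the "Id" column are all made up for illustration; summing the column is only a stand-in for the real per-row operation, and the rows are read but never modified.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original code.
public static class ParallelRowsDemo
{
    // Builds a small table with an integer "Id" column holding 0..rowCount-1.
    public static DataTable BuildSampleTable(int rowCount)
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        for (int i = 0; i < rowCount; i++)
        {
            table.Rows.Add(i);
        }
        return table;
    }

    // Visits every row with at most maxDegree concurrent workers and
    // returns the sum of the "Id" column.
    public static int SumIds(DataTable table, int maxDegree)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };
        int sum = 0;

        Parallel.ForEach(table.AsEnumerable(), options, row =>
        {
            // Stand-in for the real per-row work; rows are only read here.
            Interlocked.Add(ref sum, (int)row["Id"]);
        });
        return sum;
    }
}
```

A call such as ParallelRowsDemo.SumIds(table, Environment.ProcessorCount) ties the limit to the number of logical cores, while passing 2 reproduces the fixed limit from the question.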

Another point is that you say the processing order is not maintained, which is actually a direct consequence of your design: while you are incrementing / decrementing the variable ro, some rows take the if branch and the rest take the else branch. In fact, the order can never be guaranteed, even when you use Interlocked as suggested above, because it depends on the time it takes to process each DataRow. Since .NET 4.0, when the TPL was introduced, there are two broad ways to parallelize: by task or by data. Parallel.ForEach parallelizes over the data. You can also wrap each piece of work in a separate Task, run them in parallel on ThreadPool threads using Task.Factory.StartNew, and then wait for all of them to finish with Task.WaitAll.
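The task-based variant mentioned above might look roughly like this on .NET 4.0 or later. TaskPerItemDemo and SquareAll are hypothetical names, and squaring a number stands in for the real work. Note that this sketch does not cap concurrency at two; the thread pool decides how many tasks run at once.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original code.
public static class TaskPerItemDemo
{
    // Starts one task per input and waits for all of them to finish.
    public static int[] SquareAll(IList<int> inputs)
    {
        var results = new int[inputs.Count];
        var tasks = new Task[inputs.Count];

        for (int i = 0; i < inputs.Count; i++)
        {
            int index = i; // private copy so each task sees its own index
            tasks[index] = Task.Factory.StartNew(() =>
            {
                // Each task writes to its own slot, so no locking is needed.
                results[index] = inputs[index] * inputs[index];
            });
        }

        Task.WaitAll(tasks); // blocks until every task has completed
        return results;
    }
}
```

Because every task writes to a distinct array slot, the results line up with the inputs even though the tasks may finish in any order.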

Remember that with any of these you cannot guarantee the order in which the DataRows are processed. For ordered results you need PLINQ, whose AsOrdered option ensures that the results are merged in the same order as the original collection.
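A minimal PLINQ sketch of order-preserving results, using AsOrdered together with WithDegreeOfParallelism (both require .NET 4.0 or later). OrderedPlinqDemo and SquareInOrder are hypothetical names. The elements may still be processed in any order; only the order of the output sequence is preserved.

```csharp
using System.Linq;

// Hypothetical demo class, not part of the original code.
public static class OrderedPlinqDemo
{
    // Squares the inputs on up to two workers and returns the results
    // in the same order as the source array.
    public static int[] SquareInOrder(int[] inputs)
    {
        return inputs
            .AsParallel()
            .AsOrdered()                 // keep source order in the output
            .WithDegreeOfParallelism(2)  // at most two concurrent workers
            .Select(x => x * x)
            .ToArray();
    }
}
```

Without AsOrdered, the same query could return the squares in a different order from one run to the next.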

0

