Divide Foreach by Thread Pattern
I want to run the method SearchResultByOrderNumber(string orderNumber) inside a foreach loop using multithreading.
There are ten order numbers in the OrderNumbers DataTable. When looking these order numbers up in the OrderResults
DataTable, I want to split them across five threads, so that each thread performs two order-number searches. How can I do this using the .NET Framework 3.5 (ASP.NET 3.5)?
I think I should update my question. How can I automatically split the "OrderNumbers" rows across async methods? First, I get rowCount.
Then I define the number of async methods, asyncMethodCount.
Finally, I get rowsPerAsyncMethods by dividing rowCount by asyncMethodCount:
rowsPerAsyncMethods = rowCount / asyncMethodCount
Thanks.
/// <summary>
/// Starts MyTask1 and MyTask2 asynchronously through delegate BeginInvoke and
/// collects both result tables before printing.
/// </summary>
void Main()
{
    // Start both calls before collecting either result so that they can overlap.
    var MyTask1Caller = new Func<DataTable>(MyTask1);
    var asyncResultMyTask1 = MyTask1Caller.BeginInvoke(null, null);
    var MyTask2Caller = new Func<DataTable>(MyTask2);
    var asyncResultMyTask2 = MyTask2Caller.BeginInvoke(null, null);

    // EndInvoke blocks until the corresponding call has completed and returns
    // its result, so no extra wait on AsyncWaitHandle is needed afterwards
    // (touching AsyncWaitHandle would only create a handle that must be closed).
    DataTable dtMyTask1 = MyTask1Caller.EndInvoke(asyncResultMyTask1);
    DataTable dtMyTask2 = MyTask2Caller.EndInvoke(asyncResultMyTask2);

    // NOTE(review): these print the literal names, not the table contents.
    Console.WriteLine("dtMyTask1");
    Console.WriteLine("dtMyTask2");
}
/// <summary>
/// Returns the number of rows in the table produced by OrderNumbers().
/// </summary>
public int RowCount()
{
    return OrderNumbers().Rows.Count;
}
/// <summary>
/// Looks up the result for the first three rows of OrderNumbers() and returns
/// them as a table with the columns OrderNumber and OrderResult.
/// </summary>
public DataTable MyTask1()
{
    var results = new DataTable();
    results.Columns.Add(new DataColumn("OrderNumber", typeof(System.Int32)));
    results.Columns.Add(new DataColumn("OrderResult", typeof(string)));

    // This task's slice of the order list: the first three rows.
    DataTable slice = OrderNumbers().AsEnumerable().Take(3).CopyToDataTable();

    foreach (DataRow source in slice.Rows)
    {
        string number = source["OrderNumber"].ToString();
        string found = SearchResultByOrderNumber(number);

        DataRow target = results.NewRow();
        target["OrderNumber"] = number;
        target["OrderResult"] = found;
        results.Rows.Add(target);
    }

    return results;
}
/// <summary>
/// Looks up the result for rows four to six of OrderNumbers() (skip three,
/// take three) and returns them as a table with the columns OrderNumber and
/// OrderResult.
/// </summary>
public DataTable MyTask2()
{
    var results = new DataTable();
    results.Columns.Add(new DataColumn("OrderNumber", typeof(System.Int32)));
    results.Columns.Add(new DataColumn("OrderResult", typeof(string)));

    // This task's slice of the order list: the three rows after the first three.
    DataTable slice = OrderNumbers().AsEnumerable().Skip(3).Take(3).CopyToDataTable();

    foreach (DataRow source in slice.Rows)
    {
        string number = source["OrderNumber"].ToString();
        string found = SearchResultByOrderNumber(number);

        DataRow target = results.NewRow();
        target["OrderNumber"] = number;
        target["OrderResult"] = found;
        results.Rows.Add(target);
    }

    return results;
}
/// <summary>
/// Returns the OrderResult text of the first row in OrderResults() whose
/// OrderNumber, converted to a string, equals <paramref name="orderNumber"/>.
/// </summary>
/// <param name="orderNumber">Order number to look for, as text.</param>
/// <returns>The matching result text, or null when no row matches.</returns>
public string SearchResultByOrderNumber(string orderNumber)
{
    foreach (DataRow row in OrderResults().Rows)
    {
        if (row["OrderNumber"].ToString() == orderNumber)
        {
            return row["OrderResult" ].ToString();
        }
    }

    // No matching row.
    return null;
}
/// <summary>
/// Builds the sample "OrderResults" table: an Int32 OrderNumber column and a
/// string OrderResult column, with one row for each number from 1 to 9.
/// </summary>
/// <returns>A new table holding nine rows.</returns>
public DataTable OrderResults()
{
    var table = new DataTable("OrderResults");
    table.Columns.Add("OrderNumber", typeof(System.Int32));
    table.Columns.Add("OrderResult", typeof(string));

    for (int number = 1; number <= 9; number++)
    {
        // Values are given in column order: OrderNumber, OrderResult.
        table.Rows.Add(number, number + " Result");
    }

    return table;
}
/// <summary>
/// Builds the sample "OrderNumbers" table: a single Int32 OrderNumber column
/// with one row for each number from 0 to 9.
/// </summary>
/// <returns>A new table holding ten rows.</returns>
public DataTable OrderNumbers()
{
    var table = new DataTable("OrderNumbers");
    table.Columns.Add("OrderNumber", typeof(System.Int32));

    int number = 0;
    while (number < 10)
    {
        table.Rows.Add(number);
        number++;
    }

    return table;
}
source to share
If .NET 4.0 is available, you can simply use the Parallel.ForEach construct.
If not, handling this in parallel is as easy as using the ThreadPool class,
with some extra work to keep things in sync:
// Runs one thread-pool work item per order row and blocks until all of them
// have finished. dtOrders is the input table and dtResult the output table;
// both are expected to be in scope already.
int tasks = 0; // keep track of number of active tasks
object locker = new object(); // synchronization object
foreach (var order1 in dtOrders.AsEnumerable())
{
    lock (locker) { tasks++; } // added a new task
    var order = order1; // per-iteration copy so each lambda captures its own row
    ThreadPool.QueueUserWorkItem(
        o =>
        {
            // The lookup itself runs in parallel, outside the lock.
            string orderNumber = order["OrderNumber"].ToString();
            string orderResult = SearchResultByOrderNumber(orderNumber);

            // DataTable only supports concurrent reads; every write to dtResult
            // (NewRow, setting column values, Rows.Add) must be synchronized.
            lock (locker) // update shared data structure and signal termination
            {
                DataRow dr = dtResult.NewRow();
                dr["OrderNumber"] = orderNumber;
                dr["OrderResult"] = orderResult;
                dtResult.Rows.Add(dr);
                tasks--;
                Monitor.Pulse(locker);
            }
        });
}
// barrier to wait for all tasks to finish
lock (locker)
{
    while (tasks > 0)
    {
        Monitor.Wait(locker);
    }
}
source to share
You can use CountdownEvent (available from .NET Framework 4),
which blocks the current thread until its counter reaches 0:
// Runs one thread-pool work item per order row and uses a CountdownEvent as
// the barrier. dtOrders is the input table and dtResult the output table;
// both are expected to be in scope already.
var sync = new object();
var cd = new CountdownEvent(dtOrders.Rows.Count); // one signal expected per row
foreach (var order in dtOrders.AsEnumerable())
{
    // Read the key on the calling thread; the local is declared inside the
    // loop body, so every lambda captures its own value.
    string orderNumber = order["OrderNumber"].ToString();
    ThreadPool.QueueUserWorkItem(o =>
    {
        // The lookup itself runs in parallel, outside the lock.
        string orderResult = SearchResultByOrderNumber(orderNumber);

        // DataTable only supports concurrent reads; every write to dtResult
        // (NewRow, setting column values, Rows.Add) must be synchronized.
        lock (sync)
        {
            var dr = dtResult.NewRow();
            dr["OrderNumber"] = orderNumber;
            dr["OrderResult"] = orderResult;
            dtResult.Rows.Add(dr);
        }
        cd.Signal();
    });
}
cd.Wait();
- The counter value is set in the constructor.
- cd.Signal() reduces the count by one.
- cd.Wait() blocks the current thread until the counter is 0.
source to share