Why doesn't a multithreaded application that allocates and deallocates memory intensively scale with the number of threads?
Note: the original title, "Why doesn't DWScript multithreaded JSON parser scale with the number of threads?", was changed because this issue is not related to processing JSON data with DWScript. The problem lies in the default memory manager of Delphi XE2 through XE7 (tested with XE2 and a trial of XE7), but it first showed up in this type of application.
I have a Win32/Win64 VCL multithreaded application that processes JSON data in Delphi XE2.
Each thread parses the JSON data with DWScript's TdwsJSONValue.ParseString(sJSON), reads the values using DWScript methods, and stores the results as records.
For testing purposes, I process the same JSON data in every thread.
In single-thread mode, processing the data takes N seconds. Increasing the number of threads to M increases the time each thread needs to process the same data linearly (approximately M * N).
As a result, there is no speedup at all. The other parts of the application (delivering the JSON data, storing the results in the target environment) scale as expected.
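The observation above can be written as a quick speedup calculation, assuming each of the M threads processes the same workload that takes N seconds on one thread (so the total work is M * N thread-seconds) and that M does not exceed the number of cores:

```latex
T_{\mathrm{ideal}}(M) = N, \qquad T_{\mathrm{observed}}(M) \approx M N,
\qquad
S(M) = \frac{M N}{T_{\mathrm{observed}}(M)} \approx \frac{M N}{M N} = 1
```

A speedup S(M) of about 1 means that M threads finish no sooner than running the M workloads one after another, which is consistent with the threads being serialized on some shared resource.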
What could be the reason? Any ideas appreciated.
Additional information:
- Tested on Win7/32, Win7/64 and Win8/64, on systems from dual-core to 12-core (with and without HT).
- DWScript was chosen as the fastest option available (I tested a bunch, including SuperObject and the JSON support built into Delphi). SuperObject behaves similarly to the JSON unit from DWS.
- Below is a complete console application that illustrates the problem. To run it, you need the sample JSON data, available here: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0
  The archive contains the file json1.dat with the data for the first thread. For up to 16 threads, just copy json1.dat to json2.dat ... json16.dat. The program and the data must be in the same folder. To run it: convert.exe N, where N is the number of threads.
- The program writes the execution times in msecs to stdout: the time spent in the thread, the time spent parsing the data, and the time spent destroying the TdwsJSONValue object. It is the statement _dwsjvData.Destroy; that does not scale.
program Convert;

{$APPTYPE CONSOLE}
{$R *.res}

uses
  System.SysUtils,
  System.Diagnostics,
  System.Classes,
  dwsJSON in 'dwsJSON.pas',
  dwsStrings in 'dwsStrings.pas',
  dwsUtils in 'dwsUtils.pas',
  dwsXPlatform in 'dwsXPlatform.pas';
type
  TWorkerThread = class (TThread)
  private
    _iUid: Integer;
    _swWatch: TStopwatch;
    _lRunning: Boolean;
    _sFileJSonData: String;
    _fJsonData: TextFile;
  protected
    procedure Execute; override;
  public
    constructor Create (AUid: Integer);
    property Running: Boolean read _lRunning;
  end;

  TConverter = class (TObject)
  private
    _swWatch0, _swWatch1, _swWatch2: TStopwatch;
    _dwsjvData: TdwsJSONValue;
  public
    constructor Create;
    destructor Destroy; override;
    function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
  end;
const
  MAX_THREADS = 16;

var
  iHowMany: Integer;
  athWorker: array [1..MAX_THREADS] of Pointer;   // holds TWorkerThread instances
  aiElapsed: array [1..MAX_THREADS] of Integer;
  aiElapsedParse: array [1..MAX_THREADS] of Integer;
  aiElapsedDestroy: array [1..MAX_THREADS] of Integer;
  aiFares: array [1..MAX_THREADS] of Integer;
  swWatchT, swWatchP: TStopwatch;
constructor TWorkerThread.Create (AUid: Integer);
begin
  // CreateSuspended = False: TThread starts the thread in AfterConstruction,
  // i.e. only after this constructor has finished initializing the fields.
  inherited Create (False);
  _iUid := AUid;
  _swWatch := TStopwatch.Create;
  _sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + IntToStr (_iUid) + '.dat';
  _lRunning := False;
end;
procedure TWorkerThread.Execute;
var
  j: Integer;
  sLine: String;
  slLines: TStringList;
  oS: TConverter;
begin
  _lRunning := True;
  try
    oS := TConverter.Create;
    slLines := TStringList.Create;
    // Read at most the first 50 lines of this thread's data file.
    System.AssignFile (_fJsonData, _sFileJSonData);
    System.Reset (_fJsonData);
    j := 0;
    repeat
      System.Readln (_fJsonData, sLine);
      slLines.Add (sLine);
      Inc (j);
    until (j = 50) or System.Eof (_fJsonData);
    System.Close (_fJsonData);
    Sleep (1000);
    // Only the JSON processing below is timed.
    _swWatch.Reset;
    _swWatch.Start;
    aiFares [_iUid] := 0;
    aiElapsedParse [_iUid] := 0;
    aiElapsedDestroy [_iUid] := 0;
    for j := 1 to slLines.Count do
      aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);
    _swWatch.Stop;
    slLines.Free;
    oS.Free;
    aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;
  finally
    // Always clear the flag, so RunConvert does not wait forever after an exception.
    _lRunning := False;
  end;
end;
constructor TConverter.Create;
begin
  inherited Create;
  _swWatch0 := TStopwatch.Create;
  _swWatch1 := TStopwatch.Create;
  _swWatch2 := TStopwatch.Create;
end;

destructor TConverter.Destroy;
begin
  inherited;
end;
function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
  jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal: Integer;
begin
  jTotalFares := 0;   // the result is 0 if the top-level JSON value is not an array
  _swWatch0.Reset;
  _swWatch0.Start;
  // Time the parsing of one JSON line.
  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData := TdwsJSONValue.ParseString (AJSonData);
  _swWatch1.Stop;
  iElapsedParse := _swWatch1.ElapsedMilliseconds;
  if (_dwsjvData.ValueType = jvtArray) then
  begin
    _swWatch2.Reset;
    _swWatch2.Start;
    jTotalFares := _dwsjvData.ElementCount;
    for jFare := 0 to (jTotalFares - 1) do
      if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
      begin
        // Placeholder: the per-element processing is not included in this sample.
        _swWatch1.Reset;
        _swWatch1.Start;
        _swWatch1.Stop;
      end;
  end;
  // Time the destruction of the whole JSON tree.
  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData.Destroy;
  _swWatch1.Stop;
  iElapsedDestroy := _swWatch1.ElapsedMilliseconds;
  _swWatch0.Stop;
  iElapsedTotal := _swWatch0.ElapsedMilliseconds;
  Inc (AParse, iElapsedParse);
  Inc (ADestroy, iElapsedDestroy);
  result := jTotalFares;
end;
procedure MultithreadStart;
var
  j: Integer;
begin
  for j := 1 to iHowMany do
    if (athWorker [j] = nil) then
    begin
      athWorker [j] := TWorkerThread.Create (j);
      TWorkerThread (athWorker [j]).FreeOnTerminate := False;
      TWorkerThread (athWorker [j]).Priority := tpNormal;
    end;
end;
procedure MultithreadStop;
var
  j: Integer;
begin
  for j := 1 to MAX_THREADS do
    if (athWorker [j] <> nil) then
    begin
      TWorkerThread (athWorker [j]).Terminate;
      TWorkerThread (athWorker [j]).WaitFor;
      TWorkerThread (athWorker [j]).Free;
      athWorker [j] := nil;
    end;
end;
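procedure Prologue;
var
  j: Integer;
begin
  iHowMany := StrToInt (ParamStr (1));
  // athWorker and the result arrays only have MAX_THREADS slots.
  if (iHowMany < 1) or (iHowMany > MAX_THREADS) then
    raise Exception.CreateFmt ('Number of threads must be between 1 and %d', [MAX_THREADS]);
  for j := 1 to MAX_THREADS do
    athWorker [j] := nil;
  swWatchT := TStopwatch.Create;
  swWatchT.Reset;
  swWatchP := TStopwatch.Create;
  swWatchP.Reset;
end;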
procedure RunConvert;

  function __IsRunning: Boolean;
  var
    j: Integer;
  begin
    result := False;
    for j := 1 to MAX_THREADS do
      result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
  end;

begin
  swWatchT.Start;
  MultithreadStart;
  Sleep (1000);
  while (__IsRunning) do
    Sleep (500);
  MultithreadStop;
  swWatchT.Stop;
  Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;
procedure Epilogue;
var
  j: Integer;
begin
  for j := 1 to iHowMany do
    Writeln ( #13#10, 'Thread # ', j, ' tot.time:', aiElapsed [j], ' fares:', aiFares [j], ' tot.parse:', aiElapsedParse [j], ' tot.destroy:', aiElapsedDestroy [j]);
  Readln;
end;
begin
  try
    Prologue;
    RunConvert;
    Epilogue;
  except
    on E: Exception do
      Writeln (E.ClassName, ': ', E.Message);
  end;
end.
The solution is to replace the default memory manager of Delphi XE2 or XE7 with the Intel® Threading Building Blocks memory manager. In the sample app, it then scales approximately linearly with up to 16 threads when the application is 64-bit.
Update: this assumes that the number of running threads is less than the number of cores.
This has been tested on machines ranging from 2 cores / 4 HT threads to 12 cores / 24 HT threads, running Windows 7 virtualized under KVM, with 124 GB RAM.
Interestingly, in the KVM-virtualized Windows 7, memory allocation and deallocation are 2x faster than in native Windows 7.
Conclusion: if your application performs a lot of allocations/deallocations of 10 kB to 10 MB memory blocks in multiple threads (more than 4-8 threads), use only the Intel memory manager.
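To check that the allocator, and not the JSON code, is the bottleneck, a stripped-down benchmark can exercise the memory manager directly. The sketch below is hypothetical and not part of the original test (the program name AllocBench, the ITERATIONS constant and the BlockSize helper are illustrative assumptions): each thread allocates and frees blocks between 10 kB and about 10 MB and reports its own elapsed time, mirroring the per-thread timing of the Convert program. If the memory manager scales, the per-thread time should stay roughly constant as N grows, as long as N does not exceed the number of cores.

```pascal
program AllocBench;

{$APPTYPE CONSOLE}

// Hypothetical micro-benchmark (not from the original post). To compare
// allocators, add a memory manager unit such as TBBMem as the FIRST unit
// in this uses clause and rebuild.
uses
  System.SysUtils,
  System.Classes,
  System.Diagnostics;

const
  ITERATIONS = 2000; // assumption: tune so that one thread runs for about a second

type
  TAllocThread = class (TThread)
  protected
    procedure Execute; override;
  public
    ElapsedMs: Int64;
  end;

// Deterministic block size from 10 kB up to 10,240,000 bytes in 10 kB steps,
// so the threads do not share the global random generator state.
function BlockSize (AIndex: Integer): Integer;
begin
  Result := 10 * 1024 + (AIndex mod 1000) * 10 * 1024;
end;

procedure TAllocThread.Execute;
var
  i: Integer;
  P: PByte;
  Watch: TStopwatch;
begin
  Watch := TStopwatch.StartNew;
  for i := 0 to ITERATIONS - 1 do
  begin
    GetMem (P, BlockSize (i));
    P^ := 1;          // touch the block so it is actually used
    FreeMem (P);
  end;
  Watch.Stop;
  ElapsedMs := Watch.ElapsedMilliseconds;
end;

var
  Threads: array of TAllocThread;
  Count, j: Integer;
begin
  Count := StrToIntDef (ParamStr (1), 1);
  SetLength (Threads, Count);
  // The threads start right after construction (CreateSuspended = False).
  for j := 0 to Count - 1 do
    Threads [j] := TAllocThread.Create (False);
  for j := 0 to Count - 1 do
  begin
    Threads [j].WaitFor;
    Writeln ('Thread # ', j + 1, ' time (ms): ', Threads [j].ElapsedMs);
    Threads [j].Free;
  end;
end.
```

Run it as AllocBench.exe N, once with the default memory manager and once with the replacement unit first in the uses clause, and compare the per-thread times for increasing N.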
@André: thanks for the hint pointing me in the right direction!
Here is the unit with the TBB memory manager used for the tests. It must be the first item in the uses list of the main project .dpr file.
unit TBBMem;

interface

// Imports from tbbmalloc.dll (Intel TBB scalable allocator). The DLL matching the
// target platform (Win32 or Win64) must be next to the executable or on the DLL search path.
function ScalableGetMem (ASize: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_malloc';
procedure ScalableFreeMem (APtr: Pointer); cdecl; external 'tbbmalloc' name 'scalable_free';
function ScalableReAlloc (APtr: Pointer; Size: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_realloc';

implementation

// The size parameters are NativeInt to match TMemoryManager on both Win32 and Win64.
function TBBGetMem (ASize: NativeInt): Pointer;
begin
  result := ScalableGetMem (ASize);
end;

function TBBFreeMem (APtr: Pointer): Integer;
begin
  ScalableFreeMem (APtr);
  result := 0;
end;

function TBBReAllocMem (APtr: Pointer; ASize: NativeInt): Pointer;
begin
  result := ScalableRealloc (APtr, ASize);
end;

const
  TBBMemoryManager: TMemoryManager = (
    GetMem: TBBGetMem;
    FreeMem: TBBFreeMem;
    ReallocMem: TBBReAllocMem);

var
  oldMemoryManager: TMemoryManager;

initialization
  GetMemoryManager (oldMemoryManager);
  SetMemoryManager (TBBMemoryManager);

finalization
  SetMemoryManager (oldMemoryManager);

end.
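A minimal way to confirm that the replacement is active, assuming the unit above is saved as TBBMem.pas next to the project and the matching 32- or 64-bit tbbmalloc.dll can be found at run time. The program name TBBCheck is hypothetical; IsMemoryManagerSet is a standard System unit function that returns True once SetMemoryManager has replaced the default memory manager.

```pascal
program TBBCheck;

{$APPTYPE CONSOLE}

uses
  TBBMem in 'TBBMem.pas', // first unit, so it is initialized before the other units listed here
  System.SysUtils;

var
  S: string;
begin
  // True once TBBMem's initialization section has called SetMemoryManager.
  Writeln ('Custom memory manager installed: ', IsMemoryManagerSet);
  // Heap allocations such as this string now go through tbbmalloc.
  S := StringOfChar ('x', 100000);
  Writeln ('Allocated string of length ', Length (S));
end.
```

If tbbmalloc.dll is missing, the executable fails to start, because the functions are imported statically with external declarations.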
Have you tried my scalable memory manager? Delphi (with FastMM built in) doesn't scale very well with strings and other memory-related work: https://scalemm.googlecode.com/files/ScaleMM_v2_4_1.zip
You can also try both modes of my profiler to see which part is the bottleneck: https://code.google.com/p/asmprofiler/
I re-ran the FastCode MM Challenge test, and the results were not good for TBB (it also raised an out-of-memory exception in the block downsize test).
In short: ScaleMM2 and Google TCmalloc are the fastest in this demanding test; FastMM and ScaleMM2 use the least memory.
Results (both columns scaled so that the winner = 100%):

Memory manager     Average speed   Average memory
XE6 (FastMM)            70.4           100.0
TCmalloc                89.1            29.6
ScaleMem2              100.0            75.6
TBBMem                  77.8            38.4
FastCode Challenge: https://code.google.com/p/scalemm/source/browse/#svn%2Ftrunk%2FChallenge
TBB 4.3: https://www.threadingbuildingblocks.org/download