Multithreading acceleration: reading through CSV files using TMemoryStream
I am writing parallel code to iterate through a large set of CSV files (one per stock symbol, over 6,500 in total), each filled with historical stock data, and determine whether each stock has reached a new all-time high.
I implemented a thread pool of TThread descendants that splits the symbol list evenly among the threads, and the threads are then allocated to SEPARATE cores of my i7 machine. Each thread is given its own copy of all the data it needs when it is created, before it is started, so no locking is needed while the threads are processing. Once all threads have finished, I aggregate each thread's result data in the main program.
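For reference, each worker follows roughly this pattern (a simplified sketch rather than my exact code; TSymbolWorker, FFiles, and FHighs are illustrative names):

// uses Classes (TThread, TStrings, TMemoryStream), and the uTest unit below
type
  TSymbolWorker = class(TThread)
  private
    FFiles: TStrings;          // this thread's own copy of its slice of the file list
    FHighs: array of extended; // per-symbol results, written only by this thread
  protected
    procedure Execute; override;
  public
    constructor Create(AFiles: TStrings);
  end;

constructor TSymbolWorker.Create(AFiles: TStrings);
begin
  inherited Create(True); // created suspended; started later by the pool
  FFiles := AFiles;
  SetLength(FHighs, AFiles.Count);
end;

procedure TSymbolWorker.Execute;
var
  i: integer;
  vStream: TMemoryStream;
  vPeriod: TRatePeriod;
  vRes: TShallowEquityNewHighInfoRetrievalResults;
begin
  vPeriod.PeriodStr := AllDataPeriodStr; // scan the whole file for the all-time high
  vStream := TMemoryStream.Create;
  try
    for i := 0 to FFiles.Count - 1 do
    begin
      vStream.LoadFromFile(FFiles[i]);                              // step (1)
      vRes := ShallowEquityNewHighInfoRetrieval(vStream, vPeriod);  // step (2)
      if vRes.Success then
        FHighs[i] := vRes.High;
    end;
  finally
    vStream.Free;
  end;
end;

Each worker receives its own TStringList of file names at construction, so the threads share nothing; the main program calls Start on every worker, WaitFor on each, and then merges the FHighs arrays.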
So far I have tested my code with a few of the multi-threaded memory managers mentioned in a StackOverflow question. Of those, SapMM appears to be the fastest that does not cause access violations.
The problem is that adding more threads does not proportionally reduce the time it takes to complete all the high calculations. Two threads (each on its own core) do not cut the runtime to 1/2, three do not cut it to 1/3, and four do not cut it to 1/4:
Number of threads        1      2      3      4
Predicted time (mm:ss)   6:37   3:18   2:12   1:39   (1, 1/2, 1/3, 1/4 of the 1-thread time)
Actual time (mm:ss)      6:37   4:07   3:05   2:51
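Plugging these numbers into Amdahl's law (my own back-of-the-envelope check, assuming a fixed non-parallelizable fraction s, so that speedup(n) = 1 / (s + (1 - s) / n)): the 2-thread run gives 397 s / 247 s ≈ 1.61, i.e. s ≈ 0.24, and the 4-thread run gives 397 s / 171 s ≈ 2.32, i.e. s ≈ 0.24 again. So roughly a quarter of the total work behaves as if it were serialized, and that is the part I need to track down.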
I've reached the point where I need additional insight to speed this operation up fully. I need to understand why the scaling falls off, rather than just fiddling around the edges of the problem. What causes this code to stop getting a proportional benefit from extra threads, and what do I need to do to get that benefit? Or is there another approach entirely that would speed up the analysis, e.g. something other than reading through a TMemoryStream?
The code I am using is below.
I am using Delphi XE4 Enterprise.
In each thread, I loop through each of my symbols and:
- Use TMemoryStream.LoadFromFile to load historical symbol data.
- Use a function I wrote to get the all-time high directly from the TMemoryStream data.
Step (1) has been verified to take almost no time (less than 1 second in total to load all 6,500 files, one at a time, into memory). The procedure I use in step (2) is what takes all the time, and it is given below:
unit uTest;

interface

implementation

uses
  SysUtils, Math, Classes;

// Note: ExStrToDate and MoveDateBack (used below) are my own helper
// routines, declared in another unit.
type
  TDayIndexData = record
    Date: TDate;
    Open, High, Low, Close, AdjClose,
    Volume: extended;
  end;

type
  TTimeUnit = (tuDay, tuWeek, tuMonth, tuYear);

  TTimePeriod = record
    Length: integer;
    TimeUnit: TTimeUnit;
  end;

//#NO CHANGE
const
  AllDataPeriodStr = 'All Data';

type
  TRatePeriod = record
    PeriodStr: string;
    TimePeriod: TTimePeriod;
  end;

type
  TFieldType = (ftDate, ftOpen, ftHigh, ftLow, ftClose, ftVolume, ftAdjClose);

const
  CSV_DELIM_CHARSET = [#0..#31, ',', #127];

type
  TShallowEquityNewHighInfoRetrievalResults = record
    Success: boolean;
    High: extended;
  end;
function ShallowEquityNewHighInfoRetrieval(
  AStream: TStream;
  ARatePeriod: TRatePeriod;
  AGetNormalData: boolean = False): TShallowEquityNewHighInfoRetrievalResults;
var
  vStreamSize: int64;

  function EOF: boolean;
  begin
    Result := AStream.Position >= vStreamSize; //AStream.Size;
  end;

  procedure GotoEOF;
  begin
    AStream.Seek(0, soFromEnd);
  end;

//#OPTIMIZE
//var
//  vBuffer: FileString;
type
  FileChar = AnsiChar;
  FileString = AnsiString;
const
  ResultCharSize = SizeOf(FileChar);
var
  MRReadChar: FileChar;

  procedure ReadNextChar;
  begin
    if not EOF then
      AStream.Read(MRReadChar, SizeOf(MRReadChar))
    else
      raise EInvalidOperation.Create('Unexpected end of file found');
  end;

var
  vPossDelimChars: boolean;

  procedure SkipExistingDelimChars;
  begin
    //*INTENTION: prevents redundant SkipDelimChars calls, which is destructive
    if not vPossDelimChars then Exit;
    //not requiring DelimChars
    if EOF then Exit;
    repeat
      ReadNextChar;
    until EOF or not (MRReadChar in CSV_DELIM_CHARSET);
    //#*NOTE: technically can be true if EOF,
    //but if EOF then CurChar is never used 3/13/2014
    vPossDelimChars := False;
  end;

  function SOF: boolean;
  begin
    Result := AStream.Position = 0;
  end;

  function NextChars(ACount: integer): FileString;
  begin
    //#OPTIMIZE: condition
    if ResultCharSize = 1 then
    begin
      SetLength(Result, Min(ACount, vStreamSize{AStream.Size} - AStream.Position));
      AStream.Read(Pointer(Result)^, Length(Result));
      AStream.Seek(-Length(Result), soFromCurrent);
    end else
    begin
      SetLength(Result, Min(ACount, (vStreamSize{AStream.Size} - AStream.Position) div ResultCharSize));
      AStream.Read(Pointer(Result)^, Length(Result) * ResultCharSize);
      AStream.Seek(-Length(Result) * ResultCharSize, soFromCurrent);
    end;
  end;
  procedure GotoNextChars(ACount: integer);
  begin
    //#OPTIMIZE: condition
    if ResultCharSize = 1 then
      AStream.Seek(ACount, soFromCurrent)
    else
      AStream.Seek(ACount * SizeOf(FileChar), soFromCurrent);
  end;

  procedure GotoPrevChars(ACount: integer);
  begin
    //#OPTIMIZE: condition
    if ResultCharSize = 1 then
      AStream.Seek(-ACount, soFromCurrent)
    else
      AStream.Seek(-ACount * SizeOf(FileChar), soFromCurrent);
  end;

  procedure GotoPreceedingEOLN(ForItem: boolean = False);
  var
    vOrigPos: integer;
  const
    NMinRowChars = 17; //Length('3-13-13,1,1,1,1,1')
  begin
    //assumes will not hit SOF
    //assumes ending CRLF taken care of by other places
    vOrigPos := AStream.Position;
    vPossDelimChars := True;
    while (NextChars(2) <> #13#10) or (AStream.Position = vOrigPos) do
      if (Length(NextChars(2)) = 2) and (NextChars(2)[2] = #10) and
         (AStream.Position < vOrigPos - SizeOf(FileChar)) then
      begin
        GotoNextChars(1);
        Exit;
      end else
      if (AStream.Position = vOrigPos) and ForItem then
        GotoPrevChars(NMinRowChars)
      else
        GotoPrevChars(1);
  end;
var
  CurField: string;
  CurCol: integer;

  procedure InitParsingState;
  begin
    //Initialize Parsing State
    //vStreamSize must be set first: EOF (called via SkipExistingDelimChars)
    //reads it, so it cannot be assigned at the end of this procedure.
    vStreamSize := AStream.Size;
    CurCol := -1;
    vPossDelimChars := True;
    SkipExistingDelimChars;
  end;
  procedure BacktrackTo(APos: integer; ASafeMode: boolean = False);
  begin
    if ASafeMode then
      AStream.Seek(Pred(APos), soFromBeginning)
    else
      AStream.Seek(APos, soFromBeginning);
    ReadNextChar;
    vPossDelimChars := False;
    CurCol := Ord(High(TFieldType));
  end;

  procedure ReadQuotedText;
  var
    vHadPrevQuoteChar: boolean;
  begin
    vHadPrevQuoteChar := False;
    while MRReadChar = '"' do
    begin
      if vHadPrevQuoteChar then
        CurField := CurField + MRReadChar;
      ReadNextChar;
      while MRReadChar <> '"' do
      begin
        CurField := CurField + MRReadChar;
        ReadNextChar;
      end;
      if EOF then
        break;
      ReadNextChar;
      vHadPrevQuoteChar := True;
    end;
  end;

  procedure GetNextFieldValue;
  begin
    if EOF then Exit;
    CurCol := (CurCol + 1) mod Succ(Ord(High(TFieldType)));
    CurField := '';
    if MRReadChar = '"' then
      ReadQuotedText
    else
    begin
      repeat
        CurField := CurField + MRReadChar;
        if not EOF then
          ReadNextChar;
      until EOF or (MRReadChar in CSV_DELIM_CHARSET);
      if EOF then
        if not (MRReadChar in CSV_DELIM_CHARSET) then
          CurField := CurField + MRReadChar;
    end;
    vPossDelimChars := True;
    SkipExistingDelimChars;
  end;
var
  ColFieldTypes: array [Ord(Low(TFieldType))..Ord(High(TFieldType))] of TFieldType;

  procedure ResolveCurColFieldType;
  var
    vField: string;
  begin
    vField := LowerCase(CurField);
    if vField = 'date' then
      ColFieldTypes[CurCol] := ftDate else
    if vField = 'open' then
      ColFieldTypes[CurCol] := ftOpen else
    if vField = 'high' then
      ColFieldTypes[CurCol] := ftHigh else
    if vField = 'low' then
      ColFieldTypes[CurCol] := ftLow else
    if vField = 'close' then
      ColFieldTypes[CurCol] := ftClose else
    if vField = 'volume' then
      ColFieldTypes[CurCol] := ftVolume else
    if Pos('close', vField) > 0 then
      ColFieldTypes[CurCol] := ftAdjClose
    else
      raise EInvalidOperation.Create('Unrecognized file format: unrecognized column name found.');
  end;

  procedure WriteItemAsFieldValue(var AData: TDayIndexData);
  begin
    case ColFieldTypes[CurCol] of
      ftDate: AData.Date := ExStrToDate(CurField);
      ftOpen: AData.Open := StrToFloat(CurField);
      ftHigh: AData.High := StrToFloat(CurField);
      ftLow: AData.Low := StrToFloat(CurField);
      ftClose: AData.Close := StrToFloat(CurField);
      ftVolume: AData.Volume := StrToFloat(CurField);
      ftAdjClose: AData.AdjClose := StrToFloat(CurField);
    end;
  end;

  procedure VerifyFields;
  var
    iField: TFieldType;
    iColumn: integer;
    IsUsedFlags: array [Low(TFieldType)..High(TFieldType)] of boolean;
  begin
    //* Set all to false
    for iField := Low(TFieldType) to High(TFieldType) do
      IsUsedFlags[iField] := False;
    //* set found to true
    for iColumn := Low(ColFieldTypes) to High(ColFieldTypes) do
      IsUsedFlags[ColFieldTypes[iColumn]] := True;
    //* throw error on first one not found
    for iField := Low(TFieldType) to High(TFieldType) do
      if not IsUsedFlags[iField] then
        raise EInvalidOperation.Create('Bad file format: one or more column names are missing!');
  end;
  procedure LoadHeader;
  var
    iField: TFieldType;
  begin
    for iField := Low(TFieldType) to High(TFieldType) do
    begin
      GetNextFieldValue;
      ResolveCurColFieldType;
    end;
    VerifyFields;
    if EOF then
      raise EInvalidOperation.Create('Cannot complete shallow Equity New High Info Retrieval: Not enough Data');
  end;

  procedure LoadRowInto(var ADayData: TDayIndexData);
  var
    iField: TFieldType;
  begin
    for iField := Low(TFieldType) to High(TFieldType) do
    begin
      GetNextFieldValue;
      WriteItemAsFieldValue(ADayData);
    end;
  end;
var
  OrderReversed: boolean;
  vTopDay,
  vBottomDay,
  vFirstDay,
  vEarlierDay,
  vLastDay: TDayIndexData;
  vBeginDate: TDate;
  vBeforeLastDayPos,
  vFirstDayPos,
  vAfterFirstDayPos: integer;

  function HasUnprocessedDays: boolean;
  begin
    //** use Position of stream because we don't always have the first day in the
    //   file, due to optimization
    Result := (
      ((AStream.Position > vFirstDayPos) and not OrderReversed) or
      (((AStream.Position < AStream.Size - SizeOf(FileChar) * Length(#13#10)) or
        (AStream.Position < AStream.Size - SizeOf(FileChar) * Length(#10)))
       and OrderReversed));
  end;

  function NotYetCoveredTimePeriod: boolean;
  begin
    Result :=
      (ARatePeriod.PeriodStr = AllDataPeriodStr)
      or
      (
        (ARatePeriod.PeriodStr <> AllDataPeriodStr) and
        (vEarlierDay.Date >= vBeginDate)
      );
  end;

  function FoundAllNeededData: boolean;
  begin
    Result := (
      (ARatePeriod.PeriodStr <> AllDataPeriodStr) and
      (vEarlierDay.Date <= vBeginDate)
    ) or
    (ARatePeriod.PeriodStr = AllDataPeriodStr);
  end;

  procedure GotoLastDay;
  begin
    //** Goto End of File
    GotoEOF;
    //** Goto Just before Last Day
    GotoPreceedingEOLN;
    if (AStream.Position = AStream.Size - SizeOf(FileChar) * Length(#13#10)) or
       (AStream.Position = AStream.Size - SizeOf(FileChar) * Length(#10)) then
      GotoPreceedingEOLN;
    SkipExistingDelimChars;
  end;
  procedure DetermineDataOrder;
  begin
    //#ASSUMPTION: assume end day at BOTTOM of file if latest data less than 2 days ago
    //Problem when NDays = 2 ?
    if Trunc(Now) - Trunc(vBottomDay.Date) >= 2 then
    begin
      //** Get Top Day
      BacktrackTo(vFirstDayPos, True);
      LoadRowInto(vTopDay);
      //** Determine what order the data is in
      OrderReversed := vBottomDay.Date < vTopDay.Date;
      if not OrderReversed then
        BacktrackTo(vBeforeLastDayPos, True);
      if OrderReversed then
        vFirstDay := vBottomDay
      else
        vFirstDay := vTopDay;
      if OrderReversed then
        vLastDay := vTopDay
      else
        vLastDay := vBottomDay;
    end else
    begin
      OrderReversed := False;
      //vLastDay := vTopDay;
      vLastDay := vBottomDay;
    end;
  end;

  procedure LoadPrevRow;
  var
    vBeforeDayPos: integer;
  begin
    GotoPreceedingEOLN(True);
    vBeforeDayPos := AStream.Position;
    SkipExistingDelimChars;
    LoadRowInto(vEarlierDay);
    AStream.Seek(vBeforeDayPos, soFromBeginning);
  end;
begin
  //* Initialize
  Result.Success := False;
  AStream.Seek(0, soFromBeginning);
  InitParsingState;
  //** Load CSV Header
  LoadHeader;
  vFirstDayPos := AStream.Position;
  //** Get Last Day
  GotoLastDay;
  vBeforeLastDayPos := AStream.Position;
  LoadRowInto(vBottomDay);
  //** IF Only 1 Data Day:
  if vFirstDayPos = vBeforeLastDayPos then
  begin
    //return results
    Result.Success := True;
    Result.High := vBottomDay.High;
    Exit;
  end;
  //** Go back to Last Day in File
  BacktrackTo(vBeforeLastDayPos);
  //** Determine what order the data is in
  DetermineDataOrder;
  //** Determine Date to scan back to if opted for
  if ARatePeriod.PeriodStr <> AllDataPeriodStr then
    vBeginDate := MoveDateBack(vLastDay.Date, ARatePeriod.TimePeriod);
  //* Initialize Loop Variables
  Result.High := vLastDay.High;
  vEarlierDay := vLastDay;
  while HasUnprocessedDays and NotYetCoveredTimePeriod do
  begin
    //** Goto Previous Day Row
    if OrderReversed then
      LoadRowInto(vEarlierDay)
    else
      LoadPrevRow;
    //** Update High
    if NotYetCoveredTimePeriod then
      Result.High := Max(Result.High, vEarlierDay.High);
  end;
  Result.Success := FoundAllNeededData;
end;
end.
Below is an example CSV. Note that the rows are sometimes in reverse chronological order in the file (latest date first).
Date,Open,High,Low,Close,Volume,Adj Close
11/3/2014,12,12.06,11.75,11.98,19700,11.98
11/4/2014,12,12,10.62,11.55,39200,11.55
11/5/2014,11.6,11.85,11.6,11.85,3100,11.85
11/6/2014,11.85,11.85,11.85,11.85,0,11.85
11/7/2014,11.5,11.5,10.35,11,35900,11
11/10/2014,11.12,11.12,11.12,11.12,200,11.12
11/11/2014,11.5,11.5,11.5,11.5,200,11.5
11/12/2014,11.75,11.85,11.15,11.45,3500,11.45
11/13/2014,11.45,11.45,11.45,11.45,0,11.45
11/14/2014,11.45,11.45,11.45,11.45,0,11.45
11/17/2014,11.07,11.28,11.07,11.28,1600,11.28
11/18/2014,11.07,11.74,11.06,11.74,8100,11.74
11/19/2014,11.1,11.5,11,11.5,11600,11.5
11/20/2014,11.1,11.5,11.1,11.5,3100,11.5
11/21/2014,11.49,11.5,11.23,11.25,15100,11.25
11/24/2014,11.25,11.35,11.25,11.25,900,11.25
11/25/2014,11.48,11.5,11.25,11.5,355300,11.5
11/26/2014,11.75,11.75,11.5,11.5,261300,11.5
11/28/2014,11.75,11.8,11.75,11.8,16300,11.8
12/1/2014,11.25,11.8,11.02,11.5,23800,11.5
12/2/2014,11.6,11.6,11.47,11.5,57600,11.5
12/3/2014,11.57,11.75,11.41,11.69,240700,11.69
12/4/2014,11.74,11.75,11.49,11.65,41100,11.65
12/5/2014,11.65,11.85,11.56,11.8,267200,11.8
12/8/2014,11.8,11.85,11.68,11.8,168700,11.8
First, try using Intel Threading Building Blocks (TBB) as your memory manager. It scales quite well to at least 16 cores (I had a similar problem, described in Why does a multithreaded memory allocation/deallocation-intensive application not scale with the number of threads?).
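Whichever replacement memory manager you pick (a TBB-based one, SapMM, etc.), remember that in Delphi it only takes effect if it is installed before anything allocates, i.e. its unit must be listed first in the project's uses clause. A sketch (the project and unit names are illustrative; SapMM, like FastMM, installs itself from its unit initialization):

program StockHighScan; // hypothetical project name

uses
  SapMM,      // replacement memory manager: MUST be the first unit here,
              // so it is installed before any other unit allocates
  uTest in 'uTest.pas',
  uWorkers in 'uWorkers.pas'; // hypothetical unit holding the worker threads

begin
  RunAnalysis; // hypothetical entry point that starts the thread pool
end.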
In general, even with Intel TBB installed, memory should not be dynamically allocated or deallocated inside a thread's main processing loop. These operations are always very expensive:
- Allocate enough (fixed) memory for the processing before the loop starts, and free it after the loop ends.
- Restrict the inner read/process loop to pointer arithmetic; copy memory only when it is actually needed (see the sketch after this list).
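To make the second point concrete for this particular task (my sketch, not the asker's routine): read each file into one reused buffer, then scan it with a PAnsiChar instead of issuing a TStream.Read call per character. For the all-time high, only the High field of each row needs converting:

// Sketch: maximum of the 'High' column (3rd field of each row) found with
// pointer arithmetic over a raw buffer. Assumes unquoted fields and a header
// row, as in the sample CSV; FmtUS pins the '.' decimal separator.
function MaxHighInBuffer(P: PAnsiChar; Size: Integer): Extended;
var
  PEnd, PStart: PAnsiChar;
  Field: Integer;
  S: AnsiString;
  V: Extended;
  FmtUS: TFormatSettings;
begin
  FmtUS := TFormatSettings.Create('en-US');
  Result := 0;
  PEnd := P + Size;
  while (P < PEnd) and (P^ <> #10) do Inc(P); // skip the header line
  Inc(P);
  while P < PEnd do
  begin
    Field := 0;
    while P < PEnd do
    begin
      PStart := P;
      while (P < PEnd) and not (P^ in [',', #13, #10]) do Inc(P);
      if Field = 2 then // 0-based: Date, Open, High
      begin
        SetString(S, PStart, P - PStart);
        V := StrToFloat(string(S), FmtUS);
        if V > Result then Result := V;
      end;
      Inc(Field);
      if (P < PEnd) and (P^ = ',') then Inc(P) else Break; // row ended
    end;
    while (P < PEnd) and (P^ in [#13, #10]) do Inc(P); // skip EOL chars
  end;
end;

The buffer is allocated once per thread and reused for every file, so heap traffic in the hot loop is nearly zero:

// inside the per-file loop; Buf is declared at thread scope as: Buf: array of AnsiChar;
FS := TFileStream.Create(FileName, fmOpenRead or fmShareDenyWrite);
try
  if FS.Size > Length(Buf) then
    SetLength(Buf, FS.Size); // grows a few times early on, then never again
  FS.ReadBuffer(Buf[0], FS.Size);
  HighVal := MaxHighInBuffer(@Buf[0], FS.Size);
finally
  FS.Free;
end;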
The input data can be spread across different hard drives, better still if they are connected to different controllers. The input can also be memory-mapped if you know the size in advance, i.e. mapped before the processing loop in each thread starts.
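A minimal sketch of the memory-mapped variant on Windows (error handling omitted; the mapped view can be handed straight to a pointer-based scanner such as MaxHighInBuffer above):

// uses Windows
function ScanMappedFile(const FileName: string): Extended;
var
  hFile, hMap: THandle;
  Size: DWORD;
  P: Pointer;
begin
  hFile := CreateFile(PChar(FileName), GENERIC_READ, FILE_SHARE_READ, nil,
    OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
  try
    Size := GetFileSize(hFile, nil);
    hMap := CreateFileMapping(hFile, nil, PAGE_READONLY, 0, 0, nil);
    try
      P := MapViewOfFile(hMap, FILE_MAP_READ, 0, 0, 0); // map the whole file
      try
        Result := MaxHighInBuffer(PAnsiChar(P), Size);  // scan in place, no copy
      finally
        UnmapViewOfFile(P);
      end;
    finally
      CloseHandle(hMap);
    end;
  finally
    CloseHandle(hFile);
  end;
end;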
Optimize the single-threaded processing as much as possible (by profiling), then identify the parts that do not scale with the number of threads (measure with the thread count as a parameter); those parts need to be rewritten. Read operations can be buffered and/or performed in multiples of the disk cluster size (a sketch follows below).
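A sketch of the cluster-multiple read (ProcessChunk is a hypothetical callback; 16 x 4 KB matches the common NTFS cluster size, but treat the figure as tunable):

procedure ScanFileInChunks(const FileName: string);
const
  ChunkSize = 16 * 4096; // a multiple of the typical 4 KB cluster size
var
  FS: TFileStream;
  Buf: array [0..ChunkSize - 1] of AnsiChar; // fixed buffer: no heap traffic in the loop
  BytesRead: Integer;
begin
  FS := TFileStream.Create(FileName, fmOpenRead or fmShareDenyWrite);
  try
    repeat
      BytesRead := FS.Read(Buf, ChunkSize);
      if BytesRead > 0 then
        ProcessChunk(@Buf[0], BytesRead); // hypothetical: must carry rows that straddle chunks
    until BytesRead < ChunkSize;
  finally
    FS.Free;
  end;
end;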
These fairly general guidelines are based on my experience processing TB-sized input in parallel threads on Windows 7 (machines with up to 12 hyper-threaded cores and up to 128 GB of RAM).