Multithreading speedup: reading through CSV files using TMemoryStream

I am writing parallel code to process a large set of CSV files, each holding historical data for one stock symbol (over 6500 symbols in total), and to calculate whether each stock has reached its all-time high.

I implemented a thread pool of TThread descendants that splits the symbol list evenly among the threads, with the threads allocated to SEPARATE cores of my i7 machine. Each thread is given its own copy of all the data it needs at creation time, before it is started, so no locking or synchronization is needed while the threads are processing. Once all threads have finished, I aggregate each thread's result data in the main program.
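
For concreteness, here is a minimal sketch of that scheme (the names TWorker and MaxHighs are mine, not from the actual code; uses Classes): each worker receives its own copy of its slice of the file list at construction time, so nothing is shared while the threads run.

type
  TWorker = class(TThread)
  private
    FFiles: TStringList; //this worker's private slice of the file list
  protected
    procedure Execute; override;
  public
    MaxHighs: array of extended; //one result per file; read after WaitFor
    constructor Create(AFiles: TStrings);
    destructor Destroy; override;
  end;

constructor TWorker.Create(AFiles: TStrings);
begin
  inherited Create(True); //create suspended; the caller invokes Start
  FFiles := TStringList.Create;
  FFiles.Assign(AFiles);  //deep copy: no state shared with other threads
  SetLength(MaxHighs, FFiles.Count);
end;

destructor TWorker.Destroy;
begin
  FFiles.Free;
  inherited;
end;

procedure TWorker.Execute;
var
  i: integer;
  vStream: TMemoryStream;
begin
  vStream := TMemoryStream.Create; //one stream reused for all files
  try
    for i := 0 to FFiles.Count - 1 do
      begin
        vStream.LoadFromFile(FFiles[i]);
        //MaxHighs[i] := ... parse vStream and record the all-time high ...
      end;
  finally
    vStream.Free;
  end;
end;

The main program calls Start on each worker, WaitFor on each, then reads its MaxHighs and frees it.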

I have tested my code with a few of the multithreaded memory managers recommended in a StackOverflow question. So far, SapMM appears to be the fastest of them that does not cause access violations.

The problem is that adding more threads does not proportionally reduce the time it takes to compute all the highs. Using 2 threads does not cut the runtime to 1/2, 3 threads do not cut it to 1/3, and 4 threads do not cut it to 1/4:

Number of threads         1     2       3       4
Predicted time (mm:ss)    6:37  3:18.5  2:12.3  1:39.25
Actual time (mm:ss)       6:37  4:07    3:05    2:51

I have reached the point where I need additional insight to speed this up further. I want to understand why the scaling falls off, rather than just tinkering around the edges of the problem. What causes this code to stop getting proportional benefit from extra threads, and what do I need to do to get that benefit? Is there a different approach that would speed up the analysis, e.g. something other than reading from a TMemoryStream?

The code I am using is below.

I am using Delphi XE4 Enterprise.

In each thread, I loop over that thread's share of the symbols and:

  1. Use TMemoryStream.LoadFromFile to load the symbol's historical data.
  2. Use a function I wrote to find the all-time high directly from the TMemoryStream.

Step (1) has been profiled and takes essentially no time (less than 1 second in total to load all 6500 files into memory, one at a time). The procedure I use for step (2) is what takes all the time; it is given below:

unit uTest;

interface

//(the types and the routine below are kept in the implementation section for brevity)

implementation

uses
  SysUtils, Math, Classes;

type
  TDayIndexData = record
    Date: TDate;
    Open, High, Low, Close, AdjClose, Volume: extended;
  end;

type
  TTimeUnit = (tuDay, tuWeek, tuMonth, tuYear);

  TTimePeriod = record
    Length: integer;
    TimeUnit: TTimeUnit;
  end;

//#NO CHANGE
const
  AllDataPeriodStr = 'All Data';

type
  TRatePeriod = record
    PeriodStr: string;
    TimePeriod: TTimePeriod;
  end;

type
  TFieldType = (ftDate, ftOpen, ftHigh, ftLow, ftClose, ftVolume, ftAdjClose);

const CSV_DELIM_CHARSET = [#0..#31, ',',#127];

type
  TShallowEquityNewHighInfoRetrievalResults = record
    Success: boolean;
    High: extended;
  end;

//NOTE: ExStrToDate and MoveDateBack (used below) are my own helpers, defined
//elsewhere in the project.
function ShallowEquityNewHighInfoRetrieval(
  AStream: TStream;
  ARatePeriod: TRatePeriod;
  AGetNormalData: boolean = False): TShallowEquityNewHighInfoRetrievalResults;

var
  vStreamSize: int64;

  function EOF: boolean;
  begin
    Result := AStream.Position >= vStreamSize;//AStream.Size;
  end;

  procedure GotoEOF;
  begin
    AStream.Seek(0, soFromEnd);
  end;

//#OPTIMIZE
//var
  //vBuffer: FileString;

  type
    FileChar = AnsiChar;
    FileString = AnsiString;

  const
    ResultCharSize = SizeOf(FileChar);

  var
    MRReadChar: FileChar;

  procedure ReadNextChar;
  begin
    //NOTE: this reads one character per (virtual) TStream.Read call; the
    //per-call overhead dominates when scanning a file byte by byte
    if not EOF then
      AStream.Read(MRReadChar, SizeOf(MRReadChar)) else
        raise EInvalidOperation.Create('Unexpected end of file found');
  end;

  var
    vPossDelimChars: boolean;

  procedure SkipExistingDelimChars;
  begin
    //*INTENTION: prevents redundant SkipDelimChars calls, which is destructive
    if not vPossDelimChars then Exit;

    //not requiring DelimChars

    if EOF then Exit;

    repeat
      ReadNextChar;
    until EOF or not (MRReadChar in CSV_DELIM_CHARSET);

    //#*NOTE: technically can be true if EOF,
    //but if EOF then CurChar is never used 3/13/2014
    vPossDelimChars := False;
  end;

  function SOF: boolean;
  begin
    Result := AStream.Position = 0;
  end;

  function NextChars(ACount: integer): FileString;
  begin
    //#OPTIMIZE: condition
    if ResultCharSize = 1 then
      begin
        SetLength(Result, Min(ACount, vStreamSize{AStream.Size} - AStream.Position));
        AStream.Read(Pointer(Result)^, Length(Result));
        AStream.Seek(-Length(Result), soFromCurrent);
      end else
        begin
          SetLength(Result, Min(ACount, (vStreamSize{AStream.Size} - AStream.Position) div ResultCharSize));
          AStream.Read(Pointer(Result)^, Length(Result) * ResultCharSize);
          AStream.Seek(-Length(Result) * ResultCharSize, soFromCurrent);
        end;
  end;

  procedure GotoNextChars(ACount: integer);
  begin
    //#OPTIMIZE: condition
    if ResultCharSize = 1 then
      AStream.Seek(ACount, soFromCurrent) else
        AStream.Seek(ACount*SizeOf(FileChar), soFromCurrent);
  end;

  procedure GotoPrevChars(ACount: integer);
  begin
    //#OPTIMIZE: condition
    if ResultCharSize = 1 then
      AStream.Seek(-ACount, soFromCurrent) else
        AStream.Seek(-ACount*SizeOf(FileChar), soFromCurrent);
  end;

  procedure GotoPreceedingEOLN(ForItem: boolean = False);
  var
    vOrigPos: integer;

  const
    NMinRowChars = 17;//Length('3-13-13,1,1,1,1,1')

  begin
    //assumes will not hit SOF
    //assumes ending CRLF taken care of by other places
    vOrigPos := AStream.Position;

    vPossDelimChars := True;

    while (NextChars(2) <> #13#10) or (AStream.Position = vOrigPos) do
      if (Length(NextChars(2)) = 2) and (NextChars(2)[2] = #10) and
        (AStream.Position < vOrigPos - SizeOf(FileChar)) then
          begin
            GotoNextChars(1);

            Exit;
          end else
        if (AStream.Position = vOrigPos) and ForItem then
          GotoPrevChars(NMinRowChars) else
            GotoPrevChars(1);
  end;

  var
    CurField: string;
    CurCol: integer;

  procedure InitParsingState;
  begin
    //Initialize Parsing State
    vStreamSize := AStream.Size; //must be set before EOF is first consulted
    CurCol := -1;
    vPossDelimChars := True;
    SkipExistingDelimChars;
  end;

  procedure BacktrackTo(APos: integer; ASafeMode: boolean = False);
  begin
    if ASafeMode then
      AStream.Seek(Pred(APos), soFromBeginning) else
        AStream.Seek(APos, soFromBeginning);

    ReadNextChar;
    vPossDelimChars := False;
    CurCol := Ord(High(TFieldType));
  end;

  procedure ReadQuotedText;
  var
    vHadPrevQuoteChar: boolean;
  begin
    vHadPrevQuoteChar := False;
    while MRReadChar = '"' do
      begin
        if vHadPrevQuoteChar then
          CurField := CurField + MRReadChar;
        ReadNextChar;

        while MRReadChar <> '"' do
          begin
            CurField := CurField + MRReadChar;
            ReadNextChar;
          end;

        if EOF then
          break;

        ReadNextChar;
        vHadPrevQuoteChar := True;
      end;
  end;

  procedure GetNextFieldValue;
  begin
    if EOF then Exit;

    CurCol := (CurCol+1) mod Succ(Ord(High(TFieldType)));
    CurField := '';
    if MRReadChar = '"' then
      ReadQuotedText else
        begin
          repeat
            //NOTE: appending one character at a time reallocates CurField on
            //the heap each iteration; with several threads these allocations
            //serialize on the shared memory manager
            CurField := CurField + MRReadChar;
            if not EOF then
              ReadNextChar;
          until EOF or (MRReadChar in CSV_DELIM_CHARSET);
          if EOF then
            if not (MRReadChar in CSV_DELIM_CHARSET) then
              CurField := CurField + MRReadChar;
        end;
    vPossDelimChars := True;

    SkipExistingDelimChars;
  end;

  var
    ColFieldTypes: array [Ord(Low(TFieldType))..Ord(High(TFieldType))] of TFieldType;

  procedure ResolveCurColFieldType;
  var
    vField: string;
  begin
    vField := LowerCase(CurField);
    if vField = 'date' then
      ColFieldTypes[CurCol] := ftDate else
    if vField = 'open' then
      ColFieldTypes[CurCol] := ftOpen else
    if vField = 'high' then
      ColFieldTypes[CurCol] := ftHigh else
    if vField = 'low' then
      ColFieldTypes[CurCol] := ftLow else
    if vField = 'close' then
      ColFieldTypes[CurCol] := ftClose else
    if vField = 'volume' then
      ColFieldTypes[CurCol] := ftVolume else
    if Pos('close', vField) > 0 then
      ColFieldTypes[CurCol] := ftAdjClose else
        raise EInvalidOperation.Create('Unrecognized file format: unrecognized column name found.');
  end;

  procedure WriteItemAsFieldValue(var AData: TDayIndexData);
  begin
    case ColFieldTypes[CurCol] of
      ftDate:AData.Date := ExStrToDate(CurField);
      ftOpen:AData.Open := StrToFloat(CurField);
      ftHigh:AData.High := StrToFloat(CurField);
      ftLow:AData.Low := StrToFloat(CurField);
      ftClose:AData.Close := StrToFloat(CurField);
      ftVolume:AData.Volume := StrToFloat(CurField);
      ftAdjClose:AData.AdjClose := StrToFloat(CurField);
    end;
  end;

  procedure VerifyFields;
  var
    iField: TFieldType;
    iColumn: integer;

    IsUsedFlags: array [Low(TFieldType)..High(TFieldType)] of boolean;

  begin
    //* Set all to false
    for iField := Low(TFieldType) to High(TFieldType) do
      IsUsedFlags[iField] := False;

    //* set found to true
    for iColumn := Low(ColFieldTypes) to High(ColFieldTypes) do
      IsUsedFlags[ColFieldTypes[iColumn]] := True;

    //* throw error on first one not found
    for iField := Low(TFieldType) to High(TFieldType) do
      if not IsUsedFlags[iField] then
        raise EInvalidOperation.Create('Bad file format: one or more column names are missing!');
  end;

  procedure LoadHeader;
  var
    iField: TFieldType;

  begin
    for iField := Low(TFieldType) to High(TFieldType) do
      begin
        GetNextFieldValue;
        ResolveCurColFieldType;
      end;

    VerifyFields;

    if EOF then
      raise EInvalidOperation.Create('Cannot complete shallow Equity New High Info Retrieval: Not enough Data')
  end;

  procedure LoadRowInto(var ADayData: TDayIndexData);
  var
    iField: TFieldType;
  begin
    for iField := Low(TFieldType) to High(TFieldType) do
      begin
        GetNextFieldValue;
        WriteItemAsFieldValue(ADayData);
      end;
  end;

  var
    OrderReversed: boolean;

    vTopDay,
    vBottomDay,

    vFirstDay,
    vEarlierDay,
    vLastDay: TDayIndexData;

    vBeginDate: TDate;

    vBeforeLastDayPos,
    vFirstDayPos,
    vAfterFirstDayPos: integer;

  function HasUnprocessedDays: boolean;
  begin
    //** use Position of stream because we don't always have the first day in the
    //   file, due to optimization
    Result := (
      ((AStream.Position > vFirstDayPos) and not OrderReversed) or

      (((AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
        (AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#10)))
       and OrderReversed));
  end;

  function NotYetCoveredTimePeriod: boolean;
  begin
    Result :=
      (ARatePeriod.PeriodStr = AllDataPeriodStr)
      or
      (
        (ARatePeriod.PeriodStr <> AllDataPeriodStr) and
        (vEarlierDay.Date >= vBeginDate)
      );
  end;

  function FoundAllNeededData: boolean;
  begin
    Result := (
      (ARatePeriod.PeriodStr <> AllDataPeriodStr) and
      (vEarlierDay.Date <= vBeginDate)
    ) or
    (ARatePeriod.PeriodStr = AllDataPeriodStr);
  end;

  procedure GotoLastDay;
  begin
    //** Goto End of File
    GotoEOF;

    //** Goto Just before Last Day
    GotoPreceedingEOLN;
    if (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
      (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#10)) then
        GotoPreceedingEOLN;

    SkipExistingDelimChars;
  end;

  procedure DetermineDataOrder;
  begin
    //#ASSUMPTION: assume end day at BOTTOM of file if latest data less than 2 days ago
    //Problem when NDays = 2 ?

    if Trunc(Now) - Trunc(vBottomDay.Date) >= 2 then
      begin
        //** Get Top Day
        BacktrackTo(vFirstDayPos, True);
        LoadRowInto(vTopDay);

        //** Determine what order the data is in
        OrderReversed := vBottomDay.Date < vTopDay.Date;

        if not OrderReversed then
          BacktrackTo(vBeforeLastDayPos, True);

        if OrderReversed then
          vFirstDay := vBottomDay else
            vFirstDay := vTopDay;

        if OrderReversed then
          vLastDay := vTopDay else
            vLastDay := vBottomDay;
      end else
        begin
          OrderReversed := False;

          //vLastDay := vTopDay;
          vLastDay := vBottomDay;
        end;
  end;

  procedure LoadPrevRow;
  var
    vBeforeDayPos: integer;

  begin
    GotoPreceedingEOLN(True);

    vBeforeDayPos := AStream.Position;

    SkipExistingDelimChars;
    LoadRowInto(vEarlierDay);

    AStream.Seek(vBeforeDayPos, soFromBeginning);
  end;

begin
  //* Initialize
  Result.Success := False;
  AStream.Seek(0, soFromBeginning);
  InitParsingState;

  //** Load CSV Header
  LoadHeader;
  vFirstDayPos := AStream.Position;

  //** Get Last Day
  GotoLastDay;
  vBeforeLastDayPos := AStream.Position;
  LoadRowInto(vBottomDay);

  //** IF Only 1 Data Day:
  if vFirstDayPos = vBeforeLastDayPos then
    begin
      //return results
      Result.Success := True;
      Result.High := vBottomDay.High;
      Exit;
    end;

  //** Go back to Last Day in File
  BacktrackTo(vBeforeLastDayPos);

  //** Determine what order the data is in
  DetermineDataOrder;

  //** Determine Date to scan back to if opted for
  if ARatePeriod.PeriodStr <> AllDataPeriodStr then
    vBeginDate := MoveDateBack(vLastDay.Date, ARatePeriod.TimePeriod);

  //* Initialize Loop Variables
  Result.High := vLastDay.High;
  vEarlierDay := vLastDay;

  while HasUnProcessedDays and NotYetCoveredTimePeriod do
    begin
      //** Goto Previous Day Row
      if OrderReversed then
        LoadRowInto(vEarlierDay) else
          LoadPrevRow;

      //** Update High
      if NotYetCoveredTimePeriod then
        Result.High := Max(Result.High, vEarlierDay.High);
    end;

  Result.Success := FoundAllNeededData;
end;

end.


Below is an example CSV. Note that sometimes the rows are in reverse chronological order in the file (latest date first).

Date,Open,High,Low,Close,Volume,Adj Close
11/3/2014,12,12.06,11.75,11.98,19700,11.98
11/4/2014,12,12,10.62,11.55,39200,11.55
11/5/2014,11.6,11.85,11.6,11.85,3100,11.85
11/6/2014,11.85,11.85,11.85,11.85,0,11.85
11/7/2014,11.5,11.5,10.35,11,35900,11
11/10/2014,11.12,11.12,11.12,11.12,200,11.12
11/11/2014,11.5,11.5,11.5,11.5,200,11.5
11/12/2014,11.75,11.85,11.15,11.45,3500,11.45
11/13/2014,11.45,11.45,11.45,11.45,0,11.45
11/14/2014,11.45,11.45,11.45,11.45,0,11.45
11/17/2014,11.07,11.28,11.07,11.28,1600,11.28
11/18/2014,11.07,11.74,11.06,11.74,8100,11.74
11/19/2014,11.1,11.5,11,11.5,11600,11.5
11/20/2014,11.1,11.5,11.1,11.5,3100,11.5
11/21/2014,11.49,11.5,11.23,11.25,15100,11.25
11/24/2014,11.25,11.35,11.25,11.25,900,11.25
11/25/2014,11.48,11.5,11.25,11.5,355300,11.5
11/26/2014,11.75,11.75,11.5,11.5,261300,11.5
11/28/2014,11.75,11.8,11.75,11.8,16300,11.8
12/1/2014,11.25,11.8,11.02,11.5,23800,11.5
12/2/2014,11.6,11.6,11.47,11.5,57600,11.5
12/3/2014,11.57,11.75,11.41,11.69,240700,11.69
12/4/2014,11.74,11.75,11.49,11.65,41100,11.65
12/5/2014,11.65,11.85,11.56,11.8,267200,11.8
12/8/2014,11.8,11.85,11.68,11.8,168700,11.8
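
One locale detail worth noting for files in this format: StrToFloat and StrToDate use the global FormatSettings by default, so a machine with a comma decimal separator or a different short-date order will misparse these rows. A minimal sketch of pinning the format per call, assuming XE4's TFormatSettings.Create:

uses SysUtils;

procedure ParseWithUSFormat;
var
  vFS: TFormatSettings;
  vHigh: extended;
  vDate: TDateTime;
begin
  vFS := TFormatSettings.Create('en-US'); //a local copy; safe to use per thread
  vHigh := StrToFloat('12.06', vFS);      //expects the dot decimal separator
  vDate := StrToDate('11/3/2014', vFS);   //expects M/d/yyyy field order
end;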




1 Answer


First, try Intel Threading Building Blocks as your memory manager. It scales pretty well to at least 16 cores (I had a similar problem; see "Why does a multithreaded memory allocation / deallocation intensive application not scale with the number of threads?").

In general, even when using Intel TBB, you should not dynamically allocate or deallocate memory inside each thread's main processing loop. Those operations do not scale:

  • Allocate a fixed amount of memory, large enough for the processing, before the loop starts, and free it after the loop ends.
  • Restrict the inner read/process loop to pointer arithmetic; copy memory only where strictly necessary (see the sketch after this list).
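
As an illustration of the "pointers only" advice applied to the asker's task, here is a minimal sketch (not the asker's routine; it assumes the High column is the third field and that the file ends with a newline):

function MaxHighByPointerScan(AStream: TMemoryStream): extended;
var
  P, PEnd, FieldStart: PAnsiChar;
  Col: integer;
  V: extended;
  S: AnsiString;
begin
  Result := 0; //assumes prices are positive
  P := PAnsiChar(AStream.Memory);
  PEnd := P + AStream.Size;

  //skip the header line
  while (P < PEnd) and (P^ <> #10) do
    Inc(P);
  if P < PEnd then
    Inc(P);

  Col := 0;
  FieldStart := P;
  while P < PEnd do
    begin
      if (P^ = ',') or (P^ = #13) or (P^ = #10) then
        begin
          if (Col = 2) and (P > FieldStart) then //column 2 is 'High'
            begin
              SetString(S, FieldStart, P - FieldStart); //one copy per field
              V := StrToFloat(string(S));
              if V > Result then
                Result := V;
            end;
          if P^ = ',' then
            Inc(Col)
          else if P^ = #10 then
            Col := 0; //new row
          FieldStart := P + 1;
        end;
      Inc(P);
    end;
end;

Nothing is allocated in the hot path except one short string per field of interest, instead of one virtual Read call and one string reallocation per character; that is the kind of inner loop that scales.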


The input data can be split across different hard drives, better still if they are attached to different controllers. The input can be memory-mapped if you know the file size in advance, i.e. before processing starts in the thread loop.
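
For the memory-mapping suggestion, the standard Win32 calls are enough. A minimal sketch (requires the Windows unit; the caller must call UnmapViewOfFile on the returned view when done):

function MapFileReadOnly(const AFileName: string;
  out AView: Pointer; out ASize: Int64): boolean;
var
  vFile, vMap: THandle;
  vSizeLow, vSizeHigh: DWORD;
begin
  Result := False;
  AView := nil;
  ASize := 0;
  vFile := CreateFile(PChar(AFileName), GENERIC_READ, FILE_SHARE_READ,
    nil, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
  if vFile = INVALID_HANDLE_VALUE then
    Exit;
  try
    vSizeLow := GetFileSize(vFile, @vSizeHigh);
    ASize := (Int64(vSizeHigh) shl 32) or vSizeLow;
    vMap := CreateFileMapping(vFile, nil, PAGE_READONLY, 0, 0, nil);
    if vMap = 0 then
      Exit;
    try
      AView := MapViewOfFile(vMap, FILE_MAP_READ, 0, 0, 0); //map whole file
      Result := AView <> nil;
    finally
      CloseHandle(vMap); //the view keeps the mapping object alive
    end;
  finally
    CloseHandle(vFile); //the mapping keeps the file accessible as needed
  end;
end;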

Optimize the single-threaded processing as much as possible (profile it), then identify the parts that do not scale with the number of threads (vary the thread count as a parameter). Those parts need to be rewritten. Read operations can be cached and/or done in multiples of the disk cluster size.
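
A quick way to see how much of the work "does not scale" is to fit Amdahl's law to the timings in the question (a back-of-the-envelope estimate, not a measurement):

S(n) = 1 / ((1 - p) + p/n)   =>   p = (1 - 1/S(n)) / (1 - 1/n)

n = 2: S = 397 s / 247 s ~ 1.61  =>  p ~ 0.76
n = 4: S = 397 s / 171 s ~ 2.32  =>  p ~ 0.76

So roughly a quarter of each run behaves serially (allocation, I/O), which caps the achievable speedup near 1/(1 - p) ~ 4x no matter how many cores are added.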

These fairly general guidelines are based on my experience processing TB-sized inputs in parallel threads on Windows 7 (up to 12 hyper-threaded cores and up to 128 GB of RAM).
