How do you parse a 150GB CSV file using Elixir? A shell? An SQL database?

Problem: I have a 150GB CSV file with headers and the same number of columns in every row. I only want the first column, minus the header, and only unique items. The CSV cannot be on my local machine, as I have no space for it. It is located on an Apple AirPort; I will try connecting with a USB cable.
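
For example, with made-up sample data like

id,name,value
a1,foo,10
a2,bar,20
a1,baz,30

all I want back is

a1
a2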

I've been searching the internet for a solution for about 3 days now. I've heard of a couple of approaches but don't know which one works best. Which one is better, and why?

  • Shell: I've heard that I can do it with a shell script, but I have no experience writing shell scripts for this kind of task.

  • Python script: I wrote a script but gave up after it had been running for 4 hours. That may have been because I was accessing the file over Wi-Fi.

  • Elixir: I am currently learning Elixir, and I was told that Flow would be a good fit for spreading the work across processor cores while new data is being read. Comparing Stream with Flow on 1 million rows of similar data, it took 8 seconds with Stream and 2 seconds with Flow to get all the unique items in the file.

    def stream_parse(file_path, chunk_size) do
      file_path
        |> File.stream!
        |> Stream.drop(1)
        |> Stream.map(&String.split(&1, ",") |> List.first)
        |> Stream.chunk(chunk_size, chunk_size, [])
        |> Stream.map(&MapSet.new(&1))
        |> Enum.to_list
    end

    def flow_parse(file_path, chunk_size) do
      file_path
        |> File.stream!(read_ahead: chunk_size)
        |> Stream.drop(1)
        |> Flow.from_enumerable
        |> Flow.map(&String.split(&1, ",") |> List.first)
        |> Flow.partition
        |> Flow.uniq
        |> Enum.to_list
    end

I don't have much of a problem with the Stream solution, except that it has high memory usage, uses one thread, and runs on one core. The Flow solution is multithreaded and uses multiple cores, but it has the problem of eventually collapsing everything into a single Enum.to_list, which could take who knows how long.

  • SQL server: In the end I was told to just set up a Linux server with an SQL database, load the data into it, and run a query to get the data.

Which is the best approach, or is there an even better solution, short of writing it in C?

Edit 1: 12/6/2017 (d/m/y)

I was able to finish both the Stream and Flow examples in Elixir, and I was also given a shell one-liner that produces the required result. So far the shell script and the Flow version run at about the same speed, with Flow winning. However, since the file is not local to my machine, it makes no real difference, because I am IO-bound.

def stream_parse(file_path, chunk_size, output_file) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Stream.map(&String.split(&1, ",") |> List.first)
    |> Stream.uniq
    |> Stream.map(&"#{&1}\n")
    |> Stream.into(File.stream!(output_file, [:write, :utf8]))
    |> Stream.run
end


However, this does not write a result file per chunk, and it has to keep the unique items of the entire 150GB file in memory (not an option).
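
A rough sketch of what a per-chunk variant might look like (the chunk numbering and output file naming are only examples): it writes one deduplicated file per chunk, but still leaves merging the per-chunk files unsolved.

def stream_parse_chunked(file_path, chunk_size, output_dir) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Stream.map(&String.split(&1, ",") |> List.first)
    |> Stream.chunk_every(chunk_size)
    |> Stream.with_index
    |> Stream.each(fn {chunk, index} ->
        # one deduplicated output file per chunk
        chunk
          |> Enum.uniq
          |> Enum.map(&"#{&1}\n")
          |> Enum.into(File.stream!(Path.join(output_dir, "chunk_#{index}.csv"), [:write, :utf8]))
      end)
    |> Stream.run
end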

Shell one-liner (this also keeps all the unique elements in memory):

tail -n +2 my.csv | cut -d , -f 1 | sort -u > OUTPUT.csv

1 answer


After finally searching many forums and the Elixir Slack channel, we came to a solution. First the file had to be split; since there is already a shell command for that, there is no need to complicate the Elixir code. I've broken the solution down into methods to better explain what's going on.

Split the file into files of 10 million lines each:

$ mkdir split-files 
$ split -a 8 -l 10000000 big_file.csv ./split-files/
$ cd split-files 
$ for f in *; do mv "$f" "$f.csv"; done


Then we needed to get the unique elements from each file and write a per-file unique output. Here I can actually use Flow.uniq, since the chunk_size is 10 million lines, which fits in memory.

def flow_parse_dir(path, chunk_size) do
  Path.wildcard(path <> "/*.csv")
    |> Flow.from_enumerable
    |> Flow.map(fn filename ->
        [dir, file] = String.split(filename,"/")
        flow_parse(filename, chunk_size, dir <> "/unique_"<> file)
      end)
    |> Flow.run
end
def flow_parse(file_path, chunk_size, output_file) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Flow.from_enumerable
    |> Flow.map(&String.split(&1, ",") |> List.first)
    |> Flow.partition
    |> Flow.uniq
    |> Flow.map(&"#{&1}\n")
    |> Stream.into(File.stream!(output_file, [:write, :utf8]))
    |> Stream.run
end
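
Note that flow_parse_dir splits each file path on "/" into exactly two parts, so it expects a one-level relative directory. A call would look something like this (the arguments are just an example):

flow_parse_dir("split-files", 10_000_000)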


After creating all of the per-file unique outputs, we need to combine them into one complete file.



def concat_files(path, total_unique_file_name) do
  sum_file = File.open!(path <> "/" <> total_unique_file_name, [:read, :utf8, :write])

  Path.wildcard(path <> "/*.csv")
    |> Stream.map(fn filename ->
        [_, file] = String.split(filename, "/")
        if String.contains?(file, "unique") do
          write_concat_of_unique_files(file, path, sum_file)
        end
      end)
    |> Stream.run

  File.close(sum_file)
end
def write_concat_of_unique_files(file, path, sum_file) do
  # read the file contents line by line and append them to the combined file
  path <> "/" <> file
    |> File.stream!()
    |> Stream.map(&String.trim(&1, "\n"))
    |> Stream.map(fn line ->
        IO.puts(sum_file, line)
      end)
    |> Stream.run
end


Finally, there is a method at the end that finishes the job: deduplicate the combined file and write the final output.

def unique_column(file_path, chunk_size, output) do
  total_file = File.open!(output, [:read, :utf8, :write])

  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.map(&String.trim(&1, "\n"))
    |> Stream.chunk(chunk_size, chunk_size, [])
    |> Flow.from_enumerable
    |> Flow.map(fn chunk ->
        # deduplicate within this chunk
        chunk
          |> MapSet.new
          |> MapSet.to_list
          |> List.flatten
      end)
    |> Flow.partition
    |> Flow.map(fn items ->
        # write every item of the deduplicated chunk to the output file
        Enum.map(items, fn item ->
            IO.puts(total_file, item)
          end)
        end)
    |> Flow.run

  File.close(total_file)
end


Last, check that the final file is completely unique. The number of unique elements from the previous files turned out not to be very large, so they fit entirely in memory. If the contents are unique, the list is returned; if not, the pattern match fails and the file is not unique.

def check_unique(file_path) do
  original_length = file_path
    |> File.stream!
    |> Enum.to_list

  unique_length = file_path
    |> File.stream!
    |> Stream.uniq
    |> Enum.to_list

  ^unique_length = original_length
end
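
For reference, the whole run from IEx would look roughly like this, assuming the functions above are collected in a module (CsvDedup is only a placeholder name) and the split files are in ./split-files; the file names are examples:

# placeholder module and file names
CsvDedup.flow_parse_dir("split-files", 10_000_000)                                    # one unique_*.csv per split file
CsvDedup.concat_files("split-files", "all_unique.txt")                                # combine them
CsvDedup.unique_column("split-files/all_unique.txt", 10_000_000, "final_unique.csv")  # deduplicate the combination
CsvDedup.check_unique("final_unique.csv")                                             # raises MatchError if not unique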
