Parallel processing in pipeline flow

I really like the concept of conduit / pipes for applying streaming I / O source operations. I am interested in creating tools that work with very large log files. One of the attractions of switching to Haskell from Python / Ruby is the easier way to write parallel code, but I can't find any documentation on that. How could I set up a pipe stream that reads lines from a file and runs on them in parallel (i.e. with 8 cores, it has to read eight lines and pipe them to eight different threads to be processed and then reassembled, etc.) etc.), ideally with as little "ceremony" as possible ...

Optionally, it could be noted whether the lines should be connected in order or not, if this could affect the speed of the process?

I'm sure it would be possible to tempt something with yourself using ideas from the book Parallel Haskell, but it seems to me that running a pure function in parallel (parmap, etc.) in the middle of a Conduit workflow should be very easy?

+3


source to share


1 answer


As an example of the "internal parallelism" mentioned by Petr Pudlak in his comment, consider this function (I use pipes

, but can be implemented with conduit

just as easily):

import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L

concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer = 
      L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
      >->
      forever (await >>= liftIO . mapConcurrently action >>= mapM G.yield) 

      

This function takes as parameters the size of the group, the action we want to run for each value of the type a

, and Producer

values a

.



He returns a new one Producer

. Internally, the manufacturer reads the values a

in batches groupsize

, processes them simultaneously, and gives the results one by one.

The code is used Pipes.Group

to "split" the original producer into size sub-producers groupsize

and then Control.Foldl

"dump" each sub-producer to the list.

For more complex tasks, you can refer to the asynchronous pipes provided by pipes-concurrency

or stm-conduit

. But they are getting you out of the "one pipeline" vanilla pipe / piping worldview.

+6


source







All Articles