Idiomatic prefetching in streaming library

I am working with the streaming library, but I will accept an answer that uses pipes or conduit.

Let's say I have

import Control.Monad (unless)
import Control.Monad.Trans.Class (lift)
import Data.Function (fix)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID =
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- lift (highLatencyGet thingID)
      S.yield thing
      go (thingID + 1)


To reduce latency, I would like to fork highLatencyGet so that the next Thing is fetched in parallel while the consumer is still processing the previous Thing.

Obviously I could transform the function above by hand, creating an MVar and forking the fetch of the next Thing before calling yield, etc. But I want to know if there is an idiomatic (combinator-based?) way to do this, such that it can be packaged into a library and used on arbitrary IO-based Streams. Ideally we could also tweak the amount of prefetching, e.g.:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()


1 answer


This solution uses channels; it is written with pipes but can easily be adapted to streaming. Specifically, it requires the pipes, pipes-concurrency, and async packages.

It does not work in a wholly "streamed" style. Instead of simply transforming the Producer, it also takes a "folding function" that consumes the Producer. This continuation-passing style is necessary to set up and tear down the concurrency machinery.

import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    -- a bounded mailbox of size bufsize, plus an action that seals it
    (outbox, inbox, seal) <- spawn' (bounded bufsize)
    -- whatever happens, seal the mailbox so the other side can finish
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))
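
As a quick sanity check, a hypothetical driver could look like the following. The sample producer, the buffer size, and the use of Pipes.Prelude's toListM are my additions, not part of the original answer; the combinator itself is reproduced so the example is self-contained.

```haskell
import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import qualified Pipes.Prelude as P
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

-- the answer's combinator, repeated here so this example compiles on its own
prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (outbox, inbox, seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))

main :: IO ()
main = do
  -- feed ten numbers through a buffer of size 4 and collect them back
  xs <- prefetching 4 (each [1 .. 10 :: Int]) P.toListM
  print xs  -- [1,2,3,4,5,6,7,8,9,10]
```

Note that the folding function here is just P.toListM; in real code it would be whatever consumer you want to run concurrently with the upstream fetches.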




The output of the original producer is redirected into the bounded queue. Concurrently, the producer-folding function is applied to a producer that reads from the queue.

Whenever either of the concurrent actions completes, we promptly seal the channel to avoid leaving the other side hanging.
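
For the streaming library itself, the same shape can be sketched with a bounded STM queue and withAsync from async. This adaptation is mine, not the original answer's: the Maybe-based end-of-stream sentinel and the use of Streaming.Prelude's untilRight are assumptions, and producer exceptions are not propagated in this simplified version.

```haskell
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
  (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Same continuation style as the pipes answer: the buffer must be
-- set up before, and torn down after, the consumer runs.
prefetching :: Int -> Stream (Of a) IO () -> (Stream (Of a) IO () -> IO r) -> IO r
prefetching bufsize source consume = do
  q <- newTBQueueIO (fromIntegral bufsize)
  let -- push every element, then a Nothing sentinel marking the end
      fill = do
        S.mapM_ (atomically . writeTBQueue q . Just) source
        atomically (writeTBQueue q Nothing)
      -- rebuild a stream that reads from the queue until the sentinel
      drain = S.untilRight (maybe (Right ()) Left <$> atomically (readTBQueue q))
  -- withAsync cancels the filler thread if the consumer stops early
  withAsync fill (\_ -> consume drain)

main :: IO ()
main = do
  xs <- prefetching 3 (S.each [1 .. 10 :: Int]) S.toList_
  print xs  -- [1,2,3,4,5,6,7,8,9,10]
```

Because the filler runs ahead of the consumer by up to bufsize elements, the high-latency fetches overlap with downstream processing, which is exactly the prefetching the question asks for.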
