How to implement a getAvailable transformer on scalaz-stream

Short version:

I would like to implement a function that returns a transformer which awaits a block of values emitted by a single "effect".

The function would have the following signature:

/**
 * The `Process1` which awaits the next "effect" to occur and passes all values emitted by
 * this effect to `rcv` to determine the next state.
 */
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???


More details:

I could then use this function to implement the following function, which I think would be quite useful:

/**
  * Groups inputs into chunks of dynamic size based on the various effects
  * that back emitted values.
  *
  * @example {{{
  * val numberTask = Task.delay(1)
  * val listOfNumbersTask = Task.delay(List(5,6,7))
  * val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
  * sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
  * }}}
  */
  def chunkByEffect[I]: Process1[I, Vector[I]] = {
    receiveBlock(vec => emit(vec) ++ chunkByEffect)
  }

More:

My ultimate goal (slightly simplified) is to implement the following function:

/**
 * Transforms a stream of audio into a stream of text.
 */
def voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]

The function makes an external call to a voice-recognition service, so it is unreasonable to make a network call for each individual Byte in the stream; I need to batch the bytes together before making a network call. I could make audio a Process[Task, ByteVector], but then the test code would have to know the maximum block size supported by the function, and I would rather have that controlled by the function itself. Also, when this service is used inside a server, the server itself will receive network calls carrying audio of a given size, and I would like the function to be smart about chunking so that it does not hold onto data which is already available.

Basically, the audio stream coming from the network will arrive as a Process[Task, ByteVector] and will be translated to a Process[Task, Byte] via flatMap(Process.emitAll(_)). However, the test code will produce a Process[Task, Byte] directly and feed it to voiceRecognition. I believe an appropriate combinator should make it possible to write one implementation of voiceRecognition that does the right thing on both of these streams, and I think the chunkByEffect function above is the key to that. In practice I understand that I would need a chunkByEffect with min and max parameters, which set the minimum and maximum chunk size regardless of the underlying byte-generating Task.
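The min/max behaviour described here can be prototyped as a pure function before wiring it into a transducer. Below is a sketch under my own assumptions: every emitted block except possibly the last has a size in [min, max], and trailing bytes shorter than min are flushed at end of input. The name rechunk is illustrative, not part of either library:

```scala
// Re-groups incoming blocks so that every emitted block except possibly
// the last has a size in [min, max], regardless of how the input was cut
// by the underlying byte-producing tasks.
def rechunk(blocks: List[Vector[Byte]], min: Int, max: Int): List[Vector[Byte]] = {
  require(0 < min && min <= max, "need 0 < min <= max")

  // Emit pieces of at most `max` bytes while at least `min` bytes remain buffered.
  @annotation.tailrec
  def flush(buf: Vector[Byte], out: List[Vector[Byte]]): (List[Vector[Byte]], Vector[Byte]) =
    if (buf.length >= min) {
      val (piece, rest) = buf.splitAt(max)
      flush(rest, piece :: out)
    } else (out, buf)

  val (rev, leftover) = blocks.foldLeft((List.empty[Vector[Byte]], Vector.empty[Byte])) {
    case ((out, buf), block) => flush(buf ++ block, out)
  }
  // Flush whatever is left (possibly shorter than `min`) at end of input.
  (if (leftover.isEmpty) rev else leftover :: rev).reverse
}
```

A Process1 version would perform the same flush step inside receive1Or, emitting the leftover (if non-empty) when upstream ends.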



2 answers


I guess the answer at this point is that this is really difficult or impossible to accomplish in scalaz-stream. The successor to this library, fs2, has first-class support for chunking, which is mostly what I was looking for here.
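For reference, here is a minimal sketch of what this looks like in fs2 (assuming fs2 3.x and cats-effect 3). `chunks` exposes exactly the per-effect blocks that chunkByEffect was meant to recover, as long as upstream segments are not re-chunked:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Each `eval`/`emits` segment arrives as its own chunk, so `chunks`
// groups values by the effect (or literal block) that produced them.
val sample: Stream[IO, Int] =
  Stream.eval(IO(1)) ++ Stream(2, 3, 4) ++ Stream.evalSeq(IO(List(5, 6, 7)))

val grouped: List[Vector[Int]] =
  sample.chunks.map(_.toVector).compile.toList.unsafeRunSync()
// grouped == List(Vector(1), Vector(2, 3, 4), Vector(5, 6, 7))
```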





You need to delimit your bytes somehow. I suggest working with some higher-level abstraction over the byte stream, i.e. ByteVector.

Then you may have to write a manual process1, implemented similarly to process1.chunkBy, only operating on ByteVector, i.e.:

import scodec.bits.ByteVector
import scalaz.stream.Process1
import scalaz.stream.Process.emit
import scalaz.stream.process1.receive1Or

// Accumulates input until `separator` is found, emitting complete chunks;
// whatever has accumulated is emitted when the input ends.
def chunkBy(separator: ByteVector): Process1[ByteVector, ByteVector] = {
  def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
    receive1Or[ByteVector, ByteVector](emit(acc)) { i =>
      // search for `separator` in `acc ++ i`, emit the pieces before
      // each occurrence, and recurse with the leftover bytes
      ???
    }
  go(ByteVector.empty)
}
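The search left as ??? above has to cope with a separator that straddles two inputs, which is why bytes after the last occurrence must be carried forward rather than emitted. A pure-Scala sketch of that splitting step (using Vector[Byte] in place of ByteVector; the name splitOnSeparator is mine, not part of either library):

```scala
// Splits `acc` on every complete occurrence of `separator`, returning the
// finished chunks plus the leftover bytes to prepend to the next input
// (the leftover may contain the start of a separator, so it is not emitted).
def splitOnSeparator(
    acc: Vector[Byte],
    separator: Vector[Byte]
): (List[Vector[Byte]], Vector[Byte]) = {
  require(separator.nonEmpty, "separator must be non-empty")
  val idx = acc.indexOfSlice(separator)
  if (idx < 0) (Nil, acc)
  else {
    val (chunk, rest) = acc.splitAt(idx)
    val (more, leftover) =
      splitOnSeparator(rest.drop(separator.length), separator)
    (chunk :: more, leftover)
  }
}
```

Inside go you would apply this to acc ++ i, emit the complete chunks, and recurse with the leftover; scodec's ByteVector offers indexOfSlice and splitAt operations of the same shape.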




Then you can wire it all together:

val speech: Process[Task,ByteVector] = ???
def chunkByWhatever: Process1[ByteVector,ByteVector] = ??? 
val recognizer: Channel[Task,ByteVector,String] = ???

//this shall do the trick
speech.pipe(chunkByWhatever).through(recognizer)








