How to implement a getAvailable transformer on scalaz-stream
Short version:
I would like to implement a function that returns a transducer which awaits the block of values emitted by a single "effect".
The function I have in mind would have the following signature:
/**
* The `Process1` which awaits the next "effect" to occur and passes all values emitted by
* this effect to `rcv` to determine the next state.
*/
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???
More details:
Given that function, I could then implement the following, which I think would be quite useful:
/**
* Groups inputs into chunks of dynamic size based on the various effects
* that back emitted values.
*
* @example {{{
* val numberTask = Task.delay(1)
* val listOfNumbersTask = Task.delay(List(5,6,7))
* val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
* sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
* }}}
*/
def chunkByEffect[I]: Process1[I, Vector[I]] =
  receiveBlock(vec => emit(vec) ++ chunkByEffect)
My ultimate goal (slightly simplified) is to implement the following function:
/**
* Transforms a stream of audio into a stream of text.
*/
def voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]
The function makes an external call to a voice-recognition service, so it is unreasonable to issue a network call for each individual Byte in the stream; the bytes need to be batched up before each call. I could make audio a Process[Task, ByteVector], but that would require the test code to know the maximum block size supported by the function, and I would rather keep that control inside the function itself. Also, when this function is used inside a service, the service itself will receive network calls carrying audio of a given size, and I would like the function to be smart about chunking so that it does not hold onto data that is already available.
Basically, the audio stream coming from the network will arrive as a Process[Task, ByteVector] and will be converted to a Process[Task, Byte] via flatMap(Process.emitAll(_)). However, the test code will produce a Process[Task, Byte] directly and feed it to voiceRecognition. I believe that, given an appropriate combinator, a single implementation of voiceRecognition could do the right thing on both of these streams, and I think the chunkByEffect function above is the key to that. I now realize that chunkByEffect also needs min and max parameters, which set the minimum and maximum chunk size regardless of how the underlying Task generates bytes.
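(A pure Process1 cannot observe where one Task's emission ends and the next begins, which is exactly the difficulty here. As a size-only approximation, here is a hypothetical sketch of a rechunk transducer — the names rechunk, min, and max are my own — that regroups a ByteVector stream by byte count alone: it emits as soon as at least min bytes have accumulated, never emits more than max bytes at once, and flushes the remainder at end of input.)

```scala
import scodec.bits.ByteVector
import scalaz.stream.Process._
import scalaz.stream.Process1

// Sketch: regroup a ByteVector stream into chunks whose sizes fall
// between `min` and `max` bytes (the final chunk may be smaller).
def rechunk(min: Int, max: Int): Process1[ByteVector, ByteVector] = {
  def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
    if (acc.size >= max)
      // too large: emit exactly `max` bytes and keep chunking the rest
      emit(acc.take(max)) ++ go(acc.drop(max))
    else if (acc.size >= min)
      // large enough: emit immediately rather than hold onto it
      emit(acc) ++ go(ByteVector.empty)
    else
      // too small: wait for more input, flushing the tail at end of stream
      receive1Or[ByteVector, ByteVector](
        if (acc.isEmpty) halt else emit(acc)
      )(i => go(acc ++ i))
  go(ByteVector.empty)
}
```

This deliberately ignores effect boundaries, so it will not chunk "as eagerly as data arrives" in the sense the question asks for, but it does bound every network call between min and max bytes.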
You need to delimit your bytes somehow. I suggest working with a higher-level abstraction over the byte stream, i.e. ByteVector.
Then you may have to write a custom Process1, implemented similarly to process1.chunkBy, except that it operates on ByteVector, i.e.:
def chunkBy(separator: ByteVector): Process1[ByteVector, ByteVector] = {
  def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
    receive1Or[ByteVector, ByteVector](emit(acc)) { i =>
      // implement searching for the separator in accumulated + new bytes
      ???
    }
  go(ByteVector.empty)
}
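(For illustration, here is one hedged way the elided separator search might be filled in, assuming scodec-bits' ByteVector#indexOfSlice, which returns a negative index when the slice is absent. This is a sketch, not a definitive implementation; note it scans the accumulator before asking for more input, so several separators inside one input are all honoured.)

```scala
import scodec.bits.ByteVector
import scalaz.stream.Process._
import scalaz.stream.Process1

def chunkBy(separator: ByteVector): Process1[ByteVector, ByteVector] = {
  def go(acc: ByteVector): Process1[ByteVector, ByteVector] = {
    val idx = acc.indexOfSlice(separator)
    if (idx >= 0) {
      // found a separator: emit everything before it, drop it, continue
      val (chunk, rest) = acc.splitAt(idx)
      emit(chunk) ++ go(rest.drop(separator.size))
    } else
      // no separator yet: await more bytes, flushing the tail at end of input
      receive1Or[ByteVector, ByteVector](
        if (acc.isEmpty) halt else emit(acc)
      )(i => go(acc ++ i))
  }
  go(ByteVector.empty)
}
```

Two adjacent separators will produce an empty chunk here; filter those out downstream if that is not desired.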
Then connect it all together:
val speech: Process[Task,ByteVector] = ???
def chunkByWhatever: Process1[ByteVector,ByteVector] = ???
val recognizer: Channel[Task,ByteVector,String] = ???
// this should do the trick
speech.pipe(chunkByWhatever).through(recognizer)