Using scalaz-stream with compressed files
In the following scalaz-stream usage example (taken from the documentation), what do I need to change if the input and/or output is a gzip-compressed file? In other words, how do I use compress?
import scalaz.stream._
import scalaz.concurrent.Task

// helper used by the pipeline below (same formula as the scalaz-stream README example)
def fahrenheitToCelsius(f: Double): Double =
  (f - 32.0) * (5.0 / 9.0)

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run
Compressing the output is very simple. Since compress.deflate() is a Process1[ByteVector, ByteVector], you need to hook it into the pipeline at the point where you emit ByteVector, that is, right after text.utf8Encode, which is a Process1[String, ByteVector]:
val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .pipe(compress.deflate())
    .to(io.fileChunkW("testdata/celsius.zip"))
    .run
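One caveat for the gzip part of the question: compress.deflate() is, as far as I know, built on java.util.zip.Deflater with its default zlib framing, so the file written above is presumably a zlib stream, not a gzip file (and not a ZIP archive, despite the .zip extension). The sketch below uses only the JDK, not scalaz-stream, to show the difference by inspecting the first bytes: gzip data starts with 0x1f 0x8b, while zlib data from a default Deflater starts with 0x78. FormatCheck and its methods are hypothetical names used only for illustration.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, GZIPOutputStream}

object FormatCheck {
  // zlib framing: what java.util.zip.Deflater produces by default (2-byte header + Adler-32 trailer)
  def zlibBytes(data: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater() // default compression level, zlib-wrapped output
    deflater.setInput(data)
    deflater.finish()
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](1024)
    while (!deflater.finished()) {
      val n = deflater.deflate(buf)
      out.write(buf, 0, n)
    }
    deflater.end()
    out.toByteArray
  }

  // gzip framing: what GZIPOutputStream produces (10-byte header starting 0x1f 0x8b, CRC-32 trailer)
  def gzipBytes(data: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    gz.write(data)
    gz.close()
    out.toByteArray
  }

  // true if the bytes begin with the gzip magic number 0x1f 0x8b
  def isGzip(bytes: Array[Byte]): Boolean =
    bytes.length >= 2 && (bytes(0) & 0xff) == 0x1f && (bytes(1) & 0xff) == 0x8b
}
```

If a tool such as gunzip rejects the file produced by compress.deflate(), this framing difference is the likely reason.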
For inflate you cannot use io.linesR to read the compressed file. You need a process that produces ByteVector instead of String so that the chunks can be passed to inflate (for this you can use io.fileChunkR). The next step is to decode the uncompressed data into String (e.g. with text.utf8Decode) and then use text.lines() to split the text back into lines. Something like this should do the trick:
val converter: Task[Unit] =
  // keep requesting 4096-byte chunks; io.fileChunkR stops at end of file
  Process.constant(4096).toSource
    .through(io.fileChunkR("testdata/fahrenheit.zip"))
    .pipe(compress.inflate())   // ByteVector => decompressed ByteVector
    .pipe(text.utf8Decode)      // ByteVector => String
    .pipe(text.lines())         // re-split the decoded text into lines
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run
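If the input really is a gzip (.gz) file, note that compress.inflate() presumably wraps java.util.zip.Inflater, which expects zlib-framed data by default and does not parse gzip headers, so the pipeline above may fail on a real .gz file. One workaround, sketched below with plain JDK streams and scala.io.Source instead of scalaz-stream, is to let GZIPInputStream and GZIPOutputStream handle the gzip framing while keeping the same filter/convert logic. GzipConverter and convert are hypothetical names, and fahrenheitToCelsius is assumed to use the formula from the scalaz-stream README example.

```scala
import java.io.{BufferedWriter, File, FileInputStream, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

object GzipConverter {
  // assumed formula, matching the README example
  def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0 / 9.0)

  // Reads gzip-compressed Fahrenheit lines from `in`, writes gzip-compressed Celsius lines to `out`.
  def convert(in: File, out: File): Unit = {
    val source = Source.fromInputStream(new GZIPInputStream(new FileInputStream(in)), "UTF-8")
    try {
      val writer = new BufferedWriter(
        new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(out)), StandardCharsets.UTF_8))
      try {
        val results = source.getLines()
          .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
          .map(line => fahrenheitToCelsius(line.toDouble).toString)
        // write "\n" between results, like intersperse("\n") in the stream version
        var first = true
        results.foreach { r =>
          if (!first) writer.write("\n")
          writer.write(r)
          first = false
        }
      } finally writer.close() // closing also writes the gzip trailer
    } finally source.close()
  }
}
```

Lines are processed lazily through the iterator, so the whole file is never held in memory, which mirrors the streaming behaviour of the scalaz-stream version.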