JSON decoding with Akka Streams
I have a Source[ByteString, _] from an input file with 3 lines like this (in reality the input is a TCP socket with a continuous stream):
{"a":[2
33]
}
Now the problem is that I want to parse this into a Source[ChangeMessage, _]. However, the only examples I have found deal with the case where there is a whole JSON message on each line, not the case where a single JSON message can be split across multiple lines.
One example I found is this library, however it expects } or , as the last character, i.e. one JSON document per line. The example below shows this setup.
"My decoder" should "decode chunked json" in {
  implicit val sys = ActorSystem("test")
  implicit val mat = ActorMaterializer()
  val file = Paths.get("chunked_json_stream.json")
  val data = FileIO.fromPath(file)
    .via(CirceStreamSupport.decode[ChangeMessage])
    .runWith(TestSink.probe[ChangeMessage])
    .request(1)
    .expectComplete()
}
Another alternative might be to use a fold that balances the braces and only emits once a whole JSON document is complete. The problem is that fold only emits when the stream completes, and since this is a continuous stream, I cannot use it here.
My question is: what is the fastest way to parse chunked JSON streams in Akka Streams, and is there any existing software that already does this? If possible I would like to use circe.
As the knutwalker/akka-stream-json documentation says:
This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.
In your case, all you have to do is simply delimit the incoming byte strings:
"My decoder" should "decode chunked json" in {
  implicit val sys = ActorSystem("test")
  implicit val mat = ActorMaterializer()
  val file = Paths.get("chunked_json_stream.json")
  val sourceUnderTest =
    FileIO.fromPath(file)
      .via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
      .via(CirceStreamSupport.decode[ChangeMessage])
  sourceUnderTest
    .runWith(TestSink.probe[ChangeMessage])
    .request(1)
    .expectNext(ChangeMessage(List(233)))
    .expectComplete()
}
This is needed because, when reading from a file, each ByteString element contains multiple lines, so Circe is handed malformed JSON that it cannot parse. When you delimit on newlines, each element in the stream is a single line, and Circe can parse the document incrementally using the feature quoted above.
Unfortunately, I am not aware of any Scala libraries that support stream-based JSON parsing. It seems to me that some support for this is available in Google Gson, but I'm not entirely sure if it can handle "broken" input correctly.
However, you can collect JSON documents in a streaming fashion, similar to what Framing.delimiter does. This is very similar to the alternative you mentioned, but it does not use fold(). If you go this way, you will probably have to mimic what Framing.delimiter does, but instead of looking for a single delimiter you will need to balance the curly braces (and possibly the square brackets, if top-level arrays are possible), buffering the intermediate data until an entire document has come through, which you then emit as a single chunk suitable for parsing.
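To make the brace-balancing idea concrete, here is a minimal sketch in plain Scala with no Akka dependency. JsonFramer and feed are hypothetical names made up for this illustration. It assumes chunks arrive as already decoded strings (a real stage would rather scan the raw bytes), that raw line breaks are only transport artifacts and can be dropped (as in the question's example, where 2 and 33 join into 233), and it does not validate the JSON at all.

```scala
// Hypothetical helper, not part of Akka or circe: buffers incoming chunks and
// emits every completed top-level JSON object or array as a single string.
final class JsonFramer {
  private val buf = new StringBuilder
  private var depth = 0        // current nesting level of {} and []
  private var inString = false // true while inside a JSON string literal
  private var escaped = false  // true right after a backslash inside a string

  /** Feeds one chunk and returns the documents that this chunk completed. */
  def feed(chunk: String): List[String] = {
    val out = List.newBuilder[String]
    chunk.foreach { c =>
      // Assumption: raw CR/LF only separate transport lines, so they are dropped.
      if (c != '\n' && c != '\r') {
        // Buffer everything inside a document; skip filler between documents.
        if (depth > 0 || c == '{' || c == '[') buf.append(c)
        if (inString) {
          if (escaped) escaped = false
          else if (c == '\\') escaped = true
          else if (c == '"') inString = false
        } else c match {
          case '"' if depth > 0 => inString = true
          case '{' | '['        => depth += 1
          case '}' | ']' if depth > 0 =>
            depth -= 1
            if (depth == 0) { out += buf.toString; buf.clear() }
          case _ => ()
        }
      }
    }
    out.result()
  }
}
```

Because the state lives between calls, this emits as soon as a document closes instead of waiting for the stream to finish, which is exactly what fold cannot do. In Akka Streams such a helper could probably be plugged in through statefulMapConcat, and it is worth checking whether akka.stream.scaladsl.JsonFraming.objectScanner already covers your case before writing your own.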
As a side note, an interface for a streaming JSON parser suitable for use in Akka Streams might look like this:
trait Parser {
  def update(data: Array[Byte]): Unit // or String
  def pull(): Option[Either[Error, JsonEvent]]
}
where pull() returns None if it cannot read anything more yet but the document received so far has no syntax errors, and JsonEvent is some standard structure for describing the events of a streaming parser (i.e. a sealed trait with subclasses such as BeginObject, BeginArray, EndObject, EndArray, String, etc.). If you find such a library or create one, you can use it to parse the data coming from an Akka stream of ByteStrings.
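For illustration only, here is a toy version of such an interface together with an implementation that reports nothing but structural events. Everything in it (StructuralParser, ParseError, and the choice of String instead of Array[Byte] as input) is an assumption made for this sketch. A real parser would also emit keys and values and reject far more malformed input.

```scala
import scala.collection.mutable

sealed trait JsonEvent
case object BeginObject extends JsonEvent
case object EndObject   extends JsonEvent
case object BeginArray  extends JsonEvent
case object EndArray    extends JsonEvent

final case class ParseError(message: String)

trait Parser {
  def update(data: String): Unit
  def pull(): Option[Either[ParseError, JsonEvent]]
}

// Toy implementation: skips keys, values and whitespace, reports only nesting.
final class StructuralParser extends Parser {
  private val input = new mutable.Queue[Char]
  private var closers = List.empty[Char] // closing chars still expected, innermost first
  private var inString = false           // true while inside a string literal
  private var escaped = false            // true right after a backslash in a string

  def update(data: String): Unit = data.foreach(input += _)

  @annotation.tailrec
  def pull(): Option[Either[ParseError, JsonEvent]] =
    if (input.isEmpty) None // nothing more to read yet, no error so far
    else {
      val c = input.dequeue()
      if (inString) {
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
        pull()
      } else c match {
        case '"' => inString = true; pull()
        case '{' => closers = '}' :: closers; Some(Right(BeginObject))
        case '[' => closers = ']' :: closers; Some(Right(BeginArray))
        case '}' | ']' =>
          closers match {
            case `c` :: rest =>
              closers = rest
              Some(Right(if (c == '}') EndObject else EndArray))
            case _ => Some(Left(ParseError(s"unexpected '$c'")))
          }
        case _ => pull() // anything else is ignored by this toy parser
      }
    }
}
```

A stream stage would call update for every incoming chunk and then call pull repeatedly until it returns None, pushing each event (or failing on a Left) downstream.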