Importing an avro schema into Scala
I am writing a simple Twitter program that reads tweets through Kafka, and I want to use Avro to serialize them. So far, I've just set up a Twitter config in Scala and now want to read tweets using that config.
How do I import the following Avro schema, defined in the tweets.avsc file, into my program?
{
  "namespace": "tweetavro",
  "type": "record",
  "name": "Tweet",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "text", "type": "string"}
  ]
}
I have found some examples on the internet that use something like import tweetavro.Tweet to bring the schema into Scala, so that it can be used like this:
def main(args: Array[String]): Unit = {
  val twitterStream = TwitterStream.getStream
  twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
  twitterStream.filter(filterUsOnly)
}

private def toTweet(s: Status): Tweet = {
  new Tweet(s.getUser.getName, s.getText)
}

private def sendToKafka(t: Tweet): Unit = {
  println(toJson(t.getSchema).apply(t))
  val tweetEnc = toBinary[Tweet].apply(t)
  val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
  kafkaProducer.send(msg)
}
I followed the same approach and use the following plugins in pom.xml:
<!-- AVRO MAVEN PLUGIN -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.7</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/scala/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
<!-- MAVEN COMPILER PLUGIN -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <source>1.7</source>
    <target>1.7</target>
  </configuration>
</plugin>
Even after doing all of this, I still can't do import tweetavro.Tweet.
Can anyone help?
Thanks!
You must first compile this schema into a class. I'm not sure whether there is a production-ready Avro library for Scala, but you can generate a Java class and use it from Scala:
java -jar /path/to/avro-tools-1.7.7.jar compile schema tweets.avsc .
Adjust the paths in this command to suit your setup, and the tool should generate the Java source for the tweetavro.Tweet class (tweetavro/Tweet.java under the output directory). You can then add it to your project and use it as you described.
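While the code generation is being set up, the schema itself can be checked at runtime through Avro's generic API, which needs no generated class. The following is only a minimal sketch, assuming the Apache Avro 1.7.x jar is on the classpath; the object name GenericTweetEncoder is hypothetical, and the schema is inlined as a string purely to keep the example self-contained (in a real project it would more likely be parsed from tweets.avsc).

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Hypothetical helper, not part of the original question or answer.
object GenericTweetEncoder {
  // Same schema as tweets.avsc, inlined here for illustration only.
  private val schemaJson: String =
    """{
      |  "namespace": "tweetavro",
      |  "type": "record",
      |  "name": "Tweet",
      |  "fields": [
      |    {"name": "name", "type": "string"},
      |    {"name": "text", "type": "string"}
      |  ]
      |}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Encodes one tweet as raw Avro binary (no container-file header).
  def toBinary(name: String, text: String): Array[Byte] = {
    val record = new GenericData.Record(schema)
    record.put("name", name)
    record.put("text", text)

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush() // the binary encoder buffers, so flush before reading the bytes
    out.toByteArray
  }
}
```

Once the generated tweetavro.Tweet class is available, the same pattern should carry over by swapping GenericDatumWriter for org.apache.avro.specific.SpecificDatumWriter and passing Tweet instances instead of generic records.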
Update: FYI, there seems to be a Scala library for this, but I have never used it.
You can also use avro4s. Define your case class (or generate one) based on the schema; let's call this class Tweet. Then create an AvroOutputStream, which infers the schema from the case class and is used to serialize the instances. You can then write to a byte array and send that to Kafka. For example:
import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.AvroOutputStream

val tweet: Tweet = ... // the instance you want to serialize
val out = new ByteArrayOutputStream // we collect the serialized output in this
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
avro.write(tweet)
avro.close()
val bytes = out.toByteArray
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
kafkaProducer.send(msg)
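To see which schema avro4s infers before wiring it into Kafka, the derived schema can be printed and compared with tweets.avsc. This is a rough sketch, assuming avro4s is on the classpath and that the case class below is an acceptable mirror of the schema; the object name TweetSchemaCheck is hypothetical. The namespace of the derived schema depends on where the case class is declared, so it is worth checking the printed output against the tweetavro namespace rather than assuming a match.

```scala
import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Assumed case class mirroring the two string fields of tweets.avsc.
case class Tweet(name: String, text: String)

// Hypothetical helper for inspecting the schema avro4s derives.
object TweetSchemaCheck {
  // Derived from the case class definition, not read from the .avsc file.
  val schema: Schema = AvroSchema[Tweet]

  def main(args: Array[String]): Unit = {
    // Pretty-print the derived schema for comparison with tweets.avsc.
    println(schema.toString(true))
  }
}
```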
I recommend using Avrohugger. It's the new kid on the block among tools that generate Scala classes for Avro, but it supports everything I need, and I really like that it's not macro-based, so I can actually see what's being generated.
The maintainer has been great to work with and is very open to contributions and feedback. It is not, and probably never will be, as full-featured as the official Java code generation, but it will suit most people's needs.
There is currently no support for unions (other than optional types) and recursive types.
The SBT plugin works really well, and there is a new web interface if you want to quickly see what it does with your Avro schemas:
https://avro2caseclass.herokuapp.com/