Using Kafka to Import Data into Hadoop
First, I considered whether to use Kafka or Flume to get events into Hadoop, where they will be stored and analyzed periodically (perhaps using Oozie to schedule the analysis). I decided that Kafka is probably the better solution, since we also have a component that handles events, so this way both the packet-handling and the event-handling components receive data in the same way.
But now I am looking for specific suggestions on how to get the data from a broker into Hadoop.
I found here that Flume can be used in conjunction with Kafka:
- Flume - contains a Kafka source (consumer) and a Kafka sink (producer).
I also found, on the same page and in the Kafka documentation, that there is something called Camus:
- Camus - LinkedIn Kafka => HDFS pipeline. This one is used for all data on LinkedIn and works great.
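For reference, Camus runs as a periodic MapReduce job driven by a properties file, so it fits a scheduled pull model (e.g. triggered from Oozie or cron) rather than a continuous stream. A minimal sketch is below; the jar name, driver class, and property names follow the Camus example configuration, so treat them as assumptions and verify them against the version you build.

# camus.properties (sketch - property names taken from the Camus example config, verify against your build)
camus.job.name=kafka-to-hdfs
kafka.brokers=localhost:9092
etl.destination.path=/user/camus/topics
etl.execution.base.path=/user/camus/exec
etl.execution.history.path=/user/camus/exec/history

# Run the shaded example jar built from the Camus sources
hadoop jar camus-example-*-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties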
I'm wondering which would be the better (and simpler, better-documented) solution for this. Also, are there any examples or tutorials on how to do it?
Which option would be simpler for a higher-level user like me?
I'm open to suggestions if there is another/better solution than these two.
Thanks.
You can use Flume to transfer data from Kafka to HDFS. Flume has a Kafka source and a Kafka sink; it is just a matter of writing the properties file. An example is shown below.
Steps:
- Create a Kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafka
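If you want to confirm the topic was created before wiring up Flume, the same CLI can describe it (assuming the script is on your PATH under this name; some distributions ship it as kafka-topics.sh):

kafka-topics --describe --zookeeper localhost:2181 --topic testkafka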
- Publish messages to the above topic using the Kafka console producer
kafka-console-producer --broker-list localhost:9092 --topic testkafka
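Each line typed into the producer's stdin becomes one message on the topic; for a quick scripted test you can also pipe a few lines in, for example:

printf 'event-1\nevent-2\nevent-3\n' | kafka-console-producer --broker-list localhost:9092 --topic testkafka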
- Configure the Flume agent with the following properties
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks = hdfs-sink-1

# Kafka source: consumes from the testkafka topic
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = localhost:2181
flume1.sources.kafka-source-1.topic = testkafka
flume1.sources.kafka-source-1.batchSize = 100
flume1.sources.kafka-source-1.channels = hdfs-channel-1

# Memory channel between the source and the sink
flume1.channels.hdfs-channel-1.type = memory
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000

# HDFS sink: writes plain-text files partitioned by topic and date
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-1.hdfs.rollCount = 100
flume1.sinks.hdfs-sink-1.hdfs.rollSize = 0
Save the above config file as example.conf
- Run the Flume agent
flume-ng agent -n flume1 -c conf -f example.conf -Dflume.root.logger=INFO,console
The data will now be dumped to HDFS under the following path
/tmp/kafka/%{topic}/%y-%m-%d
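To confirm the pipeline end to end, you can list and read the files the sink produces (the date directory will be the current day, and the file names start with the filePrefix from the config above):

hdfs dfs -ls /tmp/kafka/testkafka/
hdfs dfs -cat '/tmp/kafka/testkafka/*/*'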