Storm fields grouping example

I am using Storm with Kafka. Kafka emits JSON strings to Storm, and in Storm I want to distribute the load across a couple of workers based on a key/field in the JSON. How can I do this? In my case, the key is the groupid field in the JSON string.

For example, I have JSON like this:

{groupid: 1234, userid: 145, comments:"I want to distribute all this group 1234 to one worker", size:50,type:"group json"}
{groupid: 1235, userid: 134, comments:"I want to distribute all this group 1235 to another worker", size:90,type:"group json"}
{groupid: 1234, userid: 158, comments:"I want to be sent to same worker as group 1234", size:50,type:"group json"}

      

I am using Storm 0.9.4.

My source code is as follows:

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class KafkaBoltMain {
   private static final String SPOUTNAME="TopicSpout"; 
   private static final String ANALYSISBOLT = "AnalysisWorker";
   private static final String CLIENTID = "Storm";
   private static final String TOPOLOGYNAME = "LocalTopology";


   private static class AppAnalysisBolt extends BaseRichBolt {
       private static final long serialVersionUID = -6885792881303198646L;
       private OutputCollector _collector;
       private long groupid=-1L;
       private String log="test";

       public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
           _collector = collector;
       }

       public void execute(Tuple tuple) {
           List<Object> objs = tuple.getValues();
           int i=0;
           for(Object obj:objs){
               System.out.println(""+i+"th object value is:"+obj.toString());
               i++;
           }

//         _collector.emit(new Values(groupid,log));
           _collector.ack(tuple);
       }

       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("groupid","log"));
       }
   }

   public static void main(String[] args){
        String zookeepers = null;
        String topicName = null;
        if (args.length == 2) {
            zookeepers = args[0];
            topicName = args[1];
        } else if (args.length == 1 && args[0].equalsIgnoreCase("help")) {
            System.out.println("xxxx");
            System.exit(0);
        } else {
            System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
            System.out.println("xxxx");
            System.exit(-1);
        }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "",// zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(),2)
            .fieldsGrouping(SPOUTNAME,new Fields("groupid"));

        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
    }
}

      

But when I submit the topology it gives the following error:

12794 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component:
 [AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

      

Why am I getting a warning message about non-existent fields? Any hints?

1 answer


As configured, the Kafka spout with StringScheme emits each message as a single string field (named "str"), so there is no "groupid" field for fieldsGrouping to use. You need to extract the groupid attribute from the JSON object and emit the two values (the JSON object and the groupid string) as a two-field tuple. When you declare the output stream, name the second field "groupid", spelled exactly as in the fieldsGrouping call, and everything should work fine. If you don't want to modify the Kafka spout, you need an intermediate bolt whose sole purpose is to extract the group id from the JSON object. The intermediate bolt could also use a direct stream (emitDirect()) to choose the target task based on the groupid in the JSON object.
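As a rough illustration of the intermediate-bolt idea above, here is a minimal sketch in plain Java, deliberately written without Storm dependencies so it can be run on its own. The class name GroupIdRouting and the helpers extractGroupId and chooseTask are hypothetical and not part of Storm or storm-kafka. The regex is an assumption made because the sample messages use unquoted keys; with strictly valid JSON, a real JSON parser would be the better choice. chooseTask is only a simplified model of fields grouping (hash of the grouping values modulo the number of tasks), meant to show why equal groupid values end up on the same task.

```java
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class: the logic an intermediate bolt would need,
// kept free of Storm classes so it compiles and runs standalone.
final class GroupIdRouting {
    // Accepts both groupid: 1234 and "groupid": 1234 (assumption: the
    // value is a non-negative integer, as in the sample messages).
    private static final Pattern GROUP_ID =
            Pattern.compile("\"?groupid\"?\\s*:\\s*(\\d+)");

    /** Returns the groupid found in the raw message, or -1 if none is found. */
    static long extractGroupId(String json) {
        Matcher m = GROUP_ID.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    /**
     * Simplified model of fields grouping: hash the list of grouping
     * values and take it modulo the number of target tasks.
     */
    static int chooseTask(long groupId, int numTasks) {
        List<Object> groupingValues = Collections.<Object>singletonList(groupId);
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        String[] messages = {
            "{groupid: 1234, userid: 145, size:50,type:\"group json\"}",
            "{groupid: 1235, userid: 134, size:90,type:\"group json\"}",
            "{groupid: 1234, userid: 158, size:50,type:\"group json\"}"
        };
        for (String msg : messages) {
            long groupId = extractGroupId(msg);
            // Messages with the same groupid always map to the same task index.
            System.out.println("groupid " + groupId + " -> task " + chooseTask(groupId, 2));
        }
    }
}
```

In a real topology, the intermediate bolt's execute() would presumably call something like extractGroupId(tuple.getString(0)), emit new Values(json, groupId), and declare new Fields("json", "groupid"). The analysis bolt would then call fieldsGrouping with the intermediate bolt's id rather than the spout's id.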



This is one of the reasons why I don't reuse the Kafka spout: I often want to do more than just blindly write data to a stream. But that's neither here nor there.
