Using Apache Camel ProducerTemplate in Apache Storm bolt

I am trying to write a simple Storm + Camel project. My Storm topology parses tweets, and one bolt is supposed to send the tweet text to an Apache Camel route, which in turn uses a websocket to notify a web application.

I can't get it to work because of NotSerializableExceptions thrown by the bolts when they try to use a CamelContext built up front.

What I've already tried:

  • pass the CamelContext in the bolt constructor: results in a NotSerializableException
  • put the CamelContext into the Storm conf and access it in the bolt's prepare(...) method. This results in:

    14484 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] has died
    java.lang.IllegalArgumentException: Topology conf is not json-serializable
        at backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]

Camel route:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

      

Storm topology: TweetSpout extracts tweets using the twitter4j streaming API.

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext camelContext = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", camelContext.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

      

WebsocketBolt:

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

      

Is there a clean way to do this?

Or should I expose the Camel route over HTTP and create an HttpClient in the bolt's prepare(...) method? That still seems like overkill, and there must be an easier way.

Thanks for the help!


1 answer


The root cause of your problem is that you are adding a ProducerTemplate to your Storm config, and this throws an exception because the object is not serializable. If it were your own class, you could change the code to make it work, but since it is a Camel class, I would recommend a different approach.

  • WebSocketBolt: Mark your producerTemplate private member as transient: private transient ProducerTemplate producerTemplate;

    so that Storm doesn't try to serialize it (the same problem you ran into with the conf).
  • WebSocketBolt: Initialize the ProducerTemplate in your prepare method, not in your topology.


Something like this:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        // Build the Camel context on the worker, after the bolt has been deserialized.
        CamelContext camelContext = new RouteStarter().buildRoute();
        this.producerTemplate = camelContext.createProducerTemplate();
    }
}
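
To see why the transient field matters, here is a minimal sketch that uses only plain Java serialization, which is the mechanism Storm relies on when it ships bolt instances to workers. Nothing in it comes from Storm or Camel: Sender, BrokenBolt and FixedBolt are hypothetical stand-ins for ProducerTemplate and the two versions of WebSocketBolt, and the no-argument prepare() only mimics the role of the real prepare(Map, TopologyContext).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for ProducerTemplate: a helper that is NOT Serializable.
    static class Sender {
        String send(String body) {
            return "sent: " + body;
        }
    }

    // Stand-in for the original bolt: the non-serializable helper is part of its state.
    static class BrokenBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Sender sender = new Sender();
    }

    // Stand-in for the fixed bolt: the helper is transient and created in prepare().
    static class FixedBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Sender sender;

        void prepare() {
            sender = new Sender();
        }

        boolean isPrepared() {
            return sender != null;
        }

        String execute(String text) {
            return sender.send(text);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Returns true if Java serialization rejects the object.
    static boolean failsToSerialize(Object o) throws IOException {
        try {
            serialize(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("broken bolt fails: " + failsToSerialize(new BrokenBolt()));

        // Round-trip the fixed bolt, as if it had been shipped to a worker.
        FixedBolt copy = (FixedBolt) deserialize(serialize(new FixedBolt()));
        System.out.println("prepared after deserialization: " + copy.isPrepared());

        copy.prepare(); // the worker-side initialization step
        System.out.println(copy.execute("hello"));
    }
}
```

The transient field is simply skipped during serialization and comes back as null on the other side, which is why the helper has to be created in prepare rather than in the constructor or in the topology's main method.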

      
