Spark streaming application development

So, the problem I'm trying to solve is this:

  • I need a data source that emits messages at a specific frequency
  • There are N neural networks that must process each message separately
  • The outputs of all neural networks are aggregated, and only when all N outputs for a message have been collected is the message declared fully processed
  • Finally, I have to measure the time it takes to fully process a message (the time between when it was emitted and when all N neural network outputs for that message were collected)

I am curious how one can approach such a problem with a Spark Streaming flow.

My current implementation uses three types of components: a custom receiver and two classes that implement Function, one for the neural networks and one for the final aggregator.

In broad strokes, my application is structured like this:

// The custom receiver that emits the messages
JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

// The aggregator that collects the N outputs of each message
Function<JavaRDD<...>, Void> aggregator = new JavaSyncBarrier(numberOfNets);

// One map + foreachRDD pipeline per neural network
for(int i = 0; i < numberOfNets; i++){
    rndLists.map(new NeuralNetMapper(neuralNetConfig)).foreachRDD(aggregator);
}

The main problem I am facing is that it runs faster in local mode than when submitted to a 4-node cluster.

Is my implementation wrong to start with or is something else going on here?

There is also a full post here http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html with more details on the implementation of each of the three components mentioned earlier.



1 answer


It seems like there can be a lot of duplicate instances and object serialization going on; the latter can hurt your cluster performance.

You should try to create your neural networks only once, and you will have to make sure they are serializable. Also, you should use flatMap instead of multiple map + union. Something like this:

// Initialize the neural nets only once; they are captured by the closure below
// and shipped to the executors, so they (and their config) must be serializable
final List<NeuralNetMapper> neuralNetMappers = new ArrayList<>(numberOfNets);
for(int i = 0; i < numberOfNets; i++){
    neuralNetMappers.add(new NeuralNetMapper(neuralNetConfig));
}

// Then create a DStream applying all of them
JavaDStream<Result> neuralNetResults = rndLists.flatMap(new FlatMapFunction<Item, Result>() {
    @Override
    public Iterable<Result> call(Item item) {
        List<Result> results = new ArrayList<>(numberOfNets);
        for (int i = 0; i < numberOfNets; i++) {
            results.add(neuralNetMappers.get(i).doYourNeuralNetStuff(item));
        }
        return results;
    }
});

// The aggregation stuff
neuralNetResults.foreachRDD(aggregator);




If you can afford to initialize the networks this way, you can save quite a lot of time. Also, the union stuff you've included in your linked post seems like overkill and penalizes your performance: flatMap will do.
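
As for the timing requirement, here is just a sketch of what the aggregator you pass to foreachRDD could do, assuming a hypothetical Result that carries a message id and the emission timestamp set by the receiver (getMessageId() and getEmittedAt() are made-up names; the sketch also uses scala.Tuple2, JavaPairRDD and Java 8 lambdas for brevity): reduce each batch to one (count, emission time) pair per message, collect that small summary on the driver, and report the latency of every message whose N outputs have all arrived.

Function<JavaRDD<Result>, Void> aggregator = new Function<JavaRDD<Result>, Void>() {
    @Override
    public Void call(JavaRDD<Result> results) {
        // One (count, emittedAt) pair per message id; this summary is tiny,
        // so collecting it on the driver is cheap
        JavaPairRDD<String, Tuple2<Long, Long>> perMessage = results.mapToPair(
                r -> new Tuple2<String, Tuple2<Long, Long>>(
                        r.getMessageId(),
                        new Tuple2<Long, Long>(1L, r.getEmittedAt())));

        Map<String, Tuple2<Long, Long>> summary = perMessage
                .reduceByKey((a, b) -> new Tuple2<Long, Long>(a._1() + b._1(), a._2()))
                .collectAsMap();

        long now = System.currentTimeMillis();
        for (Map.Entry<String, Tuple2<Long, Long>> e : summary.entrySet()) {
            if (e.getValue()._1() == numberOfNets) {   // all N outputs collected
                System.out.println("message " + e.getKey() + " fully processed in "
                        + (now - e.getValue()._2()) + " ms");
            }
        }
        return null;
    }
};

Since flatMap emits all N results of an item at once, they land in the same batch, so a per-batch check like this should be enough.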

Finally, you can use the Kryo serializer to further tune performance on the cluster.
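
For example (just a minimal sketch: the application name is a placeholder, and the classes to register should be whatever actually gets shipped to the executors in your job, e.g. the mappers, their configuration and the result type):

SparkConf conf = new SparkConf()
        .setAppName("neural-net-streaming")   // placeholder name
        // Switch from Java serialization to Kryo
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

// Registering classes avoids writing the full class name with every object.
// registerKryoClasses is available from Spark 1.2 on; older versions can use
// a custom KryoRegistrator via spark.kryo.registrator instead.
conf.registerKryoClasses(new Class<?>[]{ NeuralNetMapper.class, Result.class });

JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

How much this helps depends on how large the serialized mappers and results actually are, so it is worth measuring on the cluster.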







