Developing a Spark Streaming application
So, the problem I'm trying to solve is this:
- I need a data source that emits messages at a specific frequency
- There are N neural networks that must process each message separately
- The outputs of all neural networks are aggregated, and only when all N outputs for a message have been collected is that message declared fully processed
- At the end, I have to measure the time it takes to fully process the message (the time between when it was emitted and when all N neural network outputs from that message were collected)
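The bookkeeping described in the last two points can be sketched in plain Java, independent of Spark. This is a minimal sketch under stated assumptions, not the author's `JavaSyncBarrier`: the class `LatencyBarrier` and its method names are hypothetical, every neural-net output is assumed to carry its message id and the time the source emitted that message, and all outputs are assumed to reach a single JVM.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Spark code): counts how many of the N neural-net
// outputs have arrived for each message and records the end-to-end latency
// once the last one is collected. Assumes every output carries its message id
// and the timestamp at which the source emitted that message.
final class LatencyBarrier {
    private final int numberOfNets;
    // message id -> number of outputs collected so far
    private final Map<Long, Integer> pending = new HashMap<>();
    // latency (ms) of every fully processed message, in completion order
    private final List<Long> latenciesMs = new ArrayList<>();

    LatencyBarrier(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    // Records one output; returns true if it was the N-th output for its message.
    synchronized boolean onOutput(long messageId, long emittedAtMs, long collectedAtMs) {
        int count = pending.merge(messageId, 1, Integer::sum);
        if (count < numberOfNets) {
            return false;
        }
        pending.remove(messageId);
        latenciesMs.add(collectedAtMs - emittedAtMs);
        return true;
    }

    synchronized List<Long> latencies() {
        return new ArrayList<>(latenciesMs);
    }

    // Number of messages still waiting for at least one output.
    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        LatencyBarrier barrier = new LatencyBarrier(2);
        barrier.onOutput(7L, 1_000L, 1_020L);                 // first of two outputs
        boolean done = barrier.onOutput(7L, 1_000L, 1_045L);  // second output completes message 7
        System.out.println(done + " " + barrier.latencies()); // true [45]
    }
}
```

Keying the count by message id, rather than counting outputs per batch, keeps the "all N outputs collected" rule correct even if the outputs for one message show up in different calls.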
I am curious how one would approach such a problem using Spark Streaming.
My current implementation uses three kinds of components: a custom receiver and two classes that implement Spark's Function interface, one for the neural networks and one for the final aggregator.
In broad strokes, my application is structured like this:
JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));
Function<JavaRDD<...>, Void> aggregator = new JavaSyncBarrier(numberOfNets);
for (int i = 0; i < numberOfNets; i++) {
    rndLists.map(new NeuralNetMapper(neuralNetConfig)).foreachRDD(aggregator);
}
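The `JavaRandomReceiver` itself is only described in the linked post. In Spark, a custom receiver extends `org.apache.spark.streaming.receiver.Receiver`, starts its own thread from `onStart()`, and hands data to Spark with `store(...)`. The fixed-frequency part can be sketched without Spark using `ScheduledExecutorService.scheduleAtFixedRate`; `FixedRateSource` and `Message` are hypothetical names, and stamping each message with its emission time is an assumption made so that the end-to-end latency can be computed later.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the emitting side of a receiver: produces messages
// at a fixed frequency and stamps each with its emission time.
final class FixedRateSource {
    static final class Message {
        final long id;
        final long emittedAtMs;

        Message(long id, long emittedAtMs) {
            this.id = id;
            this.emittedAtMs = emittedAtMs;
        }
    }

    // Emits `count` messages into `sink`, one every `periodMs` milliseconds,
    // and blocks until all of them have been emitted.
    static void emit(int count, long periodMs, BlockingQueue<Message> sink)
            throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(count);
        AtomicLong nextId = new AtomicLong();
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // already emitted `count` messages; waiting to be cancelled
            }
            sink.add(new Message(nextId.getAndIncrement(), System.currentTimeMillis()));
            remaining.countDown();
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } finally {
            task.cancel(false);
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        emit(5, 100, queue); // 5 messages at 10 Hz
        System.out.println(queue.size() + " messages emitted");
    }
}
```

A single-threaded scheduler runs the task sequentially, so the "stop after `count` messages" check needs no extra locking.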
The main problem I am facing is that the application runs faster in local mode than when submitted to a 4-node cluster.
Is my implementation wrong to begin with, or is something else going on here?
There is also a full post here http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html with more details on the implementation of each of the three components mentioned above.
It seems like there may be a lot of duplicate instantiation and serialization of objects. The latter can hurt your performance on a cluster.
You should try to instantiate your neural networks only once, and you will need to make sure they are serializable. You should use flatMap instead of multiple map + union. Something like this:
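// Initialize the neural nets once. NeuralNetMapper must implement java.io.Serializable,
// since this list is captured by the function below and shipped to the executors.
final List<NeuralNetMapper> neuralNetMappers = new ArrayList<>(numberOfNets);
for (int i = 0; i < numberOfNets; i++) {
    neuralNetMappers.add(new NeuralNetMapper(neuralNetConfig));
}
// Then create a single DStream applying all of them to every item.
// numberOfNets must be final (or effectively final) to be used in the anonymous class.
JavaDStream<Result> neuralNetResults = rndLists.flatMap(new FlatMapFunction<Item, Result>() {
    @Override
    public Iterable<Result> call(Item item) { // Spark 1.x signature (Spark 2.x returns Iterator)
        List<Result> results = new ArrayList<>(numberOfNets);
        for (int i = 0; i < numberOfNets; i++) {
            results.add(neuralNetMappers.get(i).doYourNeuralNetStuff(item));
        }
        return results; // numberOfNets outputs for this item
    }
});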
// The aggregation stuff
neuralNetResults.foreachRDD(aggregator);
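For readers who want to see the fan-out without a cluster, here is a plain-Java analogue using `java.util.stream`. It is a sketch, not Spark code: `FanOutSketch`, `Result`, and the scaling lambdas standing in for `NeuralNetMapper` are hypothetical. Each output is tagged with its message index so that a downstream aggregator can count N outputs per message. Note that in the Spark Java API, `FlatMapFunction.call` returns an `Iterable` in Spark 1.x and an `Iterator` in Spark 2.x.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.DoubleUnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java analogue (no Spark) of the flatMap fan-out: the "networks" are
// built once, and each message is expanded into one result per network.
final class FanOutSketch {
    static final class Result {
        final int messageId;
        final int netIndex;
        final double value;

        Result(int messageId, int netIndex, double value) {
            this.messageId = messageId;
            this.netIndex = netIndex;
            this.value = value;
        }
    }

    // Built once, up front. Hypothetical placeholder: net i multiplies its input by (i + 1).
    static List<DoubleUnaryOperator> buildNets(int numberOfNets) {
        List<DoubleUnaryOperator> nets = new ArrayList<>(numberOfNets);
        for (int i = 0; i < numberOfNets; i++) {
            final double weight = i + 1;
            nets.add(x -> x * weight);
        }
        return nets;
    }

    // One flatMap over the messages replaces N separate map passes:
    // message m yields nets.size() results, each tagged with m.
    static List<Result> fanOut(List<Double> messages, List<DoubleUnaryOperator> nets) {
        return IntStream.range(0, messages.size())
                .boxed()
                .flatMap(m -> IntStream.range(0, nets.size())
                        .mapToObj(n -> new Result(m, n, nets.get(n).applyAsDouble(messages.get(m)))))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Result> results = fanOut(Arrays.asList(1.0, 2.0, 3.0), buildNets(4));
        System.out.println(results.size() + " results"); // 3 messages x 4 nets = 12
    }
}
```

The data is traversed once and each network is reused for every message, which is the point of replacing the per-network `map` calls with a single `flatMap`.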
If you can afford to initialize the networks this way, you can save quite a lot of time. Also, the union stuff you've included in your related posts seems like overkill and penalizes your performance: a single flatMap will do.
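Because the mapper list is captured by the function passed to `flatMap`, Spark has to serialize it (closures go through Java serialization) when it ships tasks to the executors; a captured object that is not serializable makes the job fail with a "Task not serializable" error. A quick local round-trip through `ObjectOutputStream` illustrates both concerns from the answer. `SerializableNet` and `PlainNet` are hypothetical stand-ins for `NeuralNetMapper`, with a `double[]` playing the role of the network weights.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationCheck {
    // Hypothetical stand-in for a serializable NeuralNetMapper.
    static final class SerializableNet implements Serializable {
        private static final long serialVersionUID = 1L;
        final double[] weights;

        SerializableNet(double[] weights) {
            this.weights = weights;
        }
    }

    // Same state, but not Serializable: capturing this in a Spark closure would fail.
    static final class PlainNet {
        final double[] weights;

        PlainNet(double[] weights) {
            this.weights = weights;
        }
    }

    // Returns the Java-serialized size of `o` in bytes;
    // throws NotSerializableException if `o` (or anything it references) is not serializable.
    static int serializedSize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serializedSize(new SerializableNet(new double[10_000])) + " bytes");
        try {
            serializedSize(new PlainNet(new double[10]));
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```

The serialized size grows with the weight array, which is why shipping many duplicate network instances is costly.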
Finally, you can use the Kryo serializer to further tune performance on the cluster.