Reusing the results of the first calculation in the second calculation

I am trying to write a computation in Flink that requires two phases.

In the first step, I start with a text file and do some parameter estimation, resulting in a Java object that represents the statistical data model.

In the second phase, I would like to use this object to generate data for modeling.

I'm not sure how to do this. I tried with LocalCollectionOutputFormat

and it works locally, but when I deploy the job to the cluster I get NullPointerException

- which is not surprising.

What is the Flink way?

Here is my code:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);

// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
        .readCsvFile(args[0])
        .types(String.class, String.class, String.class, String.class);

List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));

env.execute("Bayes");

DataSet<BTP> btp = env
        .createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....

      

This is the exception I am getting:

Error: The program execution failed: java.lang.NullPointerException
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

    at org.apache.flink.client.program.Client.run(Client.java:328)
    at org.apache.flink.client.program.Client.run(Client.java:294)
    at org.apache.flink.client.program.Client.run(Client.java:288)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
    at it.list.flink.test.Test01.main(Test01.java:62)
    ...

      

+3


source to share


2 answers


With the latest version (0.9-milestone-1) to Flink

method was added collect()

public List<T> collect()

      



which extracts DataSet<T>

both List<T>

to the driver program. collect()

will also cause immediate program execution (no need to call ExecutionEnvironment.execute()

). There is currently a size limit for datasets of around 10 MB.

If you are not evaluating the models in the driver program, you can also merge both programs together and fix the model aside by plugging in the data sink. This will be more efficient because the data will not travel back down the client machine.

+3


source


If you are using Flink prior to 0.9, you can use the following snippet to collect your dataset into a local collection:

val dataJavaList = new ArrayList[K]
val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList)
dataset.output(outputFormat)
env.execute("collect()")

      



Where K

is the type of object you want to collect

0


source







All Articles