How to start a Google Dataflow pipeline from a Google App Engine application?

I need to run my Dataflow pipeline on a regular schedule. The Dataflow FAQ states the following:

You can automate pipeline execution using Google App Engine or custom (CRON) job processes on GCE. Future versions of the SDK will support command line parameters for finer control over job management.

I tried to start a very simple pipeline from my Java application using this code:

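import java.io.IOException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

public class MyAnalyticsServlet extends HttpServlet {
    @Override
    public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        resp.setContentType("text/plain");
        if (req.getRequestURI().equals("/dataflow/test")) {
            // Configure the job to run on the Dataflow service.
            DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
            options.setProject("redacted");
            options.setRunner(DataflowPipelineRunner.class);
            // Read a text file, apply my own TestTransform, and write the result.
            Pipeline p = Pipeline.create(options);
            p.apply(TextIO.Read.named("TestInput").from("gs://redacted/test/in.txt"))
                    .apply(new TestTransform())
                    .apply(TextIO.Write.named("TestOutput")
                            .to("gs://redacted/test")
                            .withNumShards(0));
            p.run();
        } else {
            // Any other path is not handled by this servlet.
            resp.setStatus(404);
            resp.getWriter().println("Not Found");
            return;
        }
        resp.getWriter().println("OK");
    }
}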

I am getting the following error:

java.lang.IllegalArgumentException: Methods [setRunner(Class), getRunner()] on [com.google.cloud.dataflow.sdk.options.PipelineOptions] do not conform to being bean properties.
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.validateClass(PipelineOptionsFactory.java:1059)
    ...

Any ideas?

1 answer


I know you are using Java; however, this walkthrough of launching a Dataflow pipeline from a GAE Python Flex app might be helpful: http://amygdala.github.io/dataflow/app_engine/2017/04/14/gae_dataflow.html
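
On the Java error itself: the message says the getRunner()/setRunner(Class) pair is not being recognized as a bean property. As a rough diagnostic, here is a minimal sketch that uses only the JDK's java.beans.Introspector to check whether a matching getter/setter pair is seen as a readable and writable property. The Options interface and the BeanPropertyCheck class are hypothetical stand-ins, not part of the Dataflow SDK, and whether the SDK performs its validation the same way is an assumption on my part. Running something like this both locally and inside App Engine might show whether the two environments introspect classes differently.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

public class BeanPropertyCheck {

    // Hypothetical stand-in for an options interface; NOT the Dataflow SDK's PipelineOptions.
    public interface Options {
        Class<?> getRunner();
        void setRunner(Class<?> runner);
    }

    // Returns true if the JDK introspector reports a property with the given
    // name that has both a getter (read method) and a setter (write method).
    public static boolean hasReadWriteProperty(Class<?> type, String name) {
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getName().equals(name)) {
                    return pd.getReadMethod() != null && pd.getWriteMethod() != null;
                }
            }
            return false;
        } catch (IntrospectionException e) {
            // Wrap the checked exception so callers do not have to declare it.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints whether "runner" is seen as a read/write bean property.
        System.out.println(hasReadWriteProperty(Options.class, "runner"));
    }
}
```

If the check behaves differently in the two environments, that would point at the runtime rather than at your pipeline code.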


