Launching a Dataflow pipeline from App Engine

I have a requirement to send Datastore objects to a BigQuery table while also performing a data transformation. My design looks like this:
The App Engine Java application publishes data to a topic in the Pub/Sub service - this part works. A Dataflow pipeline then connects to that topic and reads the messages. The transformation is performed and the result is written to BigQuery. I have some sample code to test this.
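
Roughly, this is the kind of streaming pipeline I have in mind (a sketch only; the topic, table, and field names are placeholders):

import java.util.Arrays;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class PubsubToBigQueryPipeline {

    public static void main(String[] args) {
        DataflowPipelineOptions options =
                PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);
        options.setStreaming(true);                         // read from Pub/Sub continuously
        options.setProject("projectname");                  // same values as in my snippet below
        options.setStagingLocation("gs://pipeline_bucket2");

        // Schema of the (placeholder) target BigQuery table.
        TableSchema schema = new TableSchema().setFields(Arrays.asList(
                new TableFieldSchema().setName("payload").setType("STRING")));

        Pipeline p = Pipeline.create(options);

        p.apply(PubsubIO.Read.named("ReadFromPubsub")
                .topic("projects/projectname/topics/mytopic"))  // placeholder topic
         .apply(ParDo.named("Transform").of(new DoFn<String, TableRow>() {
             @Override
             public void processElement(ProcessContext c) {
                 // The real transformation of the Datastore-derived message goes here.
                 c.output(new TableRow().set("payload", c.element()));
             }
         }))
         .apply(BigQueryIO.Write.named("WriteToBigQuery")
                .to("projectname:dataset.table")                // placeholder table
                .withSchema(schema));

        p.run();
    }
}

The streaming option is there so the job keeps reading from the topic instead of terminating after one batch.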

I have a rough pipeline that I can run from my local development machine - it all works as demo code. Locally this is done via mvn appengine:devserver.

The question is: how do you launch the Dataflow pipeline from Google App Engine? My development machine does not have access to the production environment, so I cannot start my pipeline on the Google Dataflow service from there. I tried to launch it from Google App Engine itself, but got out-of-memory errors. It looked like it might be some kind of authentication issue. From another post here on Stack Overflow, it seems this kind of launch from App Engine is not "officially" supported.

How do I do this in a production environment?

Environment dependencies so far:
maven 3.3.0
Google AppEngine 1.9.28
Google API Client 1.20.0
Java 1.7.0_79
Workstation - Windows 7
Google Development Environment: golden package
This is my sample code to start a pipeline:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

private static final Logger LOG = LoggerFactory.getLogger("StarterPipeline");

// Configure the pipeline to run on the Google Cloud Dataflow service.
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setRunner(DataflowPipelineRunner.class);
options.setStagingLocation("gs://pipeline_bucket2");
options.setProject("projectname");
options.setJobName("starterpipeline");
options.setUpdate(true);

Pipeline p = Pipeline.create(options);

// Create two elements, upper-case them, then log the results.
p.apply(Create.of("Hello", "World"))
 .apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
     @Override
     public void processElement(ProcessContext c) {
         c.output(c.element().toUpperCase());
     }
 }))
 .apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
     @Override
     public void processElement(ProcessContext c) {
         LOG.info(c.element());
     }
 }));

p.run();

      

This is my stack trace for the error when trying to execute the code above:

Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
    at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
    at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
    at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    at java.util.concurrent.FutureTask.run(FutureTask.java:260)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

      

1 answer


Dataflow uses a 64MB buffer when uploading application artifacts to Google Cloud Storage. An OutOfMemoryError can occur if your instance does not have enough memory, for example if you are using an App Engine instance with only 128MB of memory.
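
For example (assuming the Java standard environment with automatic scaling), you can request a larger instance class in appengine-web.xml; F1 gives 128MB, F2 256MB, F4 512MB and F4_1G 1GB:

<?xml version="1.0" encoding="utf-8"?>
<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <application>projectname</application>
  <version>1</version>
  <threadsafe>true</threadsafe>
  <!-- A larger instance class so the Dataflow SDK's 64MB upload buffers
       fit in memory: F4 = 512MB, F4_1G = 1GB. -->
  <instance-class>F4</instance-class>
</appengine-web-app>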



Also note that the first time you run the Dataflow pipeline, whenever you update a module, or whenever App Engine performs an internal update, the Dataflow SDK needs to upload to Google Cloud Storage any application artifacts that have changed. Depending on the size of the application this can take more than 60 seconds, which is the request deadline for a frontend instance, and could therefore result in a timeout error.
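
One way around the 60-second frontend deadline (a sketch, assuming a push task queue with its 10-minute request deadline; the /launch-pipeline URL and its servlet are placeholders you would have to provide) is to enqueue the launch instead of running it inline:

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

// In the frontend request handler: enqueue the launch instead of calling
// p.run() inline, so the artifact staging runs under the task queue's
// 10-minute deadline rather than the 60-second frontend deadline.
Queue queue = QueueFactory.getDefaultQueue();
queue.add(TaskOptions.Builder
        .withUrl("/launch-pipeline")   // placeholder servlet that builds the Pipeline and calls p.run()
        .method(TaskOptions.Method.POST));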
