Running Google Dataflow pipeline from Google App Engine application?

I am creating a data flow job using DataflowPipelineRunner. I tried the following scenarios.

  • Without specifying any machine type
  • With a small car g1
  • with n1-highmem-2

In all the above scenarios, Input is a file from GCS, which is a very small file (KB size), and the output is a Big Query table.

I got an Out of Memory error in all scenarios

The size of my compiled code is 94 MB. I'm trying to use just the word count example and it didn't read any data (it doesn't work until the job starts). Please help me understand why I am getting this error.

Note. I am using appengine to run a job.

Note: The same code works with beta version 0.4.150414

EDIT 1

In line with the suggestions, the response read as follows:

  • Switched from Auto Scaling to Basic Scaling .
  • Used machine type B2 , which provides 256MB memory

After this configuration, Java Heap Memory issue is resolved. But it tries to load the jar to a split location which is over 10MB, so it doesn't work.

It contains the following exception

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

      

I tried to directly download the jar file - appengine-api-1.0-sdk-1.9.20.jar , but it is trying to download this jar of appengine-api-L4wtoWwoElWmstI1Ia93cg.jar . which I don't know what it is. Any idea what this jar is valued at.

Please help me to solve this problem.

+3


source to share


3 answers


The short answer is that if you are using AppEngine in a Managed VM , you will not run into Sandbox AppEngine limitations (OOM when using instance class F1 or B1 , runtime timing issues, JRE whitelisted classes). If you really want to run in the App Engine sandbox, then your use of the Dataflow SDK is more in line with the AppEngine sandbox constraints. Below, I'll cover common issues and what people have done to meet the AppEngine sandbox limits.

The Dataflow SDK requires an AppEngine instance class that has enough memory to run a user application to build a pipeline, create any resources, and submit a job description to the Dataflow service. We've usually seen that users are required to use a class with more than 128MB of memory to avoid seeing OOM errors.

It usually takes less than a couple of seconds to build a pipeline and send it to the Dataflow service if the required resources for your application are already supplied. Loading your JARs and any other resources into GCS can take over 60 seconds. This can be solved manually by pre-preparing the JARs for GCS in advance (the Dataflow SDK will skip them again if it detects that they already exist) or by using a queue task to get the 10-minute limit (note that for large applications, 10 minutes can not enough to create all of your resources).

Finally, in the AppEngine sandbox, you and all your dependencies are limited to whitelisting in the JRE, or you will get an exception like:



java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

      

EDIT 1

We perform a hash of the contents of the jars on the classpath and load them into the GCS with the changed filename. AppEngine runs a sandbox with its own JARs, appengine-api-L4wtoWwoElWmstI1Ia93cg.jar refers to appengine-api.jar which is a jar added by the sandbox. You can see from our PackageUtil # getUniqueContentName (...) that we just add - $ HASH before .jar .

We are working on why you are seeing the RequestPayloadToLarge exception and it is currently recommended that you set the filesToStage parameter and filter out the jars not required to run your Dataflow to work around the problem you are having. You can see how we create files to build with DataflowPipelineRunner # detectClassPathResourcesToStage (...) .

+4


source


I had the same problem with 10MB limit. What I was doing was filtering out JAR files larger than this limit (instead of specific files) and then setting the rename files DataflowPipelineOptions

to using setFilesToStage

.

So, I just copied the method detectClassPathResourcesToStage

from Dataflow SDK and changed it to look like this:

private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB

protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
    if (!(classLoader instanceof URLClassLoader)) {
        String message = String.format("Unable to use ClassLoader to detect classpath elements. "
                + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
        throw new IllegalArgumentException(message);
    }

    List<String> files = new ArrayList<>();
    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
        try {
            File file = new File(url.toURI());
            if (file.length() < FILE_BYTES_THRESHOLD) {
                files.add(file.getAbsolutePath());
            }
        } catch (IllegalArgumentException | URISyntaxException e) {
            String message = String.format("Unable to convert url (%s) to file.", url);
            throw new IllegalArgumentException(message, e);
        }
    }
    return files;
}

      



And then when I create DataflowPipelineOptions

:

DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));

      

+1


source


Here's a version of Helder 10MB filtering that will adapt to the staged position of the file from DataflowPipelineOptions

even if it changes in a future SDK release.

Instead of duplicating logic, it passes a lazy copy from DataflowPipelineOptions

to DataflowPipelineRunner

to see which files it gets delivered and then deletes the ones that are too big.

Note that this code assumes that you have created a custom PipelineOptions

named class MyOptions

along with a field java.util.Logger

named logger

.

// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;

/**
 * Returns the list of .jar/etc files to stage based on the
 * Options, filtering out any files that are too large for
 * DataflowPipelineRunner.
 *
 * <p>If this accidentally filters out a necessary file, it should
 * be obvious when the pipeline fails with a runtime link error.
 */
private static ImmutableList<String> getFilesToStage(MyOptions options) {
  // Construct a throw-away runner with a copy of the Options to see
  // which files it would have wanted to stage. This could be an
  // explicitly-specified list of files from the MyOptions param, or
  // the default list of files determined by DataflowPipelineRunner.
  List<String> baseFiles;
  {
    DataflowPipelineOptions tmpOptions =
        options.cloneAs(DataflowPipelineOptions.class);
    // Ignore the result; we only care about how fromOptions()
    // modifies its parameter.
    DataflowPipelineRunner.fromOptions(tmpOptions);
    baseFiles = tmpOptions.getFilesToStage();
    // Some value should have been set.
    Preconditions.checkNotNull(baseFiles);
  }
  // Filter out any files that are too large to stage.
  ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
  for (String file : baseFiles) {
    long size = new File(file).length();
    if (size < MAX_STAGED_FILE_SIZE_BYTES) {
      filteredFiles.add(file);
    } else {
      logger.info("Not staging large file " + file + ": length " + size
          + " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
    }
  }
  return filteredFiles.build();
}

/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
    throws IOException, InterruptedException {
  // DataflowPipelineRunner can't stage large files;
  // remove any from the list.
  DataflowPipelineOptions dpOpts =
      options.as(DataflowPipelineOptions.class);
  dpOpts.setFilesToStage(getFilesToStage(options));

  // Run the pipeline as usual using "options".
  // ...
}

      

+1


source







All Articles