Exceptions for Apache Beam PubSub Reader

I am running Pipeline with PubSub source and I am experiencing some strange exceptions due to my pipeline crashing. I can handle multiple elements (3-10) just fine and then suddenly one of the following two error messages is thrown. Both keep me from knowing what I might be doing wrong, so I deleted all my transforms and left only the source and the problem still exists. I am only posting a few test lines to PubSub. Any help is appreciated.

Exception 1:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:640)
        at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:313)
        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:174)
        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:127)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

      

Exception 2:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.IllegalStateException: Cannot finalize a restored checkpoint
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
        at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:293)
        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:205)
        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

      

Main code:

PipelineOptions options = PipelineOptionsFactory.create(); 
PubsubOptions dataflowOptions = options.as(PubsubOptions.class);
dataflowOptions.setStreaming(true);

Pipeline p = Pipeline.create(options);

p.apply(PubsubIO.<String>read().subscription("my-subscription")
    .withCoder(StringUtf8Coder.of())));

      

Execution:

mvn compile exec:java -Dexec.mainClass=my.package.SalesTransactions -Dexec.args="--runner BlockingDataflowRunner --project=my-project --tempLocation=gs://my-project/tmp"

      

+3


source to share


1 answer


This issue exists due to a bug ( BEAM-1656 ) in DirectRunner and a prerequisite in PubsubCheckpoint.



Apache Beam's answer : PubsubReader does not work with NPE has more information on the error and how to resolve it. Thank!

+1


source







All Articles