Problem creating a Map side input in streaming Dataflow

I am having trouble creating a Map PCollectionView using DataflowRunner.

In the pipeline below, an unbounded counting input is aggregated with values from a side input (containing 10 generated values). When running the pipeline on GCP, it gets stuck inside the View.asMap() transform. More specifically, ParDo(StreamingPCollectionViewWriter) produces no output.

I've tried this with Dataflow 2.0.0-beta3 as well as beam-0.7.0-SNAPSHOT, with the same result. Note that the pipeline works without any problem when using the local DirectRunner.

Am I doing something wrong? Any help is appreciated, thanks in advance!

import java.io.IOException;
import java.util.Map;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleSideInputPipeline {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
                .apply(CountingInput.forSubrange(0L, 10L))
                .apply("Create KV<Integer, String>", ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                    }
                }))
                .apply(View.asMap());

        pipeline
            .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5)))
            .apply("Aggregate with side-input", ParDo.of(new DoFn<Long, KV<Long, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<Integer, String> map = c.sideInput(sideInput);

                    // Grab the first value from the map (iteration order is not guaranteed)
                    Object[] values = map.values().toArray();
                    String firstVal = (String) values[0];
                    LOG.info("Combined: K: "+ c.element() + " V: " + firstVal + " MapSize: " + map.size());
                    c.output(KV.of(c.element(), firstVal));
                }
            }).withSideInputs(sideInput));

        pipeline.run();
    }
}
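As an aside, and unrelated to the hang itself: pulling the "first" value via `map.values().toArray()` depends on the map's iteration order, which a `Map` does not guarantee. Looking up a known key is more predictable. A minimal plain-Java sketch of that access pattern, independent of Beam (the map here just simulates the materialized side input):

```java
import java.util.HashMap;
import java.util.Map;

public class MapLookupSketch {
    public static void main(String[] args) {
        // Simulate the side-input map the pipeline builds: keys 0..9 -> "TEST".
        Map<Integer, String> sideInputMap = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            sideInputMap.put(i, "TEST");
        }

        // Look up a specific key instead of relying on iteration order.
        String firstVal = sideInputMap.getOrDefault(0, "MISSING");
        System.out.println("K: 0 V: " + firstVal + " MapSize: " + sideInputMap.size());
        // prints: K: 0 V: TEST MapSize: 10
    }
}
```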

1 answer


There is no need to worry about ParDo(StreamingPCollectionViewWriterFn) not producing any output: what it actually does is write each element to an internal location.



Your code looks fine to me, and this warrants some investigation. I filed BEAM-2155.
