Writing an unbounded PCollection to GCS
I have seen many questions on this topic, but I still have a problem writing to GCS. I am reading a stream from Pub/Sub and trying to write it to GCS. I referred to this link, but could not find IOChannelUtils in the recent Beam packages.
// Pub/Sub topic names take the form projects/<project>/topics/<topic>.
PCollection<String> details = pipeline
    .apply(PubsubIO.readStrings().fromTopic("projects/<project>/topics/sampleTopic"));
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
    public String apply(String s) {
        return "constant";
    }
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream
    .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN))
        .withAllowedLateness(ONE_DAY)
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(10))
            .withLateFirings(AfterFirst.of(
                AfterPane.elementCountAtLeast(10),
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
        .discardingFiredPanes())
    .apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
I took this from several similar questions on Stack Overflow. I now understand that TextIO supports writing unbounded PCollections when withWindowedWrites() and withNumShards() are used.
ref: Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn
But I do not understand how I should do it.
I am trying to write to GCS like this:
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
.withFilenamePolicy(policy).withNumShards(4));
I don't have enough reputation to comment on those Stack Overflow questions, so I am asking this as a separate question.
I was able to fix this problem by changing the windowing as shown below:
PCollection<String> streamedDataWindows = streamedData
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))
        .withAllowedLateness(Duration.standardDays(1))
        .discardingFiredPanes());
streamedDataWindows.apply(TextIO.write()
    .to(CLOUD_STORAGE)
    .withWindowedWrites()
    .withNumShards(1)
    .withFilenamePolicy(new PerWindowFiles()));
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
        // OVERRIDE THE FILE NAME CREATION
    }
}
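For illustration only, here is a rough sketch in plain Java (no Beam dependency) of the kind of name such a policy might build for each window and shard. WindowFileNames, windowStart and fileName are hypothetical names, not Beam APIs, and the naming scheme is just an assumption. The sketch uses one-minute fixed windows aligned to the epoch, like FixedWindows in the question; inside a real FilenamePolicy the window bounds and shard numbers would come from the context Beam passes in rather than being recomputed.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper, not part of Beam: derives a fixed window and a per-window file name. */
final class WindowFileNames {
    // Compact UTC timestamp without ':' so the result is a friendly object name.
    private static final DateTimeFormatter STAMP =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'", Locale.ROOT).withZone(ZoneOffset.UTC);

    private WindowFileNames() {}

    /** Start of the epoch-aligned fixed window of the given size that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    /** Builds a name of the form prefix-start-end-SSSSS-of-NNNNN.txt (assumed scheme). */
    static String fileName(String prefix, Instant start, Duration size, int shard, int numShards) {
        Instant end = start.plus(size);
        return String.format(Locale.ROOT, "%s-%s-%s-%05d-of-%05d.txt",
            prefix, STAMP.format(start), STAMP.format(end), shard, numShards);
    }
}
```

With a one-minute window, an element stamped 2017-08-01T10:15:42Z falls in the window starting at 10:15:00Z, so every pane fired for that window would write under the same window-specific prefix instead of overwriting a single file.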
Although I got this working, I am still not sure about the concept of windowing. I will add more details when I find them. If anyone understands this better, please add more details. Thanks.
Check out the Pub/Sub to GCS pipeline, which provides a complete example of writing windowed files to GCS.