GCP Dataflow 2.0 PubSub to GCS

I am having a hard time understanding the concepts behind .withFilenamePolicy of TextIO.write(). The requirements for supplying a FilenamePolicy seem incredibly complex for something as simple as specifying a GCS bucket to write streamed output to.

At a high level, I have JSON messages being pushed to a PubSub topic, and I would like to write these raw messages to files in GCS for persistent storage (I will be doing other post processing as well). I first started with this Pipeline, thinking it would be pretty simple:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket));

        p.run();

    }

      

I got an error about needing windowed writes (withWindowedWrites), which I applied, and then I needed a FilenamePolicy. This is where things get hairy.

I went to the Beam docs and checked out FilenamePolicy. It looks like I would need to extend this class, which in turn requires extending other abstract classes to make this work. Unfortunately, the Apache documentation is a bit sparse and I can't find any examples for Dataflow 2.0 that do this, other than the WordCount example, which even then implements these details in a helper class.

So I could probably make this work just by copying most of the WordCount example, but I'm trying to get a better understanding of the details of this. A few questions I have:

1) Is there any roadmap item to abstract away this complexity? It seems like I should be able to supply just the GCS bucket, as with a non-windowed write, and then set a few basic options such as the timing and the file naming rule. I do understand that writing windowed streaming data to files is more difficult than just opening a file pointer (or the object-store equivalent).

2) It looks like in order to make this work, I need to create a WindowedContext object, which requires supplying an implementation of the abstract BoundedWindow class and a PaneInfo object, plus some shard information. The documentation available for these is pretty bare-bones, and it's hard for me to figure out what is really needed for each of them, especially given my simple use case. Are there any good examples that implement these? Also, it looks like I need to set the number of shards as part of TextIO.write, but then also provide the number of shards as part of the FilenamePolicy?

Thanks for any help in understanding the details behind this; I'm hoping to learn a thing or two!

Edit 7/20/17: I finally got this pipeline to run by extending FilenamePolicy. The missing piece was that I needed to define the window for the streaming data from PubSub. Here's a pretty close representation of the code:

public class ReadData {
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public ResourceId windowedFilename(
        ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test",
            window.start().toString(),
            window.end().toString(),
            context.getShardNumber(),
            context.getNumShards()
        );
        return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}

      



2 answers


In Beam 2.0, the code below is an example of writing the raw messages from PubSub out to windowed files in GCS. The pipeline is fairly configurable: you can specify the window duration via an option, and a sub-directory policy if you want logical subsections of your data to make reprocessing or archiving easier. Note that this has an additional dependency on Apache Commons Lang 3.

PubsubToGcs

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and
 * outputs the raw data into windowed files at the specified output
 * directory.
 */
public class PubsubToGcs {

  /**
   * Options supported by the pipeline.
   * 
   * <p>Inherits standard configuration options.</p>
   */
  public static interface Options extends DataflowPipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    ValueProvider<String> getTopic();
    void setTopic(ValueProvider<String> value);

    @Description("The directory to output files to. Must end with a slash.")
    @Required
    ValueProvider<String> getOutputDirectory();
    void setOutputDirectory(ValueProvider<String> value);

    @Description("The filename prefix of the files to write to.")
    @Default.String("output")
    @Required
    ValueProvider<String> getOutputFilenamePrefix();
    void setOutputFilenamePrefix(ValueProvider<String> value);

    @Description("The shard template of the output file. Specified as repeating sequences "
        + "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the "
        + "shard number, or number of shards respectively")
    @Default.String("")
    ValueProvider<String> getShardTemplate();
    void setShardTemplate(ValueProvider<String> value);

    @Description("The suffix of the files to write.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();
    void setOutputFilenameSuffix(ValueProvider<String> value);

    @Description("The sub-directory policy which files will use when output per window.")
    @Default.Enum("NONE")
    SubDirectoryPolicy getSubDirectoryPolicy();
    void setSubDirectoryPolicy(SubDirectoryPolicy value);

    @Description("The window duration in which data will be written. Defaults to 5m. "
        + "Allowed formats are: "
        + "Ns (for seconds, example: 5s), "
        + "Nm (for minutes, example: 12m), "
        + "Nh (for hours, example: 2h).")
    @Default.String("5m")
    String getWindowDuration();
    void setWindowDuration(String value);

    @Description("The maximum number of output shards produced when writing.")
    @Default.Integer(10)
    Integer getNumShards();
    void setNumShards(Integer value);
  }

  /**
   * Main entry point for executing the pipeline.
   * @param args  The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    Options options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(Options.class);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   * 
   * @param options The execution parameters to the pipeline.
   * @return  The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /**
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    pipeline
      .apply("Read PubSub Events",
        PubsubIO
          .readStrings()
          .fromTopic(options.getTopic()))
      .apply(options.getWindowDuration() + " Window", 
          Window
            .into(FixedWindows.of(parseDuration(options.getWindowDuration()))))
      .apply("Write File(s)",
          TextIO
            .write()
            .withWindowedWrites()
            .withNumShards(options.getNumShards())
            .to(options.getOutputDirectory())
            .withFilenamePolicy(
                new WindowedFilenamePolicy(
                    options.getOutputFilenamePrefix(),
                    options.getShardTemplate(),
                    options.getOutputFilenameSuffix())
                .withSubDirectoryPolicy(options.getSubDirectoryPolicy())));

    // Execute the pipeline and return the result.
    PipelineResult result = pipeline.run();

    return result;
  }

  /**
   * Parses a duration from a period formatted string. Values
   * are accepted in the following formats:
   * <p>
   * Ns - Seconds. Example: 5s<br>
   * Nm - Minutes. Example: 13m<br>
   * Nh - Hours. Example: 2h
   * 
   * <pre>
   * parseDuration(null) = NullPointerException()
   * parseDuration("")   = Duration.standardSeconds(0)
   * parseDuration("2s") = Duration.standardSeconds(2)
   * parseDuration("5m") = Duration.standardMinutes(5)
   * parseDuration("3h") = Duration.standardHours(3)
   * </pre>
   * 
   * @param value The period value to parse.
   * @return  The {@link Duration} parsed from the supplied period string.
   */
  private static Duration parseDuration(String value) {
    Preconditions.checkNotNull(value, "The specified duration must be a non-null value!");

    PeriodParser parser = new PeriodFormatterBuilder()
      .appendSeconds().appendSuffix("s")
      .appendMinutes().appendSuffix("m")
      .appendHours().appendSuffix("h")
      .toParser();

    MutablePeriod period = new MutablePeriod();
    parser.parseInto(period, value, 0, Locale.getDefault());

    Duration duration = period.toDurationFrom(new DateTime(0));
    return duration;
  }
}
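
As a side note, the "Ns / Nm / Nh" format described in the parseDuration Javadoc can also be handled without a period parser. The following is only a minimal sketch using java.time and a regular expression; DurationSketch and its parse method are hypothetical names, not part of Beam or of the pipeline above. Unlike the Joda-based method, this sketch rejects the empty string and accepts only a single unit per value. Beam's FixedWindows expects a Joda Duration, so the result would still need converting (for example via org.joda.time.Duration.millis(d.toMillis())).

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Beam.
class DurationSketch {

    // One or more digits followed by exactly one unit letter: s, m, or h.
    private static final Pattern PERIOD = Pattern.compile("(\\d+)([smh])");

    static Duration parse(String value) {
        if (value == null) {
            throw new NullPointerException("The specified duration must be a non-null value!");
        }
        Matcher m = PERIOD.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected Ns, Nm or Nh but got: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "s":
                return Duration.ofSeconds(amount);
            case "m":
                return Duration.ofMinutes(amount);
            default: // "h" is the only remaining unit the pattern allows
                return Duration.ofHours(amount);
        }
    }
}
```

Failing fast on malformed input is a deliberate choice in this sketch, so that a typo in the window duration option surfaces when the pipeline is constructed.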

      




WindowedFilenamePolicy

/**
 * The {@link WindowedFilenamePolicy} class will output files
 * to the specified location with a format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt.
 */
@SuppressWarnings("serial")
public class WindowedFilenamePolicy extends FilenamePolicy {

    /**
     * Possible sub-directory creation modes.
     */
    public static enum SubDirectoryPolicy {
        NONE("."),
        PER_HOUR("yyyy-MM-dd/HH"),
        PER_DAY("yyyy-MM-dd");

        private final String subDirectoryPattern;

        private SubDirectoryPolicy(String subDirectoryPattern) {
            this.subDirectoryPattern = subDirectoryPattern;
        }

        public String getSubDirectoryPattern() {
            return subDirectoryPattern;
        }

        public String format(Instant instant) {
            DateTimeFormatter formatter = DateTimeFormat.forPattern(subDirectoryPattern);
            return formatter.print(instant);
        }
    }

    /**
     * The formatter used to format the window timestamp for outputting to the filename.
     */
    private static final DateTimeFormatter formatter = ISODateTimeFormat
            .basicDateTimeNoMillis()
            .withZone(DateTimeZone.getDefault());

    /**
     * The filename prefix.
     */
    private final ValueProvider<String> prefix;

    /**
     * The filename suffix.
     */
    private final ValueProvider<String> suffix;

    /**
     * The shard template used during file formatting.
     */
    private final ValueProvider<String> shardTemplate;

    /**
     * The policy which dictates when or if sub-directories are created
     * for the windowed file output.
     */
    private ValueProvider<SubDirectoryPolicy> subDirectoryPolicy = StaticValueProvider.of(SubDirectoryPolicy.NONE);

    /**
     * Constructs a new {@link WindowedFilenamePolicy} with the
     * supplied prefix used for output files.
     * 
     * @param prefix    The prefix to prepend to all files output by the policy.
     * @param shardTemplate The template used to create uniquely named sharded files.
     * @param suffix    The suffix to append to all files output by the policy.
     */
    public WindowedFilenamePolicy(String prefix, String shardTemplate, String suffix) {
        this(StaticValueProvider.of(prefix), 
                StaticValueProvider.of(shardTemplate),
                StaticValueProvider.of(suffix));
    }

    /**
     * Constructs a new {@link WindowedFilenamePolicy} with the
     * supplied prefix used for output files.
     * 
     * @param prefix    The prefix to prepend to all files output by the policy.
     * @param shardTemplate The template used to create uniquely named sharded files.
     * @param suffix    The suffix to append to all files output by the policy.
     */
    public WindowedFilenamePolicy(
            ValueProvider<String> prefix, 
            ValueProvider<String> shardTemplate, 
            ValueProvider<String> suffix) {
        this.prefix = prefix;
        this.shardTemplate = shardTemplate;
        this.suffix = suffix; 
    }

    /**
     * The subdirectory policy will create sub-directories on the
     * filesystem based on the window which has fired.
     * 
     * @param policy    The subdirectory policy to apply.
     * @return The filename policy instance.
     */
    public WindowedFilenamePolicy withSubDirectoryPolicy(SubDirectoryPolicy policy) {
        return withSubDirectoryPolicy(StaticValueProvider.of(policy));
    }

    /**
     * The subdirectory policy will create sub-directories on the
     * filesystem based on the window which has fired.
     * 
     * @param policy    The subdirectory policy to apply.
     * @return The filename policy instance.
     */
    public WindowedFilenamePolicy withSubDirectoryPolicy(ValueProvider<SubDirectoryPolicy> policy) {
        this.subDirectoryPolicy = policy;
        return this;
    }

    /**
     * The windowed filename method will construct filenames per window in the
     * format of output-yyyyMMdd'T'HHmmss-001-of-100.txt.
     */
    @Override
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext c, String extension) {
        Instant windowInstant = c.getWindow().maxTimestamp();
        String datetimeStr = formatter.print(windowInstant.toDateTime());

        // Remove the prefix when it is null so we don't append the literal 'null'
        // to the start of the filename
        String filenamePrefix = prefix.get() == null ? datetimeStr : prefix.get() + "-" + datetimeStr;
        String filename = DefaultFilenamePolicy.constructName(
                filenamePrefix, 
                shardTemplate.get(), 
                StringUtils.defaultIfBlank(suffix.get(), extension),  // Ignore the extension in favor of the suffix.
                c.getShardNumber(), 
                c.getNumShards());

        String subDirectory = subDirectoryPolicy.get().format(windowInstant);
        return outputDirectory
                .resolve(subDirectory, StandardResolveOptions.RESOLVE_DIRECTORY)
                .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    /**
     * Unwindowed writes are unsupported by this filename policy so an {@link UnsupportedOperationException}
     * will be thrown if invoked.
     */
    @Override
    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
        throw new UnsupportedOperationException("There is no windowed filename policy for unwindowed file"
                + " output. Please use the WindowedFilenamePolicy with windowed writes or switch filename policies.");
    }
}
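
To make the effect of the sub-directory patterns concrete, here is a small standalone sketch that formats a window timestamp with the same pattern strings used by SubDirectoryPolicy. It uses java.time rather than Joda-Time and assumes UTC; SubDirectorySketch and subDirectory are hypothetical names introduced only for this illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of Beam.
class SubDirectorySketch {

    // Formats the window timestamp with a sub-directory pattern such as
    // "yyyy-MM-dd/HH" (per hour), "yyyy-MM-dd" (per day) or "." (none).
    // UTC is an assumption of this sketch.
    static String subDirectory(String pattern, Instant windowTimestamp) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(windowTimestamp);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2017-07-20T13:45:00Z");
        System.out.println(subDirectory("yyyy-MM-dd/HH", ts));
        System.out.println(subDirectory("yyyy-MM-dd", ts));
    }
}
```

With the per-hour pattern, every window whose timestamp falls within the same hour lands under the same dated sub-directory, which is what makes later reprocessing of a single hour or day straightforward.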

      



Beam now supports windowed writes, so there is no need to write a custom FilenamePolicy. You can control the output filename by putting W and P placeholders (for the window and pane, respectively) in the filename template. This exists at the head of the Beam repository and will also be in the upcoming Beam 2.1 release (which is being released as we speak).
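
To illustrate the idea of placeholder substitution, here is a minimal standalone sketch of how a template containing W, P, S and N could be expanded into a filename fragment. This is not Beam's implementation; ShardTemplateSketch, expand, and the window and pane strings are hypothetical, and the real formatting of windows and panes is up to Beam.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not Beam's implementation.
class ShardTemplateSketch {

    // Replaces each 'W' with the window string, each 'P' with the pane string,
    // each run of 'S' with the zero-padded shard number, and each run of 'N'
    // with the zero-padded shard count. All other characters are copied as-is.
    static String expand(String template, String window, String pane,
                         int shard, int numShards) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < template.length()) {
            char c = template.charAt(i);
            if (c == 'S' || c == 'N') {
                int j = i;
                while (j < template.length() && template.charAt(j) == c) {
                    j++;
                }
                int value = (c == 'S') ? shard : numShards;
                // Pad to the length of the run, e.g. "SSS" with shard 1 gives "001".
                out.append(String.format(Locale.ROOT, "%0" + (j - i) + "d", value));
                i = j;
            } else if (c == 'W') {
                out.append(window);
                i++;
            } else if (c == 'P') {
                out.append(pane);
                i++;
            } else {
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }
}
```

For example, expand("-W-P-SS-of-NN", "w1", "pane-0", 3, 10) yields "-w1-pane-0-03-of-10". One limitation of this naive sketch is that any literal W, P, S or N in the template is also treated as a placeholder.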