Write each message received via Pub/Sub to its own file in Cloud Storage

I receive messages via Pub/Sub. Each message should be stored in its own file in GCS as raw data. The data should then be processed and stored in BigQuery, with the file name included in the record.

The data should be visible in BQ immediately after it is received.

Example:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

      

The idea is that the processed data is stored in BQ with a link to the raw data stored in GCS.
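To make the target record shape concrete, here is a minimal plain-Java sketch of the last step in the example, wrapping already-processed JSON together with the file name. The class and method names (`ProcessedRecord`, `wrap`) are hypothetical, and the sketch assumes the file name is a UUID-like string that needs no JSON escaping.

```java
final class ProcessedRecord {

    private ProcessedRecord() {}

    /**
     * Wraps already-processed JSON with the name of the file holding the raw data.
     * Assumption: fileName contains no characters that need JSON escaping.
     */
    static String wrap(String fileName, String processedJson) {
        return "{\"fileName\":\"" + fileName + "\",\"data\":" + processedJson + "}";
    }

    public static void main(String[] args) {
        // Values taken from the example above.
        System.out.println(wrap("A1F432", "{\"a\":11,\"b\":22}"));
    }
}
```

In a real pipeline a JSON library would be a safer choice than string concatenation; the concatenation here only keeps the sketch dependency-free.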

Here is my code.

public class BotPipline {

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(MY_PROJECT);
        options.setStagingLocation(MY_STAGING_LOCATION);
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));

        // This is the step that fails: TextIO.Write does not accept an unbounded PCollection.
        String uuid = ...;
        input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));

        input
            .apply(ParDo.of(new DoFn<String,String>(){..}).named("updateJsonAndInsertUUID"))
            .apply(convertToTableRow(...).named("convertJsonStringToTableRow"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));

        pipeline.run();
    }
}

      

My code doesn't run because writing unbounded collections with TextIO.Write is not supported. After some research, I found several options for solving this problem:

  • creating a custom Sink in Dataflow
  • implementing the write to GCS as my own DoFn
  • accessing the window of the data using the optional BoundedWindow

I don't know how to start. Can anyone provide code for one of these solutions, or suggest another solution that fits my case (with code)?



1 answer


The best option is #2: a simple DoFn that creates the files according to your data. Something like this:



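import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;

class CreateFileFn extends DoFn<String, Void> {
  @ProcessElement
  public void process(ProcessContext c) throws IOException {
    String filename = ...generate filename from element...;
    // Create one new file per element (false = the resource is not a directory).
    try (WritableByteChannel channel = FileSystems.create(
            FileSystems.matchNewResource(filename, false),
            "text/plain")) {
      OutputStream out = Channels.newOutputStream(channel);
      ...write the element to out...
    }
  }
}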
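The DoFn above outputs Void, but the question also needs the file name downstream so that it can be written to BQ. As a rough sketch of the per-element logic only, the plain-Java helper below generates a UUID file name, writes the payload to its own file, and returns both so that a later step can attach the name. It deliberately uses the local filesystem (`java.nio.file`) as a stand-in for GCS so that it runs without any Beam dependency. `RawMessageWriter`, `Stored`, and `store` are hypothetical names, not part of any SDK.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

final class RawMessageWriter {

    /** Result of storing one message: the generated file name and the original payload. */
    static final class Stored {
        final String fileName;
        final String payload;

        Stored(String fileName, String payload) {
            this.fileName = fileName;
            this.payload = payload;
        }
    }

    private RawMessageWriter() {}

    /**
     * Writes the payload to a new file named after a random UUID inside dir.
     * Assumption: a local directory stands in for the GCS bucket here.
     */
    static Stored store(Path dir, String payload) throws IOException {
        String fileName = UUID.randomUUID().toString();
        Path target = dir.resolve(fileName);
        // CREATE_NEW fails if the file already exists, so an element never overwrites another.
        try (WritableByteChannel channel = Files.newByteChannel(
                     target, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
             OutputStream out = Channels.newOutputStream(channel)) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        return new Stored(fileName, payload);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("raw-messages");
        Stored stored = store(dir, "{\"a\":1,\"b\":2}");
        System.out.println("wrote " + dir.resolve(stored.fileName));
    }
}
```

Inside a pipeline, the same body would presumably sit in the `@ProcessElement` method, with the channel opened through `FileSystems.create` as in the answer and the DoFn's output type changed from Void to something that carries the file name along with the element.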

      
