Write each message received via PubSub to its own file on Cloud Storage
I receive messages via PubSub. Each message should be stored in its own file in GCS as raw data. The data is then processed and saved to BigQuery, with the file name included in the saved data.
The data should appear in BQ as soon as it is received.
Example:
data published to pubsub : {a:1, b:2}
data saved to GCS file UUID: A1F432
data processing : {a:1, b:2} ->
{a:11, b: 22} ->
{fileName: A1F432, data: {a:11, b: 22}}
data in BQ : {fileName: A1F432, data: {a:11, b: 22}}
The idea is that the processed data is stored in BQ with a link to the raw data stored in GCS.
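The linking step in the example above can be sketched in plain Java, independent of any pipeline API. The class name LinkedRecord and its methods are hypothetical, a random UUID is only one possible naming scheme, and the processing step itself is left out because the question does not define it; the sketch assumes the payload is already-processed JSON.

```java
import java.util.UUID;

// Hypothetical helper: pairs processed JSON with the name of the raw file in GCS.
final class LinkedRecord {
    // Generates a file name for the raw message; any unique ID scheme would do.
    static String newFileName() {
        return UUID.randomUUID().toString();
    }

    // Wraps already-processed JSON so the BQ row points back at the raw file.
    // Assumes fileName needs no JSON escaping (true for UUID strings).
    static String wrap(String fileName, String processedJson) {
        return "{\"fileName\": \"" + fileName + "\", \"data\": " + processedJson + "}";
    }

    public static void main(String[] args) {
        System.out.println(wrap(newFileName(), "{\"a\": 11, \"b\": 22}"));
    }
}
```

The same file name has to be used both when writing the raw file and when building the row, so it should be generated once per message and passed to both steps.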
Here is my code.
public class BotPipline {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(MY_PROJECT);
        options.setStagingLocation(MY_STAGING_LOCATION);
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        // Unbounded input: one String per PubSub message.
        PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));
        String uuid = ...;
        // This is the step that fails: TextIO.Write is applied to an unbounded collection.
        input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));
        input
            .apply(ParDo.of(new DoFn<String,String>(){..}).named("updateJsonAndInsertUUID"))
            .apply(convertToTableRow(...).named("convertJsonStringToTableRow"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));
        pipeline.run();
    }
}
My code doesn't run because writing unbounded collections with TextIO.Write is not supported. After some research, I found that I have several options for solving this problem:
- creating a custom Sink in Dataflow
- implementing the write to GCS as my own DoFn
- accessing the window of the data using the optional BoundedWindow
I don't know how to start. Can anyone provide me with code for one of these solutions, or suggest another solution that suits my case (with code)?
The best option is #2: a simple DoFn that creates files according to your data. Something like this:
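import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;

class CreateFileFn extends DoFn<String, Void> {
    @ProcessElement
    public void process(ProcessContext c) throws IOException {
        String filename = ...generate filename from element...;
        // Open a new file (not a directory) at the given path; closed by try-with-resources.
        try (WritableByteChannel channel = FileSystems.create(
                FileSystems.matchNewResource(filename, false),
                "text/plain")) {
            OutputStream out = Channels.newOutputStream(channel);
            ...write the element to out...
        }
    }
}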
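The two elided steps in the DoFn (choosing a file name and writing the element) might look like the following plain-Java sketch. It is not Beam code: a local directory stands in for the GCS location, and the class name OneFilePerMessage, the method writeToOwnFile, and the use of a random UUID as the file name are all assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

// Hypothetical stand-in for the body of process(): one element in, one file out.
final class OneFilePerMessage {
    // Writes the element to a new, uniquely named file under baseDir and returns the name.
    static String writeToOwnFile(Path baseDir, String element) throws IOException {
        String fileName = UUID.randomUUID().toString();
        Files.createDirectories(baseDir);
        // CREATE_NEW fails instead of silently overwriting if the name already exists.
        Files.write(baseDir.resolve(fileName),
                element.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE_NEW);
        return fileName;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("raw-messages");
        String name = writeToOwnFile(dir, "{a:1, b:2}");
        System.out.println("wrote " + dir.resolve(name));
    }
}
```

Returning the file name matters for the question's use case: a DoFn built along these lines could output the name together with the element instead of Void, so that the downstream step can put it into the BigQuery row.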