Dataflow job performance issues

I know there was an update to the Cloud Dataflow service a few weeks ago (the default worker type and attached PD were changed), and it was clear this would slow down batch jobs somewhat. However, the performance of our jobs has degraded to the point where they no longer meet our business needs.

For example, one of our jobs reads ~2.7 million rows from a BigQuery table, has 6 side inputs (BQ tables), does some simple row transformations, and finally writes multiple outputs (3) to BigQuery. This job used to take 5-6 minutes, and now it takes 15 to 20 minutes, no matter how many VMs we throw at it.

Is there anything we can do to bring the speed back to what we saw before?

Here are some characteristics:

  • Reads from a BQ table with 2,744,897 rows (294 MB)
  • 6 BQ side inputs
  • 3 multi-outputs to BQ: two with 2,744,897 rows each, the remaining one with 1,500 rows
  • Runs in the asia-east1-b zone
  • Times below include worker pool startup and teardown.

  • 10 VMs (n1-standard-2): 16 min 5 s (2015-04-22_19_42_20-4740106543213058308)
  • 10 VMs (n1-standard-4): 17 min 11 s (2015-04-22_20_04_58-948224342106865432)
  • 10 VMs (n1-standard-1): 18 min 44 s (2015-04-22_19_42_20-4740106543213058308)
  • 20 VMs (n1-standard-2): 22 min 53 s (2015-04-22_21_26_53-18171886778433479315)
  • 50 VMs (n1-standard-2): 17 min 26 s (2015-04-22_21_51_37-16026777746175810525)
  • 100 VMs (n1-standard-2): 19 min 33 s (2015-04-22_22_32_13-9727928405932256127)



2 answers

We've figured out the problem. It occurs when a side input reads from a BigQuery table into which data has been streamed rather than bulk-loaded. When we copy the table(s) and read the side inputs from the copies, everything works fine.

However, this is just a workaround. Dataflow should be able to handle streamed-to BigQuery tables as side inputs.
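As a sketch of that workaround, the streamed-to table can be snapshotted into a plain table with the bq command-line tool and the side input pointed at the copy (dataset and table names below are placeholders, not from the original job):

```shell
# Copy the table that received streaming inserts into a regular,
# bulk-loaded table, then read the side input from the copy.
bq cp mydataset.streamed_table mydataset.streamed_table_snapshot
```

The copy is a one-time snapshot, so it must be refreshed if the streamed table keeps receiving new rows.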



The data appears to indicate a problem with how your pipeline handles side inputs. In particular, the side inputs are likely being re-read from BigQuery over and over, for every element of the main input. This is completely orthogonal to the change in the default VM type used by Dataflow workers, described below.

This is closely related to the changes made in the Dataflow SDK for Java, version 0.3.150326. In that release, we changed the side-input API to be per-window. Calls to sideInput() now return values only in the specific window corresponding to the window of the main input element, and not the whole side input PCollectionView. Consequently, sideInput() can no longer be called from startBundle and finishBundle of a DoFn, because the window is not yet known there.

For example, the following code snippet has a problem that causes the side input to be re-read for every element of the main input:

public void processElement(ProcessContext c) throws Exception {
  // Fetches the side input again on every call.
  Iterable<String> uniqueIds = c.sideInput(iterableView);
  for (String item : uniqueIds) {
    ...
  }
}

This code can be improved by caching the side input into a List member variable of the transform (assuming it fits into memory) during the first call to processElement, and using that cached List instead of the side input in subsequent calls.

This workaround should restore the performance you saw before, when side inputs could be read from startBundle. We will work on improving side-input caching in the long term. (If this does not completely solve the problem, please contact us and share the relevant code snippets.)
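The caching pattern can be sketched as plain Java outside the Dataflow SDK (SideInputCache and fetchSideInput below are stand-ins for the DoFn and for c.sideInput(iterableView); the counter demonstrates that the expensive read happens only once):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the caching workaround. "fetchSideInput" stands in
// for c.sideInput(iterableView); the counter shows that the expensive
// read happens only on the first processElement call.
public class SideInputCache {
    static final AtomicInteger scans = new AtomicInteger();

    // Simulates the per-element re-read of the BigQuery side input.
    static Iterable<String> fetchSideInput() {
        scans.incrementAndGet();
        List<String> rows = new ArrayList<>();
        rows.add("id-1");
        rows.add("id-2");
        return rows;
    }

    // Transform member variable holding the cached side input.
    private List<String> cachedIds;

    void processElement(String element) {
        if (cachedIds == null) {
            // First call: materialize the side input once.
            cachedIds = new ArrayList<>();
            for (String id : fetchSideInput()) {
                cachedIds.add(id);
            }
        }
        // Subsequent calls reuse the cached List instead of re-reading.
        for (String id : cachedIds) {
            // ... per-element work against the side input ...
        }
    }

    public static void main(String[] args) {
        SideInputCache fn = new SideInputCache();
        for (int i = 0; i < 1000; i++) {
            fn.processElement("row-" + i);
        }
        System.out.println("side input scans: " + scans.get());
    }
}
```

With 1,000 main-input elements the side input is scanned once instead of 1,000 times; in the real pipeline the cache would live as a DoFn instance field exactly as above.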

Separately, there was indeed an update to the Cloud Dataflow service around 4/9/15 that changed the default VM type used by Dataflow workers. In particular, we reduced the default core count per worker because our testing showed it to be cost-effective for typical workloads. This is not a slowdown of the Dataflow service of any kind; it simply runs with fewer resources per worker by default. Users can still override both the number of workers and the VM type used by workers.
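Those overrides are passed as pipeline options when the job is launched; a sketch, assuming the flag names of the Dataflow SDK for Java (my-pipeline.jar and the values are placeholders):

```shell
# Override the service defaults for worker count and machine type.
java -jar my-pipeline.jar \
  --numWorkers=50 \
  --workerMachineType=n1-standard-4 \
  --zone=asia-east1-b
```

This restores the pre-update resources per worker, but note that it will not fix the side-input re-read problem described above.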


