SparkSQL PostgreSQL DataFrame Partitions

I have a very simple Spark SQL setup connecting to a PostgreSQL database, and I am trying to read a table into a DataFrame with X number of partitions (say 2). The code looks like this:

Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
// a query passed as "dbtable" has to be wrapped in parentheses and aliased
options.put("dbtable", "(select ID, OTHER from TABLE limit 1000) as tmp");
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions", "2");

DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();

      

For some reason, a single DataFrame partition contains almost all of the rows.

From what I can understand, lowerBound/upperBound are the parameters used to fine-tune this. The Spark SQL documentation (Spark 1.4.0 - spark-sql_2.11) says they are used to define the stride, not to filter/range the partition column. But this raises several questions:

  • Is the stride the frequency (the number of rows returned by each query) with which Spark will query the DB for each executor (partition)?
  • If not, what is the purpose of these parameters, what do they depend on, and how can I balance the DataFrame partitions in a stable way (not requiring that all partitions contain exactly the same number of rows, just a rough equilibrium - for example, for 2 partitions and 100 rows, 55/45, 60/40 or even 65/35)?

I can't seem to find a clear answer to these questions, and I was wondering if some of you could clear this up for me, because it is currently hurting my cluster's performance when handling X million rows, with all the heavy lifting falling on a single executor.

Cheers and thanks for your time.

+3




3 answers


The lower bound is indeed used against the partition column; refer to this code (the current version at the time of writing):

https://github.com/apache/spark/blob/40ed2af587cedadc6e5249031857a922b3b234ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala



The columnPartition function contains the partitioning logic and the use of the lower/upper bound.
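To make that logic concrete, here is a minimal Java re-creation of the split rule (the real implementation is in Scala; the column name ID and the bounds are taken from the question, and the exact formula can differ between Spark versions):

public class PartitionSketch {
    public static void main(String[] args) {
        long lowerBound = 100, upperBound = 500;
        int numPartitions = 2;
        // stride = upperBound / numPartitions - lowerBound / numPartitions
        long stride = upperBound / numPartitions - lowerBound / numPartitions; // 250 - 50 = 200

        long currentValue = lowerBound;
        for (int i = 0; i < numPartitions; i++) {
            String lower = (i == 0) ? null : "ID >= " + currentValue;
            currentValue += stride;
            String upper = (i == numPartitions - 1) ? null : "ID < " + currentValue;
            // the first partition has no lower filter and the last has no upper filter,
            // so rows outside [lowerBound, upperBound] are still fetched by one of them
            String where = (upper == null) ? lower
                         : (lower == null) ? upper
                         : (lower + " AND " + upper);
            System.out.println("Partition " + i + ": WHERE " + where);
        }
        // Prints:
        // Partition 0: WHERE ID < 300
        // Partition 1: WHERE ID >= 300
    }
}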

+2




Essentially, the lower and upper bounds and the number of partitions are used to calculate the increment or split for each parallel task.

Let's say a table has a partition column "year" and has data from 2006 to 2016.

If you define the number of partitions as 10, with a lower bound of 2006 and an upper bound of 2016, each task will fetch data for its own year - the ideal case.
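For illustration, the read options for that ideal case might look like this (the table and column names SALES and YEAR are made up for the example, and the commented WHERE clauses follow the split rule sketched in the first answer):

// Hypothetical table SALES with an integer YEAR column holding 2006-2016.
Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "SALES");
options.put("partitionColumn", "YEAR");
options.put("lowerBound", "2006");
options.put("upperBound", "2016");
options.put("numPartitions", "10");
DataFrame salesDataFrame = sqlContext.read().format("jdbc").options(options).load();
// Spark issues 10 parallel queries along the lines of:
//   ... WHERE YEAR < 2007
//   ... WHERE YEAR >= 2007 AND YEAR < 2008
//   ...
//   ... WHERE YEAR >= 2015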

Even if you specify the lower and/or upper bound incorrectly, e.g. lower = 0 and upper = 2016, the data transfer will be skewed, but you will not lose or fail to fetch any data, because:

  • The first task will fetch data for year < 0.
  • The second task will fetch data for years between 0 and 2016/10.
  • The third task will fetch data for years between 2016/10 and 2 * 2016/10.
  • ...
  • And the last task will have a where clause with no upper limit, so it also picks up everything up to (and beyond) year 2016.
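To make the skew concrete (the exact numbers depend on the split formula sketched in the first answer, so treat them as illustrative): with lower = 0, upper = 2016 and 10 partitions the stride is 2016/10 - 0/10 = 201, giving ranges of roughly year < 201, 201-402, ..., 1608-1809 and finally year >= 1809. Every real row (2006-2016) falls into that last range, so one task ends up doing nearly all the work - exactly the imbalance described in the question.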


+5




The lower and upper bounds currently do exactly what the previous answers describe. The next question is how to balance the data across the partitions without looking up the min/max values first, or when your data is heavily skewed.

If your database supports a hash function, it can do the trick:

partitionColumn = "hash(column_name) % num_partitions"
numPartitions = 10 // whatever you want
lowerBound = 0
upperBound = numPartitions

This will work as long as the modulo operation returns a uniform distribution over [0, numPartitions).
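A sketch of what that could look like against PostgreSQL, in the style of the question's Java code. It relies on Spark splicing the partitionColumn string verbatim into the generated WHERE clauses (the behaviour of the 1.x line); hashtext is a PostgreSQL function, and the cast and bucket expression are assumptions you would adapt to your schema:

int numPartitions = 10;
Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "TABLE");
// hypothetical bucket expression; generated clauses look like:
//   ... WHERE abs(hashtext(ID::text)) % 10 >= 3 AND abs(hashtext(ID::text)) % 10 < 4
options.put("partitionColumn", "abs(hashtext(ID::text)) % " + numPartitions);
options.put("lowerBound", "0");
options.put("upperBound", String.valueOf(numPartitions));
options.put("numPartitions", String.valueOf(numPartitions));
DataFrame balancedDataFrame = sqlContext.read().format("jdbc").options(options).load();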

0

