Spark SQL DataFrame is sorted within partitions but not as a whole

I am using Spark SQL to run a query against my dataset. The query result is fairly small, but it is still partitioned.

I would like to coalesce the resulting DataFrame into a single partition and order the rows by a column. I tried

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1");
result.toJSON().saveAsTextFile("output");

      

I have also tried

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1");
result.toJSON().saveAsTextFile("output");

      

The output file is ordered in chunks: each partition is sorted, but the DataFrame is not sorted as a whole. For example, instead of

1, value
2, value
4, value
4, value
5, value
5, value
...

      

I get

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value

      

  • What is the correct way to get a total ordering of my query result?
  • Why isn't the DataFrame merged into a single partition?


2 answers


I want to mention a couple of things here.

1. The source code shows that orderBy internally calls the sort API with global ordering set to true. So if the output is not globally ordered, that suggests the ordering was lost while writing to the target. My point is that calling orderBy always produces a global ordering.

2. A drastic coalesce, such as forcing a single partition as in your case, can be really dangerous. I would recommend that you not do this. The source code suggests that calling coalesce(1) can cause the upstream transformations to run on a single partition as well. That would be brutal for performance.

3. You seem to expect orderBy to run on a single partition. I don't think I agree with that expectation. It would make Spark a really stupid distributed framework.
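
To make the first point above concrete, here is a toy model in plain Java, not Spark code. Lists stand in for partitions, and the class name, the method names, and the hand-picked range bound 3 are all made up for illustration. It assumes that a global sort first routes rows into key ranges and then sorts each range, whereas a purely local sort leaves every row in the partition where it already was.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: plain lists stand in for partitions. This is not Spark's implementation.
final class GlobalSortSketch {

    // Local sort: each partition is sorted on its own, rows never change partition.
    static List<List<Integer>> sortWithinPartitions(List<List<Integer>> partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> p : partitions) {
            List<Integer> copy = new ArrayList<>(p);
            Collections.sort(copy);
            out.add(copy);
        }
        return out;
    }

    // Global sort: route every key to a range bucket, then sort each bucket.
    // upperBounds must be ascending; bucket i holds keys <= upperBounds[i],
    // and one extra bucket at the end holds everything above the last bound.
    static List<List<Integer>> rangePartitionAndSort(List<List<Integer>> partitions,
                                                     int[] upperBounds) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i <= upperBounds.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (List<Integer> p : partitions) {
            for (int key : p) {
                int b = 0;
                while (b < upperBounds.length && key > upperBounds[b]) {
                    b++;
                }
                buckets.get(b).add(key);
            }
        }
        for (List<Integer> bucket : buckets) {
            Collections.sort(bucket);
        }
        return buckets;
    }

    // Read the partitions back in index order, like concatenating part files in order.
    static List<Integer> concat(List<List<Integer>> partitions) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> p : partitions) {
            all.addAll(p);
        }
        return all;
    }

    public static void main(String[] args) {
        // Keys taken from the example in the question, split over two partitions.
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(5, 2, 4),
                Arrays.asList(4, 1, 5));
        System.out.println(concat(sortWithinPartitions(input)));
        System.out.println(concat(rangePartitionAndSort(input, new int[] {3})));
    }
}
```

With the keys from the question, the local sort reads back as 2, 4, 5, 1, 4, 5 (the chunked output described in the question), while the range-partitioned sort reads back as 1, 2, 4, 4, 5, 5. The second result is only globally ordered if the partitions are read back in index order, which is why the way the output is collected matters.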



Community, please let me know whether you agree or disagree with these statements.

How are you collecting the output data, anyway?

Perhaps the output actually contains sorted data, and the transformations or actions you perform to read it back are what lose the order.



orderBy creates new partitions after your coalesce. To get a single output partition, swap the order of the operations:

DataFrame result = spark.sql("my sql").orderBy("col1").coalesce(1);
result.write().json("results.json");

      



As @JavaPlanet mentioned, for really big data you don't want everything coalesced into one partition. It will drastically reduce your level of parallelism.
