Writing R-frames of data returned from SparkR: map

I am using SparkR: map and my function is returning a high value data array for each input line, each with the same shape. I would like to write these dataframes as parquet files without collecting them. Can I match write.df above my output list? Can I force work tasks to write parquet instead?

Now I have a job. example. I'm happy with this, except that I didn't expect the reduction to indirectly "collect" as I wanted to write the resulting DF as Parquet.

Also, I'm not sure if :: map is actually doing anything in parallel. Do I always need to call "parallelize"?

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")


options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
                         sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
                         appName = "Peter Spark test",
                         list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)

#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime     <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)

main <- function() {

  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs,ys,stringsAsFactors = FALSE)

  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)

  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)

  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)

  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)




Assuming your data looks more or less:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])


and all columns are of supported types, which you can convert to string format:

rows <- SparkR:::flatMap(dfs, function(x) {
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))


call createDataFrame


sdf <- createDataFrame(sqlContext, rows)

##    mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4 24.4   4 146.7  62 3.69 3.19 20.00  1  0    4    2
## 5 22.8   4 140.8  95 3.92 3.15 22.90  1  0    4    2
## 6 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4


## root
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)


and just use write.df

/ saveDF


The problem is that you shouldn't be using the internal API in the first place. One of the reasons it was removed in the original release is not reliable enough to be used directly. Not to mention, it is still unclear if it will be supported or even available in the future. I'm just saying ...



