R: How to replicate rows in a Spark DataFrame using sparklyr
Is there a way to replicate the rows of a Spark DataFrame using sparklyr/dplyr functions? For example:
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "spark://####:7077")
df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")
This is the desired output, stored in a new Spark tbl, with every row repeated three times:
> df2_tbl
row1 row2
<int> <chr>
1 1 A
2 1 A
3 1 A
4 2 B
5 2 B
6 2 B
7 3 C
8 3 C
9 3 C
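For reference, the desired result is what base R's rep() produces on an ordinary in-memory data.frame. The sketch below runs in plain R only (no Spark involved); replicate_rows_local is a hypothetical helper name, and it is mainly useful for checking a Spark result after collect().

```r
# Local (in-memory) reference: repeat each row of a data.frame n times.
# Plain base R; this does not touch Spark.
replicate_rows_local <- function(df, n) {
  # rep(..., each = n) gives row indices 1,1,1,2,2,2,... for n = 3
  out <- df[rep(seq_len(nrow(df)), each = n), , drop = FALSE]
  rownames(out) <- NULL  # drop the "1.1", "1.2", ... names created by duplicate indexing
  out
}

df <- data.frame(row1 = 1:3, row2 = LETTERS[1:3])
res <- replicate_rows_local(df, 3)
res
```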
With sparklyr you can use array() and explode(), as suggested by @Oli:
df_tbl %>%
mutate(arr = explode(array(1, 1, 1))) %>%
select(-arr)
# # Source: lazy query [?? x 2]
# # Database: spark_connection
# row1 row2
# <int> <chr>
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
And a generalized version:
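library(rlang)
# Build the string "explode(array(1,1,1))" and turn it into an R expression.
# parse_expr() is used because current rlang requires an explicit env for parse_quo().
df_tbl %>%
  mutate(arr = !!parse_expr(
    paste0("explode(array(", paste(rep(1, 3), collapse = ","), "))")
  )) %>%
  select(-arr)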
# # Source: lazy query [?? x 2]
# # Database: spark_connection
# row1 row2
# <int> <chr>
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
where you can easily adjust the number of copies per row via the second argument of rep().
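As an alternative to pasting and parsing a string, the same explode(array(1, ..., 1)) call can be built directly as an R expression. This is a minimal sketch in base R; explode_expr is a hypothetical helper, and the sparklyr usage below assumes dbplyr passes the unknown functions explode and array through to Spark SQL unchanged, as it does in the answer above.

```r
# Build the R call explode(array(1, 1, ..., 1)) containing n ones.
# Hypothetical helper; constructing the call needs only base R, no Spark.
explode_expr <- function(n) {
  stopifnot(length(n) == 1, n >= 1)
  # list(array, 1, 1, ..., 1) -> the call array(1, 1, ..., 1)
  arr_call <- as.call(c(list(as.name("array")), as.list(rep(1, n))))
  call("explode", arr_call)
}

deparse(explode_expr(3))
```

With a Spark connection, the call could then be injected with rlang's !! in place of the parsed string, e.g. df_tbl %>% mutate(arr = !!explode_expr(3)) %>% select(-arr).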
The first idea that comes to mind is the explode function (which is exactly what it is for in Spark). However, as far as I know, arrays are not supported in SparkR:
> structField("a", "array")
Error in checkType(type) : Unsupported type for SparkDataframe: array
However, I can suggest two other methods (here df is a SparkR SparkDataFrame, e.g. df <- as.DataFrame(data.frame(row1 = 1:3, row2 = LETTERS[1:3]))):
- Simple, but not very elegant:
head(rbind(df, df, df), n = 30)
# row1 row2
# 1 1 A
# 2 2 B
# 3 3 C
# 4 1 A
# 5 2 B
# 6 3 C
# 7 1 A
# 8 2 B
# 9 3 C
Or, more generally, with a for loop:
df2 <- df
for (i in 1:2) df2 <- rbind(df, df2)  # df2 ends up with 3 copies of df
Note that this also works with union.
- The second, more elegant method (because it involves only a single Spark operation) is based on a cross join (Cartesian product) with a DataFrame of size 3 (or any other number):
j <- as.DataFrame(data.frame(s = 1:3))
head(drop(crossJoin(df, j), "s"), n = 100)
# row1 row2
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
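Both methods can be sanity-checked on an ordinary data.frame. The sketch below is base R only: do.call(rbind, rep(list(df), n)) generalizes rbind(df, df, df) to any n, and merge(..., by = NULL) is base R's Cartesian product, standing in for crossJoin. The helper names are hypothetical, and it assumes df has no column named s. SparkR's rbind is documented as accepting several SparkDataFrames, so the do.call form should carry over, but I have not tested it on a cluster.

```r
# Local analogues of the two SparkR methods (plain R, no Spark).
# Assumption: df is an ordinary data.frame with no column named "s".

# Method 1: stack n copies, generalizing rbind(df, df, df).
replicate_by_rbind <- function(df, n) {
  out <- do.call(rbind, rep(list(df), n))
  rownames(out) <- NULL
  out
}

# Method 2: cross join with an n-row index frame, then drop the index column.
replicate_by_cross_join <- function(df, n) {
  j <- data.frame(s = seq_len(n))
  crossed <- merge(df, j, by = NULL)  # Cartesian product: nrow(df) * n rows
  crossed$s <- NULL
  crossed
}

df <- data.frame(row1 = 1:3, row2 = LETTERS[1:3])
a <- replicate_by_rbind(df, 3)
b <- replicate_by_cross_join(df, 3)
b[order(b$row1), ]  # sort to get the grouped layout from the question
```

Neither method guarantees row order (and on Spark the order is not guaranteed at all), so sort explicitly if the grouped layout matters.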
I don't know of a cluster-side equivalent of R's rep function. However, we can emulate it on the cluster side with a join:
df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")
replyr <- function(data, n, sc) {
  # n-row table with a constant key; joining on that key repeats every row n times
  joiner_frame <- copy_to(sc, data.frame(joiner_index = rep(1, n)),
                          "tmp_joining_frame", overwrite = TRUE)
  data %>%
    mutate(joiner_index = 1) %>%
    left_join(joiner_frame, by = "joiner_index") %>%
    select(-joiner_index)
}
df_tbl2 <- replyr(df_tbl, 3, sc)
# row1 row2
# <int> <chr>
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
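This works, but it is a little messy because tmp_joining_frame persists after the call. I'm also not sure how well it behaves when several calls are evaluated lazily: every call overwrites the same tmp_joining_frame, so an earlier lazy result may end up being joined against a later call's table.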