R - How to replicate rows in a Spark DataFrame using sparklyr

Is there a way to replicate the rows of a Spark DataFrame using sparklyr / dplyr functions?

sc <- spark_connect(master = "spark://####:7077")

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")


This is the desired output, stored in a new spark tbl:

> df2_tbl
   row1  row2
  <int> <chr>
1     1     A
2     1     A
3     1     A
4     2     B
5     2     B
6     2     B
7     3     C
8     3     C
9     3     C



3 answers


With sparklyr you can use array and explode, as suggested by @Oli:

df_tbl %>% 
  mutate(arr = explode(array(1, 1, 1))) %>%  # one output row per array element
  select(-arr)                               # drop the helper column

# # Source:   lazy query [?? x 2]
# # Database: spark_connection
#    row1 row2 
#   <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C    


And a generalized version:

library(rlang)

df_tbl %>%  
  mutate(arr = !!rlang::parse_quo(
    paste("explode(array(", paste(rep(1, 3), collapse = ","), "))"),
    env = rlang::global_env()  # recent rlang versions require an explicit env
  )) %>%
  select(-arr)

# # Source:   lazy query [?? x 2]
# # Database: spark_connection
#    row1 row2 
#   <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C   


where you can easily adjust the number of replications.
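To store the result as a new Spark tbl, as the question asks, the trick can be wrapped in a helper and materialized with compute(). A minimal sketch; the function name replicate_rows is mine, not part of the answer:

replicate_rows <- function(df, n) {
  # build "explode(array(1,...,1))" with n ones, one output row per element
  expr <- paste0("explode(array(", paste(rep(1, n), collapse = ","), "))")
  df %>%
    mutate(arr = !!rlang::parse_quo(expr, env = rlang::global_env())) %>%
    select(-arr)
}

# materialize as a new Spark table named "df2"
df2_tbl <- df_tbl %>% replicate_rows(3) %>% compute("df2")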



The first idea that comes to mind is to use the explode function (that is exactly what it is for in Spark). However, arrays do not appear to be supported in SparkR (as far as I know):

> structField("a", "array")
Error in checkType(type) : Unsupported type for SparkDataframe: array




However, I can suggest two other methods:

  • Simple, but not very elegant:

    head(rbind(df, df, df), n=30)
    #    row1 row2
    # 1    1    A
    # 2    2    B
    # 3    3    C
    # 4    1    A
    # 5    2    B
    # 6    3    C
    # 7    1    A
    # 8    2    B
    # 9    3    C
    

    Or with a for loop for more generality:

    df2 <- df
    for(i in 1:2) df2 <- rbind(df, df2)
    

    Note that this would also work with union.

  • The second, more elegant method (it involves only Spark operations, so everything stays on the cluster) is based on a cross join (Cartesian product) with a DataFrame of size 3 (or any other number); a generalized helper is sketched after this list:

    j <- as.DataFrame(data.frame(s=1:3))
    head(drop(crossJoin(df, j), "s"), n=100)
    #    row1 row2
    # 1    1    A
    # 2    1    A
    # 3    1    A
    # 4    2    B
    # 5    2    B
    # 6    2    B
    # 7    3    C
    # 8    3    C
    # 9    3    C
    

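A generalized version of the cross-join approach as a SparkR helper might look like this (a sketch; the function name replicate_rows and the helper column s are illustrative, not from the answer):

library(SparkR)

replicate_rows <- function(df, n) {
  j <- as.DataFrame(data.frame(s = seq_len(n)))  # one row per desired copy
  drop(crossJoin(df, j), "s")                    # replicate, then drop the helper column
}

df2 <- replicate_rows(df, 3)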


I am not aware of a cluster-side equivalent of R's rep function. However, we can use a join to emulate it on the cluster side.

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")

replyr <- function(data, n, sc){
  # helper frame with n identical keys; joining on it yields n copies of each row
  joiner_frame <- copy_to(sc, data.frame(joiner_index = rep(1, n)),
                          "tmp_joining_frame", overwrite = TRUE)

  data %>%
    mutate(joiner_index = 1) %>%
    left_join(joiner_frame, by = "joiner_index") %>%
    select(-joiner_index)
}

df_tbl2 <- replyr(df_tbl, 3, sc)
#    row1 row2 
#    <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C  


This works, but it is a bit messy, since tmp_joining_frame will persist. I am also not sure how well it will behave with lazy evaluation across multiple calls to the function.
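One possible cleanup, not part of the original answer: materialize the result first, then drop the helper through the Spark catalog (this assumes copy_to registered it as a temporary view named tmp_joining_frame):

# materialize the replicated rows before removing the helper view
df2_tbl <- replyr(df_tbl, 3, sc) %>% compute("df2")

# drop the temporary view via the Spark catalog API
spark_session(sc) %>%
  invoke("catalog") %>%
  invoke("dropTempView", "tmp_joining_frame")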


