How to build a logistic regression model in SparkR

I am new to Spark as well as SparkR. I have installed Spark and SparkR successfully.

When I tried to build a logistic regression model with SparkR over a CSV file stored in HDFS, I got the error "incorrect number of dimensions".

My code:

points <- cache(lapplyPartition(textFile(sc, "hdfs://localhost:54310/Henry/data.csv"), readPartition))

collect(points)

w <- runif(n=D, min = -1, max = 1)

cat("Initial w: ", w, "\n")

# Compute logistic regression gradient for a matrix of data points
gradient <- function(partition) {
  partition = partition[[1]]
  Y <- partition[, 1] # point labels (first column of input file)
  X <- partition[, -1] # point coordinates
  # For each point (x, y), compute gradient function

  dot <- X %*% w
  logit <- 1 / (1 + exp(-Y * dot))
  grad <- t(X) %*% ((logit - 1) * Y)
  list(grad)
}


for (i in 1:iterations) {
  cat("On iteration ", i, "\n")
  w <- w - reduce(lapplyPartition(points, gradient), "+")
}

      

Error message:

On iteration  1 
Error in partition[, 1] : incorrect number of dimensions
Calls: do.call ... func -> FUN -> FUN -> Reduce -> <Anonymous> -> FUN -> FUN
Execution halted
14/09/27 01:38:13 ERROR Executor: Exception in task 0.0 in stage 181.0 (TID 189)
java.lang.NullPointerException
    at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:701)
14/09/27 01:38:13 WARN TaskSetManager: Lost task 0.0 in stage 181.0 (TID 189, localhost): java.lang.NullPointerException: 
        edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:701)
14/09/27 01:38:13 ERROR TaskSetManager: Task 0 in stage 181.0 failed 1 times; aborting job
Error in .jcall(getJRDD(rdd), "Ljava/util/List;", "collect") : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 181.0 failed 1 times, most recent failure: Lost task 0.0 in stage 181.0 (TID 189, localhost): java.lang.NullPointerException: edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:701) Driver stacktrace:

      

Data size (sample):

data <- read.csv("/home/Henry/data.csv")

dim(data)

      

[1] 17,541

What is the possible reason for this error?



1 answer


The problem is that textFile() reads text data and returns a distributed collection of strings, each corresponding to one line of the text file. That is why matrix-style indexing such as partition[, -1] fails later in the program. The program's real intent appears to be to treat points as a distributed collection of data frames. We are currently working on providing support for data frames in SparkR (SPARKR-1).



To resolve this problem, parse each partition with string operations so that X and Y are extracted correctly. Another option (I think you've probably seen this before) is to produce a different type of distributed collection from the start, as is done here: examples/logistic_regression.R.
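As a rough illustration of the string-operations route, here is a minimal sketch in plain R. The helper name parsePartition is hypothetical, and it assumes every line holds comma-separated numeric fields with no header row and the label in the first column; adjust the separator if your file differs.

```r
# Hypothetical helper: turn the text lines of one partition into a numeric matrix.
# Assumes comma-separated numeric fields, no header row, label in column 1.
parsePartition <- function(lines) {
  # Split every line on commas; the result is a list of character vectors
  fields <- strsplit(unlist(lines), ",", fixed = TRUE)
  values <- as.numeric(unlist(fields))
  # byrow = TRUE keeps each input line as one row of the matrix
  list(matrix(values, ncol = length(fields[[1]]), byrow = TRUE))
}

# Plain-R check on two sample lines (no Spark needed)
sampleLines <- list("1,0.5,2.0", "-1,1.5,3.0")
m <- parsePartition(sampleLines)[[1]]
Y <- m[, 1]                  # labels
X <- m[, -1, drop = FALSE]   # features; drop = FALSE keeps X a matrix
```

In the script from the question, a function like this would presumably take the place of readPartition in the lapplyPartition(textFile(sc, ...), readPartition) call, so that gradient receives a list holding one numeric matrix per partition. I have not run this against a cluster, so treat it as a starting point.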
