Alternative way to continue without a list in Scala

I have Scala code like this:

def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
  val currentTimeStamp = list(1).toLong // loads the timestamp column
  var sum = 0.0
  var count = 0
  var check = false
  import scala.util.control.Breaks._
  breakable {
    for (array <- buffer) {
      val toCheckTimeStamp = array(1).toLong // timestamp column
      if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) { // to check the timestamp for 10 seconds difference
        sum += array(5).toDouble // RSSI weightage values
        count += 1
      }
      if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
        check = true
        break
      }
    }
  }
  list :+ sum
}


I will call the above function like this

 import spark.implicits._
  val averageDF =
    filterop.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
      .sortBy(array => array(1), false) // Sort by timestamp
      .groupBy(array => (array(0), array(2))) // group by tag and listner
      .mapValues(buffer => {
        buffer.map(list => {
         avgCalc(buffer, list) // calling the average function 
        })
      })
      .flatMap(x => x._2)
      .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble)) // defining the schema through case class
      .toDF // converting to data frame
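
For reference, findingavg is just a small case class naming the seven output columns, something along these lines (the field names shown here are placeholders; only the types follow from the arguments above):

case class findingavg(
  tag: String,       // x(0)
  timestamp: Long,   // x(1)
  listner: String,   // x(2)
  x: String,         // x(3)
  y: String,         // x(4)
  rssi: Double,      // x(5)
  avg: Double        // x(6), the value appended by avgCalc
)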


The above code works fine, but I need to get rid of the list parameter: my senior asked me to remove it because the list slows down execution. Any suggestions on how to do this without the list? Any help would be appreciated.



2 answers


The following solution should work, I think. I have tried to avoid passing both an Iterable and an Array into the function.

def avgCalc(buffer: Iterable[Array[String]]) = {
  var finalArray = Array.empty[Array[String]]
  import scala.util.control.Breaks._
  for (outerArray <- buffer) {
    val currentTimeStamp = outerArray(1).toLong
    var sum = 0.0
    var count = 0
    var check = false
    var list = outerArray
    breakable { // break only leaves this inner loop; the outer loop continues with the next row
      for (array <- buffer) {
        val toCheckTimeStamp = array(1).toLong
        if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
          sum += array(5).toDouble
          count += 1
        }
        if ((currentTimeStamp - 10L) > toCheckTimeStamp) { // assumes the buffer is sorted newest first, so the remaining rows are older
          check = true
          break
        }
      }
    }
    if (sum != 0.0 && check) list = list :+ (sum / count).toString
    else list = list :+ list(5).toDouble.toString

    finalArray :+= list
  }
  finalArray
}
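
If you want to sanity-check the function outside Spark first, a quick local run with made-up rows (columns: tag, timestamp, listner, x, y, rssi, in the same order as in the question) could look like this:

// made-up sample rows, already sorted by timestamp descending, as avgCalc assumes
val sample: Iterable[Array[String]] = Seq(
  Array("tag1", "1000000020", "listner1", "10", "20", "-40.0"),
  Array("tag1", "1000000015", "listner1", "10", "20", "-42.0"),
  Array("tag1", "1000000000", "listner1", "10", "20", "-45.0")
)

avgCalc(sample).foreach(arr => println(arr.mkString(","))) // prints each row with the value avgCalc appended at the end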


and in the Spark job you can call it like this:



import sqlContext.implicits._
val averageDF =
  filterop.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
    .sortBy(array => array(1), false)
    .groupBy(array => (array(0), array(2)))
    .mapValues(buffer => {
        avgCalc(buffer)
    })
    .flatMap(x => x._2)
    .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
    .toDF


Hope this is the desired answer.



I see that you have accepted an answer, but I must say that you have a lot of unnecessary code. As far as I can see, there is no reason to convert to Array in the first place, nor is the sortBy necessary at that point. I suggest you work directly with Row. You also have a few unused variables that should be removed, and converting to a case class only to call toDF seems like overkill IMHO.

I would do something like this:



import org.apache.spark.sql.Row

def avgCalc(sortedList: List[Row]) = { // expects sortedList ordered by timestamp, newest first (the break below relies on it)
  sortedList.indices.map(i =>  {
    var sum = 0.0
    val row = sortedList(i)
    val currentTimeStamp = row.getString(1).toLong // loads the timestamp column

    import scala.util.control.Breaks._
    breakable {
      for (j <- 0 until sortedList.length) {
        if (j != i) {
          val anotherRow = sortedList(j)
          val toCheckTimeStamp = anotherRow.getString(1).toLong // timestamp column
          if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) { // to check the timestamp for 10 seconds difference
            sum += anotherRow.getString(5).toDouble // RSSI weightage values
          }

          if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
            break
          }
        }
      }
    }
    (row.getString(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4), row.getString(5), sum.toString)
  })
}


import spark.implicits._ // needed for .toDF on the resulting RDD of tuples

val averageDF = filterop.rdd
  .groupBy(row => (row(0), row(2)))
  .flatMap{case(_, buffer) => avgCalc(buffer.toList.sortBy(_.getString(1).toLong)(Ordering[Long].reverse))} // newest first, so the break inside avgCalc is safe
  .toDF("Tag", "Timestamp", "Listner", "X", "Y", "RSSI", "AvgCalc")


And as a final comment, I'm sure we could come up with a better implementation of avgCalc, but I'll leave that to you to play with :)
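
For instance, one direction to play with (just a sketch, not a drop-in replacement: avgCalcWindowed is a name I am making up here, and it assumes the same newest-first sort as in the call above) is to replace the index loop and the break with dropWhile/takeWhile over the sorted list:

import org.apache.spark.sql.Row

// Sketch only: relies on sortedList being ordered by timestamp, newest first.
def avgCalcWindowed(sortedList: List[Row]) = {
  sortedList.map { row =>
    val t = row.getString(1).toLong
    val sum = sortedList
      .dropWhile(_.getString(1).toLong > t)        // skip rows newer than the current one
      .takeWhile(_.getString(1).toLong >= t - 10L) // keep rows within the 10-second window
      .filter(_ ne row)                            // exclude the current row itself, as above
      .map(_.getString(5).toDouble)
      .sum
    (row.getString(0), row.getString(1), row.getString(2),
      row.getString(3), row.getString(4), row.getString(5), sum.toString)
  }
}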
