Data manipulation in Spark

I am new to Spark and I am trying to achieve some data manipulation based on counts. The problem is like this: I have a text file with information that looks like this:

john, apple 
john, apple
john, orange
jill, apple
jill, orange
jill, orange


What I want to do is simple: I want to count the number of times each fruit appears for each person, and divide that count by the total number of times that fruit appears across both people, so the result will look like this:

john, apple, 2, 3
jill, apple, 1, 3
john, orange, 1, 3
jill, orange, 2, 3


Then I can divide column 3 by column 4 for this end product:

john, apple, 2, 3, 2/3
jill, apple, 1, 3, 1/3
john, orange, 1, 3, 1/3
jill, orange, 2, 3, 2/3


I have tried several things in Scala, like this:

// split each line into a (person, fruit) pair
val persons = sc.textFile("path_to_directory").map(_.split(",")).map(x => (x(0), x(1)))
// count how many times each (person, fruit) pair occurs
persons.map { case (person, fruit) => ((person, fruit), 1) }.reduceByKey(_ + _).collect


The result of this is:

((jill,orange),2)
((jill,apple),1)
((john,orange),1)
((john,apple),2)


This seems like a good start, but then I don't know how to proceed from here. Any help or hints would be much appreciated!

UPDATE:

I have a suggested solution to this problem:

// (person, fruit) pairs from the input file
val persons = sc.textFile("path_to_directory").map(_.split(",")).map(x => (x(0), x(1)))

// occurrences of each (person, fruit) pair
val count = persons.map { case (name, fruit) => ((name, fruit), 1) }.reduceByKey(_ + _)

// total occurrences of each fruit across all people
val total = persons.map { case (name, fruit) => (fruit, 1) }.reduceByKey(_ + _)

// re-key the per-person counts by fruit so they can be joined with the totals
val fruit = count.map { case ((name, fruit), count) => (fruit, (name, count)) }

// the join yields (fruit, ((name, count), total)); compute the ratio last
fruit.join(total).map { case (fruit, ((name, count), total)) =>
  (name, fruit, count, total, count.toDouble / total.toDouble)
}.collect.foreach(println)


The output of this Scala code in Spark is:

(jill,orange,2,3,0.6666666666666666)
(john,orange,1,3,0.3333333333333333)
(jill,apple,1,3,0.3333333333333333)
(john,apple,2,3,0.6666666666666666)
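
If the literal fraction from the desired end product (2/3 rather than 0.666...) is wanted, the last field can be built as a string instead of dividing. A minimal sketch, reusing the fruit and total RDDs from above:

fruit.join(total).map { case (fruit, ((name, count), total)) =>
  // keep the ratio as "count/total" text instead of a decimal
  (name, fruit, count, total, s"$count/$total")
}.collect.foreach(println)
// e.g. (john,apple,2,3,2/3)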



1 answer


One possible solution:

// for one person: per-fruit counts, the person's total, and each fruit's frequency
def getFreqs(x: String, vals: Iterable[String]) = {
    val counts = vals.groupBy(identity).mapValues(_.size)
    val sum = counts.values.sum.toDouble
    counts.map { case (k, v) => (x, k, v, sum.toInt, v / sum) }
}

persons.groupByKey.flatMap { case (k, v) => getFreqs(k, v) }
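
To inspect the result, the RDD can be collected and printed (a quick sketch, assuming the persons RDD from the question; output order may vary):

persons.groupByKey.flatMap { case (k, v) => getFreqs(k, v) }.collect.foreach(println)
// e.g. (john,apple,2,3,0.6666666666666666)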


Alternatively:



// per-person totals, computed on the driver and shipped to executors as a broadcast
val fruitsPerPerson = sc.broadcast(persons.countByKey)

// group identical (person, fruit) pairs; each group's size is that pair's count
persons.groupBy(identity).map { case (k, v) =>
    val sum: Float = fruitsPerPerson.value.get(k._1) match {
        case Some(x) => x
        case _ => 1
    }
    (k._1, k._2, v.size, sum.toInt, v.size / sum)
}
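
The pattern match on the broadcast lookup can also be written with getOrElse; a sketch of the same computation bound to a name and collected (fruitsPerPerson as above):

val freqs = persons.groupBy(identity).map { case (k, v) =>
    // fall back to 1 if a person were somehow missing from the broadcast map
    val sum: Float = fruitsPerPerson.value.getOrElse(k._1, 1L)
    (k._1, k._2, v.size, sum.toInt, v.size / sum)
}
freqs.collect.foreach(println)
// e.g. (john,apple,2,3,0.6666667)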


Both groupByKey and groupBy can be rather inefficient, so if you are looking for a more robust solution, you can use combineByKey:

// createCombiner: start a per-person map from the first fruit seen
def create(value: String) = Map(value -> 1)

// mergeValue: bump one fruit's count within a partition
def mergeVals(x: Map[String, Int], value: String) = {
    val count = x.getOrElse(value, 0) + 1
    x + (value -> count)
}

// mergeCombiners: sum the per-partition maps key by key
def mergeCombs(x: Map[String, Int], y: Map[String, Int]) = {
    val keys = x.keys ++ y.keys
    keys.map((k: String) => k -> (x.getOrElse(k, 0) + y.getOrElse(k, 0))).toMap
}

val counts = persons.combineByKey(create, mergeVals, mergeCombs)

counts.flatMap { case (x: String, counts: Map[String, Int]) =>
    val sum = counts.values.sum.toDouble
    counts.map { case (k: String, v: Int) => (x, k, v, sum.toInt, v / sum) }
}
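
For the sample input, counts holds one fruit-to-count map per person, which makes it easy to sanity-check before the final flatMap (sketch; output order may vary):

counts.collect.foreach(println)
// e.g. (john,Map(apple -> 2, orange -> 1))
//      (jill,Map(apple -> 1, orange -> 2))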

