Tuesday, January 3, 2017

Key/Value Pair RDDs using Scala/Spark

Title



What are key/value pairs? How do you work with Spark RDDs using key/value pairs?

Technology



Spark 1.6 and above

Explanation


A key/value pair is a set of two data items:
1) Key: a unique identifier for a value
2) Value: the data identified by the key

Example:

Key (Age)    Value (Avg. No. of Friends)
---------    ---------------------------
   25                  130
   30                  90
   40                  55

Key/value pairs are a common data type in Spark, required by many operations and most commonly used for aggregation. In Spark, a key/value pair is represented as a tuple with two elements, for example (25, 130), (30, 90), and (40, 55). The first element is called the key and the second element is called the value.
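As a quick illustration, here is a minimal sketch of building a pair RDD from the example tuples above (assuming sc is an available SparkContext, as in the Spark shell):

    // A pair RDD is simply an RDD of two-element tuples
    val pairs = sc.parallelize(Seq((25, 130), (30, 90), (40, 55)))
    pairs.keys.collect()     // Array(25, 30, 40)
    pairs.values.collect()   // Array(130, 90, 55)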

Use Case: Find the average number of friends for each age across a social network. The social network dataset contains the following information.

userid,  username,   age,  FriendCount
------   --------    ---   -----------
101,     Ali,        33,   200
102,     Brian,      30,   150
103,     John,       45,   89
104,     Ian,        40,   100
105,     Katherine,  33,   320
106,     Aldo,       30,   200
107,     Zane,       33,   130
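This dataset is assumed to live in a CSV file, one row per line (that is how it is loaded in the full program at the end). For experimenting in the Spark shell without the file, an equivalent RDD can be built in memory; the snippet below is just a sketch with the rows inlined:

    // Build the same dataset in memory (assumes sc is a SparkContext)
    val lines = Seq(
      "101,Ali,33,200",
      "102,Brian,30,150",
      "103,John,45,89",
      "104,Ian,40,100",
      "105,Katherine,33,320",
      "106,Aldo,30,200",
      "107,Zane,33,130")
    val dsRDD = sc.parallelize(lines)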

Solution

Our aim is to aggregate the above dataset to produce the following result dataset:

age    Avg. number of friends
---    ----------------------
30     175
33     216
40     100
45     89

(The average for age 33 is 650/3, truncated to 216 by integer division; more on this below.)

If we analyze the original dataset with respect to our use case, we need only the "age" and "FriendCount" data elements. So our first goal is to discard the data that is not required.

Let us define a function that takes a line as an argument, splits it on the "," delimiter, and returns only age and FriendCount as a tuple, i.e. a key/value pair.

     def parseLine(line: String) = {
       // Split the line into an array with 4 elements
       val fields = line.split(",")

       // Return the age (as Int) from the array's 3rd position (index 2);
       // trim removes any stray whitespace around the value
       val age = fields(2).trim.toInt

       // Return the friendCount (as Int) from the array's 4th position (index 3)
       val friendCount = fields(3).trim.toInt

       // Return the result as a tuple (key/value pair)
       (age, friendCount)
     }
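As a quick sanity check, applying the function to single rows from the sample dataset:

     parseLine("101,Ali,33,200")         // returns (33, 200)
     parseLine("105,Katherine,33,320")   // returns (33, 320)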
  
If RDD "dsRDD" holds the original dataset, map it to create a new RDD that will hold only age and friendCount data elements by applying parseLine function on each line of the original dataset.

//This will create a new RDD of key/value pairs (age, friendCount) while discarding the remaining data elements.
val ageFriendRDD = dsRDD.map { x => parseLine(x) }

Expected Result: New RDD with the following values
   (33, 200)
   (30, 150)
   (45, 89) 
     .....
     .....
   (33, 130)

Now we have an intermediate result RDD, ageFriendRDD, that may hold the same key multiple times. We have to achieve the following two things:

1) To find the average number of friends each age has, we need to know how many times each key is repeated in the RDD.

2) Since our new RDD holds key/value pairs with duplicate keys, we need to reduce it by key, applying the average formula.

To know how many times each key is repeated, we will map the values of the intermediate RDD ageFriendRDD to (value, 1) as follows:

   (33,  (200, 1) )
   (30,  (150, 1) )
   (45,  (89, 1) )
     .....
     .....
   (33,  (130, 1) )

Mapping the values in this fashion lets us count how many times each age is repeated.

   // Map the values to (value, 1)
    val ageFriendRDD1 = ageFriendRDD.mapValues { x => (x, 1) }

   // Reduce the key/value pairs by adding the values element-wise, i.e. for key 33
   // add the values (200,1) + (320,1) + (130,1) = (650, 3),
   // where 650 is the total number of friends and 3 is the number of instances for the key (age) 33.
    val reducedRDD = ageFriendRDD1.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
 Note: The addition does not happen in one go for each key; instead, reduceByKey applies the function to pairs of values, combining them within each partition first and then across partitions. The function takes two arguments, x and y, which are two values (tuples) belonging to the same key. So x._1 represents the first element of the first tuple and y._1 represents the first element of the second tuple; similarly, x._2 represents the second element of the first tuple and y._2 represents the second element of the second tuple.
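To see this pairwise combination concretely, here is a minimal sketch of the same reduce for age 33, using a plain Scala collection instead of an RDD:

    // The three values that share key 33, combined pairwise
    val valuesFor33 = List((200, 1), (320, 1), (130, 1))
    val combined = valuesFor33.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
    // Step 1: (200, 1) + (320, 1) = (520, 2)
    // Step 2: (520, 2) + (130, 1) = (650, 3)
    // combined == (650, 3)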

The reduced RDD will have the following key/value pairs:
 Key      Value
-----     ---------
  33      (650, 3)
  40      (100, 1)
  30      (350, 2)
  45      (89, 1)

Each value has two elements: the first gives the total number of friends for an age, and the second gives the number of times that age appeared in the original dataset.
So to get the average number of friends for each age, we need to map the values of reducedRDD, dividing the first element by the second, as follows:

// This applies the average to each value tuple: f._1 (total friends) divided by f._2 (count).
    val withAvgRDD = reducedRDD.mapValues(f => f._1 / f._2)
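Note that f._1 / f._2 is integer division, so 650/3 truncates to 216. If an exact fractional average is preferred, one operand can be converted to Double first; a minimal variation:

    // Alternative: exact (fractional) average instead of a truncated Int
    val withExactAvgRDD = reducedRDD.mapValues(f => f._1.toDouble / f._2)
    // e.g. (33, 216.666...) instead of (33, 216)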

 // Collect the final result to the driver, then sort and print it
    val result = withAvgRDD.collect()
    result.sorted.foreach(println)

Final Result:
(30, 175)
(33, 216)
(40, 100)
(45, 89)
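As a side note, the sorting could also be pushed to the RDD itself with sortByKey, a standard pair-RDD operation, before collecting; a minimal sketch:

    // Sort by age on the RDD side, then collect and print
    withAvgRDD.sortByKey().collect().foreach(println)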



Source Code:

package com.spark.amqadri
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SocialNetworkFriends {
  
  //Function to parse a line into an (age, friendCount) pair
  def parseLine(line: String) = {
    val fields = line.split(",")
    val age = fields(2).trim.toInt
    val numberOfFriends = fields(3).trim.toInt
    (age, numberOfFriends)
  }


  def main(args: Array[String]) {

    // Configure and create the Spark context (local mode)
    val conf = new SparkConf()
      .setAppName("ScalaBasics")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Load the dataset and compute the average number of friends per age
    val dsRDD = sc.textFile("/Users/amqadri/ScalaSparkTraining/friendlist.csv", 1)
    val ageFriendRDD = dsRDD.map { x => parseLine(x) }
    val ageFriendRDD1 = ageFriendRDD.mapValues { x => (x, 1) }
    val reducedRDD = ageFriendRDD1.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    val withAvgRDD = reducedRDD.mapValues(f => f._1 / f._2)

    // Collect, sort by age, and print the result
    val result = withAvgRDD.collect()
    result.sorted.foreach(println)

    sc.stop()
  }

}