Tuesday, January 3, 2017

Key/Value Pair RDDs using Scala/Spark

Title



What are key/value pairs, and how do you work with key/value pair RDDs in Spark?

Technology



Spark 1.6 and above

Explanation


A key/value pair is a set of two data items:
1) Key: an identifier for a value (within an RDD, the same key may appear in more than one pair)
2) Value: the data that is identified by the Key

Example:

(Age) Key      (Avg. No. of Friends) Value
   ----                 --------
    25                  130
    30                  90
    40                  55

The key/value pair is a common data type in Spark and is required by many operations, most often aggregations. In Spark, a key/value pair is represented as a tuple with two elements, for example (25, 130), (30, 90) and (40, 55). The first element is called the Key and the second element is called the Value.

Use Case: Find the average number of friends for each age in a social network. The social network dataset contains the following information:

userid, username , age, FriendCount
------  ---------  ---  -----------
101   , Ali      , 33 , 200
102   , Brian    , 30 , 150
103   , John     , 45 , 89
104   , Ian      , 40 , 100
105   , Katherine, 33 , 320
106   , Aldo     , 30 , 200
107   , Zane     , 33 , 130

Solution

Our aim is to aggregate the above dataset to produce the following result dataset:

age , Avg. number of friends
--- , ----------------------
30  , 175
33  , 217
40  , 100
45  , 89

If we analyze the original dataset with respect to our use case, we need only the "age" and "FriendCount" data elements. So our first goal is to discard the data that is not required.

Let us define a function that takes a line as its argument, splits the line on the "," delimiter and returns only age and FriendCount as a tuple (Key/Value pair).

     def parseLine(line: String): (Int, Int) = {
       // Split the line into an array with 4 elements
       val fields = line.split(",")

       // The age is the 3rd field (index 2); trim any padding before converting to Int
       val age = fields(2).trim.toInt

       // The friendCount is the 4th field (index 3); trim any padding before converting to Int
       val friendCount = fields(3).trim.toInt

       // Return the pair as a tuple
       (age, friendCount)
     }
  
If the RDD "dsRDD" holds the original dataset, map it to create a new RDD that holds only the age and friendCount data elements by applying the parseLine function to each line of the original dataset.

// This creates a new RDD of Key/Value pairs (age, FriendCount) and discards the remaining data elements.
val ageFriendRDD = dsRDD.map { x => parseLine(x) }

Expected Result: New RDD with the following values
   (33, 200)
   (30, 150)
   (45, 89) 
     .....
     .....
   (33, 130)

Now we have an intermediate result RDD, ageFriendRDD, that may hold the same key multiple times. We have to achieve the following two things:

1) To find the average number of friends for each age, we need to know how many times each key is repeated in the RDD.

2) Since our new RDD holds Key/Value pairs with duplicate Keys, we need to reduce it by Key and then apply the average formula.

To know how many times each key is repeated, we will map the values of the intermediate RDD ageFriendRDD to (value, 1) as follows:

   (33,  (200, 1) )
   (30,  (150, 1) )
   (45,  (89, 1) )
     .....
     .....
   (33,  (130, 1) )

Mapping it in this fashion will help us to know how many times each Age is repeated.

   // Map the values to (value, 1)
    val ageFriendRDD1 = ageFriendRDD.mapValues { x => (x, 1) }

   // Reduce the Key/Value pairs by adding the elements of the values, i.e. for Key 33: (200,1) + (320,1) + (130,1) = (650, 3),
   // where 650 is the total number of friends and 3 is the number of instances of the key (Age) 33.
    val reducedRDD = ageFriendRDD1.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

Note: The addition does not happen in one go for each Key. Instead, reduceByKey applies the function to two values at a time and feeds each partial result back in until a single value per Key remains. The order in which the values are combined is not guaranteed, so the function must be associative and commutative, as addition is. The function takes two arguments [values of the Key/Value pair RDD], where "x" is the first tuple and "y" is the second tuple for the same Key. So "x._1" is the first element of the first tuple and "y._1" is the first element of the second tuple. Similarly, "x._2" is the second element of the first tuple and "y._2" is the second element of the second tuple.
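
The pairwise merging described in the note can be tried without a Spark cluster. The following is a minimal plain-Java sketch, not Spark code: the class name ReduceByKeySketch and the method sumAndCount are made up for illustration, and an ordinary in-memory map stands in for the RDD. It uses the seven (age, FriendCount) rows from the dataset above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the mapValues + reduceByKey steps; no Spark involved.
public class ReduceByKeySketch {

    // Turns (age, friendCount) rows into age -> {totalFriends, occurrences}.
    public static Map<Integer, int[]> sumAndCount(int[][] rows) {
        Map<Integer, int[]> acc = new TreeMap<>();
        for (int[] row : rows) {
            // Same idea as mapValues(x => (x, 1))
            int[] value = {row[1], 1};
            // merge() combines the stored value with the new one, two at a time,
            // which is how the function passed to reduceByKey is applied
            acc.merge(row[0], value, (x, y) -> new int[] {x[0] + y[0], x[1] + y[1]});
        }
        return acc;
    }

    public static void main(String[] args) {
        int[][] rows = {
            {33, 200}, {30, 150}, {45, 89}, {40, 100}, {33, 320}, {30, 200}, {33, 130}
        };
        sumAndCount(rows).forEach((age, v) ->
            System.out.println("(" + age + ", (" + v[0] + ", " + v[1] + "))"));
    }
}
```

Because a TreeMap is used, the sketch prints the keys in ascending order; Spark itself makes no such ordering promise for the reduced RDD.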

The reduced RDD will have the following Key/Value pairs (the order of the keys is not guaranteed):
 Key  ,  Value
 ---- ,  --------
(33   ,  (650, 3) )
(40   ,  (100, 1) )
(30   ,  (350, 2) )
(45   ,  (89, 1)  )

Each value has two elements: Element1 gives the total number of friends for an Age, and Element2 gives the number of times that Age appeared in the original dataset.
So to get the average number of friends for each Age, we need to map the values of reducedRDD by applying the average function (divide Element1 by Element2) as follows:

// This applies the average function to each value (which is a tuple), where "f._1" is the first element of the tuple and "f._2" is the second element.
// The total is converted to Double before dividing, because Int / Int would truncate (650 / 3 = 216); the result is then rounded to the nearest whole number (217).
    val withAvgRDD = reducedRDD.mapValues(f => math.round(f._1.toDouble / f._2))
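
One detail is worth checking by hand when computing the average: dividing one integer by another drops the fractional part, so 650 / 3 gives 216, while the exact average (about 216.67) rounds to 217. The plain-Java sketch below illustrates the difference; the class and method names are made up for illustration and are not part of the Spark job.

```java
// Hypothetical helper showing truncated versus rounded averages.
public class AverageSketch {

    // Integer division drops the fractional part.
    public static int truncatedAverage(int total, int count) {
        return total / count;
    }

    // Convert to double before dividing, then round to the nearest whole number.
    public static long roundedAverage(int total, int count) {
        return Math.round((double) total / count);
    }

    public static void main(String[] args) {
        System.out.println(truncatedAverage(650, 3)); // prints 216
        System.out.println(roundedAverage(650, 3));   // prints 217
        System.out.println(roundedAverage(350, 2));   // prints 175
    }
}
```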

 //To print the final sorted result RDD
    val result = withAvgRDD.collect() 
    result.sorted.foreach(println)

Final Result:
(30   ,  175)
(33   ,  217)
(40   ,  100)
(45   ,    89)



Source Code:

package com.spark.amqadri
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SocialNetworkFriends {
  
  //Function to parse a line into an (age, numberOfFriends) tuple
  def parseLine(line: String): (Int, Int) = {
    val fields = line.split(",")
    val age = fields(2).trim.toInt
    val numberOfFriends = fields(3).trim.toInt
    (age, numberOfFriends)
  }


  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("ScalaBasics")
      .setMaster("local")
    val sc = new SparkContext(conf)

    val dsRDD = sc.textFile("/Users/amqadri/ScalaSparkTraining/friendlist.csv", 1)
    val ageFriendRDD = dsRDD.map { x => parseLine(x) }
    val ageFriendRDD1 = ageFriendRDD.mapValues { x => (x, 1) }
    val reducedRDD = ageFriendRDD1.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    // Convert to Double before dividing so the average is rounded, not truncated
    val withAvgRDD = reducedRDD.mapValues(f => math.round(f._1.toDouble / f._2))
    val result = withAvgRDD.collect()
    result.sorted.foreach(println)

    sc.stop()
  }

}

Tuesday, June 16, 2015

Creation of Purchase Requisitions using PO_REQUISITIONS_INTERFACE_ALL Interface table

Brief

The Purchase Requisitions can be created using the following two methods:

  1. Using Seeded Function ( Requisitions Form ) available in Oracle eBusiness Suite
  2. Oracle Requisition Interface

The aim of this document is to demonstrate how to create a requisition using the interface table (PO_REQUISITIONS_INTERFACE_ALL) and then import it as an Approved or Unapproved Requisition.

Important fields of PO_REQUISITIONS_INTERFACE_ALL and their meanings:

INTERFACE_SOURCE_CODE        A meaningful name that identifies the Requisition Source
BATCH_ID                     Batch Id; a number that can be used to group the requisition lines into one Requisition
ORG_ID                       Operating unit unique identifier; references HR_ALL_ORGANIZATION_UNITS
DESTINATION_TYPE_CODE        Requisition Destination Type: INVENTORY or EXPENSE; references PO_LOOKUP_CODES where lookup_type='DESTINATION TYPE'
AUTHORIZATION_STATUS         Authorization status type: APPROVED or INCOMPLETE
PREPARER_ID                  Requisition preparer's person_id; references PER_ALL_PEOPLE_F
CHARGE_ACCOUNT_ID            General Ledger charge account; references GL_CODE_COMBINATIONS
SOURCE_TYPE_CODE             Requisition Source Type Code: INVENTORY or VENDOR; references PO_LOOKUP_CODES where lookup_type='REQUISITION SOURCE TYPE'
UNIT_OF_MEASURE              Unit of measure; references MTL_UNITS_OF_MEASURE_TL
LINE_TYPE_ID                 Requisition line type unique identifier (1..n)
CATEGORY_ID                  Requisition Item Category; references MTL_CATEGORIES_B
UNIT_PRICE                   Requisition Unit Price
QUANTITY                     Requisition Quantity
DESTINATION_ORGANIZATION_ID  Destination organization unique identifier; references HR_ALL_ORGANIZATION_UNITS
DELIVER_TO_LOCATION_ID       Deliver To Location ID; references HR_LOCATIONS_ALL
DELIVER_TO_REQUESTOR_ID      Deliver To Requester; references PER_ALL_PEOPLE_F(PERSON_ID)
ITEM_DESCRIPTION             Item Description; any meaningful description of the item

Case:
A company wants to send the following Expense Purchase Request to Oracle Purchasing from a third-party application (My Application):
Item Description: HP Printer Cartridge - CE400X
Item Quantity:  5
Requested Organization: Information Technology Division
Requested By:  Aadil Qadri

Solution:

Step 1: Interface Requisition Line

INSERT INTO po_requisitions_interface_all
(
interface_source_code,
batch_id,
org_id,
destination_type_code,
authorization_status,
preparer_id,
charge_account_id,
source_type_code,
unit_of_measure,
line_type_id,
category_id,
unit_price,
quantity,
destination_organization_id,
deliver_to_location_id,
deliver_to_requestor_id,
item_description
)
VALUES
('MyAPP',                          --Interface Source
1001,                              --Batch Id; a number to group the requisition lines in a Requisition
161,                               --Operating Unit
'EXPENSE',                         --Destination Type
'APPROVED',                        --Status
573,                               --person_id of the preparer (from per_people_f)
8717,                              --Code Combination ID of charge account
'VENDOR',                          --Source Type
'Each',                            --UOM
1,                                 --Line Type of Goods
2060,                              --Category ID
100.5,                             --Price
5,                                 --Quantity (as requested in the Case above)
181,                               --Destination Org ID
121,                               --Deliver to Location ID
573,                               --Requester ID
'HP Printer Cartridge - CE400X'    --Item Description
);

COMMIT;                            --Make the row visible to the Requisition Import program

Step 2: Run "Requisition Import" with the parameters as follows:

Import Source:      MyAPP
Import Batch ID:  1001
Group By:             All
Multiple Distributions:  No [In our case we do not have multiple distributions ]
Initiate Approval After ReqImport: No [Because we have set Authorization_status= APPROVED in the requisition interfaced line]





Sunday, June 14, 2015

Setting Up the Oracle E-Business Suite Security in Oracle BI Publisher 10.1.3.4.2

Brief:
Oracle BI Publisher can use Oracle eBusiness Suite security, which lets users log in to Oracle BI Publisher with their EBS username and password. Once the security is set up, the EBS responsibilities become available in BI Publisher as roles. You can then add Oracle BI folders and Data Sources to these automatically imported roles.

Setup:

   Part A: Setup Oracle eBusiness Suite
  1. Login to Oracle eBusiness Suite as SYSADMIN
  2. Create a dummy menu TEST BI PUBLISHER  (with no functions)
  3. Create the following six responsibilities:
    1. XMLP_ADMIN – this is the administrator role for the BI Publisher server
    2. XMLP_DEVELOPER – allows users to build reports in the system.
    3. XMLP_SCHEDULER – allows users to schedule reports.
    4. XMLP_ANALYZER_EXCEL – allows users to use the Excel analysis feature.
    5. XMLP_ANALYZER_ONLINE – allows users to use the online analysis feature.
    6. XMLP_TEMPLATE_DESIGNER - allows users to connect to the BI Publisher server from the Template Builder and to upload and download templates.



  4. Add the above responsibilities (based on the need) to the Oracle eBusiness Suite user
  5. Get the dbc file from the EBS Application server; it is needed when setting up Oracle BI Publisher. You can find the dbc file at the $FND_SECURE location.

    Part B: Setup Oracle BI Publisher Enterprise

  1. Install Oracle BI Publisher Enterprise (OBIPE) [ For Installation follow the Oracle Document ]
  2. Get Oracle eBusiness Suite dbc file. The file is located in Oracle 
  3. Login to OBIPE as Administrator
  4. Enable Local Superuser as follows:
    1. Click Admin Tab
    2. Click on Security Configuration
    3. Select Enable Local Superuser
    4. Enter Superuser Name [Any meaningful name]
    5. Set Password
    6. Scroll down to Security Model
    7. Set Security Model to Oracle E-Business Suite
    8. Click on Upload DBC File button to upload the dbc file
    9. Apply to Save the changes


  5. Stop and start Oracle BI Publisher Enterprise for the changes to take effect.
  6. Login to the BI Publisher as the Local Superuser
  7. Click on Admin Tab, then Roles and Permissions
  8. Add folders and Data Sources to the Role
  9. Logout from OBIPE as the Local Superuser
  10. Login again with E-Business Suite credentials


Sunday, May 17, 2015

Oracle R11/R12 Special Value Set

Goal
How to use Special Value Sets in Oracle Applications

Applies To
Oracle eBusiness Suite (EBS) R11/R12

Solution

Use case:
For a given Concurrent Program, the user enters an Employee Name followed by a Process Date. Prevent the user from entering an invalid process date: the process date must be between the employee's joining date and the current date.

Solution Steps:
1. Create Value Set #1
  a. Value Set
    i. Value Set Name: Employee_List
    ii. Maximum Size: 100
    iii. Validation type: Table
    iv. Keep all other default values as they are.
  b. Click Edit Information
    i. Table Name: PER_PEOPLE_X
    ii. Value: FULL_NAME
    iii. Type: Varchar2    Size: 100
    iv. ID: PERSON_ID
    v. Type: Number    Size: 100





2. Create Value Set #2
  a. Value Set
    i. Value Set Name: Validate Process Date
    ii. Format Type: Standard Date
    iii. Maximum Size: 11
    iv. Validation type: Special
    v. Keep all other default values
  b. Click Edit Information
    i. Event: Validate
    ii. Function:
FND PLSQL
"
Declare
l_temp number;
begin
   select
   distinct '1' into l_temp
   from per_people_x
   where person_id=:$FLEX$.Employee_List
   and :!VALUE between start_date and trunc(sysdate);
exception
when no_data_found then
   fnd_message.set_name('XXADFD_GEN_MSG','Invalid Process Date');
   fnd_message.raise_error;
end;
"








3. Register Concurrent Program
  a. Register a dummy Concurrent Program and add the following two parameters.
    i. Parameter #1
      1. Seq: 10
      2. Parameter: Employee Name
      3. Value Set: Employee_List
      4. Check the Required field
      5. Display Size: 30
    ii. Parameter #2
      1. Seq: 20
      2. Parameter: Process Date
      3. Value Set: Validate Process Date
      4. Check the Required field
    iii. Save and ignore any warnings





Finally, run your Concurrent Program and test different values for the Process Date.
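
When choosing test values, it helps to be clear about what the Validate event accepts. The condition ":!VALUE between start_date and trunc(sysdate)" is inclusive at both ends, so the joining date itself and today's date are both valid. The plain-Java sketch below restates that rule outside EBS purely for illustration; the class name, method name and sample dates are made up, and the real check is still the PL/SQL block in the value set.

```java
import java.time.LocalDate;

// Hypothetical restatement of the value set rule; not used by EBS.
public class ProcessDateRuleSketch {

    // Mirrors "between start_date and trunc(sysdate)": both bounds are inclusive.
    public static boolean isValid(LocalDate processDate, LocalDate startDate, LocalDate today) {
        return !processDate.isBefore(startDate) && !processDate.isAfter(today);
    }

    public static void main(String[] args) {
        LocalDate joined = LocalDate.of(2010, 1, 15);   // sample joining date
        LocalDate today = LocalDate.of(2015, 5, 17);    // sample "current" date

        System.out.println(isValid(LocalDate.of(2010, 1, 15), joined, today)); // true: joining date itself
        System.out.println(isValid(LocalDate.of(2015, 5, 17), joined, today)); // true: current date
        System.out.println(isValid(LocalDate.of(2010, 1, 14), joined, today)); // false: before joining
        System.out.println(isValid(LocalDate.of(2015, 5, 18), joined, today)); // false: in the future
    }
}
```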









Wednesday, March 18, 2015

Oracle Java Concurrent Program

Goal

How to create a Java Concurrent Program?

Applies To

Oracle eBusiness Suite (EBS) R11/R12

Solution

Most of us write Concurrent Programs using the following Technologies:
  1. Oracle Reports
  2. PL/SQL Stored Procedure
  3. SQL*PLUS
But there are times when we need Java instead of Oracle's native PL/SQL programming language. Java is particularly useful (though not only) in the following cases:

  1. File handling (More efficient using Java instead of PL/SQL )
  2. Secure FTP of files between servers
  3. Networking
  4. Connecting to other non Oracle Databases

First of all, let me assure you that writing a Java Concurrent Program is an easy task; you just need some basic Java programming skills.
Oracle EBS provides many Java classes to support Java Concurrent Programming. The most important and common classes are as follows:

  • JavaConcurrentProgram.class
  • CpContext.class
  • LogFile.class
  • OutFile.class
You can find the full list of Java classes related to Java Concurrent Programming on the Oracle EBS Application server under $JAVA_TOP/oracle/apps/fnd/cp/request.

Newcomers to Java Concurrent Programs commonly ask the following questions:

  1. What is the starting point?
  2. Where should I keep the Java file?
  3. How do I compile the Java class?
  4. I know how to register a concurrent program in EBS, but what should the Execution File Path be for a Java Concurrent Program?

The starting point is to create your own package where you will keep your custom Java Classes. To create your package, follow the Oracle standard, which uses the following structure: oracle.apps.yourcustomappname.packagename.subpackage.....
Let us presume the name of your custom application is xxcust and that the new custom Java class will live in a request directory under a cp directory. This means the fully qualified package name for our custom Java Class is oracle.apps.xxcust.cp.request.
You may be wondering where to create this directory/package structure. You have to create it on the EBS Application Server under $JAVA_TOP. Since our directory structure starts with oracle followed by apps, and both of these directories already exist, follow the steps below:
$ cd $JAVA_TOP/oracle/apps
$ mkdir -p xxcust/cp/request
$ cd xxcust/cp/request
$ pwd
....../oracle/apps/xxcust/cp/request
This is the directory where you write your Java Class.
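
The rule behind these steps is that every dot in the package name becomes a directory separator under $JAVA_TOP. The small sketch below shows that mapping in plain Java; the class name, method name and the sample $JAVA_TOP value are made up for illustration, so substitute the value from your own environment.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that maps a package name to its source directory.
public class PackagePathSketch {

    // Each package segment becomes one directory level below javaTop.
    public static Path sourceDir(String javaTop, String packageName) {
        return Paths.get(javaTop, packageName.split("\\."));
    }

    public static void main(String[] args) {
        // "/tmp/java_top" is only a placeholder for the real $JAVA_TOP
        Path dir = sourceDir("/tmp/java_top", "oracle.apps.xxcust.cp.request");
        System.out.println(dir);
    }
}
```

The package statement at the top of the Java file must match this directory, otherwise the class will not be found when it is registered in EBS.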

Structure of your Java Class
Oracle EBS provides an interface, "JavaConcurrentProgram", with a single method, runProgram(), which receives the concurrent processing context "CpContext". You write your business logic in the runProgram() method. The CpContext gives runProgram() access to the log file, the output file and the request details. The name of the custom class that implements the JavaConcurrentProgram interface is used in EBS to register it as a Java concurrent program Executable.
The basic structure of the Java Class (to be used as a concurrent program) is given below:

//**************Template.java****************************//
package oracle.apps.xxcust.cp.request;

import oracle.apps.fnd.cp.request.*;

public class Template implements JavaConcurrentProgram{

    public void runProgram(CpContext ctx){

        // Write your Business Logic here
       
         //This line will signal successful end of the program to the concurrent manager.
         ctx.getReqCompletion().setCompletion(ReqCompletion.NORMAL, "Completed");         

    }

}
//**************End of Template.java********************//


Example
Let us write a Java Concurrent Program, "Test.java", that does the following tasks:

  1. Write text to the Output File
  2. Write text to the Log File
  3. Get the userName of the user who submitted the concurrent request and write it to the Output File
  4. Send a success message to the Concurrent Manager

Make sure you are inside the "$JAVA_TOP/oracle/apps/xxcust/cp/request" location, and create the following Test.java file.
//*************************Test.java****************************/
package oracle.apps.xxcust.cp.request;

import oracle.apps.fnd.cp.request.*;
public class Test implements JavaConcurrentProgram{
        
        public void runProgram(CpContext ctx){

                // get reference to Out and Log files
                OutFile out = ctx.getOutFile();
                LogFile log = ctx.getLogFile();

                out.writeln("This is my Output file ! ");
                log.writeln("This is my Log file",LogFile.STATEMENT);

                //get concurrent program Request Details
                ReqDetails rDet = ctx.getReqDetails();
                String userName = rDet.getUserInfo().getUserName();
                
                // write username to the Out File
                out.writeln("User Name = "+userName);
                
                // Success message to the Concurrent Manager
                ctx.getReqCompletion().setCompletion(ReqCompletion.NORMAL, "Completed");
        }

}
//***********************End of Test.java****************************/

Compile the Java Program Test.java
Make sure you are inside the "$JAVA_TOP/oracle/apps/xxcust/cp/request" location, and run the following command:
$ javac Test.java
It will generate the Test.class file in the current directory, i.e. "$JAVA_TOP/oracle/apps/xxcust/cp/request".

Program Registration in EBS
Registration of Executable
Executable:                      TestJavaProg [Any Meaningful Name]
Short Name:                     TestJavaProg [Any Meaningful short Name]
Application:                      Application Name [Select (Custom or any other) Application Name]
Execution Method:            Java Concurrent Program
Execution File Name:        Test     [Name of our Java Class Compiled above]
Execution File Path:         oracle.apps.xxcust.cp.request   [Our Java Class Package Name]

Registration of Program
Program:                         Test Java Prog
Short Name:                    TESTJAVAPROG
Application:                     Application Name [Select any Application name]
Executable:                     TestJavaProg  [Name of the Executable registered above]