Big Data: On RDDs, Dataframes, Hive QL with Pyspark and SparkR – Part 3

Some people, when confronted with a problem, think “I know, I’ll use regular expressions.” Now they have two problems. – Jamie Zawinski

Some programmers, when confronted with a problem, think “I know, I’ll use floating point arithmetic.” Now they have 1.999999999997 problems. – @tomscott

Some people, when confronted with a problem, think “I know, I’ll use multithreading”. Nothhw tpe yawrve o oblems. – @d6

Some people, when confronted with a problem, think “I know, I’ll use versioning.” Now they have 2.1.0 problems. – @JaesCoyle

Some people, when faced with a problem, think, “I know, I’ll use binary.” Now they have 10 problems. – @nedbat

Introduction

The power of Spark, which operates on in-memory datasets, lies in the fact that it stores data as collections using Resilient Distributed Datasets (RDDs), which are themselves distributed in partitions across clusters. RDDs are a fast way of processing data, as the data is operated on in parallel based on the map-reduce paradigm. RDDs are best used when the operations are low level, and are typically applied to unstructured data like logs or text. For structured and semi-structured data, Spark has a higher abstraction called Dataframes. Handling data through Dataframes is extremely fast, as they are optimized using the Catalyst optimization engine, and the performance is orders of magnitude faster than RDDs. In addition, Dataframes also use Tungsten, which handles memory management and garbage collection more effectively.
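
A quick way to see the Catalyst optimizer at work is to ask a Dataframe for its execution plan with explain(). The snippet below is a minimal sketch; it assumes a Spark session is already available as spark (as it is in Databricks notebooks) and uses the same tendulkar.csv file used throughout this post.

df = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
# explain() prints the physical plan produced by the Catalyst optimizer
df.select('Runs','Ground').filter(df['Ground'] == 'Karachi').explain()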

The picture below shows the performance improvement achieved with Dataframes over RDDs

Benefits from Project Tungsten

Note: The above data and graph are taken from the course Big Data Analysis with Apache Spark at edX, UC Berkeley
This post is a continuation of my 2 earlier posts
1. Big Data-1: Move into the big league:Graduate from Python to Pyspark
2. Big Data-2: Move into the big league:Graduate from R to SparkR

In this post I perform equivalent operations on a small dataset using RDDs, Dataframes in Pyspark and SparkR, and HiveQL. As in some of my earlier posts, I have used the tendulkar.csv file. The dataset is small and allows me to demonstrate most operations, from data cleaning and data transformation to grouping.
You can clone/fork the notebooks from GitHub at Big Data: Part 3

1. RDD – Select all columns of tables

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
rdd.map(lambda line: (line.split(","))).take(5)
Out[90]: [['Runs', 'Mins', 'BF', '4s', '6s', 'SR', 'Pos', 'Dismissal', 'Inns', 'Opposition', 'Ground', 'Start Date'], ['15', '28', '24', '2', '0', '62.5', '6', 'bowled', '2', 'v Pakistan', 'Karachi', '15-Nov-89'], ['DNB', '-', '-', '-', '-', '-', '-', '-', '4', 'v Pakistan', 'Karachi', '15-Nov-89'], ['59', '254', '172', '4', '0', '34.3', '6', 'lbw', '1', 'v Pakistan', 'Faisalabad', '23-Nov-89'], ['8', '24', '16', '1', '0', '50', '6', 'run out', '3', 'v Pakistan', 'Faisalabad', '23-Nov-89']]
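
Note that the snippet above imports SparkContext but actually relies on the sc object that Databricks notebooks predefine. Outside Databricks you would create the context yourself; a minimal sketch, with the file path being whatever location holds your own copy of the CSV:

from pyspark.sql import SparkSession
# Build (or reuse) a session and obtain the underlying SparkContext
spark = SparkSession.builder.appName('RDD demo').getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("tendulkar.csv")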

1b. RDD – Select columns 1 to 4

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
rdd.map(lambda line: (line.split(",")[0:4])).take(5)
Out[91]:
[['Runs', 'Mins', 'BF', '4s'],
 ['15', '28', '24', '2'],
 ['DNB', '-', '-', '-'],
 ['59', '254', '172', '4'],
 ['8', '24', '16', '1']]

1c. RDD – Select specific columns 0, 10

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df.map(lambda x: (x[10],x[0])).take(5)
Out[92]:
[('Ground', 'Runs'),
 ('Karachi', '15'),
 ('Karachi', 'DNB'),
 ('Faisalabad', '59'),
 ('Faisalabad', '8')]

2. Dataframe:Pyspark – Select all columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.show(5)
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
|Runs|Mins| BF| 4s| 6s|   SR|Pos|Dismissal|Inns|Opposition|    Ground|Start Date|
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
|  15|  28| 24|  2|  0| 62.5|  6|   bowled|   2|v Pakistan|   Karachi| 15-Nov-89|
| DNB|   -|  -|  -|  -|    -|  -|        -|   4|v Pakistan|   Karachi| 15-Nov-89|
|  59| 254|172|  4|  0| 34.3|  6|      lbw|   1|v Pakistan|Faisalabad| 23-Nov-89|
|   8|  24| 16|  1|  0|   50|  6|  run out|   3|v Pakistan|Faisalabad| 23-Nov-89|
|  41| 124| 90|  5|  0|45.55|  7|   bowled|   1|v Pakistan|    Lahore|  1-Dec-89|
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
only showing top 5 rows

2a. Dataframe:Pyspark – Select specific columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.select("Runs","BF","Mins").show(5)
+----+---+----+
|Runs| BF|Mins|
+----+---+----+
|  15| 24|  28|
| DNB|  -|   -|
|  59|172| 254|
|   8| 16|  24|
|  41| 90| 124|
+----+---+----+

3. Dataframe:SparkR – Select all columns

# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Check the dimensions of the dataframe
df=SparkR::select(tendulkar1,"*")
head(SparkR::collect(df))

  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns Opposition     Ground Start Date
1   15   28  24  2  0  62.5   6    bowled    2 v Pakistan    Karachi  15-Nov-89
2  DNB    -   -  -  -     -   -         -    4 v Pakistan    Karachi  15-Nov-89
3   59  254 172  4  0  34.3   6       lbw    1 v Pakistan Faisalabad  23-Nov-89
4    8   24  16  1  0    50   6   run out    3 v Pakistan Faisalabad  23-Nov-89
5   41  124  90  5  0 45.55   7    bowled    1 v Pakistan     Lahore   1-Dec-89
6   35   74  51  5  0 68.62   6       lbw    1 v Pakistan    Sialkot   9-Dec-89

3a. Dataframe:SparkR – Select specific columns

# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Check the dimensions of the dataframe
df=SparkR::select(tendulkar1, "Runs", "BF","Mins")
head(SparkR::collect(df))
  Runs  BF Mins
1   15  24   28
2  DNB   -    -
3   59 172  254
4    8  16   24
5   41  90  124
6   35  51   74

4. Hive QL – Select all columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  * from tendulkar1_table limit 5').show(10, truncate = False)
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
|Runs|Mins|BF |4s |6s |SR   |Pos|Dismissal|Inns|Opposition|Ground    |Start Date|
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
|15  |28  |24 |2  |0  |62.5 |6  |bowled   |2   |v Pakistan|Karachi   |15-Nov-89 |
|DNB |-   |-  |-  |-  |-    |-  |-        |4   |v Pakistan|Karachi   |15-Nov-89 |
|59  |254 |172|4  |0  |34.3 |6  |lbw      |1   |v Pakistan|Faisalabad|23-Nov-89 |
|8   |24  |16 |1  |0  |50   |6  |run out  |3   |v Pakistan|Faisalabad|23-Nov-89 |
|41  |124 |90 |5  |0  |45.55|7  |bowled   |1   |v Pakistan|Lahore    |1-Dec-89  |
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+

4a. Hive QL – Select specific columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  Runs, BF,Mins from tendulkar1_table limit 5').show(10, truncate = False)
+----+---+----+
|Runs|BF |Mins|
+----+---+----+
|15  |24 |28  |
|DNB |-  |-   |
|59  |172|254 |
|8   |16 |24  |
|41  |90 |124 |
+----+---+----+

5. RDD – Filter rows on specific condition

from pyspark import SparkContext
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=(rdd.map(lambda line: line.split(","))
      .filter(lambda x: x[0] != "DNB")
      .filter(lambda x: x[0] != "TDNB")
      .filter(lambda x: x[0] != "absent")
      .map(lambda x: [x[0].replace("*","")] + x[1:]))

df.take(5)

Out[97]:
[['Runs', 'Mins', 'BF', '4s', '6s', 'SR', 'Pos', 'Dismissal', 'Inns', 'Opposition', 'Ground', 'Start Date'],
 ['15', '28', '24', '2', '0', '62.5', '6', 'bowled', '2', 'v Pakistan', 'Karachi', '15-Nov-89'],
 ['59', '254', '172', '4', '0', '34.3', '6', 'lbw', '1', 'v Pakistan', 'Faisalabad', '23-Nov-89'],
 ['8', '24', '16', '1', '0', '50', '6', 'run out', '3', 'v Pakistan', 'Faisalabad', '23-Nov-89'],
 ['41', '124', '90', '5', '0', '45.55', '7', 'bowled', '1', 'v Pakistan', 'Lahore', '1-Dec-89']]

5a. Dataframe:Pyspark – Filter rows on specific condition

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'absent')
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1.show(5)
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
|Runs|Mins| BF| 4s| 6s|   SR|Pos|Dismissal|Inns|Opposition|    Ground|Start Date|
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
|  15|  28| 24|  2|  0| 62.5|  6|   bowled|   2|v Pakistan|   Karachi| 15-Nov-89|
|  59| 254|172|  4|  0| 34.3|  6|      lbw|   1|v Pakistan|Faisalabad| 23-Nov-89|
|   8|  24| 16|  1|  0|   50|  6|  run out|   3|v Pakistan|Faisalabad| 23-Nov-89|
|  41| 124| 90|  5|  0|45.55|  7|   bowled|   1|v Pakistan|    Lahore|  1-Dec-89|
|  35|  74| 51|  5|  0|68.62|  6|      lbw|   1|v Pakistan|   Sialkot|  9-Dec-89|
+----+----+---+---+---+-----+---+---------+----+----------+----------+----------+
only showing top 5 rows

5b. Dataframe:SparkR – Filter rows on specific condition

sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
df=SparkR::select(tendulkar1,"*")
head(SparkR::collect(df))

5c. Hive QL – Filter rows on specific condition

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  Runs, BF,Mins from tendulkar1_table where Runs NOT IN  ("DNB","TDNB","absent")').show(10, truncate = False)
+----+---+----+
|Runs|BF |Mins|
+----+---+----+
|15  |24 |28  |
|59  |172|254 |
|8   |16 |24  |
|41  |90 |124 |
|35  |51 |74  |
|57  |134|193 |
|0   |1  |1   |
|24  |44 |50  |
|88  |266|324 |
|5   |13 |15  |
+----+---+----+
only showing top 10 rows

6. RDD – Find rows where Runs > 50

from pyspark import SparkContext
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df=rdd.map(lambda line: line.split(",")[0:4]) \
   .filter(lambda x: x[0] not in ["DNB", "TDNB", "absent"])
df1=df.map(lambda x: [x[0].replace("*","")] + x[1:4])
header=df1.first()
df2=df1.filter(lambda x: x !=header)
df3=df2.map(lambda x: [float(x[0])] +x[1:4])
df3.filter(lambda x: x[0]>=50).take(10)
Out[101]: 
[[59.0, '254', '172', '4'],
 [57.0, '193', '134', '6'],
 [88.0, '324', '266', '5'],
 [68.0, '216', '136', '8'],
 [119.0, '225', '189', '17'],
 [148.0, '298', '213', '14'],
 [114.0, '228', '161', '16'],
 [111.0, '373', '270', '19'],
 [73.0, '272', '208', '8'],
 [50.0, '158', '118', '6']]

6a. Dataframe:Pyspark – Find rows where Runs >50

from pyspark.sql import SparkSession

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'absent')
tendulkar1 = tendulkar1.withColumn("Runs", tendulkar1["Runs"].cast(IntegerType()))
tendulkar1.filter(tendulkar1['Runs']>=50).show(10)
+----+----+---+---+---+-----+---+---------+----+--------------+------------+----------+
|Runs|Mins| BF| 4s| 6s|   SR|Pos|Dismissal|Inns|    Opposition|      Ground|Start Date|
+----+----+---+---+---+-----+---+---------+----+--------------+------------+----------+
|  59| 254|172|  4|  0| 34.3|  6|      lbw|   1|    v Pakistan|  Faisalabad| 23-Nov-89|
|  57| 193|134|  6|  0|42.53|  6|   caught|   3|    v Pakistan|     Sialkot|  9-Dec-89|
|  88| 324|266|  5|  0|33.08|  6|   caught|   1| v New Zealand|      Napier|  9-Feb-90|
|  68| 216|136|  8|  0|   50|  6|   caught|   2|     v England|  Manchester|  9-Aug-90|
| 114| 228|161| 16|  0| 70.8|  4|   caught|   2|   v Australia|       Perth|  1-Feb-92|
| 111| 373|270| 19|  0|41.11|  4|   caught|   2|v South Africa|Johannesburg| 26-Nov-92|
|  73| 272|208|  8|  1|35.09|  5|   caught|   2|v South Africa|   Cape Town|  2-Jan-93|
|  50| 158|118|  6|  0|42.37|  4|   caught|   1|     v England|     Kolkata| 29-Jan-93|
| 165| 361|296| 24|  1|55.74|  4|   caught|   1|     v England|     Chennai| 11-Feb-93|
|  78| 285|213| 10|  0|36.61|  4|      lbw|   2|     v England|      Mumbai| 19-Feb-93|
+----+----+---+---+---+-----+---+---------+----+--------------+------------+----------+
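
Incidentally, the plain integer cast above silently turns not-out scores such as “119*” into null, which is why those innings do not appear in the output even though they exceed 50. A small sketch of one remedy, stripping the “*” (as the RDD and SparkR versions do) before casting; it re-reads the file so it can stand on its own:

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
# Strip the trailing '*' while Runs is still a string, then cast to an integer
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1 = tendulkar1.where(~tendulkar1['Runs'].isin('DNB', 'TDNB', 'absent'))
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1 = tendulkar1.withColumn('Runs', tendulkar1['Runs'].cast(IntegerType()))
tendulkar1.filter(tendulkar1['Runs'] >= 50).show(10)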

6b. Dataframe:SparkR – Find rows where Runs >50

# Load the SparkR library
library(SparkR)
sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
df=SparkR::select(tendulkar1,"*")
df=SparkR::filter(tendulkar1, tendulkar1$Runs > 50)
head(SparkR::collect(df))
  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns    Opposition     Ground
1   59  254 172  4  0  34.3   6       lbw    1    v Pakistan Faisalabad
2   57  193 134  6  0 42.53   6    caught    3    v Pakistan    Sialkot
3   88  324 266  5  0 33.08   6    caught    1 v New Zealand     Napier
4   68  216 136  8  0    50   6    caught    2     v England Manchester
5  119  225 189 17  0 62.96   6   not out    4     v England Manchester
6  148  298 213 14  0 69.48   6   not out    2   v Australia     Sydney
  Start Date
1  23-Nov-89
2   9-Dec-89
3   9-Feb-90
4   9-Aug-90
5   9-Aug-90
6   2-Jan-92


7. RDD – groupByKey() and reduceByKey()

from pyspark import SparkContext
from pyspark.mllib.stat import Statistics
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df=rdd.map(lambda line: line.split(",")[0:]) \
   .filter(lambda x: x[0] not in ["DNB", "TDNB", "absent"])
df1=df.map(lambda x: [x[0].replace("*","")] + x[1:])
header=df1.first()
df2=df1.filter(lambda x: x !=header)
df3=df2.map(lambda x: [float(x[0])] +x[1:])
df4 = df3.map(lambda x: (x[10],x[0]))
# Total runs at each ground with reduceByKey()
df5=df4.reduceByKey(lambda a,b: a+b,1)
# Average runs at each ground with groupByKey()
df4.groupByKey().mapValues(lambda x: sum(x) / len(x)).take(10)

[('Georgetown', 81.0),
 ('Lahore', 17.0),
 ('Adelaide', 32.6),
 ('Colombo (SSC)', 77.55555555555556),
 ('Nagpur', 64.66666666666667),
 ('Auckland', 5.0),
 ('Bloemfontein', 85.0),
 ('Centurion', 73.5),
 ('Faisalabad', 27.0),
 ('Bridgetown', 26.0)]
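
groupByKey() ships every individual value for a key across the network before the mean is computed. The same average can be obtained with reduceByKey() by carrying (sum, count) pairs, which combines values on each partition first; a small sketch that reuses the df4 pair RDD built above:

# Average runs per ground with reduceByKey(): carry (sum, count) pairs and divide at the end
sumCount = df4.mapValues(lambda v: (v, 1)) \
              .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sumCount.mapValues(lambda x: x[0] / x[1]).take(10)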

7a. Dataframe:Pyspark – Compute mean, min and max

from pyspark.sql.functions import *
tendulkar1= (sqlContext
         .read.format("com.databricks.spark.csv")
         .options(delimiter=',', header='true', inferschema='true')
         .load("/FileStore/tables/tendulkar.csv"))
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1.select('Runs').rdd.distinct().collect()

from pyspark.sql import functions as F
df=tendulkar1[['Runs','BF','Ground']].groupby(tendulkar1['Ground']).agg(F.mean(tendulkar1['Runs']),F.min(tendulkar1['Runs']),F.max(tendulkar1['Runs']))
df.show()
+-------------+-----------------+---------+---------+
|       Ground|        avg(Runs)|min(Runs)|max(Runs)|
+-------------+-----------------+---------+---------+
|    Bangalore|          54.3125|        0|       96|
|     Adelaide|             32.6|        0|       61|
|Colombo (PSS)|             37.2|       14|       71|
| Christchurch|             12.0|        0|       24|
|     Auckland|              5.0|        5|        5|
|      Chennai|           60.625|        0|       81|
|    Centurion|             73.5|      111|       36|
|     Brisbane|7.666666666666667|        0|        7|
|   Birmingham|            46.75|        1|       40|
|    Ahmedabad|           40.125|      100|        8|
|Colombo (RPS)|            143.0|      143|      143|
|   Chittagong|             57.8|      101|       36|
|    Cape Town|69.85714285714286|       14|        9|
|   Bridgetown|             26.0|        0|       92|
|     Bulawayo|             55.0|       36|       74|
|        Delhi|39.94736842105263|        0|       76|
|   Chandigarh|             11.0|       11|       11|
| Bloemfontein|             85.0|       15|      155|
|Colombo (SSC)|77.55555555555556|      104|        8|
|      Cuttack|              2.0|        2|        2|
+-------------+-----------------+---------+---------+
only showing top 20 rows
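
The odd-looking minimum and maximum values (for example Centurion with a minimum of 111 and a maximum of 36) are a reminder that Runs is still a string at this point, so min() and max() compare lexicographically. A minimal sketch of the fix, casting Runs to an integer before aggregating (tendulkar2 is just an illustrative name):

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# Cast Runs to an integer so that min/max compare numerically
tendulkar2 = tendulkar1.withColumn('Runs', tendulkar1['Runs'].cast(IntegerType()))
tendulkar2.groupby('Ground').agg(F.mean('Runs'), F.min('Runs'), F.max('Runs')).show()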

7b. Dataframe:SparkR – Compute mean, min and max

sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
df=SparkR::summarize(SparkR::groupBy(tendulkar1, tendulkar1$Ground), mean = mean(tendulkar1$Runs), minRuns=min(tendulkar1$Runs),maxRuns=max(tendulkar1$Runs))
head(df,20)
          Ground       mean minRuns maxRuns
1      Bangalore  54.312500       0      96
2       Adelaide  32.600000       0      61
3  Colombo (PSS)  37.200000      14      71
4   Christchurch  12.000000       0      24
5       Auckland   5.000000       5       5
6        Chennai  60.625000       0      81
7      Centurion  73.500000     111      36
8       Brisbane   7.666667       0       7
9     Birmingham  46.750000       1      40
10     Ahmedabad  40.125000     100       8
11 Colombo (RPS) 143.000000     143     143
12    Chittagong  57.800000     101      36
13     Cape Town  69.857143      14       9
14    Bridgetown  26.000000       0      92
15      Bulawayo  55.000000      36      74
16         Delhi  39.947368       0      76
17    Chandigarh  11.000000      11      11
18  Bloemfontein  85.000000      15     155
19 Colombo (SSC)  77.555556     104       8
20       Cuttack   2.000000       2       2

Also see
1. My book ‘Practical Machine Learning in R and Python: Third edition’ on Amazon
2. My book ‘Deep Learning from first principles: Second Edition’ now on Amazon
3. The Clash of the Titans in Test and ODI cricket
4. Introducing QCSimulator: A 5-qubit quantum computing simulator in R
5. Latency, throughput implications for the Cloud
6. Simulating a Web Joint in Android
7. Pitching yorkpy … short of good length to IPL – Part 1

To see all posts click Index of Posts

Big Data-2: Move into the big league:Graduate from R to SparkR

This post is a continuation of my earlier post Big Data-1: Move into the big league:Graduate from Python to Pyspark. While the earlier post discussed parallel constructs in Python and Pyspark, this post covers the equivalent key constructs in R and SparkR. Although this post focuses on the programming side of R and SparkR, it is essential to understand and fully grasp the concepts of Spark, RDDs and how data is distributed across the cluster. Like the earlier post, this one shows that if you already have a good handle on R, you can easily graduate to Big Data with SparkR.

Note 1: This notebook has also been published at Databricks community site Big Data-2: Move into the big league:Graduate from R to SparkR

Note 2: You can download this RMarkdown file from Github at Big Data- Python to Pyspark and R to SparkR
1a. Read CSV – R

Note: To upload the CSV to databricks see the video Upload Flat File to Databricks Table

# Read CSV file
tendulkar= read.csv("/dbfs/FileStore/tables/tendulkar.csv",stringsAsFactors = FALSE,na.strings=c(NA,"-"))
#Check the dimensions of the dataframe
dim(tendulkar)
[1] 347  12
1b. Read CSV – SparkR
# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Check the dimensions of the dataframe
dim(tendulkar1)
[1] 347  12
2a. Data frame shape – R
# Get the shape of the dataframe in R
dim(tendulkar)
[1] 347  12
2b. Dataframe shape – SparkR

The same ‘dim’ command works in SparkR too!

dim(tendulkar1)
[1] 347  12
3a. Dataframe columns – R
# Get the names
names(tendulkar) # Also colnames(tendulkar)
 [1] "Runs"       "Mins"       "BF"         "X4s"        "X6s"       
 [6] "SR"         "Pos"        "Dismissal"  "Inns"       "Opposition"
[11] "Ground"     "Start.Date"
3b. Dataframe columns – SparkR
names(tendulkar1)
 [1] "Runs"       "Mins"       "BF"         "4s"         "6s"        
 [6] "SR"         "Pos"        "Dismissal"  "Inns"       "Opposition"
[11] "Ground"     "Start Date"
4a. Rename columns – R
names(tendulkar)=c('Runs','Minutes','BallsFaced','Fours','Sixes','StrikeRate','Position','Dismissal','Innings','Opposition','Ground','StartDate')
names(tendulkar)
 [1] "Runs"       "Minutes"    "BallsFaced" "Fours"      "Sixes"     
 [6] "StrikeRate" "Position"   "Dismissal"  "Innings"    "Opposition"
[11] "Ground"     "StartDate"
4b. Rename columns – SparkR
names(tendulkar1)=c('Runs','Minutes','BallsFaced','Fours','Sixes','StrikeRate','Position','Dismissal','Innings','Opposition','Ground','StartDate')
names(tendulkar1)
 [1] "Runs"       "Minutes"    "BallsFaced" "Fours"      "Sixes"     
 [6] "StrikeRate" "Position"   "Dismissal"  "Innings"    "Opposition"
[11] "Ground"     "StartDate"
5a. Summary – R
summary(tendulkar)
     Runs              Minutes        BallsFaced         Fours       
 Length:347         Min.   :  1.0   Min.   :  0.00   Min.   : 0.000  
 Class :character   1st Qu.: 33.0   1st Qu.: 22.00   1st Qu.: 1.000  
 Mode  :character   Median : 82.0   Median : 58.50   Median : 4.000  
                    Mean   :125.5   Mean   : 89.75   Mean   : 6.274  
                    3rd Qu.:181.0   3rd Qu.:133.25   3rd Qu.: 9.000  
                    Max.   :613.0   Max.   :436.00   Max.   :35.000  
                    NA's   :18      NA's   :19       NA's   :19      
     Sixes          StrikeRate        Position     Dismissal        
 Min.   :0.0000   Min.   :  0.00   Min.   :2.00   Length:347        
 1st Qu.:0.0000   1st Qu.: 38.09   1st Qu.:4.00   Class :character  
 Median :0.0000   Median : 52.25   Median :4.00   Mode  :character  
 Mean   :0.2097   Mean   : 51.79   Mean   :4.24                     
 3rd Qu.:0.0000   3rd Qu.: 65.09   3rd Qu.:4.00                     
 Max.   :4.0000   Max.   :166.66   Max.   :7.00                     
 NA's   :18       NA's   :20       NA's   :18                       
    Innings       Opposition           Ground           StartDate        
 Min.   :1.000   Length:347         Length:347         Length:347        
 1st Qu.:1.000   Class :character   Class :character   Class :character  
 Median :2.000   Mode  :character   Mode  :character   Mode  :character  
 Mean   :2.376                                                           
 3rd Qu.:3.000                                                           
 Max.   :4.000                                                           
 NA's   :1
5b. Summary – SparkR
summary(tendulkar1)
SparkDataFrame[summary:string, Runs:string, Minutes:string, BallsFaced:string, Fours:string, Sixes:string, StrikeRate:string, Position:string, Dismissal:string, Innings:string, Opposition:string, Ground:string, StartDate:string]
6a. Displaying details of dataframe with str() – R
str(tendulkar)
'data.frame':	347 obs. of  12 variables:
 $ Runs      : chr  "15" "DNB" "59" "8" ...
 $ Minutes   : int  28 NA 254 24 124 74 193 1 50 324 ...
 $ BallsFaced: int  24 NA 172 16 90 51 134 1 44 266 ...
 $ Fours     : int  2 NA 4 1 5 5 6 0 3 5 ...
 $ Sixes     : int  0 NA 0 0 0 0 0 0 0 0 ...
 $ StrikeRate: num  62.5 NA 34.3 50 45.5 ...
 $ Position  : int  6 NA 6 6 7 6 6 6 6 6 ...
 $ Dismissal : chr  "bowled" NA "lbw" "run out" ...
 $ Innings   : int  2 4 1 3 1 1 3 2 3 1 ...
 $ Opposition: chr  "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan" ...
 $ Ground    : chr  "Karachi" "Karachi" "Faisalabad" "Faisalabad" ...
 $ StartDate : chr  "15-Nov-89" "15-Nov-89" "23-Nov-89" "23-Nov-89" ...
6b. Displaying details of dataframe with str() – SparkR
str(tendulkar1)
'SparkDataFrame': 12 variables:
 $ Runs      : chr "15" "DNB" "59" "8" "41" "35"
 $ Minutes   : chr "28" "-" "254" "24" "124" "74"
 $ BallsFaced: chr "24" "-" "172" "16" "90" "51"
 $ Fours     : chr "2" "-" "4" "1" "5" "5"
 $ Sixes     : chr "0" "-" "0" "0" "0" "0"
 $ StrikeRate: chr "62.5" "-" "34.3" "50" "45.55" "68.62"
 $ Position  : chr "6" "-" "6" "6" "7" "6"
 $ Dismissal : chr "bowled" "-" "lbw" "run out" "bowled" "lbw"
 $ Innings   : chr "2" "4" "1" "3" "1" "1"
 $ Opposition: chr "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan"
 $ Ground    : chr "Karachi" "Karachi" "Faisalabad" "Faisalabad" "Lahore" "Sialkot"
 $ StartDate : chr "15-Nov-89" "15-Nov-89" "23-Nov-89" "23-Nov-89" "1-Dec-89" "9-Dec-89"
7a. Head & tail – R
print(head(tendulkar),3)
print(tail(tendulkar),3)
 Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
1   15      28         24     2     0      62.50        6    bowled       2
2  DNB      NA         NA    NA    NA         NA       NA             4
3   59     254        172     4     0      34.30        6       lbw       1
4    8      24         16     1     0      50.00        6   run out       3
5   41     124         90     5     0      45.55        7    bowled       1
6   35      74         51     5     0      68.62        6       lbw       1
  Opposition     Ground StartDate
1 v Pakistan    Karachi 15-Nov-89
2 v Pakistan    Karachi 15-Nov-89
3 v Pakistan Faisalabad 23-Nov-89
4 v Pakistan Faisalabad 23-Nov-89
5 v Pakistan     Lahore  1-Dec-89
6 v Pakistan    Sialkot  9-Dec-89
    Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
342   37     125         81     5     0      45.67        4    caught       2
343   21      71         23     2     0      91.30        4   run out       4
344   32      99         53     5     0      60.37        4       lbw       2
345    1       8          5     0     0      20.00        4       lbw       4
346   10      41         24     2     0      41.66        4       lbw       2
347   74     150        118    12     0      62.71        4    caught       2
       Opposition  Ground StartDate
342   v Australia  Mohali 14-Mar-13
343   v Australia  Mohali 14-Mar-13
344   v Australia   Delhi 22-Mar-13
345   v Australia   Delhi 22-Mar-13
346 v West Indies Kolkata  6-Nov-13
347 v West Indies  Mumbai 14-Nov-13
7b. Head – SparkR
head(tendulkar1,3)
  Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
1   15      28         24     2     0       62.5        6    bowled       2
2  DNB       -          -     -     -          -        -         -       4
3   59     254        172     4     0       34.3        6       lbw       1
  Opposition     Ground StartDate
1 v Pakistan    Karachi 15-Nov-89
2 v Pakistan    Karachi 15-Nov-89
3 v Pakistan Faisalabad 23-Nov-89
8a. Determining the column types with sapply – R
sapply(tendulkar,class)
       Runs     Minutes  BallsFaced       Fours       Sixes  StrikeRate 
"character"   "integer"   "integer"   "integer"   "integer"   "numeric" 
   Position   Dismissal     Innings  Opposition      Ground   StartDate 
  "integer" "character"   "integer" "character" "character" "character"
8b. Determining the column types with printSchema – SparkR
printSchema(tendulkar1)
root
 |-- Runs: string (nullable = true)
 |-- Minutes: string (nullable = true)
 |-- BallsFaced: string (nullable = true)
 |-- Fours: string (nullable = true)
 |-- Sixes: string (nullable = true)
 |-- StrikeRate: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Dismissal: string (nullable = true)
 |-- Innings: string (nullable = true)
 |-- Opposition: string (nullable = true)
 |-- Ground: string (nullable = true)
 |-- StartDate: string (nullable = true)
9a. Selecting columns – R
library(dplyr)
df=select(tendulkar,Runs,BallsFaced,Minutes)
head(df,5)
  Runs BallsFaced Minutes
1   15         24      28
2  DNB         NA      NA
3   59        172     254
4    8         16      24
5   41         90     124
9b. Selecting columns – SparkR
library(SparkR)
Sys.setenv(SPARK_HOME="/usr/hdp/2.6.0.3-8/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")
df=SparkR::select(tendulkar1, "Runs", "BF","Mins")
head(SparkR::collect(df))
  Runs  BF Mins
1   15  24   28
2  DNB   -    -
3   59 172  254
4    8  16   24
5   41  90  124
6   35  51   74
10a. Filter rows by criteria – R
library(dplyr)
df=tendulkar %>% filter(Runs > 50)
head(df,5)
  Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
1  DNB      NA         NA    NA    NA         NA       NA             4
2   59     254        172     4     0      34.30        6       lbw       1
3    8      24         16     1     0      50.00        6   run out       3
4   57     193        134     6     0      42.53        6    caught       3
5   88     324        266     5     0      33.08        6    caught       1
     Opposition     Ground StartDate
1    v Pakistan    Karachi 15-Nov-89
2    v Pakistan Faisalabad 23-Nov-89
3    v Pakistan Faisalabad 23-Nov-89
4    v Pakistan    Sialkot  9-Dec-89
5 v New Zealand     Napier  9-Feb-90
10b. Filter rows by criteria – SparkR
df=SparkR::filter(tendulkar1, tendulkar1$Runs > 50)
head(SparkR::collect(df))
  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns     Opposition       Ground
1   59  254 172  4  0  34.3   6       lbw    1     v Pakistan   Faisalabad
2   57  193 134  6  0 42.53   6    caught    3     v Pakistan      Sialkot
3   88  324 266  5  0 33.08   6    caught    1  v New Zealand       Napier
4   68  216 136  8  0    50   6    caught    2      v England   Manchester
5  114  228 161 16  0  70.8   4    caught    2    v Australia        Perth
6  111  373 270 19  0 41.11   4    caught    2 v South Africa Johannesburg
  Start Date
1  23-Nov-89
2   9-Dec-89
3   9-Feb-90
4   9-Aug-90
5   1-Feb-92
6  26-Nov-92
11a. Unique values – R
unique(tendulkar$Runs)
  [1] "15"   "DNB"  "59"   "8"    "41"   "35"   "57"   "0"    "24"   "88"  
 [11] "5"    "10"   "27"   "68"   "119*" "21"   "11"   "16"   "7"    "40"  
 [21] "148*" "6"    "17"   "114"  "111"  "1"    "73"   "50"   "9*"   "165" 
 [31] "78"   "62"   "TDNB" "28"   "104*" "71"   "142"  "96"   "43"   "11*" 
 [41] "34"   "85"   "179"  "54"   "4"    "0*"   "52*"  "2"    "122"  "31"  
 [51] "177"  "74"   "42"   "18"   "61"   "36"   "169"  "9"    "15*"  "92"  
 [61] "83"   "143"  "139"  "23"   "148"  "13"   "155*" "79"   "47"   "113" 
 [71] "67"   "136"  "29"   "53"   "124*" "126*" "44*"  "217"  "116"  "52"  
 [81] "45"   "97"   "20"   "39"   "201*" "76"   "65"   "126"  "36*"  "69"  
 [91] "155"  "22*"  "103"  "26"   "90"   "176"  "117"  "86"   "12"   "193" 
[101] "16*"  "51"   "32"   "55"   "37"   "44"   "241*" "60*"  "194*" "3"   
[111] "32*"  "248*" "94"   "22"   "109"  "19"   "14"   "28*"  "63"   "64"  
[121] "101"  "122*" "91"   "82"   "56*"  "154*" "153"  "49"   "10*"  "103*"
[131] "160"  "100*" "105*" "100"  "106"  "84"   "203"  "98"   "38"   "214" 
[141] "53*"  "111*" "146"  "14*"  "56"   "80"   "25"   "81"   "13*"
11b. Unique values – SparkR
head(SparkR::distinct(tendulkar1[,"Runs"]),5)
  Runs
1 119*
2    7
3   51
4  169
5  32*
12a. Aggregate – Mean, min and max – R
library(dplyr)
library(magrittr)
a <- tendulkar$Runs != "DNB"
tendulkar <- tendulkar[a,]
dim(tendulkar)

# Remove rows with 'TDNB'
c <- tendulkar$Runs != "TDNB"
tendulkar <- tendulkar[c,]

# Remove rows with absent
d <- tendulkar$Runs != "absent"
tendulkar <- tendulkar[d,]
dim(tendulkar)

# Remove the "* indicating not out
tendulkar$Runs <- as.numeric(gsub("\\*","",tendulkar$Runs))
c <- complete.cases(tendulkar)

#Subset the rows which are complete
tendulkar <- tendulkar[c,]
print(dim(tendulkar))
df <-tendulkar %>%  group_by(Ground) %>% summarise(meanRuns= mean(Runs), minRuns=min(Runs), maxRuns=max(Runs)) 
#names(tendulkar)
head(df)
[1] 327  12
# A tibble: 6 x 4
  Ground       meanRuns minRuns maxRuns
  <chr>           <dbl>   <dbl>   <dbl>
1 Adelaide        32.6       0.    153.
2 Ahmedabad       40.1       4.    217.
3 Auckland         5.00      5.      5.
4 Bangalore       57.9       4.    214.
5 Birmingham      46.8       1.    122.
6 Bloemfontein    85.0      15.    155.
12b. Aggregate – Mean, Min, Max – SparkR
sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
df=SparkR::summarize(SparkR::groupBy(tendulkar1, tendulkar1$Ground), mean = mean(tendulkar1$Runs), minRuns=min(tendulkar1$Runs),maxRuns=max(tendulkar1$Runs))
head(df,20)
[1] 347  12
[1] 330  12
[1] 329  12
[1] 329  12
          Ground       mean minRuns maxRuns
1      Bangalore  54.312500       0      96
2       Adelaide  32.600000       0      61
3  Colombo (PSS)  37.200000      14      71
4   Christchurch  12.000000       0      24
5       Auckland   5.000000       5       5
6        Chennai  60.625000       0      81
7      Centurion  73.500000     111      36
8       Brisbane   7.666667       0       7
9     Birmingham  46.750000       1      40
10     Ahmedabad  40.125000     100       8
11 Colombo (RPS) 143.000000     143     143
12    Chittagong  57.800000     101      36
13     Cape Town  69.857143      14       9
14    Bridgetown  26.000000       0      92
15      Bulawayo  55.000000      36      74
16         Delhi  39.947368       0      76
17    Chandigarh  11.000000      11      11
18  Bloemfontein  85.000000      15     155
19 Colombo (SSC)  77.555556     104       8
20       Cuttack   2.000000       2       2
13a. Using SQL with SparkR
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(tendulkar1, "tendulkar2")

# SQL statements can be run by using the sql method
df=SparkR::sql("SELECT * FROM tendulkar2 WHERE Ground='Karachi'")

head(df)

  Runs Mins BF 4s 6s    SR Pos Dismissal Inns Opposition  Ground Start Date
1   15   28 24  2  0  62.5   6    bowled    2 v Pakistan Karachi  15-Nov-89
2  DNB    -  -  -  -     -   -         -    4 v Pakistan Karachi  15-Nov-89
3   23   49 29  5  0 79.31   4    bowled    2 v Pakistan Karachi  29-Jan-06
4   26   74 47  5  0 55.31   4    bowled    4 v Pakistan Karachi  29-Jan-06
Conclusion

This post discusses some of the key constructs in R and SparkR and how one can transition from R to SparkR fairly easily. I will be adding more constructs later. Do check back!

You may also like
1. Exploring Quantum Gate operations with QCSimulator
2. Deep Learning from first principles in Python, R and Octave – Part 4
3. A Bluemix recipe with MongoDB and Node.js
4. Practical Machine Learning with R and Python – Part 5
5. Introducing cricketr! : An R package to analyze performances of cricketers

To see all posts click Index of posts

Big Data-1: Move into the big league:Graduate from Python to Pyspark

This post discusses similar constructs in Python and Pyspark. As in my earlier post R vs Python: Different similarities and similar differences, the focus is on the key and common constructs to highlight the similarities.

Important Note: You can also access this notebook at the Databricks public site Big Data-1: Move into the big league:Graduate from Python to Pyspark (the formatting there is much better!!).

For this notebook I have used the Databricks Community Edition.

You can download the notebook from GitHub at Big Data-1:PythontoPysparkAndRtoSparkR

Hope you found this useful!

Note: There are still a few more important constructs which I will be adding to this post.

Also see
1. My book “Deep Learning from first principles” now on Amazon
2. My book ‘Practical Machine Learning in R and Python: Second edition’ on Amazon
3. Re-introducing cricketr! : An R package to analyze performances of cricketers
4. GooglyPlus: yorkr analyzes IPL players, teams, matches with plots and tables
5. Deblurring with OpenCV: Weiner filter reloaded
6. Design Principles of Scalable, Distributed Systems

Perils and pitfalls of Big Data

Big Data is hurtling towards us in a big way. It is already in the news and the blip seems to be getting bigger. Big Data will soon become the key driver for almost any kind of decision made in manufacturing, retail and finance, all the way to astronomy, oceanography etc. The common aspect of all these industries and areas is that data is generated in the order of several petabytes to exabytes, and Big Data is the set of techniques for analyzing such large volumes of data.

Big Data represents the techniques to handle the huge deluge of data that is already becoming enmeshed in our lives. Multiple disparate, varied streams of data (text, tweets, click streams, HTML) flow through with tremendous volume and velocity. The key aspects of this data are its volume, variety and velocity. It is never ending and never seems to stop. How do we handle this deluge, and how do we make sense of this data? That is what Big Data is all about.

Big Data provides algorithms to find patterns, determine trends or classify data depending on the features provided. It is supposed to enable decision makers to make key decisions based on the answers the algorithms spew forth.

Big Data is also complicated by the fact that data comes in multiple forms: click streams, tweets, HTML, texts, CSVs, structured and unstructured data.

The ability to detect patterns, determine trends, classify data and identify outliers is no easy task.

In this post I try to take a philosophical look at Big Data and ask whether it can really help us. Will it help, or will it take us on a wild goose chase? Can we trust the results?

Big Data depends on algorithms to make sense of data. It deals with data that is in the order of petabytes to exabytes. At this scale, with multiple features, our cognitive abilities are of no use; we must rely on machines and algorithms to make sense of these large amounts of data. Our minds can handle a few hundred data points and at most 3 dimensions. Beyond that, the data can hardly make any sense.

Data by itself, in the absence of features & algorithms, is indistinguishable from noise. It is data science that makes sense of data. Data science separates the signal from the noise.

It is the algorithms that try to determine the best fit for a given set of data. But how reliable are the results? For example, let us take the following case.

[Figure: a set of data points that together form a circle and a rectangle]

An unsupervised learning algorithm for the above data points could try to separate the data into 2 sets. Clearly this is one way, but what is more appropriate is to recognize that we have 2 shapes, the circle and the rectangle. A machine algorithm would work based on the features that we choose. Are we in a position to decide whether the answer the algorithm gives us is correct? We have no way of knowing, because the amount of data is beyond our cognitive capabilities.

In other words, Big Data is full of perils and pitfalls.

When we let the machine analyze on our behalf, the possibility of coming to a wrong conclusion is fairly high. Coupled with the fact that we ourselves are sometimes led to erroneous judgments, as discussed below, the problem is further compounded.

In his book ‘Thinking, Fast and Slow’, Daniel Kahneman discusses several situations where our mind falls into the traps of lazy thinking and we come to wrong conclusions. Our minds also tend to detect patterns in data where there are none. Sometimes, according to Kahneman, ‘randomness appears as regularity or a tendency to cluster’, and ‘the tendency to see patterns in randomness is overwhelming’. Since in Big Data it is the algorithm that determines the pattern, we could similarly be tricked into coming to false conclusions. Sometimes the human mind sees causality where there is none. Occasionally we fail to see the obvious.

In the famous ‘gorilla experiment’ the researchers tried to assess selective attention. The participants are asked to count the number of passes made by those in white t-shirts. Surprisingly, a large number of the participants were completely oblivious to a gorilla that appears midway through the video. When we, as humans, fail to see such large objects, can we expect the machine to accurately identify patterns and perform accurate classifications?

There are techniques that help in determining false positives, for example the Bonferroni correction. Simply put, the Bonferroni correction addresses the probability of getting at least 1 significant result when one is testing several hypotheses simultaneously. If we test 20 hypotheses at a significance level of 0.05, then the probability of at least 1 significant result is

P(at least one significant result) = 1 - P(no significant results)
                                   = 1 - (1 - 0.05)^20
                                   ≈ 0.64

So, with 20 tests being considered, we have a 64% chance of observing at least one significant result, even if all of the tests are actually not significant. This would be a false positive.
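
The arithmetic is easy to verify; a couple of lines of plain Python (a small sketch, not tied to any of the Spark code above):

# Probability of at least one false positive across 20 independent tests at alpha = 0.05
alpha, n = 0.05, 20
print(1 - (1 - alpha) ** n)   # ~0.64
# The Bonferroni correction simply tests each hypothesis at alpha/n instead
print(alpha / n)              # 0.0025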

Given that our ability to come to significant conclusions depends largely on being able to choose appropriate features, we must also be able to maneuver between false negatives and false positives. In addition we must also take into account the fallibility of the human mind.

Clearly, Big Data is the future! However with Big Data we are really on treacherous, slippery ground!


Big Data – Getting bigger!

Published in Telecom Asia – Big Data is getting bigger

There are two very significant ways in which our world has changed in the past decade. Firstly, we are more “connected”. Secondly, we are “awash with data”. In a planet with 7 billion people there are now 2 billion PCs and upward of 6 billion mobile connections. Besides the connections which we as human beings have, there are now numerous connections to the internet from devices, sensors and actuators. In other words the world is getting more and more instrumented. There are in excess of 30 billion RFID tags which enable tracking of goods as they move from warehouse to retail store, sensors on cars and bridges, and cardiac implants in the human body, all constantly sending a stream of data to the network (do look at my post “The Internet of Things”). In addition we have the emergence of the Smart Grid, with its millions and millions of smart meters that are capable of sensing power loads, appropriately redistributing power and drawing less power during peak hours.

All these devices, be they laptops, cell phones, sensors, RFIDs or smart meters, are sending enormous amounts of data to the network. In other words there is an enormous data overload happening in the networks of today. According to a Cisco report, the projected increase in data traffic between 2014 and 2015 is of the order of 200 exabytes (10^18 bytes). In addition the report states that the total number of devices connected to the network will be twice the world population, or around 15 billion.

Fortunately the explosion in data has been accompanied by falling prices in storage and extraordinary increases in processing capacity. The data that is generated by the devices, cell phones, PCs etc. is by itself useless. However, if processed, it can provide insights into trends and patterns which can be used to make key decisions. For example, the data exhaust that comes from a user’s browsing trail and click stream provides important insight into user behavior, which can be mined to make important decisions. Similarly, inputs from social media like Twitter and Facebook provide businesses with key inputs which can be used for making business decisions. Call Detail Records that are created for mobile calls can also be a source of user behavior. Data from retail stores provides insights into consumer choices. For all this to happen, the enormous amounts of data have to be analyzed using algorithms to determine statistical trends, patterns and tendencies in the data.

It is here that Big Data enters the picture. Big Data enables the management of the 3 V’s of data, namely volume, velocity and variety. As mentioned above, the volume of data is growing at an exponential rate and should exceed 200 exabytes by 2015. The rate at which the data is generated, or the velocity, is also growing phenomenally given the variety and the number of devices that are connected to the network. Besides, there is a tremendous variety to the data: it can be structured, semi-structured or unstructured. Logs could be in plain text, CSV, XML, JSON and so on. These 3 V’s make Big Data techniques most suited for crunching this enormous proliferation of data at the velocity at which it is generated.

Big Data: Big Data or Analytics (see my post “The Rise of Analytics”) deals with the algorithms that analyze petabytes (10^15 bytes) of data and identify key patterns in them. The patterns that are so identified can be used to make important predictions about the future. For example, Big Data has been used by energy companies in identifying key locations for positioning their wind turbines. To identify the precise location requires that petabytes of data be crunched rapidly and appropriate patterns be identified. There are several applications of Big Data, from identifying brand sentiment from social media and customer behavior from click exhaust to identifying optimal power usage by consumers.

The key differences between Big Data and traditional processing methods are the volume of data that has to be processed and the speed with which it has to be processed. As mentioned before, the 3 V’s of volume, velocity and variety make traditional methods unsuitable for handling this data. In this context, besides the key algorithms of analytics, another player is extremely important in Big Data: Hadoop. Hadoop is a processing technique that involves tremendous parallelization of the task (for details look at To Hadoop, or not to Hadoop).

The Hadoop Ecosystem – Hadoop had its origins in Google’s work on the Google File System (GFS) and the Map Reduce programming paradigm.

HDFS and Map-Reduce: Hadoop in essence is the Hadoop Distributed File System (HDFS) and the Map Reduce paradigm. A Hadoop system is made up of thousands of distributed commodity servers. The data is stored in HDFS in blocks of 64 MB or 128 MB and is replicated among two or more servers to maintain redundancy. Since Hadoop is made up of regular commodity servers, which are prone to failures, fault tolerance is included by design. The Map Reduce paradigm essentially breaks a job into multiple tasks which are executed in parallel. Initially the “Map” part processes the input data and outputs key-value pairs. The “Reduce” part then scans these pairs and generates a consolidated output. For example, the “map” part could count the number of occurrences of different words in different sets of files and output the words and their counts as pairs; the “reduce” part would then sum up the counts of each word from the individual ‘map’ outputs and provide the total occurrences of the words across the files.
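
The word-count example above maps directly onto a few lines of Spark code; a minimal sketch in PySpark, assuming an available SparkContext sc and a purely illustrative input path:

# Classic map-reduce word count: "map" emits (word, 1) pairs, "reduce" sums the counts per word
lines = sc.textFile("hdfs:///data/docs/*.txt")
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.take(10)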

Pig and Pig Latin: This is a programming language developed at Yahoo to relieve programmers of the intricacies of programming Map-Reduce and assigning tasks to individual parts. Pig is made up of two parts, namely Pig Latin, the language, and the environment in which it executes.

Hive: Hive is a Hadoop run-time support structure that was developed by Facebook. Hive has a distinct SQL flavor to it and also simplifies the task of Hadoop programming.

JAQL: JAQL is a declarative query language developed by IBM for handling JSON objects. JAQL is another programming paradigm that is used to program Hadoop.

Conclusion: It is a foregone conclusion that Big Data and Hadoop will take center stage in the not too distant future given the explosion of data and the dire need of being able to glean useful business insights from them. Big Data and its algorithms provide the way for identifying useful pearls of wisdom from otherwise useless data. Big Data is bound to become mission critical in the enterprises of the future.


The rise of analytics

Published in The Hindu – The rise of analytics

We are slowly, but surely, heading towards the age of “information overload”. The Sloan Digital Sky Survey, which started in the year 2000, returned around 620 terabytes of data in 11 months, more data than had ever been amassed in the entire history of astronomy.

The Large Hadron Collider (LHC) at CERN, Europe’s particle physics laboratory in Geneva, will, in its search for the origins of the universe and the elusive Higgs particle early next year, spew out terabytes of data in its wake. There are now upward of five billion devices connected to the Internet and the numbers are showing no signs of slowing down.

A recent report from Cisco, the data networking giant, states that the total data navigating the Net will cross half a zettabyte (10^21 bytes) by the year 2013.

Such astronomical volumes of data are also handled daily by retail giants including Walmart and Target and telcos such as AT&T and Airtel. Also, advances in the Human Genome Project and technologies like the “Internet of Things” are bound to throw up large quantities of data.

The issue of storing data is now slowly becoming non-existent with the plummeting prices of semi-conductor memory and processors coupled with a doubling of their capacity every 18 months with the inevitability predicted by Moore’s law.

Plumbing the depths

Raw data is by itself quite useless. Data has to be classified, winnowed and analysed into useful information before it can be utilised. This is where analytics and data mining come into play. Analytics, once the exclusive preserve of research labs and academia, has now entered the mainstream. Data mining and analytics are now used across a broad swath of industries — retail, insurance, manufacturing, healthcare and telecommunication. Analytics enables the extraction of intelligence, identification of trends and the ability to highlight the non-obvious from raw, amorphous data. Using the intelligence that is gleaned from predictive analytics, businesses can make strategic game-changing decisions.

Analytics uses statistical methods to classify data, determine correlations, identify patterns, and highlight and detect deviations among large data sets. Analytics includes in its realm complex software algorithms such as decision trees and neural nets to make predictions from existing data sets. For example, a retail store would be interested in knowing the buying patterns of its consumers. If the store could determine that product Y is almost always purchased when product X is purchased, then the store could come up with clever schemes, like an additional discount on product Z when both products X & Y are purchased. Similarly, telcos could use analytics to identify predominant trends that promote customer loyalty.

Studying behaviour

Telcos could come up with voice and data plans that attract customers based on consumer behaviour, after analysing data from their point of sale and retail stores. They could use analytics to determine the causes of customer churn and come up with strategies to prevent it.

Analytics has also been used in the health industry in predicting and preventing fatal infections in infants based on patterns in real-time data like blood pressure, heart rate and respiration.

Analytics requires at its disposal large processing power. Advances in this field have been largely fuelled by similar advances in a companion technology, namely cloud computing. The latter allows computing power to be purchased on demand almost like a utility and has been a key enabler for analytics.

Data mining and analytics allow industries to plumb the data sets held in their organisations through the process of selecting, exploring and modelling large amounts of data to uncover previously unknown data patterns which can be channelised to business advantage.

Analytics help in unlocking the secrets hidden in data and provide real insights to businesses; and enable businesses and industries to make intelligent and informed choices.

In this age of information deluge, data mining and analytics are bound to play an increasingly important role and will become indispensable to the future of businesses.
