Big Data 7: yorkr waltzes with Apache NiFi

In this post, I construct an end-to-end Apache NiFi pipeline with my R package yorkr. This post mirrors my earlier post Big Data-5: kNiFi-ing through cricket data with yorkpy, which was based on my Python package yorkpy. The Apache NiFi data pipeline flows all the way from the source, where the data is obtained, to the target analytics output. Apache NiFi was created to automate and manage the flow of data between systems. This post automates the flow of data from Cricsheet: the zip file is downloaded, unpacked, processed and transformed, and finally the T20 players are ranked.

This post uses the functions of my R package yorkr to rank IPL players. This is an example flow of a typical Big Data pipeline, where the data is ingested from many diverse source systems, transformed, and then finally insights are generated. While I execute this NiFi example with my R package yorkr, in a typical Big Data pipeline where the data is huge, of the order of 100s of GB, we would be using the Hadoop ecosystem with Hive, HDFS, Spark and so on. Since the data taken from Cricsheet is only a few megabytes, this approach suffices. However, if we hypothetically assume that several batches of cricket data, from different cricket matches happening all over the world, are being uploaded to the source, and the historical data exceeds several GBs, then we could use a similar Apache NiFi pattern to process the data and generate insights. If the data were large and distributed across a Hadoop cluster, then we would need to use SparkR or SparklyR to process the data.

This is shown below pictorially

While this post displays the ranks of IPL batsmen, it is possible to create a cool dashboard using UI/UX technologies like AngularJS/ReactJS.  Take a look at my post Big Data 6: The T20 Dance of Apache NiFi and yorkpy where I create a simple dashboard of multiple analytics

My R package yorkr can handle both men’s and women’s ODI, and all formats of T20 in Cricsheet namely Intl. T20 (men’s, women’s), IPL, BBL, Natwest T20, PSL, Women’s BBL etc. To know more details about yorkr see Revitalizing R package yorkr

The code can be forked from Github at yorkrWithApacheNiFi

You can take a look at the live demo of the NiFi pipeline at yorkr waltzes with Apache NiFi

 

Basic Flow

1. Overall flow

The overall NiFi flow contains 2 Process Groups: a) DownloadAndUnpack b) Convert and Rank IPL batsmen. While it appears that the Process Groups are disconnected, they are not. The first process group downloads the T20 zip file, unpacks the zip file and saves the YAML files in a specific folder. The second process group monitors this folder and starts processing as soon as the YAML files are available. It processes the YAML files, converting them into dataframes before storing them as CSV files. The next processor then does the actual ranking of the batsmen before writing the output into IPLrank.txt

1.1 DownloadAndUnpack Process Group

This process group is shown below

 

1.1.1 GetT20Data

The GetT20Data Processor downloads the zip file given the URL

The ${T20data} variable points to the specific T20 format that needs to be downloaded. I have set this to https://cricsheet.org/downloads/ipl.zip. This could be set to any other data set. In fact we could have parallel data flows for different T20/ Sports data sets and generate

1.1.2 SaveUnpackedData

This processor stores the YAML files in a predetermined folder, so that the data can be picked up  by the 2nd Process Group for processing
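The download-and-unpack step can be pictured outside NiFi with a short Python sketch. This is only an illustration of what the two processors accomplish, not how NiFi implements them; the function names `download_zip` and `unpack_yaml` are hypothetical.

```python
import io
import urllib.request
import zipfile
from pathlib import Path


def download_zip(url):
    """Fetch the zip archive at `url` and return its raw bytes."""
    with urllib.request.urlopen(url) as response:
        return response.read()


def unpack_yaml(zip_bytes, target_dir):
    """Extract only the .yaml members of the archive into target_dir."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    extracted = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for name in archive.namelist():
            if name.endswith(".yaml"):
                archive.extract(name, target)
                extracted.append(name)
    return sorted(extracted)
```

A call such as `unpack_yaml(download_zip("https://cricsheet.org/downloads/ipl.zip"), "ipl")` would leave the match files in the folder that the second process group watches (the folder name `ipl` here is an assumption).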

 

1.2 ProcessAndRankT20Players Process Group

This is the second process group, which converts the YAML files to R data frames before storing them as CSV files. The RankIPLPlayers will then read all the CSV files, stack them and then proceed to rank the IPL players. The Process Group is shown below

 

1.2.1 ListFile and FetchFile Processors

The 2 Processors on the left, ListFile and FetchFile, get all the YAML files from the folder and pass them to the next processor
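The idea of picking up only files that have not been seen before can be sketched in plain Python. This is a simplification that remembers file names in a set; NiFi keeps its own listing state, and the function name `list_new_files` is hypothetical.

```python
from pathlib import Path


def list_new_files(folder, seen, suffix=".yaml"):
    """Return files in `folder` not listed before, and remember them in `seen`."""
    new_files = []
    for path in sorted(Path(folder).glob(f"*{suffix}")):
        if path.name not in seen:
            seen.add(path.name)
            new_files.append(path)
    return new_files
```

Calling the function repeatedly with the same `seen` set returns each file exactly once, which is why the second process group can keep monitoring the folder without reprocessing old matches.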

1.2.2 convertYaml2DataFrame Processor

The convertYaml2DataFrame Processor uses ExecuteStreamCommand, which calls Rscript. The R script invokes the yorkr function convertYaml2RDataframeT20() as shown below

 

I also use 16 concurrent tasks to convert 16 different FlowFiles at once

 

library(yorkr)
args<-commandArgs(TRUE)
convertYaml2RDataframeT20(args[1], args[2], args[3])

1.2.3 MergeContent Processor

This processor’s only job is to trigger the rankIPLPlayers when all the FlowFiles have merged into 1 file.

1.2.4 RankT20Players

This processor is an ExecuteStreamCommand Processor that executes an R script which invokes the yorkr function rankIPLBatsmen()

library(yorkr)
args<-commandArgs(TRUE)

rankIPLBatsmen(args[1],args[2],args[3])

1.2.5 OutputRankofT20Player Processor

This processor writes the generated rank to an output file.

 

1.3 Final Ranking of IPL T20 players

The Node.js-based web server picks up this file and displays the final ranks on the web page (the code is based on a good YouTube video on reading from a file)

[1] "Chennai Super Kings"
[1] "Deccan Chargers"
[1] "Delhi Daredevils"
[1] "Kings XI Punjab"
[1] "Kochi Tuskers Kerala"
[1] "Kolkata Knight Riders"
[1] "Mumbai Indians"
[1] "Pune Warriors"
[1] "Rajasthan Royals"
[1] "Royal Challengers Bangalore"
[1] "Sunrisers Hyderabad"
[1] "Gujarat Lions"
[1] "Rising Pune Supergiants"
[1] "Chennai Super Kings-BattingDetails.RData"
[1] "Deccan Chargers-BattingDetails.RData"
[1] "Delhi Daredevils-BattingDetails.RData"
[1] "Kings XI Punjab-BattingDetails.RData"
[1] "Kochi Tuskers Kerala-BattingDetails.RData"
[1] "Kolkata Knight Riders-BattingDetails.RData"
[1] "Mumbai Indians-BattingDetails.RData"
[1] "Pune Warriors-BattingDetails.RData"
[1] "Rajasthan Royals-BattingDetails.RData"
[1] "Royal Challengers Bangalore-BattingDetails.RData"
[1] "Sunrisers Hyderabad-BattingDetails.RData"
[1] "Gujarat Lions-BattingDetails.RData"
[1] "Rising Pune Supergiants-BattingDetails.RData"
# A tibble: 429 x 4
   batsman     matches meanRuns meanSR
   <chr>         <int>    <dbl>  <dbl>
 1 DA Warner       130     37.9   128.
 2 LMP Simmons      29     37.2   106.
 3 CH Gayle        125     36.2   134.
 4 HM Amla          16     36.1   108.
 5 ML Hayden        30     35.9   129.
 6 SE Marsh         67     35.9   120.
 7 RR Pant          39     35.3   135.
 8 MEK Hussey       59     33.8   105.
 9 KL Rahul         59     33.5   128.
10 MN van Wyk        5     33.4   112.
# … with 419 more rows
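The shape of the table above (matches, mean runs and mean strike rate per batsman) can be reproduced with a small Python sketch. The sort order used here, mean runs first and mean strike rate as tie-breaker, is an assumption inferred from the printed output, not a statement about how yorkr ranks internally; the function name and the sample players are hypothetical.

```python
from collections import defaultdict
from statistics import mean


def rank_batsmen(innings, min_matches=1):
    """innings: iterable of (batsman, runs, strike_rate), one entry per match."""
    by_player = defaultdict(list)
    for batsman, runs, strike_rate in innings:
        by_player[batsman].append((runs, strike_rate))

    table = []
    for batsman, rows in by_player.items():
        if len(rows) >= min_matches:  # drop players with too few matches
            table.append({
                "batsman": batsman,
                "matches": len(rows),
                "meanRuns": mean(r for r, _ in rows),
                "meanSR": mean(sr for _, sr in rows),
            })
    # Highest mean runs first; mean strike rate breaks ties (assumed ordering)
    return sorted(table, key=lambda row: (row["meanRuns"], row["meanSR"]),
                  reverse=True)


# Made-up innings for three hypothetical players
sample = [("A", 40, 120.0), ("A", 20, 100.0), ("B", 50, 150.0),
          ("C", 30, 90.0), ("C", 30, 110.0)]
ranked = rank_batsmen(sample, min_matches=2)
```

With `min_matches=2`, player B is dropped for having played a single match, and A is placed ahead of C on strike rate because their mean runs are equal.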

 

Conclusion

This post demonstrated an end-to-end pipeline with Apache NiFi and the R package yorkr. You can use this pipeline to generate different analytics using the various functions of yorkr and display them on a dashboard.

Hope you enjoyed the post!

 

See also
1. The mechanics of Convolutional Neural Networks in Tensorflow and Keras
2. Deep Learning from first principles in Python, R and Octave – Part 7
3. Fun simulation of a Chain in Android
4. Natural language processing: What would Shakespeare say?
5. TWS-4: Gossip protocol: Epidemics and rumors to the rescue
6. Cricketr learns new tricks : Performs fine-grained analysis of players
7. Introducing QCSimulator: A 5-qubit quantum computing simulator in R
8. Practical Machine Learning with R and Python – Part 5
9. Cricpy adds team analytics to its arsenal!!

To see posts click Index of posts

Big Data-5: kNiFi-ing through cricket data with yorkpy

“The temptation to form premature theories upon insufficient data is the bane of our profession.”

                              Sherlock Holmes in the Valley of fear by Arthur Conan Doyle

“If we have data, let’s look at data. If all we have are opinions, let’s go with mine.”

                              Jim Barksdale, former CEO Netscape 

In this post I use an Apache NiFi dataflow pipeline along with my Python package yorkpy to crunch through cricket data from Cricsheet. The data pipeline flows all the way from the source to the target analytics output. Apache NiFi was created to automate and manage the flow of data between systems. This post automates the flow of data from Cricsheet: the zip file is downloaded, unpacked, processed and transformed, and finally the T20 players are ranked.

While this is a straightforward example of what can be done, this pattern can be applied to real Big Data systems. For example, we could hypothetically consider that we get several parallel streams of cricket data, or for that matter any sports-related data. There could be parallel dataflow pipelines that get the data from the sources. These would then be followed by data transformation modules and finally a module for generating analytics. At the other end, a UI based on AngularJS or ReactJS could display the results in a cool and awesome way.

Incidentally, the NiFi pipeline that I discuss in this post is a simplistic example and does not use the Big Data stack like HDFS, Hive, Spark etc. Nevertheless, the pattern used has all the modules of a Big Data pipeline, namely ingestion, unpacking, transformation and finally analytics. This NiFi pipeline demonstrates the flow using the regular file system of a Mac and my Python-based package yorkpy. The concepts mentioned could be used in a real Big Data scenario which has much fatter pipes of data coming in. If this were the case, the NiFi pipeline would utilize HDFS/Hive for storing the ingested data, PySpark/Scala for the transformation and analytics, and other related technologies.

A pictorial representation is given below

In the diagram above, each of the vertical boxes could be any technology from the ever-proliferating Big Data stack, namely HDFS, Hive, Spark, Sqoop, Kafka, Impala and so on. Such a dataflow automation could be created when any big sporting event happens, as long as the data generated is large and there is a need for dynamic and automated reporting. The UI could be based on AngularJS/ReactJS and could display analytical tables and charts.

This post demonstrates one such scenario in which IPL T20 data is downloaded from the Cricsheet site, unpacked and stored in a specific directory. This dataflow automation is based on my yorkpy package. To know more about the yorkpy package see Pitching yorkpy … short of good length to IPL – Part 1 and the associated parts. The zip file from Cricsheet contains individual IPL T20 matches in YAML format. The convertYaml2PandasDataframeT20() function is used to convert the YAML files into Pandas dataframes before storing them as CSV files. After this is done, the rankIPLT20Batting() function is used to perform the overall ranking of the T20 players. My yorkpy Python package has 50+ functions that perform various analytics on any T20 data, e.g. it has the following classes of functions

  • analyze T20 matches
  • analyze performance of a T20 team in all matches against another T20 team
  • analyze performance of a T20 team against all other T20 teams
  • analyze performance of T20 batsman and bowlers
  • rank T20 batsmen and bowlers

The functions of yorkpy generate tables or charts. While this post demonstrates one scenario, we could use any of the yorkpy T20 functions, generate the output and display it on a widget in the UI, created with cool technologies like AngularJS/ReactJS, possibly in near real time as data keeps coming in.

To use yorkpy with NiFi the following packages have to be installed in your environment

-pip install yorkpy
-pip install pyyaml
-pip install pandas
-yum install python-devel (equivalent in Windows)
-pip install matplotlib
-pip install seaborn
-pip install scikit-learn
-pip install datetime

I have created a video of the NiFi pipeline with the real dataflow from the source to the ranked IPL T20 batsmen. Take a look at RankingT20PlayersWithNiFiYorkpy

You can clone/fork the NiFi template from rankT20withNiFiYorkpy

The NiFi Data Flow Automation is shown below

1. Overall flow

The overall NiFi flow contains 2 Process Groups: a) DownloadAndUnpack b) Convert and Rank IPL batsmen. While it appears that the Process Groups are disconnected, they are not. The first process group downloads the T20 zip file, unpacks the zip file and saves the YAML files in a specific folder. The second process group monitors this folder and starts processing as soon as the YAML files are available. It processes the YAML files, converting them into dataframes before storing them as CSV files. The next processor then does the actual ranking of the batsmen before writing the output into IPLrank.txt

1.1 DownloadAndUnpack Process Group

This process group is shown below

1.1.1 GetT20Data

The GetT20Data Processor downloads the zip file given the URL

The ${T20data} variable points to the specific T20 format that needs to be downloaded. I have set this to https://cricsheet.org/downloads/ipl.zip. This could be set to any other data set. In fact we could have parallel data flows for different T20/ Sports data sets and generate

1.1.2 SaveUnpackedData

This processor stores the YAML files in a predetermined folder, so that the data can be picked up  by the 2nd Process Group for processing

1.2 ProcessAndRankT20Players Process Group

This is the second process group, which converts the YAML files to pandas dataframes before storing them as CSV files. The RankIPLPlayers will then read all the CSV files, stack them and then proceed to rank the IPL players. The Process Group is shown below

1.2.1 ListFile and FetchFile Processors

The 2 Processors on the left, ListFile and FetchFile, get all the YAML files from the folder and pass them to the next processor

1.2.2 convertYaml2DataFrame Processor

The convertYaml2DataFrame Processor uses ExecuteStreamCommand, which calls a Python script. The Python script invokes the yorkpy function convertYaml2PandasDataframeT20() as shown below

The ${convertYaml2Dataframe} variable points to the Python file below, which invokes the yorkpy function yka.convertYaml2PandasDataframeT20()

import yorkpy.analytics as yka
import argparse
parser = argparse.ArgumentParser(description='convert')
parser.add_argument("yamlFile",help="YAML File")
args=parser.parse_args()
yamlFile=args.yamlFile
yka.convertYaml2PandasDataframeT20(yamlFile,"/Users/tvganesh/backup/software/nifi/ipl","/Users/tvganesh/backup/software/nifi/ipldata")

This script takes as input $filename, which comes from the FetchFile processor as a FlowFile. I have set a concurrency of 8 to handle up to 8 FlowFiles at a time. The rule of thumb, as I read on the internet, is 2x to 4x the number of cores of your system. Since I have an 8-core Mac, I could possibly have gone up to ~30 concurrent threads. The number of concurrent threads should be lower when the flow is run in an Oracle VirtualBox virtual machine, since a vCore < actual core
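The arithmetic behind the rule of thumb, and the idea of capping the number of conversions in flight, can be sketched in Python as follows. This is only an analogy for NiFi's concurrent tasks setting; the function names are hypothetical and `convert` stands in for whatever per-file conversion is run.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def suggested_concurrency(cores):
    """Rule of thumb quoted in the post: between 2x and 4x the core count."""
    return 2 * cores, 4 * cores


def convert_all(files, convert, max_workers):
    """Apply `convert` to every file with at most `max_workers` tasks in flight."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(convert, files))


# Range suggested for the machine this runs on
low, high = suggested_concurrency(os.cpu_count() or 1)
```

For an 8-core machine `suggested_concurrency(8)` gives a range of 16 to 32 tasks, which is where the figure of roughly 30 threads comes from.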

The scheduling tab is as below

Here are the 8 concurrent Python threads on Mac at bottom right… (pretty cool!)

I have not fully tested how changes to the latency vs throughput slider affect the performance.

1.2.3 MergeContent Processor

This processor’s only job is to trigger the rankIPLPlayers when all the FlowFiles have merged into 1 file.

1.2.4 RankT20Players

This processor is an ExecuteStreamCommand Processor that executes a Python script which invokes a yorkpy function rankIPLT20Batting()

import yorkpy.analytics as yka
rank=yka.rankIPLT20Batting("/Users/tvganesh/backup/software/nifi/ipldata")
print(rank.head(15))

1.2.5 OutputRankofT20Player Processor

This processor writes the generated rank to an output file.

1.3 Final Ranking of IPL T20 players

The Node.js-based web server picks up this file and displays the final ranks on the web page (the code is based on a good YouTube video on reading from a file)
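The post's web server is written in Node.js and its code is not reproduced here. Purely as an illustration of the same idea, reading the rank file and returning it as a web page, here is a Python standard-library sketch. The handler, the function names and the port are assumptions; only the file name IPLrank.txt comes from the flow description.

```python
import html
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

RANK_FILE = "IPLrank.txt"  # output file written by the NiFi flow


def render_rank_page(path):
    """Read the rank file and wrap its text in a minimal HTML page."""
    text = Path(path).read_text(encoding="utf-8")
    return "<html><body><pre>" + html.escape(text) + "</pre></body></html>"


class RankHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Re-read the file on every request so new ranks show up immediately
        body = render_rank_page(RANK_FILE).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port=8000):
    """Serve the rank page on localhost until interrupted."""
    HTTPServer(("localhost", port), RankHandler).serve_forever()
```

Calling `serve()` and opening http://localhost:8000 would show the current contents of the rank file.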

2. Final thoughts

As I have mentioned above, though the above NiFi cricket dataflow automation does not use the Hadoop ecosystem, the pattern used is valid and can be used, with some customization, in Big Data flows as parallel streams. I could have also done this on Oracle VirtualBox, but I thought that since the code is based on Python and Pandas there is no real advantage in running on VirtualBox. Give the NiFi flow a shot. Have fun!!!

Also see
1. My book ‘Deep Learning from first principles: Second Edition’ now on Amazon

2. Introducing QCSimulator: A 5-qubit quantum computing simulator in R
3.De-blurring revisited with Wiener filter using OpenCV
4. Practical Machine Learning with R and Python – Part 5
5. Natural language processing: What would Shakespeare say?
6.Getting started with Tensorflow, Keras in Python and R
7.Revisiting World Bank data analysis with WDI and gVisMotionChart

To see all posts click Index of posts

Big Data-4: Webserver log analysis with RDDs, Pyspark, SparkR and SparklyR

“There’s something so paradoxical about pi. On the one hand, it represents order, as embodied by the shape of a circle, long held to be a symbol of perfection and eternity. On the other hand, pi is unruly, disheveled in appearance, its digits obeying no obvious rule, or at least none that we can perceive. Pi is elusive and mysterious, forever beyond reach. Its mix of order and disorder is what makes it so bewitching. ” 

From  Infinite Powers by Steven Strogatz

Anybody who wants to be “anybody” in Big Data must necessarily be able to work on both large structured and unstructured data. Log analysis is critical in any enterprise, and log data is usually unstructured. As I mentioned in my previous post Big Data: On RDDs, Dataframes,Hive QL with Pyspark and SparkR-Part 3, RDDs are typically used to handle unstructured data. Spark has the Dataframe abstraction over RDDs, which performs better as it is optimized with the Catalyst optimization engine. Nevertheless, it is important to be able to process data with RDDs. This post is a continuation of my 3 earlier posts on Big Data namely

1. Big Data-1: Move into the big league:Graduate from Python to Pyspark
2. Big Data-2: Move into the big league:Graduate from R to SparkR
3. Big Data: On RDDs, Dataframes,Hive QL with Pyspark and SparkR-Part 3

This post uses publicly available Webserver logs from NASA. The logs are for the months Jul 95 and Aug 95 and are a good place to start unstructured text analysis/log analysis. I highly recommend parsing these publicly available logs with regular expressions. It is only when you do that the truth of Jamie Zawinski’s pearl of wisdom

“Some people, when confronted with a problem, think “I know, I’ll use regular expressions.” Now they have two problems.” – Jamie Zawinski

hits home. I spent many hours struggling with regex!!

For this post for the RDD part,  I had to refer to Dr. Fisseha Berhane’s blog post Webserver Log Analysis and for the Pyspark part, to the Univ. of California Specialization which I had done 3 years back Big Data Analysis with Apache Spark. Once I had played around with the regex for RDDs and PySpark I managed to get SparkR and SparklyR versions to work.

The notebooks used in this post have been published and are available at

  1. logsAnalysiswithRDDs
  2. logsAnalysiswithPyspark
  3. logsAnalysiswithSparkRandSparklyR

You can also download all the notebooks from Github at WebServerLogsAnalysis

An essential and unavoidable aspect of Big Data processing is the need to process unstructured text. Web server logs are one such area which requires Big Data techniques to process massive amounts of logs. The Common Log Format, also known as the NCSA Common log format, is a standardized text file format used by web servers when generating server log files. Because the format is standardized, the files can be readily analyzed.

One publicly available set of webserver logs is the NASA-HTTP Web server logs. This is a good dataset with which we can play around to get familiar with handling web server logs. The logs can be accessed at NASA-HTTP

Description These two traces contain two month’s worth of all HTTP requests to the NASA Kennedy Space Center WWW server in Florida.

Format The logs are an ASCII file with one line per request, with the following columns:

-host making the request. A hostname when possible, otherwise the Internet address if the name could not be looked up.

-timestamp in the format “DAY MON DD HH:MM:SS YYYY”, where DAY is the day of the week, MON is the name of the month, DD is the day of the month, HH:MM:SS is the time of day using a 24-hour clock, and YYYY is the year. The timezone is -0400.

-request given in quotes.

-HTTP reply code.

-bytes in the reply.
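In the raw log lines used in this post, the timestamp appears in the form [01/Aug/1995:00:00:01 -0400]. A small Python sketch of turning that string into a timezone-aware datetime is given below; the function name `parse_clf_timestamp` is hypothetical.

```python
from datetime import datetime, timedelta


def parse_clf_timestamp(stamp):
    """Parse a '[dd/Mon/yyyy:HH:MM:SS -0400]' style timestamp.

    %b relies on English month abbreviations, which is the default C locale.
    """
    return datetime.strptime(stamp.strip("[]"), "%d/%b/%Y:%H:%M:%S %z")


ts = parse_clf_timestamp("[01/Aug/1995:00:00:01 -0400]")
```

The resulting `ts` carries the -0400 offset, so timestamps from the two monthly files can be compared or converted to UTC directly.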

1 Parse Web server logs with RDDs

1.1 Read NASA Web server logs

Read the log files from NASA for the months Jul 95 and Aug 95

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Spark-Logs-Handling").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

sqlcontext = SQLContext(sc)
rdd = sc.textFile("/FileStore/tables/NASA_access_log_*.gz")
rdd.count()
Out[1]: 3461613

1.2 Check content

Check the logs to identify the parsing rules required for the logs

i=0
for line in rdd.sample(withReplacement = False, fraction = 0.00001, seed = 100).collect():
    i=i+1
    print(line)
    if i >5:
      break
ix-stp-fl2-19.ix.netcom.com - - [03/Aug/1995:23:03:09 -0400] "GET /images/faq.gif HTTP/1.0" 200 263
slip183-1.kw.jp.ibm.net - - [04/Aug/1995:18:42:17 -0400] "GET /shuttle/missions/sts-70/images/DSC-95EC-0001.gif HTTP/1.0" 200 107133
piweba4y.prodigy.com - - [05/Aug/1995:19:17:41 -0400] "GET /icons/menu.xbm HTTP/1.0" 200 527
ruperts.bt-sys.bt.co.uk - - [07/Aug/1995:04:44:10 -0400] "GET /shuttle/countdown/video/livevideo2.gif HTTP/1.0" 200 69067
dal06-04.ppp.iadfw.net - - [07/Aug/1995:21:10:19 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
p15.ppp-1.directnet.com - - [10/Aug/1995:01:22:54 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204

1.3 Write the parsing rule for each of the fields

  • host
  • timestamp
  • path
  • status
  • content_bytes

1.21 Get IP address/host name

This regex matches at the start of the log line and includes any run of non-whitespace characters

import re
# \S+ matches the first run of non-whitespace characters, i.e. the host
rslt=(rdd.map(lambda line: re.search(r'\S+',line)
   .group(0))
   .take(3)) # Get the IP address / host name
rslt
Out[3]: ['in24.inetnebr.com', 'uplherc.upl.com', 'uplherc.upl.com']

1.22 Get timestamp

Get the time stamp

rslt=(rdd.map(lambda line: re.search(r'(\S+ -\d{4})',line)
    .groups())
    .take(3))  #Get the  date
rslt
[('[01/Aug/1995:00:00:01 -0400',),
('[01/Aug/1995:00:00:07 -0400',),
('[01/Aug/1995:00:00:08 -0400',)]

1.23 HTTP request

Get the HTTP request sent to the Web server. The \w+ matches the HTTP method, e.g. GET

# Get the REST call enclosed in " "
rslt=(rdd.map(lambda line: re.search(r'"\w+\s+([^\s]+)\s+HTTP.*"',line)
    .groups())
    .take(3)) # Get the REST call
rslt
[('/shuttle/missions/sts-68/news/sts-68-mcc-05.txt',),
('/',),
('/images/ksclogo-medium.gif',)]

1.23 Get HTTP response status

Get the HTTP response to the request

rslt=(rdd.map(lambda line: re.search('"\s(\d{3})',line)
    .groups())
    .take(3)) #Get the status
rslt
Out[6]: [('200',), ('304',), ('304',)]

1.24 Get content size

Get the HTTP response in bytes

rslt=(rdd.map(lambda line: re.search(r'^.*\s(\d*)$',line)
    .groups())
    .take(3)) # Get the content size
rslt
Out[7]: [('1839',), ('0',), ('0',)]

1.24 Putting it all together

Now put all the individual pieces together into 1 big regular expression and assign to the groups

  1. Host 2. Timestamp 3. Path 4. Status 5. Content_size
rslt=(rdd.map(lambda line: re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s(\d*)$)',line)
    .groups())
    .take(3))
rslt
[('in24.inetnebr.com',
' -',
' ',
'-',
'[01/Aug/1995:00:00:01 -0400]',
'"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0"',
'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt',
'200 1839',
'1839'),
('uplherc.upl.com',
' -',
' ',
'-',
'[01/Aug/1995:00:00:07 -0400]',
'"GET / HTTP/1.0"',
'/',
'304 0',
'0'),
('uplherc.upl.com',
' -',
' ',
'-',
'[01/Aug/1995:00:00:08 -0400]',
'"GET /images/ksclogo-medium.gif HTTP/1.0"',
'/images/ksclogo-medium.gif',
'304 0',
'0')]
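The positional groups above are easy to mix up. One alternative, shown here as a plain-Python sketch rather than the notebook's own code, is to use named groups so that each field can be looked up by name. The pattern below is a simplified variant written for this illustration, and the names `LOG_PATTERN` and `parse_line` are hypothetical.

```python
import re

# Named groups for the five fields of interest
LOG_PATTERN = re.compile(
    r'^(?P<host>\S+)\s+\S+\s+\S+\s+'            # host, then the two "-" fields
    r'\[(?P<timestamp>[^\]]+)\]\s+'             # [dd/Mon/yyyy:HH:MM:SS -0400]
    r'"\w+\s+(?P<path>\S+)\s+HTTP[^"]*"\s+'     # "GET /path HTTP/1.0"
    r'(?P<status>\d{3})\s+(?P<content_size>\d+)$'
)


def parse_line(line):
    """Return a dict of fields, or None when the line does not match."""
    match = LOG_PATTERN.search(line)
    return match.groupdict() if match else None


sample = ('in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] '
          '"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839')
fields = parse_line(sample)
```

With an RDD, the same function could be applied as `rdd.map(parse_line)`, followed by a filter that drops the `None` results.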

1.25 Add a log parsing function

import re
def parse_log1(line):
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s(\d*)$)',line)
    if match is None:    
        return(line,0)
    else:
        return(line,1)

1.26 Check for parsing failure

Check how many lines successfully parsed with the parsing function

n_logs = rdd.count()
failed = rdd.map(lambda line: parse_log1(line)).filter(lambda line: line[1] == 0).count()
print('Out of a total of {} logs, {} failed to parse'.format(n_logs,failed))
# Get the failed records line[1] == 0
failed1=rdd.map(lambda line: parse_log1(line)).filter(lambda line: line[1]==0)
failed1.take(3)
Out of a total of 3461613 logs, 38768 failed to parse
Out[10]:
[('gw1.att.com - - [01/Aug/1995:00:03:53 -0400] "GET /shuttle/missions/sts-73/news HTTP/1.0" 302 -',
0),
('js002.cc.utsunomiya-u.ac.jp - - [01/Aug/1995:00:07:33 -0400] "GET /shuttle/resources/orbiters/discovery.gif HTTP/1.0" 404 -',
0),
('pipe1.nyc.pipeline.com - - [01/Aug/1995:00:12:37 -0400] "GET /history/apollo/apollo-13/apollo-13-patch-small.gif" 200 12859',
0)]

1.26 The above rule is not enough to parse the logs

It can be seen that the single rule parses only part of the logs, so we cannot extract the groups from every line directly. For a line that does not match, re.search() returns None, and the error “AttributeError: ‘NoneType’ object has no attribute ‘group'” shows up

#rdd.map(lambda line: re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s(\d*)$)',line[0]).group()).take(4)

File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<command-1348022240961444>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'group'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
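The error comes from calling a method on the result of `re.search()` without checking it first. A minimal plain-Python sketch of the guard is shown below, using the status-code rule from earlier; the helper name `status_or_none` and the second test line are made up for illustration.

```python
import re

STATUS = re.compile(r'"\s(\d{3})')


def status_or_none(line):
    """Return the 3-digit status, or None instead of raising on a non-match."""
    match = STATUS.search(line)
    return match.group(1) if match else None


lines = [
    'uplherc.upl.com - - [01/Aug/1995:00:00:07 -0400] "GET / HTTP/1.0" 304 0',
    'not a log line',
]
statuses = [status_or_none(line) for line in lines]
```

The second line yields `None` rather than an exception, so a later filter step can separate parsed and unparsed records.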

1.27 Add rule for parsing failed records

One of the issues with the earlier rule is the content_size has “-” for some logs

import re
def parse_failed(line):
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s-$)',line)
    if match is None:        
        return(line,0)
    else:
        return(line,1)

1.28 Parse records which fail

Parse the records that fail with the new rule

failed2=rdd.map(lambda line: parse_failed(line)).filter(lambda line: line[1]==1)
failed2.take(5)
Out[13]:
[('gw1.att.com - - [01/Aug/1995:00:03:53 -0400] "GET /shuttle/missions/sts-73/news HTTP/1.0" 302 -',
1),
('js002.cc.utsunomiya-u.ac.jp - - [01/Aug/1995:00:07:33 -0400] "GET /shuttle/resources/orbiters/discovery.gif HTTP/1.0" 404 -',
1),
('tia1.eskimo.com - - [01/Aug/1995:00:28:41 -0400] "GET /pub/winvn/release.txt HTTP/1.0" 404 -',
1),
('itws.info.eng.niigata-u.ac.jp - - [01/Aug/1995:00:38:01 -0400] "GET /ksc.html/facts/about_ksc.html HTTP/1.0" 403 -',
1),
('grimnet23.idirect.com - - [01/Aug/1995:00:50:12 -0400] "GET /www/software/winvn/winvn.html HTTP/1.0" 404 -',
1)]

1.28 Add both rules

Add both rules for parsing the log.

Note: it can be shown that even with both rules not all the logs are parsed. Further rules may need to be added

import re
def parse_log2(line):
    # Parse logs with the rule below
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(\d*)$',line)
    # If match failed then use the rule below
    if match is None:
        match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s-$)',line)
    if match is None:
        return (line, 0) # Return 0 for failure
    else:
        return (line, 1) # Return 1 for success
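As an alternative sketch, the two rules can be folded into a single pattern by letting the last field be either digits or a "-". This is a variation written for illustration, not the notebook's code; the names `COMBINED` and `parse_log_combined` are hypothetical.

```python
import re

# Same structure as the rules above; the final field accepts digits or "-"
COMBINED = re.compile(
    r'^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s'
    r'("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(\d*|-)$'
)


def parse_log_combined(line):
    """Return (line, 1) when the line parses and (line, 0) otherwise."""
    return (line, 1 if COMBINED.search(line) else 0)


with_dash = ('gw1.att.com - - [01/Aug/1995:00:03:53 -0400] '
             '"GET /shuttle/missions/sts-73/news HTTP/1.0" 302 -')
with_size = ('in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] '
             '"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839')
no_protocol = ('pipe1.nyc.pipeline.com - - [01/Aug/1995:00:12:37 -0400] '
               '"GET /history/apollo/apollo-13/apollo-13-patch-small.gif" 200 12859')
results = [parse_log_combined(l)[1] for l in (with_dash, with_size, no_protocol)]
```

The third sample, which has no HTTP version inside the quotes, still fails, which illustrates why further rules would be needed to cover every line.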

1.29 Group the different regex to groups for handling

def map2groups(line):
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(\d*)$',line)
    if match is None:
        match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(-)$',line)    
    return(match.groups())

1.30 Parse the logs and map the groups

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])

parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
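Each element of parsed_rdd2 is a tuple of nine regex groups, of which only five are of interest. A hedged sketch of picking those out and converting the numeric fields is given below; the function name `to_record` is hypothetical, and treating "-" as a size of 0 is an assumption made here for illustration.

```python
def to_record(groups):
    """Pick host, timestamp, path, status and size out of the 9-tuple."""
    host, timestamp, path, status, size = (
        groups[0], groups[4], groups[6], groups[7], groups[8])
    return {
        "host": host,
        "timestamp": timestamp.strip("[]"),
        "path": path,
        "status": int(status),
        # "-" (or an empty match) means no size was logged; treated as 0 here
        "content_size": 0 if size in ("-", "") else int(size),
    }


example = ('uplherc.upl.com', ' -', ' ', '-', '[01/Aug/1995:00:00:07 -0400]',
           '"GET / HTTP/1.0"', '/', '304', '-')
record = to_record(example)
```

On the RDD this could be applied as `parsed_rdd2.map(to_record)`.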

2. Parse Web server logs with Pyspark

2.1 Read data into a Pyspark dataframe

import os
from pyspark.sql.functions import split, regexp_extract

# Path pattern covering both monthly NASA log files
logs_file_path="/FileStore/tables/" + os.path.join('NASA_access_log_*.gz')
# Each log line becomes one row in a single 'value' column
base_df = sqlContext.read.text(logs_file_path)
#base_df.show(truncate=False)
split_df = base_df.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
                          regexp_extract('value', r'^.*\[(\d\d\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
                          regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
                          regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
                          regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_df.show(5,truncate=False)
+---------------------+--------------------------+-----------------------------------------------+------+------------+
|host                 |timestamp                 |path                                           |status|content_size|
+---------------------+--------------------------+-----------------------------------------------+------+------------+
|199.72.81.55         |01/Jul/1995:00:00:01 -0400|/history/apollo/                               |200   |6245        |
|unicomp6.unicomp.net |01/Jul/1995:00:00:06 -0400|/shuttle/countdown/                            |200   |3985        |
|199.120.110.21       |01/Jul/1995:00:00:09 -0400|/shuttle/missions/sts-73/mission-sts-73.html   |200   |4085        |
|burger.letters.com   |01/Jul/1995:00:00:11 -0400|/shuttle/countdown/liftoff.html                |304   |0           |
|199.120.110.21       |01/Jul/1995:00:00:11 -0400|/shuttle/missions/sts-73/sts-73-patch-small.gif|200   |4179        |
+---------------------+--------------------------+-----------------------------------------------+------+------------+
only showing top 5 rows

2.2 Check data

bad_rows_df = split_df.filter(split_df['host'].isNull() |
                              split_df['timestamp'].isNull() |
                              split_df['path'].isNull() |
                              split_df['status'].isNull() |
                             split_df['content_size'].isNull())
bad_rows_df.count()
Out[20]: 33905

2.3 Check the number of rows which do not have digits

We have already seen in the RDD section that the content_size field has '-' instead of digits for some logs

# Rows whose raw log line does not end in digits
bad_content_size_df = base_df.filter(~ base_df['value'].rlike(r'\d+$'))
bad_content_size_df.count()
Out[21]: 33905
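Why these rows end up null can be seen with the same content_size pattern in plain Python: a line ending in "-" has no trailing digits, so the pattern finds nothing to capture. In Spark, regexp_extract returns an empty string in that case, which becomes null when cast to integer. The helper name `content_size` below is hypothetical.

```python
import re

SIZE = re.compile(r'^.*\s+(\d+)$')


def content_size(line):
    """Return the trailing byte count, or None when the line ends in '-'."""
    match = SIZE.search(line)
    return int(match.group(1)) if match else None


good = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
bad = 'dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -'
sizes = [content_size(good), content_size(bad)]
```

This matches the counts above: the number of rows with a null content_size equals the number of raw lines that do not end in digits.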

2.4 Add ‘*’ to identify bad rows

To identify the rows that are bad, concatenate ‘*’ to the content_size field where the field does not have digits. It can be seen that the content_size has ‘-‘ instead of a valid number

from pyspark.sql.functions import lit, concat
bad_content_size_df.select(concat(bad_content_size_df['value'], lit('*'))).show(4,truncate=False)
+---------------------------------------------------------------------------------------------------------------------------------------------------+
|concat(value, *)                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
|dd15-062.compuserve.com - - [01/Jul/1995:00:01:12 -0400] "GET /news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt HTTP/1.0" 404 -*|
|dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -*                                                                   |
|ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:02:40 -0400] "GET /software/winvn HTTP/1.0" 302 -*                                                    |
|ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:03:24 -0400] "GET /software HTTP/1.0" 302 -*                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------------------+

2.5 Fill NAs with 0s

# Replace all null content_size values with 0.

cleaned_df = split_df.na.fill({'content_size': 0})

3. Webserver  logs parsing with SparkR

# Load the SparkR library
library(SparkR)
library(stringr)
library(magrittr)  # provides the %>% pipe used below

file_location = "/FileStore/tables/NASA_access_log_Jul95.gz"
# file_location = "/FileStore/tables/NASA_access_log_Aug95.gz"

# Initiate a SparkR session
sc <- sparkR.session()
sqlContext <- sparkRSQL.init(sc)
# Read the Jul 95 log file; each line becomes one row in the 'value' column
df <- read.text(sqlContext, file_location)

#df=SparkR::select(df, "value")
#head(SparkR::collect(df))
#m=regexp_extract(df$value,'\\\\S+',1)

a=df %>% 
  withColumn('host', regexp_extract(df$value, '^(\\S+)', 1)) %>%
  withColumn('timestamp', regexp_extract(df$value, "((\\S+ -\\d{4}))", 2)) %>%
  withColumn('path', regexp_extract(df$value, '(\\"\\w+\\s+([^\\s]+)\\s+HTTP.*")', 2))  %>%
  withColumn('status', regexp_extract(df$value, '(^.*"\\s+([^\\s]+))', 2)) %>%
  withColumn('content_size', regexp_extract(df$value, '(^.*\\s+(\\d+)$)', 2))
#b=a%>% select(host,timestamp,path,status,content_type)
head(SparkR::collect(a),10)

1 199.72.81.55 – – [01/Jul/1995:00:00:01 -0400] “GET /history/apollo/ HTTP/1.0” 200 6245
2 unicomp6.unicomp.net – – [01/Jul/1995:00:00:06 -0400] “GET /shuttle/countdown/ HTTP/1.0” 200 3985
3 199.120.110.21 – – [01/Jul/1995:00:00:09 -0400] “GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0” 200 4085
4 burger.letters.com – – [01/Jul/1995:00:00:11 -0400] “GET /shuttle/countdown/liftoff.html HTTP/1.0” 304 0
5 199.120.110.21 – – [01/Jul/1995:00:00:11 -0400] “GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0” 200 4179
6 burger.letters.com – – [01/Jul/1995:00:00:12 -0400] “GET /images/NASA-logosmall.gif HTTP/1.0” 304 0
7 burger.letters.com – – [01/Jul/1995:00:00:12 -0400] “GET /shuttle/countdown/video/livevideo.gif HTTP/1.0” 200 0
8 205.212.115.106 – – [01/Jul/1995:00:00:12 -0400] “GET /shuttle/countdown/countdown.html HTTP/1.0” 200 3985
9 d104.aa.net – – [01/Jul/1995:00:00:13 -0400] “GET /shuttle/countdown/ HTTP/1.0” 200 3985
10 129.94.144.152 – – [01/Jul/1995:00:00:13 -0400] “GET / HTTP/1.0” 200 7074
host timestamp
1 199.72.81.55 [01/Jul/1995:00:00:01 -0400
2 unicomp6.unicomp.net [01/Jul/1995:00:00:06 -0400
3 199.120.110.21 [01/Jul/1995:00:00:09 -0400
4 burger.letters.com [01/Jul/1995:00:00:11 -0400
5 199.120.110.21 [01/Jul/1995:00:00:11 -0400
6 burger.letters.com [01/Jul/1995:00:00:12 -0400
7 burger.letters.com [01/Jul/1995:00:00:12 -0400
8 205.212.115.106 [01/Jul/1995:00:00:12 -0400
9 d104.aa.net [01/Jul/1995:00:00:13 -0400
10 129.94.144.152 [01/Jul/1995:00:00:13 -0400
path status content_size
1 /history/apollo/ 200 6245
2 /shuttle/countdown/ 200 3985
3 /shuttle/missions/sts-73/mission-sts-73.html 200 4085
4 /shuttle/countdown/liftoff.html 304 0
5 /shuttle/missions/sts-73/sts-73-patch-small.gif 200 4179
6 /images/NASA-logosmall.gif 304 0
7 /shuttle/countdown/video/livevideo.gif 200 0
8 /shuttle/countdown/countdown.html 200 3985
9 /shuttle/countdown/ 200 3985
10 / 200 7074
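
To see what each of the five patterns captures, they can be tried on a single line with Python's re module. This is only a sketch: PATTERNS and parse_line are hypothetical names, and returning an empty string when nothing matches is meant to mirror what regexp_extract does in Spark.

```python
import re

# The five patterns used with regexp_extract above, with the capture group to keep
PATTERNS = {
    'host': (r'^(\S+)', 1),
    'timestamp': (r'((\S+ -\d{4}))', 2),
    'path': (r'("\w+\s+([^\s]+)\s+HTTP.*")', 2),
    'status': (r'(^.*"\s+([^\s]+))', 2),
    'content_size': (r'(^.*\s+(\d+)$)', 2),
}

def parse_line(line):
    """Return a dict of fields; a field whose pattern does not match becomes ''."""
    fields = {}
    for name, (pattern, group) in PATTERNS.items():
        match = re.search(pattern, line)
        fields[name] = match.group(group) if match else ''
    return fields

good = parse_line('199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245')
bad = parse_line('dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -')
```

The timestamp keeps its leading '[', as in the SparkR output above, and the second line yields an empty content_size because it ends in '-'.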

4 Webserver logs parsing with SparklyR

install.packages("sparklyr")
library(sparklyr)
library(dplyr)
library(stringr)
#sc <- spark_connect(master = "local", version = "2.1.0")
sc <- spark_connect(method = "databricks")
sdf <-spark_read_text(sc, name="df", path = "/FileStore/tables/NASA_access_log*.gz")
sdf
Installing package into ‘/databricks/spark/R/lib’
# Source: spark [?? x 1]
   line                                                                         
                                                                           
 1 "199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] \"GET /history/apollo/ HTTP/1…
 2 "unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] \"GET /shuttle/countd…
 3 "199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] \"GET /shuttle/missions/sts…
 4 "burger.letters.com - - [01/Jul/1995:00:00:11 -0400] \"GET /shuttle/countdow…
 5 "199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] \"GET /shuttle/missions/sts…
 6 "burger.letters.com - - [01/Jul/1995:00:00:12 -0400] \"GET /images/NASA-logo…
 7 "burger.letters.com - - [01/Jul/1995:00:00:12 -0400] \"GET /shuttle/countdow…
 8 "205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] \"GET /shuttle/countdown/c…
 9 "d104.aa.net - - [01/Jul/1995:00:00:13 -0400] \"GET /shuttle/countdown/ HTTP…
10 "129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] \"GET / HTTP/1.0\" 200 7074"
# … with more rows
#install.packages("sparklyr")
library(sparklyr)
library(dplyr)
library(stringr)
#sc <- spark_connect(master = "local", version = "2.1.0")
sc <- spark_connect(method = "databricks")
sdf <-spark_read_text(sc, name="df", path = "/FileStore/tables/NASA_access_log*.gz")
sdf <- sdf %>% mutate(host = regexp_extract(line, '^(\\\\S+)',1)) %>% 
               mutate(timestamp = regexp_extract(line, '((\\\\S+ -\\\\d{4}))',2)) %>%
               mutate(path = regexp_extract(line, '(\\\\"\\\\w+\\\\s+([^\\\\s]+)\\\\s+HTTP.*")',2)) %>%
               mutate(status = regexp_extract(line, '(^.*"\\\\s+([^\\\\s]+))',2)) %>%
               mutate(content_size = regexp_extract(line, '(^.*\\\\s+(\\\\d+)$)',2))

5 Hosts

5.1  RDD

5.11 Parse and map hosts to groups

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))

# Create tuples of (host,1) and apply reduceByKey() and order by descending
rslt=(parsed_rdd2.map(lambda x:(x[0],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
Out[18]:
[(‘piweba3y.prodigy.com’, 21988),
(‘piweba4y.prodigy.com’, 16437),
(‘piweba1y.prodigy.com’, 12825),
(‘edams.ksc.nasa.gov’, 11962),
(‘163.206.89.4’, 9697),
(‘news.ti.com’, 8161),
(‘www-d1.proxy.aol.com’, 8047),
(‘alyssa.prodigy.com’, 8037),
(‘siltb10.orl.mmc.com’, 7573),
(‘www-a2.proxy.aol.com’, 7516)]
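
The map, reduceByKey and takeOrdered steps can be mimicked on a small list in plain Python. The sketch below uses made-up host names and is only meant to show the shape of the computation, not how Spark distributes it.

```python
from collections import Counter

# Hypothetical host column; in the post this comes from the parsed log lines
hosts = ['a.example.com', 'b.example.com', 'a.example.com',
         'c.example.com', 'a.example.com', 'b.example.com']

# map: emit (host, 1); reduceByKey: add up the 1s for each host
counts = Counter()
for host, one in ((h, 1) for h in hosts):
    counts[host] += one

# takeOrdered(2, lambda x: -x[1]): the two hosts with the highest counts
top_hosts = sorted(counts.items(), key=lambda kv: -kv[1])[:2]
```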

5.12 Plot counts of hosts

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
df=pd.DataFrame(rslt,columns=['host','count'])
sns.barplot(x='host',y='count',data=df)
plt.subplots_adjust(bottom=0.6, right=0.8, top=0.9)
plt.xticks(rotation="vertical",fontsize=8)
display()

5.2 PySpark

5.21 Compute counts of hosts

df= (cleaned_df
     .groupBy('host')
     .count()
     .orderBy('count',ascending=False))
df.show(5)
+——————–+—–+
| host|count|
+——————–+—–+
|piweba3y.prodigy….|21988|
|piweba4y.prodigy….|16437|
|piweba1y.prodigy….|12825|
| edams.ksc.nasa.gov |11964|
| 163.206.89.4 | 9697|
+——————–+—–+
only showing top 5 rows

5.22 Plot count of hosts

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
df1=df.toPandas()
df2 = df1.head(10)
df2.count()
sns.barplot(x='host',y='count',data=df2)
plt.subplots_adjust(bottom=0.5, right=0.8, top=0.9)
plt.xlabel("Hosts")
plt.ylabel('Count')
plt.xticks(rotation="vertical",fontsize=10)
display()

5.3 SparkR

5.31 Compute count of hosts

c <- SparkR::select(a,a$host)
df=SparkR::summarize(SparkR::groupBy(c, a$host), noHosts = count(a$host))
df1 =head(arrange(df,desc(df$noHosts)),10)
head(df1)
                  host noHosts
1 piweba3y.prodigy.com   17572
2 piweba4y.prodigy.com   11591
3 piweba1y.prodigy.com    9868
4   alyssa.prodigy.com    7852
5  siltb10.orl.mmc.com    7573
6 piweba2y.prodigy.com    5922

5.32 Plot count of hosts

library(ggplot2)
p <-ggplot(data=df1, aes(x=host, y=noHosts,fill=host)) +   geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Host') + ylab('Count')
p

5.4 SparklyR

5.41 Compute count of Hosts

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(host) %>% group_by(host) %>% summarise(noHosts=n()) %>% arrange(desc(noHosts))
df2 <-head(df1,10)

5.42 Plot count of hosts

library(ggplot2)

p <-ggplot(data=df2, aes(x=host, y=noHosts,fill=host)) + geom_bar(stat="identity")+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Host') + ylab('Count')

p

6 Paths

6.1 RDD

6.11 Parse and map paths to groups

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
rslt=(parsed_rdd2.map(lambda x:(x[5],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
[(‘”GET /images/NASA-logosmall.gif HTTP/1.0″‘, 207520),
(‘”GET /images/KSC-logosmall.gif HTTP/1.0″‘, 164487),
(‘”GET /images/MOSAIC-logosmall.gif HTTP/1.0″‘, 126933),
(‘”GET /images/USA-logosmall.gif HTTP/1.0″‘, 126108),
(‘”GET /images/WORLD-logosmall.gif HTTP/1.0″‘, 124972),
(‘”GET /images/ksclogo-medium.gif HTTP/1.0″‘, 120704),
(‘”GET /ksc.html HTTP/1.0″‘, 83209),
(‘”GET /images/launch-logo.gif HTTP/1.0″‘, 75839),
(‘”GET /history/apollo/images/apollo-logo1.gif HTTP/1.0″‘, 68759),
(‘”GET /shuttle/countdown/ HTTP/1.0″‘, 64467)]

6.12 Plot counts of HTTP Requests

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
df=pd.DataFrame(rslt,columns=['path','count'])
sns.barplot(x='path',y='count',data=df)
plt.subplots_adjust(bottom=0.7, right=0.8, top=0.9)
plt.xticks(rotation="vertical",fontsize=8)
display()

6.2 Pyspark

6.21 Compute count of HTTP Requests

df= (cleaned_df
     .groupBy('path')
     .count()
     .orderBy('count',ascending=False))
df.show(5)
Out[20]:
+——————–+——+
| path| count|
+——————–+——+
|/images/NASA-logo…|208362|
|/images/KSC-logos…|164813|
|/images/MOSAIC-lo…|127656|
|/images/USA-logos…|126820|
|/images/WORLD-log…|125676|
+——————–+——+
only showing top 5 rows

6.22 Plot count of HTTP Requests

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
df1=df.toPandas()
df2 = df1.head(10)
df2.count()
sns.barplot(x='path',y='count',data=df2)
plt.subplots_adjust(bottom=0.7, right=0.8, top=0.9)
plt.xlabel("HTTP Requests")
plt.ylabel('Count')
plt.xticks(rotation=90,fontsize=8)
display()

 

6.3 SparkR

6.31 Compute count of HTTP requests

library(SparkR)
c <- SparkR::select(a,a$path)
df=SparkR::summarize(SparkR::groupBy(c, a$path), numRequest = count(a$path))
df1=head(df)

6.32 Plot count of HTTP Requests

library(ggplot2)
p <-ggplot(data=df1, aes(x=path, y=numRequest,fill=path)) +   geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1))+ xlab('Path') + ylab('Count')
p

6.4 SparklyR

6.41 Compute count of paths

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(path) %>% group_by(path) %>% summarise(noPaths=n()) %>% arrange(desc(noPaths))
df2 <-head(df1,10)
df2
# Source: spark [?? x 2]
# Ordered by: desc(noPaths)
   path                                    noPaths
                                        
 1 /images/NASA-logosmall.gif               208362
 2 /images/KSC-logosmall.gif                164813
 3 /images/MOSAIC-logosmall.gif             127656
 4 /images/USA-logosmall.gif                126820
 5 /images/WORLD-logosmall.gif              125676
 6 /images/ksclogo-medium.gif               121286
 7 /ksc.html                                 83685
 8 /images/launch-logo.gif                   75960
 9 /history/apollo/images/apollo-logo1.gif   68858
10 /shuttle/countdown/                       64695

6.42 Plot count of Paths

library(ggplot2)
p <-ggplot(data=df2, aes(x=path, y=noPaths,fill=path)) +   geom_bar(stat="identity")+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Path') + ylab('Count')
p

7 HTTP Status

7.1 RDD

7.11 Compute count of HTTP Status

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])

parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
rslt=(parsed_rdd2.map(lambda x:(x[7],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
Out[22]:
[(‘200’, 3095682),
(‘304’, 266764),
(‘302’, 72970),
(‘404’, 20625),
(‘403’, 225),
(‘500’, 65),
(‘501’, 41)]

7.12 Plot counts of HTTP response status

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
df=pd.DataFrame(rslt,columns=['status','count'])
sns.barplot(x='status',y='count',data=df)
plt.subplots_adjust(bottom=0.4, right=0.8, top=0.9)
plt.xticks(rotation="vertical",fontsize=8)
display()

7.2 Pyspark

7.21 Compute count of HTTP status

status_count=(cleaned_df
                .groupBy('status')
                .count()
                .orderBy('count',ascending=False))
status_count.show()
+——+——-+
|status| count|
+——+——-+
| 200|3100522|
| 304| 266773|
| 302| 73070|
| 404| 20901|
| 403| 225|
| 500| 65|
| 501| 41|
| 400| 15|
| null| 1|

7.22 Plot count of HTTP status

Plot the HTTP return status vs the counts

df1=status_count.toPandas()
df2 = df1.head(10)
df2.count()
sns.barplot(x='status',y='count',data=df2)
plt.subplots_adjust(bottom=0.5, right=0.8, top=0.9)
plt.xlabel("HTTP Status")
plt.ylabel('Count')
plt.xticks(rotation="vertical",fontsize=10)
display()

7.3 SparkR

7.31 Compute count of HTTP Response status

library(SparkR)
c <- SparkR::select(a,a$status)
df=SparkR::summarize(SparkR::groupBy(c, a$status), numStatus = count(a$status))
df1=head(df)

7.32 Plot count of HTTP Response status

library(ggplot2)
p <-ggplot(data=df1, aes(x=status, y=numStatus,fill=status)) +   geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Status') + ylab('Count')
p

7.4 SparklyR

7.41 Compute count of status

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(status) %>% group_by(status) %>% summarise(noStatus=n()) %>% arrange(desc(noStatus))
df2 <-head(df1,10)
df2
# Source: spark [?? x 2]
# Ordered by: desc(noStatus)
  status noStatus
       
1 200     3100522
2 304      266773
3 302       73070
4 404       20901
5 403         225
6 500          65
7 501          41
8 400          15
9 ""            1

7.42 Plot count of status

library(ggplot2)

p <-ggplot(data=df2, aes(x=status, y=noStatus,fill=status)) + geom_bar(stat="identity")+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Status') + ylab('Count')
p

8 Content size

8.1 RDD

8.11 Compute count of content size

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
rslt=(parsed_rdd2.map(lambda x:(x[8],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
Out[24]:
[(‘0’, 280017),
(‘786’, 167281),
(‘1204’, 140505),
(‘363’, 111575),
(‘234’, 110824),
(‘669’, 110056),
(‘5866’, 107079),
(‘1713’, 66904),
(‘1173’, 63336),
(‘3635’, 55528)]

8.12 Plot content size

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
df=pd.DataFrame(rslt,columns=['content_size','count'])
sns.barplot(x='content_size',y='count',data=df)
plt.subplots_adjust(bottom=0.4, right=0.8, top=0.9)
plt.xticks(rotation="vertical",fontsize=8)
display()

8.2 Pyspark

8.21 Compute count of content_size

size_counts=(cleaned_df
                .groupBy('content_size')
                .count()
                .orderBy('count',ascending=False))
size_counts.show(10)
+------------+------+
|content_size| count|
+------------+------+
|           0|313932|
|         786|167709|
|        1204|140668|
|         363|111835|
|         234|111086|
|         669|110313|
|        5866|107373|
|        1713| 66953|
|        1173| 63378|
|        3635| 55579|
+------------+------+
only showing top 10 rows

8.22 Plot counts of content size

Plot the content size versus the counts

df1=size_counts.toPandas()
df2 = df1.head(10)
df2.count()
sns.barplot(x='content_size',y='count',data=df2)
plt.subplots_adjust(bottom=0.5, right=0.8, top=0.9)
plt.xlabel("content_size")
plt.ylabel('Count')
plt.xticks(rotation="vertical",fontsize=10)
display()

8.3 SparkR

8.31 Compute count of content size

library(SparkR)
c <- SparkR::select(a,a$content_size)
df=SparkR::summarize(SparkR::groupBy(c, a$content_size), numContentSize = count(a$content_size))
df1=head(df)
df1
     content_size numContentSize
1        28426           1414
2        78382            293
3        60053              4
4        36067              2
5        13282            236
6        41785            174

8.32 Plot count of content sizes

library(ggplot2)
p <-ggplot(data=df1, aes(x=content_size, y=numContentSize,fill=content_size)) + geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Content Size') + ylab('Count')
p

8.4 SparklyR

8.41 Compute count of content_size

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(content_size) %>% group_by(content_size) %>% summarise(noContentSize=n()) %>% arrange(desc(noContentSize))
df2 <-head(df1,10)
df2
# Source: spark [?? x 2]
# Ordered by: desc(noContentSize)
   content_size noContentSize
                   
 1 0                   280027
 2 786                 167709
 3 1204                140668
 4 363                 111835
 5 234                 111086
 6 669                 110313
 7 5866                107373
 8 1713                 66953
 9 1173                 63378
10 3635                 55579

8.42 Plot count of content_size

library(ggplot2)
p <-ggplot(data=df2, aes(x=content_size, y=noContentSize,fill=content_size)) +   geom_bar(stat="identity")+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Content size') + ylab('Count')
p

Conclusion: I spent many, many hours struggling with regex and getting RDDs and Pyspark to work. I also had to spend a lot of time working out the syntax for parsing with SparkR and SparklyR. Once the logs are parsed, plotting and analysis are a piece of cake! This is definitely worth a try!

Watch this space!!


Big Data: On RDDs, Dataframes,Hive QL with Pyspark and SparkR-Part 3

Some people, when confronted with a problem, think “I know, I’ll use regular expressions.” Now they have two problems. – Jamie Zawinski

Some programmers, when confronted with a problem, think “I know, I’ll use floating point arithmetic.” Now they have 1.999999999997 problems. – @tomscott

Some people, when confronted with a problem, think “I know, I’ll use multithreading”. Nothhw tpe yawrve o oblems. – @d6

Some people, when confronted with a problem, think “I know, I’ll use versioning.” Now they have 2.1.0 problems. – @JaesCoyle

Some people, when faced with a problem, think, “I know, I’ll use binary.” Now they have 10 problems. – @nedbat

Introduction

The power of Spark, which operates on in-memory datasets, comes from the fact that it stores data as collections called Resilient Distributed Datasets (RDDs), which are themselves distributed in partitions across the cluster. RDDs are a fast way of processing data, as the data is operated on in parallel based on the map-reduce paradigm. RDDs can be used when the operations are low level, and they are typically used on unstructured data like logs or text. For structured and semi-structured data, Spark has a higher abstraction called Dataframes. Handling data through Dataframes is extremely fast, as the queries are optimized by the Catalyst optimization engine, and the performance can be much better than with RDDs. In addition, Dataframes also use Tungsten, which handles memory management and garbage collection more effectively.
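
The map-reduce idea over partitions can be imitated in a few lines of plain Python. This is only a toy sketch with made-up numbers: the partitions here are ordinary lists processed one after another, whereas Spark would process them in parallel on different executors.

```python
from functools import reduce

# Hypothetical data split into three partitions, as an RDD would be across a cluster
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

# map step: applied independently to every element of every partition
squared = [[x * x for x in part] for part in partitions]

# reduce step: each partition is reduced on its own, then the partial results are combined
partial_sums = [reduce(lambda a, b: a + b, part) for part in squared]
total = reduce(lambda a, b: a + b, partial_sums)
```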

The picture below shows the performance improvement achieved with Dataframes over RDDs

Benefits from Project Tungsten

Note: The above data and graph are taken from the course Big Data Analysis with Apache Spark at edX, UC Berkeley
This post is a continuation of my 2 earlier posts
1. Big Data-1: Move into the big league:Graduate from Python to Pyspark
2. Big Data-2: Move into the big league:Graduate from R to SparkR

In this post I perform equivalent operations on a small dataset using RDDs, Dataframes in Pyspark and SparkR, and HiveQL. As in some of my earlier posts, I have used the tendulkar.csv file for this post. The dataset is small and allows me to do almost everything: data cleaning, data transformation, grouping and so on.
You can clone or fork the notebooks from github at Big Data:Part 3

The notebooks have also been published and can be accessed below

  1. Big Data-1: On RDDs, DataFrames and HiveQL with Pyspark
  2. Big Data-2:On RDDs, Dataframes and HiveQL with SparkR

1. RDD – Select all columns of tables

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
rdd.map(lambda line: (line.split(","))).take(5)
Out[90]: [[‘Runs’, ‘Mins’, ‘BF’, ‘4s’, ‘6s’, ‘SR’, ‘Pos’, ‘Dismissal’, ‘Inns’, ‘Opposition’, ‘Ground’, ‘Start Date’], [’15’, ’28’, ’24’, ‘2’, ‘0’, ‘62.5’, ‘6’, ‘bowled’, ‘2’, ‘v Pakistan’, ‘Karachi’, ’15-Nov-89′], [‘DNB’, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘4’, ‘v Pakistan’, ‘Karachi’, ’15-Nov-89′], [’59’, ‘254’, ‘172’, ‘4’, ‘0’, ‘34.3’, ‘6’, ‘lbw’, ‘1’, ‘v Pakistan’, ‘Faisalabad’, ’23-Nov-89′], [‘8′, ’24’, ’16’, ‘1’, ‘0’, ’50’, ‘6’, ‘run out’, ‘3’, ‘v Pakistan’, ‘Faisalabad’, ’23-Nov-89′]]

1b. RDD – Select columns 1 to 4

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
rdd.map(lambda line: (line.split(",")[0:4])).take(5)
Out[91]:
[[‘Runs’, ‘Mins’, ‘BF’, ‘4s’],
[’15’, ’28’, ’24’, ‘2’],
[‘DNB’, ‘-‘, ‘-‘, ‘-‘],
[’59’, ‘254’, ‘172’, ‘4’],
[‘8′, ’24’, ’16’, ‘1’]]

1c. RDD – Select specific columns 0, 10

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df.map(lambda x: (x[10],x[0])).take(5)
Out[92]:
[(‘Ground’, ‘Runs’),
(‘Karachi’, ’15’),
(‘Karachi’, ‘DNB’),
(‘Faisalabad’, ’59’),
(‘Faisalabad’, ‘8’)]
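
Column selection on an RDD is just list slicing and indexing on each split line. The sketch below shows this on two lines in the layout of tendulkar.csv, copied from the output above, without using Spark at all; the variable names are only illustrative.

```python
# Header and first data row, as shown in the RDD output above
lines = [
    'Runs,Mins,BF,4s,6s,SR,Pos,Dismissal,Inns,Opposition,Ground,Start Date',
    '15,28,24,2,0,62.5,6,bowled,2,v Pakistan,Karachi,15-Nov-89',
]

rows = [line.split(',') for line in lines]          # all columns
first_four = [row[0:4] for row in rows]             # columns 0 to 3
ground_runs = [(row[10], row[0]) for row in rows]   # columns 10 and 0
```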

2. Dataframe:Pyspark – Select all columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.show(5)
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|Runs|Mins| BF| 4s| 6s| SR|Pos|Dismissal|Inns|Opposition| Ground|Start Date|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
| 15| 28| 24| 2| 0| 62.5| 6| bowled| 2|v Pakistan| Karachi| 15-Nov-89|
| DNB| -| -| -| -| -| -| -| 4|v Pakistan| Karachi| 15-Nov-89|
| 59| 254|172| 4| 0| 34.3| 6| lbw| 1|v Pakistan|Faisalabad| 23-Nov-89|
| 8| 24| 16| 1| 0| 50| 6| run out| 3|v Pakistan|Faisalabad| 23-Nov-89|
| 41| 124| 90| 5| 0|45.55| 7| bowled| 1|v Pakistan| Lahore| 1-Dec-89|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
only showing top 5 rows

2a. Dataframe:Pyspark- Select specific columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.select("Runs","BF","Mins").show(5)
+—-+—+—-+
|Runs| BF|Mins|
+—-+—+—-+
| 15| 24| 28|
| DNB| -| -|
| 59|172| 254|
| 8| 16| 24|
| 41| 90| 124|
+—-+—+—-+

3. Dataframe:SparkR – Select all columns

# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Select all columns of the dataframe
df=SparkR::select(tendulkar1,"*")
head(SparkR::collect(df))

  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns Opposition     Ground Start Date
1   15   28  24  2  0  62.5   6    bowled    2 v Pakistan    Karachi  15-Nov-89
2  DNB    -   -  -  -     -   -         -    4 v Pakistan    Karachi  15-Nov-89
3   59  254 172  4  0  34.3   6       lbw    1 v Pakistan Faisalabad  23-Nov-89
4    8   24  16  1  0    50   6   run out    3 v Pakistan Faisalabad  23-Nov-89
5   41  124  90  5  0 45.55   7    bowled    1 v Pakistan     Lahore   1-Dec-89
6   35   74  51  5  0 68.62   6       lbw    1 v Pakistan    Sialkot   9-Dec-89

3a. Dataframe:SparkR- Select specific columns

# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Select specific columns of the dataframe
df=SparkR::select(tendulkar1, "Runs", "BF","Mins")
head(SparkR::collect(df))
  Runs  BF Mins
1   15  24   28
2  DNB   -    -
3   59 172  254
4    8  16   24
5   41  90  124
6   35  51   74

4. Hive QL – Select all columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  * from tendulkar1_table limit 5').show(10, truncate = False)
+—-+—+—-++—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|Runs|Mins|BF |4s |6s |SR |Pos|Dismissal|Inns|Opposition|Ground |Start Date|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|15 |28 |24 |2 |0 |62.5 |6 |bowled |2 |v Pakistan|Karachi |15-Nov-89 |
|DNB |- |- |- |- |- |- |- |4 |v Pakistan|Karachi |15-Nov-89 |
|59 |254 |172|4 |0 |34.3 |6 |lbw |1 |v Pakistan|Faisalabad|23-Nov-89 |
|8 |24 |16 |1 |0 |50 |6 |run out |3 |v Pakistan|Faisalabad|23-Nov-89 |
|41 |124 |90 |5 |0 |45.55|7 |bowled |1 |v Pakistan|Lahore |1-Dec-89 |
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+

4a. Hive QL – Select specific columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  Runs, BF,Mins from tendulkar1_table limit 5').show(10, truncate = False)
+—-+—+—-+
|Runs|BF |Mins|
+—-+—+—-+
|15 |24 |28 |
|DNB |- |- |
|59 |172|254 |
|8 |16 |24 |
|41 |90 |124 |
+—-+—+—-+

5. RDD – Filter rows on specific condition

from pyspark import SparkContext
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=(rdd.map(lambda line: line.split(",")[:])
      # Compare the Runs column (x[0]), not the whole row, with the markers
      .filter(lambda x: x[0] !="DNB")
      .filter(lambda x: x[0] != "TDNB")
      .filter(lambda x: x[0] !="absent")
      .map(lambda x: [x[0].replace("*","")] + x[1:]))

df.take(5)

Out[97]:
[['Runs', 'Mins', 'BF', '4s', '6s', 'SR', 'Pos', 'Dismissal', 'Inns', 'Opposition', 'Ground', 'Start Date'],
 ['15', '28', '24', '2', '0', '62.5', '6', 'bowled', '2', 'v Pakistan', 'Karachi', '15-Nov-89'],
 ['59', '254', '172', '4', '0', '34.3', '6', 'lbw', '1', 'v Pakistan', 'Faisalabad', '23-Nov-89'],
 ['8', '24', '16', '1', '0', '50', '6', 'run out', '3', 'v Pakistan', 'Faisalabad', '23-Nov-89'],
 ['41', '124', '90', '5', '0', '45.55', '7', 'bowled', '1', 'v Pakistan', 'Lahore', '1-Dec-89']]

5a. Dataframe:Pyspark – Filter rows on specific condition

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'absent')
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1.show(5)
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|Runs|Mins| BF| 4s| 6s| SR|Pos|Dismissal|Inns|Opposition| Ground|Start Date|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
| 15| 28| 24| 2| 0| 62.5| 6| bowled| 2|v Pakistan| Karachi| 15-Nov-89|
| 59| 254|172| 4| 0| 34.3| 6| lbw| 1|v Pakistan|Faisalabad| 23-Nov-89|
| 8| 24| 16| 1| 0| 50| 6| run out| 3|v Pakistan|Faisalabad| 23-Nov-89|
| 41| 124| 90| 5| 0|45.55| 7| bowled| 1|v Pakistan| Lahore| 1-Dec-89|
| 35| 74| 51| 5| 0|68.62| 6| lbw| 1|v Pakistan| Sialkot| 9-Dec-89|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
only showing top 5 rows

5b. Dataframe:SparkR – Filter rows on specific condition

sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Remove the "*" indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
# Cast the string type Runs to double and keep the result
tendulkar1 <- withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
df=SparkR::select(tendulkar1,"*")
head(SparkR::collect(df))

5c Hive QL – Filter rows on specific condition

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  Runs, BF,Mins from tendulkar1_table where Runs NOT IN  ("DNB","TDNB","absent")').show(10, truncate = False)
+—-+—+—-+
|Runs|BF |Mins|
+—-+—+—-+
|15 |24 |28 |
|59 |172|254 |
|8 |16 |24 |
|41 |90 |124 |
|35 |51 |74 |
|57 |134|193 |
|0 |1 |1 |
|24 |44 |50 |
|88 |266|324 |
|5 |13 |15 |
+—-+—+—-+
only showing top 10 rows
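
All four variants above apply the same rule: drop the rows whose Runs value is a marker rather than a score, then strip the not-out "*". A minimal plain-Python sketch of that rule, using hypothetical (Runs, BF, Mins) rows:

```python
# Values of Runs that mean there is no score for the innings
NOT_BATTED = {'DNB', 'TDNB', 'absent'}

# Hypothetical (Runs, BF, Mins) rows; '119*' stands for a not-out innings
rows = [['15', '24', '28'], ['DNB', '-', '-'], ['119*', '189', '225'], ['absent', '-', '-']]

# Keep only the rows where Runs holds a score, then drop the not-out marker
batted = [row for row in rows if row[0] not in NOT_BATTED]
cleaned = [[row[0].replace('*', '')] + row[1:] for row in batted]
```

The test is made on row[0], the Runs column, and not on the row as a whole.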

6. RDD – Find rows where Runs >= 50

from pyspark import SparkContext
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
# Keep the first four columns and drop the rows without a score (DNB, TDNB, absent)
df=rdd.map(lambda line: line.split(",")[0:4]) \
   .filter(lambda x: x[0] not in ["DNB", "TDNB", "absent"])
df1=df.map(lambda x: [x[0].replace("*","")] + x[1:4])
header=df1.first()
df2=df1.filter(lambda x: x !=header)
df3=df2.map(lambda x: [float(x[0])] +x[1:4])
df3.filter(lambda x: x[0]>=50).take(10)
Out[101]: 
[[59.0, '254', '172', '4'],
 [57.0, '193', '134', '6'],
 [88.0, '324', '266', '5'],
 [68.0, '216', '136', '8'],
 [119.0, '225', '189', '17'],
 [148.0, '298', '213', '14'],
 [114.0, '228', '161', '16'],
 [111.0, '373', '270', '19'],
 [73.0, '272', '208', '8'],
 [50.0, '158', '118', '6']]

6a. Dataframe:Pyspark – Find rows where Runs >= 50

from pyspark.sql import SparkSession

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'absent')
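# Note: not-out scores such as "119*" become null in this cast and are dropped by the filter below;
# strip the "*" first with regexp_replace (as in 5a) to keep them
tendulkar1 = tendulkar1.withColumn("Runs", tendulkar1["Runs"].cast(IntegerType()))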
tendulkar1.filter(tendulkar1['Runs']>=50).show(10)
+—-+—-+—+—+—+—–+—+———+—-+————–+————+———-+
|Runs|Mins| BF| 4s| 6s| SR|Pos|Dismissal|Inns| Opposition| Ground|Start Date|
+—-+—-+—+—+—+—–+—+———+—-+————–+————+———-+
| 59| 254|172| 4| 0| 34.3| 6| lbw| 1| v Pakistan| Faisalabad| 23-Nov-89|
| 57| 193|134| 6| 0|42.53| 6| caught| 3| v Pakistan| Sialkot| 9-Dec-89|
| 88| 324|266| 5| 0|33.08| 6| caught| 1| v New Zealand| Napier| 9-Feb-90|
| 68| 216|136| 8| 0| 50| 6| caught| 2| v England| Manchester| 9-Aug-90|
| 114| 228|161| 16| 0| 70.8| 4| caught| 2| v Australia| Perth| 1-Feb-92|
| 111| 373|270| 19| 0|41.11| 4| caught| 2|v South Africa|Johannesburg| 26-Nov-92|
| 73| 272|208| 8| 1|35.09| 5| caught| 2|v South Africa| Cape Town| 2-Jan-93|
| 50| 158|118| 6| 0|42.37| 4| caught| 1| v England| Kolkata| 29-Jan-93|
| 165| 361|296| 24| 1|55.74| 4| caught| 1| v England| Chennai| 11-Feb-93|
| 78| 285|213| 10| 0|36.61| 4| lbw| 2| v England| Mumbai| 19-Feb-93|
+—-+—-+—+—+—+—–+—+———+—-+————–+————+———-+

6b. Dataframe:SparkR – Find rows where Runs >50

# Load the SparkR library
library(SparkR)
sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "*" indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
df=SparkR::select(tendulkar1,"*")
df=SparkR::filter(tendulkar1, tendulkar1$Runs > 50)
head(SparkR::collect(df))
  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns    Opposition     Ground
1   59  254 172  4  0  34.3   6       lbw    1    v Pakistan Faisalabad
2   57  193 134  6  0 42.53   6    caught    3    v Pakistan    Sialkot
3   88  324 266  5  0 33.08   6    caught    1 v New Zealand     Napier
4   68  216 136  8  0    50   6    caught    2     v England Manchester
5  119  225 189 17  0 62.96   6   not out    4     v England Manchester
6  148  298 213 14  0 69.48   6   not out    2   v Australia     Sydney
  Start Date
1  23-Nov-89
2   9-Dec-89
3   9-Feb-90
4   9-Aug-90
5   9-Aug-90
6   2-Jan-92

 

7 RDD – groupByKey() and reduceByKey()

from pyspark import SparkContext
from pyspark.mllib.stat import Statistics
# sc is the SparkContext that the notebook already provides
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
# Split each line into fields and drop the innings where he did not bat
df=rdd.map(lambda line: line.split(",")) \
   .filter(lambda x: x[0] not in ["DNB", "TDNB", "absent"])
# Strip the "*" (not out) from Runs
df1=df.map(lambda x: [x[0].replace("*","")] + x[1:])
# Drop the header row
header=df1.first()
df2=df1.filter(lambda x: x !=header)
# Convert Runs to float
df3=df2.map(lambda x: [float(x[0])] +x[1:])
# Build (Ground, Runs) key-value pairs
df4 = df3.map(lambda x: (x[10],x[0]))
# reduceByKey: total runs per ground
df5=df4.reduceByKey(lambda a,b: a+b,1)
# groupByKey: mean runs per ground
df4.groupByKey().mapValues(lambda x: sum(x) / len(x)).take(10)

[('Georgetown', 81.0),
('Lahore', 17.0),
('Adelaide', 32.6),
('Colombo (SSC)', 77.55555555555556),
('Nagpur', 64.66666666666667),
('Auckland', 5.0),
('Bloemfontein', 85.0),
('Centurion', 73.5),
('Faisalabad', 27.0),
('Bridgetown', 26.0)]
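
To make the difference between the two operations concrete, here is a minimal plain-Python sketch of what groupByKey() and reduceByKey() do to (Ground, Runs) pairs. It is not PySpark: group_by_key and reduce_by_key are hypothetical helpers written for this illustration, the sample pairs are made up, and nothing is distributed. The idea it shows is that groupByKey collects all the values of a key before anything is computed, while reduceByKey folds the values into a running result as they arrive.

```python
from collections import defaultdict

# Illustrative (Ground, Runs) pairs; the values are made up for the sketch
pairs = [("Lahore", 17.0), ("Nagpur", 50.0), ("Nagpur", 100.0),
         ("Auckland", 5.0), ("Nagpur", 44.0)]

def group_by_key(kv_pairs):
    """Collect every value of a key into a list, in the spirit of groupByKey()."""
    groups = defaultdict(list)
    for key, value in kv_pairs:
        groups[key].append(value)
    return dict(groups)

def reduce_by_key(kv_pairs, func):
    """Fold the values of each key into one running value, in the spirit of reduceByKey(func)."""
    results = {}
    for key, value in kv_pairs:
        results[key] = func(results[key], value) if key in results else value
    return results

# Mean per ground from the grouped values
grouped = group_by_key(pairs)
means = {ground: sum(runs) / len(runs) for ground, runs in grouped.items()}

# Total per ground with a running sum
totals = reduce_by_key(pairs, lambda a, b: a + b)

# The mean can also be reached with a running (sum, count) pair
sum_count = reduce_by_key([(g, (r, 1)) for g, r in pairs],
                          lambda a, b: (a[0] + b[0], a[1] + b[1]))
means_via_reduce = {g: s / n for g, (s, n) in sum_count.items()}

print(totals["Nagpur"])            # 194.0
print(means == means_via_reduce)   # True
```

The (sum, count) variant only ever keeps two numbers per key, which is the usual reason for preferring a reduce over a group when the data is large.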

7a Dataframe:Pyspark – Compute mean, min and max

from pyspark.sql.functions import *
tendulkar1= (sqlContext
         .read.format("com.databricks.spark.csv")
         .options(delimiter=',', header='true', inferschema='true')
         .load("/FileStore/tables/tendulkar.csv"))
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1.select('Runs').rdd.distinct().collect()

from pyspark.sql import functions as F
df=tendulkar1[['Runs','BF','Ground']].groupby(tendulkar1['Ground']).agg(F.mean(tendulkar1['Runs']),F.min(tendulkar1['Runs']),F.max(tendulkar1['Runs']))
df.show()
+-------------+-----------------+---------+---------+
|       Ground|        avg(Runs)|min(Runs)|max(Runs)|
+-------------+-----------------+---------+---------+
|    Bangalore|          54.3125|        0|       96|
|     Adelaide|             32.6|        0|       61|
|Colombo (PSS)|             37.2|       14|       71|
| Christchurch|             12.0|        0|       24|
|     Auckland|              5.0|        5|        5|
|      Chennai|           60.625|        0|       81|
|    Centurion|             73.5|      111|       36|
|     Brisbane|7.666666666666667|        0|        7|
|   Birmingham|            46.75|        1|       40|
|    Ahmedabad|           40.125|      100|        8|
|Colombo (RPS)|            143.0|      143|      143|
|   Chittagong|             57.8|      101|       36|
|    Cape Town|69.85714285714286|       14|        9|
|   Bridgetown|             26.0|        0|       92|
|     Bulawayo|             55.0|       36|       74|
|        Delhi|39.94736842105263|        0|       76|
|   Chandigarh|             11.0|       11|       11|
| Bloemfontein|             85.0|       15|      155|
|Colombo (SSC)|77.55555555555556|      104|        8|
|      Cuttack|              2.0|        2|        2|
+-------------+-----------------+---------+---------+
only showing top 20 rows
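
A note on the min(Runs) and max(Runs) columns above: several rows show a minimum that is larger than the maximum (Centurion has 111 and 36). That is what one would expect if Runs is still being compared as text, since the code strips the "*" but does not cast the column to a number. The plain-Python sketch below (not Spark code; the two values are simply the ones from the Centurion row) shows the effect.

```python
# Runs for one ground, held as text (values taken from the Centurion row)
runs_as_text = ["111", "36"]

# Text is compared character by character, so "111" sorts before "36"
text_min, text_max = min(runs_as_text), max(runs_as_text)

# After converting to numbers the order is the expected one
runs_as_numbers = [int(r) for r in runs_as_text]
num_min, num_max = min(runs_as_numbers), max(runs_as_numbers)

print(text_min, text_max)   # 111 36
print(num_min, num_max)     # 36 111
```

Casting Runs to a numeric type before the aggregation should therefore give the numeric minimum and maximum.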

7b Dataframe:SparkR – Compute mean, min and max

sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "*" indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
df=SparkR::summarize(SparkR::groupBy(tendulkar1, tendulkar1$Ground), mean = mean(tendulkar1$Runs), minRuns=min(tendulkar1$Runs),maxRuns=max(tendulkar1$Runs))
head(df,20)
          Ground       mean minRuns maxRuns
1      Bangalore  54.312500       0      96
2       Adelaide  32.600000       0      61
3  Colombo (PSS)  37.200000      14      71
4   Christchurch  12.000000       0      24
5       Auckland   5.000000       5       5
6        Chennai  60.625000       0      81
7      Centurion  73.500000     111      36
8       Brisbane   7.666667       0       7
9     Birmingham  46.750000       1      40
10     Ahmedabad  40.125000     100       8
11 Colombo (RPS) 143.000000     143     143
12    Chittagong  57.800000     101      36
13     Cape Town  69.857143      14       9
14    Bridgetown  26.000000       0      92
15      Bulawayo  55.000000      36      74
16         Delhi  39.947368       0      76
17    Chandigarh  11.000000      11      11
18  Bloemfontein  85.000000      15     155
19 Colombo (SSC)  77.555556     104       8
20       Cuttack   2.000000       2       2

Also see
1. My book ‘Practical Machine Learning in R and Python: Third edition’ on Amazon
2. My book ‘Deep Learning from first principles: Second Edition’ now on Amazon
3. The Clash of the Titans in Test and ODI cricket
4. Introducing QCSimulator: A 5-qubit quantum computing simulator in R
5. Latency, throughput implications for the Cloud
6. Simulating a Web Joint in Android
7. Pitching yorkpy … short of good length to IPL – Part 1

To see all posts click Index of Posts

Big Data-2: Move into the big league:Graduate from R to SparkR

This post is a continuation of my earlier post Big Data-1: Move into the big league:Graduate from Python to Pyspark. While the earlier post discussed parallel constructs in Python and Pyspark, this post elaborates on similar key constructs in R and SparkR. While this post focuses only on the programming part of R and SparkR, it is essential to understand and fully grasp the concepts of Spark, RDDs and how data is distributed across the cluster. Like the earlier post, this post shows that if you already have a good handle on R, you can easily graduate to Big Data with SparkR.

Note 1: This notebook has also been published at Databricks community site Big Data-2: Move into the big league:Graduate from R to SparkR

Note 2: You can download this RMarkdown file from Github at Big Data- Python to Pyspark and R to SparkR
1a. Read CSV- R

Note: To upload the CSV to databricks see the video Upload Flat File to Databricks Table

# Read CSV file
tendulkar= read.csv("/dbfs/FileStore/tables/tendulkar.csv",stringsAsFactors = FALSE,na.strings=c(NA,"-"))
#Check the dimensions of the dataframe
dim(tendulkar)
[1] 347  12
1b. Read CSV – SparkR
# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Check the dimensions of the dataframe
dim(tendulkar1)
[1] 347  12
2a. Data frame shape – R
# Get the shape of the dataframe in R
dim(tendulkar)
[1] 347  12
2b. Dataframe shape – SparkR

The same ‘dim’ command works in SparkR too!

dim(tendulkar1)
[1] 347  12
3a. Dataframe columns – R
# Get the names
names(tendulkar) # Also colnames(tendulkar)
 [1] "Runs"       "Mins"       "BF"         "X4s"        "X6s"       
 [6] "SR"         "Pos"        "Dismissal"  "Inns"       "Opposition"
[11] "Ground"     "Start.Date"
3b. Dataframe columns – SparkR
names(tendulkar1)
 [1] "Runs"       "Mins"       "BF"         "4s"         "6s"        
 [6] "SR"         "Pos"        "Dismissal"  "Inns"       "Opposition"
[11] "Ground"     "Start Date"
4a. Rename columns – R
names(tendulkar)=c('Runs','Minutes','BallsFaced','Fours','Sixes','StrikeRate','Position','Dismissal','Innings','Opposition','Ground','StartDate')
names(tendulkar)
 [1] "Runs"       "Minutes"    "BallsFaced" "Fours"      "Sixes"     
 [6] "StrikeRate" "Position"   "Dismissal"  "Innings"    "Opposition"
[11] "Ground"     "StartDate"
4b. Rename columns – SparkR
names(tendulkar1)=c('Runs','Minutes','BallsFaced','Fours','Sixes','StrikeRate','Position','Dismissal','Innings','Opposition','Ground','StartDate')
names(tendulkar1)
 [1] "Runs"       "Minutes"    "BallsFaced" "Fours"      "Sixes"     
 [6] "StrikeRate" "Position"   "Dismissal"  "Innings"    "Opposition"
[11] "Ground"     "StartDate"
5a. Summary – R
summary(tendulkar)
     Runs              Minutes        BallsFaced         Fours       
 Length:347         Min.   :  1.0   Min.   :  0.00   Min.   : 0.000  
 Class :character   1st Qu.: 33.0   1st Qu.: 22.00   1st Qu.: 1.000  
 Mode  :character   Median : 82.0   Median : 58.50   Median : 4.000  
                    Mean   :125.5   Mean   : 89.75   Mean   : 6.274  
                    3rd Qu.:181.0   3rd Qu.:133.25   3rd Qu.: 9.000  
                    Max.   :613.0   Max.   :436.00   Max.   :35.000  
                    NA's   :18      NA's   :19       NA's   :19      
     Sixes          StrikeRate        Position     Dismissal        
 Min.   :0.0000   Min.   :  0.00   Min.   :2.00   Length:347        
 1st Qu.:0.0000   1st Qu.: 38.09   1st Qu.:4.00   Class :character  
 Median :0.0000   Median : 52.25   Median :4.00   Mode  :character  
 Mean   :0.2097   Mean   : 51.79   Mean   :4.24                     
 3rd Qu.:0.0000   3rd Qu.: 65.09   3rd Qu.:4.00                     
 Max.   :4.0000   Max.   :166.66   Max.   :7.00                     
 NA's   :18       NA's   :20       NA's   :18                       
    Innings       Opposition           Ground           StartDate        
 Min.   :1.000   Length:347         Length:347         Length:347        
 1st Qu.:1.000   Class :character   Class :character   Class :character  
 Median :2.000   Mode  :character   Mode  :character   Mode  :character  
 Mean   :2.376                                                           
 3rd Qu.:3.000                                                           
 Max.   :4.000                                                           
 NA's   :1
5b. Summary – SparkR
summary(tendulkar1)
SparkDataFrame[summary:string, Runs:string, Minutes:string, BallsFaced:string, Fours:string, Sixes:string, StrikeRate:string, Position:string, Dismissal:string, Innings:string, Opposition:string, Ground:string, StartDate:string]
6a. Displaying details of dataframe with str() – R
str(tendulkar)
'data.frame':	347 obs. of  12 variables:
 $ Runs      : chr  "15" "DNB" "59" "8" ...
 $ Minutes   : int  28 NA 254 24 124 74 193 1 50 324 ...
 $ BallsFaced: int  24 NA 172 16 90 51 134 1 44 266 ...
 $ Fours     : int  2 NA 4 1 5 5 6 0 3 5 ...
 $ Sixes     : int  0 NA 0 0 0 0 0 0 0 0 ...
 $ StrikeRate: num  62.5 NA 34.3 50 45.5 ...
 $ Position  : int  6 NA 6 6 7 6 6 6 6 6 ...
 $ Dismissal : chr  "bowled" NA "lbw" "run out" ...
 $ Innings   : int  2 4 1 3 1 1 3 2 3 1 ...
 $ Opposition: chr  "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan" ...
 $ Ground    : chr  "Karachi" "Karachi" "Faisalabad" "Faisalabad" ...
 $ StartDate : chr  "15-Nov-89" "15-Nov-89" "23-Nov-89" "23-Nov-89" ...
6b. Displaying details of dataframe with str() – SparkR
str(tendulkar1)
'SparkDataFrame': 12 variables:
 $ Runs      : chr "15" "DNB" "59" "8" "41" "35"
 $ Minutes   : chr "28" "-" "254" "24" "124" "74"
 $ BallsFaced: chr "24" "-" "172" "16" "90" "51"
 $ Fours     : chr "2" "-" "4" "1" "5" "5"
 $ Sixes     : chr "0" "-" "0" "0" "0" "0"
 $ StrikeRate: chr "62.5" "-" "34.3" "50" "45.55" "68.62"
 $ Position  : chr "6" "-" "6" "6" "7" "6"
 $ Dismissal : chr "bowled" "-" "lbw" "run out" "bowled" "lbw"
 $ Innings   : chr "2" "4" "1" "3" "1" "1"
 $ Opposition: chr "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan" "v Pakistan"
 $ Ground    : chr "Karachi" "Karachi" "Faisalabad" "Faisalabad" "Lahore" "Sialkot"
 $ StartDate : chr "15-Nov-89" "15-Nov-89" "23-Nov-89" "23-Nov-89" "1-Dec-89" "9-Dec-89"
7a. Head & tail -R
print(head(tendulkar),3)
print(tail(tendulkar),3)
  Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
1   15      28         24     2     0      62.50        6    bowled       2
2  DNB      NA         NA    NA    NA         NA       NA      <NA>       4
3   59     254        172     4     0      34.30        6       lbw       1
4    8      24         16     1     0      50.00        6   run out       3
5   41     124         90     5     0      45.55        7    bowled       1
6   35      74         51     5     0      68.62        6       lbw       1
  Opposition     Ground StartDate
1 v Pakistan    Karachi 15-Nov-89
2 v Pakistan    Karachi 15-Nov-89
3 v Pakistan Faisalabad 23-Nov-89
4 v Pakistan Faisalabad 23-Nov-89
5 v Pakistan     Lahore  1-Dec-89
6 v Pakistan    Sialkot  9-Dec-89
    Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
342   37     125         81     5     0      45.67        4    caught       2
343   21      71         23     2     0      91.30        4   run out       4
344   32      99         53     5     0      60.37        4       lbw       2
345    1       8          5     0     0      20.00        4       lbw       4
346   10      41         24     2     0      41.66        4       lbw       2
347   74     150        118    12     0      62.71        4    caught       2
       Opposition  Ground StartDate
342   v Australia  Mohali 14-Mar-13
343   v Australia  Mohali 14-Mar-13
344   v Australia   Delhi 22-Mar-13
345   v Australia   Delhi 22-Mar-13
346 v West Indies Kolkata  6-Nov-13
347 v West Indies  Mumbai 14-Nov-13
7b. Head – SparkR
head(tendulkar1,3)
  Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
1   15      28         24     2     0       62.5        6    bowled       2
2  DNB       -          -     -     -          -        -         -       4
3   59     254        172     4     0       34.3        6       lbw       1
  Opposition     Ground StartDate
1 v Pakistan    Karachi 15-Nov-89
2 v Pakistan    Karachi 15-Nov-89
3 v Pakistan Faisalabad 23-Nov-89
8a. Determining the column types with sapply -R
sapply(tendulkar,class)
       Runs     Minutes  BallsFaced       Fours       Sixes  StrikeRate 
"character"   "integer"   "integer"   "integer"   "integer"   "numeric" 
   Position   Dismissal     Innings  Opposition      Ground   StartDate 
  "integer" "character"   "integer" "character" "character" "character"
8b. Determining the column types with printSchema – SparkR
printSchema(tendulkar1)
root
 |-- Runs: string (nullable = true)
 |-- Minutes: string (nullable = true)
 |-- BallsFaced: string (nullable = true)
 |-- Fours: string (nullable = true)
 |-- Sixes: string (nullable = true)
 |-- StrikeRate: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Dismissal: string (nullable = true)
 |-- Innings: string (nullable = true)
 |-- Opposition: string (nullable = true)
 |-- Ground: string (nullable = true)
 |-- StartDate: string (nullable = true)
9a. Selecting columns – R
library(dplyr)
df=select(tendulkar,Runs,BallsFaced,Minutes)
head(df,5)
  Runs BallsFaced Minutes
1   15         24      28
2  DNB         NA      NA
3   59        172     254
4    8         16      24
5   41         90     124
9b. Selecting columns – SparkR
library(SparkR)
Sys.setenv(SPARK_HOME="/usr/hdp/2.6.0.3-8/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")
df=SparkR::select(tendulkar1, "Runs", "BF","Mins")
head(SparkR::collect(df))
  Runs  BF Mins
1   15  24   28
2  DNB   -    -
3   59 172  254
4    8  16   24
5   41  90  124
6   35  51   74
10a. Filter rows by criteria – R
library(dplyr)
df=tendulkar %>% filter(Runs > 50)
head(df,5)
  Runs Minutes BallsFaced Fours Sixes StrikeRate Position Dismissal Innings
1  DNB      NA         NA    NA    NA         NA       NA      <NA>       4
2   59     254        172     4     0      34.30        6       lbw       1
3    8      24         16     1     0      50.00        6   run out       3
4   57     193        134     6     0      42.53        6    caught       3
5   88     324        266     5     0      33.08        6    caught       1
     Opposition     Ground StartDate
1    v Pakistan    Karachi 15-Nov-89
2    v Pakistan Faisalabad 23-Nov-89
3    v Pakistan Faisalabad 23-Nov-89
4    v Pakistan    Sialkot  9-Dec-89
5 v New Zealand     Napier  9-Feb-90
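
The output above includes rows such as "DNB" and "8", which is not what a numeric filter on Runs > 50 would return. I am assuming here that, because Runs is still a character column at this point, the comparison is made on text rather than on numbers. The short plain-Python sketch below (not R; the values are the first few Runs entries shown earlier) reproduces the same pattern.

```python
# The first few Runs values, still held as text
runs = ["15", "DNB", "59", "8", "41", "35", "57"]

# Comparing text with text: "8" and "DNB" both sort after "50"
kept_as_text = [r for r in runs if r > "50"]

# Comparing numbers: convert the purely numeric entries first
numeric_runs = [int(r) for r in runs if r.isdigit()]
kept_as_numbers = [r for r in numeric_runs if r > 50]

print(kept_as_text)      # ['DNB', '59', '8', '57']
print(kept_as_numbers)   # [59, 57]
```

Cleaning Runs and converting it to a number first, as is done in section 12a, avoids this.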
10b. Filter rows by criteria – SparkR
df=SparkR::filter(tendulkar1, tendulkar1$Runs > 50)
head(SparkR::collect(df))
  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns     Opposition       Ground
1   59  254 172  4  0  34.3   6       lbw    1     v Pakistan   Faisalabad
2   57  193 134  6  0 42.53   6    caught    3     v Pakistan      Sialkot
3   88  324 266  5  0 33.08   6    caught    1  v New Zealand       Napier
4   68  216 136  8  0    50   6    caught    2      v England   Manchester
5  114  228 161 16  0  70.8   4    caught    2    v Australia        Perth
6  111  373 270 19  0 41.11   4    caught    2 v South Africa Johannesburg
  Start Date
1  23-Nov-89
2   9-Dec-89
3   9-Feb-90
4   9-Aug-90
5   1-Feb-92
6  26-Nov-92
11a. Unique values -R
unique(tendulkar$Runs)
  [1] "15"   "DNB"  "59"   "8"    "41"   "35"   "57"   "0"    "24"   "88"  
 [11] "5"    "10"   "27"   "68"   "119*" "21"   "11"   "16"   "7"    "40"  
 [21] "148*" "6"    "17"   "114"  "111"  "1"    "73"   "50"   "9*"   "165" 
 [31] "78"   "62"   "TDNB" "28"   "104*" "71"   "142"  "96"   "43"   "11*" 
 [41] "34"   "85"   "179"  "54"   "4"    "0*"   "52*"  "2"    "122"  "31"  
 [51] "177"  "74"   "42"   "18"   "61"   "36"   "169"  "9"    "15*"  "92"  
 [61] "83"   "143"  "139"  "23"   "148"  "13"   "155*" "79"   "47"   "113" 
 [71] "67"   "136"  "29"   "53"   "124*" "126*" "44*"  "217"  "116"  "52"  
 [81] "45"   "97"   "20"   "39"   "201*" "76"   "65"   "126"  "36*"  "69"  
 [91] "155"  "22*"  "103"  "26"   "90"   "176"  "117"  "86"   "12"   "193" 
[101] "16*"  "51"   "32"   "55"   "37"   "44"   "241*" "60*"  "194*" "3"   
[111] "32*"  "248*" "94"   "22"   "109"  "19"   "14"   "28*"  "63"   "64"  
[121] "101"  "122*" "91"   "82"   "56*"  "154*" "153"  "49"   "10*"  "103*"
[131] "160"  "100*" "105*" "100"  "106"  "84"   "203"  "98"   "38"   "214" 
[141] "53*"  "111*" "146"  "14*"  "56"   "80"   "25"   "81"   "13*"
11b. Unique values – SparkR
head(SparkR::distinct(tendulkar1[,"Runs"]),5)
  Runs
1 119*
2    7
3   51
4  169
5  32*
12a. Aggregate – Mean, min and max – R
library(dplyr)
library(magrittr)
a <- tendulkar$Runs != "DNB"
tendulkar <- tendulkar[a,]
dim(tendulkar)

# Remove rows with 'TDNB'
c <- tendulkar$Runs != "TDNB"
tendulkar <- tendulkar[c,]

# Remove rows with absent
d <- tendulkar$Runs != "absent"
tendulkar <- tendulkar[d,]
dim(tendulkar)

# Remove the "*" indicating not out
tendulkar$Runs <- as.numeric(gsub("\\*","",tendulkar$Runs))
c <- complete.cases(tendulkar)

#Subset the rows which are complete
tendulkar <- tendulkar[c,]
print(dim(tendulkar))
df <-tendulkar %>%  group_by(Ground) %>% summarise(meanRuns= mean(Runs), minRuns=min(Runs), maxRuns=max(Runs)) 
#names(tendulkar)
head(df)
[1] 327  12
# A tibble: 6 x 4
  Ground       meanRuns minRuns maxRuns
  <chr>           <dbl>   <dbl>   <dbl>
1 Adelaide        32.6       0.    153.
2 Ahmedabad       40.1       4.    217.
3 Auckland         5.00      5.      5.
4 Bangalore       57.9       4.    214.
5 Birmingham      46.8       1.    122.
6 Bloemfontein    85.0      15.    155.
12b. Aggregate- Mean, Min, Max – SparkR
sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "*" indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
df=SparkR::summarize(SparkR::groupBy(tendulkar1, tendulkar1$Ground), mean = mean(tendulkar1$Runs), minRuns=min(tendulkar1$Runs),maxRuns=max(tendulkar1$Runs))
head(df,20)
[1] 347  12
[1] 330  12
[1] 329  12
[1] 329  12
          Ground       mean minRuns maxRuns
1      Bangalore  54.312500       0      96
2       Adelaide  32.600000       0      61
3  Colombo (PSS)  37.200000      14      71
4   Christchurch  12.000000       0      24
5       Auckland   5.000000       5       5
6        Chennai  60.625000       0      81
7      Centurion  73.500000     111      36
8       Brisbane   7.666667       0       7
9     Birmingham  46.750000       1      40
10     Ahmedabad  40.125000     100       8
11 Colombo (RPS) 143.000000     143     143
12    Chittagong  57.800000     101      36
13     Cape Town  69.857143      14       9
14    Bridgetown  26.000000       0      92
15      Bulawayo  55.000000      36      74
16         Delhi  39.947368       0      76
17    Chandigarh  11.000000      11      11
18  Bloemfontein  85.000000      15     155
19 Colombo (SSC)  77.555556     104       8
20       Cuttack   2.000000       2       2
13a Using SQL with SparkR
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(tendulkar1, "tendulkar2")

# SQL statements can be run by using the sql method
df=SparkR::sql("SELECT * FROM tendulkar2 WHERE Ground='Karachi'")

head(df)

  Runs Mins BF 4s 6s    SR Pos Dismissal Inns Opposition  Ground Start Date
1   15   28 24  2  0  62.5   6    bowled    2 v Pakistan Karachi  15-Nov-89
2  DNB    -  -  -  -     -   -         -    4 v Pakistan Karachi  15-Nov-89
3   23   49 29  5  0 79.31   4    bowled    2 v Pakistan Karachi  29-Jan-06
4   26   74 47  5  0 55.31   4    bowled    4 v Pakistan Karachi  29-Jan-06
Conclusion

This post discusses some of the key constructs in R and SparkR and how one can transition from R to SparkR fairly easily. I will be adding more constructs later. Do check back!

You may also like
1. Exploring Quantum Gate operations with QCSimulator
2. Deep Learning from first principles in Python, R and Octave – Part 4
3. A Bluemix recipe with MongoDB and Node.js
4. Practical Machine Learning with R and Python – Part 5
5. Introducing cricketr! : An R package to analyze performances of cricketers

To see all posts click Index of posts

Big Data-1: Move into the big league:Graduate from Python to Pyspark

This post discusses similar constructs in Python and Pyspark. As in my earlier post R vs Python: Different similarities and similar differences, the focus is on the key and common constructs, to highlight the similarities.

Important Note: You can also access this notebook at databricks public site  Big Data-1: Move into the big league:Graduate from Python to Pyspark (the formatting here is much better!!).

For this notebook I have used Databricks community edition


You can download the notebook from Github at Big Data-1:PythontoPysparkAndRtoSparkR

Hope you found this useful!

Note: There are still a few more important constructs which I will be adding to this post.

Also see
1. My book “Deep Learning from first principles” now on Amazon
2. My book ‘Practical Machine Learning in R and Python: Second edition’ on Amazon
3. Re-introducing cricketr! : An R package to analyze performances of cricketers
4. GooglyPlus: yorkr analyzes IPL players, teams, matches with plots and tables
5. Deblurring with OpenCV: Weiner filter reloaded
6. Design Principles of Scalable, Distributed Systems

My travels through the realms of Data Science, Machine Learning, Deep Learning and (AI)

Then felt I like some watcher of the skies 
When a new planet swims into his ken; 
Or like stout Cortez when with eagle eyes 
He star’d at the Pacific—and all his men 
Look’d at each other with a wild surmise— 
Silent, upon a peak in Darien. 
On First Looking into Chapman’s Homer by John Keats

The above excerpt from John Keats’s poem captures the exhilaration that one experiences when discovering something for the first time. It also summarizes, to some extent, my own enjoyment while pursuing Data Science, Machine Learning and the like.

I decided to write this post, as occasionally youngsters approach me and ask me where they should start their adventure in Data Science & Machine Learning. There are other times, when the ‘not-so-youngsters’ want to know what their next step should be after having done some courses. This post includes my travels through the domains of Data Science, Machine Learning, Deep Learning and (soon to be done AI).

By no means am I an authority in this field, which is ever-widening and almost bottomless, yet I would like to share some of my experiences in this fascinating field. I include a short review of the courses I have done below. I also include alternative routes through courses which I did not do, but which are probably equally good. Feel free to pick and choose any course or set of courses. Alternatively, you may prefer to read books or attend bricks-n-mortar classes. In any case, I hope the list below will provide you with some overall direction.

All my learning in the above domains has come from MOOCs and I restrict myself to the top 3 MOOCs, or in my opinion, ‘the original MOOCs’, namely Coursera, edX and Udacity, but may throw in some courses from other online sites if they are only available there. I would recommend these 3 MOOCs over the other numerous online courses and also over face-to-face classroom courses for the following reasons. These MOOCs

  • Are offered by world class colleges, with lectures delivered by top class Professors who have a great depth of knowledge and a wealth of experience
  • Have Professors who, besides delivering quality content, also point out important tips, tricks and traps
  • Let you revisit lectures anytime to refresh your memory
  • Keep lectures short, usually between 8 and 15 mins (Personally, my attention span is around 15-20 mins at a time!)

Here is a fair warning and something quite obvious. No amount of courses, lectures or books will help if you don’t put what you learn to use through some language like Octave, R or Python.

The journey
My trip through Data Science and Machine Learning started with an off-chance remark, about 3 years ago, from an old friend of mine, who spoke to me about having done a few courses at Coursera and having really liked them. He further suggested that I should try one. This was the final push which set me sailing into this vast domain.

I have included the list of the courses I have done over the past 5 years (37+ certifications completed and another 9 audited, i.e. listened to without doing the assignments). For each course I have included a short review, whether I think the course is mandatory, the language on which the course is based, and finally whether I have done the course myself. I have also included alternative courses, which I may not have done, but which I think are equally good. Finally, I suggest some courses which I have heard of, which are very good and worth taking.

1. Machine Learning, Stanford, Prof Andrew Ng, Coursera
(Requirement: Mandatory, Language: Octave, Status: Completed)
This course provides an excellent foundation to build your Machine Learning citadel on. The course covers the mathematical details of linear, logistic and multivariate regression. There is also good coverage of topics like Neural Networks, SVMs, Anomaly Detection, underfitting, overfitting, regularization etc. Prof Andrew Ng presents the material in a very lucid manner. It is a great course to start with. It would be a good idea to brush up on some basics of linear algebra, matrices and a little bit of calculus, specifically computing the local maxima/minima. You should be able to take this course even if you don’t know Octave, as the Prof goes over the key aspects of the language.

2. Statistical Learning, Prof Trevor Hastie & Prof Robert Tibshirani, Online Stanford (Requirement: Mandatory, Language: R, Status: Completed)
The course includes linear, polynomial and logistic regression. It also covers cross-validation and the bootstrap methods, and how to do model selection and regularization (ridge and lasso). It touches on non-linear models, generalized additive models, boosting and SVMs. Some unsupervised learning methods are also discussed. The 2 Professors take turns in delivering lectures with a slight touch of humor.

3a. Data Science Specialization: Prof Roger Peng, Prof Brian Caffo & Prof Jeff Leek, Johns Hopkins University (Requirement: Option A, Language: R, Status: Completed)
This is a comprehensive 10 module specialization based on R. This Specialization gives a very broad overview of Data Science and Machine Learning. The modules cover R programming, Statistical Inference, Practical Machine Learning, and how to build R products and R packages. Finally, it has a very good Capstone project on NLP.

3b. Applied Data Science with Python Specialization: University of Michigan (Requirement: Option B, Language: Python, Status: Not done)
In this specialization I only did the Applied Machine Learning in Python course (Prof Kevyn Collins-Thompson). This is a very good course that covers a lot of Machine Learning algorithms (linear, logistic, ridge and lasso regression, kNN, SVMs etc.). Also included are confusion matrices, ROC curves etc. The course is based on Python’s scikit-learn.

3c. Machine Learning Specialization, University Of Washington (Requirement:Option C, Language:Python, Status : Not completed). This appears to be a very good Specialization in Python

4. Statistics with R Specialization, Duke University (Requirement: Useful and a must know, Language: R, Status: Not Completed)
I audited (listened only to) the following 2 modules from this Specialization.
a. Inferential Statistics
b. Linear Regression and Modeling
Both these courses are taught by Prof Mine Çetinkaya-Rundel, who delivers her lessons with extraordinary clarity. Her lectures are filled with many examples, which she walks you through in great detail.

5. Bayesian Statistics: From Concept to Data Analysis: Univ of California, Santa Cruz (Requirement: Optional, Language: R, Status: Completed)
This is an interesting course and provides an alternative point of view to the frequentist approach.

6. Data Science and Engineering with Spark, University of California, Berkeley, Prof Anthony Joseph, Prof Ameet Talwalkar, Prof Jon Bates
(Requirement: Mandatory for Big Data, Status: Completed, Language: PySpark)
This specialization contains 3 modules
a.Introduction to Apache Spark
b.Distributed Machine Learning with Apache Spark
c.Big Data Analysis with Apache Spark

This is an excellent course for those who want to make an entry into Distributed Machine Learning. The exercises are fairly challenging and your code will predominantly be made of map/reduce and lambda operations as you process data that is distributed across Spark RDDs. I really liked the part where the Prof shows how a matrix computation which is of the order of O(nd^2+d^3) on a single machine (and which is the basis of Machine Learning) is reduced to O(nd^2) by taking outer products on data which is distributed.
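
The outer-product idea can be sketched in a few lines of plain Python. I am assuming here that the matrix product in question is X^T X (the d x d matrix that appears when solving linear regression in closed form); the post itself does not name it. The helper names and the tiny data set are made up for the illustration, and the two "partitions" stand in for data held on two different machines.

```python
def gram_direct(X):
    """Compute X^T X directly: entry (j, k) sums X[i][j] * X[i][k] over all rows i."""
    d = len(X[0])
    return [[sum(row[j] * row[k] for row in X) for k in range(d)] for j in range(d)]

def outer(x):
    """Outer product of a row with itself, a d x d matrix."""
    return [[a * b for b in x] for a in x]

def add(A, B):
    """Element-wise sum of two matrices of the same shape."""
    return [[a + b for a, b in zip(row_a, row_b)] for row_a, row_b in zip(A, B)]

def gram_from_partitions(partitions, d):
    """Each partition sums the outer products of its own rows; the partial sums are then added."""
    total = [[0.0] * d for _ in range(d)]
    for part in partitions:
        partial = [[0.0] * d for _ in range(d)]
        for x in part:
            partial = add(partial, outer(x))
        total = add(total, partial)
    return total

# Four data points with d = 2 features, split across two partitions
X = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]
partitions = [X[:2], X[2:]]

print(gram_direct(X))                       # [[84.0, 100.0], [100.0, 120.0]]
print(gram_from_partitions(partitions, 2))  # [[84.0, 100.0], [100.0, 120.0]]
```

Each row contributes one d x d outer product, so a worker only needs its own rows and a d x d accumulator, and only these small accumulators have to be combined at the end.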

7. Deep Learning, Prof Andrew Ng, Younes Bensouda Mourri, Kian Katanforoosh (Requirement: Mandatory, Language: Python, Tensorflow, Status: Completed)

This course has 5 modules which start from the fundamentals of Neural Networks, their derivation and vectorized Python implementation. The specialization also covers regularization, optimization techniques, mini batch normalization, Convolutional Neural Networks, Recurrent Neural Networks and LSTMs applied to a wide variety of real world problems.

The modules are
a. Neural Networks and Deep Learning
In this course Prof Andrew Ng explains differential calculus, linear algebra and vectorized Python implementations of Deep Learning algorithms. The derivation for back-propagation is done and then the Prof shows how to compute a multi-layered DL network.
b.Improving Deep Neural Networks: Hyperparameter tuning, Regularization and Optimization
Deep Neural Networks can be very flexible, and come with a lot of knobs (hyper-parameters) to tune. In this module, Prof Andrew Ng shows a systematic way to tune hyperparameters and by how much one should tune them. The course also covers regularization (L1, L2, dropout), gradient descent optimization and batch normalization methods. The visualizations used to explain the momentum method, RMSprop, Adam, LR decay and batch normalization are really powerful and serve to clarify the concepts. As an added bonus, the module also includes a great introduction to Tensorflow.
c.Structuring Machine Learning Projects
A very good module with useful tips, tricks and traps that need to be considered while working on Machine Learning and Deep Learning projects.
d. Convolutional Neural Networks
This domain has a lot of really cool ideas, where images, represented as 3D volumes, are compressed and stretched longitudinally before applying a multi-layered deep learning neural network to this thin slice for performing classification, detection etc. The Prof provides a glimpse into this fascinating world of image classification, detection and neural art transfer with frameworks like Keras and Tensorflow.
e. Sequence Models
This module covers in good detail concepts like RNNs, GRUs, LSTMs, word embeddings, beam search and the attention model.
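To give a feel for the momentum method mentioned in module b, here is a minimal sketch in Python. It is my own toy illustration and not code from the course: the function name minimize_with_momentum, the quadratic objective and the values of lr, beta and steps are all assumptions chosen for the example. The velocity is an exponentially weighted average of past gradients, and the weight is moved along this smoothed direction.

```python
# Sketch: gradient descent with momentum on a 1-D quadratic.
# Function name, objective and hyperparameter values are illustrative only.

def minimize_with_momentum(grad, w0, lr=0.1, beta=0.9, steps=500):
    """Run gradient descent with momentum and return the final weight."""
    w, v = w0, 0.0
    for _ in range(steps):
        # Exponentially weighted average of the gradients seen so far
        v = beta * v + (1 - beta) * grad(w)
        # Step along the smoothed gradient direction
        w = w - lr * v
    return w

# Objective f(w) = (w - 3)^2 has gradient 2 * (w - 3) and its minimum at w = 3
w_star = minimize_with_momentum(lambda w: 2 * (w - 3.0), w0=0.0)
print(w_star)
```

On this toy problem the weight settles very close to 3. RMSprop and Adam build on the same idea by also keeping a running average of the squared gradients.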

8. Neural Networks for Machine Learning, Prof Geoffrey Hinton, University of Toronto
(Requirement: Mandatory, Language: Octave, Status: Completed)
This is a broad course which starts from the basics of Perceptrons, all the way to Boltzmann Machines, RNNs, CNNs, LSTMs etc. The course also covers regularisation, learning rate decay, momentum method etc.

9.Probabilistic Graphical Models, Stanford, Prof Daphne Koller (Language: Octave, Status: Partially completed)
This has 3 courses
a.Probabilistic Graphical Models 1: Representation – Done
b.Probabilistic Graphical Models 2: Inference – To do
c.Probabilistic Graphical Models 3: Learning – To do
This course discusses how a system, which can be represented as a complex interaction of probability distributions, will behave. This is probably the toughest course I did. I did manage to get through the 1st module. While I felt that I grasped a few things, I did not wholly understand the import of this. However I feel this is an important domain and I will definitely revisit this in future.

10. Reinforcement Learning Specialization : University of Alberta, Prof Adam White and Prof Martha White
(Requirement: Very important, Language: Python, Status: Partially Completed)
This is a set of 4 courses. I did the first 2 of the 4. Reinforcement Learning appears deceptively simple, but it is anything but simple. Definitely a very critical area to learn.

a.Fundamentals of Reinforcement Learning: This course discusses Markov models, value functions, Bellman equations and dynamic programming.
b.Sample-based Learning Methods: This course touches on Monte Carlo methods, Temporal Difference methods, Q-learning etc.
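
As a small taste of the value functions, Bellman equations and dynamic programming covered in the first course, here is a sketch of value iteration in Python. It is a toy example of my own and not taken from the course: the 3-state chain, the rewards, the discount factor gamma and the function name value_iteration are all assumptions, and the environment is deterministic to keep the code short. Each sweep applies the Bellman optimality backup, V(s) = max over actions of [reward + gamma * V(next state)], until the values stop changing.

```python
# Sketch: value iteration on a tiny deterministic MDP.
# States, rewards, gamma and the function name are illustrative assumptions.

def value_iteration(transitions, gamma=0.5, tol=1e-9):
    """transitions[state][action] = (next_state, reward); {} marks a terminal state."""
    values = {state: 0.0 for state in transitions}
    while True:
        new_values = {}
        for state, actions in transitions.items():
            if not actions:
                # Terminal state: no further reward can be collected
                new_values[state] = 0.0
                continue
            # Bellman optimality backup: best one-step return over all actions
            new_values[state] = max(
                reward + gamma * values[next_state]
                for next_state, reward in actions.values()
            )
        delta = max(abs(new_values[s] - values[s]) for s in values)
        values = new_values
        if delta < tol:
            return values

# Chain 0 -> 1 -> 2 where state 2 is terminal and every step costs -1
transitions = {
    0: {"stay": (0, -1.0), "right": (1, -1.0)},
    1: {"stay": (1, -1.0), "right": (2, -1.0)},
    2: {},
}
values = value_iteration(transitions)
print(values)
```

With these numbers the values converge to -1.5 for state 0 and -1.0 for state 1, since moving right towards the terminal state is always the better choice.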

Reinforcement Learning is a must-have in your AI arsenal.

11. Tensorflow in Practice Specialization – Prof Laurence Moroney – DeepLearning.AI
(Requirement: Important, Language: Python, Status: Completed)
This is a good course but definitely do the Deep Learning Specialization by Prof Andrew Ng.
There are 4 courses in this Specialization. I completed all 4 courses. They are fairly straightforward.
a. Introduction to TensorFlow – This course introduces you to Tensorflow and image recognition with a brute-force method
b. Convolutional Neural Networks in Tensorflow – This course touches on how to build a CNN, image augmentation, transfer learning and multi-class classification
c. Natural Language Processing in Tensorflow – Word embeddings, sentiment analysis, LSTMs, RNNs are discussed.
d. Sequences, time series and prediction – This course discusses using RNNs for time series and autocorrelation

12. Natural Language Processing Specialization – Prof Younes Bensouda Mourri, Lukasz Kaiser from DeepLearning.AI
(Requirement: Very Important, Language: Python, Status: Partially Completed)
This is the latest specialization from DeepLearning.AI. I have completed the first 2 courses
a. Natural Language Processing with Classification and Vector Spaces – The first course deals with sentiment analysis with Naive Bayes, vector space models, capturing dependencies using PCA etc
b. Natural Language Processing with Probabilistic Models – In this course techniques for auto correction, Markov models and the Viterbi algorithm for Part-of-Speech tagging, auto completion and word embedding are discussed.

13. Mining Massive Data Sets, Prof Jure Leskovec, Prof Anand Rajaraman and Prof Jeff Ullman. Online Stanford, Status: Partially done
I did quickly audit this course, a year back, when it used to be in Coursera. It now seems to have moved to Stanford online. But this is a very good course that discusses key concepts of Mining Big Data of the order of a few Petabytes.

14. Introduction to Artificial Intelligence, Prof Sebastian Thrun & Prof Peter Norvig, Udacity
This is a really good course. I have started on this course a couple of times and somehow gave up. Will revisit to complete in future. Quite extensive in its coverage. Touches BFS, DFS, A-Star, PGM, Machine Learning etc.

15.Deep Learning (with TensorFlow), Vincent Vanhoucke, Principal Scientist at Google Brain.
Got started on this one and abandoned it some time back. It is on my to-do list though.

16. Generative AI with LLMs

My learning journey is based on Lao Tzu’s dictum of ‘A good traveler has no fixed plans and is not intent on arriving’. You could have a goal and try to plan your courses accordingly.
And so my journey continues…

I hope you find this list useful.
Have a great journey ahead!!!

IBM Data Science Experience:  First steps with yorkr

Fresh, and slightly dizzy, from my foray into Quantum Computing with IBM’s Quantum Experience, I now turn my attention to IBM’s Data Science Experience (DSE).

I am on the verge of completing a really great 3-module ‘Data Science and Engineering with Spark XSeries’ from the University of California, Berkeley and I have been thinking of trying out some form of integrated delivery platform for performing analytics, for quite some time. Coincidentally, IBM came out with its Data Science Experience a month back. There are a couple of other collaborative platforms available for playing around with Apache Spark or Data Analytics, namely Jupyter notebooks, Databricks and Data.world.

I decided to go ahead with IBM’s Data Science Experience as the GUI is a lot cooler, includes shared data sets and integrates with Object Storage, Cloudant DB etc, which seemed a lot closer to the cloud, literally! IBM’s DSE is an interactive, collaborative, cloud-based environment for performing data analysis with Apache Spark. DSE is hosted on IBM’s PaaS environment, Bluemix. It should be possible to access in DSE the plethora of cloud services available on Bluemix. IBM’s DSE uses Jupyter notebooks for creating and analyzing data. These notebooks can be easily shared, and DSE has access to a few hundred publicly available datasets.

Disclaimer: This article represents the author’s viewpoint only and doesn’t necessarily represent IBM’s positions, strategies or opinions

In this post, I use IBM’s DSE and my R package yorkr for analyzing the performance of 1 ODI match (Aus-Ind, 2 Feb 2012) and the batting performance of Virat Kohli in IPL matches. These are my ‘first’ steps in DSE, so I use the plain old “R language” for analysis together with my R package ‘yorkr’. I intend to do more interesting stuff on Machine learning with SparkR, Sparklyr and PySpark in the weeks and months to come.

You can check out the Jupyter notebook created with IBM’s DSE at Github – “Using R package yorkr – A quick overview” and on NBviewer at “Using R package yorkr – A quick overview”.

Working with Jupyter notebooks, which can handle code in R, Python and Scala, is fairly straightforward. Each cell can contain either code (R, Python or Scala), Markdown text, NBConvert or Heading. The code is written into the cells and can be executed sequentially. Here is a screen shot of the notebook.

Untitled

The ‘File’ menu can be used for ‘saving and checkpointing’ or ‘reverting’ to a checkpoint. The ‘kernel’ menu can be used to start, interrupt, restart and run all cells etc. The Data Sources icon can be used to load data sources to your code. The data is uploaded to Object Storage with appropriate credentials. You will have to import this data from Object Storage using the credentials. In my notebook with yorkr I directly load the data from Github. You can use the ‘Sharing’ icon to share the notebook. The shared notebook has an extension ‘ipynb’. You can import this notebook directly into your environment and get started with the code available in the notebook.

You can import existing R, Python or Scala notebooks as shown below. My notebook ‘Using R package yorkr – A quick overview’ can be downloaded using the link ‘yorkrWithDSE’ and clicking the green download icon on top right corner.

Untitled2

I have also uploaded the file to Github and you can download it from here too: ‘yorkrWithDSE’. This notebook can be imported into your DSE as shown below

Untitled1

Jupyter notebooks have been integrated with Github and are rendered directly from Github. You can view my Jupyter notebook here – “Using R package yorkr – A quick overview”. You can also view it on NBviewer at “Using R package yorkr – A quick overview”.

So there it is. You can download my notebook, import it into IBM’s Data Science Experience and then use data from ‘yorkrData’ as shown. As already mentioned, yorkrData contains converted data for ODIs, T20 and IPL. For details on how to use my R package yorkr please see my posts on yorkr at “Index of posts”.

Hope you have fun playing with IBM’s Data Science Experience and my package yorkr.

I will be exploring IBM’s DSE in the weeks and months to come in the areas of Machine Learning with SparkR, SparklyR or pySpark.

Watch this space!!!

Also see

1. Introducing QCSimulator: A 5-qubit quantum computing simulator in R
2. Natural Language Processing: What would Shakespeare say?
3. Introducing cricket package yorkr:Part 1- Beaten by sheer pace!
4. A closer look at “Robot horse on a Trot! in Android”
5.  Re-introducing cricketr! : An R package to analyze performances of cricketers
6.   What’s up Watson? Using IBM Watson’s QAAPI with Bluemix, NodeExpress – Part 1
7.  Deblurring with OpenCV: Wiener filter reloaded

To see all my posts check
Index of posts