Big Data-4: Webserver log analysis with RDDs, Pyspark, SparkR and SparklyR

“There’s something so paradoxical about pi. On the one hand, it represents order, as embodied by the shape of a circle, long held to be a symbol of perfection and eternity. On the other hand, pi is unruly, disheveled in appearance, its digits obeying no obvious rule, or at least none that we can perceive. Pi is elusive and mysterious, forever beyond reach. Its mix of order and disorder is what makes it so bewitching. ” 

From  Infinite Powers by Steven Strogatz

Anybody who wants to be “anybody” in Big Data must necessarily be able to work on both large structured and unstructured data.  Log analysis is critical in any enterprise which is usually unstructured. As I mentioned in my previous post Big Data: On RDDs, Dataframes,Hive QL with Pyspark and SparkR-Part 3 RDDs are typically used to handle unstructured data. Spark has the Dataframe abstraction over RDDs which performs better as it is optimized with the Catalyst optimization engine. Nevertheless, it is important to be able to process with RDDs.  This post is a continuation of my 3 earlier posts on Big Data namely

1. Big Data-1: Move into the big league:Graduate from Python to Pyspark
2. Big Data-2: Move into the big league:Graduate from R to SparkR
3. Big Data: On RDDs, Dataframes,Hive QL with Pyspark and SparkR-Part 3

This post uses publicly available Webserver logs from NASA. The logs are for the months Jul 95 and Aug 95 and are a good place to start unstructured text analysis/log analysis. I highly recommend parsing these publicly available logs with regular expressions. It is only when you do that the truth of Jamie Zawinski’s pearl of wisdom

“Some people, when confronted with a problem, think “I know, I’ll use regular expressions.” Now they have two problems.” – Jamie Zawinksi

hits home. I spent many hours struggling with regex!!

For this post for the RDD part,  I had to refer to Dr. Fisseha Berhane’s blog post Webserver Log Analysis and for the Pyspark part, to the Univ. of California Specialization which I had done 3 years back Big Data Analysis with Apache Spark. Once I had played around with the regex for RDDs and PySpark I managed to get SparkR and SparklyR versions to work.

The notebooks used in this post have been published and are available at

  1. logsAnalysiswithRDDs
  2. logsAnalysiswithPyspark
  3. logsAnalysiswithSparkRandSparklyR

You can also download all the notebooks from Github at WebServerLogsAnalysis

An essential and unavoidable aspect of Big Data processing is the need to process unstructured text.Web server logs are one such area which requires Big Data techniques to process massive amounts of logs. The Common Log Format also known as the NCSA Common log format, is a standardized text file format used by web servers when generating server log files. Because the format is standardized, the files can be readily analyzed.

A publicly available webserver logs is the NASA-HTTP Web server logs. This is good dataset with which we can play around to get familiar to handling web server logs. The logs can be accessed at NASA-HTTP

Description These two traces contain two month’s worth of all HTTP requests to the NASA Kennedy Space Center WWW server in Florida.

Format The logs are an ASCII file with one line per request, with the following columns:

-host making the request. A hostname when possible, otherwise the Internet address if the name could not be looked up.

-timestamp in the format “DAY MON DD HH:MM:SS YYYY”, where DAY is the day of the week, MON is the name of the month, DD is the day of the month, HH:MM:SS is the time of day using a 24-hour clock, and YYYY is the year. The timezone is -0400.

-request given in quotes.

-HTTP reply code.

-bytes in the reply.

1 Parse Web server logs with RDDs

1.1 Read NASA Web server logs

Read the logs files from NASA for the months Jul 95 and Aug 95

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Spark-Logs-Handling").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

sqlcontext = SQLContext(sc)
rdd = sc.textFile("/FileStore/tables/NASA_access_log_*.gz")
rdd.count()
Out[1]: 3461613

1.2Check content

Check the logs to identify the parsing rules required for the logs

i=0
for line in rdd.sample(withReplacement = False, fraction = 0.00001, seed = 100).collect():
    i=i+1
    print(line)
    if i >5:
      break
ix-stp-fl2-19.ix.netcom.com – – [03/Aug/1995:23:03:09 -0400] “GET /images/faq.gif HTTP/1.0” 200 263
slip183-1.kw.jp.ibm.net – – [04/Aug/1995:18:42:17 -0400] “GET /shuttle/missions/sts-70/images/DSC-95EC-0001.gif HTTP/1.0” 200 107133
piweba4y.prodigy.com – – [05/Aug/1995:19:17:41 -0400] “GET /icons/menu.xbm HTTP/1.0” 200 527
ruperts.bt-sys.bt.co.uk – – [07/Aug/1995:04:44:10 -0400] “GET /shuttle/countdown/video/livevideo2.gif HTTP/1.0” 200 69067
dal06-04.ppp.iadfw.net – – [07/Aug/1995:21:10:19 -0400] “GET /images/NASA-logosmall.gif HTTP/1.0” 200 786
p15.ppp-1.directnet.com – – [10/Aug/1995:01:22:54 -0400] “GET /images/KSC-logosmall.gif HTTP/1.0” 200 1204

1.3 Write the parsing rule for each of the fields

  • host
  • timestamp
  • path
  • status
  • content_bytes

1.21 Get IP address/host name

This regex is at the start of the log and includes any non-white characted

import re
rslt=(rdd.map(lambda line: re.search('\S+',line)
   .group(0))
   .take(3)) # Get the IP address \host name
rslt
Out[3]: [‘in24.inetnebr.com’, ‘uplherc.upl.com’, ‘uplherc.upl.com’]

1.22 Get timestamp

Get the time stamp

rslt=(rdd.map(lambda line: re.search(‘(\S+ -\d{4})’,line)
    .groups())
    .take(3))  #Get the  date
rslt
[(‘[01/Aug/1995:00:00:01 -0400’,),
(‘[01/Aug/1995:00:00:07 -0400’,),
(‘[01/Aug/1995:00:00:08 -0400’,)]

1.23 HTTP request

Get the HTTP request sent to Web server \w+ {GET}

# Get the REST call with ” “
rslt=(rdd.map(lambda line: re.search('"\w+\s+([^\s]+)\s+HTTP.*"',line)
    .groups())
    .take(3)) # Get the REST call
rslt
[(‘/shuttle/missions/sts-68/news/sts-68-mcc-05.txt’,),
(‘/’,),
(‘/images/ksclogo-medium.gif’,)]

1.23Get HTTP response status

Get the HTTP response to the request

rslt=(rdd.map(lambda line: re.search('"\s(\d{3})',line)
    .groups())
    .take(3)) #Get the status
rslt
Out[6]: [(‘200’,), (‘304’,), (‘304’,)]

1.24 Get content size

Get the HTTP response in bytes

rslt=(rdd.map(lambda line: re.search(‘^.*\s(\d*)$’,line)
    .groups())
    .take(3)) # Get the content size
rslt
Out[7]: [(‘1839’,), (‘0’,), (‘0’,)]

1.24 Putting it all together

Now put all the individual pieces together into 1 big regular expression and assign to the groups

  1. Host 2. Timestamp 3. Path 4. Status 5. Content_size
rslt=(rdd.map(lambda line: re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s(\d*)$)',line)
    .groups())
    .take(3))
rslt
[(‘in24.inetnebr.com’,
‘ -‘,
‘ ‘,
‘-‘,
‘[01/Aug/1995:00:00:01 -0400]’,
‘”GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0″‘,
‘/shuttle/missions/sts-68/news/sts-68-mcc-05.txt’,
‘200 1839’,
‘1839’),
(‘uplherc.upl.com’,
‘ -‘,
‘ ‘,
‘-‘,
‘[01/Aug/1995:00:00:07 -0400]’,
‘”GET / HTTP/1.0″‘,
‘/’,
‘304 0’,
‘0’),
(‘uplherc.upl.com’,
‘ -‘,
‘ ‘,
‘-‘,
‘[01/Aug/1995:00:00:08 -0400]’,
‘”GET /images/ksclogo-medium.gif HTTP/1.0″‘,
‘/images/ksclogo-medium.gif’,
‘304 0’,
‘0’)]

1.25 Add a log parsing function

import re
def parse_log1(line):
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s(\d*)$)',line)
    if match is None:    
        return(line,0)
    else:
        return(line,1)

1.26 Check for parsing failure

Check how many lines successfully parsed with the parsing function

n_logs = rdd.count()
failed = rdd.map(lambda line: parse_log1(line)).filter(lambda line: line[1] == 0).count()
print('Out of a total of {} logs, {} failed to parse'.format(n_logs,failed))
# Get the failed records line[1] == 0
failed1=rdd.map(lambda line: parse_log1(line)).filter(lambda line: line[1]==0)
failed1.take(3)
Out of a total of 3461613 logs, 38768 failed to parse
Out[10]:
[(‘gw1.att.com – – [01/Aug/1995:00:03:53 -0400] “GET /shuttle/missions/sts-73/news HTTP/1.0” 302 -‘,
0),
(‘js002.cc.utsunomiya-u.ac.jp – – [01/Aug/1995:00:07:33 -0400] “GET /shuttle/resources/orbiters/discovery.gif HTTP/1.0” 404 -‘,
0),
(‘pipe1.nyc.pipeline.com – – [01/Aug/1995:00:12:37 -0400] “GET /history/apollo/apollo-13/apollo-13-patch-small.gif” 200 12859’,
0)]

1.26 The above rule is not enough to parse the logs

It can be seen that the single rule only parses part of the logs and we cannot group the regex separately. There is an error “AttributeError: ‘NoneType’ object has no attribute ‘group'” which shows up

#rdd.map(lambda line: re.search(‘^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s(“\w+\s+([^\s]+)\s+HTTP.*”)\s(\d{3}\s(\d*)$)’,line[0]).group()).take(4)

File “/databricks/spark/python/pyspark/util.py”, line 99, in wrapper
return f(*args, **kwargs)
File “<command-1348022240961444>”, line 1, in <lambda>
AttributeError: ‘NoneType’ object has no attribute ‘group’

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)

1.27 Add rule for parsing failed records

One of the issues with the earlier rule is the content_size has “-” for some logs

import re
def parse_failed(line):
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s-$)',line)
    if match is None:        
        return(line,0)
    else:
        return(line,1)

1.28 Parse records which fail

Parse the records that fails with the new rule

failed2=rdd.map(lambda line: parse_failed(line)).filter(lambda line: line[1]==1)
failed2.take(5)
Out[13]:
[(‘gw1.att.com – – [01/Aug/1995:00:03:53 -0400] “GET /shuttle/missions/sts-73/news HTTP/1.0” 302 -‘,
1),
(‘js002.cc.utsunomiya-u.ac.jp – – [01/Aug/1995:00:07:33 -0400] “GET /shuttle/resources/orbiters/discovery.gif HTTP/1.0” 404 -‘,
1),
(‘tia1.eskimo.com – – [01/Aug/1995:00:28:41 -0400] “GET /pub/winvn/release.txt HTTP/1.0” 404 -‘,
1),
(‘itws.info.eng.niigata-u.ac.jp – – [01/Aug/1995:00:38:01 -0400] “GET /ksc.html/facts/about_ksc.html HTTP/1.0” 403 -‘,
1),
(‘grimnet23.idirect.com – – [01/Aug/1995:00:50:12 -0400] “GET /www/software/winvn/winvn.html HTTP/1.0” 404 -‘,
1)]

1.28 Add both rules

Add both rules for parsing the log.

Note it can be shown that even with both rules all the logs are not parse.Further rules may need to be added

import re
def parse_log2(line):
    # Parse logs with the rule below
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(\d*)$',line)
    # If match failed then use the rule below
    if match is None:
        match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3}\s-$)',line)
    if match is None:
        return (line, 0) # Return 0 for failure
    else:
        return (line, 1) # Return 1 for success

1.29 Group the different regex to groups for handling

def map2groups(line):
    match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(\d*)$',line)
    if match is None:
        match = re.search('^(\S+)((\s)(-))+\s(\[\S+ -\d{4}\])\s("\w+\s+([^\s]+)\s+HTTP.*")\s(\d{3})\s(-)$',line)    
    return(match.groups())

1.30 Parse the logs and map the groups

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])

parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))

2. Parse Web server logs with Pyspark

2.1Read data into a Pyspark dataframe

import os
logs_file_path="/FileStore/tables/" + os.path.join('NASA_access_log_*.gz')
from pyspark.sql.functions import split, regexp_extract
base_df = sqlContext.read.text(logs_file_path)
#base_df.show(truncate=False)
from pyspark.sql.functions import split, regexp_extract
split_df = base_df.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
                          regexp_extract('value', r'^.*\[(\d\d\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
                          regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
                          regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
                          regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_df.show(5,truncate=False)
+———————+————————–+———————————————–+——+————+
|host |timestamp |path |status|content_size|
+———————+————————–+———————————————–+——+————+
|199.72.81.55 |01/Jul/1995:00:00:01 -0400|/history/apollo/ |200 |6245 |
|unicomp6.unicomp.net |01/Jul/1995:00:00:06 -0400|/shuttle/countdown/ |200 |3985 |
|199.120.110.21 |01/Jul/1995:00:00:09 -0400|/shuttle/missions/sts-73/mission-sts-73.html |200 |4085 |
|burger.letters.com |01/Jul/1995:00:00:11 -0400|/shuttle/countdown/liftoff.html |304 |0 |
|199.120.110.21 |01/Jul/1995:00:00:11 -0400|/shuttle/missions/sts-73/sts-73-patch-small.gif|200 |4179 |
+———————+————————–+———————————————–+——+————+
only showing top 5 rows

2.2 Check data

bad_rows_df = split_df.filter(split_df[‘host’].isNull() |
                              split_df['timestamp'].isNull() |
                              split_df['path'].isNull() |
                              split_df['status'].isNull() |
                             split_df['content_size'].isNull())
bad_rows_df.count()
Out[20]: 33905

2.3Check no of rows which do not have digits

We have already seen that the content_type field has ‘-‘ instead of digits in RDDs

#bad_content_size_df = base_df.filter(~ base_df[‘value’].rlike(r’\d+$’))
bad_content_size_df.count()
Out[21]: 33905

2.4 Add ‘*’ to identify bad rows

To identify the rows that are bad, concatenate ‘*’ to the content_size field where the field does not have digits. It can be seen that the content_size has ‘-‘ instead of a valid number

from pyspark.sql.functions import lit, concat
bad_content_size_df.select(concat(bad_content_size_df['value'], lit('*'))).show(4,truncate=False)
+—————————————————————————————————————————————————+
|concat(value, *) |
+—————————————————————————————————————————————————+
|dd15-062.compuserve.com – – [01/Jul/1995:00:01:12 -0400] “GET /news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt HTTP/1.0” 404 -*|
|dynip42.efn.org – – [01/Jul/1995:00:02:14 -0400] “GET /software HTTP/1.0” 302 -* |
|ix-or10-06.ix.netcom.com – – [01/Jul/1995:00:02:40 -0400] “GET /software/winvn HTTP/1.0” 302 -* |
|ix-or10-06.ix.netcom.com – – [01/Jul/1995:00:03:24 -0400] “GET /software HTTP/1.0” 302 -* |
+—————————————————————————————————————————————————+

2.5 Fill NAs with 0s

# Replace all null content_size values with 0.

cleaned_df = split_df.na.fill({‘content_size’: 0})

3. Webserver  logs parsing with SparkR

library(SparkR)
library(stringr)
file_location = "/FileStore/tables/NASA_access_log_Jul95.gz"
file_location = "/FileStore/tables/NASA_access_log_Aug95.gz"
# Load the SparkR library


# Initiate a SparkR session
sparkR.session()
sc <- sparkR.session()
sqlContext <- sparkRSQL.init(sc)
df <- read.text(sqlContext,"/FileStore/tables/NASA_access_log_Jul95.gz")

#df=SparkR::select(df, "value")
#head(SparkR::collect(df))
#m=regexp_extract(df$value,'\\\\S+',1)

a=df %>% 
  withColumn('host', regexp_extract(df$value, '^(\\S+)', 1)) %>%
  withColumn('timestamp', regexp_extract(df$value, "((\\S+ -\\d{4}))", 2)) %>%
  withColumn('path', regexp_extract(df$value, '(\\"\\w+\\s+([^\\s]+)\\s+HTTP.*")', 2))  %>%
  withColumn('status', regexp_extract(df$value, '(^.*"\\s+([^\\s]+))', 2)) %>%
  withColumn('content_size', regexp_extract(df$value, '(^.*\\s+(\\d+)$)', 2))
#b=a%>% select(host,timestamp,path,status,content_type)
head(SparkR::collect(a),10)

1 199.72.81.55 – – [01/Jul/1995:00:00:01 -0400] “GET /history/apollo/ HTTP/1.0” 200 6245
2 unicomp6.unicomp.net – – [01/Jul/1995:00:00:06 -0400] “GET /shuttle/countdown/ HTTP/1.0” 200 3985
3 199.120.110.21 – – [01/Jul/1995:00:00:09 -0400] “GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0” 200 4085
4 burger.letters.com – – [01/Jul/1995:00:00:11 -0400] “GET /shuttle/countdown/liftoff.html HTTP/1.0” 304 0
5 199.120.110.21 – – [01/Jul/1995:00:00:11 -0400] “GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0” 200 4179
6 burger.letters.com – – [01/Jul/1995:00:00:12 -0400] “GET /images/NASA-logosmall.gif HTTP/1.0” 304 0
7 burger.letters.com – – [01/Jul/1995:00:00:12 -0400] “GET /shuttle/countdown/video/livevideo.gif HTTP/1.0” 200 0
8 205.212.115.106 – – [01/Jul/1995:00:00:12 -0400] “GET /shuttle/countdown/countdown.html HTTP/1.0” 200 3985
9 d104.aa.net – – [01/Jul/1995:00:00:13 -0400] “GET /shuttle/countdown/ HTTP/1.0” 200 3985
10 129.94.144.152 – – [01/Jul/1995:00:00:13 -0400] “GET / HTTP/1.0” 200 7074
host timestamp
1 199.72.81.55 [01/Jul/1995:00:00:01 -0400
2 unicomp6.unicomp.net [01/Jul/1995:00:00:06 -0400
3 199.120.110.21 [01/Jul/1995:00:00:09 -0400
4 burger.letters.com [01/Jul/1995:00:00:11 -0400
5 199.120.110.21 [01/Jul/1995:00:00:11 -0400
6 burger.letters.com [01/Jul/1995:00:00:12 -0400
7 burger.letters.com [01/Jul/1995:00:00:12 -0400
8 205.212.115.106 [01/Jul/1995:00:00:12 -0400
9 d104.aa.net [01/Jul/1995:00:00:13 -0400
10 129.94.144.152 [01/Jul/1995:00:00:13 -0400
path status content_size
1 /history/apollo/ 200 6245
2 /shuttle/countdown/ 200 3985
3 /shuttle/missions/sts-73/mission-sts-73.html 200 4085
4 /shuttle/countdown/liftoff.html 304 0
5 /shuttle/missions/sts-73/sts-73-patch-small.gif 200 4179
6 /images/NASA-logosmall.gif 304 0
7 /shuttle/countdown/video/livevideo.gif 200 0
8 /shuttle/countdown/countdown.html 200 3985
9 /shuttle/countdown/ 200 3985
10 / 200 7074

4 Webserver logs parsing with SparklyR

install.packages("sparklyr")
library(sparklyr)
library(dplyr)
library(stringr)
#sc <- spark_connect(master = "local", version = "2.1.0")
sc <- spark_connect(method = "databricks")
sdf <-spark_read_text(sc, name="df", path = "/FileStore/tables/NASA_access_log*.gz")
sdf
Installing package into ‘/databricks/spark/R/lib’
# Source: spark [?? x 1]
   line                                                                         
                                                                           
 1 "199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] \"GET /history/apollo/ HTTP/1…
 2 "unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] \"GET /shuttle/countd…
 3 "199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] \"GET /shuttle/missions/sts…
 4 "burger.letters.com - - [01/Jul/1995:00:00:11 -0400] \"GET /shuttle/countdow…
 5 "199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] \"GET /shuttle/missions/sts…
 6 "burger.letters.com - - [01/Jul/1995:00:00:12 -0400] \"GET /images/NASA-logo…
 7 "burger.letters.com - - [01/Jul/1995:00:00:12 -0400] \"GET /shuttle/countdow…
 8 "205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] \"GET /shuttle/countdown/c…
 9 "d104.aa.net - - [01/Jul/1995:00:00:13 -0400] \"GET /shuttle/countdown/ HTTP…
10 "129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] \"GET / HTTP/1.0\" 200 7074"
# … with more rows
#install.packages(“sparklyr”)
library(sparklyr)
library(dplyr)
library(stringr)
#sc <- spark_connect(master = "local", version = "2.1.0")
sc <- spark_connect(method = "databricks")
sdf <-spark_read_text(sc, name="df", path = "/FileStore/tables/NASA_access_log*.gz")
sdf <- sdf %>% mutate(host = regexp_extract(line, '^(\\\\S+)',1)) %>% 
               mutate(timestamp = regexp_extract(line, '((\\\\S+ -\\\\d{4}))',2)) %>%
               mutate(path = regexp_extract(line, '(\\\\"\\\\w+\\\\s+([^\\\\s]+)\\\\s+HTTP.*")',2)) %>%
               mutate(status = regexp_extract(line, '(^.*"\\\\s+([^\\\\s]+))',2)) %>%
               mutate(content_size = regexp_extract(line, '(^.*\\\\s+(\\\\d+)$)',2))

5 Hosts

5.1  RDD

5.11 Parse and map to hosts to groups

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))

# Create tuples of (host,1) and apply reduceByKey() and order by descending
rslt=(parsed_rdd2.map(lambda x😦x[0],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
Out[18]:
[(‘piweba3y.prodigy.com’, 21988),
(‘piweba4y.prodigy.com’, 16437),
(‘piweba1y.prodigy.com’, 12825),
(‘edams.ksc.nasa.gov’, 11962),
(‘163.206.89.4’, 9697),
(‘news.ti.com’, 8161),
(‘www-d1.proxy.aol.com’, 8047),
(‘alyssa.prodigy.com’, 8037),
(‘siltb10.orl.mmc.com’, 7573),
(‘www-a2.proxy.aol.com’, 7516)]

5.12Plot counts of hosts

import seaborn as sns

import pandas as pd import matplotlib.pyplot as plt df=pd.DataFrame(rslt,columns=[‘host’,‘count’]) sns.barplot(x=‘host’,y=‘count’,data=df) plt.subplots_adjust(bottom=0.6, right=0.8, top=0.9) plt.xticks(rotation=“vertical”,fontsize=8) display()

5.2 PySpark

5.21 Compute counts of hosts

df= (cleaned_df
     .groupBy('host')
     .count()
     .orderBy('count',ascending=False))
df.show(5)
+——————–+—–+
| host|count|
+——————–+—–+
|piweba3y.prodigy….|21988|
|piweba4y.prodigy….|16437|
|piweba1y.prodigy….|12825|
| edams.ksc.nasa.gov |11964|
| 163.206.89.4 | 9697|
+——————–+—–+
only showing top 5 rows

5.22 Plot count of hosts

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
df1=df.toPandas()
df2 = df1.head(10)
df2.count()
sns.barplot(x='host',y='count',data=df2)
plt.subplots_adjust(bottom=0.5, right=0.8, top=0.9)
plt.xlabel("Hosts")
plt.ylabel('Count')
plt.xticks(rotation="vertical",fontsize=10)
display()

5.3 SparkR

5.31 Compute count of hosts

c <- SparkR::select(a,a$host)
df=SparkR::summarize(SparkR::groupBy(c, a$host), noHosts = count(a$host))
df1 =head(arrange(df,desc(df$noHosts)),10)
head(df1)
                  host noHosts
1 piweba3y.prodigy.com   17572
2 piweba4y.prodigy.com   11591
3 piweba1y.prodigy.com    9868
4   alyssa.prodigy.com    7852
5  siltb10.orl.mmc.com    7573
6 piweba2y.prodigy.com    5922

5.32 Plot count of hosts

library(ggplot2)
p <-ggplot(data=df1, aes(x=host, y=noHosts,fill=host)) +   geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Host') + ylab('Count')
p

5.4 SparklyR

5.41 Compute count of Hosts

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(host) %>% group_by(host) %>% summarise(noHosts=n()) %>% arrange(desc(noHosts))
df2 <-head(df1,10)

5.42 Plot count of hosts

library(ggplot2)

p <-ggplot(data=df2, aes(x=host, y=noHosts,fill=host)) + geom_bar(stat=identity”)+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab(Host’) + ylab(Count’)

p

6 Paths

6.1 RDD

6.11 Parse and map to hosts to groups

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
rslt=(parsed_rdd2.map(lambda x😦x[5],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
[(‘”GET /images/NASA-logosmall.gif HTTP/1.0″‘, 207520),
(‘”GET /images/KSC-logosmall.gif HTTP/1.0″‘, 164487),
(‘”GET /images/MOSAIC-logosmall.gif HTTP/1.0″‘, 126933),
(‘”GET /images/USA-logosmall.gif HTTP/1.0″‘, 126108),
(‘”GET /images/WORLD-logosmall.gif HTTP/1.0″‘, 124972),
(‘”GET /images/ksclogo-medium.gif HTTP/1.0″‘, 120704),
(‘”GET /ksc.html HTTP/1.0″‘, 83209),
(‘”GET /images/launch-logo.gif HTTP/1.0″‘, 75839),
(‘”GET /history/apollo/images/apollo-logo1.gif HTTP/1.0″‘, 68759),
(‘”GET /shuttle/countdown/ HTTP/1.0″‘, 64467)]

6.12 Plot counts of HTTP Requests

import seaborn as sns

df=pd.DataFrame(rslt,columns=[‘path’,‘count’]) sns.barplot(x=‘path’,y=‘count’,data=df) plt.subplots_adjust(bottom=0.7, right=0.8, top=0.9) plt.xticks(rotation=“vertical”,fontsize=8)

display()

6.2 Pyspark

6.21 Compute count of HTTP Requests

df= (cleaned_df
     .groupBy('path')
     .count()
     .orderBy('count',ascending=False))
df.show(5)
Out[20]:
+——————–+——+
| path| count|
+——————–+——+
|/images/NASA-logo…|208362|
|/images/KSC-logos…|164813|
|/images/MOSAIC-lo…|127656|
|/images/USA-logos…|126820|
|/images/WORLD-log…|125676|
+——————–+——+
only showing top 5 rows

6.22 Plot count of HTTP Requests

import matplotlib.pyplot as plt

import pandas as pd import seaborn as sns df1=df.toPandas() df2 = df1.head(10) df2.count() sns.barplot(x=‘path’,y=‘count’,data=df2)

plt.subplots_adjust(bottom=0.7, right=0.8, top=0.9) plt.xlabel(“HTTP Requests”) plt.ylabel(‘Count’) plt.xticks(rotation=90,fontsize=8)

display()

 

6.3 SparkR

6.31Compute count of HTTP requests

library(SparkR)
c <- SparkR::select(a,a$path)
df=SparkR::summarize(SparkR::groupBy(c, a$path), numRequest = count(a$path))
df1=head(df)

3.14 Plot count of HTTP Requests

library(ggplot2)
p <-ggplot(data=df1, aes(x=path, y=numRequest,fill=path)) +   geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1))+ xlab('Path') + ylab('Count')
p

6.4 SparklyR

6.41 Compute count of paths

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(path) %>% group_by(path) %>% summarise(noPaths=n()) %>% arrange(desc(noPaths))
df2 <-head(df1,10)
df2
# Source: spark [?? x 2]
# Ordered by: desc(noPaths)
   path                                    noPaths
                                        
 1 /images/NASA-logosmall.gif               208362
 2 /images/KSC-logosmall.gif                164813
 3 /images/MOSAIC-logosmall.gif             127656
 4 /images/USA-logosmall.gif                126820
 5 /images/WORLD-logosmall.gif              125676
 6 /images/ksclogo-medium.gif               121286
 7 /ksc.html                                 83685
 8 /images/launch-logo.gif                   75960
 9 /history/apollo/images/apollo-logo1.gif   68858
10 /shuttle/countdown/                       64695

6.42 Plot count of Paths

library(ggplot2)
p <-ggplot(data=df2, aes(x=path, y=noPaths,fill=path)) +   geom_bar(stat="identity")+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Path') + ylab('Count')
p

7.1 RDD

7.11 Compute count of HTTP Status

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])

parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
rslt=(parsed_rdd2.map(lambda x😦x[7],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
Out[22]:
[(‘200’, 3095682),
(‘304’, 266764),
(‘302’, 72970),
(‘404’, 20625),
(‘403’, 225),
(‘500’, 65),
(‘501’, 41)]

1.37 Plot counts of HTTP response status’

import seaborn as sns

df=pd.DataFrame(rslt,columns=[‘status’,‘count’]) sns.barplot(x=‘status’,y=‘count’,data=df) plt.subplots_adjust(bottom=0.4, right=0.8, top=0.9) plt.xticks(rotation=“vertical”,fontsize=8)

display()

7.2 Pyspark

7.21 Compute count of HTTP status

status_count=(cleaned_df
                .groupBy('status')
                .count()
                .orderBy('count',ascending=False))
status_count.show()
+——+——-+
|status| count|
+——+——-+
| 200|3100522|
| 304| 266773|
| 302| 73070|
| 404| 20901|
| 403| 225|
| 500| 65|
| 501| 41|
| 400| 15|
| null| 1|

7.22 Plot count of HTTP status

Plot the HTTP return status vs the counts

df1=status_count.toPandas()

df2 = df1.head(10) df2.count() sns.barplot(x=‘status’,y=‘count’,data=df2) plt.subplots_adjust(bottom=0.5, right=0.8, top=0.9) plt.xlabel(“HTTP Status”) plt.ylabel(‘Count’) plt.xticks(rotation=“vertical”,fontsize=10) display()

7.3 SparkR

7.31 Compute count of HTTP Response status

library(SparkR)
c <- SparkR::select(a,a$status)
df=SparkR::summarize(SparkR::groupBy(c, a$status), numStatus = count(a$status))
df1=head(df)

3.16 Plot count of HTTP Response status

library(ggplot2)
p <-ggplot(data=df1, aes(x=status, y=numStatus,fill=status)) +   geom_bar(stat="identity") + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Status') + ylab('Count')
p

7.4 SparklyR

7.41 Compute count of status

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(status) %>% group_by(status) %>% summarise(noStatus=n()) %>% arrange(desc(noStatus))
df2 <-head(df1,10)
df2
# Source: spark [?? x 2]
# Ordered by: desc(noStatus)
  status noStatus
       
1 200     3100522
2 304      266773
3 302       73070
4 404       20901
5 403         225
6 500          65
7 501          41
8 400          15
9 ""            1

7.42 Plot count of status

library(ggplot2)

p <-ggplot(data=df2, aes(x=status, y=noStatus,fill=status)) + geom_bar(stat=identity”)+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab(Status’) + ylab(Count’) p

8.1 RDD

8.12 Compute count of content size

parsed_rdd = rdd.map(lambda line: parse_log2(line)).filter(lambda line: line[1] == 1).map(lambda line : line[0])
parsed_rdd2 = parsed_rdd.map(lambda line: map2groups(line))
rslt=(parsed_rdd2.map(lambda x😦x[8],1))
                 .reduceByKey(lambda a,b:a+b)
                 .takeOrdered(10, lambda x: -x[1]))
rslt
Out[24]:
[(‘0’, 280017),
(‘786’, 167281),
(‘1204’, 140505),
(‘363’, 111575),
(‘234’, 110824),
(‘669’, 110056),
(‘5866’, 107079),
(‘1713’, 66904),
(‘1173’, 63336),
(‘3635’, 55528)]

8.21 Plot content size

import seaborn as sns

df=pd.DataFrame(rslt,columns=[‘content_size’,‘count’]) sns.barplot(x=‘content_size’,y=‘count’,data=df) plt.subplots_adjust(bottom=0.4, right=0.8, top=0.9) plt.xticks(rotation=“vertical”,fontsize=8) display()

8.2 Pyspark

8.21 Compute count of content_size

size_counts=(cleaned_df
                .groupBy('content_size')
                .count()
                .orderBy('count',ascending=False))
size_counts.show(10)
+------------+------+
|content_size| count|
+------------+------+
|           0|313932|
|         786|167709|
|        1204|140668|
|         363|111835|
|         234|111086|
|         669|110313|
|        5866|107373|
|        1713| 66953|
|        1173| 63378|
|        3635| 55579|
+------------+------+
only showing top 10 rows

8.22 Plot counts of content size

Plot the path access versus the counts

df1=size_counts.toPandas()

df2 = df1.head(10) df2.count() sns.barplot(x=‘content_size’,y=‘count’,data=df2) plt.subplots_adjust(bottom=0.5, right=0.8, top=0.9) plt.xlabel(“content_size”) plt.ylabel(‘Count’) plt.xticks(rotation=“vertical”,fontsize=10) display()

8.3 SparkR

8.31 Compute count of content size

library(SparkR)
c <- SparkR::select(a,a$content_size)
df=SparkR::summarize(SparkR::groupBy(c, a$content_size), numContentSize = count(a$content_size))
df1=head(df)
df1
     content_size numContentSize
1        28426           1414
2        78382            293
3        60053              4
4        36067              2
5        13282            236
6        41785            174
8.32 Plot count of content sizes
library(ggplot2)

p <-ggplot(data=df1, aes(x=content_size, y=numContentSize,fill=content_size)) + geom_bar(stat=identity”) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab(Content Size’) + ylab(Count’)

p

8.4 SparklyR

8.41Compute count of content_size

df <- sdf %>% select(host,timestamp,path,status,content_size)
df1 <- df %>% select(content_size) %>% group_by(content_size) %>% summarise(noContentSize=n()) %>% arrange(desc(noContentSize))
df2 <-head(df1,10)
df2
# Source: spark [?? x 2]
# Ordered by: desc(noContentSize)
   content_size noContentSize
                   
 1 0                   280027
 2 786                 167709
 3 1204                140668
 4 363                 111835
 5 234                 111086
 6 669                 110313
 7 5866                107373
 8 1713                 66953
 9 1173                 63378
10 3635                 55579

8.42 Plot count of content_size

library(ggplot2)
p <-ggplot(data=df2, aes(x=content_size, y=noContentSize,fill=content_size)) +   geom_bar(stat="identity")+ theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab('Content size') + ylab('Count')
p

Conclusion: I spent many,many hours struggling with Regex and getting RDDs,Pyspark to work. Also had to spend a lot of time trying to work out the syntax for SparkR and SparklyR for parsing. After you parse the logs plotting and analysis is a piece of cake! This is definitely worth a try!

Watch this space!!

Also see
1. Practical Machine Learning with R and Python – Part 3
2. Deep Learning from first principles in Python, R and Octave – Part 5
3. My book ‘Cricket analytics with cricketr and cricpy’ is now on Amazon
4. Latency, throughput implications for the Cloud
5. Modeling a Car in Android
6. Architecting a cloud based IP Multimedia System (IMS)
7. Dabbling with Wiener filter using OpenCV

To see all posts click Index of posts

Big Data: On RDDs, Dataframes,Hive QL with Pyspark and SparkR-Part 3

Some people, when confronted with a problem, think “I know, I’ll use regular expressions.” Now they have two problems. – Jamie Zawinski

Some programmers, when confronted with a problem, think “I know, I’ll use floating point arithmetic.” Now they have 1.999999999997 problems. – @tomscott

Some people, when confronted with a problem, think “I know, I’ll use multithreading”. Nothhw tpe yawrve o oblems. – @d6

Some people, when confronted with a problem, think “I know, I’ll use versioning.” Now they have 2.1.0 problems. – @JaesCoyle

Some people, when faced with a problem, think, “I know, I’ll use binary.” Now they have 10 problems. – @nedbat

Introduction

The power of Spark, which operates on in-memory datasets, is the fact that it stores the data as collections using Resilient Distributed Datasets (RDDs), which are themselves distributed in partitions across clusters. RDDs, are a fast way of processing data, as the data is operated on parallel based on the map-reduce paradigm. RDDs can be be used when the operations are low level. RDDs, are typically used on unstructured data like logs or text. For structured and semi-structured data, Spark has a higher abstraction called Dataframes. Handling data through dataframes are extremely fast as they are Optimized using the Catalyst Optimization engine and the performance is orders of magnitude faster than RDDs. In addition Dataframes also use Tungsten which handle memory management and garbage collection more effectively.

The picture below shows the performance improvement achieved with Dataframes over RDDs

Benefits from Project Tungsten

Npte: The above data and graph is taken from the course Big Data Analysis with Apache Spark at edX, UC Berkeley
This post is a continuation of my 2 earlier posts
1. Big Data-1: Move into the big league:Graduate from Python to Pyspark
2. Big Data-2: Move into the big league:Graduate from R to SparkR

In this post I perform equivalent operations on a small dataset using RDDs, Dataframes in Pyspark & SparkR and HiveQL. As in some of my earlier posts, I have used the tendulkar.csv file for this post. The dataset is small and allows me to do most everything from data cleaning, data transformation and grouping etc.
You can clone fork the notebooks from github at Big Data:Part 3

The notebooks have also been published and can be accessed below

  1. Big Data-1: On RDDs, DataFrames and HiveQL with Pyspark
  2. Big Data-2:On RDDs, Dataframes and HiveQL with SparkR

1. RDD – Select all columns of tables

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
rdd.map(lambda line: (line.split(","))).take(5)
Out[90]: [[‘Runs’, ‘Mins’, ‘BF’, ‘4s’, ‘6s’, ‘SR’, ‘Pos’, ‘Dismissal’, ‘Inns’, ‘Opposition’, ‘Ground’, ‘Start Date’], [’15’, ’28’, ’24’, ‘2’, ‘0’, ‘62.5’, ‘6’, ‘bowled’, ‘2’, ‘v Pakistan’, ‘Karachi’, ’15-Nov-89′], [‘DNB’, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘-‘, ‘4’, ‘v Pakistan’, ‘Karachi’, ’15-Nov-89′], [’59’, ‘254’, ‘172’, ‘4’, ‘0’, ‘34.3’, ‘6’, ‘lbw’, ‘1’, ‘v Pakistan’, ‘Faisalabad’, ’23-Nov-89′], [‘8′, ’24’, ’16’, ‘1’, ‘0’, ’50’, ‘6’, ‘run out’, ‘3’, ‘v Pakistan’, ‘Faisalabad’, ’23-Nov-89′]]

1b.RDD – Select columns 1 to 4

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
rdd.map(lambda line: (line.split(",")[0:4])).take(5)
Out[91]:
[[‘Runs’, ‘Mins’, ‘BF’, ‘4s’],
[’15’, ’28’, ’24’, ‘2’],
[‘DNB’, ‘-‘, ‘-‘, ‘-‘],
[’59’, ‘254’, ‘172’, ‘4’],
[‘8′, ’24’, ’16’, ‘1’]]

1c. RDD – Select specific columns 0, 10

from pyspark import SparkContext 
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df.map(lambda x: (x[10],x[0])).take(5)
Out[92]:
[(‘Ground’, ‘Runs’),
(‘Karachi’, ’15’),
(‘Karachi’, ‘DNB’),
(‘Faisalabad’, ’59’),
(‘Faisalabad’, ‘8’)]

2. Dataframe:Pyspark – Select all columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.show(5)
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|Runs|Mins| BF| 4s| 6s| SR|Pos|Dismissal|Inns|Opposition| Ground|Start Date|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
| 15| 28| 24| 2| 0| 62.5| 6| bowled| 2|v Pakistan| Karachi| 15-Nov-89|
| DNB| -| -| -| -| -| -| -| 4|v Pakistan| Karachi| 15-Nov-89|
| 59| 254|172| 4| 0| 34.3| 6| lbw| 1|v Pakistan|Faisalabad| 23-Nov-89|
| 8| 24| 16| 1| 0| 50| 6| run out| 3|v Pakistan|Faisalabad| 23-Nov-89|
| 41| 124| 90| 5| 0|45.55| 7| bowled| 1|v Pakistan| Lahore| 1-Dec-89|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
only showing top 5 rows

2a. Dataframe:Pyspark- Select specific columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.select("Runs","BF","Mins").show(5)
+—-+—+—-+
|Runs| BF|Mins|
+—-+—+—-+
| 15| 24| 28|
| DNB| -| -|
| 59|172| 254|
| 8| 16| 24|
| 41| 90| 124|
+—-+—+—-+

3. Dataframe:SparkR – Select all columns

# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Check the dimensions of the dataframe
df=SparkR::select(tendulkar1,"*")
head(SparkR::collect(df))

  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns Opposition     Ground Start Date
1   15   28  24  2  0  62.5   6    bowled    2 v Pakistan    Karachi  15-Nov-89
2  DNB    -   -  -  -     -   -         -    4 v Pakistan    Karachi  15-Nov-89
3   59  254 172  4  0  34.3   6       lbw    1 v Pakistan Faisalabad  23-Nov-89
4    8   24  16  1  0    50   6   run out    3 v Pakistan Faisalabad  23-Nov-89
5   41  124  90  5  0 45.55   7    bowled    1 v Pakistan     Lahore   1-Dec-89
6   35   74  51  5  0 68.62   6       lbw    1 v Pakistan    Sialkot   9-Dec-89

3a. Dataframe:SparkR- Select specific columns

# Load the SparkR library
library(SparkR)
# Initiate a SparkR session
sparkR.session()
tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

# Check the dimensions of the dataframe
df=SparkR::select(tendulkar1, "Runs", "BF","Mins")
head(SparkR::collect(df))
Runs BF Mins
1 15 24 28
2 DNB – –
3 59 172 254
4 8 16 24
5 41 90 124
6 35 51 74

4. Hive QL – Select all columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  * from tendulkar1_table limit 5').show(10, truncate = False)
+—-+—+—-++—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|Runs|Mins|BF |4s |6s |SR |Pos|Dismissal|Inns|Opposition|Ground |Start Date|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|15 |28 |24 |2 |0 |62.5 |6 |bowled |2 |v Pakistan|Karachi |15-Nov-89 |
|DNB |- |- |- |- |- |- |- |4 |v Pakistan|Karachi |15-Nov-89 |
|59 |254 |172|4 |0 |34.3 |6 |lbw |1 |v Pakistan|Faisalabad|23-Nov-89 |
|8 |24 |16 |1 |0 |50 |6 |run out |3 |v Pakistan|Faisalabad|23-Nov-89 |
|41 |124 |90 |5 |0 |45.55|7 |bowled |1 |v Pakistan|Lahore |1-Dec-89 |
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+

4a. Hive QL – Select specific columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  Runs, BF,Mins from tendulkar1_table limit 5').show(10, truncate = False)
+—-+—+—-+
|Runs|BF |Mins|
+—-+—+—-+
|15 |24 |28 |
|DNB |- |- |
|59 |172|254 |
|8 |16 |24 |
|41 |90 |124 |
+—-+—+—-+

5. RDD – Filter rows on specific condition

from pyspark import SparkContext
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=(rdd.map(lambda line: line.split(",")[:])
      .filter(lambda x: x !="DNB")
      .filter(lambda x: x!= "TDNB")
      .filter(lambda x: x!="absent")
      .map(lambda x: [x[0].replace("*","")] + x[1:]))

df.take(5)

Out[97]:
[[‘Runs’,
‘Mins’,
‘BF’,
‘4s’,
‘6s’,
‘SR’,
‘Pos’,
‘Dismissal’,
‘Inns’,
‘Opposition’,
‘Ground’,
‘Start Date’],
[’15’,
’28’,
’24’,
‘2’,
‘0’,
‘62.5’,
‘6’,
‘bowled’,
‘2’,
‘v Pakistan’,
‘Karachi’,
’15-Nov-89′],
[‘DNB’,
‘-‘,
‘-‘,
‘-‘,
‘-‘,
‘-‘,
‘-‘,
‘-‘,
‘4’,
‘v Pakistan’,
‘Karachi’,
’15-Nov-89′],
[’59’,
‘254’,
‘172’,
‘4’,
‘0’,
‘34.3’,
‘6’,
‘lbw’,
‘1’,
‘v Pakistan’,
‘Faisalabad’,
’23-Nov-89′],
[‘8′,
’24’,
’16’,
‘1’,
‘0’,
’50’,
‘6’,
‘run out’,
‘3’,
‘v Pakistan’,
‘Faisalabad’,
’23-Nov-89′]]

5a. Dataframe:Pyspark – Filter rows on specific condition

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'absent')
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1.show(5)
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
|Runs|Mins| BF| 4s| 6s| SR|Pos|Dismissal|Inns|Opposition| Ground|Start Date|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
| 15| 28| 24| 2| 0| 62.5| 6| bowled| 2|v Pakistan| Karachi| 15-Nov-89|
| 59| 254|172| 4| 0| 34.3| 6| lbw| 1|v Pakistan|Faisalabad| 23-Nov-89|
| 8| 24| 16| 1| 0| 50| 6| run out| 3|v Pakistan|Faisalabad| 23-Nov-89|
| 41| 124| 90| 5| 0|45.55| 7| bowled| 1|v Pakistan| Lahore| 1-Dec-89|
| 35| 74| 51| 5| 0|68.62| 6| lbw| 1|v Pakistan| Sialkot| 9-Dec-89|
+—-+—-+—+—+—+—–+—+———+—-+———-+———-+———-+
only showing top 5 rows

5b. Dataframe:SparkR – Filter rows on specific condition

sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
df=SparkR::select(tendulkar1,"*")
head(SparkR::collect(df))

5c Hive QL – Filter rows on specific condition

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1.createOrReplaceTempView('tendulkar1_table')
spark.sql('select  Runs, BF,Mins from tendulkar1_table where Runs NOT IN  ("DNB","TDNB","absent")').show(10, truncate = False)
+—-+—+—-+
|Runs|BF |Mins|
+—-+—+—-+
|15 |24 |28 |
|59 |172|254 |
|8 |16 |24 |
|41 |90 |124 |
|35 |51 |74 |
|57 |134|193 |
|0 |1 |1 |
|24 |44 |50 |
|88 |266|324 |
|5 |13 |15 |
+—-+—+—-+
only showing top 10 rows

6. RDD – Find rows where Runs > 50

from pyspark import SparkContext
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df=rdd.map(lambda line: line.split(",")[0:4]) \
   .filter(lambda x: x[0] not in ["DNB", "TDNB", "absent"])
df1=df.map(lambda x: [x[0].replace("*","")] + x[1:4])
header=df1.first()
df2=df1.filter(lambda x: x !=header)
df3=df2.map(lambda x: [float(x[0])] +x[1:4])
df3.filter(lambda x: x[0]>=50).take(10)
Out[101]: 
[[59.0, '254', '172', '4'],
 [57.0, '193', '134', '6'],
 [88.0, '324', '266', '5'],
 [68.0, '216', '136', '8'],
 [119.0, '225', '189', '17'],
 [148.0, '298', '213', '14'],
 [114.0, '228', '161', '16'],
 [111.0, '373', '270', '19'],
 [73.0, '272', '208', '8'],
 [50.0, '158', '118', '6']]

6a. Dataframe:Pyspark – Find rows where Runs >50

from pyspark.sql import SparkSession

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.appName('Read CSV DF').getOrCreate()
tendulkar1 = spark.read.format('csv').option('header','true').load('/FileStore/tables/tendulkar.csv')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'absent')
tendulkar1 = tendulkar1.withColumn("Runs", tendulkar1["Runs"].cast(IntegerType()))
tendulkar1.filter(tendulkar1['Runs']>=50).show(10)
+—-+—-+—+—+—+—–+—+———+—-+————–+————+———-+
|Runs|Mins| BF| 4s| 6s| SR|Pos|Dismissal|Inns| Opposition| Ground|Start Date|
+—-+—-+—+—+—+—–+—+———+—-+————–+————+———-+
| 59| 254|172| 4| 0| 34.3| 6| lbw| 1| v Pakistan| Faisalabad| 23-Nov-89|
| 57| 193|134| 6| 0|42.53| 6| caught| 3| v Pakistan| Sialkot| 9-Dec-89|
| 88| 324|266| 5| 0|33.08| 6| caught| 1| v New Zealand| Napier| 9-Feb-90|
| 68| 216|136| 8| 0| 50| 6| caught| 2| v England| Manchester| 9-Aug-90|
| 114| 228|161| 16| 0| 70.8| 4| caught| 2| v Australia| Perth| 1-Feb-92|
| 111| 373|270| 19| 0|41.11| 4| caught| 2|v South Africa|Johannesburg| 26-Nov-92|
| 73| 272|208| 8| 1|35.09| 5| caught| 2|v South Africa| Cape Town| 2-Jan-93|
| 50| 158|118| 6| 0|42.37| 4| caught| 1| v England| Kolkata| 29-Jan-93|
| 165| 361|296| 24| 1|55.74| 4| caught| 1| v England| Chennai| 11-Feb-93|
| 78| 285|213| 10| 0|36.61| 4| lbw| 2| v England| Mumbai| 19-Feb-93|
+—-+—-+—+—+—+—–+—+———+—-+————–+————+———-+

6b. Dataframe:SparkR – Find rows where Runs >50

# Load the SparkR library
library(SparkR)
sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
df=SparkR::select(tendulkar1,"*")
df=SparkR::filter(tendulkar1, tendulkar1$Runs > 50)
head(SparkR::collect(df))
  Runs Mins  BF 4s 6s    SR Pos Dismissal Inns    Opposition     Ground
1   59  254 172  4  0  34.3   6       lbw    1    v Pakistan Faisalabad
2   57  193 134  6  0 42.53   6    caught    3    v Pakistan    Sialkot
3   88  324 266  5  0 33.08   6    caught    1 v New Zealand     Napier
4   68  216 136  8  0    50   6    caught    2     v England Manchester
5  119  225 189 17  0 62.96   6   not out    4     v England Manchester
6  148  298 213 14  0 69.48   6   not out    2   v Australia     Sydney
  Start Date
1  23-Nov-89
2   9-Dec-89
3   9-Feb-90
4   9-Aug-90
5   9-Aug-90
6   2-Jan-92

 

7 RDD – groupByKey() and reduceByKey()

from pyspark import SparkContext
from pyspark.mllib.stat import Statistics
rdd = sc.textFile( "/FileStore/tables/tendulkar.csv")
df=rdd.map(lambda line: (line.split(",")))
df=rdd.map(lambda line: line.split(",")[0:]) \
   .filter(lambda x: x[0] not in ["DNB", "TDNB", "absent"])
df1=df.map(lambda x: [x[0].replace("*","")] + x[1:])
header=df1.first()
df2=df1.filter(lambda x: x !=header)
df3=df2.map(lambda x: [float(x[0])] +x[1:])
df4 = df3.map(lambda x: (x[10],x[0]))
df5=df4.reduceByKey(lambda a,b: a+b,1)
df4.groupByKey().mapValues(lambda x: sum(x) / len(x)).take(10)

[(‘Georgetown’, 81.0),
(‘Lahore’, 17.0),
(‘Adelaide’, 32.6),
(‘Colombo (SSC)’, 77.55555555555556),
(‘Nagpur’, 64.66666666666667),
(‘Auckland’, 5.0),
(‘Bloemfontein’, 85.0),
(‘Centurion’, 73.5),
(‘Faisalabad’, 27.0),
(‘Bridgetown’, 26.0)]

7a Dataframe:Pyspark – Compute mean, min and max

from pyspark.sql.functions import *
tendulkar1= (sqlContext
         .read.format("com.databricks.spark.csv")
         .options(delimiter=',', header='true', inferschema='true')
         .load("/FileStore/tables/tendulkar.csv"))
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'DNB')
tendulkar1= tendulkar1.where(tendulkar1['Runs'] != 'TDNB')
tendulkar1 = tendulkar1.withColumn('Runs', regexp_replace('Runs', '[*]', ''))
tendulkar1.select('Runs').rdd.distinct().collect()

from pyspark.sql import functions as F
df=tendulkar1[['Runs','BF','Ground']].groupby(tendulkar1['Ground']).agg(F.mean(tendulkar1['Runs']),F.min(tendulkar1['Runs']),F.max(tendulkar1['Runs']))
df.show()
————-+—————–+———+———+
| Ground| avg(Runs)|min(Runs)|max(Runs)|
+————-+—————–+———+———+
| Bangalore| 54.3125| 0| 96|
| Adelaide| 32.6| 0| 61|
|Colombo (PSS)| 37.2| 14| 71|
| Christchurch| 12.0| 0| 24|
| Auckland| 5.0| 5| 5|
| Chennai| 60.625| 0| 81|
| Centurion| 73.5| 111| 36|
| Brisbane|7.666666666666667| 0| 7|
| Birmingham| 46.75| 1| 40|
| Ahmedabad| 40.125| 100| 8|
|Colombo (RPS)| 143.0| 143| 143|
| Chittagong| 57.8| 101| 36|
| Cape Town|69.85714285714286| 14| 9|
| Bridgetown| 26.0| 0| 92|
| Bulawayo| 55.0| 36| 74|
| Delhi|39.94736842105263| 0| 76|
| Chandigarh| 11.0| 11| 11|
| Bloemfontein| 85.0| 15| 155|
|Colombo (SSC)|77.55555555555556| 104| 8|
| Cuttack| 2.0| 2| 2|
+————-+—————–+———+———+
only showing top 20 rows

7b Dataframe:SparkR – Compute mean, min and max

sparkR.session()

tendulkar1 <- read.df("/FileStore/tables/tendulkar.csv", 
                header = "true", 
                delimiter = ",", 
                source = "csv", 
                inferSchema = "true", 
                na.strings = "")

print(dim(tendulkar1))
tendulkar1 <-SparkR::filter(tendulkar1,tendulkar1$Runs != "DNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "TDNB")
print(dim(tendulkar1))
tendulkar1<-SparkR::filter(tendulkar1,tendulkar1$Runs != "absent")
print(dim(tendulkar1))

# Cast the string type Runs to double
withColumn(tendulkar1, "Runs", cast(tendulkar1$Runs, "double"))
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
# Remove the "* indicating not out
tendulkar1$Runs=SparkR::regexp_replace(tendulkar1$Runs, "\\*", "")
head(SparkR::distinct(tendulkar1[,"Runs"]),20)
df=SparkR::summarize(SparkR::groupBy(tendulkar1, tendulkar1$Ground), mean = mean(tendulkar1$Runs), minRuns=min(tendulkar1$Runs),maxRuns=max(tendulkar1$Runs))
head(df,20)
          Ground       mean minRuns maxRuns
1      Bangalore  54.312500       0      96
2       Adelaide  32.600000       0      61
3  Colombo (PSS)  37.200000      14      71
4   Christchurch  12.000000       0      24
5       Auckland   5.000000       5       5
6        Chennai  60.625000       0      81
7      Centurion  73.500000     111      36
8       Brisbane   7.666667       0       7
9     Birmingham  46.750000       1      40
10     Ahmedabad  40.125000     100       8
11 Colombo (RPS) 143.000000     143     143
12    Chittagong  57.800000     101      36
13     Cape Town  69.857143      14       9
14    Bridgetown  26.000000       0      92
15      Bulawayo  55.000000      36      74
16         Delhi  39.947368       0      76
17    Chandigarh  11.000000      11      11
18  Bloemfontein  85.000000      15     155
19 Colombo (SSC)  77.555556     104       8
20       Cuttack   2.000000       2       2

Also see
1. My book ‘Practical Machine Learning in R and Python: Third edition’ on Amazon
2.My book ‘Deep Learning from first principles:Second Edition’ now on Amazon
3.The Clash of the Titans in Test and ODI cricket
4. Introducing QCSimulator: A 5-qubit quantum computing simulator in R
5.Latency, throughput implications for the Cloud
6. Simulating a Web Joint in Android
5. Pitching yorkpy … short of good length to IPL – Part 1

To see all posts click Index of Posts