Run Word-Count Example of Hadoop (Python Version)

While Hadoop/MapReduce is written in Java, it is not necessary to use Java to write the mapper and reducer. The Hadoop framework provides the "Streaming API", which lets us use any command-line executable that reads from standard input and writes to standard output as the mapper or reducer. This tutorial (Link), although a bit old, provides an excellent introductory example of using Python with Hadoop Streaming.
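In other words, a streaming mapper or reducer is just an executable that consumes raw text lines on stdin and emits tab-separated key/value lines on stdout. A minimal sketch of that contract (illustrative only, not part of the word-count example below):

#!/usr/bin/env python3
# minimal_mapper.py -- the bare streaming contract: read lines from
# stdin, emit "key<TAB>value" pairs on stdout
import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        # use the whole line as the key and 1 as the value
        print(line, 1, sep='\t')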

(1) Case Description

In this blog, I'm going to use a word-count example to show how to run a MapReduce task with Python. The data used in this example is Twitter data (JSON, one tweet per line), and the goal is to count the occurrences of a list of words. To achieve this, I will revise the Python code from the tutorial above.
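For illustration, each input line is expected to be one tweet serialized as a JSON object with at least a 'text' field. On a hypothetical line such as

{"text": "Hon tror att det regnar"}

the mapper developed below would emit the tab-delimited pairs

hon	1
det	1
unique_tweet	1

since only the tracked pronouns (plus the artificial 'unique_tweet' marker) are counted.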

(2) Adjust the Code for the Python-Hadoop Example

The tutorial above uses mapper.py and reducer.py to run the MapReduce task, but these two files are written in Python 2.7, which is out of date. Therefore, I will port them to Python 3 and adjust them to the requirements of this case.

"""mapper.py"""

import sys
import json
import re
pronouns = ["han", "hon", "den", "det", "denna", "denne", "hen", "unique_tweet"]
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
if len(line) != 0:
# use json to load the tweets
jsonData = json.loads(line)
# determine whether it's retweet or not if 'retweeted_status' not in jsonData:
tweets = jsonData['text']
# split the text into words
pattern = re.compile(r"\w+")
words = list(map(str, pattern.findall(tweets)))
# increase counters
words.append('unique_tweet')
for word in words:
word = word.lower()
if word in pronouns:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py #
# tab-delimited; the trivial word count is 1
print(word, 1)
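Two details in the mapper are worth noting. The output is tab-delimited because Hadoop Streaming treats everything before the first tab as the key and the rest as the value, and the reducer below splits on the same tab. The artificial token 'unique_tweet' is appended to every tweet so that, besides the pronoun counts, the reducer also reports how many (non-retweet) tweets were processed in total.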
"""reducer.py"""

import sys
from operator import itemgetter

current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py word,
count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print(current_word, current_count)
current_count = count
current_word = word
# don't forget to output the last word if needed!
if current_word == word:
print(current_word, current_count)
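Because the shuffle phase sorts the mapper output by key, identical words reach the reducer on consecutive lines, so one pass with a single running counter suffices. For example, the (hypothetical) sorted input

det	1
det	1
hon	1
unique_tweet	1
unique_tweet	1

produces

det	2
hon	1
unique_tweet	2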

(3) Run the Code on Hadoop

Before actually running the task on Hadoop, it is better to test mapper.py and reducer.py locally on a small data set to check that they run successfully. The command for the local test is given below.

# The format is 'cat /path-of-data-set | /path-of-mapper.py  | sort -k1,1 | /path-of-reducer.py'
cat /home/ubuntu/tweets/tweets_0.txt | /home/ubuntu/python/mapper.py | sort -k1,1 | /home/ubuntu/python/reducer.py
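Note that this pipeline (and Hadoop later on) runs the scripts directly, so both files must carry the python3 shebang line shown in the listings above and be marked executable:

chmod +x /home/ubuntu/python/mapper.py /home/ubuntu/python/reducer.py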

If the local test produces the expected result, we can run the actual Hadoop job: first put the data set into HDFS, then run the two Python files on Hadoop.

# The format is 'hadoop fs -put /path-of-data-set /folder-in-HDFS'
hadoop fs -put /home/ubuntu/test1/RC_2005-12 /test1

# Check if the data is in HDFS
hadoop fs -ls /test1

# Run the Python-Hadoop task
bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file /home/ubuntu/python/mapper.py -mapper /home/ubuntu/python/mapper.py \
-file /home/ubuntu/python/reducer.py -reducer /home/ubuntu/python/reducer.py \
-input /test1/* -output /test1-output

# Check the output file
hadoop fs -cat /test1-output/part-00000
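One caveat: the contrib/streaming/hadoop-*streaming*.jar path stems from the old Hadoop 1.x layout that the original tutorial assumes. On Hadoop 2.x/3.x installations the streaming jar usually lives under share/hadoop/tools/lib instead, so the same job would be submitted roughly like this (with $HADOOP_HOME pointing at your installation):

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-file /home/ubuntu/python/mapper.py -mapper /home/ubuntu/python/mapper.py \
-file /home/ubuntu/python/reducer.py -reducer /home/ubuntu/python/reducer.py \
-input /test1/* -output /test1-output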
