Problem and Solution Sketch
The case that we will look at is a fictitious one: a web server for flight reservations. The web server logs the requests from its users, including the origin and destination (airport code and city code), the departure date, and the return date.
The log lines look like the following:
PARCDGFRAFRA-02022011/06022011
MRSMRSMUCMUC-10022011/20112011
NCENCEMUCMUC-11012011/19012011
NCENCELONLHR-21022011/03032011
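Before we touch Hadoop, it may help to see how one of these lines decomposes. The sketch below is plain Java, not part of any job, and it reflects my reading of the layout: origin city (3 characters), origin airport (3), destination city (3), destination airport (3), then the departure and return dates in ddMMyyyy form separated by a slash. The class name LogLineSketch is only illustrative.

/** Standalone sketch (plain Java, not part of the MapReduce job) showing
 *  how one log line decomposes under the fixed-width layout assumed here. */
public class LogLineSketch {
    public static void main(String[] args) {
        String line = "PARCDGFRAFRA-02022011/06022011";

        String originCity    = line.substring(0, 3);   // PAR
        String originAirport = line.substring(3, 6);   // CDG
        String destCity      = line.substring(6, 9);   // FRA
        String destAirport   = line.substring(9, 12);  // FRA
        String departure     = line.substring(13, 21); // 02022011 (ddMMyyyy)
        String ret           = line.substring(22, 30); // 06022011 (ddMMyyyy)

        System.out.println(originCity + "/" + originAirport + " -> "
            + destCity + "/" + destAirport
            + ", departing " + departure + ", returning " + ret);
    }
}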
There are a lot of things we could do with these logs, for example computing the distribution of requests per city, per stay duration, per departure date, and so on and so forth. For this example, we will just compute the distribution of requests per origin. That is, we want the following consolidated output:
LYS 80
MRS 42
NCE 834
PAR 838
TLS 296
Hadoop distributes the files across the cluster. The files are then served as input to the mapper nodes, which map each line of the input to a (key, value) pair. Hadoop optimizes the computation by ensuring that each node maps data that is local to it.
For example, the line
PARCDGFRAFRA-02022011/06022011
may be mapped to (PAR, 1) in a MapReduce job that counts occurrences of the Paris city code, where 1 serves as the count. The same line may instead be mapped to (PAR, 4 days) when each line is mapped to a pair of <origin city, duration of the stay>, or to (PARFRA, 4 days) when each line is mapped to <origin and destination cities, duration of the stay>.
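The "4 days" in that pair comes from the gap between the departure date 02022011 and the return date 06022011. Here is a small sketch of how that value could be computed, again in plain Java and assuming ddMMyyyy dates; the class name is only illustrative:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;

/** Sketch: derive the stay duration used in the (PAR, 4 days) example.
 *  Assumes the dates in the line are encoded as ddMMyyyy. */
public class StayDurationSketch {
    public static void main(String[] args) throws ParseException {
        String line = "PARCDGFRAFRA-02022011/06022011";
        SimpleDateFormat fmt = new SimpleDateFormat("ddMMyyyy");

        long departure = fmt.parse(line.substring(13, 21)).getTime();
        long back = fmt.parse(line.substring(22, 30)).getTime();
        long days = TimeUnit.MILLISECONDS.toDays(back - departure);

        // prints: PAR, 4 days
        System.out.println(line.substring(0, 3) + ", " + days + " days");
    }
}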
The results of the mapping are then delivered to the reducers. All (key, value) pairs that share the same key are delivered to the same reducer; this delivery is called the shuffle and sort process.
For our problem here, we will map each line of the log to <origin city, 1>. We thus map the following five lines
PARCDGFRAFRA-02022011/06022011
MRSMRSMUCMUC-10022011/20112011
NCENCEMUCMUC-11012011/19012011
NCENCELONLHR-21022011/03032011
PARCDGFRAFRA-02022011/06022011
to (PAR, 1), (MRS, 1), (NCE, 1), (NCE, 1), (PAR, 1). After the shuffle and sort, (PAR, [1, 1]), (MRS, [1]), and (NCE, [1, 1]) are delivered to the reducer. The reducer then reduces these three groups to (PAR, 2), (MRS, 1), and (NCE, 2), which is exactly the solution to our problem.
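To make the shuffle and sort step concrete, here is a tiny stand-alone sketch, in plain Java with no Hadoop at all, that mimics the map, shuffle, and reduce phases on these five lines (the class name is only illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Minimal sketch (no Hadoop) that mimics map, shuffle, and reduce
 *  on the five example lines above. */
public class MapShuffleReduceSketch {
    public static void main(String[] args) {
        String[] lines = {
            "PARCDGFRAFRA-02022011/06022011",
            "MRSMRSMUCMUC-10022011/20112011",
            "NCENCEMUCMUC-11012011/19012011",
            "NCENCELONLHR-21022011/03032011",
            "PARCDGFRAFRA-02022011/06022011"
        };

        // "Shuffle and sort": group the mapped (origin, 1) pairs by key.
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String line : lines) {
            String origin = line.substring(0, 3); // map step: (origin, 1)
            if (!grouped.containsKey(origin)) {
                grouped.put(origin, new ArrayList<Integer>());
            }
            grouped.get(origin).add(1);
        }

        // "Reduce": sum the list of ones for each origin.
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + " " + sum); // MRS 1, NCE 2, PAR 2
        }
    }
}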
Installing Hadoop in Pseudo-Distributed Mode
We are now ready to install Hadoop in our environment. You might expect this to require a whole cluster of machines. I'm joking, of course: you will not need those machines, you can use your personal computer. I don't even use my desktop computer for this example, only my laptop. However, you will need Ubuntu installed on your machine.
We will download Hadoop from the Apache site: http://hadoop.apache.org/. We will use version 0.21.0. Make sure to run md5sum against the downloaded file to verify it. Then, move the file to the target directory, for example /home/arizal:
sudo mv /home/arizal/Downloads/hadoop-0.21.0.tar.gz /home/arizal
Then untar the file:
tar xzf hadoop-0.21.0.tar.gz
Check that the installation works by launching:
bin/hadoop version
You might get the following error:
Error: JAVA_HOME is not set.
To fix the error, you need to export JAVA_HOME so that it points to the directory of the JDK. For example:
export JAVA_HOME=/yourjdkdirectory/jdk1.6.0_22
When everything is OK you will have something like:
Hadoop 0.21.0
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21 -r 985326
Compiled by tomwhite on Tue Aug 17 01:02:28 EDT 2010
From source with checksum a1aeb15b4854808d152989ba76f90fac
Now, we need to configure SSH, which will be used to manage the pseudo-distributed nodes such as the name node, data node, and so on. I will not discuss what they are in this blog; you will find them on the Hadoop site, or I might come back to the subject in another blog.
To install ssh, you can do
sudo apt-get install ssh
Once installed, we configure a passwordless SSH connection as follows:
First, generate the key using ssh-keygen
ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/arizal/.ssh/id_rsa):
Your identification has been saved in /home/arizal/.ssh/id_rsa.
Your public key has been saved in /home/arizal/.ssh/id_rsa.pub.
The key fingerprint is:
...
Then, enable SSH access to localhost using the generated key as follows:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Test that the configuration works by launching:
ssh localhost
If the configuration works, you will not be asked for password.
We will also modify conf/hadoop-env.sh to add JAVA_HOME to the file:
JAVA_HOME=/yourjdkdirectory/jdk1.6.0_22
We're almost there, be patient. Just a couple of steps before coding. Promise.
We need to modify three files, conf/core-site.xml, conf/hdfs-site.xml, and conf/mapred-site.xml, as follows:
<!-- core-site.xml -->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost/</value>
</property>
</configuration>
<!-- hdfs-site.xml -->
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
<!-- mapred-site.xml -->
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:8021</value>
</property>
</configuration>
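As an optional sanity check, and purely a sketch rather than a required step, the following small program prints the values Hadoop resolves for the properties we just set. It assumes the conf directory (with the three XML files) is on the classpath when it runs; the class name ConfCheck is only illustrative.

import org.apache.hadoop.conf.Configuration;

/** Optional check: prints the configuration values Hadoop resolves.
 *  Assumes conf/ (core-site.xml, hdfs-site.xml, mapred-site.xml)
 *  is on the classpath. */
public class ConfCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // core-site.xml is loaded by default; load the other two explicitly.
        conf.addResource("hdfs-site.xml");
        conf.addResource("mapred-site.xml");

        System.out.println("fs.default.name    = " + conf.get("fs.default.name"));
        System.out.println("dfs.replication    = " + conf.get("dfs.replication"));
        System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker"));
    }
}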
Then, we format the HDFS filesystem. Formatting creates an empty filesystem:
bin/hadoop namenode -format
When it's OK, the output will look like the following:
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21 -r 985326; compiled by 'tomwhite' on Tue Aug 17 01:02:28 EDT 2010
************************************************************/
10/12/05 21:38:49 INFO namenode.FSNamesystem: defaultReplication = 1
10/12/05 21:38:49 INFO namenode.FSNamesystem: maxReplication = 512
10/12/05 21:38:49 INFO namenode.FSNamesystem: minReplication = 1
10/12/05 21:38:49 INFO namenode.FSNamesystem: maxReplicationStreams = 2
10/12/05 21:38:49 INFO namenode.FSNamesystem: shouldCheckForEnoughRacks = false
10/12/05 21:38:49 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
10/12/05 21:38:50 INFO namenode.FSNamesystem: fsOwner=arizal
10/12/05 21:38:50 INFO namenode.FSNamesystem: supergroup=supergroup
10/12/05 21:38:50 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/12/05 21:38:50 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
10/12/05 21:38:51 INFO common.Storage: Image file of size 112 saved in 0 seconds.
10/12/05 21:38:51 INFO common.Storage: Storage directory /tmp/hadoop-arizal/dfs/name has been successfully formatted.
10/12/05 21:38:51 INFO namenode.NameNode: SHUTDOWN_MSG:
Perfect. We're almost there. We start everything now. We can use
bin/start-all.sh but we will get this annoying warning message.
This script is Deprecated. Instead use start-dfs.sh and start-mapred.sh.
Don't worry, ignoring the warning seems to be OK. But if you try launching start-dfs.sh or start-mapred.sh instead, you will get an even more annoying error message:
Hadoop common not found.
The solution described there solved the problem for me.
We can check the running nodes by using the jps command:
$jps
9530 DataNode
9325 NameNode
10131 Jps
9750 SecondaryNameNode
10087 TaskTracker
9880 JobTracker
Finally, to stop all the nodes, we can use
bin/stop-all.sh
OK, that's all for the installation. As promised, we're now ready to code.
Coding Mapper and Reducer
We can code the mapper and reducer now. You can use any IDE you like; I used NetBeans 6.9.1 for this purpose.
Make sure that hadoop-common-0.21.0.jar and hadoop-mapred-0.21.0.jar are in the project library.
Then, code the mapper as follows:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * The class that maps an input of a map task
 * into a set of integers that represent the
 * existence of an origin in a line of an HDFS
 * file.
 *
 * @author arizal
 */
public class FlightOriginMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Maps the first 3 characters of a line to 1.
     * The line PARCDGFRAFRA-02022011/06022011 maps to (PAR, 1).
     * @param key the key, it is not used in the mapper.
     * @param value the value that corresponds to the line
     *        of the file.
     * @param context the execution context.
     * @throws IOException the exception thrown on IO problem.
     * @throws InterruptedException thrown when execution is
     *         interrupted.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // discard any line too short to contain an origin code
        if (line.length() > 3) {
            // pick up the origin
            String origin = line.substring(0, 3);
            // then emit the output (origin, 1)
            context.write(new Text(origin), new IntWritable(1));
        }
    }
}
The reducer receives pairs of (origin, [list of integers]). It then reduces each pair to (origin, sum of the list), which here equals the number of times the origin appears.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * The reducer that reduces a list of integers corresponding
 * to the existence of an origin in a line of an HDFS file.
 * The reducer may also be used as a combiner to compute
 * the total number of origin appearances computed by a mapper.
 *
 * @author arizal
 */
public class FlightOriginReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Executes the reduction.
     *
     * @param key the key of the mapped values.
     * @param values the values mapped by the mapper.
     * @param context the context of execution.
     * @throws IOException the exception thrown
     *         on IO operation.
     * @throws InterruptedException the exception thrown when
     *         the reduction is interrupted.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Finally, we prepare the driver class that glues the mapper and reducer:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * The driver class for the flight origin map reduce.
 * @author arizal
 */
public class Main {
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {

        if (args.length != 2) {
            System.err.println(
                "Usage: Flight <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(Main.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(FlightOriginMapper.class);
        job.setReducerClass(FlightOriginReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
    }
}
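As the reducer's Javadoc mentions, it can also act as a combiner, pre-summing the (origin, 1) pairs on the map side before the shuffle. That is not part of the driver above, but if you want to try it, the change is a single extra call in main (a sketch, assuming the same classes as above):

        job.setMapperClass(FlightOriginMapper.class);
        // Optional: reuse the reducer as a combiner so that each map task
        // pre-sums its (origin, 1) pairs before the shuffle. This is safe
        // here because summing integers is associative and commutative.
        job.setCombinerClass(FlightOriginReducer.class);
        job.setReducerClass(FlightOriginReducer.class);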
Put the three classes into a jar, for example mapreduce1.jar, and we now have the program.
To launch the program, we need to prepare the input files. We also need the nodes to be running, e.g. by using bin/start-all.sh again. In this example, we assume that there are two log files: flight-1.log and flight-2.log. The two files are first copied to HDFS using the following commands:
bin/hadoop dfs -copyFromLocal /home/arizal/flightlog/flight-1.log flightlog
bin/hadoop dfs -copyFromLocal /home/arizal/flightlog/flight-2.log flightlog
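As a side note, the same copy can be done programmatically through the FileSystem API instead of dfs -copyFromLocal. Here is a minimal sketch, assuming the same local paths as above and the hdfs://localhost filesystem configured in core-site.xml; the class name CopyLogs is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Alternative to "bin/hadoop dfs -copyFromLocal": copies the two log files
 *  into HDFS using the FileSystem API. */
public class CopyLogs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf); // hdfs://localhost, per core-site.xml

        // Relative destination paths resolve under /user/<current user>.
        fs.copyFromLocalFile(new Path("/home/arizal/flightlog/flight-1.log"),
                             new Path("flightlog/flight-1.log"));
        fs.copyFromLocalFile(new Path("/home/arizal/flightlog/flight-2.log"),
                             new Path("flightlog/flight-2.log"));

        fs.close();
    }
}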
To verify that the two files are really taken into account, use dfs -ls as follows:
bin/hadoop dfs -ls /user/arizal/flightlog
That returns:
Found 2 items
-rw-r--r-- 1 arizal supergroup 64792 2010-12-05 22:32 /user/arizal/flightlog/flight-1.log
-rw-r--r-- 1 arizal supergroup 64792 2010-12-05 22:49 /user/arizal/flightlog/flight-2.log
Now we can launch the job:
$ bin/hadoop jar /home/arizal/NetBeansProjects/mapreduce1/dist/mapreduce1.jar flightlog origin-out
The command tells Hadoop to execute the main class inside mapreduce1.jar with two arguments, flightlog and origin-out. flightlog is used by the program as the input path and origin-out as the output path. From the preparation we did before, there are two files inside the flightlog directory (flight-1.log and flight-2.log).
When the command is launched, we have:
INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
WARN mapreduce.JobSubmitter: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO input.FileInputFormat: Total input paths to process : 2
WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
INFO mapreduce.JobSubmitter: number of splits:2
INFO mapreduce.JobSubmitter: adding the following namenodes' delegation tokens:null
INFO mapreduce.Job: Running job: job_201012052231_0002
INFO mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 50% reduce 0%
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 100%
INFO mapreduce.Job: Job complete: job_201012052231_0002
INFO mapreduce.Job: Counters: 33
FileInputFormatCounters
BYTES_READ=129584
FileSystemCounters
FILE_BYTES_READ=41806
FILE_BYTES_WRITTEN=83682
HDFS_BYTES_READ=129826
HDFS_BYTES_WRITTEN=41
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
Job Counters
Data-local map tasks=2
Total time spent by all maps waiting after reserving slots (ms)=0
Total time spent by all reduces waiting after reserving slots (ms)=0
SLOTS_MILLIS_MAPS=18785
SLOTS_MILLIS_REDUCES=6104
Launched map tasks=2
Launched reduce tasks=1
Map-Reduce Framework
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=26
Map input records=4184
Map output bytes=33440
Map output records=4180
Merged Map outputs=2
Reduce input groups=5
Reduce input records=4180
Reduce output records=5
Reduce shuffle bytes=41812
Shuffled Maps =2
Spilled Records=8360
SPLIT_RAW_BYTES=242
The consolidated output can then be read from HDFS:
bin/hadoop dfs -cat /user/arizal/origin-out/part-r-00000
In my case, it returns:
LYS 160
MRS 84
NCE 1668
PAR 1676
TLS 592
OK, this ends this "Hello Hadoop" blog. I'll try to come back with a couple of other things, perhaps Avro or Thrift.