Monday, December 6, 2010

Hello Hadoop - Getting Started with Hadoop in Pseudo-Distributed Cluster Mode (Ubuntu)

In this blog, we will get started with Apache Hadoop by writing a MapReduce job to process log files. We will start with the problem to solve, design the solution, download and install Hadoop, and then execute the MapReduce solution that we designed. You can therefore think of this blog as a "Hello Hadoop" blog.


Problem and Solution Sketch


The case that we will look at is a fictitious one: a web server for flight reservations. The web server logs the requests from its users, including the origin and destination (city code and airport code), the date of departure, and the date of return.


The log lines look like the following: 
PARCDGFRAFRA-02022011/06022011 
MRSMRSMUCMUC-10022011/20112011
NCENCEMUCMUC-11012011/19012011
NCENCELONLHR-21022011/03032011

The first three characters represent the city of origin, for example PAR for Paris. The next three characters represent the airport of origin, for example CDG for Charles de Gaulle airport. The following six characters represent the destination city and airport. Starting from the 14th character (just after the dash), we have the date of departure in ddMMyyyy format, followed by a slash and the date of the return flight in the same format.
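To make the fixed offsets concrete, here is a minimal parsing sketch in plain Java (outside Hadoop; the class name and the substring offsets are simply my own reading of the format described above):

/** A small illustration of the fixed-offset log line format. */
public class LogLineSketch {
    public static void main(String[] args) {
        String line = "PARCDGFRAFRA-02022011/06022011";
        String originCity    = line.substring(0, 3);    // PAR
        String originAirport = line.substring(3, 6);    // CDG
        String destCity      = line.substring(6, 9);    // FRA
        String destAirport   = line.substring(9, 12);   // FRA
        String departureDate = line.substring(13, 21);  // 02022011 (ddMMyyyy)
        String returnDate    = line.substring(22, 30);  // 06022011 (ddMMyyyy)
        System.out.println(originCity + "/" + originAirport + " -> "
            + destCity + "/" + destAirport + ", "
            + departureDate + " to " + returnDate);
    }
}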


There are a lot of things we can do with these logs, for example compute the distribution of requests per city, the distribution of requests per stay duration, the distribution of requests per departure date, and so on. For this example, we will just compute the distribution of requests per origin city. That is, we want the following consolidated output:


LYS 80
MRS 42
NCE 834
PAR 838
TLS 296

Hadoop distributes the files across the cluster. The files then serve as input to the mapper nodes, each of which maps a line of its input to a (key, value) pair. Hadoop optimizes the computation by trying to run each map task on a node that holds the data locally.


For example, the line 


PARCDGFRAFRA-02022011/06022011 


may be mapped to (PAR, 1) in a MapReduce job that counts how often the Paris city code occurs, with 1 used as the count. The same line may also be mapped to (PAR, 4) when each line is mapped to a pair of <origin city, duration of the stay in days>. The line may also be mapped to (PARFRA, 4 days) when each line is mapped to <origin and destination city, duration of the stay>.
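For the curious, the 4-day stay used in the example above can be derived from the two dates in the line. Here is a rough plain-Java sketch of that calculation (again outside Hadoop, class name of my own choosing):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;

/** Derives the stay duration in days from a log line. */
public class StayDurationSketch {
    public static void main(String[] args) throws ParseException {
        String line = "PARCDGFRAFRA-02022011/06022011";
        SimpleDateFormat format = new SimpleDateFormat("ddMMyyyy");
        long departure = format.parse(line.substring(13, 21)).getTime();
        long returning = format.parse(line.substring(22, 30)).getTime();
        long stayInDays = TimeUnit.MILLISECONDS.toDays(returning - departure);
        // prints: PAR 4
        System.out.println(line.substring(0, 3) + " " + stayInDays);
    }
}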


The results of the mapping are then delivered to the reducers. All (key, value) pairs that share the same key are delivered to the same reducer. This delivery is called the shuffle and sort process.


For our problem here, we will map each line of the log to <origin city, 1>. For example, we map the following lines

PARCDGFRAFRA-02022011/06022011 
MRSMRSMUCMUC-10022011/20112011
NCENCEMUCMUC-11012011/19012011
NCENCELONLHR-21022011/03032011
PARCDGFRAFRA-02022011/06022011 


to (PAR, 1), (MRS, 1), (NCE, 1), (NCE, 1), (PAR, 1). The groups (PAR, [1, 1]), (MRS, [1]), and (NCE, [1, 1]) are then delivered to the reducer. The reducer reduces these three groups to (PAR, 2), (MRS, 1), and (NCE, 2), which is exactly the consolidated output we are after.
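To see what the shuffle and the reduce do conceptually, here is a plain-Java sketch (no Hadoop involved; the class and the in-memory map are my own illustration) that groups the mapped pairs by key and then sums each group:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulates the shuffle (grouping by key) and the reduce (summing) in memory. */
public class ShuffleSketch {
    public static void main(String[] args) {
        String[] origins = { "PAR", "MRS", "NCE", "NCE", "PAR" };

        // "Shuffle": group the (origin, 1) pairs by key.
        Map<String, List<Integer>> groups = new TreeMap<String, List<Integer>>();
        for (String origin : origins) {
            if (!groups.containsKey(origin)) {
                groups.put(origin, new ArrayList<Integer>());
            }
            groups.get(origin).add(1);
        }

        // "Reduce": sum the values of each group.
        for (Map.Entry<String, List<Integer>> entry : groups.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            System.out.println(entry.getKey() + " " + sum);
        }
        // prints: MRS 1, NCE 2, PAR 2
    }
}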



Installing Hadoop in Pseudo-Distributed Mode 
We are now ready to install Hadoop in our environment. You might expect that you need a serious cluster of machines for this. I'm joking, of course: you will not need any such machines, you can use your personal computer. I don't even use my desktop computer for this example, only my laptop. However, you will need Ubuntu installed on your machine.


We will download Hadoop from the Apache site: http://hadoop.apache.org/ . We will use version 0.21.0. Make sure to run md5sum against the downloaded file and compare the result with the published checksum. Then, move the file to the target directory, for example /home/arizal:



sudo mv /home/arizal/Downloads/hadoop-0.21.0.tar.gz /home/arizal

Then untar the file:

tar xzf hadoop-0.21.0.tar.gz



Check that the installation works fine by launching
bin/hadoop version
You might get the following error:


Error: JAVA_HOME is not set.


To fix the error, export JAVA_HOME so that it points to your JDK directory. Example:


export JAVA_HOME=/yourjdkdirectory/jdk1.6.0_22


When everything is OK, you will see something like:



Hadoop 0.21.0
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21 -r 985326
Compiled by tomwhite on Tue Aug 17 01:02:28 EDT 2010
From source with checksum a1aeb15b4854808d152989ba76f90fac



Now, we need to configure SSH, which is used to start and control the pseudo-distributed nodes such as the namenode, the datanodes, and so on. I will not discuss what they are in this blog; you can find them on the Hadoop site, or I might come back to the subject in another blog.

To install ssh, you can do
sudo apt-get install ssh


Once installed, we will configure a passwordless SSH connection as follows.
First, generate a key pair using ssh-keygen:


ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.


Enter file in which to save the key (/home/arizal/.ssh/id_rsa): 

Your identification has been saved in /home/arizal/.ssh/id_rsa.
Your public key has been saved in /home/arizal/.ssh/id_rsa.pub.
The key fingerprint is:
...

Then, enable SSH access to localhost using the generated key as follows:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys


Test that the configuration works by launching:
ssh localhost


If the configuration works, you will not be asked for a password.

Next, we modify conf/hadoop-env.sh to set JAVA_HOME:

export JAVA_HOME=/yourjdkdirectory/jdk1.6.0_22

We're almost there, be patient. Just a couple of steps before coding. Promise.
We need to modify three files, conf/core-site.xml, conf/hdfs-site.xml, and conf/mapred-site.xml, as follows:


<!-- core-site.xml -->
<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost/</value>
   </property>
</configuration>
<!-- hdfs-site.xml -->
<configuration>
   <property>
     <name>dfs.replication</name>
     <value>1</value>
   </property>
</configuration>
<!-- mapred-site.xml -->
<configuration>
   <property>
       <name>mapred.job.tracker</name>
       <value>localhost:8021</value>
   </property>
</configuration>


Then, we format the HDFS file system; formatting creates an empty file system:
bin/hadoop namenode -format

When it's OK, the output will look like the following:


STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21 -r 985326; compiled by 'tomwhite' on Tue Aug 17 01:02:28 EDT 2010
************************************************************/
10/12/05 21:38:49 INFO namenode.FSNamesystem: defaultReplication = 1
10/12/05 21:38:49 INFO namenode.FSNamesystem: maxReplication = 512
10/12/05 21:38:49 INFO namenode.FSNamesystem: minReplication = 1
10/12/05 21:38:49 INFO namenode.FSNamesystem: maxReplicationStreams = 2
10/12/05 21:38:49 INFO namenode.FSNamesystem: shouldCheckForEnoughRacks = false
10/12/05 21:38:49 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
10/12/05 21:38:50 INFO namenode.FSNamesystem: fsOwner=arizal
10/12/05 21:38:50 INFO namenode.FSNamesystem: supergroup=supergroup
10/12/05 21:38:50 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/12/05 21:38:50 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
10/12/05 21:38:51 INFO common.Storage: Image file of size 112 saved in 0 seconds.
10/12/05 21:38:51 INFO common.Storage: Storage directory /tmp/hadoop-arizal/dfs/name has been successfully formatted.
10/12/05 21:38:51 INFO namenode.NameNode: SHUTDOWN_MSG: 


Perfect. We're almost there. Now we start everything. We can use
bin/start-all.sh
but we will get this annoying warning message:

This script is Deprecated. Instead use start-dfs.sh and start-mapred.sh.


Don't worry, ignoring the warning seems to be fine. But if you try launching start-dfs.sh or start-mapred.sh instead, you will get an even more annoying error message:
Hadoop common not found.

The fix for the "Hadoop common not found" error is described in a JIRA entry on the subject; the solution described there solved the problem for me.

We can check the running nodes by using the jps command:
$ jps
9530 DataNode
9325 NameNode
10131 Jps
9750 SecondaryNameNode
10087 TaskTracker
9880 JobTracker

 Finally, to stop all the nodes, we can use
 bin/stop-all.sh.

OK. That's all for the installation; as promised, we're now ready to code.

Coding Mapper and Reducer 
We can code the mapper and reducer now. You can use any IDE you like; I used NetBeans 6.9.1 for this purpose.

Make sure that hadoop-common-0.21.0.jar and hadoop-mapred-0.21.0.jar are in the project library.

Then, code the mapper as follows:



import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * The class that maps an input of a map task
 * into a set of integers that represent the
 * existence of an origin in a line of an HDFS
 * file.
 *
 * @author arizal
 */
public class FlightOriginMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  /** Maps the first 3 characters of a line to 1.
   *  The line PARCDGFRAFRA-02022011/06022011 maps to (PAR,1)
   *  @param key the key, it is not used in the mapper.
   *  @param value the value that corresponds to the line
   *  of the file.
   *  @param context the execution context.
   *  @throws IOException the exception thrown on IO problem.
   *  @throws InterruptedException thrown when execution is
   *  interrupted.
   */
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    String line = value.toString();
    // discard any line too short to contain an origin code.
    if (line.length() > 3) {
        // pick up the origin
        String origin = line.substring(0, 3);
        // then emit the output pair (origin, 1)
        context.write(new Text(origin), new IntWritable(1));
    }
  }
}



The reducer receives pairs of the form (origin, [list of integers]). It then reduces each pair to (origin, sum of the list), which for our mapper output is simply the number of occurrences of that origin.



import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * The reducer that reduces a list of integers corresponding
 * to the existence of an origin in a line of an HDFS file.
 * The reducer may also be used as a combiner to compute
 * the total number of origin appearances computed by a mapper.
 *
 * @author arizal
 */
public class FlightOriginReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Executes the reduction.
     *
     * @param key the key of the mapped values
     * @param values the values mapped by the mapper.
     * @param context the context of execution.
     * @throws IOException the exception thrown
     * on IO operation.
     * @throws InterruptedException the exception thrown when
     * the reduction is interrupted.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
           sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}


Finally, we prepare the driver class that glues the mapper and reducer together:



import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * The driver class for the flight origin map reduce.
 * @author arizal
 */
public class Main {
    /**
     *
     * @param args the command line arguments
     */
    public static void main(String[] args)
     throws IOException, InterruptedException, ClassNotFoundException {

        if (args.length != 2) {
            System.err.println(
                "Usage: Flight <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(Main.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(FlightOriginMapper.class);
        job.setReducerClass(FlightOriginReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
    }
}
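As the reducer's Javadoc hints, the same class can also serve as a combiner, so that each mapper pre-aggregates its own (origin, 1) pairs before they are shuffled. This is optional, and I did not benchmark it here; if you want to try it, the only change is one extra line in the driver, next to setReducerClass:

job.setMapperClass(FlightOriginMapper.class);
// Optional: use the reducer as a combiner so each mapper sums
// its own (origin, 1) pairs locally before the shuffle.
job.setCombinerClass(FlightOriginReducer.class);
job.setReducerClass(FlightOriginReducer.class);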



Put the three classes into a jar, for example mapreduce1.jar, and we have our program.

To launch the program, we need to prepare the input files. We also need the nodes to be running, e.g. by using bin/start-all.sh again. In this example, we assume that there are two log files: flight-1.log and flight-2.log. The two files are first copied to HDFS using the following commands:

bin/hadoop dfs -copyFromLocal /home/arizal/flightlog/flight-1.log flightlog
bin/hadoop dfs -copyFromLocal /home/arizal/flightlog/flight-2.log flightlog

To verify that the two files are really there, use dfs -ls as follows:
bin/hadoop dfs -ls /user/arizal/flightlog

That returns:
Found 2 items
-rw-r--r--   1 arizal supergroup      64792 2010-12-05 22:32 /user/arizal/flightlog/flight-1.log
-rw-r--r--   1 arizal supergroup      64792 2010-12-05 22:49 /user/arizal/flightlog/flight-2.log

At this point, we're ready to launch the mapreduce job. This can be done using the following command:

$ bin/hadoop jar /home/arizal/NetBeansProjects/mapreduce1/dist/mapreduce1.jar flightlog origin-out

The command tells Hadoop to execute the main class inside mapreduce1.jar with two arguments, flightlog and origin-out. flightlog is used by the program as the input path and origin-out as the output path. From the preparation that we did before, we have two files inside the flightlog directory (flight-1.log and flight-2.log).

When the command is launched, we have:

INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
WARN mapreduce.JobSubmitter: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO input.FileInputFormat: Total input paths to process : 2
WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
INFO mapreduce.JobSubmitter: number of splits:2
INFO mapreduce.JobSubmitter: adding the following namenodes' delegation tokens:null
INFO mapreduce.Job: Running job: job_201012052231_0002
INFO mapreduce.Job:  map 0% reduce 0%
INFO mapreduce.Job:  map 50% reduce 0%
INFO mapreduce.Job:  map 100% reduce 0%
INFO mapreduce.Job:  map 100% reduce 100%
INFO mapreduce.Job: Job complete: job_201012052231_0002
INFO mapreduce.Job: Counters: 33
FileInputFormatCounters
BYTES_READ=129584
FileSystemCounters
FILE_BYTES_READ=41806
FILE_BYTES_WRITTEN=83682
HDFS_BYTES_READ=129826
HDFS_BYTES_WRITTEN=41
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
Job Counters 
Data-local map tasks=2
Total time spent by all maps waiting after reserving slots (ms)=0
Total time spent by all reduces waiting after reserving slots (ms)=0
SLOTS_MILLIS_MAPS=18785
SLOTS_MILLIS_REDUCES=6104
Launched map tasks=2
Launched reduce tasks=1
Map-Reduce Framework
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=26
Map input records=4184
Map output bytes=33440
Map output records=4180
Merged Map outputs=2
Reduce input groups=5
Reduce input records=4180
Reduce output records=5
Reduce shuffle bytes=41812
Shuffled Maps =2
Spilled Records=8360
SPLIT_RAW_BYTES=242

We can check the output in the origin-out directory using the hadoop dfs -cat command:
bin/hadoop dfs -cat /user/arizal/origin-out/part-r-00000

In my case, it returns:
LYS  160
MRS   84
NCE 1668
PAR 1676
TLS  592

OK. This ends this "Hello Hadoop" blog. I'll try to come back with a couple of other things. Perhaps Avro or Thrift.
