Thursday, June 26, 2014

How to Schedule Hadoop Jobs - Apache Oozie

I will explain how to schedule Hadoop jobs with Apache Oozie.

You have large volumes of data collected and stored in HDFS. Your analysis process may differ according to the nature of your data and the needs of your business. To explore this further, let's go through the following scenarios:

1. Running in Scheduled Time Intervals

You want to analyze your data at scheduled time intervals.

For example, you have an e-commerce site and want to put your customers into categories such as sports, books and electronics. You want to run this analysis every day, every week or every month.

As another example, you want to create a report that shows the impression/purchase counts (and other statistics) of your products. You want to generate this report every night at 01:00.

2. Running When Data is Present

You want to run an analysis when a specific data feed arrives.

As in the previous example, you have an e-commerce site and you are collecting event logs. However, your analysis also depends on another data set being available on HDFS.

3. Running Dependent Analyses

You want to run a sequence of analyses that depend on each other's output.

You want to implement a basic suggestion system. First, you will analyze the impression/purchase counts of your products (your products also have associated categories). Second, you will analyze the interest areas of your customers. Then you will merge these two outputs and match products with customers. In the end, you will offer specific products to specific customers.

4. Minimal Technical Effort/Minimal Dependency

From a technical point of view, you want to build a scalable and extensible scheduling system. You want to run your analyses with minimal effort and minimal language dependencies.

One Good Solution - Oozie

What we use in our system is Apache Oozie. We have some of the use cases stated above, and others will likely become relevant for us in the near future.


Excerpt from Oozie web page:

Oozie is a workflow scheduler system to manage Apache Hadoop jobs.

Oozie Workflow jobs are Directed Acyclical Graphs (DAGs) of actions.

Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.

Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts).

In later posts, I will explain Oozie workflow and coordinator applications and workflow actions with examples.
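
In the meantime, as a rough sketch only (the application name, dates, schema version and HDFS paths below are illustrative), a coordinator that triggers a workflow every night at 01:00 could look roughly like this:

<coordinator-app name="daily-report-coord" frequency="${coord:days(1)}"
                 start="2014-06-27T01:00Z" end="2015-06-27T01:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
  <action>
    <workflow>
      <!-- HDFS directory that contains workflow.xml, the actual DAG of actions -->
      <app-path>hdfs://localhost:10000/apps/daily-report</app-path>
    </workflow>
  </action>
</coordinator-app>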












Pig 0.11.1 Installation on Ubuntu

I will try to outline a basic Pig 0.11.1 installation.

This was tested on Ubuntu 12.04 with Java 1.7 installed. Hadoop 1.0.3 is used on the same machine as Pig.


1. Download Pig


You can download Pig from http://www.apache.org/dyn/closer.cgi/pig. For my location, the download link is:
http://apache.bilkent.edu.tr/pig/pig-0.11.1/pig-0.11.1.tar.gz

> cd /home/myuser/hadoop/
> wget http://apache.bilkent.edu.tr/pig/pig-0.11.1/pig-0.11.1.tar.gz

After downloading Pig, extract it:

> tar -xzvf pig-0.11.1.tar.gz

This will extract the files to /home/myuser/hadoop/pig-0.11.1

2. Set Environment Variables

If they are not already set, set the following environment variables.
The recommended way is to put them in a shell script under /etc/profile.d. Create env_variables.sh and write:

export JAVA_HOME=/path/to/java/home
export HADOOP_HOME=/path/to/hadoop/home
export PIG_HOME=/path/to/pig/home

To run the pig command from anywhere, we must add it to the PATH variable. Append the following to env_variables.sh. If java and hadoop are not on the PATH either, add them as well.

export PATH=$PATH:$PIG_HOME/bin 

3. Hadoop Cluster Information

If you have set the $HADOOP_HOME environment variable, Pig will find the namenode and jobtracker addresses from Hadoop's configuration files (core-site.xml and mapred-site.xml).

If it cannot find your cluster, you can add Hadoop's configuration directory to Pig's classpath:
export PIG_CLASSPATH=$HADOOP_HOME/conf/

4. Map-Reduce Mode

Pig supports local and map-reduce modes. We will try map-reduce mode.
You can run an existing Pig script with:

> pig myscript.pig

You can get your script with parameters substituted (dry-run) with:
> pig -r myscript.pig

You can enter the grunt shell and run your Pig statements there.
> pig
2014-06-26 23:40:31,977 [main] INFO  org.apache.pig.Main - Apache Pig version 0.11.1 (r1459641) compiled Mar 22 2013, 02:13:53
2014-06-26 23:40:31,978 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/myuser/pig_1403815231975.log
2014-06-26 23:40:31,992 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/myuser/.pigbootup not found
2014-06-26 23:40:32,124 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:10000
2014-06-26 23:40:32,948 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:10001

grunt> logs = LOAD '/data/logs' using PigStorage() as (id:int, log:chararray);
grunt> DUMP logs;
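
The statements entered in the grunt shell can also be saved into a script file and run in batch mode with "pig myscript.pig". A minimal myscript.pig could look like this (using the same illustrative path and schema as above):

-- myscript.pig: load the logs and dump them to the console
logs = LOAD '/data/logs' USING PigStorage() AS (id:int, log:chararray);
DUMP logs;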



Monday, June 23, 2014

Hadoop Trash - Recover Your Data

Hadoop gives you the ability to recover deleted files. When files are deleted, they are moved to the .Trash folder under the user's home directory (for example /user/myuser/.Trash) and remain there for a minimum period of time before being deleted permanently. You can recover your files by copying them from the .Trash folder to your desired path.
However, Hadoop trash only stores files that are deleted from the filesystem shell. Files that are deleted programmatically are deleted immediately, though you can still use the trash programmatically through the org.apache.hadoop.fs.Trash class.

Hadoop 1.0.3 is used on Ubuntu 12.04 machine.
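
For the programmatic case mentioned above, a minimal sketch using the Trash class could look like this (the path is illustrative and error handling is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class MoveToTrashExample {
    public static void main(String[] args) throws Exception {
        // Picks up fs.default.name and fs.trash.interval from the configuration files on the classpath
        Configuration conf = new Configuration();

        // moveToTrash() returns false if the trash feature is disabled (fs.trash.interval = 0)
        Trash trash = new Trash(conf);
        boolean moved = trash.moveToTrash(new Path("/data/logs/data.log"));
        System.out.println("Moved to trash: " + moved);
    }
}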

1. Enable Trash

By default, the trash feature is disabled.

To enable it, add the following property to core-site.xml on the NameNode machine:

<property>
  <name>fs.trash.interval</name>
  <value>60</value>
  <description>Number of minutes after which the checkpoint
  gets deleted. If zero, the trash feature is disabled.
  </description>
</property>
As the description states, deleted files will be moved to the .Trash folder and remain there for 60 minutes before being deleted permanently. A thread checks the trash and removes files that have remained there longer than this interval.

In Hadoop 1.0.3, the interval at which this thread runs is not configurable; the property does not appear in core-default.xml or in the code, so it is not available in Hadoop 1.0.3. In newer versions, however, you can configure it:


<property>
  <name>fs.trash.checkpoint.interval</name>
  <value>15</value>
  <description>Number of minutes between trash checkpoints.
  Should be smaller or equal to fs.trash.interval.
  Every time the checkpointer runs it creates a new checkpoint
  out of current and removes checkpoints created more than
  fs.trash.interval minutes ago.
  </description>
</property>


2. fs -rm/-rmr Commands

If you use the "hadoop fs -rm" or "hadoop fs -rmr" commands, the files will be moved to the trash and you can restore them from the .Trash directory.

> hadoop fs -rmr /data/logs/data.log

Moved to trash: hdfs://localhost:10000/data/logs/data.log
> hadoop fs -mv /user/myuser/.Trash/Current/data/logs/data.log /data/recovered_data.log

3. skipTrash

You can delete your files immediately by using the -skipTrash option:

> hadoop fs -rmr -skipTrash /data/logs/data.log

Deleted hdfs://localhost:10000/data/logs/data.log

4. fs -expunge

You can empty your .Trash folder with the expunge command. This deletes the old files in the .Trash folder and creates a new checkpoint:

> hadoop fs -expunge

14/06/20 20:25:20 INFO fs.Trash: Created trash checkpoint: /user/myuser/.Trash/1406202025


//TODO
In later versions, configuring the trash on the client side enables the trash feature for the user running that client. It is a TODO for me to try this out on Hadoop 1.0.3.




Hadoop Does Not Stop - Missing Pid Files

In our cluster, we have experienced that if the pid files of the Hadoop daemons go missing, the daemons will not stop. If the daemons do not stop properly and you try to kill them forcefully (kill -9), Hadoop can be left in an erroneous state. For example, if you kill the datanode daemon, blocks can go missing.

Hadoop 1.0.3 running on a pseudo-distributed single node cluster is used on Ubuntu 12.04

1. Pid Files

Hadoop stores process ids in files under the /tmp directory by default.
The files are named as follows:

  • hadoop-myuser-namenode.pid
  • hadoop-myuser-datanode.pid
  • hadoop-myuser-jobtracker.pid
  • hadoop-myuser-tasktracker.pid
  • hadoop-myuser-secondarynamenode.pid

These files store the process ids as plain text.
If these files are deleted and you try to run the Hadoop stop scripts, the scripts cannot find the Hadoop daemons and cannot stop them.

2. Create Pid Files

2.1. First, find the process ids of the running Hadoop daemons with the jps command or with ps aux | grep hadoop. If jps is not on the PATH, you can try its full path.
> jps

25564 JobTracker
24896 NameNode
25168 DataNode
25878 TaskTracker
12726 Jps
25448 SecondaryNameNode

2.2. Find out the directory where the pid files should be stored. It is /tmp by default; however, this path can be changed in $HADOOP_HOME/conf/hadoop-env.sh.

# The directory where pid files are stored. /tmp by default.
# export HADOOP_PID_DIR=/var/hadoop/pids

2.3. Go to the pid directory and create the missing files. Write the corresponding process ids and save.
> vi hadoop-myuser-namenode.pid
> vi hadoop-myuser-datanode.pid
> vi hadoop-myuser-jobtracker.pid
> vi hadoop-myuser-tasktracker.pid
> vi hadoop-myuser-secondarynamenode.pid
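
Alternatively, instead of opening each file in vi, you can write the pid files directly with echo, using the process ids from the jps output above:

> echo 24896 > hadoop-myuser-namenode.pid
> echo 25168 > hadoop-myuser-datanode.pid
> echo 25564 > hadoop-myuser-jobtracker.pid
> echo 25878 > hadoop-myuser-tasktracker.pid
> echo 25448 > hadoop-myuser-secondarynamenode.pid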


2.4. Change the ownership/permissions of these files so that the user running the Hadoop daemons can read and write them.
I personally use chown (given that the file permissions are 664):
> chown -R myuser:myuser /tmp/hadoop*.pid

2.5. Then you can stop your daemons as explained in this post:
> $HADOOP_HOME/bin/stop-all.sh

stopping jobtracker
localhost: stopping tasktracker
stopping namenode
localhost: stopping datanode
localhost: stopping secondarynamenode

You can check with jps or ps aux | grep hadoop:
> jps

14237 Jps






Hadoop - Map and Reduce Slots

We have seen how the number of map and reduce tasks is calculated here. I will continue the topic with the configuration of the available map and reduce slots in the cluster. Hadoop uses these slots to run map and reduce tasks, and the number of slots is fixed by certain properties.

Hadoop 1.0.3 is used on Ubuntu 12.04 machine.

1. Map Slots

The number of map slots can be defined in mapred-site.xml under the $HADOOP_HOME/conf directory on the tasktracker nodes.
This value is 2 by default, as defined in mapred-default.xml:

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>2</value>
  <description>The maximum number of map tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>


To change its value, open mapred-site.xml and set:

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>7</value>
</property>

You should configure all your tasktracker nodes this way. After setting these properties, you should restart your tasktracker daemons. You can see the new values on the jobtracker web interface at http://<jobtracker-server>:50030.
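
Restarting a single tasktracker can be done with the daemon script, roughly like this (run on each tasktracker node):

> $HADOOP_HOME/bin/hadoop-daemon.sh stop tasktracker
> $HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker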

2. Reduce Slots

Like map slots, the number of reduce slots can be defined in mapred-site.xml under the $HADOOP_HOME/conf directory on the tasktracker nodes.
This value is 2 by default, as defined in mapred-default.xml:

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>2</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>


To change its value, open mapred-site.xml and set:

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>7</value>
</property>

You should configure all your tasktracker nodes this way.

3. How to decide the number of map/reduce slots?

To decide the number of map slots, you should look at the number of processors on the machine.
As a general practice, you can allow 2 tasks per CPU core. If your tasktracker machine has 8 cores, you can have 16 task slots counting both map and reduce tasks.
You can identify the number of CPU cores with:
> lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                8
On-line CPU(s) list:   0-7
Thread(s) per core:    2
Core(s) per socket:    4
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 58
Stepping:              9
CPU MHz:               1600.000
BogoMIPS:              6784.81
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              8192K
NUMA node0 CPU(s):     0-7

I have 8 CPU cores, as stated by the CPU(s) line. I will configure mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum as 8 - 1 = 7, since the tasktracker and datanode daemons will also consume about one core's worth of resources.


Saturday, June 21, 2014

Hadoop - Safe Mode

I will explain some details of Hadoop safe mode and related properties/commands.

Hadoop 1.0.3 is used on Ubuntu 12.04

1. Safe Mode

When the namenode first starts, it loads its image file into memory. At this point, the namenode has the metadata of its filesystem, but it does not have the locations of the blocks. As datanodes begin to check in with the lists of blocks they hold, the namenode builds a map of blocks and their locations. Until a certain percentage of block locations has been reported, the namenode keeps waiting for datanodes to check in.

During this time, the namenode stays in safe mode. It does not replicate blocks and does not allow writes, deletions or renames. It gives a read-only view to clients: directory listings work, and file reads succeed if the blocks have already been reported by checked-in datanodes. When the configured percentage is met, the namenode waits another 30 seconds and then leaves safe mode.

Safe mode is beneficial. For example, replication is paused, which prevents unnecessary re-replication of a file when it is just a matter of time before datanodes check in with that file's block locations.

2. Configuration Parameters

dfs.replication.min 
Default value: 1
Description: The minimum number of replicas that have to be written for a write to be successful.

dfs.safemode.threshold.pct 
Default value: 0.999
Description: The proportion of blocks in the system that must meet the minimum replication level defined by dfs.replication.min before the namenode will exit safe mode. Setting this value to 0 or less forces the namenode not to start in safe mode. Setting this value to more than 1 means the namenode never exits safe mode.

dfs.safemode.extension 
Default value: 30,000
Description: The time, in milliseconds, to extend safe mode by after the minimum replication condition defined by dfs.safemode.threshold.pct has been satisfied. For small clusters (tens of nodes), it can be set to 0.
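
If you need to change these defaults, the properties go into hdfs-site.xml on the namenode. For example, a minimal sketch that disables the 30 second extension on a small test cluster:

<property>
  <name>dfs.safemode.extension</name>
  <value>0</value>
</property>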

3. Safe Mode Commands

Get safe mode status
hadoop dfsadmin -safemode get
Safe mode is ON

The namenode web interface also gives information about the safe mode status.

Leave safe mode
hadoop dfsadmin -safemode leave
Safe mode is OFF

Check the file system status
hadoop fsck /





Hadoop - Small Files Problem

I will explain the Hadoop small files problem. I will also mention some approaches for dealing with it.

Hadoop 1.0.3 is used on Ubuntu 12.04


Hadoop is designed to work with large data, and HDFS is efficient with large volumes of data. If there are lots of files that are significantly smaller than the HDFS block size (64 MB by default), both HDFS and Hadoop MapReduce will suffer.

1. Implications on HDFS

HDFS stores the metadata of every file, directory and block in memory. A file that occupies one block takes roughly 300 bytes of the namenode's memory. With 10 million such files, this amounts to about 3 GB of memory. If this number keeps accumulating over time, the namenode cannot maintain its operation.
In addition, HDFS is designed to stream large amounts of data. Accessing many small files adds overhead, since gathering the files under a directory means contacting many different datanodes.

2. Implications on MapReduce

When a job is submitted, Hadoop first calculates the input splits using the FileInputFormat class. If the files are smaller than the split size (which generally equals the HDFS block size), FileInputFormat creates one split per file, and for each split a map task is created. For example, if you have 100,000 files of 10 KB (totaling roughly 1 GB), you will have 100,000 map tasks. Each map task will process very little data, and there will be overhead from creating all those map tasks. If the same data had been stored in 64 MB files, there would only be 16 splits and 16 map tasks.

3. Some Options

Sequence Files: Small files can be written into sequence files (a rough sketch is given after this list).
Har Files: Hadoop archive files can be used to group multiple files into one. This way, the namenode stores only the Har file's information. However, as an input to MapReduce, Har files do not offer any improvement.
CombineFileInputFormat: This class can be used to combine small files into one map task. It does not offer any improvement on the HDFS side.
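
As a rough sketch of the sequence file approach (paths are illustrative, no compression, no error handling), small files can be packed into a single sequence file with the file name as the key and the raw content as the value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SmallFilesToSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path input = new Path("/data/small_files");       // directory of small files (illustrative)
        Path output = new Path("/data/small_files.seq");  // resulting sequence file (illustrative)

        // key = original file name, value = raw file content
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, output, Text.class, BytesWritable.class);
        try {
            for (FileStatus status : fs.listStatus(input)) {
                if (status.isDir()) {
                    continue;
                }
                byte[] content = new byte[(int) status.getLen()];
                FSDataInputStream in = fs.open(status.getPath());
                try {
                    in.readFully(0, content);
                } finally {
                    in.close();
                }
                writer.append(new Text(status.getPath().getName()), new BytesWritable(content));
            }
        } finally {
            writer.close();
        }
    }
}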

Hadoop - Number of Map and Reduce Tasks

I will try to explain how the number of map and reduce tasks is calculated.

Tests were done with Hadoop 1.0.3 on an Ubuntu 12.04 machine.

1. Number of Map Tasks

When you submit a map-reduce job (or a Pig/Hive job), Hadoop first calculates the input splits; each input split's size generally equals the HDFS block size. For example, for a 1 GB file there will be 16 input splits if the block size is 64 MB. However, the split size can be configured to be smaller or larger than the HDFS block size. The calculation of input splits is done by org.apache.hadoop.mapred.FileInputFormat. For each of these input splits, a map task is started.


First, let's investigate properties that govern the input split size:
mapred.min.split.size 
Default value: 1
Description: The minimum size chunk that map input should be split into. Note that some file formats may have minimum split sizes that take priority over this setting.

mapred.max.split.size 
Default value: This configuration cannot be set in Hadoop 1.0.3; it is calculated in the code. In later versions, its default value is Long.MAX_VALUE, that is 9223372036854775807.
Description: The largest valid size in bytes for a file split.

dfs.block.size
Default value: 64 MB, that is 67108864
Description: The default block size for new files.

If you are using a newer Hadoop version, some of the above properties are deprecated. You can check from here.


Configure the properties:
You can set mapred.min.split.size in mapred-site.xml
<property>
  <name>mapred.min.split.size</name>
  <value>127108864</value>
</property>

You can set dfs.block.size in hdfs-site.xml
<property>
  <name>dfs.block.size</name>
  <value>67108864</value>
</property>

Split Size Calculation Examples:
The calculation of the input split size is done in FileInputFormat as:
Math.max(minSize, Math.min(maxSize, blockSize));
mapred.min.split.size | mapred.max.split.size    | dfs.block.size | Input Split Size | Input Splits (1GB file)
1 (default)           | Long.MAX_VALUE (default) | 64MB (default) | 64MB             | 16
1 (default)           | Long.MAX_VALUE (default) | 128MB          | 128MB            | 8
128MB                 | Long.MAX_VALUE (default) | 64MB           | 128MB            | 8
1 (default)           | 32MB                     | 64MB           | 32MB             | 32


Configuring an input split size larger than the block size decreases data locality and can degrade performance.
According to the table above, for a 1 GB file there will be 16, 8, 8 and 32 input splits respectively.

What if input files are too small?
FileInputFormat splits files that are larger than the split size. What if our input files are very small? In that case, FileInputFormat creates one input split per file. For example, if you have 100 files of 10 KB and the input split size is 64 MB, there will be 100 input splits. The total file size is only about 1 MB, but we will have 100 input splits and 100 map tasks. This is known as the Hadoop small files problem. You can look at CombineFileInputFormat for a solution. Apache Pig combines small input files into one map by default.

So the number of map tasks depends on:
- Total size of input
- Input split size
- Structure of input files (small files problem)

2. Number of Reduce Tasks

The number of reduce tasks to create is determined by the mapred.reduce.tasks property in the JobConf, which is set by the setNumReduceTasks() method; Hadoop simply creates this number of reduce tasks to run.
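
As a minimal sketch with the old mapred API (class and path names are illustrative; the identity mapper and reducer that Hadoop uses by default are enough to show the effect):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ReducerCountExample {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ReducerCountExample.class);
        conf.setJobName("reducer-count-example");

        // Equivalent to setting mapred.reduce.tasks=4; the job will run exactly 4 reduce tasks
        conf.setNumReduceTasks(4);

        FileInputFormat.setInputPaths(conf, new Path("/data/logs"));       // illustrative paths
        FileOutputFormat.setOutputPath(conf, new Path("/data/logs_out"));

        JobClient.runJob(conf);
    }
}

If the job is submitted through ToolRunner, the same value can also be passed on the command line with -D mapred.reduce.tasks=4.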