Saturday, March 21, 2015

Hadoop Shuffle and Sort Phase

I will try to explain some points about the shuffle & sort phase of Hadoop map-reduce.

It is tested with Hadoop 1.0.3 on Ubuntu 12.04.

1. Map-Reduce Mechanics

When writing map-reduce code, we implement map and reduce functions to define the logic of the data flow. The map function gets a key-value pair as input and emits new key-value pairs. The key-value pairs emitted from map tasks are then passed to reduce tasks. The reduce function gets a key and a set of values, and emits a key-value pair that finishes the processing for that data. A minimal sketch is given below.
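
As a concrete illustration, below is a minimal word count sketch written against the org.apache.hadoop.mapreduce API; the class names are my own. The mapper emits a (word, 1) pair per token, and the reducer receives each key together with all of its values.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Word count as a minimal illustration of the map and reduce contracts.
public class WordCount {

    public static class TokenMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Emit (word, 1) for every token in the input line.
            for (String token : line.toString().split("\\s+")) {
                if (token.isEmpty()) continue;
                word.set(token);
                context.write(word, ONE);
            }
        }
    }

    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // The framework delivers all values for a key together, keys in sorted order.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}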

This is an efficient abstraction that hides many details of the processing done in the background. However, understanding the mechanics of the underlying process is key to grasping the full capability of map-reduce, and also key to optimizing the performance of map-reduce jobs.

I will focus on shuffle and sort phase. I will also give an example to clarify some points.

2. Shuffle And Sort

The shuffle phase covers the transformation of map outputs and their transfer to reducers. The sort phase covers the merging and sorting of map outputs on the reducers. The shuffle and sort figure in "Hadoop: The Definitive Guide" (Chapter 6) outlines this process.

Map Side

When the map function starts to produce key-value pairs, these values are not written directly to disk but stored in memory. While in memory they are divided into different partitions corresponding to the reducers they will be sent to. If the number of reducers is set to 3 (by Job.setNumReduceTasks(3)), there will be 3 partitions.
Data arriving in these partitions is then sorted by key. If a combiner function is specified, it is run on the sorted data. When the memory limit reserved for this operation is reached, the data is spilled to disk. While a spill operation is in progress, newly produced data keeps going to memory. At the end of the map task, there will be several spill files. These spill files are then merged and sorted to produce the final output file of the map task.
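
For illustration, the sketch below mirrors the logic of Hadoop's default hash partitioning (the class name and the key/value types are placeholders); it shows how each map output record is assigned to one of the partitions.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Assigns each map output record to one of numReduceTasks partitions.
// This mirrors what the default hash partitioning does.
public class ExamplePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // numReduceTasks is the value given to Job.setNumReduceTasks();
        // records with the same key always land in the same partition.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}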

To summarize, on the map side:

  • Map output is written to local disk as a single file
  • The map output file contains partitions that are ready to be copied to reducers
  • Partitions in the map output are sorted by key 
  • If a combiner function is specified, it is applied to the map output
  • If map output compression is enabled, the map output is compressed
Note: Both the combiner function and map output compression improve performance; they reduce the size of the data that is written to disk and transferred over the network to reducers. A minimal configuration sketch is given below.
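
A minimal job configuration sketch for the note above (the property names are the Hadoop 1.x ones; the combiner class refers to the reducer sketched earlier in this post, so treat the class names as placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;

public class JobSetup {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // Compress map output before it is written to disk and shipped to reducers
        // (Hadoop 1.x property names).
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec",
                GzipCodec.class, CompressionCodec.class);

        Job job = new Job(conf, "shuffle example");
        job.setNumReduceTasks(3);
        // Running the reducer logic as a combiner is the common pattern;
        // SumReducer refers to the reducer sketched earlier.
        job.setCombinerClass(WordCount.SumReducer.class);
        return job;
    }
}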

The term shuffle & sort somewhat implies that the reducer side performs the sorting; however, the initial sort is performed on the map side. There are multiple configuration parameters that govern this map-side process, some of which are shown below. For further reading, I recommend Hadoop: The Definitive Guide, Chapter 6.
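
For example, a few of these parameters can be tuned through the Configuration object. The property names and stated defaults are the Hadoop 1.x ones; the values set here are only illustrative.

import org.apache.hadoop.conf.Configuration;

public class MapSideTuning {
    public static Configuration tuned() {
        Configuration conf = new Configuration();
        // Size in MB of the in-memory buffer that collects map output before it is spilled (default 100).
        conf.setInt("io.sort.mb", 200);
        // Fill fraction of that buffer at which a background spill to disk starts (default 0.80).
        conf.setFloat("io.sort.spill.percent", 0.80f);
        // Maximum number of spill segments merged at once into the final map output file (default 10).
        conf.setInt("io.sort.factor", 25);
        return conf;
    }
}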


Reduce Side

As soon as there are finished map tasks, reducers start to copy their outputs. These outputs are copied into the reducer's memory if the data is small enough; otherwise they are written to disk. The reducer collects its partition data from multiple map outputs and merges them as they become available. When merging outputs, it maintains the sort order. When all partitions are collected and merged, the reduce function is called on the data.
When the reduce function finishes processing the merged data, it writes its output to HDFS. If the replication factor is larger than 1, the first replica is stored on the reducer machine, assuming it has a Datanode daemon running.

To summarize, on the reduce side:

  • Partitions from multiple map outputs are copied to memory and local disk
  • Copied partitions are merged while maintaining the sort order. 
    • This can be thought of as a second sort of the data, given that the first sort is performed on the map side.
  • If the map and reduce functions use the same field as key, the output of the reduce function will also be sorted, because its input data is sorted.
    • However, reduce outputs are not sorted globally. Every reduce output has a sort order limited to its own data.
  • If reduce output compression is enabled, the reduce output is compressed
  • Reducer output is written to HDFS

3. Example

I will list some implications through an example:
We have a map-reduce job that takes a 100 MB file as input, and we set the number of reducers to 5. The HDFS block size and the input split size for the job are 64 MB.

Implications

On the map side:
  • We will have 2 map output files written to disk (one per input split / map task). 
  • The disk location is determined by mapred.local.dir. If mapred.local.dir is /app/hadoop, one map output file may be "/app/hadoop/mapred/local/taskTracker/isa/jobcache/job_201503211800_0022/attempt_201503211800_0022_m_000000_0/output/file.out"
  • Every output file can have up to 5 partitions, which we can observe by opening the file.
  • Every partition in the output file will contain data sorted by key.
On the reduce side:
  • We will have 5 output files in HDFS
  • Every output file will have its own keys sorted
    • This shows that multiple map outputs are merged while maintaining the sort order.
  • There is no global sort order across the whole output (although this can be achieved).

Further Reading

Hadoop Definitive Guide Chapter 6

Thursday, February 12, 2015

Nodejs Inheritance

I will give an example of how inheritance can be applied in JavaScript and Node.js. This post does not cover the topic in its entirety, but explains the crucial points.

It is tested with node 0.6.12 on Ubuntu 12.04

1. Prototype Chain

When it comes to inheritance, JavaScript only has one construct: objects. Each object has an internal link to another object called its prototype. That prototype object has a prototype of its own, and so on until an object is reached with null as its prototype. null, by definition, has no prototype, and acts as the final link in this prototype chain. In addition to this, every object has Object.prototype (prototype of Object) in its prototype chain.
An object inherits methods and properties in its prototype chain. This also implies that all objects inherit methods and properties of Object.prototype.

1.1. Property Access

Functions and variables are both treated the same way, as properties. When an attempt is made to access a property of an object, the property is first looked up among the object's own properties. If the property is not found directly on the object, it is looked up on its direct prototype. If it is not found there, it is looked up on the next prototype in the chain, and this traversal continues until a prototype of null is reached. Use of this in a function defined on an object's prototype refers not to the prototype but to the inheriting object.

This has the following implications:
  • Properties can be shadowed: when an object redefines a property (function, variable, etc.), it shadows the property found further along its prototype chain. 
  • This property shadowing can be used as a method overriding mechanism.
  • Resolution of this to the inheriting object also provides method overriding behavior similar to Java method invocation rules.
1.2. prototype, __proto__, Object.getPrototypeOf

prototype is used with constructor functions (types), whereas __proto__ or Object.getPrototypeOf() is used with instances.

var a1 = new A();
var a2 = new A();
A.prototype === Object.getPrototypeOf(a1); // true
Object.getPrototypeOf(a1) === Object.getPrototypeOf(a2); // true
a1.__proto__ === A.prototype; // true

2. Example

In the following example, the super class (if we use OO terms) has 2 methods and the sub class overrides one of them.

function SuperClass(name){
  this.name = name;
}

SuperClass.prototype = {
  /**
   * Validate the request
   * Default : true
   */
  validateRequest : function(input, callback){
    console.log("Validating the request: ", input);
    callback(null, true);
  },
  /**
   * 1. Validate the request
   * 2. Reply with response
   */
  process : function(input, callback){
    var self = this;
    console.log("Processing the request");
    self.validateRequest(input, function(err, result){
      if(err){
        console.log("Error when validating the request: ", err);
        callback(err);
        return; // do not fall through to the success path after an error
      }

      if(result == true){
        console.log("Validated!");
        callback(null, "OK");
      }else{
        console.log("Validation failed!");
        callback(null, "FAIL");
      }
    });
  }
};

function SubClass(){
  SuperClass.call(this, "Greater Than Test");
}

SubClass.prototype = Object.create(SuperClass.prototype, {
    validateRequest : {
      value: function(request, callback){ // override
         console.log("Validating request...");
         if(request > 10){
          callback(null, true);
         }else{
          callback(null, false);
         }
         
      },
      enumerable: true,
      configurable: true,
      writable: true
    }
});

var sub = new SubClass();
console.log(SuperClass.prototype);
console.log(SubClass.prototype);
sub.process(15, function(err, result){
 console.log(result);
});


> node inheritance.js 
{ validateRequest: [Function], process: [Function] }
{ validateRequest: [Function] }
Processing the request
Validating request...
Validated!
OK


The important points are:

  • The SubClass constructor uses the call method to run the constructor of SuperClass. Passing this is required so that the SuperClass constructor initializes the new SubClass instance.
  • The Object.create() method enables us to put SuperClass.prototype into the prototype chain of SubClass. In other words, SubClass.prototype has SuperClass.prototype as its upper prototype.
  • The call to sub.process() runs the process function defined on SuperClass.prototype, which then calls SubClass's overriding validateRequest method.
  • The super class implementation of validateRequest can still be used by adding the line below to the overriding method:
SuperClass.prototype.validateRequest.call(this, request, callback);




Further Reading

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Inheritance_and_the_prototype_chain
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Inheritance_Revisited
http://book.mixu.net/node/ch6.html

Wednesday, February 11, 2015

Install and Run Zookeeper in Replicated Mode

I will explain the steps for installing Zookeeper and running in standalone and replicated mode.

It is tested with Zookeeper 3.4.6 on Ubuntu 12.04.

1. Prerequisites 

i. Java 1.6 or greater is needed to run Zookeeper. I will continue with Java 1.7
> apt-get install openjdk-7-jdk

ii. I will use supervisord to control Zookeeper
> apt-get install supervisor

2. Install Zookeeper

Get a Zookeeper binary. I will use http://ftp.itu.edu.tr/Mirror/Apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

> cd /usr/local
> wget http://ftp.itu.edu.tr/Mirror/Apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
> tar -xzvf zookeeper-3.4.6.tar.gz
> ln -s zookeeper-3.4.6 zookeeper

Create data and log directories for Zookeeper
> mkdir /var/zookeeper
> mkdir /var/zookeeper-log
> mkdir /var/log/zookeeper

Enter into /usr/local/zookeeper/conf directory
> cd /usr/local/zookeeper/conf
> vi zoo.cfg

Enter following configuration properties:
tickTime=2000
dataDir=/var/zookeeper
dataLogDir=/var/zookeeper-log
clientPort=2181
autopurge.purgeInterval=24
autopurge.snapRetainCount=5

Save and exit.
The following are explanations of these parameters, excerpted from the Zookeeper documentation:

tickTime
the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be twice the tickTime.

dataDir
the location to store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database.

dataLogDir
the location to store the transaction log of updates to the database.

clientPort
the port to listen for client connections

3. Run In Standalone Mode

Let's run Zookeeper:
> cd /usr/local/zookeeper/bin
> ./zkServer.sh start

This will start Zookeeper, which forks into the background. You can also start it in the foreground:
> ./zkServer.sh start-foreground

You can stop Zookeeper with:
> ./zkServer.sh stop

4. Run with Supervisord

Edit supervisord config
> vi /etc/supervisor/supervisord.conf

Add the following entry at the end of the file.

[program:zookeeper]
command=/usr/local/zookeeper/bin/zkServer.sh start-foreground
user=root
autostart=true
autorestart=true
startsecs=10
startretries=999
log_stdout=true
log_stderr=true
logfile=/var/log/zookeeper/zookeeper.out
logfile_maxbytes=20MB
logfile_backups=10

Enter into supervisord console
> sudo supervisorctl

You can monitor and start/stop Zookeeper from supervisor console
supervisor > status zookeeper
supervisor > start zookeeper
supervisor > stop zookeeper
supervisor > tail zookeeper

5. Run in Replicated Mode

In order to run Zookeeper in replicated mode, we will install Zookeeper on 3 machines. That is the minimum number recommended for production deployments. It is wise to install an odd number of servers, greater than or equal to 3, because Zookeeper needs more than half of the ensemble (the Zookeeper cluster) up and running to be operational.

On each server, apply the following:
Edit zoo.cfg
> cd /usr/local/zookeeper/conf
> vi zoo.cfg

initLimit=10
syncLimit=5
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

Enter into data directory and create a file named myid:
> cd /var/zookeeper
> vi myid
Enter 1 for the zoo1 server, stating the server id.
Enter 2 for the zoo2 server.
Enter 3 for the zoo3 server.
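
For example, on the zoo1 server the file can be created with the command below (zoo2 and zoo3 are analogous, with 2 and 3 respectively); /var/zookeeper is the dataDir configured earlier:
> echo 1 > /var/zookeeper/myid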


The following are explanations of these parameters, excerpted from the Zookeeper documentation:

The new entry, initLimit, is a timeout ZooKeeper uses to limit the length of time the ZooKeeper servers in the quorum have to connect to a leader. The entry syncLimit limits how far out of date a server can be from a leader.

With both of these timeouts, you specify the unit of time using tickTime. In this example, the timeout for initLimit is 10 ticks at 2000 milliseconds a tick, or 20 seconds.

The entries of the form server.X list the servers that make up the ZooKeeper service. When the server starts up, it knows which server it is by looking for the file myid in the data directory. That file contains the server number, in ASCII.

Finally, note the two port numbers after each server name: "2888" and "3888". Peers use the former port to connect to other peers. Such a connection is necessary so that peers can communicate, for example, to agree upon the order of updates. More specifically, a ZooKeeper server uses this port to connect followers to the leader. When a new leader arises, a follower opens a TCP connection to the leader using this port. Because the default leader election also uses TCP, we currently require another port for leader election. This is the second port in the server entry.



MySQL InnoDB Transaction Model

I will give InnoDB transaction examples (for INSERT and UPDATE/DELETE operations) and compare the READ_COMMITTED and REPEATABLE_READ isolation levels.

It is tested with MySQL 5.5.
Some of the information is taken from http://dev.mysql.com/doc/refman/5.5/en/innodb-transaction-model.html.


1. BACKGROUND

Excerpt from MySQL documentation

1. MySQL InnoDB does locking at the row level. It uses the following lock types:

* Record lock: This is a lock on an index record. Record locks always lock index records, even if a table is defined with no indexes. For such cases, InnoDB creates a hidden clustered index and uses this index for record locking

* Gap lock: This is a lock on a gap between index records, or a lock on the gap before the first or after the last index record.

* Next-key lock: This is a combination of a record lock on the index record and a gap lock on the gap before the index record.

2. MySQL supports the following transaction isolation levels:

i. REPEATABLE READ

Features:
* Dirty reads => NO: uncommitted changes from other transactions are not read.
* Repeatable reads => YES: within a transaction, multiple SELECT statements return the same rows with the same content.
* Phantom protection => YES (for consistent reads): within a transaction, multiple SELECT statements return the same result set with the same number of rows.
* For UPDATE and DELETE statements, locking depends on whether the statement uses a unique index with a unique search condition, or a range-type search condition. For a unique index with a unique search condition, InnoDB locks only the index record found, not the gap before it. For other search conditions, InnoDB locks the index range scanned, using gap locks or next-key (gap plus index-record) locks to block insertions by other sessions into the gaps covered by the range.

ii. READ COMMITTED

Features:
* Dirty reads => NO: uncommitted changes from other transactions are not read.
* Repeatable reads => NO: each consistent read, even within the same transaction, sets and reads its own fresh snapshot. This means that multiple SELECT statements may return different content for the same row.
* Phantom protection => NO: rows inserted and committed by other transactions can appear in later reads.
* For UPDATE and DELETE statements, InnoDB locks only index records, not the gaps before them, and thus permits the free insertion of new records next to locked records.

iii. READ UNCOMMITTED
iv. SERIALIZABLE

3. In InnoDB, all user activity occurs inside a transaction. If auto-commit mode is enabled, each SQL statement forms a single transaction on its own.

4. By default, MySQL starts the session for each new connection with auto-commit enabled, so MySQL does a commit after each SQL statement if that statement did not return an error.

5. If auto-commit mode is disabled within a session with SET autocommit = 0, the session always has a transaction open. A COMMIT or ROLLBACK statement ends the current transaction and a new one starts.
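
To make points 3-5 concrete, below is a minimal JDBC sketch; the connection URL and credentials are placeholders, and it assumes the MySQL Connector/J driver is on the classpath. It disables auto-commit, picks an isolation level for the session, and commits explicitly.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class IsolationDemo {
    public static void main(String[] args) throws Exception {
        // URL, user and password are placeholders.
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password");
        conn.setAutoCommit(false); // same effect as SET autocommit = 0
        // The InnoDB default is REPEATABLE READ; switch this session to READ COMMITTED.
        conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

        Statement st = conn.createStatement();
        st.executeUpdate("UPDATE mytable_no_index SET value=22 WHERE name='a'");
        // The transaction stays open until an explicit COMMIT or ROLLBACK.
        conn.commit();
        st.close();
        conn.close();
    }
}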

2. SETUP

We will investigate two scenarios.

* Two transactions making an UPDATE on the same table
* One transaction making an INSERT after another transaction makes an UPDATE on the same table

We will try each scenario on each of the tables below:

mytable
-id (Primary Key)
-name (Secondary Index)
-value

mytable_no_index
-id (Primary Key)
-name
-value
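
The exact column types are not given above, so the JDBC sketch below is only my assumption of a matching setup; what matters for the scenarios is the primary key on id and the secondary index on name (present only in mytable).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        // Connection URL and credentials are placeholders.
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password");
        Statement st = conn.createStatement();
        // mytable has a secondary index on name.
        st.executeUpdate("CREATE TABLE mytable ("
                + "id INT AUTO_INCREMENT PRIMARY KEY, "
                + "name VARCHAR(100), value INT, "
                + "INDEX name_idx (name)) ENGINE=InnoDB");
        // mytable_no_index has no secondary index.
        st.executeUpdate("CREATE TABLE mytable_no_index ("
                + "id INT AUTO_INCREMENT PRIMARY KEY, "
                + "name VARCHAR(100), value INT) ENGINE=InnoDB");
        st.close();
        conn.close();
    }
}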

3. SCENARIO 1 - Two transactions making UPDATE

CASE 1 - Table without secondary index - mytable_no_index


i. REPEATABLE_READ
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable_no_index SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: UPDATE mytable_no_index SET value=22 WHERE name='b'   (blocked, waiting for lock)
Transaction 1: Commit Transaction   (Transaction 2 now gets the lock and runs)
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is repeatable_read, InnoDB locks the index records it scans until it finds the desired rows. Since there is no secondary index on the name column, the query scans all rows. Even though the WHERE clause eliminates the other rows, the locks on the scanned rows are not released. As a result, the second update query waits for the lock, although the first transaction updates a different row.


ii. READ_COMMITTED
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable_no_index SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: UPDATE mytable_no_index SET value=22 WHERE name='b'   (gets lock and runs without waiting)
Transaction 1: Commit Transaction
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is read_committed, InnoDB locks only the matching index records. The locks on rows eliminated by the WHERE clause are released, in contrast to the repeatable_read isolation level. Because of this, the second update query runs without waiting for any lock.


CASE 2 - Table with secondary index - mytable


i. REPEATABLE_READ
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: UPDATE mytable SET value=22 WHERE name='b'   (gets lock and runs without waiting)
Transaction 1: Commit Transaction
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is repeatable_read, InnoDB locks the index records it scans until it finds the desired rows. However, since there is a secondary index on the name column, it scans only the index records for value 'a' and does not scan the whole table. As a result, the second update query does not wait for a lock and runs.


Let's expand on this:

Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a' and value=1   (gets lock and runs)
Transaction 2: UPDATE mytable SET value=22 WHERE name='a' and value=2   (blocked, waiting for lock)
Transaction 1: Commit Transaction   (Transaction 2 now gets the lock and runs)
Transaction 2: Commit Transaction
Explanation: Since both update statements scan the same index records for 'a', the first transaction locks all rows for 'a' and the second transaction waits for the lock.


ii. READ_COMMITTED
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: UPDATE mytable SET value=22 WHERE name='b'   (gets lock and runs without waiting)
Transaction 1: Commit Transaction
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is read_committed, InnoDB locks only the matching index records. The outcome is the same as for the table without a secondary index (Case 1).


4. SCENARIO 2 - One transaction making INSERT after another transaction making UPDATE

CASE 1 - Table without secondary index - mytable_no_index


i. REPEATABLE_READ
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable_no_index SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: INSERT INTO mytable_no_index(name, value) VALUES('abc', 1)   (blocked, waiting for lock)
Transaction 1: Commit Transaction   (Transaction 2 now gets the lock and runs)
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is repeatable_read, InnoDB locks the index records it scans until it finds the desired rows. Since there is no secondary index on the name column, the query scans all rows. In addition, it places gap locks on the scanned index, thus blocking any insert by other transactions. This is done to prevent phantom reads.


ii. READ_COMMITTED
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable_no_index SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: INSERT INTO mytable_no_index(name, value) VALUES('abc', 1)   (gets lock and runs without waiting)
Transaction 1: Commit Transaction
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is read_committed, InnoDB locks only index records, not the gaps before them, and thus permits the free insertion of new records next to locked records.


CASE 2 - Table with secondary index - mytable


i. REPEATABLE_READ
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: INSERT INTO mytable(name, value) VALUES('abc', 1)   (blocked, waiting for lock)
Transaction 1: Commit Transaction   (Transaction 2 now gets the lock and runs)
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is repeatable_read, InnoDB locks the index records it scans until it finds the desired rows; here it locks the secondary index records for 'a'. In addition, InnoDB places gap locks around the selected index records, thus blocking inserts by other transactions into those gaps. Since the insert of 'abc' falls into the locked gap next to the index records for 'a', it waits for the lock.


Let's expand a bit more.


Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: INSERT INTO mytable(name, value) VALUES('bcd', 1)   (gets lock and runs without waiting)
Transaction 1: Commit Transaction
Transaction 2: Commit Transaction



Since 'bcd' falls outside the gap that is locked next to the index records for 'a', it does not wait for the lock.

Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: INSERT INTO mytable(name, value) VALUES('012', 1)   (blocked, waiting for lock)
Transaction 1: Commit Transaction   (Transaction 2 now gets the lock and runs)
Transaction 2: Commit Transaction
Since '012' falls into the locked gap before index record 'a', it waits for the lock.

ii. READ_COMMITTED
Transaction 1: Start Transaction
Transaction 2: Start Transaction
Transaction 1: UPDATE mytable SET value=22 WHERE name='a'   (gets lock and runs)
Transaction 2: INSERT INTO mytable(name, value) VALUES('abc', 1)   (gets lock and runs without waiting)
Transaction 1: Commit Transaction
Transaction 2: Commit Transaction
Explanation: Since the transaction isolation level is read_committed, InnoDB locks only index records, not the gaps before them, and thus permits the free insertion of new records next to locked records.

5. DISABLING GAP LOCKS

Excerpt from MySQL documentation

Gap locking can be disabled explicitly. This occurs if you change the transaction isolation level to READ_COMMITTED or enable the innodb_locks_unsafe_for_binlog system variable. Under these circumstances, gap locking is disabled for searches and index scans and is used only for foreign-key constraint checking and duplicate-key checking.

If the innodb_locks_unsafe_for_binlog variable is set to true (enabled), insert operations like the ones above are not blocked.


6. Where to Go From Here

http://dev.mysql.com/doc/refman/5.5/en/innodb-concepts.html
http://dev.mysql.com/doc/refman/5.5/en/set-transaction.html