Monday, September 2, 2013

Map Reduce Programming Concepts

Map Reduce is a simple programming model that almost always comes up when we talk about Hadoop, a framework for storing, retrieving, processing and manipulating large data sets that can run to many terabytes. Hadoop uses the Map Reduce model to process this data in parallel across a large set of commodity hardware nodes, known as a cluster. Map Reduce is especially suited to aggregating or segregating a large data set in parallel, so that the processing power of every node in the cluster is put to use and results arrive quickly. In this post I have tried to explain Map Reduce programming in the context of the Hadoop framework.
The fundamental data type on which Map Reduce operates is the <Key, Value> pair. These pairs are analogous to the entries of a HashMap in Java, an associative array in PHP, a Hash in Ruby and so on. A Map Reduce job (program) usually, though not necessarily, consists of two kinds of tasks working in tandem: Map (referred to as the Mapper) and Reduce (referred to as the Reducer). The input data set, which does not have to be in <Key, Value> form, is broken into independent chunks, each chunk is turned into <Key, Value> pairs, and these pairs are fed to the Map tasks. The map function is called once for each input <Key, Value> pair, in parallel across the nodes. For each input <K, V> it can emit zero, one or many output <K, V> pairs.
Once the Map tasks have finished on all nodes, Hadoop sorts the Map output and passes it to the Reduce task(s), which can also run in parallel on various cluster nodes. While sorting, the values that share a key are collected into a list, forming a new pair of the form <K, [V1, V2, V3...]>. Each such pair is then the input to one call of the reduce function, which processes it further to produce the final <K, V> pairs.
Let me explain this with an example. I am taking a small, simple program, WordCount 1.0, which can be found on the Hadoop site. This example will get you started with Map Reduce programming and give you a feel for how it works.

The WordCount program simply counts the number of occurrences of each word in a given input set (two text files in this case).

So, let's assume we have two text files:-

File01.txt
Hello World Bye World

File02.txt
Hello Hadoop Goodbye Hadoop

Before these input files are fed to the Map tasks, they are broken into records separated by newlines, so each call to the map function receives exactly one line to process. The mapper splits the line on white space and creates a Key-Value pair for each word found, like <(Word), 1>. In our case the output of the map tasks will be like:-

<Hello, 1>
<World, 1>
<Bye, 1>
<World, 1>
<Hello, 1>
<Hadoop, 1>
<Goodbye, 1>
<Hadoop, 1>

Once all the map tasks are done, these Key-Value pairs are sorted by key and pushed to the Reducer tasks. The Reducer sums the values for each key and emits new Key-Value pairs like this:-

<Bye, 1>
<Goodbye, 1>
<Hadoop, 2>
<Hello, 2>
<World, 2>

Note the output: the keys are lexicographically sorted and aggregated. This output can be saved to text files or, depending on your needs, pushed into a database or another data store. In Hadoop, the number of Mapper and Reducer tasks a job uses is configurable. Reducers are completely optional and can be omitted for certain jobs.
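
To make the three steps concrete, here is a minimal sketch of the same word count in plain Java, with no Hadoop involved. The class and method names (WordCountSketch, map, shuffle, reduce) are my own for illustration and are not the Hadoop API. A TreeMap stands in for Hadoop's sort-and-group step, and everything runs in one process rather than on a cluster.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Map step: one input line in, zero or more <word, 1> pairs out.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Shuffle/sort step: collect the values of each key into a list.
    // TreeMap keeps the keys in sorted order, as the framework would.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: one <key, [values]> pair in, one total out.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs map over every line, groups the pairs, then reduces each group.
    static TreeMap<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            mapped.addAll(map(line));
        }
        TreeMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(mapped).entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello World Bye World", "Hello Hadoop Goodbye Hadoop");
        // prints {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
        System.out.println(wordCount(lines));
    }
}
```

In real Hadoop the map and reduce calls are spread over many nodes and the shuffle moves data across the network, but the shape of the data at each step is the same as in this sketch.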

I hope this small tutorial on Map Reduce Programming helped you understand the concept and got you started. If you have any questions or concerns, you can contact me at developer.vineet@gmail.com or comment here. You can also check out this content on my blog http://www.technologywithvineet.com

Tuesday, August 27, 2013

Import data from RDBMS to HDFS using SQOOP

This tutorial assumes that you have successfully installed Hadoop & SQOOP and both are up and running.

Let's start with a simple example: importing data from MySQL to HDFS. SQOOP can operate on data with minimal instructions, so we will start there and then move on to more advanced options.

We have a MySQL table like this

CREATE TABLE IF NOT EXISTS `employees` (
  `id` float NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `department` varchar(255) NOT NULL,
  `salary` float NOT NULL,
  `created_at` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1;

Or Postgres Table like this
CREATE TABLE employees (
    id integer NOT NULL,
    name character varying(255),
    department character varying,
    salary numeric,
    created_at timestamp without time zone
);

Insert some rows into it

INSERT INTO `employees` VALUES
(1, 'Vineet Sethi', 'HR', 25000, '2013-02-25 00:00:00'),
(2, 'Monika Sethi', 'Finance', 15000, '2014-02-21 00:00:00'),
(3, 'Aleen Sethi', 'Admin', 25700.5, '2013-04-02 00:00:00'),
(4, 'Rahul Sethi', 'Finance', 20000, '2013-05-07 00:00:00');

Sqoop can be used with any JDBC-compliant database, provided its driver is available. For MySQL, download the JDBC driver connector from http://dev.mysql.com/downloads/connector/j/, untar it and move mysql-connector-java-5.1.26-bin.jar into your SQOOP_HOME/lib. For Postgres, download the JDBC driver from http://jdbc.postgresql.org/download.html and move postgresql-9.2-1003.jdbc4.jar (in my case) into your SQOOP_HOME/lib. This makes the driver available to SQOOP and to the MR job.

That's it! We are ready to import the data to HDFS. Run Sqoop with minimal instructions to import data from MySQL/Postgres to HDFS like this:-

Minimal import command

For MySQL
#sqoop import --connect jdbc:mysql://<HOST>/<DATABASE> --table employees --username <DB USERNAME> -P

For Postgres
#sqoop import --connect jdbc:postgresql://<HOST>/<DATABASE> --table employees --username <DB USERNAME> -P

This command triggers a Map-Reduce job that imports the data from the database table to HDFS.
--connect is the JDBC connection string; the first command uses a MySQL connection string. Don't forget to replace <HOST> and <DATABASE> with yours.
--table is the table to import from. It can also be a view.
--username is the DB username. Don't forget to replace it with yours.
-P lets you enter the password on the console rather than passing it as an argument to the command. -P is preferred over the --password argument, because a password given on the command line can be seen by others, for example in the shell history.

By default, Sqoop imports a table named employees into a directory named employees inside your home directory in HDFS. For example, if your username is hduser, the import tool writes to /user/hduser/employees/(files). Since we are using a very small dataset, we can reduce the parallelism, and hence the number of output files, by lowering the number of mappers for this job. The -m or --num-mappers argument sets the number of map tasks (parallel processes) used to perform the import.
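
To get a feel for what the number of mappers controls: as I understand it, Sqoop splits the import on a column (by default the table's primary key), looks up that column's minimum and maximum, and hands each map task an evenly sized slice of the range. Here is a rough sketch of that idea in plain Java. The class and method names are made up for illustration, and this is not Sqoop's actual splitter code.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitRangeSketch {

    // Divides the id range [min, max] into at most numMappers half-open
    // slices [lo, hi). Each slice would become one map task's WHERE clause.
    static List<long[]> splitRanges(long min, long max, int numMappers) {
        List<long[]> ranges = new ArrayList<>();
        long span = max - min + 1; // number of possible ids in [min, max]
        for (int i = 0; i < numMappers; i++) {
            long lo = min + span * i / numMappers;
            long hi = min + span * (i + 1) / numMappers;
            if (lo < hi) { // skip empty slices when there are more mappers than ids
                ranges.add(new long[] {lo, hi});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The 4-row employees table (ids 1..4) split across 4 map tasks.
        for (long[] range : splitRanges(1, 4, 4)) {
            System.out.println("WHERE id >= " + range[0] + " AND id < " + range[1]);
        }
    }
}
```

With four map tasks each slice holds a single row, so a four-row table ends up spread over four tiny output files. With one map task the whole range goes to a single slice and a single output file.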

So let's modify the import command a bit (for tidier output) by adding the -m option, plus --delete-target-dir in case the /user/hduser/employees dir already exists.

#sqoop import --connect jdbc:mysql://ubuntu/<DATABASE> --table employees --username <DB USERNAME> -P -m 1 --delete-target-dir

After the job is run, the dir /user/hduser/employees looks like this
#hadoop fs -ls /user/hduser/employees/

Found 2 items
-rw-------   3 hduser supergroup          0 2013-08-28 04:56 /user/hduser/employees/_SUCCESS
-rw-------   3 hduser supergroup        211 2013-08-28 04:55 /user/hduser/employees/part-m-00000

and looking at the contents of /user/hduser/employees/part-m-00000

#hadoop fs -cat /user/hduser/employees/part-m-00000

1.0,Vineet Sethi,HR,25000.0,2013-02-25 00:00:00.0
2.0,Monika Sethi,Finance,15000.0,2014-02-21 00:00:00.0
3.0,Aleen Sethi,Admin,25700.5,2013-04-02 00:00:00.0
4.0,Rahul Sethi,Finance,20000.0,2013-05-07 00:00:00.0

You can see that by default Sqoop creates a delimited text file, with ',' as the field separator character and newline (\n) as the end-of-line character.

Change delimited text 

If we want to delimit the text file with '$' as the field separator and '|' as the end-of-line character, pass the options like this

#sqoop import --connect jdbc:mysql://ubuntu/mr_data --table employees --username root -P -m 1 --delete-target-dir --fields-terminated-by \$ --lines-terminated-by \|

You may have to escape shell metacharacters such as '$' and '|' with a backslash '\', as done here. The output of the above command looks like this

1.0$Vineet Sethi$HR$25000.0$2013-02-25 00:00:00.0|2.0$Monika Sethi$Finance$15000.0$2014-02-21 00:00:00.0|3.0$Aleen Sethi$Admin$25700.5$2013-04-02 00:00:00.0|4.0$Rahul Sethi$Finance$20000.0$2013-05-07 00:00:00.0|
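
If you later need to read such a file back in your own code, the same two delimiters drive the parsing. Below is a small sketch in plain Java (the class and method names are hypothetical). It assumes that no field value itself contains '$' or '|'. Note the Pattern.quote calls: both characters are regex metacharacters, so passing them to split unquoted would not do what you expect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class DelimitedRecordParser {

    // Splits the raw text into records, then each record into fields.
    // Pattern.quote treats the separators as literal text, not as regex.
    static List<List<String>> parse(String raw, String fieldSep, String recordSep) {
        List<List<String>> records = new ArrayList<>();
        for (String record : raw.split(Pattern.quote(recordSep))) {
            if (record.isEmpty()) {
                continue; // ignore empty pieces between repeated separators
            }
            // Limit -1 keeps trailing empty fields instead of dropping them.
            records.add(Arrays.asList(record.split(Pattern.quote(fieldSep), -1)));
        }
        return records;
    }

    public static void main(String[] args) {
        String raw = "1.0$Vineet Sethi$HR$25000.0$2013-02-25 00:00:00.0|"
                + "2.0$Monika Sethi$Finance$15000.0$2014-02-21 00:00:00.0|";
        for (List<String> fields : parse(raw, "$", "|")) {
            System.out.println(fields);
        }
    }
}
```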


Column selection & ordering

By default, all the table columns are imported in their natural order. You can limit the columns to import and change their order with the --columns option. So, if you just want to include name, department & salary while skipping the id & created_at fields

#sqoop import --connect jdbc:mysql://ubuntu/mr_data --table employees --username root -P -m 1 --delete-target-dir --columns "name, department, salary"


Output file formats

Delimited text is the default file format. The other formats in which data can be imported are the SequenceFile format and the Avro data file format; use --as-sequencefile or --as-avrodatafile respectively to change the output format. SequenceFiles are a binary format that stores individual records in custom record-specific data types and is appropriate for storing binary data. MapReduce programs can read and write this format faster than text, because records do not have to be parsed. Avro data files are a compact, efficient binary format that provides interoperability with applications written in other programming languages.


Incremental Imports

SQOOP provides an incremental import mode, in which only the new rows are appended to the already imported set of rows. So, let's say you have 4 more rows in your table

INSERT INTO `employees` (`id`, `name`, `department`, `salary`, `created_at`) VALUES
(5, 'Micheal Sethi', 'HR', 25000, '2013-02-25 00:00:00'),
(6, 'Monica Sethi', 'Finance', 15000, '2014-02-21 00:00:00'),
(7, 'Nick Sethi', 'Admin', 25700.5, '2013-04-02 00:00:00'),
(8, 'Paul Sethi', 'Finance', 20000, '2013-05-07 00:00:00');

and you want to import them incrementally. To do so, run the import with the options --check-column <column name>, --incremental <append or lastmodified> and --last-value <value>

#sqoop import --connect jdbc:mysql://ubuntu/mr_data --table employees --username root -P -m 1 --check-column "id" --incremental "append" --last-value 4

This command selects the rows whose id > 4 and appends them to the previously imported result. So, if we look at the /user/hduser/employees dir

Found 3 items
-rw-------   3 hduser supergroup          0 2013-08-30 04:31 /user/hduser/employees/_SUCCESS
-rw-------   3 hduser supergroup        107 2013-08-30 04:31 /user/hduser/employees/part-m-00000
-rw-------   3 hduser supergroup        210 2013-08-30 04:53 /user/hduser/employees/part-m-00001

You can see that a new file, part-m-00001, has been added alongside the previously imported records. Its content is

5.0,Micheal Sethi,HR,25000.0,2013-02-25 00:00:00.0
6.0,Monica Sethi,Finance,15000.0,2014-02-21 00:00:00.0
7.0,Nick Sethi,Admin,25700.5,2013-04-02 00:00:00.0
8.0,Paul Sethi,Finance,20000.0,2013-05-07 00:00:00.0
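
The append logic itself is simple enough to sketch in a few lines of plain Java. This is only an illustration of the idea behind --check-column and --last-value, with names I made up, not what Sqoop runs internally: pick the rows whose check column is greater than the last value, and remember the new maximum for the next run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IncrementalAppendSketch {

    // Append mode: keep only rows whose check-column value exceeds lastValue.
    static List<Integer> selectNewIds(List<Integer> ids, int lastValue) {
        List<Integer> fresh = new ArrayList<>();
        for (int id : ids) {
            if (id > lastValue) {
                fresh.add(id);
            }
        }
        return fresh;
    }

    // The highest value seen is what the next run would pass as --last-value.
    static int nextLastValue(List<Integer> ids, int lastValue) {
        int max = lastValue;
        for (int id : ids) {
            max = Math.max(max, id);
        }
        return max;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(selectNewIds(ids, 4));  // prints [5, 6, 7, 8]
        System.out.println(nextLastValue(ids, 4)); // prints 8
    }
}
```

So after the run above, the next incremental import of this table would use --last-value 8.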

Of course, SQOOP is highly customizable, and we saw a few of the ways to tailor it to our needs. You can see further options by typing

sqoop import --help

and that will give you many more options to use with the command.

I hope this tutorial is a good starting point and helps you. To reach out to me, you can write to developer.vineet@gmail.com or find me as @vineetsethi25 on Twitter. You can also check out this content on my blog http://www.technologywithvineet.com

Saturday, August 24, 2013

Import/Export data from/to RDBMS(like MySQL, Postgres) to/from HBase using Map Reduce Job

This post assumes that you have successfully installed HADOOP and HBase and that both are up and running. We are going to import data from an RDBMS (MySQL, Postgres, Oracle or any RDBMS for which a Java JDBC driver is available) into HBase. I am going to use MySQL as the RDBMS, and we will take advantage of Map-Reduce by writing a small job around it. The aim is to give you the general idea of how to move data between an RDBMS and HBase; you can take this program, customize it and run it to achieve the results you need.

To learn how to import RDBMS data to HDFS, check this out http://hadup.blogspot.in/2013/08/import-data-from-rdbmslike-mysql.html


So, here we go...


We have a MySQL table like this

CREATE TABLE IF NOT EXISTS `employees` (
  `id` float NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `department` varchar(255) NOT NULL,
  `salary` float NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=5 ;

Insert some rows into it

INSERT INTO `employees` (`id`, `name`, `department`, `salary`) VALUES
(1, 'Vineet Sethi', 'HR', 25000),
(2, 'Monika Sethi', 'Finance', 15000),
(3, 'Aleen Sethi', 'Admin', 25700.5),
(4, 'Rahul Sethi', 'Finance', 20000);

Also create a table named test_out in HBase with column family cf, in which we have to import the data.

Download the MySQL JDBC driver connector from http://dev.mysql.com/downloads/connector/j/, untar it and move mysql-connector-java-5.1.26-bin.jar into your HADOOP_PATH/lib, so that it is available to the MR job.

We are going to use the org.apache.hadoop.mapreduce.lib.db.DBInputFormat class as the InputFormat of the MR job, but before that let's make an Employee class which is Writable by Hadoop Map-Reduce.

EmployeeWritable.java


package my.test.inputformat; // matches the import used by the job class in this post

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// One row of the employees table, readable from JDBC and serializable by Hadoop.
public class EmployeeWritable implements Writable, DBWritable {

    private int id;
    private String name;
    private String department;
    private double salary;

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getDepartment() {
        return department;
    }

    public double getSalary() {
        return salary;
    }

    // Populate the fields from one row of the SELECT result (columns are 1-based).
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.department = rs.getString(3);
        this.salary = rs.getDouble(4);
    }

    // Bind the fields to a statement (used when writing back to the database).
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, this.id);
        statement.setString(2, this.name);
        statement.setString(3, this.department);
        statement.setDouble(4, this.salary); // salary is the fourth parameter, not the third
    }

    // Hadoop serialization: write the fields in a fixed order...
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        out.writeUTF(this.name);
        out.writeUTF(this.department);
        out.writeDouble(this.salary);
    }

    // ...and read them back in exactly the same order.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.department = in.readUTF();
        this.salary = in.readDouble();
    }
}
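
The one rule to remember with the DataOutput/DataInput pair is that the fields must be read back in exactly the order they were written. Here is a small round-trip sketch that runs with plain java.io and no Hadoop on the classpath. The Employee class in it is a hypothetical stand-in for EmployeeWritable with the same four fields; it does not implement the Hadoop interfaces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableRoundTripSketch {

    // Hypothetical stand-in for EmployeeWritable, without the Hadoop interfaces.
    static class Employee {
        int id;
        String name;
        String department;
        double salary;

        Employee() {
        }

        Employee(int id, String name, String department, double salary) {
            this.id = id;
            this.name = name;
            this.department = department;
            this.salary = salary;
        }

        // Write the fields in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeUTF(department);
            out.writeDouble(salary);
        }

        // Read the fields back in the same order they were written.
        void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
            department = in.readUTF();
            salary = in.readDouble();
        }
    }

    // Serializes the employee to bytes and rebuilds a copy from those bytes.
    static Employee roundTrip(Employee original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            Employee copy = new Employee();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Employee copy = roundTrip(new Employee(3, "Aleen Sethi", "Admin", 25700.5));
        // prints 3,Aleen Sethi,Admin,25700.5
        System.out.println(copy.id + "," + copy.name + "," + copy.department + "," + copy.salary);
    }
}
```

This is roughly what happens to each record when Hadoop moves it from a Mapper to a Reducer: it is written to bytes on one side and rebuilt on the other.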

To import from the RDBMS, we just need to write a Mapper and a Reducer. Each record from MySQL is passed to the Mapper, and the Reducer puts that record into HBase. We are going to use two major classes, DBInputFormat and TableMapReduceUtil, to set up our Mapper and Reducer respectively.

So, let's make the Map-Reduce job HBaseDBImport, with Mapper HBaseImportMap and Reducer HBaseImportReduce.

Here is our MR Job HBaseDBImport


import java.io.IOException;

import my.test.inputformat.EmployeeWritable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseDBImport extends Configured implements Tool {

    // Mapper: passes each database record through unchanged, under the key supplied by DBInputFormat.
    private static class HBaseImportMap extends Mapper<LongWritable, DBWritable, LongWritable, EmployeeWritable> {
        @Override
        public void map(LongWritable key, DBWritable value, Context context)
                throws IOException, InterruptedException {
            EmployeeWritable employee = (EmployeeWritable) value;
            context.write(key, employee);
        }
    }

    // Reducer: turns each employee into an HBase Put on column family "cf".
    private static class HBaseImportReduce extends TableReducer<LongWritable, EmployeeWritable, ImmutableBytesWritable> {
        private static final byte[] CF = Bytes.toBytes("cf");

        @Override
        public void reduce(LongWritable key, Iterable<EmployeeWritable> employees, Context context)
                throws IOException, InterruptedException {
            for (EmployeeWritable employee : employees) {
                // The row key is "employee" followed by the id, e.g. "employee1".
                Put put = new Put(Bytes.toBytes("employee" + employee.getId()));
                put.add(CF, Bytes.toBytes("name"), Bytes.toBytes(employee.getName()));
                put.add(CF, Bytes.toBytes("department"), Bytes.toBytes(employee.getDepartment()));
                // The salary is stored as its string form, not as a binary double.
                put.add(CF, Bytes.toBytes("salary"), Bytes.toBytes(Double.toString(employee.getSalary())));
                context.write(null, put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(this.getConf());
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://<DB HOST>/<DATABASE>?", "<DB USERNAME>", "<DB PASSWORD>");

        Job job = new Job(conf, HBaseDBImport.class.toString());
        job.setJarByClass(HBaseDBImport.class);
        // Arguments: input class, table, WHERE conditions, ORDER BY, field names.
        DBInputFormat.setInput(job, EmployeeWritable.class, "employees", "1", "name, id", "id, name, department, salary");

        job.setInputFormatClass(DBInputFormat.class);
        job.setMapperClass(HBaseImportMap.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(EmployeeWritable.class);

        TableMapReduceUtil.initTableReducerJob("<HBASE OUTPUT TABLENAME>", HBaseImportReduce.class, job);

        //job.setNumReduceTasks(1);
        // Report a failed job through the exit status instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new HBaseDBImport(), args);
        System.exit(status);
    }
}

To run this example, don't forget to replace <DB HOST>, <DATABASE>, <DB USERNAME>, <DB PASSWORD> and <HBASE OUTPUT TABLENAME> with your own values. Make sure that the table <HBASE OUTPUT TABLENAME> exists in HBase and has at least one column family "cf". If you have trouble getting HBase or the MySQL JDBC driver onto the classpath, you can look at my other post http://hadup.blogspot.com/2013/08/how-to-deal-with-classnotfound.html
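
It helps to picture what one employee record looks like once it lands in HBase. The sketch below rebuilds that layout in plain Java with no HBase client involved; the class and method names are made up for illustration. The row key is "employee" plus the id, and every value, including the salary, is stored as the bytes of a string.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HBaseRowLayoutSketch {

    // Row key used by the reducer: "employee" followed by the id.
    static String rowKey(int id) {
        return "employee" + id;
    }

    // One cell per field, keyed as family:qualifier, with string bytes as the value.
    static Map<String, byte[]> cells(String name, String department, double salary) {
        Map<String, byte[]> cells = new LinkedHashMap<>();
        cells.put("cf:name", utf8(name));
        cells.put("cf:department", utf8(department));
        cells.put("cf:salary", utf8(Double.toString(salary)));
        return cells;
    }

    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(rowKey(3));
        for (Map.Entry<String, byte[]> cell : cells("Aleen Sethi", "Admin", 25700.5).entrySet()) {
            System.out.println("  " + cell.getKey() + " = " + new String(cell.getValue(), StandardCharsets.UTF_8));
        }
    }
}
```

Two things follow from this layout. Whoever reads the salary back has to parse it from text, for example with Double.parseDouble. And since HBase orders rows by comparing row key bytes, "employee10" sorts before "employee2"; if you care about scan order, a zero-padded id would be one way around that.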

Next, I am going to tell you how to import and export data using SQOOP. In the meantime, if you have any questions or concerns, you can contact me at developer.vineet@gmail.com or comment here. You can also check out this content on my blog http://www.technologywithvineet.com

Thursday, August 22, 2013

Import/export data from RDBMS(like MySQL, Postgres) to/from HDFS using Map Reduce Job

This post assumes that you have successfully installed HADOOP and that it is up and running. We are going to import data from an RDBMS (MySQL, Postgres, Oracle or any RDBMS for which a Java JDBC driver is available) into HDFS, such that each record of the RDBMS table is converted into a line of comma-separated values. I am going to use MySQL as the RDBMS, and we will take advantage of Map-Reduce by writing a small job around it. The aim is to give you the general idea of how to get things done while importing/exporting; you can take this program, customize it and run it to achieve the results you need.

So, here we go...

We have a MySQL table like this

CREATE TABLE IF NOT EXISTS `employees` (
  `id` float NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `department` varchar(255) NOT NULL,
  `salary` float NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=5 ;

Insert some rows into it

INSERT INTO `employees` (`id`, `name`, `department`, `salary`) VALUES
(1, 'Vineet Sethi', 'HR', 25000),
(2, 'Monika Sethi', 'Finance', 15000),
(3, 'Aleen Sethi', 'Admin', 25700.5),
(4, 'Rahul Sethi', 'Finance', 20000);

Download the MySQL JDBC driver connector from http://dev.mysql.com/downloads/connector/j/, untar it and move mysql-connector-java-5.1.26-bin.jar into your HADOOP_PATH/lib, so that it is available to the MR job.

We are going to use the org.apache.hadoop.mapreduce.lib.db.DBInputFormat class as the InputFormat of the MR job, but before that let's make an Employee class which is Writable by Hadoop Map-Reduce.

EmployeeWritable.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// One row of the employees table, readable from JDBC and serializable by Hadoop.
public class EmployeeWritable implements Writable, DBWritable {

    private int id;
    private String name;
    private String department;
    private double salary;

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getDepartment() {
        return department;
    }

    public double getSalary() {
        return salary;
    }

    // Populate the fields from one row of the SELECT result (columns are 1-based).
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.department = rs.getString(3);
        this.salary = rs.getDouble(4);
    }

    // Bind the fields to a statement (used when writing back to the database).
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, this.id);
        statement.setString(2, this.name);
        statement.setString(3, this.department);
        statement.setDouble(4, this.salary); // salary is the fourth parameter, not the third
    }

    // Hadoop serialization: write the fields in a fixed order...
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        out.writeUTF(this.name);
        out.writeUTF(this.department);
        out.writeDouble(this.salary);
    }

    // ...and read them back in exactly the same order.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.department = in.readUTF();
        this.salary = in.readDouble();
    }
}

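To import from the RDBMS, we only need to write a Mapper; no custom Reducer is required. Each DB record is passed to the Mapper, which emits it as a comma-separated string. Because the job sets no Reducer class, Hadoop runs its default identity Reducer, which writes out the mapper output sorted by key.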

So, let's make the Map-Reduce job HDFSDBImport & its Mapper DBImportMap.

HDFSDBImport.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// EmployeeWritable is in the same (default) package, so it needs no import.

public class HDFSDBImport extends Configured implements Tool {

    // Mapper: turns each database record into one comma-separated line.
    private static class DBImportMap extends Mapper<LongWritable, DBWritable, Text, NullWritable> {
        @Override
        public void map(LongWritable key, DBWritable value, Context context)
                throws IOException, InterruptedException {
            EmployeeWritable employee = (EmployeeWritable) value;
            String record = Integer.toString(employee.getId()) + "," + employee.getName() + "," + employee.getDepartment() + "," + Double.toString(employee.getSalary());
            context.write(new Text(record), NullWritable.get());
        }
    }

    // run() and main() are still missing here; they appear in the complete listing.
}

Here is how we configure the MR job and run it. Don't forget to replace <DB HOST>, <DATABASE>, <DB USERNAME>, <DB PASSWORD> and <HDFS OUTPUT PATH> with your own values.

Configuration conf = this.getConf();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://<DB HOST>/<DATABASE>?", "<DB USERNAME>", "<DB PASSWORD>");

Job job = new Job(conf, HDFSDBImport.class.toString());
job.setJarByClass(HDFSDBImport.class);
DBInputFormat.setInput(job, EmployeeWritable.class, "employees", "1", "name, id", "id, name, department, salary");

job.setInputFormatClass(DBInputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("<HDFS OUTPUT PATH>"));
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(DBImportMap.class);
job.waitForCompletion(true);

DBInputFormat connects to the database and selects the rows to import with a query of the form SELECT <fieldnames> FROM <table> WHERE <conditions> ORDER BY <orderBy>, built from the arguments passed to setInput. For more on DBInputFormat and its functionality, please consult http://hadoop.apache.org/docs/stable/api.
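
To see how the setInput arguments map onto that query, here is a rough sketch in plain Java. It is my own approximation for illustration, with made-up names, and is not the actual DBInputFormat code; the real class may also add paging clauses so that each map task reads only its own share of the rows.

```java
public class SelectQuerySketch {

    // Assembles SELECT <fields> FROM <table> [WHERE (<conditions>)] [ORDER BY <orderBy>].
    static String buildQuery(String table, String conditions, String orderBy, String... fieldNames) {
        StringBuilder query = new StringBuilder("SELECT ");
        query.append(String.join(", ", fieldNames));
        query.append(" FROM ").append(table);
        if (conditions != null && !conditions.isEmpty()) {
            query.append(" WHERE (").append(conditions).append(")");
        }
        if (orderBy != null && !orderBy.isEmpty()) {
            query.append(" ORDER BY ").append(orderBy);
        }
        return query.toString();
    }

    public static void main(String[] args) {
        // Same arguments as the setInput call in this post.
        // prints SELECT id, name, department, salary FROM employees WHERE (1) ORDER BY name, id
        System.out.println(buildQuery("employees", "1", "name, id", "id, name, department, salary"));
    }
}
```

The condition "1" is simply an always-true WHERE clause in MySQL, so every row of the table is selected.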

Now, putting this all together in HDFSDBImport.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// EmployeeWritable is in the same (default) package, so it needs no import.

public class HDFSDBImport extends Configured implements Tool {

    // Mapper: turns each database record into one comma-separated line.
    private static class DBImportMap extends Mapper<LongWritable, DBWritable, Text, NullWritable> {
        @Override
        public void map(LongWritable key, DBWritable value, Context context)
                throws IOException, InterruptedException {
            EmployeeWritable employee = (EmployeeWritable) value;
            String record = Integer.toString(employee.getId()) + "," + employee.getName() + "," + employee.getDepartment() + "," + Double.toString(employee.getSalary());
            context.write(new Text(record), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new HDFSDBImport(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://<DB HOST>/<DATABASE>?", "<DB USERNAME>", "<DB PASSWORD>");

        Job job = new Job(conf, HDFSDBImport.class.toString());
        job.setJarByClass(HDFSDBImport.class);
        // Arguments: input class, table, WHERE conditions, ORDER BY, field names.
        DBInputFormat.setInput(job, EmployeeWritable.class, "employees", "1", "name, id", "id, name, department, salary");

        job.setInputFormatClass(DBInputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("<HDFS OUTPUT PATH>"));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(DBImportMap.class);

        // Report a failed job through the exit status instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Create a JAR <jarname> and run it on HADOOP Cluster

# hadoop jar <jarname> HDFSDBImport

In your HDFS output path (in my case it is /user/hduser/output/) you'll see something like this:-


1,Vineet Sethi,HR,25000.0
2,Monika Sethi,Finance,15000.0
3,Aleen Sethi,Admin,25700.5
4,Rahul Sethi,Finance,20000.0

Next, I am going to tell you how to import data to HBase from RDBMS using Map-Reduce. In the meantime, if you have any questions or concerns, you can contact me at developer.vineet@gmail.com or comment here. You can also check out this content on my blog http://www.technologywithvineet.com