Saturday, August 24, 2013

Import/Export data from/to an RDBMS (like MySQL, Postgres) to/from HBase using a Map-Reduce Job

This post assumes that you have Hadoop and HBase successfully installed, up and running. We are going to import data from an RDBMS (MySQL, Postgres, Oracle, or any RDBMS with a Java JDBC driver) into HBase. I am going to use MySQL as the RDBMS, and we will take advantage of Map-Reduce and write a small job around it. The goal is to give you the general idea of how to import/export data from/to an RDBMS; you can take this program, customize it to your needs, and run it to get the desired results.

To learn how to import RDBMS data into HDFS, check this out: http://hadup.blogspot.in/2013/08/import-data-from-rdbmslike-mysql.html


So, here we go...


We have a MySQL table like this

CREATE TABLE IF NOT EXISTS `employees` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `department` varchar(255) NOT NULL,
  `salary` float NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=5 ;

Insert some rows into it

INSERT INTO `employees` (`id`, `name`, `department`, `salary`) VALUES
(1, 'Vineet Sethi', 'HR', 25000),
(2, 'Monika Sethi', 'Finance', 15000),
(3, 'Aleen Sethi', 'Admin', 25700.5),
(4, 'Rahul Sethi', 'Finance', 20000);

Also create a table named test_out in HBase with a column family cf, into which we will import the data. You can do this from the HBase shell or programmatically, as sketched below.
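
The quickest way is the HBase shell one-liner create 'test_out', 'cf'. If you prefer to create the table programmatically, here is a minimal sketch using the HBaseAdmin API of that era (the class name CreateTestOut is just mine for illustration):

CreateTestOut.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTestOut {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Table "test_out" with the single column family "cf" used later by the reducer.
        HTableDescriptor desc = new HTableDescriptor("test_out");
        desc.addFamily(new HColumnDescriptor("cf"));
        if (!admin.tableExists("test_out")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}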

Download the MySQL JDBC connector from http://dev.mysql.com/downloads/connector/j/, untar it, and move mysql-connector-java-5.1.26-bin.jar into your HADOOP_PATH/lib so that it is available to the MR job.

We are going to use the org.apache.hadoop.mapreduce.lib.db.DBInputFormat class as the InputFormat for the MR job, but before that let's write an Employee class that implements Hadoop's Writable and DBWritable interfaces.

EmployeeWritable.java


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class EmployeeWritable implements Writable, DBWritable {

    private int id;
    private String name;
    private String department;
    private double salary;

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getDepartment() {
        return department;
    }

    public double getSalary() {
        return salary;
    }

    // Called by DBInputFormat: populate the fields from one row of the SQL result set.
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.department = rs.getString(3);
        this.salary = rs.getDouble(4);
    }

    // Called by DBOutputFormat when exporting: bind the fields to the INSERT statement.
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, this.id);
        statement.setString(2, this.name);
        statement.setString(3, this.department);
        statement.setDouble(4, this.salary);
    }

    // Hadoop serialization between map and reduce.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        out.writeUTF(this.name);
        out.writeUTF(this.department);
        out.writeDouble(this.salary);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.department = in.readUTF();
        this.salary = in.readDouble();
    }
}

For importing from the RDBMS, we just need to write a Mapper and a Reducer. Each record from MySQL is passed to the Mapper, and the Reducer then puts that record into HBase. We are going to use two major classes, DBInputFormat and TableMapReduceUtil, to wire up our Mapper and Reducer respectively.

So, let's make the Map-Reduce job HBaseDBImport with Mapper HBaseImportMap and Reducer HBaseImportReduce.

Here is our MR Job HBaseDBImport


import java.io.IOException;

import my.test.inputformat.EmployeeWritable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseDBImport extends Configured implements Tool {

    // DBInputFormat hands each MySQL row to the mapper as an EmployeeWritable;
    // we simply forward it to the reducer.
    private static class HBaseImportMap
            extends Mapper<LongWritable, EmployeeWritable, LongWritable, EmployeeWritable> {
        @Override
        public void map(LongWritable key, EmployeeWritable employee, Context context)
                throws IOException, InterruptedException {
            context.write(key, employee);
        }
    }

    // The reducer turns each employee into an HBase Put against column family "cf".
    private static class HBaseImportReduce
            extends TableReducer<LongWritable, EmployeeWritable, ImmutableBytesWritable> {
        public final byte[] CF = Bytes.toBytes("cf");

        @Override
        public void reduce(LongWritable key, Iterable<EmployeeWritable> employees, Context context)
                throws IOException, InterruptedException {
            for (EmployeeWritable employee : employees) {
                Put put = new Put(Bytes.toBytes("employee" + employee.getId()));
                put.add(CF, Bytes.toBytes("name"), Bytes.toBytes(employee.getName()));
                put.add(CF, Bytes.toBytes("department"), Bytes.toBytes(employee.getDepartment()));
                put.add(CF, Bytes.toBytes("salary"), Bytes.toBytes(Double.toString(employee.getSalary())));
                context.write(null, put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(this.getConf());

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://<DB HOST>/<DATABASE>", "<DB USERNAME>", "<DB PASSWORD>");

        Job job = new Job(conf, HBaseDBImport.class.getSimpleName());
        job.setJarByClass(HBaseDBImport.class);

        // Arguments: input class, table name, WHERE conditions ("1" = no filter),
        // ORDER BY clause, and the columns to select.
        DBInputFormat.setInput(job, EmployeeWritable.class, "employees",
                "1", "name, id", "id, name, department, salary");

        job.setInputFormatClass(DBInputFormat.class);
        job.setMapperClass(HBaseImportMap.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(EmployeeWritable.class);

        // Wires up TableOutputFormat so the reducer's Puts land in the HBase table.
        TableMapReduceUtil.initTableReducerJob("<HBASE OUTPUT TABLENAME>", HBaseImportReduce.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new HBaseDBImport(), args);
        System.exit(status);
    }
}

To run this example, don't forget to replace the values of <DB HOST>, <DATABASE>, <DB USERNAME>, <DB PASSWORD> and <HBASE OUTPUT TABLENAME> with yours. Make sure that the HBase table <HBASE OUTPUT TABLENAME> exists in HBase and has at least one column family "cf". If you face any trouble getting the HBase or MySQL JDBC jars onto the classpath, you can look into another post of mine: http://hadup.blogspot.com/2013/08/how-to-deal-with-classnotfound.html
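
Once the job finishes, you can spot-check the imported rows with scan '<HBASE OUTPUT TABLENAME>' from the HBase shell, or with a tiny client program like the sketch below (the class name CheckImport is just mine for illustration):

CheckImport.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckImport {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "<HBASE OUTPUT TABLENAME>");
        // The reducer wrote row keys as "employee" + id, e.g. employee1.
        Result row = table.get(new Get(Bytes.toBytes("employee1")));
        System.out.println("name: "
                + Bytes.toString(row.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))));
        System.out.println("salary: "
                + Bytes.toString(row.getValue(Bytes.toBytes("cf"), Bytes.toBytes("salary"))));
        table.close();
    }
}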

Next, I am going to show you how to import and export data using Sqoop. In the meantime, if you have any questions/concerns, you can contact me at developer.vineet@gmail.com or comment here. You can also check out this content on my blog http://www.technologywithvineet.com
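
Until the Sqoop post is up: if you want to try the export direction (HBase back to MySQL) by hand, here is a rough, untested sketch built from the same pieces, using TableMapper for the HBase input and DBOutputFormat for the MySQL output. HBaseDBExport and HBaseExportMap are hypothetical names of mine, and the sketch assumes you add a four-argument constructor to EmployeeWritable (keep the no-argument one too, since DBInputFormat needs it) and that the target employees table has no conflicting ids.

HBaseDBExport.java

import java.io.IOException;

import my.test.inputformat.EmployeeWritable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class HBaseDBExport {

    // Rebuilds an EmployeeWritable from each HBase row; DBOutputFormat then calls
    // EmployeeWritable.write(PreparedStatement) to bind it to the INSERT statement.
    static class HBaseExportMap extends TableMapper<EmployeeWritable, NullWritable> {
        private final byte[] CF = Bytes.toBytes("cf");

        @Override
        protected void map(ImmutableBytesWritable key, Result columns, Context context)
                throws IOException, InterruptedException {
            // Row keys were written as "employee" + id by the import job.
            int id = Integer.parseInt(Bytes.toString(columns.getRow()).replace("employee", ""));
            String name = Bytes.toString(columns.getValue(CF, Bytes.toBytes("name")));
            String department = Bytes.toString(columns.getValue(CF, Bytes.toBytes("department")));
            double salary = Double.parseDouble(Bytes.toString(columns.getValue(CF, Bytes.toBytes("salary"))));
            context.write(new EmployeeWritable(id, name, department, salary), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://<DB HOST>/<DATABASE>", "<DB USERNAME>", "<DB PASSWORD>");

        Job job = new Job(conf, "hbase-to-mysql-export");
        job.setJarByClass(HBaseDBExport.class);

        // Scan the whole HBase table as the job input.
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("<HBASE OUTPUT TABLENAME>", scan,
                HBaseExportMap.class, EmployeeWritable.class, NullWritable.class, job);

        // Map-only job: every EmployeeWritable key is inserted into the employees table.
        DBOutputFormat.setOutput(job, "employees", "id", "name", "department", "salary");
        job.setOutputFormatClass(DBOutputFormat.class);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}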
