This post assumes that you have successfully installed Hadoop and that it is up and running. We are going to import data from an RDBMS (MySQL, Postgres, Oracle, or any RDBMS for which a JDBC driver is available) into HDFS, so that each record in the RDBMS table is converted into one comma-separated line. I am going to use MySQL as the RDBMS, and we will take advantage of Map-Reduce by writing a small job around it. I will give you the general idea of how to get things done while importing/exporting; you can customize this program and run it to achieve the desired results.
So, here we go...
We have a MySQL table like this:
CREATE TABLE IF NOT EXISTS `employees` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) NOT NULL,
    `department` varchar(255) NOT NULL,
    `salary` float NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 AUTO_INCREMENT=5 ;
Insert some rows into it:
INSERT INTO `employees` (`id`, `name`, `department`, `salary`) VALUES
    (1, 'Vineet Sethi', 'HR', 25000),
    (2, 'Monika Sethi', 'Finance', 15000),
    (3, 'Aleen Sethi', 'Admin', 25700.5),
    (4, 'Rahul Sethi', 'Finance', 20000);
Download the MySQL JDBC driver (Connector/J) from http://dev.mysql.com/downloads/connector/j/, untar it and copy mysql-connector-java-5.1.26-bin.jar into your HADOOP_PATH/lib, so that it is available to the MR job.
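Before submitting the job, it can help to confirm that the driver JAR is actually visible on the classpath. The following is only a minimal sketch: the class name DriverCheck and the method isOnClasspath are my own, hypothetical names, and the check simply tries to load the driver class by name.

```java
public class DriverCheck {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Driver class name of Connector/J 5.1.x, the same one the job passes to configureDB.
        String driver = args.length > 0 ? args[0] : "com.mysql.jdbc.Driver";
        String verdict = isOnClasspath(driver) ? " found" : " NOT found on the classpath";
        System.out.println(driver + verdict);
    }
}
```

Run it with the same classpath the job will use; if it reports that the driver is not found, the JAR is not where Hadoop looks for it.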
We are going to use the org.apache.hadoop.mapreduce.lib.db.DBInputFormat class as the InputFormat of the MR job, but before that let's make an EmployeeWritable class that Hadoop Map-Reduce can serialize (Writable) and fill from a database row (DBWritable).
EmployeeWritable.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class EmployeeWritable implements Writable, DBWritable {
    private int id;
    private String name;
    private String department;
    private double salary;

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getDepartment() {
        return department;
    }

    public double getSalary() {
        return salary;
    }

    // DBWritable: fill the object from one row of the result set.
    // The column indexes follow the field order passed to DBInputFormat.setInput.
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.department = rs.getString(3);
        this.salary = rs.getDouble(4);
    }

    // DBWritable: bind the fields to a prepared statement (used when exporting).
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, this.id);
        statement.setString(2, this.name);
        statement.setString(3, this.department);
        statement.setDouble(4, this.salary); // salary is the 4th parameter, not the 3rd
    }

    // Writable: serialize the fields; the order must match readFields(DataInput).
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        out.writeUTF(this.name);
        out.writeUTF(this.department);
        out.writeDouble(this.salary);
    }

    // Writable: deserialize the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.department = in.readUTF();
        this.salary = in.readDouble();
    }
}
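The Writable half of this class only works if the fields are read back in exactly the order they were written. The sketch below illustrates that round trip with plain java.io streams, so it runs without Hadoop on the classpath; WritableRoundTrip and its two methods are hypothetical names of mine, not part of the job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableRoundTrip {

    /** Writes the four employee fields in the same order as write(DataOutput). */
    static byte[] serialize(int id, String name, String department, double salary) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeUTF(department);
            out.writeDouble(salary);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order and joins them with '|' for display. */
    static String deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int id = in.readInt();
            String name = in.readUTF();
            String department = in.readUTF();
            double salary = in.readDouble();
            return id + "|" + name + "|" + department + "|" + salary;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(3, "Aleen Sethi", "Admin", 25700.5);
        System.out.println(deserialize(data)); // prints 3|Aleen Sethi|Admin|25700.5
    }
}
```

If the read order were different from the write order (say, the double before the strings), the bytes would be misinterpreted, which is why the two methods must be kept in sync.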
To import from an RDBMS we only need to write a Mapper. We do not write a Reducer of our own, so Hadoop falls back to its default identity Reducer. Each DB record is passed to the Mapper, which emits it as a comma-separated string.
So, let's make the Map-Reduce job HDFSDBImport and its Mapper DBImportMap.
HDFSDBImport.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// EmployeeWritable is in the same (default) package, so it needs no import.

public class HDFSDBImport extends Configured implements Tool {

    // Receives one DB record per call and emits it as a comma-separated line.
    private static class DBImportMap extends Mapper<LongWritable, DBWritable, Text, NullWritable> {
        @Override
        public void map(LongWritable key, DBWritable value, Context context)
                throws IOException, InterruptedException {
            EmployeeWritable employee = (EmployeeWritable) value;
            String record = Integer.toString(employee.getId()) + "," + employee.getName() + ","
                    + employee.getDepartment() + "," + Double.toString(employee.getSalary());
            context.write(new Text(record), NullWritable.get());
        }
    }

    // main() and run() are added in the complete listing further down.
}
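The Mapper above joins the fields with plain commas, which is fine for the sample rows but becomes ambiguous if a name or department ever contains a comma itself. As a hedged sketch of one way to handle that, the helper below quotes such fields in the usual CSV style; CsvRecord, escape and join are hypothetical names, and the second sample row (id 5) is invented purely for illustration.

```java
public class CsvRecord {

    /** Quotes a field if it contains a comma, a quote or a line break; doubles embedded quotes. */
    static String escape(String field) {
        boolean needsQuoting = field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        if (!needsQuoting) {
            return field;
        }
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    /** Builds one output line in the same column order as the Mapper. */
    static String join(int id, String name, String department, double salary) {
        return id + "," + escape(name) + "," + escape(department) + "," + salary;
    }

    public static void main(String[] args) {
        System.out.println(join(1, "Vineet Sethi", "HR", 25000));   // 1,Vineet Sethi,HR,25000.0
        System.out.println(join(5, "Sethi, Vineet", "R&D", 100.5)); // 5,"Sethi, Vineet",R&D,100.5
    }
}
```

Inside the Mapper, the string concatenation could call such a helper instead; whether that is worth it depends on what the downstream consumer of the file expects.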
Here is how we configure and run the MR job. Don't forget to replace <DB HOST>, <DATABASE>, <DB USERNAME>, <DB PASSWORD> and <HDFS OUTPUT PATH> with your own values.
Configuration conf = this.getConf();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://<DB HOST>/<DATABASE>?", "<DB USERNAME>", "<DB PASSWORD>");
Job job = new Job(conf, HDFSDBImport.class.toString());
job.setJarByClass(HDFSDBImport.class);
DBInputFormat.setInput(job, EmployeeWritable.class, "employees", "1", "name, id", "id, name, department, salary");
job.setInputFormatClass(DBInputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("<HDFS OUTPUT PATH>"));
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(DBImportMap.class);
job.waitForCompletion(true);
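DBConfiguration.configureDB stores the JDBC connection settings in the job configuration, and DBInputFormat.setInput tells the input format which rows to import: it builds a query of the form SELECT <fieldnames> FROM <table> WHERE <conditions> ORDER BY <orderBy>. Here the condition "1" is always true in MySQL, so every row is selected. For more on DBInputFormat and its functionality, please consult the API documentation at http://hadoop.apache.org/docs/stable/api.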
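To make the shape of that query concrete, here is a rough sketch that assembles it from the same arguments the job passes to setInput. This is my approximation, not Hadoop's actual code: SelectQuerySketch and build are hypothetical names, the LIMIT/OFFSET pair stands for the slice of rows that one map task reads, and the real implementation may differ in details such as table aliasing.

```java
public class SelectQuerySketch {

    /**
     * Approximates the query for one input split.
     * length and start are the split's row count and row offset.
     */
    static String build(String table, String conditions, String orderBy,
                        long length, long start, String... fieldNames) {
        StringBuilder query = new StringBuilder("SELECT ");
        query.append(String.join(", ", fieldNames));
        query.append(" FROM ").append(table);
        if (conditions != null && !conditions.isEmpty()) {
            query.append(" WHERE (").append(conditions).append(")");
        }
        if (orderBy != null && !orderBy.isEmpty()) {
            query.append(" ORDER BY ").append(orderBy);
        }
        query.append(" LIMIT ").append(length).append(" OFFSET ").append(start);
        return query.toString();
    }

    public static void main(String[] args) {
        // Same arguments as in the job, with a single split covering all 4 rows.
        System.out.println(build("employees", "1", "name, id", 4, 0,
                "id, name, department, salary"));
    }
}
```

The ORDER BY matters because each map task reads its own LIMIT/OFFSET window; without a stable ordering, rows could be skipped or read twice across tasks.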
Now, putting this all together in HDFSDBImport.java:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// EmployeeWritable is in the same (default) package, so it needs no import.

public class HDFSDBImport extends Configured implements Tool {

    // Receives one DB record per call and emits it as a comma-separated line.
    private static class DBImportMap extends Mapper<LongWritable, DBWritable, Text, NullWritable> {
        @Override
        public void map(LongWritable key, DBWritable value, Context context)
                throws IOException, InterruptedException {
            EmployeeWritable employee = (EmployeeWritable) value;
            String record = Integer.toString(employee.getId()) + "," + employee.getName() + ","
                    + employee.getDepartment() + "," + Double.toString(employee.getSalary());
            context.write(new Text(record), NullWritable.get());
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new HDFSDBImport(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // Store the JDBC driver, URL and credentials in the job configuration.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://<DB HOST>/<DATABASE>?", "<DB USERNAME>", "<DB PASSWORD>");
        Job job = new Job(conf, HDFSDBImport.class.toString());
        job.setJarByClass(HDFSDBImport.class);
        // Arguments: table, WHERE conditions, ORDER BY, field names.
        DBInputFormat.setInput(job, EmployeeWritable.class, "employees", "1", "name, id", "id, name, department, salary");
        job.setInputFormatClass(DBInputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("<HDFS OUTPUT PATH>"));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(DBImportMap.class);
        // Report failure to the caller instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Create a JAR <jarname> containing both classes and run it on the Hadoop cluster:
# hadoop jar <jarname> HDFSDBImport
In your HDFS output path (in my case it is /user/hduser/output/) you'll see something like this:
1,Vineet Sethi,HR,25000.0
2,Monika Sethi,Finance,15000.0
3,Aleen Sethi,Admin,25700.5
4,Rahul Sethi,Finance,20000.0
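If you later want to read these lines back in a downstream program, each one can be split into its four fields again. The following is a minimal sketch under the assumption that no field contains a comma (true for the sample rows); EmployeeLine and parse are hypothetical names, not part of the import job.

```java
public class EmployeeLine {
    final int id;
    final String name;
    final String department;
    final double salary;

    EmployeeLine(int id, String name, String department, double salary) {
        this.id = id;
        this.name = name;
        this.department = department;
        this.salary = salary;
    }

    /** Parses one "id,name,department,salary" line; rejects lines with a different field count. */
    static EmployeeLine parse(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + parts.length + ": " + line);
        }
        return new EmployeeLine(Integer.parseInt(parts[0]), parts[1], parts[2],
                Double.parseDouble(parts[3]));
    }

    public static void main(String[] args) {
        EmployeeLine e = parse("3,Aleen Sethi,Admin,25700.5");
        System.out.println(e.name + " earns " + e.salary); // Aleen Sethi earns 25700.5
    }
}
```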
Next, I am going to tell you how to import data into HBase from an RDBMS using Map-Reduce. In the meantime, if you have any questions or concerns, you can contact me at developer.vineet@gmail.com or comment here. You can also check out this content on my blog http://www.technologywithvineet.com