MapReduce直接连接MySQL获取数据
程序员文章站
2022-05-30 20:14:06
...
MySQL中数据:
mysql> select * from lxw_tbls; +---------------------+----------------+ | TBL_NAME | TBL_TYPE | +---------------------+----------------+ | lxw_test_table | EXTERNAL_TABLE | | lxw_t | MANAGED_TABLE | | lxw_t1 | MANAGED_TABLE | | tt | MANAGED_TABLE | | tab_partition | MANAGED_TABLE | | lxw_hbase_table_1 | MANAGED_TABLE | | lxw_hbase_user_info | MANAGED_TABLE | | t | EXTERNAL_TABLE | | lxw_jobid | MANAGED_TABLE | +---------------------+----------------+ 9 rows in set (0.01 sec) mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME; +---------------------+----------------+ | TBL_NAME | TBL_TYPE | +---------------------+----------------+ | lxw_hbase_table_1 | MANAGED_TABLE | | lxw_hbase_user_info | MANAGED_TABLE | | lxw_jobid | MANAGED_TABLE | | lxw_t | MANAGED_TABLE | | lxw_t1 | MANAGED_TABLE | | lxw_test_table | EXTERNAL_TABLE | +---------------------+----------------+ 6 rows in set (0.00 sec)
MapReduce程序代码,ConnMysql.java:
package com.lxw.study; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ConnMysql { private static Configuration conf = new Configuration(); static { conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml")); conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml")); conf.addResource(new Path("F:/lxw-hadoop/core-site.xml")); conf.set("mapred.job.tracker", "10.133.103.21:50021"); } public static class TblsRecord implements Writable, DBWritable { String tbl_name; String tbl_type; public TblsRecord() { } @Override public void write(PreparedStatement statement) throws SQLException { // TODO Auto-generated method stub statement.setString(1, this.tbl_name); statement.setString(2, this.tbl_type); } @Override public void readFields(ResultSet resultSet) throws SQLException { // TODO Auto-generated method stub this.tbl_name = resultSet.getString(1); this.tbl_type = resultSet.getString(2); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub Text.writeString(out, this.tbl_name); Text.writeString(out, this.tbl_type); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated 
method stub this.tbl_name = Text.readString(in); this.tbl_type = Text.readString(in); } public String toString() { return new String(this.tbl_name + " " + this.tbl_type); } } public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text> { public void map(LongWritable key,TblsRecord values,Context context) throws IOException,InterruptedException { context.write(new Text(values.tbl_name), new Text(values.tbl_type)); } } public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> { public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException { for(Iterator<Text> itr = values.iterator();itr.hasNext();) { context.write(key, itr.next()); } } } public static void main(String[] args) throws Exception { Path output = new Path("/user/lxw/output/"); FileSystem fs = FileSystem.get(URI.create(output.toString()), conf); if (fs.exists(output)) { fs.delete(output); } //mysql的jdbc驱动 DistributedCache.addFileToClassPath(new Path( "hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.133.103.22:3306/hive", "hive", "hive"); Job job = new Job(conf,"test mysql connection"); job.setJarByClass(ConnMysql.class); job.setMapperClass(ConnMysqlMapper.class); job.setReducerClass(ConnMysqlReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(DBInputFormat.class); FileOutputFormat.setOutputPath(job, output); //列名 String[] fields = { "TBL_NAME", "TBL_TYPE" }; //六个参数分别为: //1.Job;2.Class<? extends DBWritable> //3.表名;4.where条件 //5.order by语句;6.列名 DBInputFormat.setInput(job, TblsRecord.class, "lxw_tbls", "TBL_NAME like 'lxw%'", "TBL_NAME", fields); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
运行结果:
[lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000 lxw_hbase_table_1 MANAGED_TABLE lxw_hbase_user_info MANAGED_TABLE lxw_jobid MANAGED_TABLE lxw_t MANAGED_TABLE lxw_t1 MANAGED_TABLE lxw_test_table EXTERNAL_TABLE
上一篇: 2014年18大热门IT认证
推荐阅读
-
Python连接mysql数据库及python使用mysqldb连接数据库教程
-
使用EF CodeFirst连接MySql数据库
-
Linux下远程连接MySQL数据库的方法
-
IIS下PHP连接数据库提示mysql undefined function mysql_connect()
-
PHP 获取MySQL数据库里所有表的实现代码
-
JSP使用JDBC连接MYSQL数据库的方法
-
在ASP中连接MySQL数据库,最好的通过ODBC方法
-
.net(C#数据库访问) Mysql,Sql server,Sqlite,Access四种数据库的连接方式
-
使用数据库客户端工具Oracle SQL Developer加载第三方驱动连接mysql的方法
-
jsp连接MySQL操作GIS地图数据实现添加point的功能代码