hadoop(二)自定义输入输出格式
一、MapReduce输入格式
数据输入格式(InputFormat)用于描述MR作业的数据输入规范。MR框架依赖数据输入格式完成对数据文件进行分块(一个块就是一个mapper任务),以及提供从输入分块中将数据记录逐一读出、并转换为Map过程的输入键/值对等功能。
*输入格式类为:org.apache.hadoop.mapreduce.InputFormat
*块类为:org.apache.hadoop.mapreduce.InputSplit
*块数据读取类为:org.apache.hadoop.mapreduce.RecordReader
Hadoop提供了大量的内置数据输入格式,包括:CombineFileInputFormat(处理多个小文件,将多个文件合成一个split作为输入)、SequenceFileInputFormat(处理SequenceFile格式的文件)、SequenceFileAsTextInputFormat、NLineInputFormat(NLineInputFormat不是Hadoop默认使用的输入格式。不同于默认的TextInputFormat,它按照指定的行数N对输入进行分片,即每个InputSplit包含N行记录。所以在特定情况下,可以有效提升代码的运行效率)、FileInputFormat、TextInputFormat、KeyValueTextInputFormat等。最常用的是TextInputFormat和KeyValueTextInputFormat这两种。
TextInputFormat是MR框架默认的数据读入格式,可以将文本文件分块并逐行读入,以便Map节点处理。key为当前行在整个文本文件中的字节偏移量,value为当前行的内容。
KeyValueTextInputFormat可以将一个按照<key,value>格式逐行保存的文本文件逐行读出,并自动解析为相对应的key和value。默认按照'\t'分割,优先设置key的值;如果该行没有value的值,那么value就设置为空(empty)。
二、MapReduce输出格式
数据输出格式(OutputFormat)用于描述MR作业的数据输出规范。MR框架依赖数据输出格式完成输出规范检查(如检查HDFS文件目录是否存在等),以及提供作业结果数据输出等功能。
*输出格式类为:org.apache.hadoop.mapreduce.OutputFormat
*数据写出类为:org.apache.hadoop.mapreduce.RecordWriter
Hadoop提供了大量的内置数据输出格式,包括:MapFileOutputFormat、SequenceFileOutputFormat、SequenceFileAsBinaryOutputFormat、TextOutputFormat等。最常用的是TextOutputFormat。
TextOutputFormat是MR框架默认的数据输出格式,可以将计算结果以key+"\t"+value的形式逐行输出到文本文件中,当key或者value有一个为NullWritable或者为null的时候,当前为空的值不进行输出,只输出不为空的值。对应的数据输出类为LineRecordWriter。分隔符由参数mapreduce.output.textoutputformat.separator指定。
三、自定义输入输出格式的使用
eg:读取MySql数据库中的日志原始数据表event_logs中的数据,计算url的唯一用户访问量,并将结果保存到MySql数据库stats_uv表中。
event_logs表包含列:uid(用户id),url(当前页面url),time(时间),sid(会话id); stats_uv表包含列:url(页面url),date(日期),uv(uv值)。
计算规则:同一个url在同一个会话中被用户多次访问,只算一次,如果在多个会话中被访问,那么每个会话算作一次。即假设用户1在会话1中访问url1五次,在会话2中访问url1三次,那么url1的uv值被设置为2。其实就是计算url和date固定的情况下,有多少个sid(去重后)。
实现步骤:
1. 编写自定义输入输出格式类
2. 编写mapper类
3. 编写reducer类
4. 编写执行入口runner类
5. 测试
注意:自定义InputSplit必须实现Writable接口
(1)自定义输入格式类
package com.beifeng.customer.inputformat;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
/**
 * Base type for the values produced by the MySQL input format.
 * Concrete value types used by a job must extend this class, which itself implements
 * Hadoop's {@code Writable}.
 */
public abstract class MySqlInputValueWritable implements Writable {
/**
 * Populates this object's fields from the given JDBC result set.
 *
 * @param rs result set to read the column values from
 * @throws SQLException if a column cannot be read
 */
public abstract void readFieldsFromResultSet(ResultSet rs) throws SQLException;
}
package com.beifeng.customer.inputformat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * User-defined {@link InputFormat} that reads rows from a MySQL table.<br/>
 * The key handed to the mapper is a {@link LongWritable} row counter that starts at 0 in every
 * split; the value is an instance of the class configured under
 * {@link #MYSQL_OUTPUT_VALUE_CLASS_KEY}.
 *
 * @author gerry
 *
 * @param <V> value type delivered to the mapper
 */
public class MySqlInputFormat<V extends MySqlInputValueWritable> extends InputFormat<LongWritable, V> {
    public static final String MYSQL_INPUT_DRIVER_KEY = "mysql.input.driver";
    public static final String MYSQL_INPUT_URL_KEY = "mysql.input.url";
    public static final String MYSQL_INPUT_USERNAME_KEY = "mysql.input.username";
    public static final String MYSQL_INPUT_PASSWORD_KEY = "mysql.input.password";
    public static final String MYSQL_SELECT_COUNT_SQL_KEY = "mysql.select.count.sql";
    public static final String MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY = "mysql.input.split.pre.record.count";
    public static final String MYSQL_SELECT_RECORD_SQL_KEY = "mysql.select.record.sql";
    public static final String MYSQL_OUTPUT_VALUE_CLASS_KEY = "mysql.output.value.class";

    /**
     * Builds the list of input splits; the number of splits decides the number of map tasks.
     * The total row count is obtained with the configured count SQL and then cut into ranges of
     * at most {@link #MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY} rows (default 100).
     *
     * @param context job context carrying the configuration
     * @return one {@link MySqlInputSplit} per row range; empty when the table has no rows
     * @throws IOException if the count query fails or the configured split size is not positive
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        long recordsPerSplit = conf.getLong(MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY, 100);
        if (recordsPerSplit <= 0) {
            // A zero split size would otherwise surface as an ArithmeticException.
            throw new IOException(
                    MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY + " must be positive but was " + recordsPerSplit);
        }
        long recordCount = this.countRecords(conf);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        long start = 0;
        while (start < recordCount) {
            // Compare against the remainder so that start + recordsPerSplit cannot overflow.
            long end = (recordCount - start > recordsPerSplit) ? start + recordsPerSplit : recordCount;
            splits.add(new MySqlInputSplit(start, end));
            start = end;
        }
        return splits;
    }

    /**
     * Runs the configured count SQL and returns the value of its first column.
     *
     * @param conf job configuration
     * @return number of rows reported by the query, or 0 when the query returns no row
     * @throws IOException if the connection cannot be opened or the query fails
     */
    private long countRecords(Configuration conf) throws IOException {
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = this.getConnection(conf);
            pstmt = conn.prepareStatement(conf.get(MYSQL_SELECT_COUNT_SQL_KEY));
            rs = pstmt.executeQuery();
            return rs.next() ? rs.getLong(1) : 0L;
        } catch (SQLException | ClassNotFoundException e) {
            throw new IOException("获取数据库的数量产生异常", e);
        } finally {
            this.closeConnection(conn, pstmt, rs);
        }
    }

    /**
     * Creates the record reader that processes one split.
     *
     * @param split   split to read
     * @param context task attempt context
     * @return an initialized {@link MySqlRecordReader}
     */
    @Override
    public RecordReader<LongWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        RecordReader<LongWritable, V> reader = new MySqlRecordReader();
        // initialize() only stores references, so a later second call with the same arguments is harmless.
        reader.initialize(split, context);
        return reader;
    }

    /**
     * Closes the given JDBC resources in result set, statement, connection order.
     * Any argument may be {@code null}.
     *
     * @param conn  connection to close
     * @param state statement to close
     * @param rs    result set to close
     */
    private void closeConnection(Connection conn, Statement state, ResultSet rs) {
        this.close(rs);
        this.close(state);
        this.close(conn);
    }

    /**
     * Closes a resource, ignoring {@code null} and any failure raised while closing.
     *
     * @param obj resource to close, may be {@code null}
     */
    private void close(AutoCloseable obj) {
        if (obj != null) {
            try {
                obj.close();
            } catch (Exception ignored) {
                // Best-effort cleanup: a failure to close must not mask the primary result.
            }
        }
    }

    /**
     * Opens a database connection from the driver, url, username and password found in the
     * configuration.
     *
     * @param conf job configuration
     * @return a new connection
     * @throws SQLException           if the connection cannot be established
     * @throws ClassNotFoundException if the configured driver class cannot be loaded
     */
    private Connection getConnection(Configuration conf) throws SQLException, ClassNotFoundException {
        String driver = conf.get(MYSQL_INPUT_DRIVER_KEY);
        String url = conf.get(MYSQL_INPUT_URL_KEY);
        String username = conf.get(MYSQL_INPUT_USERNAME_KEY);
        String password = conf.get(MYSQL_INPUT_PASSWORD_KEY);
        Class.forName(driver);
        return DriverManager.getConnection(url, username, password);
    }

    /**
     * Record reader that pages through the rows of one {@link MySqlInputSplit}.
     * The connection and query are opened lazily on the first call to {@link #nextKeyValue()}.
     *
     * @author gerry
     */
    public class MySqlRecordReader extends RecordReader<LongWritable, V> {
        private Connection conn;
        private Configuration conf;
        private MySqlInputSplit split;
        private LongWritable key = null;
        private V value = null;
        private PreparedStatement statement = null; // kept so that close() can release it
        private ResultSet result = null;
        private long post = 0; // number of rows already returned from this split

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.split = (MySqlInputSplit) split;
            this.conf = context.getConfiguration();
        }

        /**
         * Instantiates the configured value class.
         *
         * @return a new, empty value object
         */
        @SuppressWarnings("unchecked")
        private V createValue() {
            Class<? extends MySqlInputValueWritable> clazz = this.conf.getClass(MYSQL_OUTPUT_VALUE_CLASS_KEY,
                    NullMySqlInputValue.class, MySqlInputValueWritable.class);
            return (V) ReflectionUtils.newInstance(clazz, this.conf);
        }

        /**
         * Builds the paging query for this split by appending LIMIT/OFFSET to the configured
         * record SQL.
         * NOTE(review): the configured SQL should contain an ORDER BY; without one the database
         * gives no guarantee that different splits see disjoint rows.
         *
         * @return the SQL to execute
         */
        private String getQuerySql() {
            long limit = this.split.getEnd() - this.split.getStart();
            return this.conf.get(MYSQL_SELECT_RECORD_SQL_KEY) + " LIMIT " + limit + " OFFSET "
                    + this.split.getStart();
        }

        /**
         * Advances to the next row of the split.
         *
         * @return {@code true} if a row was read into the current key/value, {@code false} at the end
         * @throws IOException if connecting or reading fails
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (this.key == null) {
                this.key = new LongWritable();
            }
            if (this.value == null) {
                this.value = this.createValue();
            }
            if (this.conn == null) {
                try {
                    this.conn = MySqlInputFormat.this.getConnection(this.conf);
                } catch (ClassNotFoundException | SQLException e) {
                    throw new IOException("获取数据库连接发生异常", e);
                }
            }
            try {
                if (this.result == null) {
                    this.statement = this.conn.prepareStatement(this.getQuerySql());
                    this.result = this.statement.executeQuery();
                }
                if (!this.result.next()) {
                    return false; // split exhausted
                }
                this.value.readFieldsFromResultSet(this.result);
                this.key.set(this.post);
                this.post++;
                return true;
            } catch (SQLException e) {
                throw new IOException("获取数据发生异常", e);
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return this.key;
        }

        @Override
        public V getCurrentValue() throws IOException, InterruptedException {
            return this.value;
        }

        /**
         * Reports the fraction of the split already read.
         *
         * @return a value between 0 and 1; an empty split reports 1
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            long length = this.split.getEnd() - this.split.getStart();
            if (length <= 0) {
                return 1.0f;
            }
            // Floating-point division: long division would yield 0 until the split is finished.
            return Math.min(1.0f, (float) this.post / length);
        }

        /**
         * Releases the result set, the statement and the connection.
         */
        @Override
        public void close() throws IOException {
            MySqlInputFormat.this.closeConnection(this.conn, this.statement, this.result);
        }
    }

    /**
     * Default, empty value object used when no value class is configured.
     * Declared static so that it has the no-argument constructor that reflective instantiation
     * in {@code createValue()} relies on.
     *
     * @author gerry
     */
    public static class NullMySqlInputValue extends MySqlInputValueWritable {
        @Override
        public void write(DataOutput out) throws IOException {
        }

        @Override
        public void readFields(DataInput in) throws IOException {
        }

        @Override
        public void readFieldsFromResultSet(ResultSet rs) throws SQLException {
        }
    }

    /**
     * Input split describing a half-open range of row positions, {@code [start, end)}.
     *
     * @author gerry
     */
    public static class MySqlInputSplit extends InputSplit implements Writable {
        private static final String[] EMPTY_LOCATIONS = new String[0];
        private long start; // first row position, inclusive
        private long end; // last row position, exclusive

        public MySqlInputSplit() {
            super();
        }

        public MySqlInputSplit(long start, long end) {
            super();
            this.start = start;
            this.end = end;
        }

        public long getStart() {
            return start;
        }

        public void setStart(long start) {
            this.start = start;
        }

        public long getEnd() {
            return end;
        }

        public void setEnd(long end) {
            this.end = end;
        }

        /**
         * @return the number of rows covered by this split
         */
        @Override
        public long getLength() throws IOException, InterruptedException {
            return this.end - this.start;
        }

        /**
         * @return an empty array, i.e. no preferred hosts are reported for this split
         */
        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            return EMPTY_LOCATIONS;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }
    }
}
(2)自定义输出格式类
package com.beifeng.customer.outputformat;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
/**
 * Base class for the values written to MySQL by the output format.
 *
 * @author gerry
 *
 */
public abstract class MySqlOutputValueWritable implements Writable {
/**
 * Returns the SQL statement text used to persist this value (an insert or an update,
 * judging by the method name).
 *
 * @return the SQL text
 */
public abstract String fetchInsertOrUpdateSql();
/**
 * Sets this value's output parameters on the given statement.
 *
 * @param pstmt statement to set the parameters on
 * @throws SQLException if a parameter cannot be set
 */
public abstract void setPreparedStatementParameters(PreparedStatement pstmt) throws SQLException;
}
package com.beifeng.customer.outputformat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
/**
 * User-defined {@link OutputFormat} that writes key/value pairs to MySQL.<br/>
 * The key must be {@link NullWritable} and the value a {@link MySqlOutputValueWritable}.
 *
 * @param <V> value type received from the reducer
 */
public class MySqlOutputFormat<V extends MySqlOutputValueWritable> extends OutputFormat<NullWritable, V> {
    public static final String MYSQL_OUTPUT_DRIVER_KEY = "mysql.output.driver";
    public static final String MYSQL_OUTPUT_URL_KEY = "mysql.output.url";
    public static final String MYSQL_OUTPUT_USERNAME_KEY = "mysql.output.username";
    public static final String MYSQL_OUTPUT_PASSWORD_KEY = "mysql.output.password";
    public static final String MYSQL_OUTPUT_BATCH_SIZE_KEY = "mysql.output.batch.size";

    /**
     * Creates the writer that sends this task's output to MySQL.
     *
     * @param context task attempt context carrying the configuration
     * @return a new {@link MySqlRecordWriter}
     */
    @Override
    public RecordWriter<NullWritable, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MySqlRecordWriter(context.getConfiguration());
    }

    /**
     * Verifies the output specification by opening, and immediately closing, a database connection.
     *
     * @param context job context carrying the configuration
     * @throws IOException if the database cannot be reached
     */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        Connection conn = null;
        try {
            conn = this.getConnection(context.getConfiguration());
        } catch (ClassNotFoundException | SQLException e) {
            throw new IOException("无法连接Mysql数据库", e);
        } finally {
            this.closeSqlConnection(conn, null, null);
        }
    }

    /**
     * Returns a {@link FileOutputCommitter} constructed without an output path.
     * NOTE(review): presumably a committer without a path has nothing to commit on the file
     * system, which suits a database sink - confirm against the Hadoop version in use.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(null, context);
    }

    /**
     * Opens a database connection from the driver, url, username and password found in the
     * configuration.
     *
     * @param conf job configuration
     * @return a new connection
     * @throws ClassNotFoundException if the configured driver class cannot be loaded
     * @throws SQLException           if the connection cannot be established
     */
    private Connection getConnection(Configuration conf) throws ClassNotFoundException, SQLException {
        String driver = conf.get(MYSQL_OUTPUT_DRIVER_KEY);
        String url = conf.get(MYSQL_OUTPUT_URL_KEY);
        String username = conf.get(MYSQL_OUTPUT_USERNAME_KEY);
        String password = conf.get(MYSQL_OUTPUT_PASSWORD_KEY);
        Class.forName(driver);
        return DriverManager.getConnection(url, username, password);
    }

    /**
     * Closes the given JDBC resources in result set, statement, connection order.
     * Any argument may be {@code null}.
     *
     * @param conn  connection to close
     * @param state statement to close
     * @param rs    result set to close
     */
    private void closeSqlConnection(Connection conn, Statement state, ResultSet rs) {
        this.closeAutoCloseable(rs);
        this.closeAutoCloseable(state);
        this.closeAutoCloseable(conn);
    }

    /**
     * Closes a resource, ignoring {@code null} and any failure raised while closing.
     *
     * @param obj resource to close, may be {@code null}
     */
    private void closeAutoCloseable(AutoCloseable obj) {
        if (obj != null) {
            try {
                obj.close();
            } catch (Exception ignored) {
                // Best-effort cleanup: a failure to close must not mask the primary result.
            }
        }
    }

    /**
     * Record writer that batches rows per SQL text and commits each batch to MySQL.
     *
     * @author gerry
     */
    public class MySqlRecordWriter extends RecordWriter<NullWritable, V> {
        private Connection conn = null;
        // One prepared statement and one pending-row counter per distinct SQL text.
        private Map<String, PreparedStatement> pstmtCache = new HashMap<String, PreparedStatement>();
        private Map<String, Integer> batchCache = new HashMap<String, Integer>();
        private Configuration conf = null;
        private int batchSize = 100;

        public MySqlRecordWriter() {
        }

        public MySqlRecordWriter(Configuration conf) {
            this.conf = conf;
            this.batchSize = this.conf.getInt(MYSQL_OUTPUT_BATCH_SIZE_KEY, this.batchSize);
        }

        /**
         * Adds one value to the batch of its SQL statement and flushes that batch once it holds
         * {@code batchSize} rows.
         *
         * @param key   ignored
         * @param value value supplying the SQL text and its parameters
         * @throws IOException if connecting, preparing or executing fails
         */
        @Override
        public void write(NullWritable key, V value) throws IOException, InterruptedException {
            this.ensureConnection();
            String sql = value.fetchInsertOrUpdateSql();
            PreparedStatement pstmt = this.statementFor(sql);
            Integer pending = this.batchCache.get(sql);
            int count = (pending == null) ? 0 : pending;
            try {
                value.setPreparedStatementParameters(pstmt);
                pstmt.addBatch();
                count++;
                // ">=" so that a batch holds exactly batchSize rows, not batchSize + 1.
                if (count >= this.batchSize) {
                    pstmt.executeBatch();
                    this.conn.commit();
                    count = 0;
                }
                this.batchCache.put(sql, count);
            } catch (SQLException e) {
                throw new IOException("往mysql写出数据发生异常", e);
            }
        }

        /** Opens the connection on first use and switches off auto-commit. */
        private void ensureConnection() throws IOException {
            if (this.conn != null) {
                return;
            }
            try {
                this.conn = getConnection(this.conf);
                this.conn.setAutoCommit(false);
            } catch (ClassNotFoundException | SQLException e) {
                throw new IOException("获取数据库连接失败", e);
            }
        }

        /** Returns the cached statement for the SQL text, preparing it on first use. */
        private PreparedStatement statementFor(String sql) throws IOException {
            PreparedStatement pstmt = this.pstmtCache.get(sql);
            if (pstmt == null) {
                try {
                    pstmt = this.conn.prepareStatement(sql);
                    this.pstmtCache.put(sql, pstmt);
                } catch (SQLException e) {
                    throw new IOException("创建PreparedStatement对象产生异常", e);
                }
            }
            return pstmt;
        }

        /**
         * Flushes and commits all pending batches, then releases every statement and the
         * connection. On failure the open transaction is rolled back before the error is reported.
         *
         * @param context task attempt context (unused)
         * @throws IOException if a pending batch cannot be written
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.conn == null) {
                return;
            }
            try {
                for (PreparedStatement pstmt : this.pstmtCache.values()) {
                    pstmt.executeBatch();
                    this.conn.commit();
                }
            } catch (SQLException e) {
                try {
                    this.conn.rollback();
                } catch (SQLException ignored) {
                    // The original failure is the one worth reporting.
                }
                throw new IOException("往mysql数据库中写数据发生异常", e);
            } finally {
                // Release JDBC resources whether or not the final flush succeeded.
                for (PreparedStatement pstmt : this.pstmtCache.values()) {
                    closeAutoCloseable(pstmt);
                }
                this.pstmtCache.clear();
                this.batchCache.clear();
                closeAutoCloseable(this.conn);
                this.conn = null;
            }
        }
    }
}
(3)MysqlMapperInputValue类
package com.beifeng.customer.mysql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import com.beifeng.customer.inputformat.MySqlInputValueWritable;
/**
 * Mapper input value holding the uid, sid, url and time columns of one database row.
 */
public class MySqlMapperInputValue extends MySqlInputValueWritable {
    private String uid; // member id
    private String sid; // session id
    private String url; // page url
    private long time; // timestamp in milliseconds

    /** Fills all four fields from the columns named uid, sid, url and time. */
    @Override
    public void readFieldsFromResultSet(ResultSet rs) throws SQLException {
        this.uid = rs.getString("uid");
        this.sid = rs.getString("sid");
        this.url = rs.getString("url");
        this.time = rs.getLong("time");
    }

    /** Serializes the fields in the order uid, sid, url, time. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.uid);
        out.writeUTF(this.sid);
        out.writeUTF(this.url);
        out.writeLong(this.time);
    }

    /** Restores the fields in the order written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.uid = in.readUTF();
        this.sid = in.readUTF();
        this.url = in.readUTF();
        this.time = in.readLong();
    }

    public String getUid() {
        return this.uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getSid() {
        return this.sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public long getTime() {
        return this.time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}
MysqlMapperOutputValue类
package com.beifeng.customer.mysql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import com.beifeng.customer.inputformat.MySqlInputValueWritable;
/**
* 自定义输入value对象
*/
public class MySqlMapperInputValue extends MySqlInputValueWritable {
private String uid; // 会员id
private String sid; // 会话id
private String url; // url
private long time; // 毫秒级时间戳
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.uid);
out.writeUTF(this.sid);
out.writeUTF(this.url);
out.writeLong(this.time);
}
@Override
public void readFields(DataInput in) throws IOException {
this.uid = in.readUTF();
this.sid = in.readUTF();
this.url = in.readUTF();
this.time = in.readLong();
}
@Override
public void readFieldsFromResultSet(ResultSet rs) throws SQLException {
this.uid = rs.getString("uid");
this.sid = rs.getString("sid");
this.url = rs.getString("url");
this.time = rs.getLong("time");
}
}
MysqlMapperOutputKey类
package com.beifeng.customer.mysql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Mapper output key made of a page url and a date string.
 */
public class MySqlMapperOutputKey implements WritableComparable<MySqlMapperOutputKey> {
    private String url;
    private String date; // string in the yyyy-mm-dd format

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDate() {
        return this.date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    /** Serializes url then date. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.url);
        out.writeUTF(this.date);
    }

    /** Restores url then date. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.url = in.readUTF();
        this.date = in.readUTF();
    }

    /** Orders keys by url first and by date second. */
    @Override
    public int compareTo(MySqlMapperOutputKey o) {
        int byUrl = this.url.compareTo(o.url);
        return (byUrl != 0) ? byUrl : this.date.compareTo(o.date);
    }

    /** Combines date and url with the usual 31-based formula, treating null as 0. */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(this.date, this.url);
    }

    /** Two keys are equal when they have the same class, date and url. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        MySqlMapperOutputKey that = (MySqlMapperOutputKey) obj;
        return java.util.Objects.equals(this.date, that.date)
                && java.util.Objects.equals(this.url, that.url);
    }
}
MySqlMapper类
package com.beifeng.customer.mysql;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper of the custom input/output format example.<br/>
 * Emits (url, date) as the key and (uid, sid) as the value for every row that has a url.
 */
public class MySqlMapper
        extends Mapper<LongWritable, MySqlMapperInputValue, MySqlMapperOutputKey, MySqlMapperOutputValue> {
    // Output objects and date helpers are reused across map() calls.
    private MySqlMapperOutputKey outputKey = new MySqlMapperOutputKey();
    private MySqlMapperOutputValue outputValue = new MySqlMapperOutputValue();
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private Calendar calendar = Calendar.getInstance();

    /**
     * Converts one input row into a key/value pair; rows without a url are skipped.
     *
     * @param key     row counter supplied by the input format (unused)
     * @param value   the row read from the database
     * @param context context used to emit the pair
     */
    @Override
    protected void map(LongWritable key, MySqlMapperInputValue value, Context context)
            throws IOException, InterruptedException {
        String pageUrl = value.getUrl();
        if (pageUrl == null) {
            return;
        }
        // The row's time is a millisecond timestamp; format it as a day string.
        this.calendar.setTimeInMillis(value.getTime());
        String day = this.sdf.format(this.calendar.getTime());

        this.outputKey.setUrl(pageUrl);
        this.outputKey.setDate(day);
        this.outputValue.setUid(value.getUid());
        this.outputValue.setSid(value.getSid());
        context.write(this.outputKey, this.outputValue);
    }
}
MySqlReducerOutputValue类
package com.beifeng.customer.mysql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import com.beifeng.customer.outputformat.MySqlOutputValueWritable;
/**
 * Reducer output value: one (url, date, uv) row to be inserted into the stats_uv table.
 *
 * @author gerry
 *
 */
public class MySqlReducerOutputValue extends MySqlOutputValueWritable {
    private String url;
    private String date;
    private int uv;

    /** Returns the insert statement with three placeholders: url, date, uv. */
    @Override
    public String fetchInsertOrUpdateSql() {
        // TODO: this SQL could be changed into an insert-or-update statement
        return "INSERT INTO stats_uv(url,date,uv) VALUES(?,?,?)";
    }

    /** Sets url, date and uv on placeholders 1, 2 and 3. */
    @Override
    public void setPreparedStatementParameters(PreparedStatement pstmt) throws SQLException {
        pstmt.setString(1, this.url);
        pstmt.setString(2, this.date);
        pstmt.setInt(3, this.uv);
    }

    /** Serializes url, date, uv in that order. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.url);
        out.writeUTF(this.date);
        out.writeInt(this.uv);
    }

    /** Restores url, date, uv in the order written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.url = in.readUTF();
        this.date = in.readUTF();
        this.uv = in.readInt();
    }

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDate() {
        return this.date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public int getUv() {
        return this.uv;
    }

    public void setUv(int uv) {
        this.uv = uv;
    }
}
MysqlReducer类
package com.beifeng.customer.mysql;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that computes the uv value for each (url, date) key.
 */
public class MySqlReducer
        extends Reducer<MySqlMapperOutputKey, MySqlMapperOutputValue, NullWritable, MySqlReducerOutputValue> {
    // Reused for every emitted record.
    private MySqlReducerOutputValue outputValue = new MySqlReducerOutputValue();

    /**
     * Counts the distinct session ids among the values of one key and emits the result.
     *
     * @param key     the (url, date) pair
     * @param values  all mapper values for that key
     * @param context context used to emit the result
     */
    @Override
    protected void reduce(MySqlMapperOutputKey key, Iterable<MySqlMapperOutputValue> values, Context context)
            throws IOException, InterruptedException {
        Set<String> sessionIds = new HashSet<String>();
        for (MySqlMapperOutputValue each : values) {
            sessionIds.add(each.getSid());
        }
        this.outputValue.setUrl(key.getUrl());
        this.outputValue.setDate(key.getDate());
        this.outputValue.setUv(sessionIds.size());
        context.write(NullWritable.get(), this.outputValue);
    }
}
MysqlRunner类
package com.beifeng.customer.mysql;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.beifeng.customer.inputformat.MySqlInputFormat;
import com.beifeng.customer.inputformat.MySqlInputValueWritable;
import com.beifeng.customer.outputformat.MySqlOutputFormat;
/**
 * Entry point that configures and submits the uv job: rows are read through
 * {@link MySqlInputFormat} and the results are written through {@link MySqlOutputFormat}.
 * NOTE(review): the JDBC credentials and the HDFS address are hard-coded for the example;
 * a real job should take them from external configuration.
 */
public class MySqlRunner implements Tool {
    // The input and the output side use the same database in this example.
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/test";
    private static final String JDBC_USERNAME = "root";
    private static final String JDBC_PASSWORD = "123456";

    private Configuration conf = new Configuration();

    /**
     * Stores the configuration and sets {@code fs.defaultFS} on it.
     *
     * @param conf configuration to use
     */
    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        this.conf.set("fs.defaultFS", "hdfs://192.168.211.137:8020");
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Builds the job, runs it and waits for completion.
     *
     * @param args command line arguments (unused)
     * @return 0 when the job succeeds, -1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "test-format");
        job.setJarByClass(MySqlRunner.class);
        // Changes made through the job's own configuration object are seen by the job.
        Configuration conf = job.getConfiguration();

        // input format
        conf.set(MySqlInputFormat.MYSQL_INPUT_DRIVER_KEY, JDBC_DRIVER);
        conf.set(MySqlInputFormat.MYSQL_INPUT_URL_KEY, JDBC_URL);
        conf.set(MySqlInputFormat.MYSQL_INPUT_USERNAME_KEY, JDBC_USERNAME);
        conf.set(MySqlInputFormat.MYSQL_INPUT_PASSWORD_KEY, JDBC_PASSWORD);
        conf.set(MySqlInputFormat.MYSQL_SELECT_COUNT_SQL_KEY, "select count(1) from event_logs");
        conf.set(MySqlInputFormat.MYSQL_SELECT_RECORD_SQL_KEY, "select uid,sid,url,time from event_logs");
        conf.setLong(MySqlInputFormat.MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY, 4L);
        conf.setClass(MySqlInputFormat.MYSQL_OUTPUT_VALUE_CLASS_KEY, MySqlMapperInputValue.class,
                MySqlInputValueWritable.class);
        job.setInputFormatClass(MySqlInputFormat.class);

        // mapper
        job.setMapperClass(MySqlMapper.class);
        job.setMapOutputKeyClass(MySqlMapperOutputKey.class);
        job.setMapOutputValueClass(MySqlMapperOutputValue.class);

        // reducer
        job.setReducerClass(MySqlReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MySqlReducerOutputValue.class);

        // output format
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_DRIVER_KEY, JDBC_DRIVER);
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_URL_KEY, JDBC_URL);
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_USERNAME_KEY, JDBC_USERNAME);
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_PASSWORD_KEY, JDBC_PASSWORD);
        conf.setInt(MySqlOutputFormat.MYSQL_OUTPUT_BATCH_SIZE_KEY, 10);
        job.setOutputFormatClass(MySqlOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    /**
     * Runs the tool and exits with its return code, so that a failed job is visible to the
     * calling shell or scheduler.
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MySqlRunner(), args));
    }
}
运行结果截图: