
Hadoop (2): Custom Input and Output Formats

程序员文章站 2022-07-14 16:36:43

I. MapReduce Input Formats

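    The data input format (InputFormat) describes the input specification of an MR job. The MR framework relies on it to divide the input data into splits (each split is handled by one mapper task) and to read the records of each split one by one, converting them into the key/value pairs that are passed to the map phase.
    * Input format class: org.apache.hadoop.mapreduce.InputFormat
    * Split class: org.apache.hadoop.mapreduce.InputSplit
    * Split record reader class: org.apache.hadoop.mapreduce.RecordReader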

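    Hadoop ships with many built-in input formats, including CombineFileInputFormat (for many small files; packs several files into one split), SequenceFileInputFormat (for files in SequenceFile format), SequenceFileAsTextInputFormat, NLineInputFormat (not the default; unlike the default TextInputFormat, it builds each InputSplit from a configured number of input lines N, so in particular cases it can improve efficiency by controlling how many records each mapper receives), FileInputFormat, TextInputFormat, and KeyValueTextInputFormat. The most commonly used are TextInputFormat and KeyValueTextInputFormat.
    TextInputFormat is the MR framework's default input format. It splits text files and reads them line by line so that the map tasks can process them. The key is the byte offset of the current line within the whole text file, and the value is the content of that line.
    KeyValueTextInputFormat reads a text file stored line by line in <key,value> form and parses each line into the corresponding key and value. Lines are split on '\t' by default. The key is filled first: if a line contains no separator, the whole line becomes the key and the value is set to empty.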
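
    The key/value splitting rule described above can be illustrated without Hadoop. The following is a minimal plain-Java sketch, not Hadoop's own reader; the class name KeyValueLineSketch and the method parse are hypothetical names chosen for this illustration. It assumes the line is split at the first occurrence of the separator.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Plain-Java illustration only; this is NOT Hadoop's record reader.
class KeyValueLineSketch {
    // Splits a line at the FIRST separator.
    // With no separator, the whole line is the key and the value is empty.
    static Map.Entry<String, String> parse(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new SimpleEntry<>(line, "");
        }
        return new SimpleEntry<>(line.substring(0, pos), line.substring(pos + 1));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> kv = parse("url1\t5", '\t');
        System.out.println("key=" + kv.getKey() + ", value=" + kv.getValue());
        Map.Entry<String, String> keyOnly = parse("url1", '\t');
        System.out.println("key=" + keyOnly.getKey() + ", value is empty: " + keyOnly.getValue().isEmpty());
    }
}
```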

 

II. MapReduce Output Formats

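    The data output format (OutputFormat) describes the output specification of an MR job. The MR framework relies on it to validate the output specification (for example, checking whether the HDFS output directory exists) and to write out the job's result data.
    * Output format class: org.apache.hadoop.mapreduce.OutputFormat
    * Record writer class: org.apache.hadoop.mapreduce.RecordWriter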

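    Hadoop ships with many built-in output formats, including MapFileOutputFormat, SequenceFileOutputFormat, SequenceFileAsBinaryOutputFormat, and TextOutputFormat. The most commonly used is TextOutputFormat.
    TextOutputFormat is the MR framework's default output format. It writes each result to a text file as one line of the form key+"\t"+value. When either the key or the value is NullWritable or null, the empty side is skipped and only the non-empty side is written. The corresponding record writer class is LineRecordWriter. The separator is specified by the parameter mapreduce.output.textoutputformat.separator.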
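
    The line-building rule above can be sketched in plain Java. This is an illustration under stated assumptions, not Hadoop's LineRecordWriter: the class TextLineSketch and its format method are hypothetical, and a Java null stands in for both null and NullWritable.

```java
// Plain-Java illustration only; null stands in for both null and NullWritable.
class TextLineSketch {
    // Returns the line that would be written, or null when both sides are absent
    // (in that case nothing is written at all).
    static String format(Object key, Object value, String separator) {
        if (key == null && value == null) {
            return null;
        }
        StringBuilder line = new StringBuilder();
        if (key != null) {
            line.append(key);
        }
        if (key != null && value != null) {
            line.append(separator); // the separator appears only between two present sides
        }
        if (value != null) {
            line.append(value);
        }
        return line.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("url1", 2, "\t"));   // key, tab, value
        System.out.print(format(null, "only", "\t")); // value only, no separator
    }
}
```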

 

III. Using Custom Input and Output Formats

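    Example: read the raw log table event_logs from a MySQL database, compute the unique visitor count (uv) for each url, and save the result to the stats_uv table in MySQL.
    The event_logs table has the columns uid (user id), url (current page url), time (timestamp), and sid (session id). The stats_uv table has the columns url (page url), date (date), and uv (uv value).
    Calculation rule: multiple visits to the same url within one session count once; visits in several sessions count once per session. For example, if user 1 visits url1 five times in session 1 and three times in session 2, the uv of url1 is 2. In other words, for a fixed url and date, the uv is the number of distinct sid values.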
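
    The calculation rule can be checked with a small in-memory sketch before involving MapReduce. The class UvRuleSketch and the row layout {url, date, sid} are assumptions made for this illustration only; the job itself performs the same grouping through the shuffle and a reducer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// In-memory illustration of the uv rule: distinct sid values per (url, date).
class UvRuleSketch {
    // Each row is {url, date, sid}; the result maps "url|date" to its uv value.
    static Map<String, Integer> countUv(List<String[]> rows) {
        Map<String, Set<String>> sessions = new TreeMap<>();
        for (String[] row : rows) {
            String groupKey = row[0] + "|" + row[1];
            sessions.computeIfAbsent(groupKey, k -> new HashSet<>()).add(row[2]);
        }
        Map<String, Integer> uv = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : sessions.entrySet()) {
            uv.put(e.getKey(), e.getValue().size());
        }
        return uv;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        for (int i = 0; i < 5; i++) rows.add(new String[] {"url1", "2022-07-14", "s1"});
        for (int i = 0; i < 3; i++) rows.add(new String[] {"url1", "2022-07-14", "s2"});
        System.out.println(countUv(rows)); // {url1|2022-07-14=2}
    }
}
```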

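   Implementation steps:

    1. Write the custom input and output format classes
    2. Write the mapper class
    3. Write the reducer class
    4. Write the runner class that serves as the entry point
    5. Test
     Note: a custom InputSplit must implement the Writable interface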
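
    The reason for the note is that a split is serialized when the job is submitted and deserialized again on the task side, and with the default serialization settings this goes through the Writable methods write and readFields. The round trip can be sketched with java.io alone; SplitRoundTripSketch is a hypothetical stand-in for a split holding a start and an end offset, not a Hadoop class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for a Writable split: same write/readFields shape, no Hadoop dependency.
class SplitRoundTripSketch {
    long start; // inclusive
    long end;   // exclusive

    SplitRoundTripSketch() { } // a no-arg constructor is needed to rebuild the object before readFields

    SplitRoundTripSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    void write(DataOutput out) throws IOException {
        out.writeLong(start);
        out.writeLong(end);
    }

    void readFields(DataInput in) throws IOException {
        // fields must be read in exactly the order they were written
        start = in.readLong();
        end = in.readLong();
    }

    // Serializes src to bytes and rebuilds a fresh object from those bytes.
    static SplitRoundTripSketch copyViaBytes(SplitRoundTripSketch src) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            src.write(new DataOutputStream(buffer));
            SplitRoundTripSketch copy = new SplitRoundTripSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SplitRoundTripSketch copy = copyViaBytes(new SplitRoundTripSketch(4, 8));
        System.out.println(copy.start + ".." + copy.end);
    }
}
```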

(1) Custom input format classes

package com.beifeng.customer.inputformat;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;

/**
 * Value type for MySQL input. The data types actually used by an application must derive from Writable.
 */
public abstract class MySqlInputValueWritable implements Writable {
    /**
     * Reads the field values from the database result set
     * @param rs
     * @throws SQLException
     */
    public abstract void readFieldsFromResultSet(ResultSet rs) throws SQLException;
}
	package com.beifeng.customer.inputformat;
	
	import java.io.DataInput;
	import java.io.DataOutput;
	import java.io.IOException;
	import java.sql.Connection;
	import java.sql.DriverManager;
	import java.sql.PreparedStatement;
	import java.sql.ResultSet;
	import java.sql.SQLException;
	import java.sql.Statement;
	import java.util.ArrayList;
	import java.util.List;
	
	import org.apache.hadoop.conf.Configuration;
	import org.apache.hadoop.io.LongWritable;
	import org.apache.hadoop.io.Writable;
	import org.apache.hadoop.mapreduce.InputFormat;
	import org.apache.hadoop.mapreduce.InputSplit;
	import org.apache.hadoop.mapreduce.JobContext;
	import org.apache.hadoop.mapreduce.RecordReader;
	import org.apache.hadoop.mapreduce.TaskAttemptContext;
	import org.apache.hadoop.util.ReflectionUtils;
	
	/**
	 * User-defined InputFormat class that reads data from MySQL<br/>
	 * K and V here are the data types that the InputFormat passes to the mapper class
	 * 
	 * @author gerry
	 *
	 */
	public class MySqlInputFormat<V extends MySqlInputValueWritable> extends InputFormat<LongWritable, V> {
	    public static final String MYSQL_INPUT_DRIVER_KEY = "mysql.input.driver";
	    public static final String MYSQL_INPUT_URL_KEY = "mysql.input.url";
	    public static final String MYSQL_INPUT_USERNAME_KEY = "mysql.input.username";
	    public static final String MYSQL_INPUT_PASSWORD_KEY = "mysql.input.password";
	    public static final String MYSQL_SELECT_COUNT_SQL_KEY = "mysql.select.count.sql";
	    public static final String MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY = "mysql.input.split.pre.record.count";
	    public static final String MYSQL_SELECT_RECORD_SQL_KEY = "mysql.select.record.sql";
	    public static final String MYSQL_OUTPUT_VALUE_CLASS_KEY = "mysql.output.value.class";
	
	    @Override
	    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
	        // Returns the input splits; the AM uses the number of splits to decide the number of map tasks
	        Configuration conf = context.getConfiguration();
	        Connection conn = null;
	        PreparedStatement pstmt = null;
	        ResultSet rs = null;
	        long recordCount = 0; // number of records
	
	        try {
	            String sql = conf.get(MYSQL_SELECT_COUNT_SQL_KEY);
	            conn = this.getConnection(conf); // obtain the database connection
	            pstmt = conn.prepareStatement(sql);
	            rs = pstmt.executeQuery();
	            if (rs.next()) {
	                recordCount = rs.getLong(1); // read the count
	            }
	        } catch (SQLException | ClassNotFoundException e) {
	            throw new IOException("Failed to query the record count from the database", e);
	        } finally {
	            this.closeConnection(conn, pstmt, rs);
	        }
	
	        // Build the input splits
	        List<InputSplit> list = new ArrayList<InputSplit>();
	        long preRecordCountOfSplit = conf.getLong(MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY, 100);
	        int numSplits = (int) (recordCount / preRecordCountOfSplit
	                + (recordCount % preRecordCountOfSplit == 0 ? 0 : 1));
	        for (int i = 0; i < numSplits; i++) {
	            if (i != numSplits - 1) {
	                list.add(new MySqlInputSplit(i * preRecordCountOfSplit, (i + 1) * preRecordCountOfSplit));
	            } else {
	                list.add(new MySqlInputSplit(i * preRecordCountOfSplit, recordCount));
	            }
	        }
	        return list;
	    }
	
	    @Override
	    public RecordReader<LongWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context)
	            throws IOException, InterruptedException {
	        // Returns the RecordReader object that reads the records of one split
	        RecordReader<LongWritable, V> reader = new MySqlRecordReader();
	        reader.initialize(split, context);
	        return reader;
	    }
	
	    /**
	     * Closes the database connection
	     * 
	     * @param conn
	     * @param state
	     * @param rs
	     */
	    private void closeConnection(Connection conn, Statement state, ResultSet rs) {
	        this.close(rs);
	        this.close(state);
	        this.close(conn);
	    }
	
	    private void close(AutoCloseable obj) {
	        if (obj != null) {
	            try {
	                obj.close();
	            } catch (Exception e) {
	                // nothing
	            }
	        }
	    }
	
	    /**
	     * Obtains a database connection from the configuration
	     * 
	     * @param conf
	     * @return
	     * @throws SQLException
	     * @throws ClassNotFoundException
	     */
	    private Connection getConnection(Configuration conf) throws SQLException, ClassNotFoundException {
	        String driver = conf.get(MYSQL_INPUT_DRIVER_KEY);
	        String url = conf.get(MYSQL_INPUT_URL_KEY);
	        String username = conf.get(MYSQL_INPUT_USERNAME_KEY);
	        String password = conf.get(MYSQL_INPUT_PASSWORD_KEY);
	
	        Class.forName(driver);
	        return DriverManager.getConnection(url, username, password);
	    }
	
	    /**
	     * Custom record reader class
	     * 
	     * @author gerry
	     *
	     * @param <V>
	     */
	    public class MySqlRecordReader extends RecordReader<LongWritable, V> {
	        private Connection conn;
	        private Configuration conf;
	        private MySqlInputSplit split;
	        private LongWritable key = null;
	        private V value = null;
	        private ResultSet result = null;
	        private long post = 0; // current position
	
	        @Override
	        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
	            this.split = (MySqlInputSplit) split;
	            this.conf = context.getConfiguration();
	        }
	
	        /**
	         * Creates the value object
	         * 
	         * @return
	         */
	        @SuppressWarnings("unchecked")
	        private V createValue() {
	            Class<? extends MySqlInputValueWritable> clazz = this.conf.getClass(MYSQL_OUTPUT_VALUE_CLASS_KEY, NullMySqlInputValue.class, MySqlInputValueWritable.class);
	            return (V) ReflectionUtils.newInstance(clazz, this.conf);
	        }
	
	        /**
	         * Builds the query SQL
	         * 
	         * @return
	         */
	        private String getQuerySql() {
	            String sql = this.conf.get(MYSQL_SELECT_RECORD_SQL_KEY);
	            try {
	                sql += " LIMIT " + this.split.getLength();
	                sql += " OFFSET " + this.split.getStart();
	            } catch (Exception e) {
	                // nothing
	            }
	            return sql;
	        }
	
	        @Override
	        public boolean nextKeyValue() throws IOException, InterruptedException {
	            if (this.key == null) {
	                this.key = new LongWritable();
	            }
	
	            if (this.value == null) {
	                this.value = this.createValue();
	            }
	
	            if (this.conn == null) {
	                try {
	                    this.conn = MySqlInputFormat.this.getConnection(this.conf);
	                } catch (ClassNotFoundException | SQLException e) {
	                    throw new IOException("Failed to obtain the database connection", e);
	                }
	            }
	
	            try {
	                if (result == null) {
	                    String sql = this.getQuerySql();
	                    PreparedStatement pstmt = this.conn.prepareStatement(sql);
	                    this.result = pstmt.executeQuery();
	                }
	
	                // process the next row
	                if (!this.result.next()) {
	                    return false; // no more rows
	                }
	
	                // there is another row
	                this.value.readFieldsFromResultSet(this.result);
	                this.key.set(this.post);
	                this.post++;
	                return true;
	            } catch (SQLException e) {
	                throw new IOException("Failed to read data", e);
	            }
	        }
	
	        @Override
	        public LongWritable getCurrentKey() throws IOException, InterruptedException {
	            return this.key;
	        }
	
	        @Override
	        public V getCurrentValue() throws IOException, InterruptedException {
	            return this.value;
	        }
	
	        @Override
	        public float getProgress() throws IOException, InterruptedException {
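	            long length = this.split.getLength();
	            // divide as float: long / long would truncate to 0 until the split is finished
	            return length == 0 ? 1.0f : Math.min(1.0f, this.post / (float) length);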
	        }
	
	        @Override
	        public void close() throws IOException {
	            // close the database connection
	            MySqlInputFormat.this.closeConnection(this.conn, null, this.result);
	        }
	
	    }
	
	    /**
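	     * Default empty value object<br/>
	     * Declared static so that ReflectionUtils.newInstance can call its no-arg constructor
	     * 
	     * @author gerry
	     *
	     */
	    public static class NullMySqlInputValue extends MySqlInputValueWritable {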
	
	        @Override
	        public void write(DataOutput out) throws IOException {
	        }
	
	        @Override
	        public void readFields(DataInput in) throws IOException {
	        }
	
	        @Override
	        public void readFieldsFromResultSet(ResultSet rs) throws SQLException {
	        }
	
	    }
	
	    /**
	     * Input split for MySQL input
	     * 
	     * @author gerry
	     *
	     */
	    public static class MySqlInputSplit extends InputSplit implements Writable{
	        private String[] emptyLocation = new String[0];
	        private long start; // start index, inclusive
	        private long end; // end index, exclusive
	
	        public MySqlInputSplit() {
	            super();
	        }
	
	        public MySqlInputSplit(long start, long end) {
	            super();
	            this.start = start;
	            this.end = end;
	        }
	
	        public long getStart() {
	            return start;
	        }
	
	        public void setStart(long start) {
	            this.start = start;
	        }
	
	        public long getEnd() {
	            return end;
	        }
	
	        public void setEnd(long end) {
	            this.end = end;
	        }
	
	        @Override
	        public long getLength() throws IOException, InterruptedException {
	            return this.end - this.start;
	        }
	
	        @Override
	        public String[] getLocations() throws IOException, InterruptedException {
	            // This value determines whether a data-locality strategy is applied
	            return this.emptyLocation;
	        }
	
	        @Override
	        public void write(DataOutput out) throws IOException {
	            out.writeLong(this.start);
	            out.writeLong(this.end);
	        }
	        @Override
	        public void readFields(DataInput in) throws IOException {
	            this.start = in.readLong();
	            this.end = in.readLong();
	        }
	    }
	}
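
The split arithmetic in getSplits can be checked in isolation. The sketch below is a plain-Java restatement of the same calculation, with a hypothetical class name SplitRangeSketch; it produces the [start, end) row ranges that the record reader later turns into LIMIT and OFFSET clauses.

```java
import java.util.ArrayList;
import java.util.List;

// Restates the range calculation of getSplits without any Hadoop or JDBC dependency.
class SplitRangeSketch {
    // Returns [start, end) ranges covering recordCount rows, at most perSplit rows each.
    static List<long[]> ranges(long recordCount, long perSplit) {
        List<long[]> result = new ArrayList<>();
        for (long start = 0; start < recordCount; start += perSplit) {
            // the last range is cut off at recordCount
            result.add(new long[] {start, Math.min(start + perSplit, recordCount)});
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 rows, 4 rows per split: three ranges, the last one holding 2 rows
        for (long[] range : ranges(10, 4)) {
            System.out.println("LIMIT " + (range[1] - range[0]) + " OFFSET " + range[0]);
        }
    }
}
```

Because the queries of the individual splits run independently, the ranges only partition the table reliably when the record query returns rows in a stable order, for example through an ORDER BY on a unique column; the article's query does not specify one.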

(2) Custom output format classes

package com.beifeng.customer.outputformat;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;

/**
 * Parent class of the values written to MySQL
 * 
 * @author gerry
 *
 */
public abstract class MySqlOutputValueWritable implements Writable {
    /**
     * Returns the INSERT or UPDATE SQL statement used to write this value
     * 
     * @return
     */
    public abstract String fetchInsertOrUpdateSql();

    /**
     * Sets the parameters of the output statement
     * 
     * @param pstmt
     * @throws SQLException
     */
    public abstract void setPreparedStatementParameters(PreparedStatement pstmt) throws SQLException;
}
package com.beifeng.customer.outputformat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/**
 * Custom OutputFormat class<br/>
 * Writes key/value pairs to a MySQL database; the key must be NullWritable and the value must be a MySqlOutputValueWritable
 */
public class MySqlOutputFormat<V extends MySqlOutputValueWritable> extends OutputFormat<NullWritable, V> {
    public static final String MYSQL_OUTPUT_DRIVER_KEY = "mysql.output.driver";
    public static final String MYSQL_OUTPUT_URL_KEY = "mysql.output.url";
    public static final String MYSQL_OUTPUT_USERNAME_KEY = "mysql.output.username";
    public static final String MYSQL_OUTPUT_PASSWORD_KEY = "mysql.output.password";
    public static final String MYSQL_OUTPUT_BATCH_SIZE_KEY = "mysql.output.batch.size";

    @Override
    public RecordWriter<NullWritable, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MySqlRecordWriter(context.getConfiguration());
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        Connection conn = null;
        try {
            conn = this.getConnection(context.getConfiguration());
        } catch (ClassNotFoundException | SQLException e) {
            throw new IOException("Cannot connect to the MySQL database", e);
        } finally {
            this.closeSqlConnection(conn, null, null);
        }
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(null, context);
    }

    /**
     * Obtains a database connection from the configuration
     * 
     * @param conf
     * @return
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    private Connection getConnection(Configuration conf) throws ClassNotFoundException, SQLException {
        String driver = conf.get(MYSQL_OUTPUT_DRIVER_KEY);
        String url = conf.get(MYSQL_OUTPUT_URL_KEY);
        String username = conf.get(MYSQL_OUTPUT_USERNAME_KEY);
        String password = conf.get(MYSQL_OUTPUT_PASSWORD_KEY);

        Class.forName(driver);
        return DriverManager.getConnection(url, username, password);
    }

    /**
     * Closes the database connection
     * 
     * @param conn
     * @param state
     * @param rs
     */
    private void closeSqlConnection(Connection conn, Statement state, ResultSet rs) {
        this.closeAutoCloseable(rs);
        this.closeAutoCloseable(state);
        this.closeAutoCloseable(conn);
    }

    /**
     * Closes a resource
     * 
     * @param obj
     */
    private void closeAutoCloseable(AutoCloseable obj) {
        if (obj != null) {
            try {
                obj.close();
            } catch (Exception e) {
                // nothing
            }
        }
    }

    /**
     * Custom record writer class that writes to MySQL
     * 
     * @author gerry
     *
     */
    public class MySqlRecordWriter extends RecordWriter<NullWritable, V> {
        private Connection conn = null;
        private Map<String, PreparedStatement> pstmtCache = new HashMap<String, PreparedStatement>();
        private Map<String, Integer> batchCache = new HashMap<String, Integer>();
        private Configuration conf = null;
        private int batchSize = 100;

        public MySqlRecordWriter() {
        }

        public MySqlRecordWriter(Configuration conf) {
            this.conf = conf;
            this.batchSize = this.conf.getInt(MYSQL_OUTPUT_BATCH_SIZE_KEY, this.batchSize);
        }

        @Override
        public void write(NullWritable key, V value) throws IOException, InterruptedException {
            if (this.conn == null) {
                try {
                    this.conn = getConnection(this.conf);
                    this.conn.setAutoCommit(false); // disable auto-commit
                } catch (ClassNotFoundException | SQLException e) {
                    throw new IOException("Failed to obtain the database connection", e);
                }
            }

            String sql = value.fetchInsertOrUpdateSql(); // get the INSERT/UPDATE SQL
            PreparedStatement pstmt = this.pstmtCache.get(sql);
            if (pstmt == null) {
                try {
                    pstmt = this.conn.prepareStatement(sql);// create the statement
                    this.pstmtCache.put(sql, pstmt);
                } catch (SQLException e) {
                    throw new IOException("Failed to create the PreparedStatement", e);
                }
            }
            Integer count = this.batchCache.get(sql);
            if (count == null) {
                count = 0;
            }

            try {
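                value.setPreparedStatementParameters(pstmt);// set the values to be written to the database
                pstmt.addBatch(); // add to the batch for later bulk execution
                count++; // one more pending statement
                if (count >= this.batchSize) { // flush once batchSize statements are pending
                    pstmt.executeBatch(); // execute the batch
                    this.conn.commit(); // commit
                    count = 0; // reset
                }
                this.batchCache.put(sql, count); // update the number of pending statements
            } catch (SQLException e) {
                throw new IOException("Failed to write data to MySQL", e);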
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
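            if (this.conn != null) {
                try {
                    // flush whatever is still pending in each batch, then commit
                    for (PreparedStatement pstmt : this.pstmtCache.values()) {
                        pstmt.executeBatch();
                    }
                    this.conn.commit();
                } catch (SQLException e) {
                    throw new IOException("Failed to write data to the MySQL database", e);
                } finally {
                    // release the cached statements and the connection
                    for (PreparedStatement pstmt : this.pstmtCache.values()) {
                        closeAutoCloseable(pstmt);
                    }
                    closeAutoCloseable(this.conn);
                }
            }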
        }

    }

}

(3) MySqlMapperInputValue class

package com.beifeng.customer.mysql;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.beifeng.customer.inputformat.MySqlInputValueWritable;

/**
 * Custom input value object
 */
public class MySqlMapperInputValue extends MySqlInputValueWritable {
    private String uid; // user id
    private String sid; // session id
    private String url; // url
    private long time; // timestamp in milliseconds

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.uid);
        out.writeUTF(this.sid);
        out.writeUTF(this.url);
        out.writeLong(this.time);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.uid = in.readUTF();
        this.sid = in.readUTF();
        this.url = in.readUTF();
        this.time = in.readLong();
    }

    @Override
    public void readFieldsFromResultSet(ResultSet rs) throws SQLException {
        this.uid = rs.getString("uid");
        this.sid = rs.getString("sid");
        this.url = rs.getString("url");
        this.time = rs.getLong("time");
    }

}

MySqlMapperOutputValue class

package com.beifeng.customer.mysql;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom mapper output value object.
 * The listing at this position repeated MySqlMapperInputValue, so this class is an
 * assumed reconstruction based on how MySqlMapper and MySqlReducer use it
 * (setUid, setSid, getSid).
 */
public class MySqlMapperOutputValue implements Writable {
    private String uid; // user id
    private String sid; // session id

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.uid);
        out.writeUTF(this.sid);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.uid = in.readUTF();
        this.sid = in.readUTF();
    }

}

MySqlMapperOutputKey class

package com.beifeng.customer.mysql;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom mapper output key
 */
public class MySqlMapperOutputKey implements WritableComparable<MySqlMapperOutputKey> {
    private String url;
    private String date; // string in yyyy-MM-dd format

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.url);
        out.writeUTF(this.date);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.url = in.readUTF();
        this.date = in.readUTF();
    }

    @Override
    public int compareTo(MySqlMapperOutputKey o) {
        int tmp = this.url.compareTo(o.url);
        if (tmp != 0) {
            return tmp;
        }
        return this.date.compareTo(o.date);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((date == null) ? 0 : date.hashCode());
        result = prime * result + ((url == null) ? 0 : url.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MySqlMapperOutputKey other = (MySqlMapperOutputKey) obj;
        if (date == null) {
            if (other.date != null)
                return false;
        } else if (!date.equals(other.date))
            return false;
        if (url == null) {
            if (other.url != null)
                return false;
        } else if (!url.equals(other.url))
            return false;
        return true;
    }
}

MySqlMapper class

package com.beifeng.customer.mysql;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper class of the example that exercises the custom input/output formats<br/>
 * Its main purpose is to compute the uv value
 */
public class MySqlMapper
        extends Mapper<LongWritable, MySqlMapperInputValue, MySqlMapperOutputKey, MySqlMapperOutputValue> {
    private MySqlMapperOutputKey outputKey = new MySqlMapperOutputKey();
    private MySqlMapperOutputValue outputValue = new MySqlMapperOutputValue();
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private Calendar calendar = Calendar.getInstance();

    @Override
    protected void map(LongWritable key, MySqlMapperInputValue value, Context context)
            throws IOException, InterruptedException {
        String url = value.getUrl();
        if (url != null) {
            calendar.setTimeInMillis(value.getTime()); // set the millisecond timestamp
            this.outputKey.setUrl(url);
            this.outputKey.setDate(this.sdf.format(calendar.getTime()));

            this.outputValue.setUid(value.getUid());
            this.outputValue.setSid(value.getSid());
            context.write(this.outputKey, this.outputValue);
        }
    }
}

MySqlReducerOutputValue class

package com.beifeng.customer.mysql;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import com.beifeng.customer.outputformat.MySqlOutputValueWritable;

/**
 * Custom reducer output value type
 * 
 * @author gerry
 *
 */
public class MySqlReducerOutputValue extends MySqlOutputValueWritable {
    private String url;
    private String date;
    private int uv;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public int getUv() {
        return uv;
    }

    public void setUv(int uv) {
        this.uv = uv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.url);
        out.writeUTF(this.date);
        out.writeInt(this.uv);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.url = in.readUTF();
        this.date = in.readUTF();
        this.uv = in.readInt();
    }

    @Override
    public String fetchInsertOrUpdateSql() {
        // TODO: this SQL statement could be changed to an insert-or-update statement
        return "INSERT INTO stats_uv(url,date,uv) VALUES(?,?,?)";
    }

    @Override
    public void setPreparedStatementParameters(PreparedStatement pstmt) throws SQLException {
        pstmt.setString(1, this.url);
        pstmt.setString(2, this.date);
        pstmt.setInt(3, this.uv);
    }

}
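
The TODO in fetchInsertOrUpdateSql could be addressed with MySQL's INSERT ... ON DUPLICATE KEY UPDATE syntax. The sketch below is one possible form, not the article's implementation: it assumes that stats_uv has a PRIMARY or UNIQUE key on (url, date), which the article does not state, and the class name UpsertSqlSketch is hypothetical. The statement keeps the same three placeholders, so setPreparedStatementParameters would not need to change.

```java
// Illustration only: one way to turn the plain INSERT into an insert-or-update.
class UpsertSqlSketch {
    // ASSUMPTION: stats_uv has a PRIMARY or UNIQUE key on (url, date).
    // Without such a key, MySQL never detects a duplicate and always inserts.
    static String upsertSql() {
        return "INSERT INTO stats_uv(url,date,uv) VALUES(?,?,?) "
                + "ON DUPLICATE KEY UPDATE uv = VALUES(uv)";
    }

    public static void main(String[] args) {
        System.out.println(upsertSql());
    }
}
```

With this form, rerunning the job for the same day overwrites the earlier uv value instead of failing on the duplicate key.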

MySqlReducer class

package com.beifeng.customer.mysql;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer class that computes the uv
 */
public class MySqlReducer
        extends Reducer<MySqlMapperOutputKey, MySqlMapperOutputValue, NullWritable, MySqlReducerOutputValue> {
    private MySqlReducerOutputValue outputValue = new MySqlReducerOutputValue();

    @Override
    protected void reduce(MySqlMapperOutputKey key, Iterable<MySqlMapperOutputValue> values, Context context)
            throws IOException, InterruptedException {
        Set<String> set = new HashSet<String>();
        for (MySqlMapperOutputValue value : values) {
            set.add(value.getSid());
        }
        int uv = set.size();
        this.outputValue.setUrl(key.getUrl());
        this.outputValue.setDate(key.getDate());
        this.outputValue.setUv(uv);
        context.write(NullWritable.get(), this.outputValue);
    }
}

MySqlRunner class

package com.beifeng.customer.mysql;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.beifeng.customer.inputformat.MySqlInputFormat;
import com.beifeng.customer.inputformat.MySqlInputValueWritable;
import com.beifeng.customer.outputformat.MySqlOutputFormat;

public class MySqlRunner implements Tool {
    private Configuration conf = new Configuration();

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        this.conf.set("fs.defaultFS", "hdfs://192.168.211.137:8020");
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        // create the job
        Job job = Job.getInstance(conf, "test-format");
        job.setJarByClass(MySqlRunner.class);
        conf = job.getConfiguration(); // changes made to conf from here on are reflected in the job
        // configure the input format
        conf.set(MySqlInputFormat.MYSQL_INPUT_DRIVER_KEY, "com.mysql.jdbc.Driver");
        conf.set(MySqlInputFormat.MYSQL_INPUT_URL_KEY, "jdbc:mysql://localhost:3306/test");
        conf.set(MySqlInputFormat.MYSQL_INPUT_USERNAME_KEY, "root");
        conf.set(MySqlInputFormat.MYSQL_INPUT_PASSWORD_KEY, "123456");
        conf.set(MySqlInputFormat.MYSQL_SELECT_COUNT_SQL_KEY, "select count(1) from event_logs");
        conf.set(MySqlInputFormat.MYSQL_SELECT_RECORD_SQL_KEY, "select uid,sid,url,time from event_logs");
        conf.setLong(MySqlInputFormat.MYSQL_INPUT_SPLIT_PRE_RECORD_COUNT_KEY, 4L);
        conf.setClass(MySqlInputFormat.MYSQL_OUTPUT_VALUE_CLASS_KEY, MySqlMapperInputValue.class, MySqlInputValueWritable.class);
        job.setInputFormatClass(MySqlInputFormat.class);

        // configure the mapper
        job.setMapperClass(MySqlMapper.class);
        job.setMapOutputKeyClass(MySqlMapperOutputKey.class);
        job.setMapOutputValueClass(MySqlMapperOutputValue.class);
        // configure the reducer
        job.setReducerClass(MySqlReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MySqlReducerOutputValue.class);

        // configure the output format
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_DRIVER_KEY, "com.mysql.jdbc.Driver");
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_URL_KEY, "jdbc:mysql://localhost:3306/test");
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_USERNAME_KEY, "root");
        conf.set(MySqlOutputFormat.MYSQL_OUTPUT_PASSWORD_KEY, "123456");
        conf.setInt(MySqlOutputFormat.MYSQL_OUTPUT_BATCH_SIZE_KEY, 10);
        job.setOutputFormatClass(MySqlOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new MySqlRunner(), args);
    }

}

Run result screenshots: (images not preserved)