[Spring Cloud Ad System Step by Step] 16. Implementing the Incremental Index and Delivering Data to MQ (Kafka)
Implementing the incremental data index
In the previous section we did the groundwork for loading the incremental index, using the open-source component mysql-binlog-connector-java to listen to the MySQL binlog. For background on binlog itself, please look it up online, or email magicianisaac@gmail.com.
In this section we process the binlog data objects to build the ad system's incremental data. Put plainly, we build this incremental data so that it can later be delivered to the index service, turning incremental data into an incremental index. Let's code.
- Define an interface for delivering incremental data (its parameter is the binlog conversion object we defined in the previous section)
```java
/**
 * ISender: interface defining how incremental data is delivered
 *
 * @author <a href="mailto:magicianisaac@gmail.com">isaac.zhang | 若初</a>
 */
public interface ISender {

    void sender(MySqlRowData rowData);
}
```
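Before wiring this into Spring, it may help to see why a single delivery interface is useful: every destination (index, Kafka, ...) is an interchangeable strategy selected by name. A plain-Java sketch with no Spring, where `Sender` and the string payload are simplified stand-ins for `ISender` and `MySqlRowData`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SenderRegistryDemo {

    // Hypothetical stand-in for ISender, with a String in place of MySqlRowData
    interface Sender {
        void sender(String rowData);
    }

    public static List<String> run() {
        List<String> delivered = new ArrayList<>();

        // Two interchangeable delivery strategies keyed by bean name,
        // mimicking how @Resource(name = "indexSender") picks one of them
        Map<String, Sender> senders = new HashMap<>();
        senders.put("indexSender", row -> delivered.add("index:" + row));
        senders.put("kafkaSender", row -> delivered.add("kafka:" + row));

        senders.get("indexSender").sender("ad_plan#10");
        senders.get("kafkaSender").sender("ad_plan#10");
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [index:ad_plan#10, kafka:ad_plan#10]
    }
}
```

Swapping the delivery target is then a one-line change of the lookup name, which is exactly what `@Resource(name = "indexSender")` does for us below.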
- Create the incremental index listener
```java
/**
 * IncrementListener: listens for incremental data
 *
 * @author <a href="mailto:magicianisaac@gmail.com">isaac.zhang | 若初</a>
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements IListener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    // Choose the delivery implementation to inject by bean name
    @Resource(name = "indexSender")
    private ISender sender;

    /**
     * Annotated with {@link PostConstruct}, i.e. it runs as soon as the
     * service has started and the bean has been initialized.
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2Db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        // Wrap into the data object we will eventually deliver
        MySqlRowData rowData = new MySqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());

        // Convert the EventType into an OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        // Get the field list for this operation from the template
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();
                _afterMap.put(colName, colValue);
            }
            rowData.getFieldValueMap().add(_afterMap);
        }

        sender.sender(rowData);
    }
}
```
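The listener above leans on `OperationTypeEnum.convert(eventType)`. A minimal sketch of what such a conversion can look like, using the row-event names that mysql-binlog-connector-java reports, passed here as plain strings so the sketch stands alone (the real project converts from the library's `EventType` enum directly):

```java
public enum OperationTypeDemo {
    ADD, UPDATE, DELETE, OTHER;

    // mysql-binlog-connector-java reports row events as WRITE_ROWS /
    // UPDATE_ROWS / DELETE_ROWS (plus EXT_ variants); collapse them
    // into the three logical operations the index cares about.
    public static OperationTypeDemo convert(String eventType) {
        switch (eventType) {
            case "WRITE_ROWS":
            case "EXT_WRITE_ROWS":
                return ADD;
            case "UPDATE_ROWS":
            case "EXT_UPDATE_ROWS":
                return UPDATE;
            case "DELETE_ROWS":
            case "EXT_DELETE_ROWS":
                return DELETE;
            default:
                return OTHER;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("EXT_WRITE_ROWS")); // prints ADD
    }
}
```

Any event type outside these six (e.g. `QUERY`, `TABLE_MAP`) maps to `OTHER` and is ignored by the index pipeline.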
Enabling the binlog listener
- First, configure the connection info for the database whose binlog we will listen to
```yaml
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # start listening from the current position
```
Write the configuration class:
```java
/**
 * BinlogConfig: configuration for the binlog listener
 *
 * @author <a href="mailto:magicianisaac@gmail.com">isaac.zhang | 若初</a>
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {

    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}
```
In the section where we implemented binlog listening, we wrote a custom client, CustomBinlogClient. To actually listen to the binlog, this client must run as an independent thread, and the listening must start when the application starts. To run the client we use a new runner, org.springframework.boot.CommandLineRunner. Let's code.
```java
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
```
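Why the independent-thread point matters: a binlog client's `connect()` typically blocks while it streams events. A plain-Java sketch (no Spring; `FakeClient` is a hypothetical stand-in for `CustomBinlogClient`) of starting such a blocking client in the background at application startup:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ClientRunnerDemo {

    // Hypothetical stand-in for CustomBinlogClient
    public static class FakeClient {
        final CountDownLatch connected = new CountDownLatch(1);

        void connect() {
            connected.countDown();
            // a real binlog client would now block, reading events indefinitely
        }
    }

    // Run the blocking connect() on its own daemon thread so it does not
    // hold up startup; returns true once the client has connected.
    public static boolean startInBackground(FakeClient client) throws InterruptedException {
        Thread t = new Thread(client::connect, "binlog-client");
        t.setDaemon(true);
        t.start();
        return client.connected.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(startInBackground(new FakeClient())); // prints true
    }
}
```

In the Spring version above, `CommandLineRunner` provides the startup hook; the threading is handled inside `CustomBinlogClient` itself.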
Delivering the incremental data
While listening to the binlog, we saw that int and String fields come through from MySQL without trouble, but time fields are formatted into strings such as `Fri Jun 21 15:07:53 CST 2019`:
```
--------insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, isaac zhang, 2d3abb6f2434109a105170fb21d00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019],
     after=[10, isaac zhang, 2d3abb6f2434109a105170fb21d00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
```
Two things about this time format need attention:
- CST here is China Standard Time (UTC+8:00), so the value is 8 hours ahead of our reference time
- The date string needs to be parsed
Of course, we could also change this behavior by configuring MySQL's date format; here we parse the time format in code instead:
```java
/**
 * Parse a binlog date string such as "Thu Jun 27 08:00:00 CST 2019"
 */
public static Date parseBinlogString2Date(String dateString) {
    try {
        DateFormat dateFormat = new SimpleDateFormat(
                "EEE MMM dd HH:mm:ss zzz yyyy",
                Locale.US
        );
        return DateUtils.addHours(dateFormat.parse(dateString), -8);
    } catch (ParseException ex) {
        log.error("parseString2Date error: {}", dateString);
        return null;
    }
}
```
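A self-contained sketch of the same parse-then-shift logic. Two deliberate simplifications: the sample string uses GMT instead of CST so the result does not depend on how a given JVM resolves the ambiguous "CST" abbreviation, and the -8h shift is done with plain millisecond arithmetic instead of commons-lang's `DateUtils.addHours`:

```java
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class BinlogDateDemo {

    // Same pattern as parseBinlogString2Date; shift by -8h via plain millis
    public static Date parse(String s) throws ParseException {
        DateFormat fmt = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US);
        return new Date(fmt.parse(s).getTime() - 8L * 3600 * 1000);
    }

    public static void main(String[] args) throws ParseException {
        // GMT used here instead of CST so the result is JVM-independent
        Date d = parse("Thu Jun 27 08:00:00 GMT 2019");

        DateFormat out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        out.setTimeZone(TimeZone.getTimeZone("GMT"));
        System.out.println(out.format(d)); // prints 2019-06-27 00:00:00
    }
}
```

Note that Java's short zone names are genuinely ambiguous ("CST" can mean China Standard Time or US Central Standard Time), which is one more reason to prefer configuring the source format over string parsing where possible.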
Because our index definitions are organized by the hierarchy level between tables (level), and our coding standard forbids magic numbers, we define a data-level enum to express that hierarchy.
```java
/**
 * AdDataLevel: ad data hierarchy level
 *
 * @author <a href="mailto:magicianisaac@gmail.com">isaac.zhang | 若初</a>
 */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
```
Implementing data delivery
Because the incremental data can be delivered to different destinations for different purposes, we previously defined the delivery interface com.sxzhongf.ad.sender.ISender; next, we implement a delivery class:
```java
@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /**
     * Deliver the binlog data according to the ad data level
     */
    @Override
    public void sender(MySqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            level4RowData(rowData);
        } else {
            log.error("Binlog MySqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void level2RowData(MySqlRowData rowData) {

        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<AdPlanTable> planTables = new ArrayList<>();

            for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                // The second way of iterating a Map
                fieldValueMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }

            // Deliver the ad plans
            planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<AdCreativeTable> creativeTables = new LinkedList<>();

            rowData.getFieldValueMap().forEach(afterMap -> {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });

            // Deliver the ad creatives
            creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void level3RowData(MySqlRowData rowData) {
        ...
    }

    /**
     * Handle level-4 ads
     */
    private void level4RowData(MySqlRowData rowData) {
        ...
    }
}
```
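The core of each `levelNRowData` method is the same pattern: walk a column-name to column-value map and dispatch on the column name to populate a typed object. A trimmed-down, self-contained sketch of that pattern (the column names `plan_id`, `user_id`, `plan_status` and the `AdPlan` fields here are illustrative, not the project's actual constants):

```java
import java.util.HashMap;
import java.util.Map;

public class PlanRowMapper {

    // Minimal stand-in for AdPlanTable
    public static class AdPlan {
        public Long planId;
        public Long userId;
        public Integer planStatus;
    }

    public static AdPlan map(Map<String, String> row) {
        AdPlan plan = new AdPlan();
        row.forEach((k, v) -> {
            switch (k) {
                case "plan_id":
                    plan.planId = Long.valueOf(v);
                    break;
                case "user_id":
                    plan.userId = Long.valueOf(v);
                    break;
                case "plan_status":
                    plan.planStatus = Integer.valueOf(v);
                    break;
                // unknown columns are silently ignored, as in IndexSender
            }
        });
        return plan;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("plan_id", "10");
        row.put("user_id", "11");
        row.put("plan_status", "1");
        AdPlan p = map(row);
        System.out.println(p.planId + "/" + p.userId + "/" + p.planStatus); // prints 10/11/1
    }
}
```

Because all binlog values arrive as strings, every numeric field goes through `Long.valueOf`/`Integer.valueOf`, and date columns would go through `parseBinlogString2Date` as shown earlier.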
Delivering incremental data to MQ (Kafka)
To make our data delivery more flexible and serve the needs of statistics, analytics, and other systems, we now implement a sender that publishes to a message queue; other services can subscribe to this MQ topic to receive the data.
Configure the topic in the configuration file:

```yaml
adconf:
  kafka:
    topic: ad-search-mysql-data
```

```java
/**
 * KafkaSender: delivers binlog incremental data to a Kafka message queue
 *
 * @author <a href="mailto:magicianisaac@gmail.com">isaac.zhang | 若初</a>
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Send the data to the Kafka queue
     */
    @Override
    public void sender(MySqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /**
     * Test-consume the Kafka messages
     */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMySqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MySqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MySqlRowData.class
            );
            System.out.println("kafka process MySqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }
    }
}
```
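The consumer above guards `record.value()` with `Optional` before deserializing, because a Kafka record's value can be null (e.g. a tombstone). A stdlib-only sketch of that null-guard, where `process` is a hypothetical stand-in for the listener body (no Kafka client or fastjson involved):

```java
import java.util.Optional;

public class RecordValueDemo {

    // Mirrors the @KafkaListener body: skip null values, otherwise handle the payload
    public static String process(Object recordValue) {
        Optional<?> msg = Optional.ofNullable(recordValue);
        if (msg.isPresent()) {
            return "processed: " + msg.get();
        }
        return "skipped null record";
    }

    public static void main(String[] args) {
        System.out.println(process("{\"tableName\":\"ad_plan\"}"));
        System.out.println(process(null)); // prints skipped null record
    }
}
```

Without this guard, `message.toString()` would throw a NullPointerException on a null-valued record and could crash the consumer thread.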