KakfaSpout自定义scheme
程序员文章站
2023-03-27 23:34:36
一.Mapper和Scheme scheme:将kafka传到spout里的数据格式进行转化. record->tuple mapper:将storm传到kafka的数据格式进行转化.tuple->record 二.为什么要自定义消息格式 在很多需求里, 从kafka传递过来的数据并不是单纯的str ......
一.mapper和scheme
scheme:将kafka传到spout里的数据格式进行转化. record->tuple
mapper:将storm传到kafka的数据格式进行转化.tuple->record
二.为什么要自定义消息格式
在很多需求里, 从kafka传递过来的数据并不是单纯的string, 可以是任意对象.当我们需要根据对象的某个属性进行分组时, 默认的new fields("bytes")就不太适合.但是消息传递的形式还是string.我们可以在传入kafka之前使用fastjson的转化方法将实体对象转化成jsonstring.
到了scheme在转换成实体类对象.
三.怎么更改scheme
构建kafkaspout时我们要配置很多参数, 可以看一下kafkaconfig代码.
public final brokerhosts hosts; //用以获取kafka broker和partition的信息 public final string topic;//从哪个topic读取消息 public final string clientid; // simpleconsumer所用的client id public int fetchsizebytes = 1024 * 1024; //发给kafka的每个fetchrequest中,用此指定想要的response中总的消息的大小 public int sockettimeoutms = 10000;//与kafka broker的连接的socket超时时间 public int fetchmaxwait = 10000; //当服务器没有新消息时,消费者会等待这些时间 public int buffersizebytes = 1024 * 1024;//simpleconsumer所使用的socketchannel的读缓冲区大小 public multischeme scheme = new rawmultischeme();//从kafka中取出的byte[],该如何反序列化 public boolean forcefromstart = false;//是否强制从kafka中offset最小的开始读起 public long startoffsettime = kafka.api.offsetrequest.earliesttime();//从何时的offset时间开始读,默认为最旧的offset public long maxoffsetbehind = long.max_value;//kafkaspout读取的进度与目标进度相差多少,相差太多,spout会丢弃中间的消息 public boolean usestartoffsettimeifoffsetoutofrange = true;//如果所请求的offset对应的消息在kafka中不存在,是否使startoffsettime
可以看到, 所有的配置项都是public, 所以当我们实例化一个spoutconfig之后, 可以通过直接引用的方式进行更改属性值.
我们可以看构建kafkaspout的代码:
zkhosts zkhosts = new zkhosts(zkhost); // zk对地址有唯一性标识 string zkroot = "/" + topic; string id = uuid.randomuuid().tostring(); // 构建spoutconfig spoutconfig spoutconf = new spoutconfig(zkhosts, topic, zkroot, id); spoutconf.scheme = new schemeasmultischeme(new sensordatascheme()); spoutconf.startoffsettime = offsetrequest.latesttime(); kafkaspout kafkaspout = new kafkaspout(spoutconf);
四.怎么自定义scheme
我们有这样一个需求,有一个实体类如下:
public class sensordata implements serializable { // 设备id; private string deviceid; // 型号id private string dmpropertiesid; // 通道名称; private string channelname; // 采集的温度值 private double devicetemp; // 采集的时间; private date date; }
数据进来kafka到storm消费时, 根据deviceid进行分组.当然, 我们在写入的时候对数据json化, 使用fastjson把实体对象变成字符串, 而不是直接传实体类对象进入kafka(亲测会报错, 无法进行转换).最终数据会在scheme的declare的方法里处理.
scheme接口:
public interface scheme extends serializable { list<object> deserialize(bytebuffer ser); public fields getoutputfields(); }
可以看到有两个需要实现的方法, 一个是传过来的byte数据进行转化, 一个是传入下一层bolt的时候以什么字段分组. 跟踪kafka的源码我们可以看到, 他的declare方法最终会调用scheme的方法来确认字段名.
看一下scheme的整体代码:
package dm.scheme; import java.nio.bytebuffer; import java.nio.charset.charset; import java.nio.charset.standardcharsets; import java.util.list; import org.apache.storm.kafka.stringscheme; import org.apache.storm.spout.scheme; import org.apache.storm.tuple.fields; import org.apache.storm.tuple.values; import org.apache.storm.utils.utils; import com.alibaba.fastjson.json; import dm.entity.sensordata; /** * * kafkarecord 映射 tuple 转化类; * * @author chenwen * */ public class sensordatascheme implements scheme { /** * */ private static final long serialversionuid = 1l; private static final charset utf8_charset = standardcharsets.utf_8; /** * * 反序列化 */ @override public list<object> deserialize(bytebuffer bytebuffer) { // 将kafka消息转化成jsonstring string sensordatajson = stringscheme.deserializestring(bytebuffer); sensordata sensordata = json.parseobject(sensordatajson, sensordata.class); string id = sensordata.getdeviceid(); return new values(id, sensordata); } public static string deserializestring(bytebuffer bytebuffer) { if (bytebuffer.hasarray()) { int base = bytebuffer.arrayoffset(); return new string(bytebuffer.array(), base + bytebuffer.position(), bytebuffer.remaining()); } else { return new string(utils.tobytearray(bytebuffer), utf8_charset); } } @override public fields getoutputfields() { return new fields("deviceid", "sensordata"); // 返回字段及其名称; } }