欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

KakfaSpout自定义scheme

程序员文章站 2023-03-27 23:34:36
一.Mapper和Scheme scheme:将kafka传到spout里的数据格式进行转化. record->tuple mapper:将storm传到kafka的数据格式进行转化.tuple->record 二.为什么要自定义消息格式 在很多需求里, 从kafka传递过来的数据并不是单纯的str ......

一.mapper和scheme

scheme:将kafka传到spout里的数据格式进行转化. record->tuple

mapper:将storm传到kafka的数据格式进行转化.tuple->record

二.为什么要自定义消息格式

在很多需求里, 从kafka传递过来的数据并不是单纯的string, 可以是任意对象.当我们需要根据对象的某个属性进行分组时, 默认的new fields("bytes")就不太适合.但是消息传递的形式还是string.我们可以在传入kafka之前使用fastjson的转化方法将实体对象转化成jsonstring.

到了scheme在转换成实体类对象.

三.怎么更改scheme

构建kafkaspout时我们要配置很多参数, 可以看一下kafkaconfig代码.

public final brokerhosts hosts; //用以获取kafka broker和partition的信息
public final string topic;//从哪个topic读取消息
public final string clientid; // simpleconsumer所用的client id
public int fetchsizebytes = 1024 * 1024; //发给kafka的每个fetchrequest中,用此指定想要的response中总的消息的大小
public int sockettimeoutms = 10000;//与kafka broker的连接的socket超时时间
public int fetchmaxwait = 10000;   //当服务器没有新消息时,消费者会等待这些时间
public int buffersizebytes = 1024 * 1024;//simpleconsumer所使用的socketchannel的读缓冲区大小
public multischeme scheme = new rawmultischeme();//从kafka中取出的byte[],该如何反序列化
public boolean forcefromstart = false;//是否强制从kafka中offset最小的开始读起
public long startoffsettime = kafka.api.offsetrequest.earliesttime();//从何时的offset时间开始读,默认为最旧的offset
public long maxoffsetbehind = long.max_value;//kafkaspout读取的进度与目标进度相差多少,相差太多,spout会丢弃中间的消息
public boolean usestartoffsettimeifoffsetoutofrange = true;//如果所请求的offset对应的消息在kafka中不存在,是否使startoffsettime 

 

可以看到, 所有的配置项都是public, 所以当我们实例化一个spoutconfig之后, 可以通过直接引用的方式进行更改属性值.

我们可以看构建kafkaspout的代码:

zkhosts zkhosts = new zkhosts(zkhost);
// zk对地址有唯一性标识
string zkroot = "/" + topic;
string id = uuid.randomuuid().tostring();
// 构建spoutconfig
spoutconfig spoutconf = new spoutconfig(zkhosts, topic, zkroot, id);
spoutconf.scheme = new schemeasmultischeme(new sensordatascheme());
spoutconf.startoffsettime = offsetrequest.latesttime();
kafkaspout kafkaspout = new kafkaspout(spoutconf);

四.怎么自定义scheme

我们有这样一个需求,有一个实体类如下:

public class sensordata implements serializable {
    // 设备id;
    private string deviceid;
    // 型号id
    private string dmpropertiesid; 
    // 通道名称;
    private string channelname;
    // 采集的温度值
    private double devicetemp;
    // 采集的时间;
    private date date;
}

数据进来kafka到storm消费时, 根据deviceid进行分组.当然, 我们在写入的时候对数据json化, 使用fastjson把实体对象变成字符串, 而不是直接传实体类对象进入kafka(亲测会报错, 无法进行转换).最终数据会在scheme的declare的方法里处理.

scheme接口:

public interface scheme extends serializable {
    list<object> deserialize(bytebuffer ser);
    public fields getoutputfields();
}

可以看到有两个需要实现的方法, 一个是传过来的byte数据进行转化, 一个是传入下一层bolt的时候以什么字段分组. 跟踪kafka的源码我们可以看到, 他的declare方法最终会调用scheme的方法来确认字段名.

看一下scheme的整体代码:

package dm.scheme;

import java.nio.bytebuffer;
import java.nio.charset.charset;
import java.nio.charset.standardcharsets;
import java.util.list;

import org.apache.storm.kafka.stringscheme;
import org.apache.storm.spout.scheme;
import org.apache.storm.tuple.fields;
import org.apache.storm.tuple.values;
import org.apache.storm.utils.utils;

import com.alibaba.fastjson.json;

import dm.entity.sensordata;
/**
 * 
 * kafkarecord 映射 tuple 转化类;
 * 
 * @author chenwen
 *
 */
public class sensordatascheme implements scheme {
    /**
     * 
     */
    private static final long serialversionuid = 1l;
    private static final charset utf8_charset = standardcharsets.utf_8;

    /**
     * 
     * 反序列化
     */
    @override
    public list<object> deserialize(bytebuffer bytebuffer) {
        // 将kafka消息转化成jsonstring
        string sensordatajson = stringscheme.deserializestring(bytebuffer);
        sensordata sensordata = json.parseobject(sensordatajson, sensordata.class);
        string id = sensordata.getdeviceid();
        return new values(id, sensordata);
    }
    public static string deserializestring(bytebuffer bytebuffer) {
        if (bytebuffer.hasarray()) {
            int base = bytebuffer.arrayoffset();
            return new string(bytebuffer.array(), base + bytebuffer.position(), bytebuffer.remaining());
        } else {
            return new string(utils.tobytearray(bytebuffer), utf8_charset);
        }
    }
    @override
    public fields getoutputfields() {
        return new fields("deviceid", "sensordata"); // 返回字段及其名称;
    }
}