Kafka的Consumer主要参数
3.2.2.1 bootstrap.servers
同生产者bootstrap.servers参数。
3.2.2.2 group.id
该参数指定的是consumer group的名字,它能够唯一标识一个consumer group。通常设置一个有业务意义的名字就可以了。
3.2.2.3 key.deserializer
consumer代码从broker端获取的任何消息都是字节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式。这个参数就是为消息的key做解序列化的。该参数必须是实现org.apache.kafka.common.serialization.Deserializer接口的Java类的全限定名称。Kafka默认为绝大部分的初始类型(primitive type)提供现成的解序列化器。StringDeserializer会将接收到的字节数组转换成UTF-8编码的字符串。consumer支持自定义的deserializer。不论consumer消费的消息是否指定了key,consumer都必须要设置该参数,否则程序会抛出ConfigException。
3.2.2.4 value.deserializer
与value.deserializer类似,该参数用来对消息体(即消息value)进行解序列化,从而把消息“还原”会原来的对象类型。
3.2.2.5 session.timeout.ms
session.timeout.ms是consumer group检测组内成员发送崩溃的时间。这个参数还有另外一重含义:consumer消息处理逻辑的 最大时间,倘若consumer两次poll之间的间隔超过了该参数所设置的阀值,那么coordinator(消息组协调者)就会认为这个consumer已经追不上组内其他成员的消费进度了,因此会将该consumer踢出组,该consumer负责的分区也会被分配给其他consumer。在最好的情况下,这会导致不必要的rebalance,因为consumer需要重新加入group。更糟的是,对于那些在被踢出group后处理的消息,consumer都无法提交位移,这就意味着这些消息在rebalance之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么consumer甚至无法执行任何消费,除非用户重新调整参数。
0.10.1.0版本对该参数的含义进行了拆分。在该版本及以后的版本中,session.timeout.ms参数被明确为“coordinator检测失败的时间”。实际使用中可以为该参数设置一个较小的值使coordinator能够更快第检查consumer崩溃的情况,从而更快地开启rebalance,避免造成更大的消息滞后(consumer lag),目前该参数的默认值是10秒。
3.2.2.6 max.poll.interval.ms
这个参数就是用来设置消息处理逻辑的最大时间的。通过将该参数设置成稍大于实际的逻辑处理时间再结合较低的session.timeout.ms参数值,consumer group既实现了快速的consumer崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的rebalance。
3.2.2.7 auto.offset.reset
指定了无位移信息或位移越界(即consumer要消费的消息的位移不在当前消息日志的合理区间范围)时Kafka的应对策略。
目前该参数有如下3个可能的取值:
earliest: 指定从最早的位移开始消费。注意这里最早的位移不一定就是0。
latest: 指定从最新处位移开始消费。
none:指定如果未发现位移信息或位移越界,则抛出异常。该值在真实业务场景中使用甚少。
3.2.2.8 enable.auto.commit
该参数指定consumer是否自动提交位移。若设置为true,则consumer在后台自动提交位移。否则,用户需要手动提交位移。对于有较强“精确处理一次”语义需求的用户来说,最好将该参数设置为false,由用户自行处理位移提交问题。
3.2.2.9 fetch.max.bytes
它指定了consumer端单次获取数据的最大字节数。若实际业务消息很大,则必须要设置该参数为一个较大的值,否则consumer将无法消费这些消息。
3.2.2.10 max.poll.records
该参数控制单次poll调用返回的最大消息数。比较极端的做法是设置该参数为1,那么每次poll只会返回1条消息。如果用户发现consumer端的瓶颈在poll速度太慢,可以适当地增加该参数的值。如果用户的消息处理逻辑很清理,默认的500条消息通常不能满足实际的消息处理速度。
3.2.2.11 heartbeat.interval.ms
当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS异常的形式“塞进”consumer心跳请求的response中,这样其他成员拿到response后才能知道它需要重新加入group。显然这个过程越快越好,而heartbeat.interval.ms就是用来做这件事情的。
比较推荐的做法是设置一个比较低的值,让group下的其他consumer成员能够更快地感知新一轮rebalance开启了。注意,该值必须小于session.timeout.ms!毕竟如果consumer在session.timeout.ms这段时间内都不发送心跳,coordinator就会认为它已经dead,因此也就没有必要让它知晓coordinator的决定了。
3.2.2.12 connections.max.idle.ms
Kafka会定期地关闭空闲Socket连接导致下次consumer处理请求时需要重新创建连向broker的Socket连接。当前默认值是9分钟,如果用户实际环境中不在乎这些Socket资源开销,比较推荐设置该参数值为-1,既不要关闭这些空闲连接。
上一篇: MapReduce二次排序
下一篇: Kafka多节点环境安装