欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

c语言使用librdkafka库实现kafka生产和消费的实例

程序员文章站 2022-03-23 19:45:34
使用librdkafka库实现kafka生产和消费的实例 关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行k...

使用librdkafka库实现kafka生产和消费的实例

关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的生产、消费

一、producer

librdkafka进行kafka生产操作的大致步骤如下:

1、创建kafka配置

rd_kafka_conf_t *rd_kafka_conf_new (void)

2、配置kafka各项参数

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

3、设置发送回调函数

void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,

void (*dr_msg_cb) (rd_kafka_t *rk,

const rd_kafka_message_t *

rkmessage,

void *opaque))

4、创建producer实例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)

5、实例化topic

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)

6、异步调用将消息发送到指定的topic

int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,

int msgflags,

void *payload, size_t len,

const void *key, size_t keylen,

void *msg_opaque)

7、阻塞等待消息发送完成

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)

8、等待完成producer请求完成

rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)

9、销毁topic

void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)

10、销毁producer实例

void rd_kafka_destroy (rd_kafka_t *rk)

完整代码如下my_producer.c:

#include

#include

#include

#include "../src/rdkafka.h"

static int run = 1;

static void stop(int sig){

run = 0;

fclose(stdin);

}

/*

每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == rd_kafka_resp_err_no_error)

还是传递失败(rkmessage->err != rd_kafka_resp_err_no_error)

该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行

*/

static void dr_msg_cb(rd_kafka_t *rk,

const rd_kafka_message_t *rkmessage, void *opaque){

if(rkmessage->err)

fprintf(stderr, "%% message delivery failed: %s\n",

rd_kafka_err2str(rkmessage->err));

else

fprintf(stderr,

"%% message delivered (%zd bytes, "

"partition %"prid32")\n",

rkmessage->len, rkmessage->partition);

/* rkmessage被librdkafka自动销毁*/

}

int main(int argc, char **argv){

rd_kafka_t *rk; /*producer instance handle*/

rd_kafka_topic_t *rkt; /*topic对象*/

rd_kafka_conf_t *conf; /*临时配置对象*/

char errstr[512];

char buf[512];

const char *brokers;

const char *topic;

if(argc != 3){

fprintf(stderr, "%% usage: %s \n", argv[0]);

return 1;

}

brokers = argv[1];

topic = argv[2];

/* 创建一个kafka配置占位 */

conf = rd_kafka_conf_new();

/*创建broker集群*/

if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,

sizeof(errstr)) != rd_kafka_conf_ok){

fprintf(stderr, "%s\n", errstr);

return 1;

}

/*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数

*应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/

rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

/*创建producer实例

rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/

rk = rd_kafka_new(rd_kafka_producer, conf, errstr, sizeof(errstr));

if(!rk){

fprintf(stderr, "%% failed to create new producer:%s\n", errstr);

return 1;

}

/*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic

对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/

rkt = rd_kafka_topic_new(rk, topic, null);

if (!rkt){

fprintf(stderr, "%% failed to create topic object: %s\n",

rd_kafka_err2str(rd_kafka_last_error()));

rd_kafka_destroy(rk);

return 1;

}

/*用于中断的信号*/

signal(sigint, stop);

fprintf(stderr,

"%% type some text and hit enter to produce message\n"

"%% or just hit enter to only serve delivery reports\n"

"%% press ctrl-c or ctrl-d to exit\n");

while(run && fgets(buf, sizeof(buf), stdin)){

size_t len = strlen(buf);

if(buf[len-1] == '\n')

buf[--len] = '\0';

if(len == 0){

/*轮询用于事件的kafka handle,

事件将导致应用程序提供的回调函数被调用

第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/

rd_kafka_poll(rk, 0);

continue;

}

retry:

/*send/produce message.

这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,

对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)

用于在消息传递成功或失败时向应用程序发回信号*/

if (rd_kafka_produce(

/* topic object */

rkt,

/*使用内置的分区来选择分区*/

rd_kafka_partition_ua,

/*生成payload的副本*/

rd_kafka_msg_f_copy,

/*消息体和长度*/

buf, len,

/*可选键及其长度*/

null, 0,

null) == -1){

fprintf(stderr,

"%% failed to produce to topic %s: %s\n",

rd_kafka_topic_name(rkt),

rd_kafka_err2str(rd_kafka_last_error()));

if (rd_kafka_last_error() == rd_kafka_resp_err__queue_full){

/*如果内部队列满,等待消息传输完成并retry,

内部队列表示要发送的消息和已发送或失败的消息,

内部队列受限于queue.buffering.max.messages配置项*/

rd_kafka_poll(rk, 1000);

goto retry;

}

}else{

fprintf(stderr, "%% enqueued message (%zd bytes) for topic %s\n",

len, rd_kafka_topic_name(rkt));

}

/*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为

传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其

发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()

仍然被调用*/

rd_kafka_poll(rk, 0);

}

fprintf(stderr, "%% flushing final message.. \n");

/*rd_kafka_flush是rd_kafka_poll()的抽象化,

等待所有未完成的produce请求完成,通常在销毁producer实例前完成

以确保所有排列中和正在传输的produce请求在销毁前完成*/

rd_kafka_flush(rk, 10*1000);

/* destroy topic object */

rd_kafka_topic_destroy(rkt);

/* destroy the producer instance */

rd_kafka_destroy(rk);

return 0;

}

二、consumer

librdkafka进行kafka消费操作的大致步骤如下:

1、创建kafka配置

rd_kafka_conf_t *rd_kafka_conf_new (void)

2、创建kafka topic的配置

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) 

3、配置kafka各项参数

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

4、配置kafka topic各项参数

rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

5、创建consumer实例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)

6、为consumer实例添加brokerlist

int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

7、开启consumer订阅

rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)

8、轮询消息或事件,并调用回调函数

rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)

9、关闭consumer实例

rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)

10、释放topic list资源

rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)

11、销毁consumer实例

void rd_kafka_destroy (rd_kafka_t *rk) 

12、等待consumer对象的销毁

int rd_kafka_wait_destroyed (int timeout_ms)

完整代码如下my_consumer.c

#include

#include

#include

#include

#include

#include

#include "../src/rdkafka.h"

static int run = 1;

//`rd_kafka_t`自带一个可选的配置api,如果没有调用api,librdkafka将会使用configuration.md中的默认配置。

static rd_kafka_t *rk;

static rd_kafka_topic_partition_list_t *topics;

static void stop (int sig) {

if (!run)

exit(1);

run = 0;

fclose(stdin); /* abort fgets() */

}

static void sig_usr1 (int sig) {

rd_kafka_dump(stdout, rk);

}

/**

* 处理并打印已消费的消息

*/

static void msg_consume (rd_kafka_message_t *rkmessage,

void *opaque) {

if (rkmessage->err) {

if (rkmessage->err == rd_kafka_resp_err__partition_eof) {

fprintf(stderr,

"%% consumer reached end of %s [%"prid32"] "

"message queue at offset %"prid64"\n",

rd_kafka_topic_name(rkmessage->rkt),

rkmessage->partition, rkmessage->offset);

return;

}

if (rkmessage->rkt)

fprintf(stderr, "%% consume error for "

"topic \"%s\" [%"prid32"] "

"offset %"prid64": %s\n",

rd_kafka_topic_name(rkmessage->rkt),

rkmessage->partition,

rkmessage->offset,

rd_kafka_message_errstr(rkmessage));

else

fprintf(stderr, "%% consumer error: %s: %s\n",

rd_kafka_err2str(rkmessage->err),

rd_kafka_message_errstr(rkmessage));

if (rkmessage->err == rd_kafka_resp_err__unknown_partition ||

rkmessage->err == rd_kafka_resp_err__unknown_topic)

run = 0;

return;

}

fprintf(stdout, "%% message (topic %s [%"prid32"], "

"offset %"prid64", %zd bytes):\n",

rd_kafka_topic_name(rkmessage->rkt),

rkmessage->partition,

rkmessage->offset, rkmessage->len);

if (rkmessage->key_len) {

printf("key: %.*s\n",

(int)rkmessage->key_len, (char *)rkmessage->key);

}

printf("%.*s\n",

(int)rkmessage->len, (char *)rkmessage->payload);

}

/*

init all configuration of kafka

*/

int initkafka(char *brokers, char *group,char *topic){

rd_kafka_conf_t *conf;

rd_kafka_topic_conf_t *topic_conf;

rd_kafka_resp_err_t err;

char tmp[16];

char errstr[512];

/* kafka configuration */

conf = rd_kafka_conf_new();

//quick termination

snprintf(tmp, sizeof(tmp), "%i", sigio);

rd_kafka_conf_set(conf, "internal.termination.signal", tmp, null, 0);

//topic configuration

topic_conf = rd_kafka_topic_conf_new();

/* consumer groups require a group id */

if (!group)

group = "rdkafka_consumer_example";

if (rd_kafka_conf_set(conf, "group.id", group,

errstr, sizeof(errstr)) !=

rd_kafka_conf_ok) {

fprintf(stderr, "%% %s\n", errstr);

return -1;

}

/* consumer groups always use broker based offset storage */

if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",

"broker",

errstr, sizeof(errstr)) !=

rd_kafka_conf_ok) {

fprintf(stderr, "%% %s\n", errstr);

return -1;

}

/* set default topic config for pattern-matched topics. */

rd_kafka_conf_set_default_topic_conf(conf, topic_conf);

//实例化一个*对象rd_kafka_t作为基础容器,提供全局配置和共享状态

rk = rd_kafka_new(rd_kafka_consumer, conf, errstr, sizeof(errstr));

if(!rk){

fprintf(stderr, "%% failed to create new consumer:%s\n", errstr);

return -1;

}

//librdkafka需要至少一个brokers的初始化list

if (rd_kafka_brokers_add(rk, brokers) == 0){

fprintf(stderr, "%% no valid brokers specified\n");

return -1;

}

//重定向 rd_kafka_poll()队列到consumer_poll()队列

rd_kafka_poll_set_consumer(rk);

//创建一个topic+partition的存储空间(list/vector)

topics = rd_kafka_topic_partition_list_new(1);

//把topic+partition加入list

rd_kafka_topic_partition_list_add(topics, topic, -1);

//开启consumer订阅,匹配的topic将被添加到订阅列表中

if((err = rd_kafka_subscribe(rk, topics))){

fprintf(stderr, "%% failed to start consuming topics: %s\n", rd_kafka_err2str(err));

return -1;

}

return 1;

}

int main(int argc, char **argv){

char *brokers = "localhost:9092";

char *group = null;

char *topic = null;

int opt;

rd_kafka_resp_err_t err;

while ((opt = getopt(argc, argv, "g:b:t:qd:ex:as:do")) != -1){

switch (opt) {

case 'b':

brokers = optarg;

break;

case 'g':

group = optarg;

break;

case 't':

topic = optarg;

break;

default:

break;

}

}

signal(sigint, stop);

signal(sigusr1, sig_usr1);

if(!initkafka(brokers, group, topic)){

fprintf(stderr, "kafka server initialize error\n");

}else{

while(run){

rd_kafka_message_t *rkmessage;

/*-轮询消费者的消息或事件,最多阻塞timeout_ms

-应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务

所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,

因为它需要被正确地调用和处理以同步内部消费者状态 */

rkmessage = rd_kafka_consumer_poll(rk, 1000);

if(rkmessage){

msg_consume(rkmessage, null);

/*释放rkmessage的资源,并把所有权还给rdkafka*/

rd_kafka_message_destroy(rkmessage);

}

}

}

done:

/*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),

commit offset到broker,并离开consumer group

最大阻塞时间被设置为session.timeout.ms

*/

err = rd_kafka_consumer_close(rk);

if(err){

fprintf(stderr, "%% failed to close consumer: %s\n", rd_kafka_err2str(err));

}else{

fprintf(stderr, "%% consumer closed\n");

}

//释放topics list使用的所有资源和它自己

rd_kafka_topic_partition_list_destroy(topics);

//destroy kafka handle

rd_kafka_destroy(rk);

run = 5;

//等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1

while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){

printf("waiting for librdkafka to decommission\n");

}

if(run <= 0){

//dump rdkafka内部状态到stdout流

rd_kafka_dump(stdout, rk);

}

return 0;

}

在linux下编译producer和consumer的代码:

gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt

gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt

在运行my_producer或my_consumer时可能会报错"error while loading shared libraries xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录

在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic

启动方式可参考kafka0.8.2集群的环境搭建并实现基本的生产消费

启动consumer:

c语言使用librdkafka库实现kafka生产和消费的实例

启动producer,并发送一条数据“hello world”:

c语言使用librdkafka库实现kafka生产和消费的实例

consumer处成功收到producer发送的“hello world”:

c语言使用librdkafka库实现kafka生产和消费的实例