Kafka与.net core(三)kafka操作
程序员文章站
2022-06-22 12:46:05
1.Kafka相关知识 Broker:即Kafka的服务器,用户存储消息,Kafa集群中的一台或多台服务器统称为broker。 Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。 Kafka中的Message是以topic为基本单位组织的,不同 ......
1.kafka相关知识
- broker:即kafka的服务器,用户存储消息,kafa集群中的一台或多台服务器统称为broker。
- message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
-
- kafka中的message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分message。
-
partition中的每条message包含了以下三个属性:kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存。可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。
- offset:消息唯一标识:对应类型:long
- messagesize 对应类型:int32
- data 是message的具体内容。
- 越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。
- message:在broker中通log追加的方式进行持久化存储。并进行分区(patitions)。
-
- 一个topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中。
- logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。
-
- topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)。
- 为实现稀疏存储,我们通过给文件建索引,每隔一定字节的数据建立一条索引
- 为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘io调用的次数。
- broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。message消息是有多份的。
- consumer:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
-
- 在 kafka中,我们可以认为一个group是一个订阅者,一个topic中的每个partions,只会被一个订阅者中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息(消费者数据小于partions 的数量时)。注意:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
- 一个partition中的消息只会被group中的一个consumer消息。每个group中consumer消息消费互相独立。
- 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的sla(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
- 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。
2.kafka操作
2.1.查看有哪些主题:
kafka-topics.sh --list --zookeeper 192.168.0.201:12181
2.2.查看topic的详细信息
kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testkj1
2.3.为topic增加副本
kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
2.4.创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkj1
2.5为topic增加partition
bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testkj1
2.6kafka生产者客户端命令
kafka-console-producer.sh --broker-list localhost:9092 --topic testkj1
2.7kafka消费者客户端命令
kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testkj1
2.8kafka服务启动
kafka-server-start.sh -daemon ../config/server.properties
3..net core操作
producer端,引入confluent.kafka
install-package confluent.kafka -version 1.0-beta2
using confluent.kafka; using system; using system.collections.generic; using system.text; using system.threading.tasks; namespace kafkatest { class program { static void main(string[] args) { test().wait(); } static async task test() { var conf = new producerconfig { bootstrapservers = "39.**.**.**:9092" }; action<deliveryreportresult<null, string>> handler = r => console.writeline(!r.error.iserror ? $"delivered message to {r.topicpartitionoffset}" : $"delivery error: {r.error.reason}"); using (var p = new producer<null, string>(conf)) { for (int i = 0; i < 100000; ++i) { p.beginproduce("my-topic", new message<null, string> { value = i.tostring() }, handler); } // wait for up to 10 seconds for any inflight messages to be delivered. p.flush(timespan.fromseconds(10)); } } } }
consumer端,引入confluent.kafka
install-package confluent.kafka -version 1.0-beta2
using confluent.kafka; using system; using system.linq; using system.text; namespace kafkaclient { class program { static void main(string[] args) { var conf = new consumerconfig { groupid = "test-consumer-group4", bootstrapservers = "39.**.**.**:9092", // note: the autooffsetreset property determines the start offset in the event // there are not yet any committed offsets for the consumer group for the // topic/partitions of interest. by default, offsets are committed // automatically, so in this example, consumption will only start from the // earliest message in the topic 'my-topic' the first time you run the program. autooffsetreset = autooffsetresettype.earliest }; using (var c = new consumer<ignore, string>(conf)) { c.subscribe("my-topic"); bool consuming = true; // the client will automatically recover from non-fatal errors. you typically // don't need to take any action unless an error is marked as fatal. c.onerror += (_, e) => consuming = !e.isfatal; while (consuming) { try { var cr = c.consume(); console.writeline($"consumed message '{cr.value}' at: '{cr.topicpartitionoffset}'."); } catch (consumeexception e) { console.writeline($"error occured: {e.error.reason}"); } } // ensure the consumer leaves the group cleanly and final offsets are committed. c.close(); } } } }
推荐阅读
-
ASP.NET Core应用中与第三方IoC/DI框架的整合
-
ASP.NET Core应用中与第三方IoC/DI框架的整合
-
Spring Boot 2.3.6 与 Spring kafka 集成 出错(ClassNotFoundException: org.springframework.kafka.core.Microm
-
SUSE12Sp3安装配置.net core 生产环境(7)-kafka离线安装
-
.Net Core 集成 Kafka的步骤
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
-
abp(net core)+easyui+efcore实现仓储管理系统——ABP WebAPI与EasyUI结合增删改查之三(二十九)
-
兼容 .NET Core3.0, Natasha 框架实现 隔离域与热编译操作
-
[Kafka与Spark集成系列三] Spark编程模型
-
Kafka与.net core(二)zookeeper