
Kafka RESTful API: features and usage

  • Preface

When you use Confluent's kafka-rest proxy to expose Kafka as a RESTful service, message payloads travel over HTTP and must be Base64-encoded (note that Base64 is an encoding, not encryption). If a message is POSTed without Base64 processing, you will see garbled data on the server side or client-side errors. The normal workflow is therefore:
1. Encode the message string to bytes with UTF-8.
2. Base64-encode those bytes with the base64 module.

import base64

s = 'kafka,hi'                          # sample payload strings
ad = "hi,kafka,i'm xnchall"
aa = ad.encode()                        # step 1: UTF-8 encode the string to bytes
print(aa)
b64 = base64.b64encode(ad.encode())     # step 2: Base64-encode the UTF-8 bytes
print(b64)
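The reverse step matters just as much on the consumer side: values come back Base64-encoded and must be decoded before use. Continuing the snippet above:

print(base64.b64decode(b64).decode('utf-8'))   # Base64-decode, then UTF-8 decode -> "hi,kafka,i'm xnchall"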
  • Produce messages to a topic with kafka-rest
POST /topics/(string:topic_name)

data={"records":[
{
"key":"a2v5",
"value":"y29uzmx1zw50"
},
{
"value":"a2fma2e=",
"partition":1
},
{
"value":"bg9ncw=="
}
]}
data1={"records":[{"value":"5bck5pws55qe5a6i5oi35oko5aw977ymagkga2fma2esigknbsb4bmnoywxs"}]}
header={"content-type":"application/vnd.kafka.v1+json"}
r=requests.post(url=url,json=data,headers=header)
r=requests.post(url=url,json=data1,headers=header)
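On success the proxy answers with a JSON body describing where each record was written. The sketch below inspects it; the field names assumed here (an "offsets" list with partition/offset/error entries) follow the Confluent REST proxy documentation, so adjust them if your proxy version responds differently:

print(r.status_code)                               # 200 on success
for off in r.json().get("offsets", []):            # one entry per posted record
    print(off.get("partition"), off.get("offset"), off.get("error"))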
  • Produce messages to one partition of the topic
POST /topics/(string:topic_name)/partitions/(int:partition_id)
ad="hi kafka,i'm xnchall"
url11="http://192.168.160.101:8082/topics/test_kfk_lk/partitions/1"

data2={"records":[{"value":(base64.b64encode(ad.encode())).decode()}]}
print(data2)
r2=requests.post(url=url11,json=data2,headers=header)
print(r2)
print(r2.content)
  • Create (register) a new consumer instance in the consumer group
POST /consumers/(string:group_name)

url3="http://192.168.160.101:8082/consumers/my_group"
data3={
"id":"my_consumer1",
"format":"binary",
"auto.offset.reset":"smallest",
"auto.commit.enable":"false"
}

r3=requests.post(url=url3,json=data3,headers=header)
  • Commit offsets for the consumer
POST /consumers/(string:group_name)/instances/(string:instance)/offsets

url4="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/offsets"
r4=requests.post(url=url4,headers=header)
  • Consume messages
GET /consumers/(string:group_name)/instances/(string:instance)/topics/(string:topic_name)

url_get2="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/topics/test_kfk_lk"
rr2=requests.get(url=url_get2,headers=header)#,params={"timeout":3000000}
print(rr2)
print(rr2.content)
print(rr2.text)
  • Delete (destroy) the consumer instance
DELETE /consumers/(string:group_name)/instances/(string:instance)
#url_del="http://192.168.160.101:8082/consumers/test_kfk_lk/instances/my_consumer"
#d1=requests.delete(url_del)#删除消费者实例
#print(d1)
  • Consume messages from one partition of the topic at a given offset (API v2)
GET /topics/(string:topic_name)/partitions/(int:partition_id)/messages?offset=(int)[&count=(int)]


Fetch response v1 only contains message format v0.
Fetch response v2 might either contain message format v0 or message format v1.
Possible error codes:
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* REPLICA_NOT_AVAILABLE (9)
* UNKNOWN (-1)
url_p="http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages"
rst=requests.get(url_p,headers=header,params={"offset":3,"count":2})#,"count":2})
print(rst)
print(len(rst.json()))
if(rst.status_code!=500):
for itr in rst.json():
    print(base64.b64decode(itr['value']).decode())
print(rst.url)#http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages?offset=3&count=2
  • Subscribe to topics / get the current subscription (API v2)
POST /consumers/(string:group_name)/instances/(string:instance)/subscription
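The article shows no request body for this endpoint, so here is a minimal sketch, assuming a v2 consumer instance named my_consumer already created in group testgroup on the same proxy: a POST with a {"topics": [...]} body subscribes the instance, and a GET on the same path returns the current subscription.

import requests

base = "http://192.168.160.101:8082/consumers/testgroup/instances/my_consumer"
v2_header = {"Content-Type": "application/vnd.kafka.v2+json"}

# subscribe the consumer instance to one or more topics
requests.post(base + "/subscription", json={"topics": ["test"]}, headers=v2_header)

# read back the current subscription
print(requests.get(base + "/subscription", headers=v2_header).json())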
  • Get the partitions manually assigned to this consumer (API v2)
GET /consumers/(string:group_name)/instances/(string:instance)/assignments
GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }
  ]
}
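The equivalent call from Python, as a sketch reusing base from the subscription sketch above:

r = requests.get(base + "/assignments", headers={"Accept": "application/vnd.kafka.v2+json"})
for p in r.json().get("partitions", []):
    print(p["topic"], p["partition"])               # manually assigned topic/partition pairs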
  • Override the offsets the consumer will consume next (API v2)
POST /consumers/(string:group_name)/instances/(string:instance)/positions
POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}
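The same request issued from Python, again a sketch reusing base and v2_header from above (the topics, partitions and offsets are just the example values):

seek_body = {"offsets": [
    {"topic": "test", "partition": 0, "offset": 20},
    {"topic": "test", "partition": 1, "offset": 30},
]}
requests.post(base + "/positions", json=seek_body, headers=v2_header)   # next fetch starts from these offsets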
  • Seek to the last offset (end) of the given topic partitions (API v2)
POST /consumers/(string:group_name)/instances/(string:instance)/positions/end
POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }
  ]
}
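And from Python, as a sketch under the same assumptions (the Confluent proxy also offers a matching /positions/beginning endpoint for seeking back to the start, which this article does not cover):

end_body = {"partitions": [
    {"topic": "test", "partition": 0},
    {"topic": "test", "partition": 1},
]}
requests.post(base + "/positions/end", json=end_body, headers=v2_header)   # seek to the latest offsets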
  • Consume topic or partition data via the assign/subscribe APIs (API v2)
GET /consumers/(string:group_name)/instances/(string:instance)/records
GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v2+json
Example binary response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v2+json
[
  {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100
  },
  {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101
  }
]
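To close the loop, a sketch of fetching and decoding these records from Python, reusing base from the sketches above and the base64 module imported earlier; the Accept header selects the binary (Base64) embedded format, and each value is decoded exactly as described in the preface:

r = requests.get(
    base + "/records",
    params={"timeout": 3000, "max_bytes": 300000},
    headers={"Accept": "application/vnd.kafka.binary.v2+json"},
)
for rec in r.json():
    value = base64.b64decode(rec["value"]).decode("utf-8")   # undo Base64 + UTF-8
    print(rec["topic"], rec["partition"], rec["offset"], value)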