Example code for accessing Kafka with the Java client

程序员文章站 2024-02-26 15:31:04

The environment used in this article is as follows:

Operating system: CentOS 6 (32-bit)

JDK version: 1.8.0_77 (32-bit)

Kafka version: 0.9.0.1 (Scala 2.11)

1. Maven dependency

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

2. Producer code

package com.lnho.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Broker address used for the initial connection to the cluster
    props.put("bootstrap.servers", "master:9092");
    // Wait for all in-sync replicas to acknowledge each record
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    // Send 100 records to topic1, using the loop counter as both key and value
    for (int i = 0; i < 100; i++)
      producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));

    // close() blocks until previously sent records have been transmitted
    producer.close();
  }
}
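
The producer settings above can also be gathered in a small helper so that the broker address is passed in rather than hard-coded. The following is only a sketch that uses java.util.Properties and nothing from Kafka itself; the class name ProducerConfigSketch and the method producerProps are hypothetical names chosen for illustration, and the comments summarize what each setting is generally understood to control.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Hypothetical helper (not part of the Kafka API): returns the same
    // settings as the producer example, with a configurable broker address.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");             // wait for all in-sync replicas
        props.put("retries", 0);              // do not resend failed records
        props.put("batch.size", 16384);       // per-partition batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 33554432); // 32 MB of memory for unsent records
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Falls back to the address used in this article when no argument is given
        String servers = args.length > 0 ? args[0] : "master:9092";
        producerProps(servers).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned Properties object could then be passed to new KafkaProducer<>(props) exactly as in the listing above.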

3. Consumer code

package com.lnho.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master:9092");
    // Consumers sharing the same group.id divide the topic's partitions among themselves
    props.put("group.id", "test");
    // Commit offsets automatically in the background, once per second
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));
    // Poll forever; each call waits up to 100 ms for new records
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
  }
}
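
To run several consumers in different groups, the consumer settings can be built by a helper that takes the broker address and the group id as parameters. This is a sketch under that assumption, using only java.util.Properties; ConsumerConfigSketch and consumerProps are hypothetical names, not part of the Kafka client library.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Hypothetical helper (not part of the Kafka API): same settings as the
    // consumer example, with the broker address and group id passed in.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");      // commit offsets in the background
        props.put("auto.commit.interval.ms", "1000"); // once per second
        props.put("session.timeout.ms", "30000");     // failure-detection timeout
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Defaults match the values used in this article
        Properties props = consumerProps("master:9092", "test");
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```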


4. Running the programs

The lib directory must contain: kafka-clients-0.9.0.1.jar, log4j-1.2.17.jar, slf4j-api-1.7.6.jar, slf4j-log4j12-1.7.6.jar
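
Before launching, it can help to confirm that the jars listed above are actually present, since a missing jar typically shows up only as a NoClassDefFoundError at run time. The following is a minimal sketch of such a check; the class LibCheckSketch and the method missingJars are hypothetical and use only the Java standard library.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LibCheckSketch {
    // Jar names taken from the list in this article
    static final List<String> REQUIRED = Arrays.asList(
            "kafka-clients-0.9.0.1.jar",
            "log4j-1.2.17.jar",
            "slf4j-api-1.7.6.jar",
            "slf4j-log4j12-1.7.6.jar");

    // Returns the required jar names that are not regular files in libDir
    static List<String> missingJars(File libDir) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!new File(libDir, name).isFile()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Checks ./lib unless another directory is given as the first argument
        File libDir = new File(args.length > 0 ? args[0] : "lib");
        List<String> missing = missingJars(libDir);
        if (missing.isEmpty()) {
            System.out.println("All required jars found in " + libDir);
        } else {
            System.out.println("Missing from " + libDir + ": " + missing);
        }
    }
}
```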

Producer:

java -classpath "kafka-example-1.0-SNAPSHOT.jar:lib/*" com.lnho.example.kafka.KafkaProducerExample

Consumer:

java -classpath "kafka-example-1.0-SNAPSHOT.jar:lib/*" com.lnho.example.kafka.KafkaConsumerExample

That is all for this article; I hope it is helpful for your learning.