欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka学习(一)生产者producer(个人规范用法)

程序员文章站 2022-06-14 14:57:44
...

生产者:

@Slf4j
@Component
public class KafkaProducerTest {

   //配置类
    public Map<String, Object> init() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "*********");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }


    //测试解锁
    @Test
    public void send() throws JsonProcessingException {
        KafkaProducer<String, String> producer = new KafkaProducer<>(init());

        String topic = KafkaMessageConfig.BUSINESS_SERVICE_TOPIC;
        Map<String, Object> params = new HashMap<>();
        params.put("OperationType", KafkaMessageConfig.START_CHARGE_REQUEST);
        params.put("StartChargeSeq", "1354367");
        params.put("UserId", 123);
        params.put("StartType", "THRAPP");
        params.put("Port", "8137");
        String msg = JsonUtil.map2Json(params);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
                    "1" + i, msg);
            producer.send(record);
            log.info("kafka send message:[{}]", record);
        }
        producer.close();
    }

 

 

单例方式:

@Component
@EnableKafka
@Configuration
public class KafkaProducer {

    @Autowired
    KafkaTemplate kafkaTemplate;

    private static final KafkaProducer kafkaProducer = new KafkaProducer();

    public synchronized static KafkaProducer getInstance() {
        return kafkaProducer;
    }

    public Future<RecordMetadata> send(String topic, String key, String message) {
        final ProducerRecord <String, String> record = new ProducerRecord<String, String>(topic, key, message);
        Future<RecordMetadata> result = kafkaTemplate.send(topic,key,message);

        return result;
    }

}

配置:

@Configuration
@EnableKafka
public class KafkaProducersConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keyType;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueType;


    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {

        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyType);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueType);

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
}

还是自己写的代码适合自己用,网上的很多代码杂而无用!