Kafka学习(一)生产者producer(个人规范用法)
程序员文章站
2022-06-14 14:57:44
...
生产者(直接使用原生 KafkaProducer 发送消息的测试示例):
@Slf4j
@Component
public class KafkaProducerTest {
//配置类
public Map<String, Object> init() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "*********");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
//测试解锁
@Test
public void send() throws JsonProcessingException {
KafkaProducer<String, String> producer = new KafkaProducer<>(init());
String topic = KafkaMessageConfig.BUSINESS_SERVICE_TOPIC;
Map<String, Object> params = new HashMap<>();
params.put("OperationType", KafkaMessageConfig.START_CHARGE_REQUEST);
params.put("StartChargeSeq", "1354367");
params.put("UserId", 123);
params.put("StartType", "THRAPP");
params.put("Port", "8137");
String msg = JsonUtil.map2Json(params);
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
"1" + i, msg);
producer.send(record);
log.info("kafka send message:[{}]", record);
}
producer.close();
}
单例方式(封装 Spring 注入的 KafkaTemplate):
@Component
@EnableKafka
@Configuration
public class KafkaProducer {
@Autowired
KafkaTemplate kafkaTemplate;
private static final KafkaProducer kafkaProducer = new KafkaProducer();
public synchronized static KafkaProducer getInstance() {
return kafkaProducer;
}
public Future<RecordMetadata> send(String topic, String key, String message) {
final ProducerRecord <String, String> record = new ProducerRecord<String, String>(topic, key, message);
Future<RecordMetadata> result = kafkaTemplate.send(topic,key,message);
return result;
}
}
配置(KafkaTemplate 及其 ProducerFactory):
/**
 * Spring configuration that builds the {@code KafkaTemplate} used for producing
 * messages, together with the {@code ProducerFactory} behind it.
 */
@Configuration
@EnableKafka
public class KafkaProducersConfig {

    /** Broker address list, read from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    /** Key serializer class name, read from {@code spring.kafka.producer.key-serializer}. */
    @Value("${spring.kafka.producer.key-serializer}")
    private String keyType;

    /** Value serializer class name, read from {@code spring.kafka.producer.value-serializer}. */
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueType;

    /**
     * Exposes the template under the bean name {@code kafkaTemplate}.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Creates the producer factory.
     *
     * <p>Registered as a bean so the container owns its lifecycle: on context
     * shutdown the factory is destroyed and its producer closed. Without
     * {@code @Bean} the factory is a plain object that nothing ever closes.
     *
     * <p>NOTE(review): the generic types say {@code String, String}, but the
     * serializers come from properties; confirm the configured serializers
     * really handle strings.
     *
     * @return a factory configured from the injected properties
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // NOTE(review): batch size (4096 bytes) and buffer memory (40960 bytes) are
        // far below the Kafka client defaults; confirm these limits are intentional.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyType);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueType);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
还是自己写的代码适合自己用,网上的很多代码杂而无用!
上一篇: Sublime Text绑定Eclipse快捷键实例详解
下一篇: ECharts——简介