Spring Boot Kafka JsonSerializer示例

Spring Boot Kafka JsonSerializer示例

原文: https://howtodoinjava.com/kafka/spring-boot-jsonserializer-example/

学习使用JsonSerializerJsonDeserializer类从 Apache Kafka 主题存储和检索 JSON,并返回 Java 模型对象。

1. 先决条件

2. 应用配置

application.properties文件中,我们添加了以下配置。

server.port=9000

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

  • spring.kafka.consumer.key-deserializer指定键的反序列化器类。
  • spring.kafka.consumer.value-deserializer指定值的反序列化器类。
  • spring.kafka.consumer.properties.spring.json.trusted.packages指定允许反序列化的程序包模式的逗号分隔列表。 '*'表示反序列化所有程序包。
  • spring.kafka.producer.key-deserializer指定键的序列化器类。
  • spring.kafka.producer.value-deserializer指定值的序列化器类。

3. 模型类

我们创建了User类,该类将发送给 Kafka。 其实例将由JsonSerializer序列化为字节数组。 这个字节数组最终由 Kafka 存储到给定的分区中。

反序列化期间,JsonDeserializer用于以字节数组的形式从 Kafka 接收 JSON,并将User对象返回给应用。

public class User 
{
	private long userId;
    private String firstName;
    private String lastName;

	public long getUserId() {
		return userId;
	}
	public void setUserId(long userId) {
		this.userId = userId;
	}
	public String getFirstName() {
		return firstName;
	}
	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}
	public String getLastName() {
		return lastName;
	}
	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	@Override
	public String toString() {
		return "User [userId=" + userId + ", firstName=" 
		        + firstName + ", lastName=" + lastName + "]";
	}
}

4. Kafka 生产者

生产者 API 只是在 HTTP POST API 中使用用户信息。 然后,它创建一个新的User对象,并使用KafkaTemplate发送给 Kafka。

@PostMapping(value = "/createUser")
public void sendMessageToKafkaTopic(
		@RequestParam("userId") long userId, 
		@RequestParam("firstName") String firstName,
		@RequestParam("lastName") String lastName) {

	User user = new User();
	user.setUserId(userId);
	user.setFirstName(firstName);
	user.setLastName(lastName);

	this.producerService.saveCreateUserLog(user);
}

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void saveCreateUserLog(User user) 
{
	logger.info(String.format("User created -> %s", user));
	this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);
}

5. Kafka 消费者

用户被实现为@KafkaListener,每次在主题中添加新条目时都会得到通知。

@KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG, 
				groupId = AppConstants.GROUP_ID)
public void consume(User user) 
{
	logger.info(String.format("User created -> %s", user));
}

6. 测试

使用任何 REST API 测试器,并向 API http://localhost:9000/kafka/createUser发送一些消息,如下所示。

留言栏:http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta

观察控制台日志:

2020-05-24 23:36:47.132  INFO 2092 --- [nio-9000-exec-4] 
2020-05-26 01:03:52.722  INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService  
: User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]

2020-05-26 01:03:52.729  INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService  
: User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]

7. 总结

在此 SpringBoot kafka JsonSerializer示例中,我们学习了使用JsonSerializer对 Java 对象进行序列化和反序列化并存储在 Kafka 中。

学习愉快!

源码下载