问题描述
您需要ErrorHandlingDeserializer
:https : //docs.spring.io/spring-
kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-
deserializer
如果无法升级到该2.2
版本,请考虑实施自己的版本并返回null
无法正确反序列化的那些记录。
- 源代码在这里:[https](https://github.com/spring-projects/spring-
- kafka/blob/master/spring-
- kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java)
- //github.com/spring-projects/spring-kafka/blob/master/spring- kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java
解决方法
有没有一种方法可以配置Spring Kafka使用者以跳过无法读取/处理(已损坏)的记录?
我看到一种情况,如果无法反序列化,则消费者将停留在同一记录上。这是消费者抛出的错误。
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
使用者轮询该主题,并一直循环循环打印相同的错误,直到程序被杀死为止。
在具有以下消费者工厂配置的@KafkaListener中,
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);