如何在Spring Kafka Consumer中跳过损坏的(不可序列化的)消息?

问题描述

您需要ErrorHandlingDeserializerhttps : //docs.spring.io/spring- kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling- deserializer

如果无法升级到该2.2版本,请考虑实施自己的版本并返回null无法正确反序列化的那些记录。

代码在这里:[https](https://github.com/spring-projects/spring-
kafka/blob/master/spring-
kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java)
//github.com/spring-projects/spring-kafka/blob/master/spring- kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java

解决方法

有没有一种方法可以配置Spring Kafka使用者以跳过无法读取/处理(已损坏)的记录?

我看到一种情况,如果无法反序列化,则消费者将停留在同一记录上。这是消费者抛出的错误。

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value

使用者轮询该主题,并一直循环循环打印相同的错误,直到程序被杀死为止。

在具有以下消费者工厂配置的@KafkaListener中,

Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);

猜你在找的技术问答相关文章

如何检查配对的蓝牙设备是打印机还是扫描仪(Android)
是否允许实体正文进行HTTP DELETE请求?
如何将ZipInputStream转换为InputStream?
java.util.logging Java 8中的变量
PowerMockito.doReturn返回null
Java中的RESTful调用
Swing / Java:如何正确使用getText和setText字符串
特殊字符和重音字符
Android Studio中的ndk.dir错误
错误“找不到主类”