大意:
>通过键生成消息(每个msg包含创建时间戳),这样可以确保每个分区已按生成时间排序消息.
> auto.commit.enable = false(将在每个消息处理后显式提交)
>消费消息
>检查消息时间戳,并检查是否有足够的时间过去
>进程消息(此操作永远不会失败)
> commit 1 offset
while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg }@H_301_12@对此实施有一些担忧:
>提交每个偏移可能会减慢ZK
可以让consumer.commitout丢弃异常吗?如果是,我将消费两次相同的消息(可以使用幂等消息解决)
>问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,进程和提交(ZK会话超时?)
> ZK会话如何在没有提交新的偏移量的情况下保持活动? (设置一个蜂巢zookeeper.session.timeout.ms可以在死消费者中解决而不识别它)
>任何其他的问题?谢谢!
解决方法
while(it.hasNext()) { val message = it.next().message() if(shouldBeDelayed(message)) { val delay = 24 hours val delayTo = getCurrentTime() + delay putMessageOnDelayedQueue(message,delay,delayTo) } else { process(message) } consumer.commitOffset() }@H_301_12@所有常规邮件现在都将尽快处理,而那些需要延迟的邮件将被放在另一个主题上.
好的是,我们知道延迟主题的头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的.因此,我们可以设置另一个读取头信息的消费者,检查时间戳是否在过去,如果处理消息并提交偏移量.如果不是,它不会提交偏移量,而是直到那时睡觉:
while(it.hasNext()) { val delayedMessage = it.peek().message() if(delayedMessage.delayTo < getCurrentTime()) { val readMessage = it.next().message process(readMessage.originalMessage) consumer.commitOffset() } else { delayProcessingUntil(delayedMessage.delayTo) } }@H_301_12@如果有不同的延迟时间,您可以对延迟进行分区(例如24小时,12小时,6小时).如果延迟时间比这更复杂,那么延迟时间就会更加动态.您可以通过引入两个延迟主题来解决这个问题.读取所有消息关闭延迟主题A并处理所有延迟过去的消息.其他的你只是找到一个最接近的delayTo,然后把它们放在主题B上.睡觉,直到最近的一个应该被处理,并且完全相反,即处理来自主题B的消息,并将一次不应该被处理回主题A.
- commit each offset might slow ZK down
您可以考虑切换到Kafka中存储偏移量(可从0.8.2获取的功能,检查消费者配置中的offsets.storage属性)
- can consumer.commitOffsets throw an exception? if yes i will consume the same message twice (can solve with idempotent messages)
我相信如果它不能与偏移量存储进行通信,例如.正如你所说,使用幂等信息解决了这个问题.
- problem waiting long time without committing the offset,for example delay period is 24 hours,will get next from iterator,sleep for 24 hours,process and commit (ZK session timeout ?)
这不会是上述概述的解决方案的问题,除非消息本身的处理超过会话超时.
- how can ZK session keep-alive without commit new offsets ? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognising it)
再次用上面的方法,你不需要设置一个很长的会话超时.
- any other problems im missing?
总是有)