消费者最佳实践

消费者组和订阅 Consumer Group and Subscriptions

你应该注意的第一件事就是:不同的消费者组可以消费相同Topic的消息,且每个消费者组都会有各自的消费偏移量。请务必确保同一组类的消费者订阅了相同的Topics。

消息监听器 Message Listener

有序性

消费者会锁定每一个消息队列( MessageQueue)来确保消费的顺序性。这会带来性能损耗,但如果你在意消息的顺序,它就很有用。通过不建议抛出异常,你应该返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来替代。(TODO:哪里抛出异常,这个状态优势干啥的?)

并发性

消费者会并发消费消息。通常在追求高性能的场景下才使用该特性。通常不建议抛出异常,你应该返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 来代替。(TODO:为啥抛出异常,这个状态优势干啥的?)

消费状态 Consumer Status

For MessageListenerConcurrently,你可以返回RECONSUME_LATER来告诉消费者: 现在不能消费,可以稍后再来消费(can not consume it right now and want to reconsume it later)。然后,你可以继续消费其他消息。此时,为了消息的有序性,你又不能跳过这条消息,但你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT告诉消费者让它等一会。(TODO:没太懂...)

阻塞

不建议阻塞消息监听器,因为它会阻塞线程池,且可能导致最终停止消费消息。

线程数

消费者内部使用线程池来处理消费过程,所以,你可以通过设置setConsumeThreadMin 或者 setConsumeThreadMax来修改线程池中的线程数量。

消费的起点

当创建一个新的消费者组的时候,它需要配置是否消费Broker中已经存在的历史消息。. 如果设置为CONSUME_FROM_LAST_OFFSET,消费者会忽略历史消息,只消费从它创建后产生的新的消息。 如果设置了CONSUME_FROM_FIRST_OFFSET,消费者会消费Broker中已经存在的每一个消息。 你也可以设置CONSUME_FROM_TIMESTAMP来让消费者去消费从某一时间点之后产生的消息。

消息重复

很多情况下都会导致消息重复,比如:

  • 消费者重发消息(i.e, FLUSH_SLAVE_TIMEOUT的时候)

  • 消费者宕机,导致消费偏移量没有及时更新到Broker。

所有,如果你的应用不能容忍消息重复,你需要做一些额外的工作来处理这个问题,比如,你可以校验你数据库的主键。

Last updated