生产者最佳实践续

发送消息注意事项

1. 一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。

message.setTags("TagA");

2. 每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);

3. 消息发送成功或者失败,要打印消息日志,务必要打印sendresultkey字段。

4. send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。

  • SEND_OK
消息发送成功

  • FLUSH_DISK_TIMEOUT
消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

  • FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

  • SLAVE_NOT_AVAILABLE
消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

对于进行发送顺序消息的应用,由于顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样。

5. 对于消息不可丢失应用,务必要有消息重发机制

例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。

消息发送失败如何处理

Producer的send方法本身支持内部重试,重试逻辑如下:

1. 至多重试3次。

2. 如果发送失败,则轮转到下一个Broker。

3. 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。

所以,如果本身向broker发送消息产生超时异常,就不会再做重试。

以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用这样做:如果调用send同步方法发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达Broker。

上述db重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成,我们基于以下几点考虑:①MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。②如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生kill -9这样暴力方式关闭,造成数据没有及时落盘而丢失。③Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。

综上,建议重试过程交由应用来控制。

选择oneway形式发送

一个RPC调用,通常是这样一个过程

1. 客户端发送请求到服务器

2. 服务器处理该请求

3. 服务器向客户端返回应答

所以一个RPC的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个os系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级。

来源:Github vantagewang/document

Last updated