go rocketmq——ConsumeFromTimestamp的注意事项
消费者
go中使用rocketmq
初始化消费者的时候,官方例子是这样的:
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
从RocketMQ源码或者一些书籍我们知道还可以指定消费的offset。
查看官方源码或者golang中包的源码,支持3种模式:
const (
/**
* 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
* 后续再启动接着上次消费的进度开始消费
*/
ConsumeFromLastOffset ConsumeFromWhere = iota
/**
* 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
* 后续再启动接着上次消费的进度开始消费
*/
ConsumeFromFirstOffset
/**
* 一个新的订阅组第一次启动从指定时间点开始消费<br>
* 后续再启动接着上次消费的进度开始消费<br>
* 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数 */
ConsumeFromTimestamp
)
简单总结一下:
- ConsumeFromLastOffset:从上一次消费的位置开始
- ConsumeFromFirstOffset:从头开始消费,如果消费过会出现重复消费的问题
- ConsumeFromTimestamp:第一次启动从给定时间戳消费,后续启动接着上一次消费的位置继续
go里面可以使用consumer.WithConsumeFromWhere来指定。
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(groupName),
consumer.WithConsumeFromWhere(consumer.ConsumeFromTimestamp),// default fromLastOffset
consumer.WithNsResovler(primitive.NewPassthroughResolver(nameServers)))
那IM里面如果使用MQ进行消息的广播、推送等,应该使用哪种模式呢?
我个人感觉应该是第三种:ConsumeFromTimestamp。为什么要使用这种模式,看下面。
为什么ConsumeFromTimestamp模式下,还能消费到之前生产的消息?
写测试代码的时候,先写了生产者,启动后生产了几条数据。然后中间有其他事情,过了几天消费者写好了启动,我发现把之前的消息打印出来了。
因为我是用MQ来解决不同网关之间通信广播的问题(为什么不用route_server来中转?因为存在单点问题,需要使用haproxy做主备),如下app用户给web用户发消息,但是这2个用户登录在不同的网关上,势必需要进行路由广播。
所以我希望在启动时,只消费当前时间戳往后的消息,不然可能就会产生很奇怪的问题,1个小时前发的消息,怎么1个小时后才到?
调查ConsumeFromTimestamp
尝试使用这个模式,我发现还是能收到生产者在之前生产的消息。所以又回过头去查询文档,找到了问题所在:
一个新的订阅组第一次启动从指定时间点开始消费
后续再启动接着上次消费的进度开始消费
所以只有 第一次启动时 才有效果,因为我使用的集群模式,消费进度会保存在broker上(rocketmq-console中可看),广播模式则保存在本地,所以后续再启动时,接着上一次的消费进度继续消费。所以得从应用层解决。
解决办法
- 每次启动时,groupname增加时间戳,这样每次都是新的订阅组,不过后期维护很麻烦,不推荐
- 手动设置offset,不过这种方式go里面我没找到函数。kafka可以采用这种方式
- 应用层根据时间戳自己处理
我采用第3种解决,先看一下时间戳:
[root@localhost bin]# ./mqadmin queryMsgById -i 0A006BDA00002A9F0000000000000B71 -n 10.0.107.218:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
OffsetID: 0A006BDA00002A9F0000000000000B71
Topic: cim_msg_push
Tags: [null]
Keys: [null]
Queue ID: 0
Queue Offset: 11
CommitLog Offset: 2929
Reconsume Times: 0
Born Timestamp: 2020-07-01 10:47:01,169
Store Timestamp: 2020-06-17 05:09:02,072
Born Host: 10.0.106.117:60041
Store Host: 10.0.107.218:10911
System Flag: 0
Properties: {UNIQ_KEY=0A006A7508CD0000000002505c880001}
Message Body Path: /tmp/rocketmq/msgbodys/0A006A7508CD0000000002505c880001
MessageTrack [consumerGroup=group1, trackType=NOT_ONLINE, exceptionDesc=CODE:206 DESC:the consumer group[group1] not online]
为什么born timestamp和store timestamp不一样?
启动时,记录startTimeStamp,然后订阅后,判断时间戳大于该时间戳才处理。
func NewMsgConsumer() *MsgConsumer {
return &MsgConsumer{
pushMsgChan: make(chan *cim.CIMPushMsg),
startTimeStamp: time.Now().Unix() * 1000, // ms
}
}
func (m *MsgConsumer) onMsgPush(ctx context.Context, msgs ...*primitive.MessageExt) (result consumer.ConsumeResult, err error) {
for i := range msgs {
logger.Sugar.Infof("subscribe callback: %v", msgs[i])
// if msg too old,drop all
if msgs[i].BornTimestamp < m.startTimeStamp {
logger.Sugar.Warnf("expired msg,dorp it: %v", msgs[i])
continue
}
msg := &cim.CIMPushMsg{}
err = proto.Unmarshal(msgs[i].Body, msg)
if err != nil {
logger.Sugar.Info(err)
} else {
m.pushMsgChan <- msg
}
}
return consumer.ConsumeSuccess, nil
}
输出:
2020-07-01 14:01:20.971 [INFO] [mq/consumer.go:140] subscribe callback: [Message=[topic=cim_msg_push, body�10.0.107.117:8000 (2�wx_2020�"$16757bf1-c9e5-4705-ba04-3b2bca925f27(����08hello mq, Flag=0, properties=map[CONSUME_START_TIME:1593583280971 MAX_OFFSET:16 MIN_OFFSET:0 UNIQ_KEY:0A006A753A940000000003022ff80001], TransactionId=], MsgId=0A006A753A940000000003022ff80001, OffsetMsgId=0A006BDA00002A9F0000000000000F49,QueueId=0, StoreSize=246, QueueOffset=15, SysFlag=0, BornTimestamp=1593583275496, BornHost=10.0.106.117:59062, StoreTimestamp=1592352409555, StoreHost=10.0.107.218:10911, CommitLogOffset=3913, BodyCRC=2070446043, ReconsumeTimes=0, PreparedTransactionOffset=0]
2020-07-01 14:01:20.971 [WARN] [mq/consumer.go:144] expired msg,dorp it: [Message=[topic=cim_msg_push, body�10.0.107.117:8000 (2�wx_2020�"$16757bf1-c9e5-4705-ba04-3b2bca925f27(����08hello mq, Flag=0, properties=map[CONSUME_START_TIME:1593583280971 MAX_OFFSET:16 MIN_OFFSET:0 UNIQ_KEY:0A006A753A940000000003022ff80001], TransactionId=], MsgId=0A006A753A940000000003022ff80001, OffsetMsgId=0A006BDA00002A9F0000000000000F49,QueueId=0, StoreSize=246, QueueOffset=15, SysFlag=0, BornTimestamp=1593583275496, BornHost=10.0.106.117:59062, StoreTimestamp=1592352409555, StoreHost=10.0.107.218:10911, CommitLogOffset=3913, BodyCRC=2070446043, ReconsumeTimes=0, PreparedTransactionOffset=0]
总结
ConsumeFromTimestamp模式下
- 只会在订阅组第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。
- 后续行为和ConsumeFromLastOffset类似,再启动时,因为是在broker中保存了消费进度,所以继续上一次进程退出前的位置继续消费。