消费者

go中使用rocketmq

初始化消费者的时候,官方例子是这样的:

c, _ := rocketmq.NewPushConsumer(
   consumer.WithGroupName("testGroup"),
   consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)

从RocketMQ源码或者一些书籍我们知道还可以指定消费的offset。

查看官方源码或者golang中包的源码,支持3种模式:

const (
    /**
     * 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
   ConsumeFromLastOffset ConsumeFromWhere = iota
   /**
     * 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
   ConsumeFromFirstOffset
   /** 
    * 一个新的订阅组第一次启动从指定时间点开始消费<br> 
    * 后续再启动接着上次消费的进度开始消费<br> 
    * 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数 */
   ConsumeFromTimestamp
)

简单总结一下:

  • ConsumeFromLastOffset:从上一次消费的位置开始
  • ConsumeFromFirstOffset:从头开始消费,如果消费过会出现重复消费的问题
  • ConsumeFromTimestamp:第一次启动从给定时间戳消费,后续启动接着上一次消费的位置继续

go里面可以使用consumer.WithConsumeFromWhere来指定。

c, err := rocketmq.NewPushConsumer(
   consumer.WithGroupName(groupName),
   consumer.WithConsumeFromWhere(consumer.ConsumeFromTimestamp),// default fromLastOffset
   consumer.WithNsResovler(primitive.NewPassthroughResolver(nameServers)))

那IM里面如果使用MQ进行消息的广播、推送等,应该使用哪种模式呢?
我个人感觉应该是第三种:ConsumeFromTimestamp。为什么要使用这种模式,看下面。

为什么ConsumeFromTimestamp模式下,还能消费到之前生产的消息?

写测试代码的时候,先写了生产者,启动后生产了几条数据。然后中间有其他事情,过了几天消费者写好了启动,我发现把之前的消息打印出来了。

因为我是用MQ来解决不同网关之间通信广播的问题(为什么不用route_server来中转?因为存在单点问题,需要使用haproxy做主备),如下app用户给web用户发消息,但是这2个用户登录在不同的网关上,势必需要进行路由广播。
在这里插入图片描述

所以我希望在启动时,只消费当前时间戳往后的消息,不然可能就会产生很奇怪的问题,1个小时前发的消息,怎么1个小时后才到?

调查ConsumeFromTimestamp

尝试使用这个模式,我发现还是能收到生产者在之前生产的消息。所以又回过头去查询文档,找到了问题所在:

一个新的订阅组第一次启动从指定时间点开始消费
后续再启动接着上次消费的进度开始消费

所以只有 第一次启动时 才有效果,因为我使用的集群模式,消费进度会保存在broker上(rocketmq-console中可看),广播模式则保存在本地,所以后续再启动时,接着上一次的消费进度继续消费。所以得从应用层解决。

解决办法

  1. 每次启动时,groupname增加时间戳,这样每次都是新的订阅组,不过后期维护很麻烦,不推荐
  2. 手动设置offset,不过这种方式go里面我没找到函数。kafka可以采用这种方式
  3. 应用层根据时间戳自己处理

我采用第3种解决,先看一下时间戳:

[root@localhost bin]# ./mqadmin queryMsgById -i 0A006BDA00002A9F0000000000000B71 -n 10.0.107.218:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
OffsetID:            0A006BDA00002A9F0000000000000B71
Topic:               cim_msg_push
Tags:                [null]
Keys:                [null]
Queue ID:            0
Queue Offset:        11
CommitLog Offset:    2929
Reconsume Times:     0
Born Timestamp:      2020-07-01 10:47:01,169
Store Timestamp:     2020-06-17 05:09:02,072
Born Host:           10.0.106.117:60041
Store Host:          10.0.107.218:10911
System Flag:         0
Properties:          {UNIQ_KEY=0A006A7508CD0000000002505c880001}
Message Body Path:   /tmp/rocketmq/msgbodys/0A006A7508CD0000000002505c880001

MessageTrack [consumerGroup=group1, trackType=NOT_ONLINE, exceptionDesc=CODE:206 DESC:the consumer group[group1] not online]

为什么born timestamp和store timestamp不一样?

启动时,记录startTimeStamp,然后订阅后,判断时间戳大于该时间戳才处理。

func NewMsgConsumer() *MsgConsumer {
   return &MsgConsumer{
      pushMsgChan:    make(chan *cim.CIMPushMsg),
      startTimeStamp: time.Now().Unix() * 1000, // ms
   }
}


func (m *MsgConsumer) onMsgPush(ctx context.Context, msgs ...*primitive.MessageExt) (result consumer.ConsumeResult, err error) {
   for i := range msgs {
      logger.Sugar.Infof("subscribe callback: %v", msgs[i])

      // if msg too old,drop all
      if msgs[i].BornTimestamp < m.startTimeStamp {
         logger.Sugar.Warnf("expired msg,dorp it: %v", msgs[i])
         continue
      }

      msg := &cim.CIMPushMsg{}
      err = proto.Unmarshal(msgs[i].Body, msg)
      if err != nil {
         logger.Sugar.Info(err)
      } else {
         m.pushMsgChan <- msg
      }
   }
   return consumer.ConsumeSuccess, nil
}

输出:

2020-07-01 14:01:20.971	[INFO]	[mq/consumer.go:140]	subscribe callback: [Message=[topic=cim_msg_push, body�10.0.107.117:8000 (2�wx_2020�"$16757bf1-c9e5-4705-ba04-3b2bca925f27(����08hello mq, Flag=0, properties=map[CONSUME_START_TIME:1593583280971 MAX_OFFSET:16 MIN_OFFSET:0 UNIQ_KEY:0A006A753A940000000003022ff80001], TransactionId=], MsgId=0A006A753A940000000003022ff80001, OffsetMsgId=0A006BDA00002A9F0000000000000F49,QueueId=0, StoreSize=246, QueueOffset=15, SysFlag=0, BornTimestamp=1593583275496, BornHost=10.0.106.117:59062, StoreTimestamp=1592352409555, StoreHost=10.0.107.218:10911, CommitLogOffset=3913, BodyCRC=2070446043, ReconsumeTimes=0, PreparedTransactionOffset=0]
2020-07-01 14:01:20.971	[WARN]	[mq/consumer.go:144]	expired msg,dorp it: [Message=[topic=cim_msg_push, body�10.0.107.117:8000 (2�wx_2020�"$16757bf1-c9e5-4705-ba04-3b2bca925f27(����08hello mq, Flag=0, properties=map[CONSUME_START_TIME:1593583280971 MAX_OFFSET:16 MIN_OFFSET:0 UNIQ_KEY:0A006A753A940000000003022ff80001], TransactionId=], MsgId=0A006A753A940000000003022ff80001, OffsetMsgId=0A006BDA00002A9F0000000000000F49,QueueId=0, StoreSize=246, QueueOffset=15, SysFlag=0, BornTimestamp=1593583275496, BornHost=10.0.106.117:59062, StoreTimestamp=1592352409555, StoreHost=10.0.107.218:10911, CommitLogOffset=3913, BodyCRC=2070446043, ReconsumeTimes=0, PreparedTransactionOffset=0]

总结

ConsumeFromTimestamp模式下

  • 只会在订阅组第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。
  • 后续行为和ConsumeFromLastOffset类似,再启动时,因为是在broker中保存了消费进度,所以继续上一次进程退出前的位置继续消费。

版权声明:本文为xmcy001122原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/xmcy001122/article/details/107062441