Introduction
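RocketMQ offers two consumption modes, Push and Pull. Most applications use Push. In Pull mode the application code has to do considerably more work itself; for example, the Consumer must keep track of its own consumption offset in a local variable. As a result, Pull mode is used less often in practice.
In the source code these two modes correspond to the DefaultMQPushConsumer and DefaultMQPullConsumer classes. Internally, Push mode is still implemented on top of Pull: the Consumer long-polls the Broker for messages. When there are no new messages, the Broker suspends the Pull request and only resumes it, returning the new messages, once new messages arrive.
As for why Push mode is not designed as the Broker actively pushing messages, my guess is twofold: first, it would increase the load on the Broker; second, in a distributed system the Broker does not know each consumer's consumption capacity, so the delivery rate would be out of its control.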
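The Consumer sends a Pull request to the Broker
On the RocketMQ Consumer side, a dedicated background thread service, pullMessageService, is the initiator of Pull requests. It continually tries to take PullRequest objects from a blocking queue (a LinkedBlockingQueue).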
// consumer.start(): starts the consumer
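The rebalanceService balances load based on the number of message queues in the Topic and the number of consumers in the current consumer group, and puts the resulting PullRequest instances into the blocking queue pullRequestQueue.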
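To make the load-balancing step concrete, here is a minimal sketch of an even split of queues across the consumers of a group. It is not RocketMQ's code: the class name QueueAllocationSketch and the method allocate are hypothetical, queues are represented by plain integer ids, and it assumes every consumer sees the same sorted lists, so each can compute its own share independently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not RocketMQ source: splits queue ids evenly across consumers.
final class QueueAllocationSketch {

    /**
     * Returns the queue ids assigned to the consumer at position consumerIndex.
     * Assumes queue ids are 0..queueCount-1 and consumers are sorted identically everywhere.
     */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (queueCount < 0 || consumerCount <= 0
                || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid allocation arguments");
        }
        int base = queueCount / consumerCount;   // queues every consumer gets
        int extra = queueCount % consumerCount;  // the first `extra` consumers get one more
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);

        List<Integer> assigned = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }
}
```

With 8 queues and 3 consumers, the consumers at positions 0, 1 and 2 get queues 0 to 2, 3 to 5 and 6 to 7 respectively. If there are more consumers than queues, the surplus consumers get nothing, so under this kind of split adding consumers beyond the queue count does not increase parallelism.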
As the run method of PullMessageService shows, PullMessageService keeps taking PullRequest objects from the blocking queue pullRequestQueue and, through the network communication module, sends the Pull RPC request to the Broker:
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest); // send the RPC request
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
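The hand-off above is a plain producer/consumer pattern over a blocking queue. The following self-contained sketch shows the same shape under simplifying assumptions: PullLoopSketch, enqueue, stop and demo are hypothetical names, a PullRequest is reduced to a String, and "sending the RPC" is replaced by recording the request in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the PullMessageService loop; names are illustrative only.
final class PullLoopSketch implements Runnable {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final List<String> dispatched = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false;

    // Producer side (the role the rebalance service plays in the text above).
    void enqueue(String pullRequest) {
        pullRequestQueue.offer(pullRequest); // the queue is unbounded, so offer always succeeds
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String request = pullRequestQueue.take(); // blocks while the queue is empty
                dispatched.add(request);                  // stands in for sending the Pull RPC
            } catch (InterruptedException ignored) {
                // the interrupt only wakes the loop so it can re-check `stopped`
            }
        }
    }

    void stop(Thread worker) {
        stopped = true;
        worker.interrupt();
    }

    // Runs the loop on a background thread, feeds it the requests, and returns what it handled.
    static List<String> demo(String... requests) {
        PullLoopSketch loop = new PullLoopSketch();
        Thread worker = new Thread(loop, "pull-loop-sketch");
        worker.start();
        for (String request : requests) {
            loop.enqueue(request);
        }
        try {
            long deadline = System.currentTimeMillis() + 2000;
            while (loop.dispatched.size() < requests.length
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.stop(worker);
        }
        return new ArrayList<>(loop.dispatched);
    }
}
```

Because take() blocks, the thread consumes no CPU while there is nothing to pull, and a single thread is enough to serve every queue assigned to this consumer as long as each dispatch is asynchronous.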
The request is sent by calling DefaultMQPushConsumerImpl's pullMessage(pullRequest) method:
public void pullMessage(final PullRequest pullRequest) {
    .......
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
// Step into this.pullAPIWrapper.pullKernelImpl:
// (construction of the request header etc. omitted)
......
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);
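The Pull RPC request is sent to the Broker through RocketMQ's Remoting communication layer; the method that actually performs one Pull RPC exchange is pullMessage() in MQClientAPIImpl. At this point the Consumer has finished sending its request.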
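The catch block in pullMessage above does not retry immediately; it calls executePullRequestLater with a delay. One plausible way to get that behavior is to schedule a task that puts the request back on the queue, as in the sketch below. This is an assumption about the mechanism, not RocketMQ's code: RetryLaterSketch and nextRequest are hypothetical, and a PullRequest is again reduced to a String.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "retry the pull later"; not the RocketMQ implementation.
final class RetryLaterSketch {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Re-enqueue the request after delayMillis instead of retrying right away.
    void executePullRequestLater(String pullRequest, long delayMillis) {
        scheduler.schedule(() -> {
            pullRequestQueue.offer(pullRequest);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Waits up to timeoutMillis for the next request; returns null on timeout or interruption.
    String nextRequest(long timeoutMillis) {
        try {
            return pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Delaying the retry keeps a failing Broker or network path from being hit in a tight loop, while the request still flows back through the same queue and the same pull thread as every other request.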
The Broker handles the Pull request
Based on the Cmd Code, the Processor that NettyServerHandler selects should be PullMessageProcessor; in other words, PullMessageProcessor's processRequest is the entry point for handling pull requests.
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend){
    ......
    case ResponseCode.SUCCESS:
    ......
    // If messages were pulled, they are transferred with Netty's zero-copy FileRegion, which wraps FileChannel.transferTo,
    // so data in the file buffer can be sent directly to the target Channel
            FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    getMessageResult.release();
                    if (!future.isSuccess()) {
                        log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                    }
                }
            });
    ......
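    // When there are no messages to pull yet, the Broker holds the request
    // until matching data exists on the Broker or the wait times out, and only then responds.
    // After the response returns, the Consumer issues the next long-polling request. RocketMQ's push mode is built on this long-polling design.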
    case ResponseCode.PULL_NOT_FOUND:
        if (brokerAllowSuspend && hasSuspendFlag) {
            long pollingTimeMills = suspendTimeoutMillisLong;
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }
            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            response = null;
            break;
        }
}
If this first Pull attempt fails (for example, the Broker has no consumable messages for the moment), the Broker holds and suspends the request. Here response is set to null, so nothing is sent back to the Consumer at this point and the response result is not processed. The request is then retried by the Broker's background thread PullRequestHoldService and handled a second time by the background thread ReputMessageService.
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable;
// Put each PullRequest that needs to be held into a ConcurrentHashMap, where it waits to be checked
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}
// The actual check is in checkHoldRequest
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
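The run method calls checkHoldRequest over and over. Two conditions are checked. First, the PullRequest objects are taken from the local cache pullRequestTable and tested against the polling condition, namely whether the offset to pull from is smaller than the consume queue's maximum offset; if it is, new messages have arrived at the Broker. Second, the current time is compared with the time the request was suspended; if the maximum hold time has been exceeded, the request is answered as well.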
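The two release conditions just described can be modeled in a few lines. The sketch below is a simplified model, not the Broker's code: HoldTableSketch and its methods are hypothetical, a held request is reduced to a consumer name, the offset it wants next, and an expiry time, and the clock is passed in explicitly so the behavior is deterministic. A single lock replaces the finer-grained concurrency of the real table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of holding pull requests on the Broker side.
final class HoldTableSketch {

    // One held request: who asked, which offset they want next, and when the hold expires.
    private static final class HeldRequest {
        final String consumer;
        final long pullFromOffset;
        final long expiresAtMillis;

        HeldRequest(String consumer, long pullFromOffset, long expiresAtMillis) {
            this.consumer = consumer;
            this.pullFromOffset = pullFromOffset;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // Keyed by "topic@queueId", like the pullRequestTable shown above.
    private final Map<String, List<HeldRequest>> table = new HashMap<>();

    synchronized void suspend(String topic, int queueId, String consumer,
                              long pullFromOffset, long expiresAtMillis) {
        table.computeIfAbsent(topic + "@" + queueId, key -> new ArrayList<>())
             .add(new HeldRequest(consumer, pullFromOffset, expiresAtMillis));
    }

    // Returns the consumers whose requests should be answered now; the rest stay held.
    synchronized List<String> notifyMessageArriving(String topic, int queueId,
                                                    long maxOffset, long nowMillis) {
        List<String> released = new ArrayList<>();
        List<HeldRequest> held = table.get(topic + "@" + queueId);
        if (held == null) {
            return released;
        }
        Iterator<HeldRequest> it = held.iterator();
        while (it.hasNext()) {
            HeldRequest request = it.next();
            boolean newMessages = maxOffset > request.pullFromOffset; // condition 1
            boolean timedOut = nowMillis >= request.expiresAtMillis;  // condition 2
            if (newMessages || timedOut) {
                released.add(request.consumer);
                it.remove();
            }
        }
        return released;
    }
}
```

A request released by timeout is answered without messages, and the Consumer simply issues the next long-polling request, so the worst-case delivery delay for a held request is bounded by how often this check runs.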
The Consumer receives and consumes the messages
Once the Broker has finished processing, it responds to the client, which invokes the PullCallback to consume the messages.
// NettyRemotingAbstract.processResponseCommand(), reached from NettyClientHandler
 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
        responseTable.remove(opaque);
        // invokeCallback is set here, so the asynchronous callback is executed
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
// Next, pullMessageAsync's operationComplete() method is reached
public void operationComplete(ResponseFuture responseFuture) {
    RemotingCommand response = responseFuture.getResponseCommand();
    if (response != null) {
        try {
            PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
            assert pullResult != null;
            pullCallback.onSuccess(pullResult); // invoke the callback
        } catch (Exception e) {
            pullCallback.onException(e);
        }
    } else {
        
    }
}
// See PullCallback (in DefaultMQPushConsumerImpl)
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// Consumption starts here: submitConsumeRequest
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
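The processResponseCommand code above matches each response to its pending request through the opaque id and then runs the stored callback. A minimal sketch of that correlation pattern follows. ResponseTableSketch, sendAsync and onResponse are hypothetical names, and a response is reduced to a String; the real ResponseFuture also carries timeouts and other state that this sketch leaves out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of matching async responses to requests by an id ("opaque").
final class ResponseTableSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final Map<Integer, Consumer<String>> responseTable = new ConcurrentHashMap<>();

    // Registers the callback and returns the opaque id that travels with the request.
    int sendAsync(Consumer<String> callback) {
        int opaque = nextOpaque.incrementAndGet();
        responseTable.put(opaque, callback);
        return opaque;
    }

    // Called when a response arrives; returns false if no pending request matches.
    boolean onResponse(int opaque, String body) {
        Consumer<String> callback = responseTable.remove(opaque); // each request is answered once
        if (callback == null) {
            return false;
        }
        callback.accept(body);
        return true;
    }
}
```

Because the id rather than arrival order identifies the request, responses may come back in any order over one connection, which is what lets a held long-polling request coexist with other in-flight requests.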
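Summary
All of RocketMQ's consumption modes are based on pulling messages. Long polling, an optimization of plain polling, is used to balance the respective drawbacks of conventional Push and Pull models. With long polling, the Consumer no longer has to keep sending fruitless polling Pull requests, which would otherwise put a heavy load on the Brokers across the RocketMQ cluster.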