AnthonyZero's Bolg

RocketMQ: Consumption Modes

Introduction

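RocketMQ provides two consumption modes, Push and Pull. Most scenarios use Push mode. In Pull mode the application code has to do considerably more work itself; for example, the Consumer has to keep the consumed offset in a local variable on its own. As a result, Pull mode is used less often in practice.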
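To make the "extra work" of Pull mode concrete, here is a minimal sketch of a consumer that keeps its own offsets. It does not use the RocketMQ client at all: InMemoryQueueSketch and PullOffsetSketch are hypothetical names invented for this illustration, and the "broker" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for one broker queue; not a RocketMQ class. */
final class InMemoryQueueSketch {
    private final List<String> messages = new ArrayList<>();

    void append(String body) {
        messages.add(body);
    }

    /** Returns up to maxNums messages starting at the given offset. */
    List<String> pull(long offset, int maxNums) {
        int from = (int) Math.min(offset, messages.size());
        int to = Math.min(from + maxNums, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

/** Pull-style consumer that tracks its own offsets in a local map. */
final class PullOffsetSketch {
    // queueId -> next offset to pull; the application owns this state
    private final Map<Integer, Long> offsetTable = new HashMap<>();

    List<String> pullOnce(int queueId, InMemoryQueueSketch queue, int maxNums) {
        long offset = offsetTable.getOrDefault(queueId, 0L);
        List<String> batch = queue.pull(offset, maxNums);
        // Advance the stored offset only by what was actually received.
        offsetTable.put(queueId, offset + batch.size());
        return batch;
    }

    long nextOffset(int queueId) {
        return offsetTable.getOrDefault(queueId, 0L);
    }
}
```

If the application forgets to advance the offset, it simply receives the same batch again, which is exactly the kind of bookkeeping Push mode hides.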

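In the source code the two modes correspond to the DefaultMQPushConsumer and DefaultMQPullConsumer classes. Push mode is in fact still implemented with pulls internally: the Consumer keeps polling the Broker for messages using long polling. When there are no new messages, the Broker suspends the Pull request and only resumes it, returning the new messages, once new messages have been produced.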

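As for why Push mode is not designed as an active push from the Broker, my guess is twofold: first, it would increase the load on the Broker; second, in a distributed system the Broker does not know each consumer's consumption capacity, which makes that side of things uncontrollable.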
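The long-polling idea described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: LongPollBrokerSketch is a hypothetical class, the "broker" is a JDK BlockingQueue, and a pull consumes the message instead of reading by offset as RocketMQ does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical broker stand-in that holds a pull until data arrives or a timeout passes. */
final class LongPollBrokerSketch {
    private final BlockingQueue<String> store = new LinkedBlockingQueue<>();

    void append(String body) {
        store.offer(body);
    }

    /**
     * Long poll: returns at once if a message is available, otherwise parks the
     * caller for up to suspendMillis. Returns null on timeout or interruption.
     */
    String pull(long suspendMillis) {
        try {
            return store.poll(suspendMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** One pull that starts before any message exists and is answered when one arrives. */
    static String demo() {
        LongPollBrokerSketch broker = new LongPollBrokerSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // no message exists for the first ~100 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            broker.append("m1");
        });
        producer.start();
        String got = broker.pull(5000); // parked here until append() runs
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return got;
    }
}
```

The consumer sends a single request and gets an answer as soon as data exists, rather than issuing many empty short polls in the meantime.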

The Consumer sends a Pull request to the Broker

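On the Consumer side, a dedicated background thread, pullMessageService, is the initiator of Pull requests. It repeatedly tries to take PullRequest elements from a blocking queue (a LinkedBlockingQueue) and, using the parameters in the pullRequest together with the subscription data, calls pullAPIWrapper's pullKernelImpl() method, which wraps the request in a PullMessageRequestHeader and sends it to the Broker to pull messages.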

// consumer.start(): the consumer starts
this.defaultMQPushConsumerImpl.start();

// mQClientFactory.start();
synchronized (this) {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // If not specified,looking address from name server
            if (null == this.clientConfig.getNamesrvAddr()) {
                this.mQClientAPIImpl.fetchNameServerAddr();
            }
            // Start request-response channel
            this.mQClientAPIImpl.start();
            // Start various schedule tasks
            this.startScheduledTask();
            // Start pull service
            this.pullMessageService.start(); // starts pullMessageService
            // Start rebalance service
            this.rebalanceService.start(); // independent background thread: rebalanceService
            // Start push service
            this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
            log.info("the client factory [{}] start OK", this.clientId);
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
            break;
        case SHUTDOWN_ALREADY:
            break;
        case START_FAILED:
            throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
        default:
            break;
    }
}

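rebalanceService balances load according to the number of message queues in the Topic and the number of consumers in the current consumer group, and puts the resulting PullRequest instances into the blocking queue pullRequestQueue.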
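As a rough illustration of what "balancing by queue count and consumer count" can mean, the sketch below splits queue ids into contiguous, near-equal ranges. It is one plausible strategy written for this post, not a copy of RocketMQ's allocation code; AverageAllocationSketch and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: assigns queue ids 0..queueCount-1 to consumers in near-equal ranges. */
final class AverageAllocationSketch {
    /**
     * Returns the queue ids owned by the consumer at consumerIndex, where
     * consumerIndex is its position in the sorted list of group members.
     */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (queueCount < 0 || consumerCount <= 0
                || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid arguments");
        }
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount; // the first 'extra' consumers get one more queue
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }
}
```

With 8 queues and 3 consumers this gives the ranges 0-2, 3-5 and 6-7. Each queue a consumer ends up owning would then correspond to one PullRequest placed into pullRequestQueue.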

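As the run method of PullMessageService shows, PullMessageService keeps taking PullRequest objects from the blocking queue pullRequestQueue and sends the Pull RPC request to the Broker through the network communication module.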

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest); // send the RPC request
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
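The take-and-dispatch loop above can be reproduced with nothing but the JDK. The following is a minimal sketch, assuming requests are plain strings and "sending the RPC" is replaced by recording the request; PullLoopSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker that drains a blocking queue the way the run() loop above does. */
final class PullLoopSketch implements Runnable {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final CountDownLatch done;
    private volatile boolean stopped = false;

    PullLoopSketch(int expected) {
        this.done = new CountDownLatch(expected);
    }

    void submit(String pullRequest) {
        pullRequestQueue.offer(pullRequest);
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String pullRequest = pullRequestQueue.take(); // blocks while the queue is empty
                handled.add(pullRequest); // placeholder for sending the pull RPC
                done.countDown();
            } catch (InterruptedException e) {
                // woken up for shutdown; the loop condition decides whether to exit
            }
        }
    }

    /** Starts the worker, feeds it three requests, stops it, and returns what it handled. */
    static List<String> demo() {
        PullLoopSketch loop = new PullLoopSketch(3);
        Thread worker = new Thread(loop, "pull-loop-sketch");
        worker.start();
        loop.submit("queue-0");
        loop.submit("queue-1");
        loop.submit("queue-2");
        try {
            loop.done.await(5, TimeUnit.SECONDS);
            loop.stopped = true;
            worker.interrupt(); // unblocks take() so the loop can observe 'stopped'
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(loop.handled);
    }
}
```

Because take() blocks, the thread costs nothing while there is no work, and a single thread handles requests for every queue in arrival order.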

The request is sent by calling DefaultMQPushConsumerImpl's pullMessage(pullRequest) method:

public void pullMessage(final PullRequest pullRequest) {
    .......
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }

}

// Inside this.pullAPIWrapper.pullKernelImpl():
// (building the request header and so on is omitted)
......
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);

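The Pull RPC request is sent to the Broker through RocketMQ's Remoting communication layer; the method that actually performs one Pull RPC is pullMessage() in MQClientAPIImpl. At this point the consumer's request has been sent.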
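The catch branch in pullMessage() above calls executePullRequestLater with a delay. One way such a delayed retry could work is to schedule the request back into the queue after the delay. The sketch below is an assumption about the general shape, not the RocketMQ implementation; DelayedRetrySketch, executeLater and the 300 ms delay are all made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical delayed re-enqueue of a failed pull request. */
final class DelayedRetrySketch {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Puts the request back into the queue after delayMillis. */
    void executeLater(String pullRequest, long delayMillis) {
        scheduler.schedule(() -> {
            pullRequestQueue.offer(pullRequest);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if the request was absent at first and showed up after the delay. */
    static boolean demo() {
        DelayedRetrySketch sketch = new DelayedRetrySketch();
        try {
            sketch.executeLater("queue-0", 300);
            boolean emptyAtFirst = sketch.pullRequestQueue.isEmpty();
            String later = sketch.pullRequestQueue.poll(5, TimeUnit.SECONDS);
            return emptyAtFirst && "queue-0".equals(later);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.scheduler.shutdownNow();
        }
    }
}
```

Delaying the retry rather than re-enqueuing at once keeps a persistently failing queue from turning the pull thread into a busy loop.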

The Broker handles the Pull request

In NettyServerHandler, the Processor looked up by the Cmd Code should be PullMessageProcessor; in other words, PullMessageProcessor's processRequest is the entry point for handling pull requests.

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend){
    ......
    case ResponseCode.SUCCESS:
        ......
        // If messages were pulled, use Netty's zero-copy FileRegion, which wraps "FileChannel.transferTo" to transfer the data;
        // it can send data from the file buffer directly to the target Channel
        FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                getMessageResult.release();
                if (!future.isSuccess()) {
                    log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                }
            }
        });
        ......
    // When there are no messages to pull yet, the Broker holds the request
    // until it has matching data or the wait times out, and only then responds.
    // Once the response is returned, the Consumer issues the next long-polling request. RocketMQ's push mode is built on exactly this long-polling design
    case ResponseCode.PULL_NOT_FOUND:
        if (brokerAllowSuspend && hasSuspendFlag) {
            long pollingTimeMills = suspendTimeoutMillisLong;
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }

            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            response = null;
            break;
        }

}
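The comment in the SUCCESS branch mentions FileChannel.transferTo. The JDK call itself is easy to try in isolation. The sketch below copies a temporary file into an in-memory channel; TransferToSketch is a hypothetical name, and note that with an in-memory target there is no real zero-copy benefit, since that only applies when the target is something like a socket channel. The point is just the shape of the API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical demo of FileChannel.transferTo: file bytes go straight to a target channel. */
final class TransferToSketch {
    static String copyViaTransferTo(String content) {
        Path file = null;
        try {
            file = Files.createTempFile("transfer-sketch", ".log");
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ);
                 WritableByteChannel target = Channels.newChannel(sink)) {
                long position = 0;
                long remaining = source.size();
                // transferTo may move fewer bytes than requested, so loop until done.
                while (remaining > 0) {
                    long sent = source.transferTo(position, remaining, target);
                    if (sent <= 0) {
                        break;
                    }
                    position += sent;
                    remaining -= sent;
                }
            }
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (file != null) {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException ignored) {
                    // best-effort cleanup of the temp file
                }
            }
        }
    }
}
```

The caller never allocates a buffer for the message bytes; it only names a position and a length in the file, which is what lets the Broker hand stored messages to the network layer without copying them through the Java heap.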

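If the first Pull attempt above finds nothing (for example, the Broker temporarily has no consumable messages), the Broker holds and suspends the request. Here the response is set to null, so nothing is sent back to the Consumer at this point and no response result is processed. The request is then retried by the Broker's background thread PullRequestHoldService and processed a second time via the background thread ReputMessageService.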

private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable;
// Put the PullRequest that needs to be held into a ConcurrentHashMap, where it waits to be checked
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }

    mpr.addPullRequest(pullRequest);
}

public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}

// The actual check is done in checkHoldRequest
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

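The run method keeps checking (checkHoldRequest). First, it takes the PullRequest entries from the local cache pullRequestTable and tests the polling condition "is the offset to pull from less than the maximum offset of the consume queue?"; if the condition holds, new messages have reached the Broker. Second, it compares the current time with the time the request was suspended to see whether the maximum suspend time has been exceeded; if so, the request is answered as well.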
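The two wake-up conditions just described (new messages, or suspend time exceeded) can be written down as a small check. This is a single-threaded sketch under my own simplifications: HoldCheckSketch and HeldRequest are hypothetical types, the table holds plain lists, and "waking" a request just means returning it to the caller.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical hold table keyed by topic@queueId, checked for new data or timeout. */
final class HoldCheckSketch {
    static final class HeldRequest {
        final long pullFromOffset;   // offset the consumer asked for
        final long suspendAtMillis;  // when the request was parked
        final long timeoutMillis;    // how long it may stay parked

        HeldRequest(long pullFromOffset, long suspendAtMillis, long timeoutMillis) {
            this.pullFromOffset = pullFromOffset;
            this.suspendAtMillis = suspendAtMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final Map<String, List<HeldRequest>> table = new ConcurrentHashMap<>();

    void suspend(String topic, int queueId, HeldRequest request) {
        table.computeIfAbsent(topic + "@" + queueId, k -> new ArrayList<>()).add(request);
    }

    /** Removes and returns the requests that should be answered now; the rest stay held. */
    List<HeldRequest> check(String topic, int queueId, long maxOffset, long nowMillis) {
        List<HeldRequest> wake = new ArrayList<>();
        List<HeldRequest> held = table.get(topic + "@" + queueId);
        if (held == null) {
            return wake;
        }
        Iterator<HeldRequest> it = held.iterator();
        while (it.hasNext()) {
            HeldRequest r = it.next();
            boolean newMessage = maxOffset > r.pullFromOffset;
            boolean timedOut = nowMillis >= r.suspendAtMillis + r.timeoutMillis;
            if (newMessage || timedOut) {
                wake.add(r);
                it.remove();
            }
        }
        return wake;
    }
}
```

A request that asked for offset 10 stays held while the queue's maximum offset is still 10, and is released either when the maximum offset moves past 10 or when its timeout elapses, whichever comes first.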

The Consumer receives the messages and consumes them

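Once the server has handled the request, it responds to the client, which invokes the PullCallback to carry out the actual message consumption.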

// NettyRemotingAbstract's processResponseCommand() method, entered from NettyClientHandler
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);
        // invokeCallback is set here, so the asynchronous callback is executed
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

// Execution continues into the operationComplete() method in pullMessageAsync
public void operationComplete(ResponseFuture responseFuture) {
    RemotingCommand response = responseFuture.getResponseCommand();
    if (response != null) {
        try {
            PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
            assert pullResult != null;
            pullCallback.onSuccess(pullResult); // invoke the callback
        } catch (Exception e) {
            pullCallback.onException(e);
        }
    } else {

    }
}

// See the PullCallback in DefaultMQPushConsumerImpl
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// Consumption starts here: submitConsumeRequest
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
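processResponseCommand() above matches a response to its request through the opaque id and a responseTable. The same correlation pattern, reduced to its core, looks roughly like this. ResponseTableSketch is a hypothetical class, callbacks are plain java.util.function.Consumer instances, and there is no networking or timeout handling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical request/response correlation by an integer id, as in the code above. */
final class ResponseTableSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final Map<Integer, Consumer<String>> responseTable = new ConcurrentHashMap<>();

    /** Registers the callback for a new request and returns the id sent along with it. */
    int sendAsync(Consumer<String> callback) {
        int opaque = nextOpaque.incrementAndGet();
        responseTable.put(opaque, callback);
        return opaque;
    }

    /** Called when a response arrives; returns false if no pending request matches. */
    boolean onResponse(int opaque, String body) {
        Consumer<String> callback = responseTable.remove(opaque);
        if (callback == null) {
            return false; // late, duplicate, or unknown response
        }
        callback.accept(body);
        return true;
    }
}
```

Because the entry is removed before the callback runs, a duplicate response for the same id is ignored, and responses may arrive in any order relative to the requests.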

Summary

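All RocketMQ consumption is based on pulling messages, and within that there is a long-polling mechanism (an optimization of plain polling) that balances the respective drawbacks of the conventional Push and Pull models. Long polling avoids the situation where Consumers keep sending fruitless polling Pull requests and thereby put a heavy load on the Brokers across the RocketMQ cluster.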