AnthonyZero's Bolg

RocketMQ-消息发送

RocketMQ-消息发送

rocketmq.png

由于消息发送还涉及到延迟发送,顺序发送,批量发送等情况,分享下自己看源码总结的一般同步发送消息逻辑。

Producer启动

跟进producer.start()实质调用了DefaultMQProducerImpl的start(),startFactory==true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// 如果当前服务状态为CREATE_JUST【刚创建】
this.serviceState = ServiceState.START_FAILED;
//检查生产组是否符合要求
this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 改变生产者的实例id为进程id
this.defaultMQProducer.changeInstanceNameToPID();
}
// 初始化一个MQ客户端工厂,同一个clientId只有一个MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注册生产者,将当前生产者加入到MQClientInstance中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 注册成功则将当前生产者组对应的topic与发布关系放入topicPublishInfoTable注册表
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 启动MQClientFactory,如果已经启动则不会再启动一次
if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

clientId=客户端ip+@+实例名+unitName(可选),对于同一个JVM中的不同消费者和不同生产者在启动时候获取到的MQClientInstance是同一个。MQClientInstance是封装了网络调用相关的逻辑

mQClientFactory.start():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void start() throws MQClientException {
// 同步当前实例
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动Netty客户端
this.mQClientAPIImpl.start();
// 开启定时任务
this.startScheduledTask();
//启动消息拉取线程
this.pullMessageService.start();
// 启动消息重负载线程
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

进入NettyRomoteClient的start(): 看到了熟悉的Netty的Bootstrap和pipeline了。pipeline中加入了编解码器 心跳检测 连接管理以及客户端handler处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});

this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);

if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}

到此,消息发送的必要条件:生产者启动过程就结束了,我们接着看一下消息发送的流程

总结:NameServer提供轻量级的服务发现和路由,Producer和Consumer相当于Netty Client,而最重要的Broker充当Netty Server角色 通过提供轻量级的 Topic 和 Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。

同步发送消息

从SendResult result = producer.send(msg)入手。消息对应的Topic信息以及具体内容被封装在了Message中,并交由DefaultMQProducer,调用send()进行发送。DefaultMQProducer 只是一个面向调用方的代理,真正的生产者是DefaultMQProducerImpl,具体实现见sendDefaultImpl方法

寻找Topic对应的路由信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
  1. 先从本地缓存的路由表中查询;
  2. 没有找到的话,便向NameSrv发起请求,更新本地路由表,再次查询。
  3. 如果仍然没有查询到,表明Topic没有事先配置,则用Topic TBW102向NameSrv发起查询,返回TBW102的路由信息,暂时作为Topic的路由。

选择消息要发送到的队列

根据topic路由表及broker名称,获取一个messageQueue,本次发送的队列就是选取的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

// 如果启用了broker故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
// 本次需要发送的队列的index就是SendWhichQueue自增得到的
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// index与当前路由表中的对列总个数取模
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
// 获取到当前对应的待发送队列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}

// 至少选择一个broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 获取broker中的可写队列数
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 如果可写队列数>0,则选取一个队列
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
// 可写队列数 <= 0 移除该broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}

return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}

这段代码的核心就是进行队列的选取,选取的过程中伴随着负载均衡、故障检测,对于故障broker能够做到尽可能规避

发送的核心过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime)

//获取Queue所属Broker的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

//将消息内容及其他信息封装进请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
......

//接着调用MQClientAPIImpl的sendMessage()方法
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);

sendMessage()内部便是创建请求,按照发送方式调用不同的发送方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}

  • 单向:只管发送,不管是否发送成功;例如日志收集。 producer 只负责发送消息,不等待 broker 响应结果,而且也没有回调函数触发
  • 同步:阻塞至拿到发送结果;指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式。
  • 异步:发送后直接返回,在回调函数中等待发送结果

接下来就是调用封装的Netty进行网络传输了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

//NettyRomotingClient类中
//上句根据broker地址创建NIO的通信channel

//执行发送前置钩子方法
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);


//NettyRomotingAbstract类中 实现
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();

try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
//把自增请求ID responseFuture放入到responseTable(ConcurrentMap)中,后面唤醒的时候根据这个ID拿到responseFuture
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
//处理成功 等待唤醒
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
//发送失败,直接唤醒 不进行阻塞了
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
//阻塞等待
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}

return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

发送完毕之后,调用ResponseFuture的waitResponse方法使当前线程进行等待,使用 CountDownLatch.wait 方法实现,当 Netty 返回数据时,使用 CountDownLatch.countDown 进行唤醒,然后返回从 Broker 写入的结果,可能成功,也可能失败,需要到上层(Client层解析也就是NettyClientHandler处理),网络层只负责网络的事情。

到这儿,消息的发送就已经结束了,成功的从生产者传输到了Broker。

处理返回

客户端(Producer)通过NettyClientHandler会在 channelRead0 方法处理 Netty Server的返回值。见processMessageReceived 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}

分为处理Request请求和Response响应。我们看看处理Response响应

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

通过cmd对象的RequestID找到ResponseFuture,执行 responseFuture.putResponse设置返回值,唤醒阻塞等待的发送线程。好,到这里,唤醒阻塞的发送线程,返回数据,客户端同步的发送消息就结束了

看源码的过程是艰辛的,只有不断积累坚持才会有收获。继续加油!!!