每个Broker都对应有一个MessageStore,专门用来存储发送到它的消息,不过MessageStore本身不是文件,只是存储的一个抽象,MessageStore 中保存着一个 CommitLog,CommitLog 维护了一个 MappedFileQueue,而MappedFileQueue 中又维护了多个 MappedFile,每个MappedFile都会映射到文件系统中一个文件,这些文件才是真正的存储消息的地方,MappedFile的文件名为它记录的第一条消息的全局物理偏移量。
消息接收
从Netty服务端(Broker)接收消息处理器NettyServerHandler入手,NettyServerHandler是NettyRemotingServer的内部类,可在NettyRemotingServer 的源码中看到:
它的ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler 处理器,这个处理器的 channelRead0 方法会调用 NettyRemotingServer 的父类 processMessageReceived 方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND: //broker服务端处理请求
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND: //客户端处理响应
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
进入processRequestCommand(ctx, cmd)的方法看到以下关键代码:这部分是从 processorTable 里根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor, Processor 由 2 部分组成,一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO 线程1
2
3
4
5
6final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
......
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
Processor是Broker对于每个请求Code,都注册了对应的处理类,代码可见BrokerController中的registerProcessor()方法.1
2
3
4......
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
......
Netty Server根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。其中用于接收消息的处理类为:SendMessageProcessor。
SendMessageProcessor 实现了 NettyRequestProcessor接口,并在接口方法processRequest()中处理接收到的请求.
消息存储
SendMessageProcessor在processRequest()中调用了sendMessage()方法来进行消息处理。其中关键代码this.brokerController.getMessageStore().putMessage(msgInner) 实现如下
1 | public PutMessageResult putMessage(MessageExtBrokerInner msg) { |
每个Broker都有一个MessageStore实例,MessageStore本身是一个接口,定义了一些用来存储消息的接口协议,RocketMQ中MessageStore默认的实现类为DefaultMessageStore,Broker在其初始化方法initialize()中便会初始化好DefaultMessageStore
消息到了CommitLog后,便要开始进入存储逻辑了。CommitLog 中的 PutMessage()大概(省略一些)步骤如下:
获取写锁,保证同一时刻只处理一条消息的存储操作
1
2事务消息相关TODO ...
putMessageLock.lock();从CommitLog中获取最新的MappedFile,追加消息
CommitLog 中保存了一个MappedFileQueue,MappedFileQueue初始化的时候配置了消息文件MappedFile的存储路径以及单个MappedFile文件的大小,当某个消息文件写满后,便会生成一个新的MappedFile继续写入消息,所以MappedFileQueue中会按照消息写入时间顺序,维护多个MappedFile。
1
2
3
4
5
6
7 if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
......
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
MappedFile.appendMessage()方法,此方法最终调用了MappedFile.appendMessagesInner(): 插入消息到了 MappedFile,并返回插入结果1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
// 获取当前文件的写入位置
int currentPos = this.wrotePosition.get();
// 如果当前文件未写满,则进入追加逻辑
if (currentPos < this.fileSize) {
//获取需要写入的字节缓冲区
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
// 写数据到 缓存(字节缓冲区)
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
消息追加结束后,释放写锁
1
2
3//超过500ms写告警日志 写统计消息(写入topic的次数及写入消息大小)
......
putMessageLock.unlock();最后处理刷盘和主从数据同步(TODO)
1
2handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg); //HAService
这里承接上一篇:在这篇中大概知道了Broker接受消息到存储到内存中的过程,接下去跟着源码看RocketMQ的文件刷盘策略也就是如何存放到物理文件中的。