AnthonyZero's Blog

RocketMQ - Message Storage

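Every Broker has a MessageStore dedicated to storing the messages sent to it. MessageStore itself is not a file but an abstraction over storage. A MessageStore holds a CommitLog, the CommitLog maintains a MappedFileQueue, and the MappedFileQueue in turn maintains multiple MappedFiles. Each MappedFile is mapped to one file on the file system, and these files are where messages are actually stored. A MappedFile's file name is its starting offset in the global CommitLog, i.e. the global physical offset of the first message it holds, zero-padded to 20 digits.
[Figure: rocketmqstore.png (RocketMQ store structure)]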
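To make the naming rule concrete, here is a minimal sketch that is not RocketMQ code. The class and method names are hypothetical, and it assumes equally sized, contiguous files of 1 GB (RocketMQ's default CommitLog file size). It derives a file name from a global physical offset and works out which file in the queue, and which position inside it, that offset falls into.

```java
// Hypothetical helper, not part of RocketMQ: shows how a global physical offset
// relates to CommitLog file names, assuming fixed-size, contiguous files.
final class CommitLogOffsets {
    // Assumed file size: 1 GB, RocketMQ's default CommitLog file size
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    private CommitLogOffsets() {}

    // Starting offset of the file that contains physicalOffset
    static long fileFromOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }

    // File name = starting offset of the file, zero-padded to 20 digits
    static String fileName(long physicalOffset) {
        return String.format("%020d", fileFromOffset(physicalOffset));
    }

    // Index of the file in the queue, given the starting offset of the oldest file still on disk
    static int fileIndex(long physicalOffset, long firstFileFromOffset) {
        if (physicalOffset < firstFileFromOffset) {
            return -1; // the offset belongs to a file that has already been deleted
        }
        return (int) ((physicalOffset - firstFileFromOffset) / FILE_SIZE);
    }

    // Position of the message inside its file
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }
}
```

For example, offset 1073741924 lands in file `00000000001073741824` at position 100. Because every file has the same size and is named after its start, going from an offset to the right file and position is plain arithmetic, with no index lookup needed.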

Message Reception

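Start with NettyServerHandler, the handler through which the Netty server (the Broker) receives messages. NettyServerHandler is an inner class of NettyRemotingServer. In the NettyRemotingServer source you can see that its ServerBootstrap adds a NettyServerHandler to the pipeline. The handler's channelRead0 method then calls processMessageReceived, which is defined in NettyRemotingServer's parent class, NettyRemotingAbstract: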

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND: // broker (server) side: handle an incoming request
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND: // client side: handle a response to an earlier request
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

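Inside processRequestCommand(ctx, cmd) we find the key code below. It looks up the Processor in processorTable by the command's code, here SEND_MESSAGE, and falls back to the default processor if none is registered. A Processor entry has two parts: the object that processes the data, and the thread pool that object runs on. The thread pool makes the processing asynchronous, so it does not block the Netty IO threads (the lines after ...... run inside a task that is submitted to that pool, pair.getObject2()).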

final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
......
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

The Broker registers a corresponding Processor (handler class) for each request code; see the registerProcessor() method in BrokerController:

......
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
......

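Using the Code of the Cmd object, the Netty server can thus find the corresponding Processor class to handle the data. The processor that receives messages is SendMessageProcessor.
SendMessageProcessor implements the NettyRequestProcessor interface and handles incoming requests in the interface method processRequest().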
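The register-then-dispatch pattern described above can be modeled without Netty. In this minimal sketch every type is a hypothetical stand-in rather than a RocketMQ class: `Entry` plays the role of `Pair<NettyRequestProcessor, ExecutorService>`, and `dispatch()` mirrors the idea of processRequestCommand() (look up by request code, fall back to a default, run on that processor's own pool), not its actual code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

// Hypothetical model of a processorTable; none of these types are RocketMQ classes.
class ProcessorTableSketch {
    // Stand-in for NettyRequestProcessor: takes a request code and body, returns a response
    interface Processor {
        String processRequest(int code, String body);
    }

    // Stand-in for Pair<NettyRequestProcessor, ExecutorService>
    static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new ConcurrentHashMap<>();
    private final Entry defaultEntry;

    ProcessorTableSketch(Entry defaultEntry) {
        this.defaultEntry = defaultEntry;
    }

    // Like registerProcessor(code, processor, executor): one entry per request code
    void registerProcessor(int code, Processor processor, ExecutorService executor) {
        processorTable.put(code, new Entry(processor, executor));
    }

    // Look up by code, fall back to the default entry, and run the processor on its own pool,
    // so the calling thread (the IO thread in the real server) returns immediately.
    CompletableFuture<String> dispatch(int code, String body) {
        Entry matched = processorTable.get(code);
        Entry entry = (matched == null) ? defaultEntry : matched;
        return CompletableFuture.supplyAsync(() -> entry.processor.processRequest(code, body), entry.executor);
    }
}
```

Giving each processor its own pool isolates workloads: a flood of one request type can saturate its pool without starving the pools that serve other request codes.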

Message Storage

In processRequest(), SendMessageProcessor calls sendMessage() to handle the message. Inside it, the key call this.brokerController.getMessageStore().putMessage(msgInner) is implemented in DefaultMessageStore as follows:

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // reject writes once the MessageStore has been shut down
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // slave brokers do not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // reject writes if the store is currently not writable
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
    // topic name too long (more than Byte.MAX_VALUE = 127 characters)
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    // message properties too long (more than Short.MAX_VALUE = 32767 characters)
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }
    // check whether the OS PageCache is busy: busy when the current CommitLog write has held
    // its lock for longer than osPageCacheBusyTimeOutMills (1 second by default)
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // write the message to the CommitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) { // log a warning if the write took more than 500 ms
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    // record store-latency statistics (maximum and time distribution)
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        // count failed writes
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
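The guard clauses above run in a fixed order before anything touches the CommitLog. Restating them as a pure function makes that order and the limits easy to test. This is a hypothetical sketch: the parameters are plain values standing in for the store's state, and the page-cache check is simplified (the real isOSPageCacheBusy() compares how long the current CommitLog write has held its lock against osPageCacheBusyTimeOutMills). The two length limits line up with the CommitLog record layout, where the topic length is stored in one byte and the properties length in two bytes.

```java
// Hypothetical restatement of the guard clauses in DefaultMessageStore.putMessage(); not RocketMQ code.
final class PutMessageChecks {
    // Names mirror PutMessageStatus values used in the code above
    enum Status { PUT_OK, SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY }

    private PutMessageChecks() {}

    static Status check(boolean shutdown, boolean slave, boolean writeable,
                        String topic, String properties,
                        long lockHeldMillis, long busyTimeoutMillis) {
        if (shutdown || slave || !writeable) {
            return Status.SERVICE_NOT_AVAILABLE;            // the store cannot accept writes
        }
        if (topic.length() > Byte.MAX_VALUE) {              // topic longer than 127 chars
            return Status.MESSAGE_ILLEGAL;
        }
        if (properties != null && properties.length() > Short.MAX_VALUE) { // longer than 32767 chars
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        if (lockHeldMillis > busyTimeoutMillis) {           // simplified page-cache busy check
            return Status.OS_PAGECACHE_BUSY;
        }
        return Status.PUT_OK;                               // would go on to commitLog.putMessage(msg)
    }
}
```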

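Every Broker has one MessageStore instance. MessageStore itself is an interface that defines the contract for storing messages. RocketMQ's default implementation is DefaultMessageStore, which the Broker creates in its initialization method, BrokerController.initialize().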

Once the message reaches the CommitLog, the actual storage logic begins. CommitLog.putMessage() roughly proceeds as follows (some details omitted):

  1. Acquire the write lock, so that only one message is being stored at any given moment.

    // ... transactional-message handling omitted (TODO) ...
    putMessageLock.lock();
  2. Get the latest MappedFile from the CommitLog and append the message to it.

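The CommitLog holds a MappedFileQueue. When the MappedFileQueue is initialized, it is configured with the storage path of the MappedFiles and the size of a single MappedFile. When a file fills up, a new MappedFile is created and writing continues there, so the MappedFileQueue maintains multiple MappedFiles ordered by the time they were written.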

if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
    ......
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
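The rollover behavior of getLastMappedFile() can be sketched with in-memory stubs. This is a hypothetical model, not RocketMQ code: `FileStub` and `write()` are illustrative names, and files are tiny (a configurable `fileSize`). A record that does not fit in the current file causes the rest of that file to be treated as padding before writing moves on. This loosely mirrors RocketMQ's END_OF_FILE handling, where the tail of a full CommitLog file is left blank and the write is retried in a new file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of MappedFileQueue rollover; not RocketMQ code.
class RolloverSketch {
    // Stand-in for a MappedFile: only its starting offset and write position
    static final class FileStub {
        final long fileFromOffset;
        int wrotePosition;

        FileStub(long fileFromOffset) {
            this.fileFromOffset = fileFromOffset;
        }
    }

    final int fileSize;
    final List<FileStub> files = new ArrayList<>();

    RolloverSketch(int fileSize) {
        this.fileSize = fileSize;
    }

    // Return the last file, creating a new one if there is none or the last one is full.
    // A new file starts exactly where the previous one ends.
    FileStub getLastMappedFile() {
        FileStub last = files.isEmpty() ? null : files.get(files.size() - 1);
        if (last == null || last.wrotePosition >= fileSize) {
            long start = (last == null) ? 0L : last.fileFromOffset + fileSize;
            last = new FileStub(start);
            files.add(last);
        }
        return last;
    }

    // Write a record of len bytes and return its global physical offset.
    long write(int len) {
        if (len <= 0 || len > fileSize) {
            throw new IllegalArgumentException("record must fit in one file");
        }
        FileStub file = getLastMappedFile();
        if (file.wrotePosition + len > fileSize) {
            file.wrotePosition = fileSize; // treat the tail as padding and mark the file full
            file = getLastMappedFile();    // roll over to a new file
        }
        long physicalOffset = file.fileFromOffset + file.wrotePosition;
        file.wrotePosition += len;
        return physicalOffset;
    }
}
```

With `fileSize = 100`, writing 60 bytes twice puts the second record at offset 100 in a new file, because it would not fit in the 40 bytes left in the first one.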

MappedFile.appendMessage() eventually calls MappedFile.appendMessagesInner(), which inserts the message into the MappedFile and returns the result:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    // current write position within this file
    int currentPos = this.wrotePosition.get();
    // append only if the file is not yet full
    if (currentPos < this.fileSize) {
        // buffer to write into: writeBuffer if it is in use, otherwise the memory-mapped buffer
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            // serialize the message into the byte buffer
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

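Note: at this point the message has not been written to disk. It has only been written into the MappedFile's writeBuffer (off-heap memory) or mappedByteBuffer (PageCache).
  3. After the append finishes, release the write lock.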
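The append step itself boils down to three moves: read wrotePosition, write into a slice of the buffer starting there, then advance wrotePosition. Here is a minimal sketch under stated assumptions. A heap ByteBuffer stands in for writeBuffer/mappedByteBuffer. The record format is a hypothetical 4-byte length prefix plus body (RocketMQ's real format has many more fields). Running out of space returns END_OF_FILE, named after one of the real AppendMessageStatus values, so the caller can roll over. Callers are assumed to hold a write lock, as the CommitLog does, and nothing is flushed to disk.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified MappedFile append; not RocketMQ code.
// Callers are assumed to serialize appends (the CommitLog holds putMessageLock).
class AppendSketch {
    enum Status { PUT_OK, END_OF_FILE }

    final int fileSize;
    final ByteBuffer buffer;                                  // stand-in for writeBuffer / mappedByteBuffer
    final AtomicInteger wrotePosition = new AtomicInteger(0); // next free byte in this file

    AppendSketch(int fileSize) {
        this.fileSize = fileSize;
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    Status append(byte[] body) {
        int currentPos = wrotePosition.get();
        int recordLen = 4 + body.length;      // hypothetical format: 4-byte length + body
        if (currentPos + recordLen > fileSize) {
            return Status.END_OF_FILE;        // not enough room; the caller should roll to a new file
        }
        ByteBuffer slice = buffer.slice();    // shares the content, has its own position
        slice.position(currentPos);
        slice.putInt(recordLen);
        slice.put(body);
        wrotePosition.addAndGet(recordLen);   // publish the new write position
        return Status.PUT_OK;
    }
}
```

Writing through a `slice()` leaves the shared buffer's own position untouched, so the buffer can later be read or flushed by offset without interference from the write path.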

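    // warn if the lock was held for more than 500 ms; update per-topic stats (put count and bytes written)
    ......
    putMessageLock.unlock();
  4. Finally, handle disk flushing and master-slave replication (TODO).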

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg); //HAService
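Putting the lock, append, unlock, and flush/replication sequence together gives roughly the skeleton below. It is hypothetical, not RocketMQ code. An in-memory list stands in for the MappedFile, `handleDiskFlush`/`handleHA` are stubs that only record that they ran, and a `ReentrantLock` is used (RocketMQ's putMessageLock can be a spin lock or a reentrant lock depending on configuration). What it shows is that the lock guards only the in-memory append, while flushing and replication run after `unlock()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical skeleton of the CommitLog.putMessage() sequence; not RocketMQ code.
class CommitLogFlowSketch {
    private final ReentrantLock putMessageLock = new ReentrantLock();
    private final List<byte[]> buffer = new ArrayList<>();                        // stand-in for the MappedFile; guarded by the lock
    final List<String> events = Collections.synchronizedList(new ArrayList<>()); // records flush/HA calls
    private long wroteOffset = 0;                                                 // guarded by the lock

    long putMessage(byte[] body) {
        long offset;
        putMessageLock.lock();       // one writer at a time
        try {
            offset = wroteOffset;    // append at the current end of the log
            buffer.add(body);
            wroteOffset += body.length;
        } finally {
            putMessageLock.unlock(); // always release, even if the append throws
        }
        handleDiskFlush(offset);     // runs outside the lock
        handleHA(offset);
        return offset;
    }

    // Stub: a real implementation would wake up or wait on a flush service
    private void handleDiskFlush(long offset) {
        events.add("flush@" + offset);
    }

    // Stub: a real implementation would hand the offset over to replication
    private void handleHA(long offset) {
        events.add("ha@" + offset);
    }
}
```

Keeping flush and replication outside the lock keeps the critical section short, so concurrent producers only serialize on the in-memory append.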

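This post continues from the previous one. Here we traced, at a high level, how the Broker receives a message and stores it in memory. Next, following the source code, we will look at RocketMQ's disk-flush strategy, that is, how messages end up in physical files.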