AnthonyZero's Blog

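RocketMQ: Flush Strategies

The previous post, RocketMQ: Message Storage, showed how a message travels from the Broker into the in-memory buffer of a MappedFile. Storage is not finished at that point, though, because the message has not yet been flushed, that is, persisted to the file on disk. This post looks at how RocketMQ flushes messages to disk.

Flush strategies

When CommitLog is initialized, its constructor starts one of two flush strategies, depending on the configuration.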

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();
} else {
    this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();

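  • CommitRealTimeService: asynchronous flush with the in-memory byte buffer (the transient store pool) enabled
  • FlushRealTimeService: asynchronous flush without the in-memory byte buffer
  • GroupCommitService: synchronous flush

Synchronous flush: as soon as a message has been appended to memory, it is flushed to the file.
Asynchronous flush: after a message has been appended to memory, it is not flushed right away; a background task flushes it later.

handleDiskFlush()

As the previous post showed, once putMessage() in CommitLog receives the result of MappedFile successfully appending the message to memory, it calls handleDiskFlush() to flush the message to the file.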

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            // Submit the flush request (handled asynchronously by the service thread)
            service.putRequest(request);
            // Block until the flush result arrives; a failed or timed-out flush marks the
            // message store as failed and returns FLUSH_DISK_TIMEOUT
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
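
The branching in handleDiskFlush() boils down to a small decision table. The sketch below only models which service ends up handling a message; FlushDispatchSketch and serviceFor are hypothetical names used for illustration, not part of RocketMQ.

```java
public class FlushDispatchSketch {
    public enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    /**
     * Returns the name of the service that handleDiskFlush() hands the message to,
     * following the if/else structure of the method above.
     */
    public static String serviceFor(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            // Synchronous flush always goes through GroupCommitService
            return "GroupCommitService";
        }
        // Asynchronous flush: the choice depends on whether the transient store pool is enabled
        return transientStorePoolEnable ? "CommitRealTimeService" : "FlushRealTimeService";
    }

    public static void main(String[] args) {
        for (FlushDiskType type : FlushDiskType.values()) {
            for (boolean pool : new boolean[] {false, true}) {
                System.out.println(type + ", transientStorePoolEnable=" + pool
                    + " -> " + serviceFor(type, pool));
            }
        }
    }
}
```

Note that in synchronous mode the transientStorePoolEnable setting plays no part in this branch.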

Synchronous flush

When the flush strategy is synchronous, GroupCommitService does the work:

class GroupCommitService extends FlushCommitLogService {
    // Queue that incoming requests are written to; swapped with the read queue
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // Queue that the flush thread reads from; swapped with the write queue
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    public synchronized void putRequest(final GroupCommitRequest request) {
        // Add the request to the write queue
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        // Wake up the service thread, which then swaps the read and write queues
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    // Swap the read and write queues to avoid lock contention
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    boolean flushOK = false;
                    // A message may straddle two MappedFiles (the current MappedFile was full when
                    // the message was written, so a new one was created), hence up to two attempts.
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        // The request is satisfied once the flushed offset has reached the request's offset
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
                    // Wake up the thread waiting on this request (implemented with a CountDownLatch)
                    req.wakeupCustomer(flushOK);
                }

                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                // Clear the read queue
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.waitForRunning(10);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

}

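Once GroupCommitService is woken up, it swaps the requests in requestsWrite over to requestsRead. After startup, run() calls doCommit() in a loop until the service is stopped; doCommit() iterates over the requests in requestsRead and handles each one, ultimately calling CommitLog.this.mappedFileQueue.flush(0) to flush to disk.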
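
The read/write queue swap is a general double-buffering pattern. Here is a minimal standalone sketch of it, assuming a single consumer thread; SwapQueueSketch, put and swapAndDrain are hypothetical names, and this is a simplified model rather than RocketMQ's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class SwapQueueSketch {
    private List<String> writeQueue = new ArrayList<>();
    private List<String> readQueue = new ArrayList<>();

    // Producers only ever append to the write queue, holding the lock briefly.
    public synchronized void put(String request) {
        writeQueue.add(request);
    }

    // The single consumer swaps the two lists under the lock, then processes
    // the read queue without blocking producers.
    public List<String> swapAndDrain() {
        synchronized (this) {
            List<String> tmp = writeQueue;
            writeQueue = readQueue;
            readQueue = tmp;
        }
        List<String> drained = new ArrayList<>(readQueue);
        readQueue.clear(); // the emptied list becomes the write queue at the next swap
        return drained;
    }

    public static void main(String[] args) {
        SwapQueueSketch queue = new SwapQueueSketch();
        queue.put("req-1");
        queue.put("req-2");
        System.out.println(queue.swapAndDrain()); // [req-1, req-2]
        System.out.println(queue.swapAndDrain()); // []
    }
}
```

The lock is held only for the append and for the pointer swap, so a slow flush never keeps producers waiting.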

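Although the synchronous flush itself runs on a separate thread, the main message-storage flow blocks (on the CountDownLatch inside GroupCommitRequest) until the flush result arrives, so the operation is still synchronous in effect.

Asynchronous flush

FlushRealTimeService is used for asynchronous flushing after a message has been appended successfully. Its main loop looks like this: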
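
The hand-off between the storing thread and the flush thread can be sketched with a plain CountDownLatch. The Request class below is a hypothetical stand-in for GroupCommitRequest: it borrows the method names waitForFlush and wakeupCustomer from the listing above, but the bodies are assumptions, not the original implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FlushWaitSketch {
    /** Hypothetical stand-in for GroupCommitRequest. */
    public static class Request {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean flushOK = false;

        // Called by the flush thread once it has tried to flush this request.
        public void wakeupCustomer(boolean ok) {
            this.flushOK = ok;
            latch.countDown();
        }

        // Called by the thread that stored the message; blocks until woken or timed out.
        public boolean waitForFlush(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
                return flushOK; // still false if the wait timed out
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Request flushed = new Request();
        new Thread(() -> flushed.wakeupCustomer(true)).start();
        System.out.println("flushed in time: " + flushed.waitForFlush(1000));

        Request neverFlushed = new Request();
        System.out.println("flushed in time: " + neverFlushed.waitForFlush(50));
    }
}
```

A timed-out wait and a failed flush look the same to the caller, which matches how handleDiskFlush() reports both as FLUSH_DISK_TIMEOUT.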

// Flush to disk periodically
while (!this.isStopped()) {
    // Sleep strategy: when true, sleep with Thread.sleep(); when false, wait with
    // waitForRunning(), which can be woken up early. Defaults to false
    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    // Flush interval, 500 ms by default
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    // Minimum number of pages to flush each time; a page is 4 KB and the default is 4 pages
    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    // Maximum interval between two flushes, 10 s by default
    int flushPhysicQueueThoroughInterval =
        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

    boolean printFlushProgress = false;

    // Print flush progress
    long currentTimeMillis = System.currentTimeMillis();
    // Check whether the time since the last flush exceeds the configured maximum interval
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        // If it does, set flushPhysicQueueLeastPages to 0 so that everything buffered in memory is flushed
        flushPhysicQueueLeastPages = 0;
        printFlushProgress = (printTimes++ % 10) == 0;
    }

    try {
        // Sleep or wait, depending on the sleep strategy
        if (flushCommitLogTimed) {
            Thread.sleep(interval);
        } else {
            this.waitForRunning(interval);
        }

        if (printFlushProgress) {
            this.printFlushProgress();
        }

        long begin = System.currentTimeMillis();
        // Done waiting; perform the flush
        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        long past = System.currentTimeMillis() - begin;
        if (past > 500) {
            log.info("Flush data to disk costs {} ms", past);
        }
    } catch (Throwable e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        this.printFlushProgress();
    }
}

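As the logic above shows, asynchronous flushing periodically writes the contents of the in-memory buffer to the file on a background thread. The main message flow only wakes that thread up and does not wait for the flush result, hence the name. (CommitRealTimeService works similarly to FlushRealTimeService, with better performance.)

Flushing a MappedFile to disk

Whichever flush strategy is used, the data is ultimately persisted through the following method: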
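
The interplay between the least-pages threshold and the thorough-flush interval is easiest to see as a pure function. The sketch below isolates that one decision from the loop; FlushThresholdSketch and leastPagesForRound are hypothetical names, and the values in main are the defaults quoted in the comments above.

```java
public class FlushThresholdSketch {
    /**
     * Returns how many dirty pages a flush round requires before it writes anything.
     * 0 means "flush whatever is pending", which is forced once the thorough
     * interval has elapsed since the last thorough flush.
     */
    public static int leastPagesForRound(long nowMillis, long lastThoroughFlushMillis,
                                         int thoroughIntervalMillis, int configuredLeastPages) {
        if (nowMillis >= lastThoroughFlushMillis + thoroughIntervalMillis) {
            return 0;
        }
        return configuredLeastPages;
    }

    public static void main(String[] args) {
        // 5 s after the last thorough flush: the 4-page threshold still applies
        System.out.println(leastPagesForRound(5_000, 0, 10_000, 4));  // prints 4
        // 10 s after the last thorough flush: flush everything
        System.out.println(leastPagesForRound(10_000, 0, 10_000, 4)); // prints 0
    }
}
```

With these defaults, a broker that receives only a trickle of messages still gets its data onto disk within roughly ten seconds, even if fewer than four pages are ever dirty.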

CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // Among all the MappedFiles held by mappedFileQueue, find the one that needs flushing
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // If a matching MappedFile was found, flush its contents and update flushedWhere
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

The final destination of a flush is MappedFile's flush() method: it checks whether the flush condition is met and, if so, writes the in-memory content to the file.
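
This post does not show the isAbleToFlush() check, so the following is only a sketch of what a page-based condition of that kind could look like. It assumes the 4 KB page size mentioned earlier; the class name, the parameter list and the "full file always flushes" rule are assumptions made for illustration.

```java
public class AbleToFlushSketch {
    // Assumed page size, matching the 4 KB figure quoted in the flush loop comments
    static final int OS_PAGE_SIZE = 4 * 1024;

    /**
     * Decides whether enough unflushed data has accumulated to be worth a flush.
     *
     * @param flushedPosition position up to which data is already on disk
     * @param writePosition   position up to which data has been written to memory
     * @param fileSize        total size of the mapped file
     * @param flushLeastPages minimum number of dirty pages required; 0 disables the threshold
     */
    public static boolean isAbleToFlush(int flushedPosition, int writePosition,
                                        int fileSize, int flushLeastPages) {
        if (writePosition == fileSize) {
            return true; // assumption: a full file is always flushed
        }
        if (flushLeastPages > 0) {
            int dirtyPages = (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE);
            return dirtyPages >= flushLeastPages;
        }
        return writePosition > flushedPosition; // any unflushed byte is enough
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        System.out.println(isAbleToFlush(0, 3 * OS_PAGE_SIZE, fileSize, 4)); // false: only 3 dirty pages
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, fileSize, 4)); // true: 4 dirty pages
        System.out.println(isAbleToFlush(0, 100, fileSize, 0));              // true: threshold disabled
    }
}
```

Passing 0, as GroupCommitService does with flush(0), turns the threshold off, so even a single pending byte gets flushed.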

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                // If writeBuffer is not null, messages were first written to writeBuffer and have already
                // been committed from writeBuffer to fileChannel, so call fileChannel.force() directly.
                // Otherwise, messages were written straight into the memory-mapped buffer mappedByteBuffer.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // Option 1: flush through the file channel (fileChannel)
                    this.fileChannel.force(false);
                } else {
                    // Option 2: flush through the mapped byte buffer (mappedByteBuffer)
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
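
Both force paths are plain java.nio calls and can be tried outside RocketMQ. The sketch below writes a payload to a temporary file once through a FileChannel and once through a MappedByteBuffer, forcing each to disk; ForceSketch and its method names are hypothetical, and the temporary files stand in for a commit log file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ForceSketch {
    // Option 1: write through the channel, then force file content (not metadata) to disk.
    public static String viaChannel(String payload) {
        try {
            Path file = Files.createTempFile("force-channel", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                ByteBuffer buf = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
                while (buf.hasRemaining()) {
                    channel.write(buf);
                }
                channel.force(false);
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Option 2: write into a memory mapping, then force the mapped region to disk.
    public static String viaMapping(String payload) {
        try {
            Path file = Files.createTempFile("force-mapped", ".bin");
            file.toFile().deleteOnExit();
            byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping READ_WRITE beyond the current size extends the file
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);
                mapped.put(bytes);
                mapped.force();
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaChannel("hello from channel"));
        System.out.println(viaMapping("hello from mapping"));
    }
}
```

Reading the file back only confirms that the bytes were written; it cannot demonstrate durability, since the operating system serves such reads from its page cache whether or not force() was called.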

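Summary

This post covered RocketMQ's two flush strategies:

  • SYNC mode: a strongly consistent, synchronous strategy that guarantees the message reaches the file. It flushes immediately and blocks until the flush result arrives.
  • ASYNC mode: an asynchronous strategy in which a message counts as stored once it is in memory and is flushed in the background. The main flow notifies the flush thread but does not wait for the result.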