ClickHouse in memory part和wal分析

相关PR

https://github.com/ClickHouse/ClickHouse/pull/10697

使用

在MergeTree中添加了part的in memory format,数据存储在内存中。 part在第一次合并时写入磁盘。 如果零件的行或字节大小小于阈值min_rows_for_compact_partmin_bytes_for_compact_part,则将以内存格式创建part。 还提供可选的Write-Ahead-Log支持,默认情况下启用此功能,并通过设置in_memory_parts_enable_wal进行控制。

示例SQL

1
2
3
4
5
6
7
8
CREATE TABLE default.mem (
a UInt32,
b UInt32,
c UInt32
) ENGINE = MergeTree()
PARTITION BY a
ORDER BY b
SETTINGS min_rows_for_compact_part = 3;

当单次插入数据行数小于3时,就会以in memory part的形式插入,in memory part的数据暂时不落盘,但会写日志到wal.bin文件。

Write Ahead Log

相关实现主要在src/Storages/MergeTree/MergeTreeWriteAheadLog.h

当插入如下一行数据时,可以看到wal.bin中的内容。

1
localhost :) insert into default.mem values(1,2,3);
1
2
3
4
5
6
$ hexdump -C wal.bin 
00000000 01 00 02 7b 7d 00 07 31 5f 31 5f 31 5f 30 03 01 |...{}..1_1_1_0..|
00000010 01 61 06 55 49 6e 74 33 32 01 00 00 00 01 62 06 |.a.UInt32.....b.|
00000020 55 49 6e 74 33 32 02 00 00 00 01 63 06 55 49 6e |UInt32.....c.UIn|
00000030 74 33 32 03 00 00 00 |t32....|
00000037

一个part对应wal.bin一条记录,插入一个part会被序列化为:Version+metadata+ActionType+PartName+Block

具体逻辑在void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)
{
...

writeIntBinary(WAL_VERSION, *out);

ActionMetadata metadata{};
metadata.part_uuid = part->uuid;
metadata.write(*out);

writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
writeStringBinary(part->name, *out);
block_out->write(part->block);
block_out->flush();

...
}

  • WAL_VERSIONUInt8类型,1字节.

  • ActionMetadata,包括min_compatible_version(1字节),ser_meta.length()(VarInt).

  • ActionType表示这次操作是插入还是删除。

  • PartName表示这一part的分区键,最小block,最大block,merge level。在序列化为字符串时还会在开始写入PartName长度。

  • BlockNativeBlockOutputStream负责序列化,具体实现逻辑在src/DataStreams/NativeBlockOutputStream.h

删除part与插入的日志类似,只是不会插入block内容。

从WAL中恢复数据

当重启clickhouse时,需要从WAL中恢复in memory part

主要实现逻辑在MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot),会在void MergeTreeData::loadDataParts(bool skip_sanity_checks)中被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
{
...

while (!in->eof())
{
...

try
{
...//读metadata

if (action_type == ActionType::DROP_PART)
{
dropped_parts.insert(part_name);
}
else if (action_type == ActionType::ADD_PART)
{
...//从wal中读block内容
}
else
{
throw Exception("Unknown action type: " + toString(static_cast<UInt8>(action_type)), ErrorCodes::CORRUPTED_DATA);
}
}
catch (const Exception & e)
{
...
}

if (action_type == ActionType::ADD_PART)
{
...//在内存中重建in memory part
}
}

MergeTreeData::MutableDataPartsVector result;
std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
[&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });

return result;
}

in memory part写入数据

实现主要在src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h

1
2
void MergeTreeDataPartWriterInMemory::write(
const Block & block, const IColumn::Permutation * permutation)

block为写入的数据块,permutation用来对block的数据排序,若为nullptr则不需要排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    ...
if (permutation)//排序
{
...
}
else
{
for (const auto & col : columns_list)
result_block.insert(block.getByName(col.name));
}

...

part_in_memory->block = std::move(result_block);

if (settings.rewrite_primary_key)
calculateAndSerializePrimaryIndex(primary_key_block);

主要逻辑包括排序,然后将插入的block移动到part中的block,然后计算主键。在 in memory part中数据的存储格式仍然是列式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{//计算主键
size_t rows = primary_index_block.rows();
if (!rows)
return;

size_t primary_columns_num = primary_index_block.columns();
index_columns.resize(primary_columns_num);
for (size_t i = 0; i < primary_columns_num; ++i)
{
const auto & primary_column = *primary_index_block.getByPosition(i).column;
index_columns[i] = primary_column.cloneEmpty();
index_columns[i]->insertFrom(primary_column, 0);
if (with_final_mark)
index_columns[i]->insertFrom(primary_column, rows - 1);
}
}

在计算主键时,可以看出主键只会将第一行的主键值插入到index_columns中,并会根据with_final_mark来确定是否将block的最后一行插入到主键中。这里猜测是因为in memory part 的数据量一般比较小,所以作了比较简单的计算。

in memory part 读取数据

实现主要在src/Storages/MergeTree/MergeTreeDataPartReaderInMemory.h

主要实现逻辑在readRows中,将in memory part中的数据读到res_columns中并返回读取的行数。

如果读取的是整个part的行,则会把part中的列读到结果列中,如果只是部分行,则会创建新的列,并读取所需行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
...
auto column_it = columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++column_it)
{
auto [name, type] = getColumnFromPart(*column_it);

/// Copy offsets, if array of Nested column is missing in part.
auto offsets_it = positions_for_offsets.find(name);
if (offsets_it != positions_for_offsets.end())
{
...
}
else if (part_in_memory->block.has(name))
{
const auto & block_column = part_in_memory->block.getByName(name).column;
if (rows_to_read == part_rows)
{
res_columns[i] = block_column;
}
else
{
if (res_columns[i] == nullptr)
res_columns[i] = type->createColumn();

auto mutable_column = res_columns[i]->assumeMutable();
mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read);
res_columns[i] = std::move(mutable_column);
}
}
}

total_rows_read += rows_to_read;
return rows_to_read;
}


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!