相关PR
https://github.com/ClickHouse/ClickHouse/pull/10697
使用
在MergeTree中添加了part的in memory format,数据存储在内存中。 part在第一次合并时写入磁盘。 如果零件的行或字节大小小于阈值min_rows_for_compact_part
和min_bytes_for_compact_part
,则将以内存格式创建part。 还提供可选的Write-Ahead-Log
支持,默认情况下启用此功能,并通过设置in_memory_parts_enable_wal
进行控制。
示例SQL
1 2 3 4 5 6 7 8
| CREATE TABLE default.mem ( a UInt32, b UInt32, c UInt32 ) ENGINE = MergeTree() PARTITION BY a ORDER BY b SETTINGS min_rows_for_compact_part = 3;
|
当单次插入数据行数小于3时,就会以in memory part的形式插入,in memory part的数据暂时不落盘,但会写日志到wal.bin
文件。
Write Ahead Log
相关实现主要在src/Storages/MergeTree/MergeTreeWriteAheadLog.h
当插入如下一行数据时,可以看到wal.bin
中的内容。
1
| localhost :) insert into default.mem values(1,2,3);
|
1 2 3 4 5 6
| $ hexdump -C wal.bin 00000000 01 00 02 7b 7d 00 07 31 5f 31 5f 31 5f 30 03 01 |...{}..1_1_1_0..| 00000010 01 61 06 55 49 6e 74 33 32 01 00 00 00 01 62 06 |.a.UInt32.....b.| 00000020 55 49 6e 74 33 32 02 00 00 00 01 63 06 55 49 6e |UInt32.....c.UIn| 00000030 74 33 32 03 00 00 00 |t32....| 00000037
|
一个part对应wal.bin
一条记录,插入一个part会被序列化为:Version+metadata+ActionType+PartName+Block
具体逻辑在void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part) { ... writeIntBinary(WAL_VERSION, *out);
ActionMetadata metadata{}; metadata.part_uuid = part->uuid; metadata.write(*out);
writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out); writeStringBinary(part->name, *out); block_out->write(part->block); block_out->flush();
... }
|
WAL_VERSION
为UInt8
类型,1字节.
ActionMetadata
,包括min_compatible_version(1字节),ser_meta.length()(VarInt)
.
ActionType
表示这次操作是插入还是删除。
PartName
表示这一part的分区键,最小block,最大block,merge level。在序列化为字符串时还会在开始写入PartName
长度。
Block
由NativeBlockOutputStream
负责序列化,具体实现逻辑在src/DataStreams/NativeBlockOutputStream.h
删除part与插入的日志类似,只是不会插入block
内容。
从WAL中恢复数据
当重启clickhouse时,需要从WAL中恢复in memory part
主要实现逻辑在MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
,会在void MergeTreeData::loadDataParts(bool skip_sanity_checks)
中被调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot) { ...
while (!in->eof()) { ...
try { ...
if (action_type == ActionType::DROP_PART) { dropped_parts.insert(part_name); } else if (action_type == ActionType::ADD_PART) { ... } else { throw Exception("Unknown action type: " + toString(static_cast<UInt8>(action_type)), ErrorCodes::CORRUPTED_DATA); } } catch (const Exception & e) { ... }
if (action_type == ActionType::ADD_PART) { ... } }
MergeTreeData::MutableDataPartsVector result; std::copy_if(parts.begin(), parts.end(), std::back_inserter(result), [&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
return result; }
|
in memory part写入数据
实现主要在src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h
1 2
| void MergeTreeDataPartWriterInMemory::write( const Block & block, const IColumn::Permutation * permutation)
|
block
为写入的数据块,permutation
用来对block
的数据排序,若为nullptr
则不需要排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ... if (permutation) { ... } else { for (const auto & col : columns_list) result_block.insert(block.getByName(col.name)); }
... part_in_memory->block = std::move(result_block);
if (settings.rewrite_primary_key) calculateAndSerializePrimaryIndex(primary_key_block);
|
主要逻辑包括排序,然后将插入的block
移动到part中的block,然后计算主键。在 in memory part中数据的存储格式仍然是列式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block) { size_t rows = primary_index_block.rows(); if (!rows) return;
size_t primary_columns_num = primary_index_block.columns(); index_columns.resize(primary_columns_num); for (size_t i = 0; i < primary_columns_num; ++i) { const auto & primary_column = *primary_index_block.getByPosition(i).column; index_columns[i] = primary_column.cloneEmpty(); index_columns[i]->insertFrom(primary_column, 0); if (with_final_mark) index_columns[i]->insertFrom(primary_column, rows - 1); } }
|
在计算主键时,可以看出主键只会将第一行的主键值插入到index_columns
中,并会根据with_final_mark
来确定是否将block
的最后一行插入到主键中。这里猜测是因为in memory part 的数据量一般比较小,所以作了比较简单的计算。
in memory part 读取数据
实现主要在src/Storages/MergeTree/MergeTreeDataPartReaderInMemory.h
主要实现逻辑在readRows
中,将in memory part中的数据读到res_columns
中并返回读取的行数。
如果读取的是整个part的行,则会把part中的列读到结果列中,如果只是部分行,则会创建新的列,并读取所需行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { ... auto column_it = columns.begin(); for (size_t i = 0; i < num_columns; ++i, ++column_it) { auto [name, type] = getColumnFromPart(*column_it);
auto offsets_it = positions_for_offsets.find(name); if (offsets_it != positions_for_offsets.end()) { ... } else if (part_in_memory->block.has(name)) { const auto & block_column = part_in_memory->block.getByName(name).column; if (rows_to_read == part_rows) { res_columns[i] = block_column; } else { if (res_columns[i] == nullptr) res_columns[i] = type->createColumn();
auto mutable_column = res_columns[i]->assumeMutable(); mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read); res_columns[i] = std::move(mutable_column); } } }
total_rows_read += rows_to_read; return rows_to_read; }
|