MiniOB 1
MiniOB is one mini database, helping developers to learn how database works.
载入中...
搜索中...
未找到
aggregate_hash_table.h
1/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
2miniob is licensed under Mulan PSL v2.
3You can use this software according to the terms and conditions of the Mulan PSL v2.
4You may obtain a copy of Mulan PSL v2 at:
5 http://license.coscl.org.cn/MulanPSL2
6THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
7EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
8MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
9See the Mulan PSL v2 for more details. */
10
11#pragma once
12
13#include "common/lang/vector.h"
14#include "common/lang/unordered_map.h"
15#include "common/math/simd_util.h"
16#include "common/sys/rc.h"
17#include "sql/expr/expression.h"
18
23{
24public:
25 class Scanner
26 {
27 public:
28 explicit Scanner(AggregateHashTable *hash_table) : hash_table_(hash_table) {}
29 virtual ~Scanner() = default;
30
31 virtual void open_scan() = 0;
32
36 virtual RC next(Chunk &chunk) = 0;
37
38 virtual void close_scan(){};
39
40 protected:
41 AggregateHashTable *hash_table_;
42 };
43
47 virtual RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) = 0;
48
49 virtual ~AggregateHashTable() = default;
50};
51
53{
54private:
56 {
57 size_t operator()(const vector<Value> &vec) const;
58 };
59
61 {
62 bool operator()(const vector<Value> &lhs, const vector<Value> &rhs) const;
63 };
64
65public:
66 using StandardHashTable = unordered_map<vector<Value>, vector<Value>, VectorHash, VectorEqual>;
68 {
69 public:
70 explicit Scanner(AggregateHashTable *hash_table) : AggregateHashTable::Scanner(hash_table) {}
71 ~Scanner() = default;
72
73 void open_scan() override;
74
75 RC next(Chunk &chunk) override;
76
77 private:
78 StandardHashTable::iterator end_;
79 StandardHashTable::iterator it_;
80 };
81 StandardAggregateHashTable(const vector<Expression *> aggregations)
82 {
83 for (auto &expr : aggregations) {
84 ASSERT(expr->type() == ExprType::AGGREGATION, "expect aggregate expression");
85 auto *aggregation_expr = static_cast<AggregateExpr *>(expr);
86 aggr_types_.push_back(aggregation_expr->aggregate_type());
87 }
88 }
89
91
92 RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) override;
93
94 StandardHashTable::iterator begin() { return aggr_values_.begin(); }
95 StandardHashTable::iterator end() { return aggr_values_.end(); }
96
97private:
99 StandardHashTable aggr_values_;
100 vector<AggregateExpr::Type> aggr_types_;
101};
102
107#ifdef USE_SIMD
108template <typename V>
109class LinearProbingAggregateHashTable : public AggregateHashTable
110{
111public:
112 class Scanner : public AggregateHashTable::Scanner
113 {
114 public:
115 explicit Scanner(AggregateHashTable *hash_table) : AggregateHashTable::Scanner(hash_table) {}
116 ~Scanner() = default;
117
118 void open_scan() override;
119
120 RC next(Chunk &chunk) override;
121
122 void close_scan() override;
123
124 private:
125 int capacity_ = -1;
126 int size_ = -1;
127 int scan_pos_ = -1;
128 int scan_count_ = 0;
129 };
130
131 LinearProbingAggregateHashTable(AggregateExpr::Type aggregate_type, int capacity = DEFAULT_CAPACITY)
132 : keys_(capacity, EMPTY_KEY), values_(capacity, 0), capacity_(capacity), aggregate_type_(aggregate_type)
133 {}
134 virtual ~LinearProbingAggregateHashTable() {}
135
136 RC get(int key, V &value);
137
138 RC iter_get(int pos, int &key, V &value);
139
140 RC add_chunk(Chunk &group_chunk, Chunk &aggr_chunk) override;
141
142 int capacity() { return capacity_; }
143 int size() { return size_; }
144
145private:
153 void add_batch(int *input_keys, V *input_values, int len);
154
155 void aggregate(V *value, V value_to_aggregate);
156
157 void resize();
158
159 void resize_if_need();
160
161private:
162 static const int EMPTY_KEY;
163 static const int DEFAULT_CAPACITY;
164
165 vector<int> keys_;
166 vector<V> values_;
167 int size_ = 0;
168 int capacity_ = 0;
169 AggregateExpr::Type aggregate_type_;
170};
171#endif // USE_SIMD
Definition: expression.h:474
Definition: aggregate_hash_table.h:26
virtual RC next(Chunk &chunk)=0
用于hash group by 的哈希表实现,不支持并发访问。
Definition: aggregate_hash_table.h:23
virtual RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk)=0
将 groups_chunk 和 aggrs_chunk 写入到哈希表中。哈希表中记录了聚合结果。
A Chunk represents a set of columns.
Definition: chunk.h:23
Definition: aggregate_hash_table.h:68
RC next(Chunk &chunk) override
Definition: aggregate_hash_table.cpp:27
Definition: aggregate_hash_table.h:53
StandardHashTable aggr_values_
group by values -> aggregate values
Definition: aggregate_hash_table.h:99
RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) override
将 groups_chunk 和 aggrs_chunk 写入到哈希表中。哈希表中记录了聚合结果。
Definition: aggregate_hash_table.cpp:15
Definition: aggregate_hash_table.h:61
Definition: aggregate_hash_table.h:56