MiniOB 1
MiniOB is one mini database, helping developers to learn how database works.
载入中...
搜索中...
未找到
thread_pool_executor.h
1/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
2miniob is licensed under Mulan PSL v2.
3You can use this software according to the terms and conditions of the Mulan PSL v2.
4You may obtain a copy of Mulan PSL v2 at:
5 http://license.coscl.org.cn/MulanPSL2
6THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
7EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
8MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
9See the Mulan PSL v2 for more details. */
10
11//
12// Created by Wangyunlai on 2023/01/11.
13//
14
15#pragma once
16
17#include <stdint.h>
18
19#include "common/queue/queue.h"
20#include "common/thread/runnable.h"
21#include "common/lang/mutex.h"
22#include "common/lang/atomic.h"
23#include "common/lang/memory.h"
24#include "common/lang/map.h"
25#include "common/lang/chrono.h"
26#include "common/lang/thread.h"
27
28namespace common {
29
44{
45public:
46 ThreadPoolExecutor() = default;
47 virtual ~ThreadPoolExecutor();
48
57 int init(const char *name, int core_size, int max_size, long keep_alive_time_ms);
58
68 int init(const char *name, int core_pool_size, int max_pool_size, long keep_alive_time_ms,
69 unique_ptr<Queue<unique_ptr<Runnable>>> &&work_queue);
70
77 int execute(unique_ptr<Runnable> &&task);
78
85 int execute(const function<void()> &callable);
86
90 int shutdown();
95
96public:
100 int active_count() const { return active_count_.load(); }
104 int core_pool_size() const { return core_pool_size_; }
108 int pool_size() const { return static_cast<int>(threads_.size()); }
112 int largest_pool_size() const { return largest_pool_size_; }
116 int64_t task_count() const { return task_count_.load(); }
117
121 int64_t queue_size() const { return static_cast<int64_t>(work_queue_->size()); }
122
123private:
129 int create_thread(bool core_thread);
135 int create_thread_locked(bool core_thread);
139 int extend_thread();
140
141private:
145 void thread_func();
146
147private:
151 enum class State
152 {
153 NEW,
154 RUNNING,
157 };
158
160 {
161 bool core_thread = false;
162 bool idle = false;
163 bool terminated = false;
164 thread *thread_ptr = nullptr;
165 };
166
167private:
168 State state_ = State::NEW;
169
172 chrono::milliseconds keep_alive_time_ms_;
173
174 unique_ptr<Queue<unique_ptr<Runnable>>> work_queue_;
175
176 mutable mutex lock_;
177 map<thread::id, ThreadData> threads_;
178
180 atomic<int64_t> task_count_ = 0;
181 atomic<int> active_count_ = 0;
182 string pool_name_;
183};
184
185} // namespace common
任务队列接口
Definition: queue.h:31
Definition: thread_pool_executor.h:44
int pool_size() const
线程池中线程个数
Definition: thread_pool_executor.h:108
int core_pool_size() const
核心线程个数
Definition: thread_pool_executor.h:104
unique_ptr< Queue< unique_ptr< Runnable > > > work_queue_
非核心线程空闲多久后退出
Definition: thread_pool_executor.h:174
mutex lock_
任务队列
Definition: thread_pool_executor.h:176
int active_count() const
当前活跃线程的个数,就是正在处理任务的线程个数
Definition: thread_pool_executor.h:100
int extend_thread()
检测是否需要扩展线程,如果需要就扩展
Definition: thread_pool_executor.cpp:204
int64_t task_count() const
处理过的任务个数
Definition: thread_pool_executor.h:116
map< thread::id, ThreadData > threads_
保护线程池内部数据的锁
Definition: thread_pool_executor.h:177
int max_pool_size_
核心线程个数
Definition: thread_pool_executor.h:171
int create_thread(bool core_thread)
创建一个线程
Definition: thread_pool_executor.cpp:177
void thread_func()
线程函数。从队列中拉任务并执行
Definition: thread_pool_executor.cpp:116
int shutdown()
关闭线程池
Definition: thread_pool_executor.cpp:73
int await_termination()
等待线程池处理完所有任务并退出
Definition: thread_pool_executor.cpp:104
int execute(unique_ptr< Runnable > &&task)
提交一个任务,不一定可以立即执行
Definition: thread_pool_executor.cpp:89
atomic< int > active_count_
处理过的任务个数
Definition: thread_pool_executor.h:181
chrono::milliseconds keep_alive_time_ms_
最大线程个数
Definition: thread_pool_executor.h:172
int largest_pool_size_
线程列表
Definition: thread_pool_executor.h:179
int create_thread_locked(bool core_thread)
创建一个线程。调用此函数前已经加锁
Definition: thread_pool_executor.cpp:183
string pool_name_
活跃线程个数
Definition: thread_pool_executor.h:182
int largest_pool_size() const
曾经达到过的最大线程个数
Definition: thread_pool_executor.h:112
State
线程池的状态
Definition: thread_pool_executor.h:152
atomic< int64_t > task_count_
历史上达到的最大的线程个数
Definition: thread_pool_executor.h:180
int core_pool_size_
线程池状态
Definition: thread_pool_executor.h:170
int64_t queue_size() const
任务队列中的任务个数
Definition: thread_pool_executor.h:121
int init(const char *name, int core_size, int max_size, long keep_alive_time_ms)
初始化线程池
Definition: thread_pool_executor.cpp:26
Definition: thread_pool_executor.h:160
thread * thread_ptr
是否已经退出
Definition: thread_pool_executor.h:164
bool terminated
是否空闲
Definition: thread_pool_executor.h:163
bool idle
是否是核心线程
Definition: thread_pool_executor.h:162