Author: xijia
MariaDB 共有三种线程调度方式
one-thread-per-connection 每个连接一个线程
no-threads 所有连接共用一个线程
pool-of-threads 线程池
no-threads 只适用于简单的系统,并发数稍高性能就会严重下降
one-thread-per-connection 在多数情况下性能优良,是个合适的选择,生产系统也常用此配置。但在高并发、短连接的业务场景下,使用 one-thread-per-connection 会频繁得创建和销毁线程,严重影响性能
pool-of-threads 适用于高并发短连接的业务场景,线程复用,避免频繁创建和销毁线程带来的性能损耗
MariaDB 的 thread pool 在 win 和 unix 系统的实现不同,本文分析 unix 系统下的实现
thread pool 由若干个 thread_group 组成,每个 thread_group 有若干个 worker 线程,和 0~1 个 listenr 线程
server 接收到连接请求时,将这个连接分配给一个 group 处理,listener 线程负责监听请求,worker 线程处理请求内容
struct scheduler_functions 内部声明了连接建立相关的属性和方法,这里使用函数指针实现多态
每种线程调度方式,分别实例化一个struct scheduler_functions,给相关变量和函数指针赋值
thread pool 相关实现在 sql/threadpool_common.cc 和 sql/threadpool_unix.cc 中
sql/scheduler.h
struct scheduler_functions
{
uint max_threads, *connection_count;
ulong *max_connections;
bool (*init)(void);
bool (*init_new_connection_thread)(void);
void (*add_connection)(THD *thd);
void (*thd_wait_begin)(THD *thd, int wait_type);
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
bool (*end_thread)(THD *thd, bool cache_thread);
void (*end)(void);
};
-------------------------------------------------
sql/threadpool_common.cc
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
NULL,
NULL,
tp_init, // init
NULL, // init_new_connection_thread
tp_add_connection, // add_connection
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
post_kill_notification, // post_kill_notification
NULL, // end_thread
tp_end // end
};
初始化 thread_group
main
mysqld_main
network_init
/* 调用相应 thread_scheduler 下的 init 函数,这里是 tp_init */
MYSQL_CALLBACK_ELSE(thread_scheduler, init, (), 0)
tp_init
/* thread_group_t 申请内存 */
all_groups= (thread_group_t *)
my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
/* 初始化 thread_group_t 数组 */
for (uint i= 0; i < threadpool_max_size; i++)
{
thread_group_init(&all_groups[i], get_connection_attrib());
}
/* 设置最大执行时间,worker thread */
pool_timer.tick_interval= threadpool_stall_limit;
/* 开启 timer thread*/
start_timer(&pool_timer);
创建 connection,将 connection 分配到一个 thread_group,插入 group 队列
唤醒或创建一个 worker thread,如果 group 中没有活跃的 worker thread
main
mysqld_main
handle_connections_sockets
create_new_thread
/* 调用相应 thread_scheduler 下的 add_connection 函数,这里是 tp_add_connection */
MYSQL_CALLBACK(thd->scheduler, add_connection, (thd));
tp_add_connection
/* 申请一个 connection */
connection_t *connection= alloc_connection(thd);
/* round-robin 映射到一个 group */
thread_group_t *group=
&all_groups[thd->thread_id%group_count];
queue_put(group, connection)
/* connection 插入队列 */
thread_group->queue.push_back(connection);
/*
如果没有活跃 woker 线程,唤醒一个处理新连接
此时连接尚未 login,认证等工作在 worker 线程中完成
*/
if (thread_group->active_thread_count == 0)
wake_or_create_thread(thread_group);
wake_or_create_thread
/* 取出 thread_group->waiting_threads 中的 waiting thread (如有) */
if (wake_thread(thread_group) == 0)
DBUG_RETURN(0);
/*
没有活跃线程时,立即创建一个,在其他地方也有这个逻辑
保证每个 thread_group 只是存在一个活跃线程
*/
if (thread_group->active_thread_count == 0)
DBUG_RETURN(create_worker(thread_group));
/*
对创建新的 woker thread 限流,距离上个线程创建间隔一定时间才允许再次创建 worker thread
这个 group 线程总数越多,要求的间隔越长
4线程以下间隔为0,大于等于 4/8/16 线程时,要求间隔分别为 50*1000/100*1000/200*1000 微秒
*/
if (time_since_last_thread_created >
microsecond_throttling_interval(thread_group))
{
DBUG_RETURN(create_worker(thread_group));
}
从 queue 中取出和处理 event
在 group 没有 listener 时会变成 listener
没取到 event 时,进入休眠状态,等待被唤醒
最后进入休眠状态的 worker thread 最先被唤醒
worker_main
for(;;)
{
connection = get_event(&this_thread, thread_group, &ts);
/* 没有 event 时,跳出循环,结束 worker thread */
if (!connection)
break;
handle_event(connection);
}
get_event
for(;;)
{
/*
取出 tp_add_connection/listener 插入的 connection
*/
connection = queue_get(thread_group);
/* 取到 connection,跳出循环,进入下一步 handle_event */
if(connection)
break;
/* 如果没有 listener thread,这个线程变为 listener */
if(!thread_group->listener)
{
/* listener 变为 worker 处理一个 connection */
connection = listener(current_thread, thread_group);
/* listener 已变为 worker */
thread_group->listener= NULL;
/* 跳出循环,进入 handle_event 处理 connection */
break;
}
/* 进入休眠状态前,尝试非阻塞方式获取一个 connection */
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
{
thread_group->io_event_count++;
connection = (connection_t *)native_event_get_userdata(&nev);
break;
}
/*
进入休眠状态
最后休眠的 worker thread 最先被唤醒,更容易命中 cache
*/
current_thread->woken = false;
thread_group->waiting_threads.push_front(current_thread);
/* 等待被唤醒 */
mysql_cond_wait
}
/* 返回获取到的 connection */
DBUG_RETURN(connection);
handle_event
if (!connection->logged_in)
{
/* login connection */
err= threadpool_add_connection(connection->thd);
login_connection
connection->logged_in= true;
}
else
{
/* 处理 connection 请求 */
err= threadpool_process_request(connection->thd);
do_command
}
/*
告诉客户端可以读
可能会变动 connection 所属的 group
*/
err= start_io(connection);
/*
group_count 允许动态改变,所以处理 connection 的 group 可能发生变动
这里检查 group 是否需要变动
*/
thread_group_t *group =
&all_groups[connection->thd->thread_id%group_count];
if (group != connection->thread_group)
change_group(connection, connection->thread_group, group)
return io_poll_start_read(group->pollfd, fd, connection);
listener 线程通过 epoll_wait 监听 group 关联的描述符,epoll使用一个文件描述符管理多个描述符
监听获得的 event 插入队列,如果插入前队列为空,listener 变成 worker 线程处理第一个event,其余插入队列,否则所有 event 插入队列
如果 group 中没有活跃线程,唤醒或者创建一个 worker 线程
这里考虑一种情况,如果监听只获得一个 event 且队列为空,那么这个 event 将会被 listener 处理,队列中不会插入新的 event,此时只需要 listener 变成 worker 线程处理 event,不需要再唤醒其他 worker 线程
listener
for(;;)
{
/* 监听事件 */
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
/* 如果队列为空,listener 处理第一个事件,一定概率(只有一个事件)可以不再唤醒 worker thread */
bool listener_picks_event= thread_group->queue.is_empty();
/* 第一个事件留给 listener 处理,其余放入队列,或者全部放入队列 */
for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
{
connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
thread_group->queue.push_back(c);
}
if (listener_picks_event)
{
/* Handle the first event. */
retval= (connection_t *)native_event_get_userdata(&ev[0]);
mysql_mutex_unlock(&thread_group->mutex);
break;
}
/*
队列中已经有一些 event,但是没有活跃线程
唤醒一个 worker 线程处理队列中 event
唤醒失败,并且仅有一个线程(仅有listener),创建一个 worker 线程
*/
if(thread_group->active_thread_count==0)
{
if(wake_thread(thread_group))
{
if(thread_group->thread_count == 1)
create_worker(thread_group);
}
}
}
DBUG_RETURN(retval);
检查 listener、worker thread 的运行情况和 connection 超时情况,每隔 thread_pool_stall_limit 检查一次
如果 listener 不存在,且检查周期内没有监听到新的 event,则创建一个 worker thread(自动变成 listener)
如果没有活跃线程(运行状态的 worker thread),且检查周期内没有处理 event,创建一个 worker thread
关闭长时间空闲的 connection
timer_thread
for(;;)
{
/*
等待一个 tick_interval,即 thread_pool_stall_limit
正常情况下回等待至超时,只有 stop_timer 会发出 timer->cond 信号
*/
set_timespec_nsec(ts,timer->tick_interval*1000000);
err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
/* ETIMEDOUT 等待超时,代表 timer 没有被 shutdown */
if (err == ETIMEDOUT)
{
timer->current_microtime= microsecond_interval_timer();
/* 遍历 thread_group,查看是否 stall */
for (i= 0; i < threadpool_max_size; i++)
{
if(all_groups[i].connection_count)
check_stall(&all_groups[i]);
}
/* 关闭空闲连接 */
if (timer->next_timeout_check <= timer->current_microtime)
timeout_check(timer);
}
}
check_stall
/*
thread_group->io_event_count 代表插入队列的 event 数量
thread_group 没有 listener,且没有 event 在队列中,这个检查周期内 group 没有监听连接信息
唤醒或者创建一个 worker 线程,这个线程会自动成为 listener
*/
if (!thread_group->listener && !thread_group->io_event_count)
{
wake_or_create_thread(thread_group);
return;
}
/* Reset io event count */
thread_group->io_event_count= 0;
/*
thread_group->queue_event_count 代表已经出队的 event 数量
队列非空,并且出队 event 数量为0,这个检查周期内 worker thread 没有处理 event
唤醒或创建一个 worker thread, thread_group 标记为 stalled, 下个 event 出队时 reset stalled标记
*/
if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
{
thread_group->stalled= true;
wake_or_create_thread(thread_group);
}
/* Reset queue event count */
thread_group->queue_event_count= 0;
线程进入阻塞状态前/阻塞状态结束后,分别调用者两个函数汇报状态,用于维护 active_thread_count 即活跃线程数量
tp_wait_begin 将活跃线程数 -1,活跃线程为 0 时,唤醒或创建一个线程
tp_wait_eng 将活跃线程数 +1
这里需要注意,不是所有的等待都需要调用 tp_wait_begin,预期内短时间结束的等待,比如 mutex,可以不调用 tp_wait_begin
行锁或者磁盘IO这种长时间的等待,需要调用 tp_wait_begin,汇报这个线程暂时无法处理请求,需要开启新的线程
线程池是在 server 端的优化,避免频繁的创建和销毁线程
连接池是在 client 端的优化,减少连接创建时间,节省资源
连接池和线程池是两个不同的概念,可以同时使用