从协程到bthread

一、协程

优点:

  • 协程消耗更少的资源:协程的创建和销毁所消耗的资源更少。
  • 协程的切换更加高效:线程的切换需要进行上下文的切换,用户空间到操作系统,而协程的切换是在用户空间内完成的。
  • 自主掌控协程调度:线程的调度由操作系统完成,而协程的调度可以由自己控制,更加灵活。
  • 协程更易于处理复杂的异步编程:调度和切换由自己控制,更适合处理复杂的异步编程。

理解协程:

协程有三个组成要素:一个任务函数,一个存储寄存器状态的结构,一个私有栈空间(通常是malloc分配的一块内存,或者static静态区的一块内存),下面用ucontext讲述下如何实现协程的yield(挂起)和resume(开始或继续执行):

1 ucontext数据结构如下:

typedef struct ucontext
{
  // 协程私有栈。                             
  stack_t uc_stack;
  // uc_mcontext结构用于缓存协程yield时,cpu各个寄存器的当前值。
  mcontext_t uc_mcontext;
  // uc_link指向的ucontext是后继协程的ucontext,当前协程的任务函数return后,会自动将后继协程
  // 的ucontext缓存的寄存器值加载到cpu的寄存器中,cpu会去执行后继协程的任务函数(可能是从函数
  // 入口点开始执行,也可能是从函数yield调用的返回点恢复执行)。
  struct ucontext *uc_link;  
} ucontext_t;

用ucontext实现的一个协程的内存布局如下图所示:

2 ucontext的api接口:

  • int getcontext(ucontext_t *ucp)
    • 将cpu的各个寄存器的当前值存入当前正在被cpu执行的协程A的ucontext_t的uc_mcontext结构中。重要的寄存器有栈底指针寄存器、栈顶指针寄存器、协程A的任务函数下一条将被执行的语句的指令指针寄存器等。
  • void makecontext(ucontext_t ucp, void (func)(), int argc, …)
    • 指定一个协程的任务函数func以及func的argc等参数。
  • int swapcontext(ucontext_t from_ucp, ucontext_t to_ucp)
    • 目的是让当前协程yield,保存上下文到from_ucp,并让to_ucp对应的协程start或resume。

3 协程示例程序

下面通过一个协程示例程序,展现pthread系统线程执行多个协程切换的过程:

static ucontext_t ctx[3];

static void func_1(void) {
  int a;
  // 执行点2,协程1在这里yield,pthread线程恢复执行协程2的任务函数,即令协程2 resume。
  swapcontext(&ctx[1], &ctx[2]);
  // 执行点4,协程1从这里resume恢复执行。
  // func_1 return后,由于ctx[1].uc_link = &ctx[0],将令main函数resume。
}

static void func_2(void) {
  int b;
  // 协程2在这里yield,pthread线程去执行协程1的任务函数func_1。
  swapcontext(&ctx[2], &ctx[1]);
  // 执行点3,协程2从这里resume恢复执行。
  // func_2 return后,由于ctx[2].uc_link = &ctx[1],将令协程1 resume。
}

int main(int argc, char **argv) {
  // 定义协程1和协程2的私有栈。
  char stack_1[1024] = { 0 };
  char stack_2[1024] = { 0 };

  // 初始化协程1的ucontext_t结构ctx[1]。
  getcontext(&ctx[1]);
  // 在ctx[1]结构中指定协程1的私有栈stack_1。
  ctx[1].uc_stack.ss_sp   = stack_1;
  ctx[1].uc_stack.ss_size = sizeof(stack_1);
  // ctx[0]用于存储执行main函数所在线程的cpu的各个寄存器的值,
  // 下面语句的作用是,当协程1的任务函数return后,将ctx[0]中存储的各寄存器的值加载到cpu的寄存器中,
  // 也就是pthread线程从main函数之前的yield调用的返回处继续执行。
  ctx[1].uc_link = &ctx[0];
  // 指定协程1的任务函数为func_1。
  makecontext(&ctx[1], func1, 0);

  // 初始化协程2的ucontext_t结构ctx[2]。
  getcontext(&ctx[2]);
  // 在ctx[2]结构中指定协程2的私有栈stack_2。
  ctx[2].uc_stack.ss_sp   = stack_2;
  ctx[2].uc_stack.ss_size = sizeof(stack_2);
  // 协程2的任务函数return后,pthread线程将从协程1的yield调用的返回点处继续执行。
  ctx[2].uc_link = &ctx[1];
  // 指定协程2的任务函数为func_2。
  makecontext(&ctx[2], func_2, 0);

  // 执行点1,将cpu当前各寄存器的值存入ctx[0],将ctx[2]中存储的寄存器值加载到cpu寄存器中,
  // 也就是main函数在这里yield,开始执行协程2的任务函数func_2。
  swapcontext(&ctx[0], &ctx[2]);
  // 执行点5,main函数从这里resume恢复执行。return 0;
}

4 bthread中实现协程功能的部分

  • 转到brpc中,对应ucontext的结构体为bthread_fcontext_t,结构体初始化的函数为:
    • bthread_fcontext_t bthread_make_fcontext(void* storage, size_t size,void (*fn)(intptr_t));
    • bthread中的fn为TaskGroup::task_runner()函数。
  • 对应swapcontext的函数为
    • intptr_t bthread_jump_fcontext(bthread_fcontext_t * from_fc, bthread_fcontext_t to_fc, intptr_t vp);
#include <bthread/context.h>  // 提供 bthread_fcontext_t, bthread_make_fcontext, bthread_jump_fcontext 的定义[8,10](@ref)
#include <iostream>           // 提供 std::cout, std::endl[8](@ref)
#include <utility>            // 提供 std::pair, std::make_pair[8](@ref)
#include <cstdlib>            // 提供 malloc 和 free 函数
bthread_fcontext_t fc_main;
bthread_fcontext_t fc;
typedef std::pair<int,int> pair_t;
static void fun_pair(intptr_t param) {
  pair_t* p = (pair_t*)param;
  std::cout << "point 2....." <<std::endl;
  p = (pair_t*)bthread_jump_fcontext(&fc, fc_main, (intptr_t)(p->first+p->second));
  std::cout << "point 4....." <<std::endl;
  p = (pair_t*)bthread_jump_fcontext(&fc, fc_main, (intptr_t)(p->first+p->second));
  std::cout << "point 6....." <<std::endl;
  // 协程结束,跳回主上下文,传递 0 作为结束信号
  bthread_jump_fcontext(&fc, fc_main, 0);
}
void test_bthread_context() {
  fc_main = NULL;
  std::size_t size(8192);
  void* sp = malloc(size);

  pair_t p(std::make_pair(2, 7));
  fc = bthread_make_fcontext((char*)sp + size, size, fun_pair/*入口函数*/);
  std::cout << "point 1......" <<std::endl;
  // 第一次p是协程func_pair的参数
  int res = (int)bthread_jump_fcontext(&fc_main, fc, (intptr_t)&p);
  std::cout << "point 3......" <<p.first << " + " << p.second << " == "
    << res << std::endl;
  p = std::make_pair(5, 6);
  // 后面的时候,p都是作为协程func_pair内bthread_jump_fcontext的返回值
  res = (int)bthread_jump_fcontext(&fc_main, fc, (intptr_t)&p);
  std::cout << "point 5......." <<p.first << " + " << p.second << " == "
    << res << std::endl;
  // 最后一次跳入协程,它将执行完毕并跳回
  res = (int)bthread_jump_fcontext(&fc_main, fc, (intptr_t)&p);
  if (res == 0) {
    std::cout << "coroutine finished." << std::endl;
  }
  std::cout << "exit test_bthread_context......." << std::endl;
  free(sp);
}

int main() {
  test_bthread_context();
  return 0;
}
  • bthread中使用函数jump_stack封装了bthread_jump_fcontext,并用类ContextualStack来管理上下文:
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
    bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}
struct ContextualStack {
    bthread_fcontext_t context;
    StackType stacktype;
    StackStorage storage;
};

二、bthread

讲到bthread,首先要讲的三大件:TaskControl、TaskGroup、TaskMeta,简称TC、TG、TM。

1 TaskControl

TaskControl进程内全局唯一,看一下他的初始化函数init()。

   for (int i = 0; i < _concurrency; ++i) {
     const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
     if (rc) {
       LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
       return -1;}
   }

pthread_create的参数:

thread:指向 pthread_t类型变量的指针,用于存储新创建线程的标识符(线程ID)

attr:指向 pthread_attr_t类型线程属性对象的指针。可用于设置线程的栈大小、调度策略、分离状态等属性。若传入 NULL,则使用所有默认属性

start_routine:线程入口函数的指针。新线程创建后将从这个函数开始执行。该函数的格式必须是返回 void*且接收一个 void*类型的参数

arg:传递给线程入口函数 start_routine的参数。如果需要传递多个参数,需要将它们打包到一个结构体中,然后传递该结构体的地址

2 TaskGroup

创建的pthread内部的工作函数:

void* TaskControl::worker_thread(void* arg) {
    // 1. TG创建前的处理,里面也是回调g_worker_start_fun函数来执行操作,
    // 获取TC的指针
    TaskControl* c = static_cast<TaskControl*>(arg);
    // 2.1 创建一个TG
    TaskGroup* g = c->create_group();
    // 把thread local的tls_task_group 用刚才创建的TG来初始化
    tls_task_group = g;
    // TG运行主任务(死循环)
    g->run_main_task();
}
  • 其中,*tls_task_group是什么概念呢?
    • tls_task_group是 pthread(内核线程)内部 的一个线程局部存储(Thread-Local Storage, TLS)变量,用于高效地管理和调度运行在该 pthread 上的 bthread(用户态协程)。线程局部存储(TLS)机制确保了每个 pthread 都拥有该变量的一个独立副本,且互不干扰。
    • tls_task_group的主要作用是让每个工作线程(pthread)都能无需加锁地快速访问到自己专属的 TaskGroup,从而高效地管理分配给它的 bthread 和相关队列(如本地队列 _rq和远程队列 _remote_rq)。这是实现高效 M:N 协程调度(如工作窃取)的基础。
    • 指向的对象与关系:这个指针的值在每个 bthread 的工作线程(pthread) 中,被设置为该线程所关联的 TaskGroup对象的地址。如果一个普通 pthread 的 tls_task_groupNULL,则表明该线程并非 bthread 的工作线程。因此,tls_task_grouppthread 和其专属的 TaskGroup**对象之间的连接桥梁**。
  • brpc中也将TaskGroup称之为 worker,每个线程(pthread)都有一个TaskGroup。
class TaskGroup {
    ContextualStack* _main_stack;
    bthread_t _main_tid;
    // 无锁的本地队列
    WorkStealingQueue<bthread_t> _rq;
    // 有互斥锁的远端队列
    RemoteTaskQueue _remote_rq;
    TaskMeta* _cur_meta;
}

3 TaskMeta

TaskMeta是表征bthread上下文的结构体。

struct TaskMeta {    
  // The identifier. It does not have to be here, however many code is    
  // simplified if they can get tid from TaskMeta.    
  bthread_t tid;    
  // 成员有bthread_fcontext_t和私有栈,用于实现协程切换的    
  ContextualStack* stack;    
  // User function and argument    
  void* (*fn)(void*);    
  void* arg; 
}

4 bthread的创建(生产者)

  • bthread_start_background:将新bthread放入队列后立即返回,不保证立即执行,适合非紧急任务。
  • bthread_start_urgent:如果是在worker内调用,会让出当前worker立即执行新bthread,或将其放入队列后尽快调度,延迟更低

4.1 创建函数bthread_start_background

int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),void* __restrict arg) {
  bthread::TaskGroup* g = bthread::tls_task_group;
  // 如果 g非空(True),说明当前正在一个bthread工作线程中执行,新创建的bthread将放入当前Worker的本地队列。
  // 如果 g为空(False),说明当前在一个普通pthread中,新bthread需要放入某个Worker的远程队列
  if (g) {
    // start from worker
    // start_background<false>的false代表是一个本地(非远程)任务提交
    return g->start_background<false>(tid, attr, fn, arg);
    // 这个函数会:
    // (1)创建一个 TaskMeta对象来保存新bthread的信息(函数 fn、参数 arg、属性 attr等)并生成唯一的 bthread_t tid
    // (2)将这个新bthread放入当前 TaskGroup的本地运行队列(_rq),这是一个无锁(wait-free)队列,操作非常高效
    // (3)通常不会立即唤醒其他worker线程,因为优先消耗本地队列。
  }
  // 只有由BRPC创建的、用于调度bthread的工作线程(worker pthread),这个指针才不为NULL。普通pthread的 tls_task_group为 NULL
  return bthread::start_from_non_worker(tid, attr, fn, arg);
  // 当在普通pthread中调用时,执行这个分支。
  // 调用全局函数 start_from_non_worker。
  // 这个函数会:
  //   获取或创建全局的 TaskControl单例(管理所有工作线程和任务组)
  //   随机选择一个 TaskGroup。
  //   调用该 TaskGroup的 start_background<true>方法。模板参数 <true>表示这是一个远程任务提交
  //   新bthread会被放入所选 TaskGroup的远程运行队列(_remote_rq),这是一个由互斥锁保护的队列
  //   最后,唤醒(signal)可能处于休眠状态的worker线程来处理新任务
}

创建流程:

5 bthread的执行(消费者)

5.1 bthread任务执行流程图

5.2 run_main_task

void TaskGroup::run_main_task() {
    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        // 当前线程的线程局部存储tls_task_group可能已经指向了被窃取任务原本所属的那个TaskGroup
        DCHECK_EQ(this, dummy);
        // 断言当前正在执行的任务的栈(_cur_meta->stack)就是 TaskGroup的主栈(_main_stack),即工作线程自身的内核栈。
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        // 区分当前正在执行的任务是一个普通的 bthread 还是 TaskGroup 自身的主任务
        // 自身主任务:即工作线程正处于调度循环中,没有执行用户态的 bthread)
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
    }
}

dummy是一个指向当前工作线程的 TaskGroup的指针。其核心设计目的是为了处理工作窃取(Work Stealing) 后的上下文归属问题。在 sched_to的内部,会调用 jump_stack来进行上下文切换。如果当前工作线程通过工作窃取机制,成功从其他 TaskGroup的队列中窃取到了一个任务并执行,那么在执行完这个“窃取”来的任务后,当上下文试图切换回来时,当前线程的线程局部存储 tls_task_group**可能已经指向了被窃取任务原本所属的那个 TaskGroup**(即提供任务的那个线程组)。此时,sched_to函数会修改 *dummy(即解引用 dummy 指针),使其指向这个新的 TaskGroup地址,以反映执行环境的真实变化。

5.3 wait_task

wait_task函数获取待执行的任务,也就是bthread

bool TaskGroup::wait_task(bthread_t* tid) {
  do {
    // _pl是一个 ParkingLot对象,你可以将其理解为一个基于 futex 的高效等待/通知机制,类似于条件变量,但更底层、更高效
    // 当调用 _pl->wait(_last_pl_state)时,当前工作线程会在此处阻塞(休眠),直到有其他线程(例如,另一个生产了任务的工作线程或主线程)向这个 ParkingLot发送通知(signal)
    // 当有其他线程通过 _pl->signal()唤醒此 ParkingLot时,工作线程会从这里恢复执行。
    _pl->wait(_last_pl_state);
    if (steal_task(tid)) {
        return true;
    }
  } while (true);
}

5.4 steal_task

其中steal_task是bthread可以称为M:N的协程的关键

bool TaskGroup::steal_task(bthread_t* tid) {
  if (_remote_rq.pop(tid)) {
    return true;
  }
  return _control->steal_task(tid, &_steal_seed, _steal_offset);
}

在当前TG的_remote_rq中拿任务,没有再去其他group中取,优先从_rq中取,再去_remote_rq中取。

任务获取通过TaskControl::steal_task来实现:

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    // 这行代码通过原子操作(附带获取内存屏障,memory_order_acquire)读取当前 TaskControl中管理的 TaskGroup总数 (_ngroup)。
    // 内存屏障至关重要:它确保在读取 _groups数组的内容之前,一定能看到其他线程对 _ngroup和 _groups数组元素的最新修改,防止访问到未初始化的 TaskGroup槽位。注释也明确说明了这一点(避免访问未初始化的 _groups槽位)
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }
    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    // stolen: 用于记录本次窃取是否成功的标志。
    bool stolen = false;
    // 使用随机种子是为了让不同线程或不同次窃取从不同位置开始遍历,避免多个窃取者总是竞争同一个 TaskGroup的队列,从而减少竞争。
    size_t s = *seed;
    // s += offset: 在每次迭代后,种子 s会增加一个 offset。offset参数通常与窃取者的标识相关,确保不同的窃取线程每次循环的步长不同,进一步分散竞争热点,避免所有窃取者步调一致。
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        // 因为 TaskGroup可能会被并发地销毁(例如在动态调整线程池大小时),所以其指针可能为 NULL。
        if (g) {
            // 从其他group中取时,优先从_rq中取,再去_remote_rq中取。
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

由此可见bthread的核心设计思想:

(1)工作窃取 (Work Stealing)

当某个工作线程自己的任务队列为空时,它不会空闲等待,而是主动从其他繁忙线程的任务队列中窃取任务来执行。这充分利用了多核资源,实现了高效的负载均衡

(2)减少竞争

  • 通过随机化起点 (seed) 和使用不同的步长 (offset),避免了所有空闲线程同时竞争同一个 TaskGroup的队列,分散了压力。
  • 使用无锁的 WorkStealingQueue 作为主要窃取目标,其 steal操作本身就是为了支持多窃取者而设计的,并发开销低

(3)双队列设计

每个 TaskGroup维护两个队列:

  • _rq(本地运行队列):无锁队列,存放当前 worker 自己创建的任务。窃取操作主要针对此队列。
  • _remote_rq(远程运行队列):有锁队列,存放其他线程(普通 pthread 或其他 worker) 创建的任务。窃取时作为备用来源

5.5 schech_to

获取到bthread以后,切换到对应的上下文并执行任务

inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    // 这是一个关键的条件判断。它检查目标 bthread 的 TaskMeta中的 stack字段是否为 NULL。
    // 如果为 NULL:这表明该 bthread 是首次被调度,尚未分配专用的栈空间。这时需要进入 if 语句块内为其分配栈。
    // 如果不为 NULL:说明该 bthread 之前已经运行过,栈已分配,可以直接跳过分配步骤,进行后续的调度。
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);     
    }
    next_meta->set_stack(stk);
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}

这里对context了初始化,设置了入口函数为TaskGroup::task_runner

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;
    TaskMeta* const cur_meta = g->_cur_meta;
    // Switch to the task
    // __builtin_expect是帮助分支预测的优化,告诉编译器“next_meta != cur_meta”这个条件极有可能为真
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        // NULL是在主栈上
        // 只有当前 bthread (cur_meta) 拥有自己的栈(不是运行在主栈上),并且目标 bthread (next_meta) 的栈与当前栈不同时,才会执行真正的上下文切换 jump_stack
        if (cur_meta->stack != NULL) {
            // 如果 next_meta->stack == cur_meta->stack,通常意味着两者共享同一个栈(例如都是主栈 _main_stack),因此无需切换
            if (next_meta->stack != cur_meta->stack) {
                // jump_stack封装了汇编函数bthread_jump_fcontext,操作效率极高,远超线程切换效率
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                // 在 jump_stack返回后,执行流可能已经发生了变化。当前工作线程可能正在为另一个TaskGroup执行任务(例如,通过工作窃取机制偷来的任务执行完毕了)。
                g = tls_task_group;
            }
        }
    } 
    // 当一个 bthread 主动让出 CPU 时,它需要安排一个“回调”函数,以便在将来合适的时机将自己重新加入就绪队列或其他清理工作

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        // 执行回调函数 (fn) 本身可能会再次引发上下文切换(例如,回调函数会 ready 一个 bthread 并可能触发 steal),从而导致 tls_task_group改变。因此,每次循环结束后都需要重新获取 g = tls_task_group,以确保后续回调在正确的上下文中执行
        // 处理剩余任务(Remained Functions):执行并清理由 yield等操作设置的延迟回调,确保 bthread 能够正确地被重新调度或进行资源清理。
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }
}

5.6 task_runner

bthread协程的任务执行函数task_runner:

void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;
    // 这个条件判断是否处理上次调度留下的“剩余回调函数”。skip_remained参数为 0 时表示需要处理,非 0 则跳过。
    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
    }
    // 这个循环是 task_runner的核心,负责反复执行 bthread 任务,直到当前任务的 ID 变成 TaskGroup主任务的 ID(_main_tid),这意味着没有更多用户 bthread 需要执行,将返回调度循环
    do {
        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;        
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        // 此函数设置一个“剩余回调”,安排在当前任务执行完毕后,异步地释放任务占用的资源      
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);
}

其中ending_sched是完成当前任务后,继续执行后续的任务。

5.7 ending_sched

结束调度与寻找新任务:ending_sched(&g);:这是一个非常重要的函数调用。它的作用是

(1)进行调度统计和记录。

(2)寻找下一个要执行的 bthread 任务。其内部会按优先级检查:

  • 当前 TaskGroup的本地运行队列 (_rq)。

  • 当前 TaskGroup的远程运行队列 (_remote_rq)。

  • 通过工作窃取 (Work Stealing) 算法从其他 TaskGroup窃取任务。

(3)如果找到了新任务,就通过 sched_to切换到该任务的上下文并执行。

(4)如果没有找到任何任务,则会将当前任务设置为 TaskGroup的主任务 (_main_tid),这将导致 while循环条件不满足,从而退出循环。主任务代表工作线程自身的调度循环,让其进入等待或继续查找新任务的状态

void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
    // BTHREAD_FAIR_WSQ的作用在下面
#ifndef BTHREAD_FAIR_WSQ
    // 从头部取(公平)
    const bool popped = g->_rq.pop(&next_tid);
#else
    // 从尾部取(性能)(局部性原理)
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        // 当前group的_rq没有任务,并从其他group也取不到任务时,切回到主协程。调度器会将下一个任务设置为 TaskGroup的主任务 (_main_tid)。主任务代表工作线程自身的调度循环,让其进入等待或继续查找新任务的状态
        next_tid = g->_main_tid;
    }
    TaskMeta* const cur_meta = g->_cur_meta;
    TaskMeta* next_meta = address_meta(next_tid);
    // next_meta是否首次被调度
    if (next_meta->stack == NULL) {
        // 栈类型匹配时复用:如果目标任务 (next_meta) 与当前任务 (cur_meta) 的栈类型相同,则直接复用当前任务的栈 (cur_meta->release_stack())。这是一种优化,避免了不必要的栈分配和拷贝
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            next_meta->set_stack(cur_meta->release_stack());
        } else {
        // 栈类型不匹配时新建
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // 分配失败降级:如果栈分配失败(如内存不足),则强制将任务属性降级为 BTHREAD_STACKTYPE_PTHREAD,并让其使用工作线程的主栈 (_main_stack) 运行。这意味着该 bthread 将无法被挂起和切换,像普通函数一样执行
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}

三、其他

1 疑问

1.1 bthread是协程(coroutine)吗?

不是。我们常说的协程特指N:1线程库,即所有的协程运行于一个系统线程中,计算能力和各类eventloop库等价。由于不跨线程,协程之间的切换不需要系统调用,可以非常快(100ns-200ns),受cache一致性的影响也小。但代价是协程无法高效地利用多核,代码必须非阻塞,否则所有的协程都被卡住,对开发者要求苛刻。协程的这个特点使其适合写运行时间确定的IO服务器,典型如http server,在一些精心调试的场景中,可以达到非常高的吞吐。但百度内大部分在线服务的运行时间并不确定,且很多检索由几十人合作完成,一个缓慢的函数会卡住所有的协程。在这点上eventloop是类似的,一个回调卡住整个loop就卡住了,比如ubaserver(注意那个a,不是ubserver)是百度对异步框架的尝试,由多个并行的eventloop组成,真实表现糟糕:回调里打日志慢一些,访问redis卡顿,计算重一点,等待中的其他请求就会大量超时。所以这个框架从未流行起来。

bthread是一个M:N线程库,一个bthread被卡住不会影响其他bthread。关键技术两点:work stealing调度和butex,前者让bthread更快地被调度到更多的核心上,后者让bthread和pthread可以相互等待和唤醒。这两点协程都不需要。更多线程的知识查看这里

1.2 在什么时候执行自身TG的_rq呢?

观察到的现象(wait_task和窃取逻辑中似乎没有直接处理本地 _rq)恰恰是 bthread 工作窃取(Work Stealing) 调度算法的一个核心设计:一个TaskGroup(Worker) 通常不会直接执行自己 _rq中的任务,而是优先让其他空闲的 TaskGroup来“窃取”执行。

这样设计的主要原因是为了减少竞争和提高缓存局部性(Cache Locality)。如果一个 Worker 同时既生产任务又消费自己生产的任务,其本地队列 _rq的锁竞争可能会增加。而让其他 Worker 来窃取任务,可以将工作负载更好地分散到多个 CPU 核心上。

那么,自身 _rq中的任务主要在以下时机被消费:

(1)当前任务执行完毕后的调度间隙 (ending_sched()):这是最主要的消费时机。当一个 bthread 任务(无论来自哪个队列)执行完毕(即其函数 fn(arg)返回)后,在 TaskGroup::task_runner()函数中会进行资源清理,并调用 ending_sched(&g)。在 ending_sched()函数内部,会优先从当前 TaskGroup_rq中获取任务。如果获取成功,就直接切换到该任务执行。如果本地 _rq为空,才会再次进入工作窃取的逻辑

(2)其他空闲 TaskGroup的工作窃取:当你看到 wait_task中的 steal_task()函数时,它的作用是一个空闲的 Worker 尝试从其他 TaskGroup窃取任务。窃取的优先级顺序是:

  • 先尝试窃取其他 TaskGroup_rq(本地运行队列)。
  • 如果其他 TaskGroup_rq也没有任务,则尝试窃取其 _remote_rq(远程运行队列)

因此,自身的 _rq中的任务,很大程度上是在被其他空闲的 TaskGroup通过这种窃取机制消费掉的。

1.3 本地队列 _rq不是无锁的吗,而_remote_rq才是有锁的,为什么不直接执行_rq?

(1)_remote_rq的任务更需要“救济”_remote_rq中的任务是由非工作线程(如普通pthread)或其他TaskGroup提交的。提交它的线程自身无法执行它,它完全依赖工作线程的调度。如果工作线程不优先处理它们,这些任务可能会长时间得不到执行,导致延迟甚至“饥饿”

(2)_rq的任务有“双重保障”_rq中的任务是由当前Worker自己创建的。即使这个Worker暂时不执行它们,它也有很大概率在后续通过 pop操作快速消费它们。更重要的是,这些任务可以被其他空闲的Worker通过工作窃取(Work Stealing)机制偷走执行。因此,_rq中的任务有两条执行路径:本地消费和被窃取,不容易被饿死。

1.4 BTHREAD_FAIR_WSQ宏的影响

定义宏(公平模式)

  • 当定义了 BTHREAD_FAIR_WSQ后,steal操作会从队列的头部取出任务。这保证了任务被窃取的顺序接近于它们被放入队列的顺序(FIFO),更公平,可以有效防止某些任务长时间得不到执行(“饥饿”现象)。

未定义宏(性能模式,默认)

  • 默认情况下,stealpop操作都针对队列的尾部。这意味着新加入的任务和被窃取的任务都更靠近队列尾部,这种策略有利于利用CPU缓存局部性,因为最近操作的数据很可能还在缓存中,从而可以提高性能

  • 获取到bthread以后,切换到对应的上下文并执行任务

2 Bthread的关键点

  • M:N模型:bthread(M个用户态协程)由少量的pthread(N个内核线程,即worker)进行调度执行
  • 双队列设计:每个 TaskGroup维护两个队列:
    • _rq(本地运行队列)**:无锁队列,存放当前worker自己创建的bthread。
    • _remote_rq(远程运行队列):有锁队列,存放其他线程(普通pthread或其他worker) 创建的bthread。
    • 本地队列的优先级通常高于远程队列
    • 这种设计减少了竞争,提高了性能。
  • 工作窃取(Work Stealing):当某个工作线程自己的任务队列为空时,它不会空闲等待,而是主动从其他繁忙线程的任务队列中窃取任务来执行。这充分利用了多核资源,实现了高效的负载均衡。
  • 惰性栈分配:创建bthread时并不会立即为其分配栈空间,而是在第一次运行该bthread时才分配,这节省了内存资源】

四、参考资料

https://github.com/apache/brpc/blob/master/docs/cn/bthread.md

https://github.com/apache/brpc/blob/master/docs/en/threading_overview.md

https://zhuanlan.zhihu.com/p/294129746

https://zhuanlan.zhihu.com/p/346081659

https://zhuanlan.zhihu.com/p/347499412

https://zhuanlan.zhihu.com/p/350582218

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

Contents
滚动至顶部