pubstructContext<'a>{waker: &'aWaker,// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a())-> &'a()>,}
pubstructWaker{waker: RawWaker,}implWaker{/// 唤醒绑定在 Waker 上的数据,通常是 Future
pubfnwake(self){}pubfnwake_by_ref(&self){}pubfnwill_wake(&self,other: &Waker)-> bool{}pubunsafefnfrom_raw(waker: RawWaker)-> Waker{}}pubstructRawWaker{/// A data pointer, which can be used to store arbitrary data as required
/// by the executor. This could be e.g. a type-erased pointer to an `Arc`
/// that is associated with the task.
/// The value of this field gets passed to all functions that are part of
/// the vtable as the first parameter.
data: *const(),/// Virtual function pointer table that customizes the behavior of this waker.
vtable: &'staticRawWakerVTable,}/// RawWaker 行为的虚函数表
pubstructRawWakerVTable{clone: unsafefn(*const())-> RawWaker,wake: unsafefn(*const()),wake_by_ref: unsafefn(*const()),drop: unsafefn(*const()),}
sequenceDiagram
participant Executor
participant Reactor
activate Executor
deactivate Executor
Executor->>Reactor: Pending on r.read()
Note left of Executor: Execute other Future
activate Reactor
Reactor->>Executor: r.read() is ready
Note left of Executor: Execute current Future
deactivate Reactor
Executor->>Reactor: Pending on w.write_all()
Note left of Executor: Execute other Future
activate Reactor
deactivate Reactor
Reactor->>Executor: w.write_all() is ready
/// The state of an executor.
structPool{/// 全局任务队列
injector: Injector<Runnable>,/// 线程的本地队列,用来进行任务的偷取
stealers: Vec<Stealer<Runnable>>,/// 存放空闲的线程,用来后续的唤醒并执行任务
sleepers: Sleepers,}// 全局的线程池
staticPOOL: Lazy<Pool>=Lazy::new(||{letnum_threads=num_cpus::get().max(1);letmutstealers=Vec::new();// Spawn worker threads.
for_in0..num_threads{letworker=Worker::new_fifo();stealers.push(worker.stealer());letproc=Processor{worker,slot: Cell::new(None),slot_runs: Cell::new(0),};thread::Builder::new().name("async-std/executor".to_string()).spawn(||{let_=PROCESSOR.with(|p|p.set(proc));abort_on_panic(main_loop);}).expect("cannot start a thread driving tasks");}Pool{injector: Injector::new(),stealers,sleepers: Sleepers::new(),}});/// 工作线程的状态
structProcessor{/// 本地任务队列
worker: Worker<Runnable>,/// 存放了比本地队列中任务优先级更高的任务,通常第一次spawn会放到这里,
/// 执行一次poll来快速判断状态,对于无阻塞的任务更高效,不需要等待。
slot: Cell<Option<Runnable>>,/// How many times in a row tasks have been taked from the slot rather than the queue.
slot_runs: Cell<u32>,}fnmain_loop(){loop{matchfind_runnable(){Some(task)=>task.run();None=>{// 实际上,这里根据空循环的次数,会陷入睡眠状态或出让CPU资源,直到新的task来唤醒。
}}}}fnfind_runnable()-> Option<Task>{// 优先从本地的队列中获取
lettask=get_local();iftask.is_some(){returntask;}// 其次从全局队列中获取
lettask=get_global();iftask.is_some(){returntask;}// 最后尝试从其他线程的本地队列中偷取
steal_other()}/// 安排新的任务到Executor的执行队列中
pub(crate)fnschedule(task: Runnable){PROCESSOR.with(|proc|{// If the current thread is a worker thread, store it into its task slot or push it into
// its local task queue. Otherwise, push it into the global task queue.
matchproc.get(){// 如果当前线程为worker线程,插入到当前线程的第一优先级任务槽
Some(proc)=>{// Replace the task in the slot.
ifletSome(task)=proc.slot.replace(Some(task)){// 尝试把任务的优先级提升到最高,并把上一个优先级最高的任务放到当前线程任务队列
// If the slot already contained a task, push it into the local task queue.
proc.worker.push(task);POOL.sleepers.notify_one();}}// 如果当前线程不是worker线程的话,放到全局队列
None=>{// 将任务放到全局队列中
POOL.injector.push(task);// 尝试唤醒一个睡眠的worker线程
POOL.sleepers.notify_one();}}})}