use crate::job::*;
use crate::registry::Registry;
use crate::unwind;
use std::mem;
use std::sync::Arc;
/// Fires off a task into the Rayon threadpool in the "static" or
/// "global" scope. Just like a standard thread, this task is not
/// tied to the current stack frame, and hence it cannot hold any
/// references other than those with `'static` lifetime. If you want
/// to spawn a task that references stack data, use [the `scope()`
/// function][scope] to create a scope.
///
/// [scope]: fn.scope.html
///
/// Since tasks spawned with this function cannot hold references into
/// the enclosing stack frame, you almost certainly want to use a
/// `move` closure as their argument (otherwise, the closure will
/// typically hold references to any variables from the enclosing
/// function that you happen to use).
///
/// This API assumes that the closure is executed purely for its
/// side-effects (i.e., it might send messages, modify data protected
/// by a mutex, or some such thing).
///
/// There is no guaranteed order of execution for spawns, given that
/// other threads may steal tasks at any time. However, they are
/// generally prioritized in a LIFO order on the thread from which
/// they were spawned. Other threads always steal from the opposite
/// end of the deque, which approximates FIFO order. The idea is that
/// "recent" tasks are most likely to be fresh in the local CPU's
/// cache, while other threads can steal older "stale" tasks. For an
/// alternate approach, consider [`spawn_fifo()`] instead.
///
/// [`spawn_fifo()`]: fn.spawn_fifo.html
///
/// # Panic handling
///
/// If this closure should panic, the resulting panic will be
/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
/// details.
///
/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
///
/// # Examples
///
/// This code creates a Rayon task that increments a global counter.
///
/// ```rust
/// # use rayon_core as rayon;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// static GLOBAL_COUNTER: AtomicUsize = AtomicUsize::new(0);
///
/// rayon::spawn(move || {
///     GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
/// });
/// ```
pub fn spawn<F>(func: F)
where
    F: FnOnce() + Send + 'static,
{
    // We assert that current registry has not terminated.
    unsafe { spawn_in(func, &Registry::current()) }
}

/// Spawns an asynchronous job in `registry`.
///
/// Unsafe because `registry` must not yet have terminated.
pub(super) unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
where
    F: FnOnce() + Send + 'static,
{
    // We assert that this does not hold any references (we know
    // this because of the `'static` bound in the interface);
    // moreover, we assert that the code below is not supposed to
    // be able to panic, and hence the data won't leak but will be
    // enqueued into some deque for later execution.
    let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
    let job_ref = spawn_job(func, registry);
    registry.inject_or_push(job_ref);
    mem::forget(abort_guard);
}

unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef
where
    F: FnOnce() + Send + 'static,
{
    // Ensure that registry cannot terminate until this job has
    // executed. This ref is decremented at the (*) below.
    registry.increment_terminate_count();

    Box::new(HeapJob::new({
        let registry = Arc::clone(registry);
        move || {
            match unwind::halt_unwinding(func) {
                Ok(()) => {}
                Err(err) => {
                    registry.handle_panic(err);
                }
            }
            registry.terminate(); // (*) permit registry to terminate now
        }
    }))
    .as_job_ref()
}

/// Fires off a task into the Rayon threadpool in the "static" or
/// "global" scope. Just like a standard thread, this task is not
/// tied to the current stack frame, and hence it cannot hold any
/// references other than those with `'static` lifetime. If you want
/// to spawn a task that references stack data, use [the `scope_fifo()`
/// function](fn.scope_fifo.html) to create a scope.
///
/// The behavior is essentially the same as [the `spawn`
/// function](fn.spawn.html), except that calls from the same thread
/// will be prioritized in FIFO order. This is similar to the
/// now-deprecated [`breadth_first`] option, except that the effect is
/// limited to the relative order of `spawn_fifo` calls rather than
/// applying to all threadpool tasks.
///
/// For more details on this design, see Rayon [RFC #1].
///
/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
///
/// # Panic handling
///
/// If this closure should panic, the resulting panic will be
/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
/// details.
///
/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
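///
/// # Examples
///
/// A minimal sketch of `spawn_fifo`, following the pattern of the
/// `spawn` example: each task reports its index over a channel. The
/// channel and the task count are illustrative choices only. Because
/// other threads may run the tasks concurrently, the values can
/// arrive in any order, so the example sorts them before checking.
///
/// ```rust
/// # use rayon_core as rayon;
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel();
/// for i in 0..3 {
///     let tx = tx.clone();
///     rayon::spawn_fifo(move || {
///         // Each task owns its own sender, so the closure is `'static`.
///         tx.send(i).unwrap();
///     });
/// }
/// // Drop the original sender so `rx.iter()` ends once every task has run.
/// drop(tx);
///
/// let mut received: Vec<i32> = rx.iter().collect();
/// received.sort();
/// assert_eq!(received, vec![0, 1, 2]);
/// ```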
pub fn spawn_fifo<F>(func: F)
where
    F: FnOnce() + Send + 'static,
{
    // We assert that current registry has not terminated.
    unsafe { spawn_fifo_in(func, &Registry::current()) }
}

/// Spawns an asynchronous FIFO job in `registry`.
///
/// Unsafe because `registry` must not yet have terminated.
pub(super) unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>)
where
    F: FnOnce() + Send + 'static,
{
    // We assert that this does not hold any references (we know
    // this because of the `'static` bound in the interface);
    // moreover, we assert that the code below is not supposed to
    // be able to panic, and hence the data won't leak but will be
    // enqueued into some deque for later execution.
    let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
    let job_ref = spawn_job(func, registry);

    // If we're in the pool, push onto this worker thread's private FIFO so that
    // its jobs run in locally-FIFO order. Otherwise, use the pool's global injector.
    match registry.current_thread() {
        Some(worker) => worker.push_fifo(job_ref),
        None => registry.inject(&[job_ref]),
    }
    mem::forget(abort_guard);
}

#[cfg(test)]
mod test;