class shared_priority_queue_scheduler
Declaration
template <typename Mutex = std::mutex>
class shared_priority_queue_scheduler : public scheduler_base { /* full declaration omitted */ };Description
//////////////////////////////////////////////////////////////////////// The shared_priority_queue_scheduler maintains a set of high, normal, and low priority queues. For each priority level there is a core/queue ratio which determines how many cores share a single queue. If the high priority core/queue ratio is 4 the first 4 cores will share a single high priority queue, the next 4 will share another one and so on. In addition, the shared_priority_queue_scheduler is NUMA-aware and takes NUMA scheduling hints into account when creating and scheduling work. Warning: PendingQueuing lifo causes lockup on termination
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:106
Inherits from: scheduler_base
Member Variables
- protected std::array<std::size_t, 8> q_counts_
- protected std::array<std::size_t, 8> q_offset_
- protected std::array<numa_queues, 8> numa_holder_
- protected std::vector<std::size_t> d_lookup_
- protected std::vector<std::size_t> q_lookup_
- protected std::vector<std::size_t> schedcpu_
- protected pika::threads::policies::core_ratios cores_per_queue_
- protected bool round_robin_
- protected bool steal_hp_first_
- protected bool numa_stealing_
- protected bool core_stealing_
- protected std::size_t num_workers_
- protected std::size_t num_domains_
- protected const pika::detail::affinity_data& affinity_data_
- protected const pika::threads::policies:: thread_queue_init_parameters queue_parameters_
- protected std::mutex init_mutex
- protected bool initialized_
- protected bool debug_init_
- protected std::atomic<std::size_t> thread_init_counter_
- protected std::size_t pool_index_
Inherited from scheduler_base:
- protected mode_
- protected mtx_
- protected cond_
- protected wait_counts_
- protected suspend_mtxs_
- protected suspend_conds_
- protected pu_mtxs_
- protected states_
- protected description_
- protected thread_queue_init_
- protected parent_pool_
- protected background_thread_count_
- protected polling_function_mpi_
- protected polling_function_cuda_
- protected polling_work_count_function_mpi_
- protected polling_work_count_function_cuda_
Method Overview
- public void abort_all_suspended_threads()
- public bool cleanup_terminated(bool delete_all)
- public bool cleanup_terminated(std::size_t, bool delete_all)
- public void create_thread(pika::threads::thread_init_data & data, pika::threads::thread_id_ref_type * thrd, pika::error_code & ec)
- public void destroy_thread(threads::thread_data * thrd)
- public bool enumerate_threads(const util::function<bool (thread_id_type)> & f, pika::threads::thread_schedule_state state = thread_schedule_state::unknown) const
- public virtual bool get_next_thread(std::size_t, bool, threads::thread_id_ref_type & thrd, bool enable_stealing)
- public std::int64_t get_queue_length(std::size_t thread_num = unsigned long(-1)) const
- public static std::string get_scheduler_name()
- public std::int64_t get_thread_count(pika::threads::thread_schedule_state state = thread_schedule_state::unknown, pika::threads::thread_priority priority = thread_priority::default_, std::size_t thread_num = unsigned long(-1), bool = false) const
- public bool is_core_idle(std::size_t num_thread) const
- public bool just_add_new(std::size_t & added)
- public inline std::size_t local_thread_number() const
- public void on_error(std::size_t thread_num, const std::exception_ptr &)
- public void on_start_thread(std::size_t local_thread)
- public void on_stop_thread(std::size_t thread_num)
- public void schedule_thread(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, pika::threads::thread_priority priority = thread_priority::normal)
- public void schedule_thread_last(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, pika::threads::thread_priority priority = thread_priority::normal)
- public void schedule_work(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool, bool other_end, pika::threads::thread_priority priority = thread_priority::normal)
- public void set_scheduler_mode(pika::threads::policies::scheduler_mode mode)
- public shared_priority_queue_scheduler<Mutex>(const pika::threads::policies::shared_priority_queue_scheduler::init_parameter & init)
- public template <typename T>bool steal_by_function(std::size_t domain, std::size_t q_index, bool steal_numa, bool steal_core, pika::threads::policies::shared_priority_queue_scheduler::thread_holder_type * origin, T & var, const char * prefix, util::function<bool (std::size_t, std::size_t, thread_holder_type *, T &, bool, bool)> operation_HP, util::function<bool (std::size_t, std::size_t, thread_holder_type *, T &, bool, bool)> operation)
- public virtual bool wait_or_add_new(std::size_t, bool, std::int64_t &, bool, std::size_t & added)
- public virtual ~shared_priority_queue_scheduler<Mutex>()
Inherited from scheduler_base:
- public abort_all_suspended_threads
- public add_remove_scheduler_mode
- public add_scheduler_mode
- public cleanup_terminated
- public cleanup_terminated
- public clear_cuda_polling_function
- public clear_mpi_polling_function
- public create_thread
- public custom_polling_function
- public decrement_background_thread_count
- public destroy_thread
- public do_some_work
- public domain_from_local_thread_index
- public domain_threads
- public enumerate_threads
- public get_background_thread_count
- public get_description
- public get_minmax_state
- public get_next_thread
- public get_parent_pool
- public get_polling_work_count
- public get_pu_mutex
- public get_queue_length
- public get_scheduler_mode
- public get_stack_size
- public get_state
- public get_state
- public get_thread_count
- public global_to_local_thread_index
- public has_reached_state
- public has_scheduler_mode
- public idle_callback
- public increment_background_thread_count
- public is_core_idle
- public is_state
- public local_to_global_thread_index
- public null_polling_function
- public null_polling_work_count_function
- public num_domains
- public on_error
- public on_start_thread
- public on_stop_thread
- public remove_scheduler_mode
- public reset_thread_distribution
- public resume
- public schedule_thread
- public schedule_thread_last
- public select_active_pu
- public set_all_states
- public set_all_states_at_least
- public set_cuda_polling_functions
- public set_mpi_polling_functions
- public set_parent_pool
- public set_scheduler_mode
- public suspend
- public update_scheduler_mode
- public wait_or_add_new
Methods
void abort_all_suspended_threads()
void abort_all_suspended_threads()Description
passes the abort request through to all queues
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:202
bool cleanup_terminated(bool delete_all)
bool cleanup_terminated(bool delete_all)Description
Only cleans up terminated tasks belonging to this thread
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:231
Parameters
- bool delete_all
bool cleanup_terminated(std::size_t,
bool delete_all)
bool cleanup_terminated(std::size_t,
bool delete_all)Description
Generic cleanup function called by scheduling loop
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:273
Parameters
- std::size_t
- bool delete_all
void create_thread(
pika::threads::thread_init_data& data,
pika::threads::thread_id_ref_type* thrd,
pika::error_code& ec)
void create_thread(
pika::threads::thread_init_data& data,
pika::threads::thread_id_ref_type* thrd,
pika::error_code& ec)Description
create a new thread if the task state is pending and run_now attribute is set, then the task can be added to the staged queue, In this scheduler threads only create pending tasks on their own queues
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:284
Parameters
void destroy_thread(threads::thread_data* thrd)
void destroy_thread(threads::thread_data* thrd)Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:856
Parameters
- threads::thread_data* thrd
bool enumerate_threads(
const util::function<bool(thread_id_type)>& f,
pika::threads::thread_schedule_state state =
thread_schedule_state::unknown) const
bool enumerate_threads(
const util::function<bool(thread_id_type)>& f,
pika::threads::thread_schedule_state state =
thread_schedule_state::unknown) constDeclared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:967
Parameters
- const util::function<bool(thread_id_type)>& f
- pika::threads::thread_schedule_state state = thread_schedule_state::unknown
virtual bool get_next_thread(
std::size_t,
bool,
threads::thread_id_ref_type& thrd,
bool enable_stealing)
virtual bool get_next_thread(
std::size_t,
bool,
threads::thread_id_ref_type& thrd,
bool enable_stealing)Description
Return the next thread to be executed, return false if none available
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:597
Parameters
- std::size_t
- bool
- threads::thread_id_ref_type& thrd
- bool enable_stealing
std::int64_t get_queue_length(
std::size_t thread_num =
unsigned long(-1)) const
std::int64_t get_queue_length(
std::size_t thread_num =
unsigned long(-1)) constDeclared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:893
Parameters
- std::size_t thread_num = unsigned long(-1)
static std::string get_scheduler_name()
static std::string get_scheduler_name()Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:173
std::int64_t get_thread_count(
pika::threads::thread_schedule_state state =
thread_schedule_state::unknown,
pika::threads::thread_priority priority =
thread_priority::default_,
std::size_t thread_num = unsigned long(-1),
bool = false) const
std::int64_t get_thread_count(
pika::threads::thread_schedule_state state =
thread_schedule_state::unknown,
pika::threads::thread_priority priority =
thread_priority::default_,
std::size_t thread_num = unsigned long(-1),
bool = false) constDeclared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:921
Parameters
- pika::threads::thread_schedule_state state = thread_schedule_state::unknown
- pika::threads::thread_priority priority = thread_priority::default_
- std::size_t thread_num = unsigned long(-1)
- bool = false
bool is_core_idle(std::size_t num_thread) const
bool is_core_idle(std::size_t num_thread) constDeclared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:956
Parameters
- std::size_t num_thread
bool just_add_new(std::size_t& added)
bool just_add_new(std::size_t& added)Description
Return the next thread to be executed, return false if none available
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:661
Parameters
- std::size_t& added
inline std::size_t local_thread_number() const
inline std::size_t local_thread_number() constDescription
access thread local storage to determine correct thread and pool identification. This is used internally by the scheduler to compute correct queue indexes and offsets relative to a numa node. It should not be used without care as the thread numbering internal to the scheduler is not a simple linear indexing. returns -1 to indicate that the calling thread is not part of the thread pool the scheduler is running on
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:219
void on_error(std::size_t thread_num,
const std::exception_ptr&)
void on_error(std::size_t thread_num,
const std::exception_ptr&)Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:1270
Parameters
- std::size_t thread_num
- const std::exception_ptr&
void on_start_thread(std::size_t local_thread)
void on_start_thread(std::size_t local_thread)Description
////////////////////////////////////////////////////////////////////
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:983
Parameters
- std::size_t local_thread
void on_stop_thread(std::size_t thread_num)
void on_stop_thread(std::size_t thread_num)Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:1259
Parameters
- std::size_t thread_num
void schedule_thread(
threads::thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool allow_fallback,
pika::threads::thread_priority priority =
thread_priority::normal)
void schedule_thread(
threads::thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool allow_fallback,
pika::threads::thread_priority priority =
thread_priority::normal)Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:834
Parameters
- threads::thread_id_ref_type thrd
- threads::thread_schedule_hint schedulehint
- bool allow_fallback
- pika::threads::thread_priority priority = thread_priority::normal
void schedule_thread_last(
threads::thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool allow_fallback,
pika::threads::thread_priority priority =
thread_priority::normal)
void schedule_thread_last(
threads::thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool allow_fallback,
pika::threads::thread_priority priority =
thread_priority::normal)Description
Put task on the back of the queue : not yet implemented just put it on the normal queue for now
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:844
Parameters
- threads::thread_id_ref_type thrd
- threads::thread_schedule_hint schedulehint
- bool allow_fallback
- pika::threads::thread_priority priority = thread_priority::normal
void schedule_work(
threads::thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool,
bool other_end,
pika::threads::thread_priority priority =
thread_priority::normal)
void schedule_work(
threads::thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool,
bool other_end,
pika::threads::thread_priority priority =
thread_priority::normal)Description
Schedule the passed thread
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:710
Parameters
- threads::thread_id_ref_type thrd
- threads::thread_schedule_hint schedulehint
- bool
- bool other_end
- pika::threads::thread_priority priority = thread_priority::normal
void set_scheduler_mode(
pika::threads::policies::scheduler_mode mode)
void set_scheduler_mode(
pika::threads::policies::scheduler_mode mode)Description
get/set scheduler mode, calls inherited set function and then sets some flags we need later for scheduling
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:181
Parameters
- pika::threads::policies::scheduler_mode mode
shared_priority_queue_scheduler<Mutex>(
const pika::threads::policies::
shared_priority_queue_scheduler::
init_parameter& init)
shared_priority_queue_scheduler<Mutex>(
const pika::threads::policies::
shared_priority_queue_scheduler::
init_parameter& init)Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:150
Parameters
- const pika::threads::policies:: shared_priority_queue_scheduler:: init_parameter& init
template <typename T>
bool steal_by_function(
std::size_t domain,
std::size_t q_index,
bool steal_numa,
bool steal_core,
pika::threads::policies::
shared_priority_queue_scheduler::
thread_holder_type* origin,
T& var,
const char* prefix,
util::function<bool(std::size_t,
std::size_t,
thread_holder_type*,
T&,
bool,
bool)> operation_HP,
util::function<bool(std::size_t,
std::size_t,
thread_holder_type*,
T&,
bool,
bool)> operation)
template <typename T>
bool steal_by_function(
std::size_t domain,
std::size_t q_index,
bool steal_numa,
bool steal_core,
pika::threads::policies::
shared_priority_queue_scheduler::
thread_holder_type* origin,
T& var,
const char* prefix,
util::function<bool(std::size_t,
std::size_t,
thread_holder_type*,
T&,
bool,
bool)> operation_HP,
util::function<bool(std::size_t,
std::size_t,
thread_holder_type*,
T&,
bool,
bool)> operation)Description
This function, steals tasks from queues on other threads using two passed in function objects thhat allow the behaviour to be tweaked according to needs. One is called for high priority tasks, the other for normal/low since high priority tasks on other queues take precedence to local tasks of lower priority
Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:438
Parameters
- std::size_t domain
- std::size_t q_index
- bool steal_numa
- bool steal_core
- pika::threads::policies:: shared_priority_queue_scheduler:: thread_holder_type* origin
- T& var
- const char* prefix
- util::function<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation_HP
- util::function<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation
virtual bool wait_or_add_new(std::size_t,
bool,
std::int64_t&,
bool,
std::size_t& added)
virtual bool wait_or_add_new(std::size_t,
bool,
std::int64_t&,
bool,
std::size_t& added)Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:653
Parameters
- std::size_t
- bool
- std::int64_t&
- bool
- std::size_t& added
virtual ~shared_priority_queue_scheduler<Mutex>()
virtual ~shared_priority_queue_scheduler<Mutex>()Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:171