class shared_priority_queue_scheduler

Declaration

template <typename Mutex = std::mutex>
class shared_priority_queue_scheduler : public scheduler_base { /* full declaration omitted */ };

Description

//////////////////////////////////////////////////////////////////////// The shared_priority_queue_scheduler maintains a set of high, normal, and low priority queues. For each priority level there is a core/queue ratio which determines how many cores share a single queue. If the high priority core/queue ratio is 4 the first 4 cores will share a single high priority queue, the next 4 will share another one and so on. In addition, the shared_priority_queue_scheduler is NUMA-aware and takes NUMA scheduling hints into account when creating and scheduling work. Warning: PendingQueuing lifo causes lockup on termination

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:106

Inherits from: scheduler_base

Member Variables

protected std::array<std::size_t, 8> q_counts_
protected std::array<std::size_t, 8> q_offset_
protected std::array<numa_queues, 8> numa_holder_
protected std::vector<std::size_t> d_lookup_
protected std::vector<std::size_t> q_lookup_
protected std::vector<std::size_t> schedcpu_
protected pika::threads::policies::core_ratios cores_per_queue_
protected bool round_robin_
protected bool steal_hp_first_
protected bool numa_stealing_
protected bool core_stealing_
protected std::size_t num_workers_
protected std::size_t num_domains_
protected const pika::detail::affinity_data& affinity_data_
protected const pika::threads::policies:: thread_queue_init_parameters queue_parameters_
protected std::mutex init_mutex
protected bool initialized_
protected bool debug_init_
protected std::atomic<std::size_t> thread_init_counter_
protected std::size_t pool_index_

Inherited from scheduler_base:

protected mode_
protected mtx_
protected cond_
protected wait_counts_
protected suspend_mtxs_
protected suspend_conds_
protected pu_mtxs_
protected states_
protected description_
protected thread_queue_init_
protected parent_pool_
protected background_thread_count_
protected polling_function_mpi_
protected polling_function_cuda_
protected polling_work_count_function_mpi_
protected polling_work_count_function_cuda_

Method Overview

  • public void abort_all_suspended_threads()
  • public bool cleanup_terminated(bool delete_all)
  • public bool cleanup_terminated(std::size_t, bool delete_all)
  • public void create_thread(pika::threads::thread_init_data & data, pika::threads::thread_id_ref_type * thrd, pika::error_code & ec)
  • public void destroy_thread(threads::thread_data * thrd)
  • public bool enumerate_threads(const util::function<bool (thread_id_type)> & f, pika::threads::thread_schedule_state state = thread_schedule_state::unknown) const
  • public virtual bool get_next_thread(std::size_t, bool, threads::thread_id_ref_type & thrd, bool enable_stealing)
  • public std::int64_t get_queue_length(std::size_t thread_num = unsigned long(-1)) const
  • public static std::string get_scheduler_name()
  • public std::int64_t get_thread_count(pika::threads::thread_schedule_state state = thread_schedule_state::unknown, pika::threads::thread_priority priority = thread_priority::default_, std::size_t thread_num = unsigned long(-1), bool = false) const
  • public bool is_core_idle(std::size_t num_thread) const
  • public bool just_add_new(std::size_t & added)
  • public inline std::size_t local_thread_number() const
  • public void on_error(std::size_t thread_num, const std::exception_ptr &)
  • public void on_start_thread(std::size_t local_thread)
  • public void on_stop_thread(std::size_t thread_num)
  • public void schedule_thread(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, pika::threads::thread_priority priority = thread_priority::normal)
  • public void schedule_thread_last(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback, pika::threads::thread_priority priority = thread_priority::normal)
  • public void schedule_work(threads::thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool, bool other_end, pika::threads::thread_priority priority = thread_priority::normal)
  • public void set_scheduler_mode(pika::threads::policies::scheduler_mode mode)
  • public shared_priority_queue_scheduler<Mutex>(const pika::threads::policies::shared_priority_queue_scheduler::init_parameter & init)
  • public template <typename T>bool steal_by_function(std::size_t domain, std::size_t q_index, bool steal_numa, bool steal_core, pika::threads::policies::shared_priority_queue_scheduler::thread_holder_type * origin, T & var, const char * prefix, util::function<bool (std::size_t, std::size_t, thread_holder_type *, T &, bool, bool)> operation_HP, util::function<bool (std::size_t, std::size_t, thread_holder_type *, T &, bool, bool)> operation)
  • public virtual bool wait_or_add_new(std::size_t, bool, std::int64_t &, bool, std::size_t & added)
  • public virtual ~shared_priority_queue_scheduler<Mutex>()

Inherited from scheduler_base:

Methods

void abort_all_suspended_threads()

Description

passes the abort request through to all queues

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:202

bool cleanup_terminated(bool delete_all)

Description

Only cleans up terminated tasks belonging to this thread

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:231

Parameters

bool delete_all

bool cleanup_terminated(std::size_t,
                        bool delete_all)

Description

Generic cleanup function called by scheduling loop

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:273

Parameters

std::size_t
bool delete_all

void create_thread(
    pika::threads::thread_init_data& data,
    pika::threads::thread_id_ref_type* thrd,
    pika::error_code& ec)

Description

create a new thread if the task state is pending and run_now attribute is set, then the task can be added to the staged queue, In this scheduler threads only create pending tasks on their own queues

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:284

Parameters

pika::threads::thread_init_data& data
pika::threads::thread_id_ref_type* thrd
pika::error_code& ec

void destroy_thread(threads::thread_data* thrd)

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:856

Parameters

threads::thread_data* thrd

bool enumerate_threads(
    const util::function<bool(thread_id_type)>& f,
    pika::threads::thread_schedule_state state =
        thread_schedule_state::unknown) const

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:967

Parameters

const util::function<bool(thread_id_type)>& f
pika::threads::thread_schedule_state state = thread_schedule_state::unknown

virtual bool get_next_thread(
    std::size_t,
    bool,
    threads::thread_id_ref_type& thrd,
    bool enable_stealing)

Description

Return the next thread to be executed, return false if none available

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:597

Parameters

std::size_t
bool
threads::thread_id_ref_type& thrd
bool enable_stealing

std::int64_t get_queue_length(
    std::size_t thread_num =
        unsigned long(-1)) const

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:893

Parameters

std::size_t thread_num = unsigned long(-1)

static std::string get_scheduler_name()

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:173

std::int64_t get_thread_count(
    pika::threads::thread_schedule_state state =
        thread_schedule_state::unknown,
    pika::threads::thread_priority priority =
        thread_priority::default_,
    std::size_t thread_num = unsigned long(-1),
    bool = false) const

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:921

Parameters

pika::threads::thread_schedule_state state = thread_schedule_state::unknown
pika::threads::thread_priority priority = thread_priority::default_
std::size_t thread_num = unsigned long(-1)
bool = false

bool is_core_idle(std::size_t num_thread) const

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:956

Parameters

std::size_t num_thread

bool just_add_new(std::size_t& added)

Description

Return the next thread to be executed, return false if none available

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:661

Parameters

std::size_t& added

inline std::size_t local_thread_number() const

Description

access thread local storage to determine correct thread and pool identification. This is used internally by the scheduler to compute correct queue indexes and offsets relative to a numa node. It should not be used without care as the thread numbering internal to the scheduler is not a simple linear indexing. returns -1 to indicate that the calling thread is not part of the thread pool the scheduler is running on

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:219

void on_error(std::size_t thread_num,
              const std::exception_ptr&)

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:1270

Parameters

std::size_t thread_num
const std::exception_ptr&

void on_start_thread(std::size_t local_thread)

Description

////////////////////////////////////////////////////////////////////

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:983

Parameters

std::size_t local_thread

void on_stop_thread(std::size_t thread_num)

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:1259

Parameters

std::size_t thread_num

void schedule_thread(
    threads::thread_id_ref_type thrd,
    threads::thread_schedule_hint schedulehint,
    bool allow_fallback,
    pika::threads::thread_priority priority =
        thread_priority::normal)

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:834

Parameters

threads::thread_id_ref_type thrd
threads::thread_schedule_hint schedulehint
bool allow_fallback
pika::threads::thread_priority priority = thread_priority::normal

void schedule_thread_last(
    threads::thread_id_ref_type thrd,
    threads::thread_schedule_hint schedulehint,
    bool allow_fallback,
    pika::threads::thread_priority priority =
        thread_priority::normal)

Description

Put task on the back of the queue : not yet implemented just put it on the normal queue for now

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:844

Parameters

threads::thread_id_ref_type thrd
threads::thread_schedule_hint schedulehint
bool allow_fallback
pika::threads::thread_priority priority = thread_priority::normal

void schedule_work(
    threads::thread_id_ref_type thrd,
    threads::thread_schedule_hint schedulehint,
    bool,
    bool other_end,
    pika::threads::thread_priority priority =
        thread_priority::normal)

Description

Schedule the passed thread

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:710

Parameters

threads::thread_id_ref_type thrd
threads::thread_schedule_hint schedulehint
bool
bool other_end
pika::threads::thread_priority priority = thread_priority::normal

void set_scheduler_mode(
    pika::threads::policies::scheduler_mode mode)

Description

get/set scheduler mode, calls inherited set function and then sets some flags we need later for scheduling

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:181

Parameters

pika::threads::policies::scheduler_mode mode

shared_priority_queue_scheduler<Mutex>(
    const pika::threads::policies::
        shared_priority_queue_scheduler::
            init_parameter& init)

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:150

Parameters

const pika::threads::policies:: shared_priority_queue_scheduler:: init_parameter& init

template <typename T>
bool steal_by_function(
    std::size_t domain,
    std::size_t q_index,
    bool steal_numa,
    bool steal_core,
    pika::threads::policies::
        shared_priority_queue_scheduler::
            thread_holder_type* origin,
    T& var,
    const char* prefix,
    util::function<bool(std::size_t,
                        std::size_t,
                        thread_holder_type*,
                        T&,
                        bool,
                        bool)> operation_HP,
    util::function<bool(std::size_t,
                        std::size_t,
                        thread_holder_type*,
                        T&,
                        bool,
                        bool)> operation)

Description

This function, steals tasks from queues on other threads using two passed in function objects thhat allow the behaviour to be tweaked according to needs. One is called for high priority tasks, the other for normal/low since high priority tasks on other queues take precedence to local tasks of lower priority

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:438

Parameters

std::size_t domain
std::size_t q_index
bool steal_numa
bool steal_core
pika::threads::policies:: shared_priority_queue_scheduler:: thread_holder_type* origin
T& var
const char* prefix
util::function<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation_HP
util::function<bool(std::size_t, std::size_t, thread_holder_type*, T&, bool, bool)> operation

virtual bool wait_or_add_new(std::size_t,
                             bool,
                             std::int64_t&,
                             bool,
                             std::size_t& added)

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:653

Parameters

std::size_t
bool
std::int64_t&
bool
std::size_t& added

virtual ~shared_priority_queue_scheduler<Mutex>()

Declared at: libs/pika/schedulers/include/pika/schedulers/shared_priority_queue_scheduler.hpp:171