Scheduler Algorithm
Here is a slightly simplified version of my scheduler's code (some functions are omitted for brevity):
class worker_thread
{
// global index of current thread
size_t my_index;
// local tasks
deque<task_t> tasks;
// 1-element queue for work requests from other threads
atomic<size_t> steal_req;
// 1-element queue for work request acknowledges from other threads
atomic<size_t> steal_ack;
// main thread loop
void loop()
{
for (;;)
{
// while we have local work process it
while (tasks.size() != 0)
{
// pop task in LIFO order
task_t task = tasks.back();
tasks.pop_back();
// user processing
execute(task);
// check for steal requests from other threads
if (steal_req.load(memory_order_relaxed) != no_requests)
process_work_request();
}
// try to get some work from other threads
if (false == send_steal_request())
break;
}
}
void process_work_request()
{
// get thief descriptor
size_t thief_idx = steal_req.load(memory_order_relaxed);
worker_thread&; thief = get_thread(thief_idx);
if (tasks.size())
{
// pop task in FIFO order
task_t task = tasks.back();
tasks.pop_back();
// synchronous user processing
steal(task);
// give it to the thift
thief.tasks.push_back(task);
}
// notify the thief that the operation is completed
thief.steal_ack.store(1, memory_order_release);
}
bool send_steal_request()
{
for (;;)
{
// choose a victim
size_t victim_idx = choose_randomly();
worker_thread&; victim = get_thread(victim_idx);
// send a request to it (if it's not busy processing another request)
steal_ack.store(0, memory_order_relaxed);
size_t cmp = victim.steal_req.load(memory_order_relaxed);
for (;;)
{
if (cmp != (size_t)-1)
break;
if (victim.steal_req.compare_exchange_strong(cmp, my_index, memory_order_acq_rel))
break;
}
// request is sent?
if (cmp == no_requests)
{
// wait for ack
while (steal_ack.load(memory_order_acquire) == 0)
Sleep(0);
// check as to whether we got some work or not
if (tasks.size())
return true;
}
// termination condition
if (no_work_in_the_system())
return false;
}
}
};
Move on to Performance Result