3.1 Scheduling a task

Alexander Damian edited this page Aug 21, 2019 · 4 revisions

Anatomy of a coroutine

Any function which is intended to run as a coroutine MUST have the following signature:

int function(CoroContextPtr<T> ctx, ... );

The first parameter must be of type CoroContextPtr<T> (shared pointer to a coroutine context), followed by an arbitrary number of arguments. These arguments can be passed by reference, address or value (rvalue references can be passed using std::forward_as_tuple - see tests folder for example). The function must return an integer. T in this case represents the value returned by the promise associated with this context (more on that later). Any type of callable object can be passed in as long as it has the above signature. Example:

//Function definition. Future returns a 'std::vector<double>'.
int func1(CoroContextPtr<std::vector<double>> ctx,
          const std::string& name,
          double* value,
          int id)
{
    ...
}

//Named lambda. Future returns an `int`.
auto func2 = [](CoroContextPtr<int> ctx, const char* name)->int
{
    ...
};

//Function object. Future returns a string. Foo::myFunction is a method
//taking CoroContextPtr<std::string> as the first parameter.
std::function<int(CoroContextPtr<std::string>)> func3 =
    std::bind(&Foo::myFunction, &foo, std::placeholders::_1);

Executing the coroutine

Posting a function to run as a coroutine is done as follows:

//Create the dispatcher object with one coroutine thread per core and 5 IO threads.
//This object should normally be created once for the lifetime of the program.
Dispatcher dispatcher(-1, 5);

//local variables
std::string name("foobar");
double value = 0.0;
int id = 55;

//Schedule func1, func2 and func3 to run on any coroutine thread.
dispatcher.post(func1, name, &value, id);
dispatcher.post(func2, name.c_str());
dispatcher.post(func3);

//Schedule an anonymous lambda, capturing 'id' by reference and passing 'name' by value.
dispatcher.post([&id](CoroContextPtr<std::string> ctx, std::string s)->int
{
    ... //Do something with 'id' and 's'.
}, name); 

Executing a coroutine from another coroutine

It is quite common to want to execute another function from within a coroutine. This can simply be scheduled onto the dispatcher using the CoroContextPtr<T> passed in as the first parameter. Example:

dispatcher.post([](CoroContextPtr<char> ctx)->int
{
    ... 
    //Post func3 on any dispatcher queue and return immediately.
    ctx->post(func3); 
    ...
}); 

Lock-free code

Who doesn't like lock-free code? quantum's scheduling API allows programmers to post coroutines on a specific queue and (optionally) to specify a runtime priority. By queueing all coroutines accessing the same shared object onto the same thread, there is no need for locks as all access is serialized. If no queue number is specified, the scheduler automatically picks the least busy queue.

std::vector<int> v; //some global object

//Declare two coroutines accessing the same object.
//One inserts positive numbers, the other negative.
int func1(CoroContextPtr<int> ctx)
{
    for (int i = 0; i < 10000; ++i)
    {
        if (i % 100 == 0) ctx->yield(); //Let the other coroutine run a bit
        v.push_back(i);
    }
    return 0;
}

int func2(CoroContextPtr<int> ctx)
{
    for (int i = 0; i > -10000; --i)
    {
        if (i % 100 == 0) ctx->yield(); //Let the other coroutine run a bit
        v.push_back(i);
    }
    return 0;
}

//Post both coroutines on queue 1 (normal priority) so access to 'v' is serialized.
dispatcher.post(1, false, func1);
dispatcher.post(1, false, func2);

Normally one would have to protect all vector manipulation with a mutex. By using the same queue, we guarantee that all access will be serialized. As a result, the vector will contain alternating ranges of 100 positive numbers followed by ranges of 100 negative numbers.

If a coroutine launches another coroutine, it is possible to tell the scheduler to use the same execution queue as the parent coroutine, without knowing exactly which queue the code executes on. The following example runs a coroutine on any queue, which in turn starts another coroutine and forces it to run on the same queue, possibly to avoid locks.

//Assume this coroutine runs on any thread
dispatcher.post([](CoroContextPtr<char> ctx)->int
{
    ... 
    //Post func3 on the 'same' queue as its parent
    ctx->post((int)IQueue::QueueId::Same, false, func3); 
    ...
});

Anatomy of an asynchronous IO task

Any function which is intended to run as an IO task (i.e. long-running or blocking) MUST have the following signature:

int function(ThreadPromisePtr<T> promise, ... );

The first parameter must be of type ThreadPromisePtr<T> (shared pointer to a thread-compatible promise), followed by an arbitrary number of arguments. These arguments can be passed by reference, address or value. The function must return an integer. T in this case represents the value returned by this promise and will be accessible in the calling thread/coroutine via the associated future (more on that later). Any type of callable object can be passed in as long as it has the above signature. Example:

//Function definition. Future returns a `std::vector<double>`.
int funcIO(ThreadPromisePtr<std::vector<double>> promise,
           const std::string& name,
           double* value,
           int id)
{
    std::vector<double> v;
    ...
    return promise->set(v);
}
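The promise/future handoff works much like the standard library's: the IO thread fulfills the promise, and the caller collects the value through the paired future. Below is a stdlib-only analogy, where std::promise stands in for quantum's ThreadPromise and set_value corresponds to promise->set(v):

```cpp
#include <future>
#include <thread>
#include <vector>

// Stdlib analogy (not quantum's API): the "IO thread" fulfills the promise,
// the caller blocks on the associated future to collect the result.
std::vector<double> runIoTask()
{
    std::promise<std::vector<double>> promise;
    std::future<std::vector<double>> future = promise.get_future();

    //"IO thread": perform the blocking work, then publish the result.
    std::thread io([&promise] {
        promise.set_value(std::vector<double>{1.0, 2.0, 3.0}); //quantum: promise->set(v)
    });

    //Caller: wait for the IO task's value via the future.
    std::vector<double> result = future.get();
    io.join();
    return result;
}
```

The key point carried over to quantum is that the promise side lives on the IO thread while the future side lives with the caller, so the result crosses threads without any explicit locking by the user.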

Executing an asynchronous IO task

Everything related to asynchronous IO tasks is very similar to the coroutine counterpart. One notable difference is that although IO tasks can be scheduled from within a coroutine, an IO task only receives a promise as a parameter, therefore it cannot post additional coroutines or IO tasks. This is intended by design, since an IO task is only supposed to perform a single IO operation and (optionally) return its result. If more IO operations are needed, they must be scheduled from the caller thread/coroutine. Example:

Posting an IO task from the main thread:

//local variables
std::string name("foobar");
double value = 0.0;
int id = 55;

//Schedule funcIO to run on any IO thread.
dispatcher.postAsyncIo(funcIO, name, &value, id);  

Posting an IO task from within a coroutine:

dispatcher.post([](CoroContextPtr<char> ctx)->int
{
    ... 
    //Post a lambda on IO queue 3 at default priority and return immediately.
    ctx->postAsyncIo(3, false, [](ThreadPromisePtr<int> promise)->int { ... });
    ...
});