Converting repeated callbacks into C++20 coroutines

C++20 introduced coroutines, which drastically simplify the implementation of asynchronous code. However, converting existing callback-based code to coroutines is not always easy. The simple case, where the callback is guaranteed to be called only once, is covered by the following question on StackOverflow.

I've cleaned up the code and modernized it a bit to make it easier to read.

#include <coroutine>
#include <string>
#include <system_error>

auto api_call_async(const std::string& some_parameter)
{
    // std::suspend_always supplies await_ready() (returns false, so the
    // coroutine always suspends) and an empty await_resume().
    struct awaiter : public std::suspend_always
    {
        awaiter(const std::string& parameter)
            : parameter_(parameter) {}

        void await_suspend(std::coroutine_handle<> handle)
        {
            // use your third party lib call directly here.
            api_call(parameter_, [handle](std::error_code ec)
            {
                // call the handle to resume the coroutine
                handle();
            });
        }

        std::string parameter_;
    };
    return awaiter(some_parameter);
}

I won't get into the details; C++ coroutines are complicated AF. In short, it does the following:

  • In the constructor, store all of the parameters to the function call.
  • Upon await_suspend, call the function with the stored parameters, using a lambda as the callback.
  • Resume the coroutine in the lambda.

Let's try a practical example. In GNUnet++, a library I wrote, the DHT::put(key, value, callback) function is used to store a value in the DHT. The callback is called when the operation is complete. To wrap it into a coroutine, we can do the following:

struct PutAwaiter
{
    // store the parameters into member variables
    PutAwaiter(DHT* dht, const std::string& key, const std::string& value)
        :key_(key), value_(value), dht_(dht) {}

    // Must return false, otherwise await_suspend() (and the put) never runs.
    bool await_ready() { return false; }

    void await_suspend(std::coroutine_handle<> handle)
    {
        // use the stored parameters to call the function
        // and awake the coroutine when the callback is called.
        dht_->put(key_, value_, [this, handle](bool success) {
            success_ = success;
            handle.resume();
        });
    }

    void await_resume() {
        if(!success_)
            throw std::runtime_error("put failed");
    }

    bool success_ = false;

    std::string key_;
    std::string value_;
    DHT* dht_;
};

// Now we can use it like this:
// PutAwaiter putCoro(DHT* dht, const std::string& key, const std::string& value)
// {
//     return PutAwaiter(dht, key, value);
// }

// or to make the signature a bit nicer:
cppcoro::task<> putCoro(DHT* dht, const std::string& key, const std::string& value)
{
    // PutAwaiter::await_resume() returns void and throws on failure,
    // so there is no value to co_return here.
    co_await PutAwaiter(dht, key, value);
}

Dealing with callbacks being called multiple times

That's nice and all. But what should we do when the callback can be called multiple times? For example, when we want to read from the DHT, where there could be multiple values for a given key, or when iterating over all replies from a TCP stream. It took me a lot of head scratching to figure out how. Turns out the key is that you can resume a coroutine multiple times, as long as you make damn sure you don't try to resume it while the coroutine is running or is dead.

Here's my solution: the QueuedAwaiter. It keeps track of all values generated by the callback and dispenses them one by one to the awaiter. The main API is addValue and addException, which queue a value or an exception.

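#include <cassert>
#include <coroutine>
#include <exception>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <variant>

template <typename T>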
struct QueuedAwaiter
{
    using ElementType = std::variant<T, std::exception_ptr>;
    std::queue<ElementType> queue_;
    std::coroutine_handle<> handle_;
    // A mutex is needed to protect the queue, as a multi-threaded
    // system may run the coroutine and the callback at the same time.
    mutable std::mutex mtx_;

    // Taken by value so both lvalues (copied) and rvalues (moved) are accepted.
    void addValue(T value)
    {
        std::coroutine_handle<> handle = nullptr;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.emplace(std::move(value));
            handle = handle_;
            handle_ = nullptr;
        }
        if(handle)
            handle.resume();
    }

    void addException(std::exception_ptr&& exception)
    {
        std::coroutine_handle<> handle = nullptr;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.emplace(std::move(exception));
            handle = handle_;
            handle_ = nullptr;
        }
        if(handle)
            handle.resume();
    }

    bool await_ready() const noexcept
    {
        std::lock_guard<std::mutex> lock(mtx_);
        return !queue_.empty();
    }

    T await_resume() noexcept(false)
    {
        // Take the front element and pop it while holding the lock, so the
        // callback thread never races with the pop and a queued exception
        // is consumed instead of being left at the front.
        auto take_front = [this]() -> std::optional<ElementType> {
            std::lock_guard<std::mutex> lock(mtx_);
            if(queue_.empty())
                return std::nullopt;
            ElementType element = std::move(queue_.front());
            queue_.pop();
            return element;
        };
        auto front = take_front();
        assert(front.has_value());

        if(front->index() == 1)
            std::rethrow_exception(std::get<1>(*front));
        return std::move(std::get<0>(*front));
    }

    bool await_suspend(std::coroutine_handle<> handle) noexcept
    {
        // Check the queue and store the handle under a single lock. Otherwise
        // a value could arrive between the check and the store, and the
        // coroutine would never be woken up.
        std::lock_guard<std::mutex> lock(mtx_);
        if (!queue_.empty())
            return false; // data is already there; don't suspend
        handle_ = handle;
        return true;
    }
};
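
The value-or-exception element type can be looked at in isolation. The following is a small sketch, assuming T = int. element, make_error, unwrap and unwrap_message are hypothetical helpers that are not part of QueuedAwaiter. They only mirror what a callback would hand to addException and what await_resume does with it.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>

// Same shape as QueuedAwaiter's ElementType, with T fixed to int.
using element = std::variant<int, std::exception_ptr>;

// Producer side: wrap an error the way a callback could before queueing it.
element make_error(const std::string& message)
{
    return std::make_exception_ptr(std::runtime_error(message));
}

// Consumer side: return the value, or rethrow the stored exception.
int unwrap(element e)
{
    if (auto* error = std::get_if<std::exception_ptr>(&e)) {
        assert(*error != nullptr); // rethrowing a null exception_ptr is undefined
        std::rethrow_exception(*error);
    }
    return std::get<int>(e);
}

// Convenience wrapper for the demo: turns either outcome into text.
std::string unwrap_message(element e)
{
    try {
        return std::to_string(unwrap(std::move(e)));
    } catch (const std::runtime_error& ex) {
        return ex.what();
    }
}
```

unwrap_message(element{42}) gives "42", while unwrap_message(make_error("lookup failed")) gives "lookup failed". The exception crosses from the producer to the consumer without being thrown in between.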

To convert callbacks getting called over and over, like the DHT::get function:

DHT::get(key, [](const std::optional<std::string>& value) {
    // value == std::nullopt if the DHT search is complete.
    // otherwise, value contains the data found

    if (!value.has_value()) 
        std::cout << "Search complete" << std::endl;
    else {
        // do something with the value
    }
});

We do the following horrible and unreadable thing:

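// key is taken by value: the generator starts lazily, so a reference to a
// temporary string would dangle by the time the body runs.
cppcoro::async_generator<std::string> getCoro(DHT* dht, std::string key)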
{
    QueuedAwaiter<std::optional<std::string>> awaiter;
    dht->get(key, [&awaiter](const std::optional<std::string>& value) {
        // copy the value; the callback only lends us a const reference
        awaiter.addValue(std::optional<std::string>(value));
    });

    // Try to await for the values and exit the loop when we get the notification
    // that the search is complete.
    while(true) {
        // wait for the next value
        auto res = co_await awaiter;

        // Note! You must provide a way to notify the generator to exit the loop
        // or it'll hang there and cause all sorts of problems.
        if (!res.has_value())
            break;
        // Nice! We got a value. Yield it to the caller.
        co_yield *res;
    }
}

// using it
auto lookup = getCoro(dht, "key");
for(auto it = co_await lookup.begin(); it != lookup.end(); co_await ++it) {
    std::cout << "Found on DHT: " << *it << std::endl;
}

// Unfortunately, the C++ standard committee decided to remove the for co_await syntax
// for co_await(auto val : lookup) {
//     std::cout << "Found on DHT: " << val << std::endl;
// }


It's much harder than I thought.

Martin Chang
Systems software, HPC, GPGPU and AI. I mostly write stupid C++ code. Sometimes does AI research. Chronic VRChat addict

I run TLGS, a major search engine on Gemini. Used by Buran by default.


  • marty1885 \at protonmail.com
  • Matrix: @clehaxze:matrix.clehaxze.tw
  • Jami: a72b62ac04a958ca57739247aa1ed4fe0d11d2df