Context
I build a webserver using boost coroutine ts, boost asio and boost beast.
There is a coroutine for reading and one for writing.
There is a message_to_send queue onto which messages destined for the user are pushed.
The writing coroutine checks if there is something in the message_to_send queue and sends it.
After sending, the writing coroutine suspends itself for 100 milliseconds and then checks again whether there is something to write.
Problem
The writing coroutine polls the message queue every 100 milliseconds. I would like to find a solution that does not rely on polling each time a timer fires.
Possible solution
Maybe there is a way to co_await the change of a variable. Maybe by creating an async_wait_for_callback with "async_initiate"?
Code example
You can clone the project. Or use the complete example code posted here:
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/system_timer.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/bind/bind.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <stdexcept>
#include <string>
// TODO use cmake to find out if the compiler is gcc or clang
#include <coroutine> // enable if build with gcc
// #include <experimental/coroutine> //enable if build with clang
// Bring the Beast and Asio names into scope; each directive is needed only once.
using namespace boost::beast;
using namespace boost::asio;
using boost::asio::ip::tcp;
// System-clock timer whose asynchronous operations default to the use_awaitable completion token.
using CoroTimer = boost::asio::use_awaitable_t<>::as_default_on_t<boost::asio::basic_waitable_timer<boost::asio::chrono::system_clock>>;
// WebSocket stream layered on top of a Beast TCP stream.
using Websocket = boost::beast::websocket::stream<boost::beast::tcp_stream>;
// TCP acceptor whose asynchronous operations default to the use_awaitable completion token.
using tcp_acceptor = use_awaitable_t<>::as_default_on_t<tcp::acceptor>;
// Per-connection state used by both the reading side (via handleMessage) and the writing coroutine.
struct User
{
// Coroutine that sends the queued messages over `connection` while the connection is alive.
// Takes the weak pointer by reference, so the referenced object must outlive the coroutine.
boost::asio::awaitable<void> writeToClient (std::weak_ptr<Websocket> &connection);
// Outgoing messages: handleMessage () appends at the back, writeToClient () pops from the front and sends.
std::deque<std::string> msgQueue{};
// Wake-up timer: assigned inside writeToClient () and cancelled by handleMessage () after queueing a message.
// Default-initialised to null, i.e. it is empty until writeToClient () has started.
std::shared_ptr<CoroTimer> timer{};
};
/// Handles one message received from a client.
///
/// Placeholder implementation: prints a notice, queues a fixed reply for the
/// sending user and wakes that user's writing coroutine by cancelling its timer.
///
/// @param msg   text received from the client (not inspected yet)
/// @param users all users known to the server (not used yet)
/// @param user  the user the message came from; must not be null
void
handleMessage (std::string const &msg, std::list<std::shared_ptr<User>> &users, std::shared_ptr<User> user)
{
  std::cout << "please implement handle message" << std::endl;
  user->msgQueue.push_back ("please implement handle message");
  // The timer only exists once User::writeToClient () has started. If a message
  // arrives earlier there is nothing to cancel; the writer drains the queue as
  // soon as it runs, so skipping the wake-up is safe.
  if (user->timer)
    {
      user->timer->cancel ();
    }
}
/// Writing coroutine of a user: sleeps until woken, then sends every queued message.
///
/// The coroutine waits on `timer`, which is set to never expire on its own.
/// Cancelling the timer (see handleMessage) is the wake-up signal; the resulting
/// "operation cancelled" error is expected and swallowed. Any other timer error
/// aborts the process. The loop ends once `connection` has expired or a write fails.
///
/// @param connection weak handle to the WebSocket; the referenced weak_ptr must
///                   stay alive for the whole lifetime of the coroutine
boost::asio::awaitable<void>
User::writeToClient (std::weak_ptr<Websocket> &connection)
{
  try
    {
      // One timer per user is enough; it is re-armed on every iteration instead
      // of being reallocated after each wake-up.
      timer = std::make_shared<CoroTimer> (CoroTimer{ co_await this_coro::executor });
      while (not connection.expired ())
        {
          // Arm the timer "forever": only an explicit cancel () ends the wait.
          timer->expires_at (std::chrono::system_clock::time_point::max ());
          try
            {
              co_await timer->async_wait ();
            }
          catch (boost::system::system_error const &e)
            {
              using namespace boost::system::errc;
              if (operation_canceled == e.code ())
                {
                  // swallow cancel: this is the regular wake-up path
                }
              else
                {
                  std::cout << "error in timer boost::system::errc: " << e.code () << std::endl;
                  abort ();
                }
            }
          // Drain everything that was queued, including messages that arrive
          // while an earlier write is still in flight.
          while (not msgQueue.empty () && not connection.expired ())
            {
              auto tmpMsg = std::move (msgQueue.front ());
              std::cout << " msg: " << tmpMsg << std::endl;
              msgQueue.pop_front ();
              // The temporary shared_ptr returned by lock () keeps the stream alive until the write completes.
              co_await connection.lock ()->async_write (buffer (tmpMsg), use_awaitable);
            }
        }
    }
  catch (std::exception const &e)
    {
      std::cout << "write Exception: " << e.what () << std::endl;
    }
}
// WebSocket server: accepts connections on one endpoint and keeps a list of connected users.
class Server
{
public:
// Stores the endpoint the listener will accept connections on.
Server (boost::asio::ip::tcp::endpoint const &endpoint);
// Accept loop: for every accepted connection it spawns one reading and one writing coroutine.
boost::asio::awaitable<void> listener ();
private:
// Erases the given entry from `users`.
void removeUser (std::list<std::shared_ptr<User>>::iterator user);
// Reads one complete message from the WebSocket and returns it as a string.
boost::asio::awaitable<std::string> my_read (Websocket &ws_);
// Reads messages in a loop and forwards them to handleMessage (); removes the user when reading throws.
boost::asio::awaitable<void> readFromClient (std::list<std::shared_ptr<User>>::iterator user, Websocket &connection);
// Endpoint the acceptor in listener () is opened on.
boost::asio::ip::tcp::endpoint _endpoint{};
// Connected users; std::list is used so iterators handed to the coroutines stay valid when other entries are erased.
std::list<std::shared_ptr<User>> users{};
};
// Short alias for boost::asio::this_coro (used as `co_await this_coro::executor`).
namespace this_coro = boost::asio::this_coro;
// Remembers the endpoint only; the acceptor itself is created later inside listener ().
Server::Server (boost::asio::ip::tcp::endpoint const &endpoint)
  : _endpoint{ endpoint }
{
}
// Waits for one complete WebSocket message on `ws_`, logs it and returns its text.
// Exceptions raised by the read propagate to the caller.
awaitable<std::string>
Server::my_read (Websocket &ws_)
{
  std::cout << "read" << std::endl;
  flat_buffer incoming{};
  co_await ws_.async_read (incoming, use_awaitable);
  std::string text = buffers_to_string (incoming.data ());
  std::cout << "number of letters '" << text.size () << "' msg: '" << text << "'" << std::endl;
  co_return text;
}
// Reading coroutine of one client: forwards every received message to handleMessage ().
// When reading throws, the user is erased from the server's list and the error is logged.
awaitable<void>
Server::readFromClient (std::list<std::shared_ptr<User>>::iterator user, Websocket &connection)
{
  try
    {
      while (true)
        {
          std::string const received = co_await my_read (connection);
          handleMessage (received, users, *user);
        }
    }
  catch (std::exception const &error)
    {
      removeUser (user);
      std::cout << "read Exception: " << error.what () << std::endl;
    }
}
void
Server::removeUser (std::list<std::shared_ptr<User>>::iterator user)
{
users.erase (user);
}
/// Accept loop of the server.
///
/// Opens an acceptor on the configured endpoint and, for every incoming TCP
/// connection, performs the WebSocket handshake and spawns one reading and one
/// writing coroutine. Errors while accepting or handshaking a single connection
/// are logged and the loop continues; a failure to open the acceptor propagates
/// out of the coroutine.
awaitable<void>
Server::listener ()
{
  auto executor = co_await this_coro::executor;
  tcp_acceptor acceptor (executor, _endpoint);
  for (;;)
    {
      try
        {
          auto socket = co_await acceptor.async_accept ();
          auto connection = std::make_shared<Websocket> (std::move (socket));
          connection->set_option (websocket::stream_base::timeout::suggested (role_type::server));
          connection->set_option (websocket::stream_base::decorator ([] (websocket::response_type &res) { res.set (http::field::server, std::string (BOOST_BEAST_VERSION_STRING) + " websocket-server-async"); }));
          co_await connection->async_accept (use_awaitable);
          // Register the user only after the handshake succeeded, so a failed
          // handshake does not leave a stale entry in `users`.
          auto user = users.insert (users.end (), std::make_shared<User> ());
          std::shared_ptr<User> userPtr = *user;
          // Everything is captured by value: the spawned coroutines outlive this
          // loop iteration, so references to its locals would dangle.
          co_spawn (
              executor, [connection, this, user] () mutable { return readFromClient (user, *connection); },
              // When the reader ends, wake the writer so it notices the expired
              // connection and terminates instead of sleeping forever.
              [userPtr] (std::exception_ptr) {
                if (userPtr->timer)
                  {
                    userPtr->timer->cancel ();
                  }
              });
          // The writer shares ownership of the User, so the object it runs on
          // stays alive even after removeUser () erased it from the list.
          co_spawn (
              executor, [connectionWeakPointer = std::weak_ptr<Websocket>{ connection }, userPtr] () mutable { return userPtr->writeToClient (connectionWeakPointer); }, detached);
        }
      catch (std::exception const &e)
        {
          std::cout << "Server::listener () connect Exception : " << e.what () << std::endl;
        }
    }
}
// TCP port the server listens on. Uses the standard fixed-width type instead of
// the non-standard BSD/glibc typedef u_int16_t.
auto const DEFAULT_PORT = std::uint16_t{ 55555 };
int
main ()
{
try
{
using namespace boost::asio;
io_context io_context (1);
signal_set signals (io_context, SIGINT, SIGTERM);
signals.async_wait ([&] (auto, auto) { io_context.stop (); });
auto server = Server{ { ip::tcp::v4 (), DEFAULT_PORT } };
co_spawn (
io_context, [&server] { return server.listener (); }, detached);
io_context.run ();
}
catch (std::exception &e)
{
std::printf ("Exception: %s\n", e.what ());
}
return 0;
}
EDIT: I updated the code based on sehe's idea, which is marked as the accepted answer.