1

I'm experimenting with the Beast async WebSocket client and want it to reconnect within the same session instance if it becomes disconnected. Everything works until I attempt the WebSocket handshake a second time following a disconnect: the TCP connection succeeds and the SSL handshake succeeds (more on that in a bit), but the WebSocket handshake fails with 'Error: unexpected message (SSL routines, ssl3_read_bytes)'. I'm not sure what's wrong with re-using the ws_. From this post, it seems there may not be a good answer?

So far I have modified websocket_client_async.cpp as follows:

Instead of the write, then read, then quit sequence used in the example, I'd like the code to simulate pulling messages from a queue and sending them out via the WebSocket connection. The WebSocket connection should be secure, so an ssl_stream is used.

I got around this ws_ re-use issue by destroying the session if it becomes disconnected once a websocket connection has established, and then creating a new instance for the next connection attempt. This approach seems kind of ham-fisted and I'm hoping it's possible to re-connect in the same session.

I've hacked up websocket_client_async.cpp. Apologies, as it's a fair amount of code, but I don't see what could be left out without confusing the issue.


// includes ...

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl;

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

// File-local state shared between the main thread, the queue-filler thread
// and the io_context thread.
namespace {
    // Explicitly initialised rather than relying on static zero-initialisation.
    std::atomic<bool> running_{ false };            // cleared by main to ask the worker threads to wind down
    std::atomic<bool> ws_connect_broken{ false };   // set when an established connection closes; main then rebuilds the session
    std::atomic<bool> fill_q_done{ false };         // set by fill_q() once every fake message has been queued

    // Messages waiting to be written to the websocket.
    std::queue<std::string> outgoingMessages;

    // Taken by fill_q() and session::do_write() around accesses to outgoingMessages.
    std::mutex mtx;
}

namespace detail {
    // Delay, in seconds, before a failed connection attempt is retried.
    constexpr int64_t CONNECT_RETRY_INTERVAL_SEC{ 2 };
}

//------------------------------------------------------------------------------

// Print "<what>: <error message>" to stderr for a failed operation.
void
fail(beast::error_code ec, char const* what)
{
    auto const reason = ec.message();
    std::cerr << what << ": " << reason << "\n";
}

// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
    net::strand< net::io_context::executor_type >& ex_;
    tcp::resolver resolver_;
    websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
    std::string api_key_{ "123456789012345678901234567890123456" };
    beast::flat_buffer buffer_;
    std::string host_;
    std::string text_;
    net::steady_timer& qtmr_;
    net::steady_timer& wtmr_;
    net::steady_timer& ctmr_;
    tcp::resolver::results_type results_;
    bool ws_initialized_ = false;
    bool retry_conn_ = false;
    bool valid_ws_conn_ = false;

public:
    // Resolver and socket require an io_context
    explicit
        session(net::strand< net::io_context::executor_type >& ex, ssl::context& ctx, net::steady_timer& qtmr,
            net::steady_timer& wtmr, net::steady_timer& ctmr)
        : ex_(ex)
        , resolver_(ex_)
        , ws_(ex_, ctx)
        , qtmr_(qtmr)
        , wtmr_(wtmr)
        , ctmr_(ctmr)
    {

        qtmr_.expires_after(std::chrono::seconds(1));
        qtmr_.async_wait(std::bind(&session::on_check_queue, this, std::placeholders::_1));

    }

    // Start the asynchronous operation
    void
        run(
            char const* host,
            char const* port)
    {
        // Save these for later
        host_ = host;

        // Set SNI Hostname (many hosts need this to handshake successfully)
        if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host))
        {
            beast::error_code ec{ static_cast<int>(::ERR_get_error()), net::error::get_ssl_category() };
            std::cerr << ec.message() << "\n";
            return;
        }

        // Look up the domain name
        resolver_.async_resolve(host, port, beast::bind_front_handler(&session::on_resolve, shared_from_this()));
    }

    void
        on_resolve(beast::error_code ec, tcp::resolver::results_type results)
    {
        if (ec) {
            return fail(ec, "resolve");
        }

        results_ = results;

        boost::system::error_code ec2;
        on_try_connect(ec2);
    }

    void
        on_try_connect(const boost::system::error_code& ec)
    {
        if (ec != boost::asio::error::operation_aborted && running_ && !ws_connect_broken) {
            std::cout << "on_try_connect() trying tcp connection to " << host_ << std::endl;    

            // Set the timeout for the operation
            beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(5));

            // Make the connection on the IP address we get from a lookup
            beast::get_lowest_layer(ws_).async_connect(
                results_, beast::bind_front_handler(&session::on_tcp_connect, shared_from_this()));
        }
        else {
            std::cout << "on_try_connect() closing" << std::endl;   
            ws_.async_close(websocket::close_code::normal,
                beast::bind_front_handler(&session::on_close, shared_from_this()));
            return;
        }
    }

    void handle_error_retry(beast::error_code ec, const char* str)
    {
        if (!ec) { return; }

        retry_conn_ = ec != boost::asio::error::operation_aborted && running_;

        std::cout << str << " closing" << std::endl;    
        ws_.async_close(websocket::close_code::normal,
            beast::bind_front_handler(&session::on_close, shared_from_this()));
    }

    void
        on_tcp_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
    {
        if (ec) {
            return handle_error_retry(ec, "on_tcp_connect");
        }

        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        beast::get_lowest_layer(ws_).expires_never();

        if (!ws_initialized_) {
            ws_initialized_ = true;

            // Set suggested timeout settings for the websocket
            ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

            // Set a decorator to change the User-Agent of the handshake
            ws_.set_option(websocket::stream_base::decorator(
                [this](websocket::request_type& req)
                {
                    req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
                    req.set("X-Api-Key", api_key_);
                }));

            ws_.binary(true);

            // Update the host_ string. This will provide the value of the
            // Host HTTP header during the WebSocket handshake.
            // See https://tools.ietf.org/html/rfc7230#section-5.4
            host_ += ':' + std::to_string(ep.port());
        }

        // Perform the ssl handshake
        ws_.next_layer().async_handshake(ssl::stream_base::client, beast::bind_front_handler(&session::on_ssl_handshake, shared_from_this()));
    }

    void
        on_ssl_handshake(beast::error_code ec)
    {
        if (ec) {
            return handle_error_retry(ec, "on_ssl_handshake");
        }

        std::cout << "ssl handshake succeeded" << std::endl;    

        // Perform the websocket handshake
        ws_.async_handshake(host_, "/", beast::bind_front_handler(&session::on_ws_handshake, shared_from_this()));
    }

    void
        on_ws_handshake(beast::error_code ec)
    {
        if (ec) {
            return handle_error_retry(ec, "on_ws_handshake");
        }

        std::cout << "ws handshake succeeded" << std::endl; 
        valid_ws_conn_ = true;

        wtmr_.expires_after(std::chrono::seconds(0));
        wtmr_.async_wait(std::bind(&session::do_write, this, std::placeholders::_1));
    }

    void
        do_write(const boost::system::error_code& ec)
    {
        if (ec) {
            return handle_error_retry(ec, "do_write");
        }

        std::string msg;
        bool has_msg = false;
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (!outgoingMessages.empty()) {
                msg = outgoingMessages.front();
                outgoingMessages.pop();
                has_msg = true;
            }
        }
        if (has_msg) {
            ws_.async_write(net::buffer(msg), boost::beast::bind_front_handler(&session::on_write, shared_from_this()));
            std::cout << "wrote \"" << msg << "\"" << std::endl;    
        }
        else
        {
            // Repeat write
            if (running_)
            {
                //ex_.get_inner_executor().context().poll();

                wtmr_.expires_after(std::chrono::seconds(1));
                wtmr_.async_wait(std::bind(&session::do_write, this, std::placeholders::_1));
            }
            else {
                std::cout << "do_write() closing" << std::endl; 
                ws_.async_close(websocket::close_code::normal,
                    beast::bind_front_handler(&session::on_close, shared_from_this()));
            }
        }
    }

    void
        on_write(beast::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            return handle_error_retry(ec, "on_write");
        }

        // see if anything more in the queue to write
        wtmr_.expires_after(std::chrono::milliseconds(500));
        wtmr_.async_wait(std::bind(&session::do_write, this, std::placeholders::_1));
    }

    void
        on_check_queue(const boost::system::error_code& ec)
    {
        if (ec)
            return fail(ec, "on_check_queue");

        if (running_.load()) {
            std::cout << "on_check_queue" << std::endl; 

            // TODO do some stats, check if queue is backing up, etc

            qtmr_.expires_after(std::chrono::seconds(1));
            qtmr_.async_wait(std::bind(&session::on_check_queue, this, std::placeholders::_1));
        }
    }

    void
        on_close(beast::error_code ec)
    {
        if (ec) {
            std::cout << "on_close: " << ec.message() << std::endl;
        }
        else {
            std::cout << "on_close: no error"  << std::endl;
        }

        if (valid_ws_conn_) {
            ws_connect_broken = true;
=====> was trying async call to try_connect here, but ws handshake fails
//ctmr_.expires_after(std::chrono::seconds(detail::CONNECT_RETRY_INTERVAL_SEC));
//ctmr_.async_wait(std::bind(&session::on_try_connect, this, std::placeholders::_1));
            return;
        }

        if (retry_conn_) {
            ctmr_.expires_after(std::chrono::seconds(detail::CONNECT_RETRY_INTERVAL_SEC));
            ctmr_.async_wait(std::bind(&session::on_try_connect, this, std::placeholders::_1));
        }
    }

};

//------------------------------------------------------------------------------

// Builds the timers, SSL context, strand and session for one connection
// lifetime, starts the session against host:port and runs `ioc` on the
// calling thread until ioc.run() returns.
void start(net::io_context& ioc, const std::string& host, const std::string& port)
{
    // The io_context is required for all I/O

    boost::asio::steady_timer           ctmr_{ ioc };   // connect timer
    boost::asio::steady_timer           wtmr_{ ioc };   // write timer
    boost::asio::steady_timer           qtmr_{ ioc };   // queue check timer

    // The SSL context is required, and holds certificates
    ssl::context ctx{ ssl::context::tlsv12_client };
    // NOTE(review): verify_none means the server certificate is not checked.
    // Acceptable for an experiment; a real client should verify the peer.
    ctx.set_verify_mode(ssl::verify_none);

    // Launch the asynchronous operation
    auto strd = net::make_strand(ioc);
    auto s = std::make_shared<session>(strd, ctx, qtmr_, wtmr_, ctmr_);
    s->run(host.c_str(), port.c_str());

    // Run the I/O service. The call will return when
    // the socket is closed.
    ioc.run();
}

// Producer used to simulate traffic: pushes a fixed list of fake messages
// onto outgoingMessages, one every three seconds. Returns early (without
// setting fill_q_done) as soon as running_ is observed to be false after a
// push; sets fill_q_done once the whole list has been queued.
void fill_q()
{

    // Sleep for `t` seconds, then enqueue `str` under the queue mutex.
    // Only file-scope state is touched, so nothing needs to be captured.
    auto sleep_push = [](int64_t t, std::string&& str) {
        std::this_thread::sleep_for(std::chrono::seconds(t));
        std::lock_guard<std::mutex> lock(mtx);
        // `str` is a named rvalue reference, i.e. an lvalue here: without
        // std::move the string would be copied into the queue.
        outgoingMessages.push(std::move(str));
    };

    // push some fake messages on a queue periodically
    std::vector<std::string> msgs = { "yo", "ho",  "ho",  "and",  "a",  "bottle",  "of",  "rum",
        "navy", "grog", "for", "me"};
    for (auto& msg : msgs) {
        sleep_push(3, std::move(msg));
        if (!running_) { return; }
    }
    fill_q_done = true;

}

int main(int argc, char** argv)
{
    // Check command line arguments.
    if (argc != 3)
    {
        std::cerr <<
            "Usage: websocket-client-async <host> <port>\n" <<
            "Example:\n" << "    websocket-client-async echo.websocket.org 443\n";
        return EXIT_FAILURE;
    }
    auto const host = argv[1];
    auto const port = argv[2];

    net::io_context io_;

    using wgt = net::executor_work_guard<net::io_context::executor_type>;
    std::unique_ptr<wgt> work_guard_;
    std::unique_ptr<std::thread> thrd, thrd_q;
    
    outgoingMessages.push("hello"); // fake queue message

    auto destroy_conn = [&]() {
        // destroy existing connection
        std::cout << "destroy existing connection" << std::endl;    

        work_guard_.reset();
        running_ = false;
        thrd_q->join();
        thrd_q.reset();
        thrd->join();
        thrd.reset();
        io_.stop();
    };

    ws_connect_broken = true;       // true forces creation of initial session
    do {

        if (ws_connect_broken) {
            if (thrd) {
                destroy_conn();
            }

            // create new connection, set initial state and start threads
            std::cout << "create new connection" << std::endl;  

            io_.restart();
            work_guard_ = std::make_unique<wgt>(io_.get_executor());
            running_ = true;
            fill_q_done = false;
            ws_connect_broken = false;
            thrd_q.reset(new std::thread([&]() { fill_q(); }));
            thrd.reset(new std::thread([&]() { start(io_, host, port); }));
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
        if (!running_ || fill_q_done) { break; }

    } while (true);

    if (thrd) {
        destroy_conn();
    }

    std::cout << "done" << std::endl;   

    return EXIT_SUCCESS;
}

2
  • 1
    "was trying async call to try_connect here, but ws handshake fails" You're supposed to show the code you actually use. Commented May 6, 2023 at 18:57
  • What was the actual command and code you ran to get the error you mentioned Commented May 7, 2023 at 22:45

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.