C ++: Boost.Asio: starting SSL Server session on new thread

I wrote a couple of server / client programs based on this example for the server and I am made of all communication protocols. The server needs to receive multiple connections from multiple connections from multiple clients, so I want to decouple sessions from each other and I hope I can do that with std::thread

.

It looks simple, but I don't know how to do it. All examples on the internet seem to show how to run the function in parallel, but it doesn't seem to show how to create an object on a new thread.

I've added a few comments to explain my understanding of this session mechanism.

The code I want to use is the following:

class server
{
public:
  server(boost::asio::io_service& io_service, unsigned short port)
    : io_service_(io_service),
      acceptor_(io_service,
          boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
      context_(io_service, boost::asio::ssl::context::sslv23)
  {
    //some code...


    //notice the next lines here create the session object, and then recurs that to receive more connections
    session* new_session = new session(io_service_, context_);

    //this is called to accept more connections if available, the callback function is called with start() to start the session
    acceptor_.async_accept(new_session->socket(),
        boost::bind(&server::handle_accept, this, new_session,
          boost::asio::placeholders::error));
  }

  void handle_accept(session* new_session, const boost::system::error_code& error)
  {
    if (!error)
    {
      //so the session starts here, and another object is created waiting for another session
      new_session->start();
      new_session = new session(io_service_, context_);
      //now this function is, again, a call back function to make use of new_session, the new object that waiting for a connection
      acceptor_.async_accept(new_session->socket(),
          boost::bind(&server::handle_accept, this, new_session,
            boost::asio::placeholders::error));
    }
    else
    {
      delete new_session;
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::ip::tcp::acceptor acceptor_;
  boost::asio::ssl::context context_;
};

      

How do I create these sessions in a new one std::thread

?

If you need more information, please ask. Thank.

+3


source to share


1 answer


I refactored the example for the linked answer mixed in with your example code.

It demonstrates the same principle, but io_service works for as many threads as your hardware supports (i.e. thread::hardware_concurrency

).

Trapped here

  • (total) lifetime of objects
  • thread safety

Most Asio objects are not thread safe. Therefore, you need to sync access to them. Old-fashioned mutual exclusion ( std::mutex

etc.) doesn't work very well in this scenario (because you really don't want to block every completion handler, and you don't want to re-block asynchronous calls ยน.

Boost Asio has a concept strand

for this situation:

I have chosen the simplest solution to perform all operations on the "socket" (ssl stream / connection / session or, as you would logically refer to it) on the chain.

And on top of that, I did all the acceptor_

serialized access on my own chain.

A hybrid solution could move all connections to the io_service

+ pool and keep the listener ( Server

) on a separate one io_service

, which can then be its own implicit chain



Note . About the shutdown sequence:

  • I made the destruction Server

    explicit, so we can stop acceptor_

    on strand

    (!!) as needed.
  • Threads pool

    will not terminate until all connections are closed. If you want to control this, see the linked answer again (which shows how to preserve weak pointers tracking connections). Alternatively, you can follow all asynchronous operations in a session with timeouts and check Server

    for a shutdown signal.

Demo code

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace bs  = boost::system;
namespace ba  = boost::asio;
namespace bas = ba::ssl;

using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;

typedef ba::ip::tcp::acceptor    acceptor_type;
typedef bas::stream<tcp::socket> stream_type;

const short PORT = 26767;

class Session : public boost::enable_shared_from_this<Session>
{
public:
    typedef boost::shared_ptr<Session> Ptr;

    Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }

    virtual ~Session() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); } 
    void start()                             { AsyncReadString();          } 
    void Stop()                              { stream.shutdown();            } 

protected:
    ba::io_service::strand strand_;
    SslContext             ctx_;
    stream_type            stream;
    ba::streambuf          stream_buffer;
    std::string            message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            stream,
            stream_buffer,
            '\0', // null-char is a delimiter
            strand_.wrap(
                boost::bind(&Session::ReadHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            stream,
            ba::buffer(message.c_str(), message.size()+1),
            strand_.wrap(
                boost::bind(&Session::WriteHandler, shared_from_this(),
                         ba::placeholders::error,
                         ba::placeholders::bytes_transferred)));
    }

    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }

    void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }

    void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
};

class Server : public boost::enable_shared_from_this<Server>
{
  public:
    Server(ba::io_service& io_service, unsigned short port) :
        strand_  (io_service),
        acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
    {
        //
    }

    void start_accept() {
        auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);

        acceptor_.async_accept(new_session->socket(),
                strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
    }

    void stop_accept() {
        auto keep = shared_from_this();
        strand_.post([keep] { keep->acceptor_.close(); });
    }

    void handle_accept(Session::Ptr new_session, const bs::error_code& error)
    {
        if (!error) {
            new_session->start();
            start_accept(); // uses `acceptor_` safely because of the strand_
        }
    }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

  private:
    ba::io_service::strand strand_;
    tcp::acceptor          acceptor_;
    SslContext             context_;
};

int main() {
    ba::io_service svc;
    boost::thread_group pool;

    {
        auto s = boost::make_shared<Server>(svc, PORT);
        s->start_accept();

        for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
            pool.create_thread([&]{svc.run();});

        std::cerr << "Shutdown in 10 seconds...\n";
        boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s

        std::cerr << "Shutdown...\n";
    } // destructor of Server // TODO thread-safe

    pool.join_all();
}

      

What a seal

$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 6767)& done)

$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
      1 Server::~Server()
      1 void Session::AsyncReadString()virtual Session::~Session()
      1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
      1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
      3 
   4523 void Session::AsyncReadString()
   4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
   4526 virtual Session::~Session()

real    0m10.128s
user    0m0.430s
sys 0m0.262s

      


ยน The whole point of asynchrony is to avoid blocking I / O operations, which can take longer. And the idea behind locking is to never hold locks for a longer time or they will kill scalability

+1


source







All Articles