C++: Boost.Asio: starting an SSL server session on a new thread
I wrote a pair of server/client programs based on this example for the server, and I am done with all the communication protocols. The server is supposed to receive multiple connections from multiple clients, so I want to separate the sessions from each other, and I hope I can do that with std::thread.
It looks simple, but I don't know how to do it. All the examples I can find online show how to run a function in parallel, but none of them seem to show how to create an object on a new thread.
I've added a few comments to explain my understanding of this session mechanism.
The code I want to use is the following:
class server
{
public:
    server(boost::asio::io_service& io_service, unsigned short port)
        : io_service_(io_service),
          acceptor_(io_service,
                    boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
          context_(io_service, boost::asio::ssl::context::sslv23)
    {
        //some code...
        //notice that the next lines create the session object and then register a callback to receive more connections
        session* new_session = new session(io_service_, context_);
        //this is called to accept a connection when one is available; the callback then calls start() to start the session
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
    }
    void handle_accept(session* new_session, const boost::system::error_code& error)
    {
        if (!error)
        {
            //so the session starts here, and another object is created to wait for the next connection
            new_session->start();
            new_session = new session(io_service_, context_);
            //this function is registered again as the callback, now for new_session, the new object that is waiting for a connection
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&server::handle_accept, this, new_session,
                    boost::asio::placeholders::error));
        }
        else
        {
            delete new_session;
        }
    }
private:
    boost::asio::io_service& io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
    boost::asio::ssl::context context_;
};
How do I create these sessions on a new std::thread?
If you need more information, please ask. Thanks.
I refactored the example from the linked answer and mixed it with your sample code.
It demonstrates the same principle, but runs the io_service on as many threads as your hardware supports (i.e. thread::hardware_concurrency).
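One caveat when sizing the pool this way: `hardware_concurrency` (both the `std::thread` and the `boost::thread` version) may return 0 when the number of hardware threads cannot be determined, and a loop that runs up to that value would then start no threads at all. A minimal guard, sketched with the standard library only; the names `worker_count` and `run_on_pool` are made up for this illustration and are not part of Asio:

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: hardware_concurrency() may report 0 ("unknown"),
// so fall back to a single worker in that case.
unsigned worker_count(unsigned reported = std::thread::hardware_concurrency()) {
    return reported == 0 ? 1u : reported;
}

// Starts worker_count() threads that each run a copy of `work`,
// waits for all of them, and returns how many threads were started.
template <typename F>
unsigned run_on_pool(F work) {
    const unsigned n = worker_count();
    assert(n > 0);
    std::vector<std::thread> pool;
    pool.reserve(n);
    for (unsigned i = 0; i < n; ++i)
        pool.emplace_back(work);
    for (auto& t : pool)
        t.join();
    return n;
}
```

In the demo code this would correspond to looping up to `worker_count()` and passing something like `[&]{ svc.run(); }` as the work.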
The pitfalls here are:
- (shared) lifetime of objects
- thread safety
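The lifetime pitfall can be illustrated without Asio. In the following sketch (standard library only; `Connection`, `make_handler` and `lifetime_demo` are made-up names) a pending handler captures `shared_from_this()`, so the object stays alive for as long as some handler still refers to it. The demo code relies on the same idea through `boost::enable_shared_from_this`:

```cpp
#include <cassert>
#include <functional>
#include <memory>

class Connection : public std::enable_shared_from_this<Connection> {
public:
    // Returns a stand-in for a completion handler that co-owns the connection.
    std::function<void()> make_handler() {
        auto self = shared_from_this();   // requires that *this is owned by a shared_ptr
        return [self] { ++self->handled_; };
    }
    int handled() const { return handled_; }
private:
    int handled_ = 0;
};

// Returns true if the connection outlived its creator's reference while a
// handler was pending, and was destroyed once that handler was dropped.
bool lifetime_demo() {
    std::weak_ptr<Connection> observer;
    std::function<void()> pending;
    {
        auto conn = std::make_shared<Connection>();
        observer = conn;
        pending = conn->make_handler();
    }   // the creator's shared_ptr is gone; only the handler owns the object now
    const bool alive_while_pending = !observer.expired();
    assert(alive_while_pending);
    pending();                                        // "completion" runs safely
    const int handled = observer.lock()->handled();
    pending = nullptr;                                // drop the last owner
    return alive_while_pending && handled == 1 && observer.expired();
}
```

Binding a raw `this` instead would leave the handler with a dangling pointer as soon as the last `shared_ptr` goes away.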
Most Asio objects are not thread-safe, so you need to synchronize access to them. Old-fashioned mutual exclusion (std::mutex etc.) doesn't work well in this scenario, because you really don't want to lock in every completion handler, and you certainly don't want to hold a lock across asynchronous calls ¹.
Boost Asio has the concept of a strand for this situation:
- http://www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/reference/io_service__strand.html
- Why do I need a strand per connection when using boost::asio?
I chose the simplest solution: all operations on a "socket" (the SSL stream, connection, or session, however you refer to it logically) run on a strand.
In addition, all access to acceptor_ is serialized on its own strand.
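To make the guarantee of a strand more concrete, here is a small stand-in written with the standard library only. It is not how Asio implements `io_service::strand`, and `SerialExecutor` and `serialized_count` are made-up names. It only shows the property that matters: handlers posted from many threads never run concurrently, and no lock is held while a handler runs.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class SerialExecutor {
public:
    // Queues a task. If no thread is currently draining the queue, the calling
    // thread drains it; otherwise the task is left for the draining thread.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(task));
            if (draining_) return;
            draining_ = true;
        }
        drain();
    }
private:
    void drain() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (queue_.empty()) { draining_ = false; return; }
                task = std::move(queue_.front());
                queue_.pop_front();
            }
            task();   // runs outside the lock (no exception handling in this sketch)
        }
    }
    std::mutex mutex_;
    std::deque<std::function<void()>> queue_;
    bool draining_ = false;
};

// Several threads post increments of a plain int; serialization through the
// executor is the only thing protecting the counter.
int serialized_count(int threads, int per_thread) {
    assert(threads > 0 && per_thread >= 0);
    SerialExecutor strand_like;
    int counter = 0;   // deliberately neither atomic nor mutex-protected
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&strand_like, &counter, per_thread] {
            for (int i = 0; i < per_thread; ++i)
                strand_like.post([&counter] { ++counter; });
        });
    for (auto& th : pool) th.join();
    return counter;
}
```

One difference worth keeping in mind: in this sketch the posting thread may end up running the handlers itself, whereas with Asio the handlers run on threads that call `io_service::run()`.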
A hybrid solution could move all connections onto an io_service + pool and keep the listener (Server) on a separate io_service, which can then be its own implicit strand.
Note, about the shutdown sequence:
- I made the destruction of Server explicit, so we can stop the acceptor_ on its strand (!!) as required.
- The pool threads will not complete until all connections have been closed. If you want control over this, see the linked answer again (which shows how to keep weak pointers that track the connections). Alternatively, you can give all asynchronous operations in a session timeouts and check the Server for a shutdown signal.
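The weak-pointer tracking mentioned in the note could look roughly like the sketch below. It uses the standard library only, and `TrackedSession` and `SessionRegistry` are made-up names rather than code from the linked answer. The registry does not keep sessions alive; it only lets a shutdown routine reach the ones that still exist.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Stand-in for a session; a real one would close its stream in stop().
struct TrackedSession {
    bool stopped = false;
    void stop() { stopped = true; }
};

class SessionRegistry {
public:
    // Remembers a session without extending its lifetime.
    void track(const std::shared_ptr<TrackedSession>& session) {
        assert(session != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        sessions_.push_back(session);   // stored as weak_ptr
    }

    // Stops every session that is still alive and forgets all entries.
    // Returns the number of sessions that were stopped.
    std::size_t stop_all() {
        std::vector<std::weak_ptr<TrackedSession>> snapshot;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            snapshot.swap(sessions_);
        }
        std::size_t stopped = 0;
        for (auto& weak : snapshot) {
            if (auto session = weak.lock()) {   // skip sessions that are already gone
                session->stop();
                ++stopped;
            }
        }
        return stopped;
    }
private:
    std::mutex mutex_;
    std::vector<std::weak_ptr<TrackedSession>> sessions_;
};
```

In the demo, the equivalent of `stop()` would have to be posted through the session's `strand_` rather than called directly, for the thread-safety reasons given above.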
Demo code
#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace bs = boost::system;
namespace ba = boost::asio;
namespace bas = ba::ssl;
using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef bas::stream<tcp::socket> stream_type;
const short PORT = 26767;
class Session : public boost::enable_shared_from_this<Session>
{
public:
    typedef boost::shared_ptr<Session> Ptr;
    Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }
    virtual ~Session() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
    stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); }
    void start() { AsyncReadString(); }
    void Stop() { stream.shutdown(); }
protected:
    ba::io_service::strand strand_;
    SslContext ctx_;
    stream_type stream;
    ba::streambuf stream_buffer;
    std::string message;
    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        ba::async_read_until(
            stream,
            stream_buffer,
            '\0', // null-char is a delimiter
            strand_.wrap(
                boost::bind(&Session::ReadHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        message = s;
        ba::async_write(
            stream,
            ba::buffer(message.c_str(), message.size()+1),
            strand_.wrap(
                boost::bind(&Session::WriteHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/)
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString(); // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
};
class Server : public boost::enable_shared_from_this<Server>
{
public:
    Server(ba::io_service& io_service, unsigned short port) :
        strand_  (io_service),
        acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
    {
        //
    }
    void start_accept() {
        auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);
        // bind shared_from_this() so the Server cannot be destroyed while an accept is pending
        acceptor_.async_accept(new_session->socket(),
            strand_.wrap(boost::bind(&Server::handle_accept, shared_from_this(), new_session, ba::placeholders::error)));
    }
    void stop_accept() {
        auto keep = shared_from_this();
        strand_.post([keep] { keep->acceptor_.close(); });
    }
    void handle_accept(Session::Ptr new_session, const bs::error_code& error)
    {
        if (!error) {
            new_session->start();
            start_accept(); // uses `acceptor_` safely because of the strand_
        }
        // on error (e.g. the acceptor was closed) the accept chain ends here
    }
    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
private:
    ba::io_service::strand strand_;
    tcp::acceptor acceptor_;
    SslContext context_;
};
int main() {
    ba::io_service svc;
    boost::thread_group pool;
    {
        auto s = boost::make_shared<Server>(svc, PORT);
        s->start_accept();
        for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
            pool.create_thread([&]{svc.run();});
        std::cerr << "Shutdown in 10 seconds...\n";
        boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s
        std::cerr << "Shutdown...\n";
        s->stop_accept(); // close the acceptor on its strand
    } // main's reference is released; the Server is destroyed once its last pending handler is gone
    pool.join_all();
}
Which prints:
$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 26767)& done)
$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
1 Server::~Server()
1 void Session::AsyncReadString()virtual Session::~Session()
1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
3
4523 void Session::AsyncReadString()
4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
4526 virtual Session::~Session()
real 0m10.128s
user 0m0.430s
sys 0m0.262s
¹ The whole point of asynchrony is to avoid blocking on I/O operations, which may take a long time. And the idea behind locking is to never hold locks for long, or they will kill scalability.