Boost.Asio: Is it good to use "io_service" for every connection / socket?

I want to create an application that implements a model with one thread per connection. But every connection must be stopped. I tried this boost.asio example which implements the blocking version of what I want. But after a little polling, I found out that there is no reliable way to stop the session of this example. So I tried to implement my own. I had to use async functions. Since I only want a thread to manage one connection and there is no way to control which asynchronous job is used for that thread, I decided to use io_service

for each connection / socket / thread.

So this is a good approach, do you know the best approach?

My code is here, so you can explore and review it:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '\0';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

      

+3


source to share


1 answer


Not. Using an object io_service

for each compound is definitely a scent. Moreover, you also start each connection on a dedicated thread.

At this point, you have to ask yourself, what bought you asynchronously? You can have all your code synchronously and have exactly the same number of threads, etc.

Obviously, you want to multiplex your connections to a much smaller number of services. In practice, there are several reasonable models such as

  • one io_service

    with one service thread (usually good). No tasks queuing up for a service can ever get blocked for a significant amount of time or wait times

  • one io_service

    with multiple threads executing handlers. The number of threads in the pool must be sufficient to service the max. the number of concurrently supported CPU intensive tasks (or, again, latency will start to rise)

  • The io_service for each thread, typically one thread for each logical core, and the affinity of the thread to "bind" to that core. This can be ideal for cache localization.

UPDATE: Demo

Here's a demo showing idiomatic styling using option 1.from above:

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

      



I changed main()

to 2 seconds without user intervention. This is so that I can demonstrate it Live On Coliru (of course this limited the number of client processes).

If you run it with a lot ( lot ) of clients using for example

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)

      

You will find that the two second windows handle them all:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

      

If you really go crazy and lift 1000

, for example. 100000

, you get similar things:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

      

With repeated two-second server runs.

+3


source







All Articles