Matching a response to a sent packet by its sequence identifier
I have a third-party server and I am writing a DLL interface for it; my clients use my DLL to communicate with the server.
The protocol uses one long-lived TCP connection, and all traffic goes over it. Several packets may be in flight at the same time, for example send_msg and heart_beat, so I have to use async_write / async_read to avoid blocking. Each packet has its own sequence identifier. For example, if I send a message with sequence id == 123, I then have to wait for the server to respond with a packet whose id == 123.
UPDATE: There is no guarantee that the server responds to packets in order. If two packets are sent in the order A, B, the responses may arrive as response_B, response_A. The sequence ID is the only way to identify a packet.
The packet looks like this:
4 bytes size + 2 bytes CRC check + 4 bytes SEQUENCE ID + ....
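To make the layout concrete, here is a minimal sketch of decoding that 10-byte header. The names PacketHeader, read_be and parse_header are hypothetical, and the byte order is not stated above, so big-endian (network order) is an assumption.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Layout from the question: 4-byte size, 2-byte CRC, 4-byte sequence id.
// Big-endian byte order is an assumption; the question does not state it.
struct PacketHeader
{
    std::uint32_t size;
    std::uint16_t crc;
    std::uint32_t sequence_id;
};

constexpr std::size_t header_length = 10;

// Reads an unsigned big-endian integer of `count` bytes starting at `p`.
inline std::uint32_t read_be(const unsigned char* p, std::size_t count)
{
    std::uint32_t value = 0;
    for (std::size_t i = 0; i < count; ++i)
        value = (value << 8) | p[i];
    return value;
}

// Splits the raw header bytes into the three fields.
inline PacketHeader parse_header(const std::array<unsigned char, header_length>& raw)
{
    PacketHeader h;
    h.size = read_be(raw.data(), 4);
    h.crc = static_cast<std::uint16_t>(read_be(raw.data() + 4, 2));
    h.sequence_id = read_be(raw.data() + 6, 4);
    return h;
}
```

Once the sequence id is extracted this way, it can serve as the lookup key for whichever caller is waiting on that response.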
The problem is that the clients who use my DLL prefer blocking calls; they don't like callbacks. For example, they would like:
bool DLL_EXPORT send_msg(...) {
// send msg via long_connection, the seq_id==123
// recv msg via long_connection, just want the packet with seq_id==123 (How?)
return if_msg_sent_successfully;
}
I am using Boost.Asio. I do not know whether there is a Boost class or design pattern suited to this scenario. Here is the solution I came up with:
// use a global std::map<seq_id, packet_content>
std::map<int, std::string> map_received;
Every time a packet is received, I record its seq_id and packet_body in map_received, and the function send_msg looks like this:
bool DLL_EXPORT send_msg(...) {
// async_send msg via long_connection, the seq_id==123
while(not_time_out) {
if(map_received.find(123) != map_received.end()) {
// get the packet and erase the 123 pair
}
Sleep(10); // prevent cpu cost
}
return if_msg_sent_successfully;
}
This solution is ugly; there must be a better design. Any ideas?
You can use std::promise and std::future (or their Boost equivalents, boost::promise and boost::future, if you are not yet on C++11).
The idea is to store a std::shared_ptr<std::promise<bool>> in a map, keyed by the current sequence id, whenever a request is sent. In the blocking send function, you wait for the corresponding std::future<bool> to be set. When a response packet is received, the matching std::promise<bool> is looked up in the map, given a value, and removed from the map, which unblocks the send function.
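Stripped of all networking, the bookkeeping part of that idea can be sketched as follows. PendingRequests, add and fulfill are hypothetical names, and the mutex is there on the assumption that senders and the reader run on different threads.

```cpp
#include <future>
#include <map>
#include <mutex>

// Hypothetical helper: maps a sequence id to the promise a blocked sender waits on.
class PendingRequests
{
public:
    // Called by the sender before the packet is written.
    // Each sequence id must be registered only once.
    std::future<bool> add(int sequence_id)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        std::promise<bool>& p = pending_[sequence_id];
        return p.get_future();
    }

    // Called by the reader when a response arrives.
    // Returns false if nobody is waiting for this id.
    bool fulfill(int sequence_id, bool ok)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = pending_.find(sequence_id);
        if (it == pending_.end())
            return false;
        it->second.set_value(ok); // wakes up the sender blocked in get()
        pending_.erase(it);
        return true;
    }

private:
    std::mutex mutex_;
    std::map<int, std::promise<bool>> pending_;
};
```

Because every waiter has its own future, responses can be fulfilled in any order: fulfilling id 2 before id 1 wakes only the sender waiting on id 2.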
The following example is loosely based on the chat client example from the Boost.Asio documentation and is not complete (for example, the connection part is missing, header and body reads are not split, etc.). Since it is incomplete, I have not run it, but it should illustrate the idea.
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <boost/asio.hpp>
class Message
{
public:
enum { header_length = 10 };
enum { max_body_length = 512 };
Message()
: body_length_(0)
{
}
const char* data() const
{
return data_;
}
char* data()
{
return data_;
}
std::size_t length() const
{
return header_length + body_length_;
}
const char* body() const
{
return data_ + header_length;
}
char* body()
{
return data_ + header_length;
}
private:
char data_[header_length + max_body_length];
std::size_t body_length_;
};
class Client
{
public:
Client(boost::asio::io_service& io_service)
: io_service(io_service),
socket(io_service),
current_sequence_id(0)
{}
bool blocking_send(const std::string& msg)
{
auto future = async_send(msg);
// blocking wait
return future.get();
}
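void start_reading()
{
auto handler = [this](boost::system::error_code ec, std::size_t /*length*/)
{
if(ec)
return; // stop reading on error; pending senders are not notified here
// parse response ...
int response_id = 0;
// use find() so that an unknown id does not insert a null entry and crash
auto it = map_received.find(response_id);
if(it != map_received.end())
{
it->second->set_value(true);
map_received.erase(it);
}
// wait for the next packet
start_reading();
};
boost::asio::async_read(socket,
boost::asio::buffer(read_msg_.data(), Message::header_length),
handler);
}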
void connect()
{
// ...
start_reading();
}
private:
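std::future<bool> async_send(const std::string& msg)
{
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
// copy the payload so the buffer outlives the asynchronous write
auto payload = std::make_shared<std::string>(msg);
// run on the io_service thread so that map_received is only touched there
io_service.post([this, promise, payload]()
{
// register the promise before writing, so the response cannot arrive first
const int id = current_sequence_id++;
map_received[id] = promise;
// a real implementation would also encode id into the packet header
boost::asio::async_write(socket,
boost::asio::buffer(*payload),
[this, id, promise, payload](boost::system::error_code ec, std::size_t /*length*/)
{
// on a write error no response will arrive, so unblock the caller
if(ec && map_received.erase(id) > 0)
promise->set_value(false);
});
});
return future;
}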
boost::asio::io_service& io_service;
boost::asio::ip::tcp::socket socket;
std::map<int, std::shared_ptr<std::promise<bool>>> map_received;
int current_sequence_id;
Message read_msg_;
};
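int main()
{
boost::asio::io_service io_service;
// keep run() from returning while no handlers are queued yet
boost::asio::io_service::work work(io_service);
Client client(io_service);
std::thread t([&io_service](){ io_service.run(); });
client.connect();
client.blocking_send("dummy1");
client.blocking_send("dummy2");
// stop the event loop and join; destroying a joinable std::thread calls std::terminate
io_service.stop();
t.join();
return 0;
}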
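The polling loop in the question had a timeout, whereas future.get() waits indefinitely if the server never answers. A minimal sketch of a bounded wait, assuming a hypothetical helper named wait_for_response that blocking_send could call instead of get():

```cpp
#include <chrono>
#include <future>

// Hypothetical helper: waits up to `timeout` for the response flag.
// Returns false if the request failed or no response arrived in time.
inline bool wait_for_response(std::future<bool>& response,
                              std::chrono::milliseconds timeout)
{
    if (response.wait_for(timeout) != std::future_status::ready)
        return false; // timed out; the caller should also drop the map entry
    return response.get();
}
```

After a timeout the promise is still stored in the map, so the entry for that sequence id should be erased as well; otherwise it stays there until a late response arrives, if one ever does.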