TCP server with bi-directional communication

I am trying to build a TCP service that does what I need, but I am stuck on the last obstacle.

Scenario: one server instance serves 10 connected clients. A client sends a command and receives a response. This all works fine. However, the last scenario doesn't work.

When the client issues the UPDATE command, the server then has to send a message to all connected clients that they need to do something.


Examples of messages:

1
Client A          GetTime ----->          Server
Client A          <----- Time is...       Server 

2
Client A          UPDATE ------>          Server
Client A          <------- Ack            Server
Client A          <------- DoUpdate       Server
Client B          <------- DoUpdate       Server
Client C          <------- DoUpdate       Server

      

Comms 1 above works mainly because it is one call to send followed by one call to receive, but for comms 2 I can't work out how to achieve this, at least not without opening a second port for communication, which is not ideal.


Current attempt based on Microsoft article

Server

class Program
{
    public static int Main(String[] args)
    {
        AsynchronousSocketListener.StartListening();
        return 0;
    }
}
public class StateObject
{
    // Client  socket.
    public Socket WorkSocket = null;
    // Size of receive buffer.
    public const int BufferSize = 1024;
    // Receive buffer.
    public byte[] Buffer = new byte[BufferSize];
    // Received data string.
    public StringBuilder Sb = new StringBuilder();
}

public class AsynchronousSocketListener
{
    // Thread signal.
    public static ManualResetEvent AllDone = new ManualResetEvent(false);

    public static void StartListening()
    {
        // Data buffer for incoming data.
        //var bytes = new Byte[1024];

        // Establish the local endpoint for the socket.
        // The DNS name of the computer
        // running the listener is "host.contoso.com".
    //??IPHostEntry ipHostInfo = Dns.Resolve(Dns.GetHostName());
    //??IPAddress ipAddress = ipHostInfo.AddressList[0];
    //??IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 3030);

    // Create a TCP/IP socket.
    var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    // Bind the socket to the local endpoint and listen for incoming connections.
    try
    {
        listener.Bind(new IPEndPoint(IPAddress.Any, 3030));
        //listener.Bind(localEndPoint);
        listener.Listen(100);

        while (true)
        {
            // Set the event to nonsignaled state.
            AllDone.Reset();

            // Start an asynchronous socket to listen for connections.
            Console.WriteLine("Waiting for a connection...");
            listener.BeginAccept((AcceptCallback), listener);

            // Wait until a connection is made before continuing.
            AllDone.WaitOne();
        }

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }

    Console.WriteLine("\nPress ENTER to continue...");
    Console.Read();

}

public static void AcceptCallback(IAsyncResult ar)
{
    // Signal the main thread to continue.
    AllDone.Set();

    // Get the socket that handles the client request.
    var listener = (Socket)ar.AsyncState;
    Socket handler = listener.EndAccept(ar);

    // Create the state object.
    var state = new StateObject {WorkSocket = handler};
    handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
}

public static void ReadCallback(IAsyncResult ar)
{
    // Retrieve the state object and the handler socket
    // from the asynchronous state object.
    var state = (StateObject)ar.AsyncState;
    Socket handler = state.WorkSocket;

    // Read data from the client socket. 
    int bytesRead = handler.EndReceive(ar);

    if (bytesRead > 0)
    {
        // There  might be more data, so store the data received so far.
        state.Sb.Append(Encoding.ASCII.GetString(
            state.Buffer, 0, bytesRead));

        // Check for end-of-file tag. If it is not there, read 
        // more data.
        var content = state.Sb.ToString();
        if (content.IndexOf("<EOF>", StringComparison.Ordinal) > -1)
        {
            // All the data has been read from the 
            // client. Display it on the console.
            Console.WriteLine("Read {0} bytes from socket. \n Data : {1}",
                content.Length, content);
            // Echo the data back to the client.
            Send(handler, content);
        }
        else
        {
            // Not all data received. Get more.
            handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
        }
    }
}

private static void Send(Socket handler, String data)
{
    // Convert the string data to byte data using ASCII encoding.
    var byteData = Encoding.ASCII.GetBytes(data);

    // Begin sending the data to the remote device.
    handler.BeginSend(byteData, 0, byteData.Length, 0, SendCallback, handler);
}

private static void SendCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        var handler = (Socket)ar.AsyncState;

        // Complete sending the data to the remote device.
        int bytesSent = handler.EndSend(ar);
        Console.WriteLine("Sent {0} bytes to client.", bytesSent);

        handler.Shutdown(SocketShutdown.Both);
        handler.Close();

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

/*
public static int Main(String[] args)
{
    StartListening();
    return 0;
}
 * */
}

      

Client code

class Program
{
    public static int Main(String[] args)
    //static void Main(string[] args)
    {
        Console.Title = "Client ";
        AsynchronousClient.StartClient();
        Console.ReadLine();
        return 0;
    }
}

public class StateObject
{
// Client socket.
public Socket WorkSocket = null;
// Size of receive buffer.
public const int BufferSize = 256;
// Receive buffer.
public byte[] Buffer = new byte[BufferSize];
// Received data string.
public StringBuilder Sb = new StringBuilder();
}

public class AsynchronousClient
{
// The port number for the remote device.
private const int Port = 3030;

// ManualResetEvent instances signal completion.
private static readonly ManualResetEvent ConnectDone =
    new ManualResetEvent(false);
private static readonly ManualResetEvent SendDone =
    new ManualResetEvent(false);
private static readonly ManualResetEvent ReceiveDone =
    new ManualResetEvent(false);

// The response from the remote device.
private static String _response = String.Empty;

public static void StartClient()
{
    // Connect to a remote device.
    try
    {
        // Establish the remote endpoint for the socket.
        // The name of the 
        // remote device is "host.contoso.com".
        //IPHostEntry ipHostInfo = Dns.Resolve("host.contoso.com");
        //??IPHostEntry ipHostInfo = Dns.Resolve("localhost");
        //??IPAddress ipAddress = ipHostInfo.AddressList[0];
        //??IPEndPoint remoteEP = new IPEndPoint(ipAddress, port);

        // Create a TCP/IP socket.
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // Connect to the remote endpoint.

        //client.BeginConnect(remoteEP,
            //new AsyncCallback(ConnectCallback), client);
        var remoteEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), Port);
        client.BeginConnect(remoteEP, ConnectCallback, client);
        ConnectDone.WaitOne();

        // set receive to another thread so we can constantly receive, doesn't work as intended
        //var thread = new Thread(() => ReadThread(client));
        //thread.Start();

        // Send test data to the remote device.
        Send(client, "This is a test<EOF>");
        SendDone.WaitOne();

        //test remove
        // Receive the response from the remote device.
        Receive(client);
        ReceiveDone.WaitOne();

        // Write the response to the console.
        Console.WriteLine("Response received : {0}", _response);

        // Release the socket.
        //client.Shutdown(SocketShutdown.Both);
        //client.Close();

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

// doesn't work as expected
private static void ReadThread(object ar)
{
    var client = (Socket)ar;
    while (true)
    {
        Receive(client);
        ReceiveDone.WaitOne();

        // Write the response to the console.
        Console.WriteLine("Response received : {0}", _response);
    }
}

private static void ConnectCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        var client = (Socket)ar.AsyncState;

        // Complete the connection.
        client.EndConnect(ar);

        Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint);

        // Signal that the connection has been made.
        ConnectDone.Set();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private static void Receive(Socket client)
{
    try
    {
        // Create the state object.
        var state = new StateObject {WorkSocket = client};

        // Begin receiving the data from the remote device.
        client.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReceiveCallback, state);
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private static void ReceiveCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the state object and the client socket 
        // from the asynchronous state object.
        var state = (StateObject)ar.AsyncState;
        Socket client = state.WorkSocket;

        // Read data from the remote device.
        int bytesRead = client.EndReceive(ar);

        if (bytesRead > 0)
        {
            // There might be more data, so store the data received so far.
            state.Sb.Append(Encoding.ASCII.GetString(state.Buffer, 0, bytesRead));

            // Get the rest of the data.
            client.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReceiveCallback, state);
        }
        else
        {
            // All the data has arrived; put it in response.
            if (state.Sb.Length > 1)
            {
                _response = state.Sb.ToString();
            }
            // Signal that all bytes have been received.
            ReceiveDone.Set();
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private static void Send(Socket client, String data)
{
    // Convert the string data to byte data using ASCII encoding.
    var byteData = Encoding.ASCII.GetBytes(data);

    // Begin sending the data to the remote device.
    client.BeginSend(byteData, 0, byteData.Length, 0, SendCallback, client);
}

private static void SendCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        var client = (Socket)ar.AsyncState;

        // Complete sending the data to the remote device.
        int bytesSent = client.EndSend(ar);
        Console.WriteLine("Sent {0} bytes to server.", bytesSent);

        // Signal that all bytes have been sent.
        SendDone.Set();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

/*
public static int Main(String[] args)
{
    StartClient();
    return 0;
}
*/
}

      


Previous working system

Server:

class Program
{

    private static byte[] buffer = new byte[1024];
    public static Socket _serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    public static List<Socket> clientSockets = new List<Socket>();

    static void Main(string[] args)
    {
        Console.Title = "Server, " + clientSockets.Count + " clients are connected";
        SetupServer();
        Console.ReadLine();
    }
    public static void SetupServer()
    {
        Console.WriteLine("Setting up server...");
        _serverSocket.Bind(new IPEndPoint(IPAddress.Any, 3030));
        _serverSocket.Listen(10);
        _serverSocket.BeginAccept(AcceptCallback, null);
        Console.ReadLine();// stops cmd from closing
    }

    public static void AcceptCallback(IAsyncResult AR)
    {
        Socket socket = _serverSocket.EndAccept(AR);
        if (!clientSockets.Contains(socket))
            clientSockets.Add(socket);
        IPEndPoint remoteIPEndPoint = socket.RemoteEndPoint as IPEndPoint;

        Console.WriteLine(remoteIPEndPoint.Address);

        Console.WriteLine("Client Connected");
        Console.Title = "Server, " + clientSockets.Count + " clients are connected";
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, socket);
        _serverSocket.BeginAccept(AcceptCallback, null);
    }

    private static void RecieveCallBack(IAsyncResult AR)
    {
        var socket = (Socket)AR.AsyncState;
        int received = socket.EndReceive(AR);
        var databuff = new byte[received];
        Array.Copy(buffer, databuff, received);

        string s = Encoding.ASCII.GetString(databuff);
        Console.WriteLine("Text Received: " + s);
        string response = string.Empty;

        switch (s.ToLower())
        {
            case "get time":
                response = DateTime.Now.ToLongTimeString();
                break;
            case "hello":
                response = "olleh";
                break;
            case "update clients":
                response = "";
                SendData("Ack", socket);
                doUpdateClients();
                break;
            default:
                response = "Invavlid Request";
                break;
        }

        SendData(response, socket);
    }

    private static void SendData(string Data, Socket socket)
    {
        byte[] data = Encoding.ASCII.GetBytes(Data);
        socket.BeginSend(data, 0, data.Length, SocketFlags.None, sendCallback, socket);
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, socket);
    }

    private static void doUpdateClients()
    {
        // need to send an update message to all the clients
        var upd = new Thread((UpdateClients));
        upd.Start();
    }

    private static void UpdateClients()
    {
        Thread.Sleep(5000);
        foreach (var sock in clientSockets)
        {
            SendData("UpdateClients", sock);
        }
    }
    private static void sendCallback(IAsyncResult AR)
    {
        var socket = (Socket)AR.AsyncState;
        socket.EndSend(AR);
    }
    //
}

}

      

Client:

class Program
{

    private static byte[] buffer = new byte[1024];
    public static Socket _clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    static void Main(string[] args)
    {
        Console.Title = "Client ";
        LoopConnect();
        //ReceiveLoopStart();
        //_clientSocket.Listen(10);
        SendLoop();
        Console.ReadLine();
    }

    private static void LoopConnect()
    {
        while (!_clientSocket.Connected)
        {
            try
            {
                _clientSocket.Connect(IPAddress.Parse("127.0.0.1"), 3030);
            }
            catch (SocketException se)
            {

            }
        }
        Console.WriteLine("Connected");
    }
    private static void ReceiveLoopStart()
    {
        //_clientSocket.Bind(new IPEndPoint(IPAddress.Any, 3030));
        //_clientSocket.Listen(10);
        _clientSocket.BeginAccept(AcceptCallback, null);

        Thread receiveThread = new Thread(ReceiveLoop);
        receiveThread.Start();
    }

    private static void ReceiveLoop()
    {
        _clientSocket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, _clientSocket);
        _clientSocket.BeginAccept(AcceptCallback, null);
    }

    private static void RecieveCallBack(IAsyncResult AR)
    {
        int received = _clientSocket.EndReceive(AR);
        var databuff = new byte[received];
        Array.Copy(buffer, databuff, received);

        string s = Encoding.ASCII.GetString(databuff);
        Console.WriteLine("Text Received: " + s);
        string response = string.Empty;

        switch (s.ToLower())
        {
            case "get time":
                response = DateTime.Now.ToLongTimeString();
                break;
            case "hello":
                response = "olleh";
                break;
            default:
                response = "Invavlid Request";
                break;
        }

    }
    public static void AcceptCallback(IAsyncResult AR)
    {
        Socket socket = _clientSocket.EndAccept(AR);

        Console.WriteLine("Client Connected");
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, socket);
        _clientSocket.BeginAccept(AcceptCallback, null);
    }



    private static void SendLoop()
    {
        while (true)
        {
            Console.Write("Enter Request: ");
            string req = Console.ReadLine();

            var buffer = Encoding.ASCII.GetBytes(req);
            _clientSocket.Send(buffer);

            var tempBuff = new byte[1024];
            int rec = _clientSocket.Receive(tempBuff);

            var data = new byte[rec];
            Array.Copy(tempBuff, data, rec);

            Console.WriteLine("Received: " + Encoding.ASCII.GetString(data));
        }
    }


}
}

      



2 answers


I don't have a C# example up my sleeve, but you need to learn how to use the select API.
You only need one thread for this. That same thread handles all the sockets that are in use at any given time.
If no one is connected, you only have the listening socket, so you call select on just that socket to see what is happening on it. Select is a blocking call when no timeout is specified. If the listening socket becomes readable, it means you can call accept, and the result of accept is a new socket. Now you select on 2 sockets, and select blocks until one of them has data. It may be the listening socket again, in which case you get another socket from accept and select on 3 sockets. Now suppose data is available on one of the accepted sockets: the result of select tells you which one. You can then use any of these sockets to send something, except of course the listening socket, which is not meant for sending.

More information can be found here: http://www.codeproject.com/Articles/20066/A-scalable-client-server-using-select-socket-funct



It uses the approach I described and explains it in more detail.
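A minimal single-threaded sketch of the select loop described in this answer, using .NET's `Socket.Select`. The class name `SelectServerSketch`, the helpers `IsUpdate` and `TrySend`, the newline-terminated replies and the port passed to `Run` are illustrative assumptions, not part of the question's code. It also assumes each `Receive` delivers exactly one command, which TCP does not guarantee, and it answers every non-UPDATE command with the time.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class SelectServerSketch
{
    // Hypothetical helper: true when the received text is the UPDATE command.
    public static bool IsUpdate(string command)
    {
        return string.Equals(command.Trim(), "UPDATE", StringComparison.OrdinalIgnoreCase);
    }

    // Sends text to one client; returns false if the socket is no longer usable.
    private static bool TrySend(Socket socket, string text)
    {
        try { socket.Send(Encoding.ASCII.GetBytes(text)); return true; }
        catch (SocketException) { return false; }
        catch (ObjectDisposedException) { return false; }
    }

    // Runs the server loop on one thread; never returns.
    public static void Run(int port)
    {
        var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Any, port));
        listener.Listen(10);

        var clients = new List<Socket>();
        var buffer = new byte[1024];

        while (true)
        {
            // Select removes the sockets that are not readable from the list,
            // so a fresh list is built on every pass.
            var readable = new List<Socket>(clients) { listener };
            Socket.Select(readable, null, null, -1); // -1 blocks until a socket is ready

            foreach (Socket s in readable)
            {
                if (s == listener)
                {
                    // A readable listening socket means a connection is waiting.
                    clients.Add(listener.Accept());
                    continue;
                }

                int received;
                try { received = s.Receive(buffer); }
                catch (SocketException) { received = 0; }

                if (received == 0)
                {
                    // Zero bytes (or an error) means the client has gone away.
                    clients.Remove(s);
                    s.Close();
                    continue;
                }

                string command = Encoding.ASCII.GetString(buffer, 0, received);
                if (IsUpdate(command))
                {
                    TrySend(s, "Ack\n");
                    // Same port, same connections: push DoUpdate to everyone.
                    foreach (Socket c in clients) TrySend(c, "DoUpdate\n");
                }
                else
                {
                    TrySend(s, "Time is " + DateTime.Now.ToLongTimeString() + "\n");
                }
            }
        }
    }
}
```

Calling `SelectServerSketch.Run(3030)` would serve the port used in the question. Because every socket is handled on one thread, the client list needs no locking, at the cost of one slow `Send` delaying all other clients.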



I don't fully follow your design, but it looks like you don't keep any reference to the other clients. Why don't you keep a collection of sockets like this:

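 // Every connected client, so that each one can be notified later.
 private static readonly List<Socket> _handlers = new List<Socket>();
 public static void AcceptCallback(IAsyncResult ar)
 {
     // Let the accept loop continue, as in the question's code.
     AllDone.Set();
     var listener = (Socket)ar.AsyncState;
     Socket handler = listener.EndAccept(ar);
     var state = new StateObject {WorkSocket = handler};
     // Callbacks run on thread-pool threads, so guard the shared list.
     lock (_handlers) { _handlers.Add(handler); }
     handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
 }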

      



Then, in the receive callback, depending on the type of message received, notify the clients that need to know (all of them, not just the one currently passed to ReadCallback):

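 public static void ReadCallback(IAsyncResult ar)
 {
      var state = (StateObject)ar.AsyncState;
      int bytesRead = state.WorkSocket.EndReceive(ar);
      if (bytesRead <= 0) return; // client disconnected (removal from _handlers omitted)
      state.Sb.Append(Encoding.ASCII.GetString(state.Buffer, 0, bytesRead));
      string data = state.Sb.ToString();
      if (data.IndexOf("<EOF>", StringComparison.Ordinal) > -1)
      {
         // Complete message: forward it to every connected client.
         // Note: the question's SendCallback closes the socket after sending;
         // that Shutdown/Close must be removed for the connections to stay open.
         lock (_handlers)
         {
             foreach (var h in _handlers) Send(h, data);
         }
         state.Sb.Clear();
      }
      // Keep listening on this socket for the next message.
      state.WorkSocket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
 }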
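Once a connection stays open, several messages (for example Ack followed by DoUpdate) can arrive in a single read, or one message can be split across reads, so checking only whether "<EOF>" occurs somewhere in the buffer is not quite enough. Below is a small sketch of splitting the accumulated text on the "<EOF>" marker used in the question. The class name `EofFraming` and the method `ExtractMessages` are hypothetical, and it assumes the marker never appears inside a message body.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class EofFraming
{
    public const string Terminator = "<EOF>";

    // Removes every complete message from 'pending' and returns them
    // without the terminator; an unfinished tail stays in 'pending'.
    public static List<string> ExtractMessages(StringBuilder pending)
    {
        var messages = new List<string>();
        string text = pending.ToString();
        int start = 0;
        int end;
        while ((end = text.IndexOf(Terminator, start, StringComparison.Ordinal)) >= 0)
        {
            messages.Add(text.Substring(start, end - start));
            start = end + Terminator.Length;
        }

        // Keep whatever follows the last terminator for the next read.
        pending.Clear();
        pending.Append(text.Substring(start));
        return messages;
    }
}
```

A receive callback on either side could call `EofFraming.ExtractMessages(state.Sb)` after each append and handle each returned message in turn, which also lets the client keep one receive pending at all times instead of waiting for the connection to close.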

      
