Server alternates between users instead of broadcast

I was working on a messaging system where users dial an IP / Port server and that server takes messages and broadcasts them to all other users on the server. The whole program was based on an echo server that I rewrote from scratch. For each server.accept () socket, it creates two threads, one for receiving messages and one for sending them. The two threads are linked by the DatagramPacket system, so if the server receives a message from one socket, it sends it to all other users because their threads are listening on the same thing, that's where I run into problems; everything works fine except for the fact that the user who receives the message alternates to the login time.

An example of a problem when connecting two clients:

Client # 1 sends 10 messages:

0
1
2
3
4
5
6
7
8
9

      

The server receives all of them.

Client # 1 receives:

1
3
5
7
9

      

Client # 2 receives:

0
2
4
6
8

      

Here is the code for the Client:

import java.io.*;
import java.util.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
import javax.swing.*;

public class MessageClient {
    public static void main(String[] args) {
        System.out.println("Starting Message System...");
        Scanner in = new Scanner(System.in);
        MessageClient mc = new MessageClient();
        String input;
        System.out.println(":System Started, type help for help.");
        System.out.print(":");
        while (true) {
            input = in.nextLine();
            if (input.equalsIgnoreCase("HELP")) {
                mc.printHelp();
                System.out.print(":");
            } else if (input.equalsIgnoreCase("QUIT")) {
                System.exit(0);
            } else if (input.equalsIgnoreCase("CONNECT")) {
                mc.connect(in);
                in.nextLine();
                System.out.print(":");
            } else {
                System.out.print("No command found.\n:");
            }
        }
    }
    public static void printHelp() {
        System.out.println("help\tShow this prompt\nconnect\tStarts a new connection\nquit\tQuit the program\nexit\tExit a connection");
    }
    public void connect(Scanner in) {
        Socket soc = null;
        InetAddress addr = null;
        System.out.print("IP_ADDRESS/HOST:");
        String ip = in.nextLine();
        System.out.print("PORT:");
        int port = in.nextInt();
        try {
            System.out.println("Attempting to connect to HOST:\'" + ip + "\' on PORT:\'" + port + "\'");
            addr = InetAddress.getByName(ip);
            soc = new Socket(addr, port);
        } catch(Exception e) {
            System.out.println("Error connecting to server: " + e.getLocalizedMessage());
            return;
        }
        SwingUtilities.invokeLater(new MessageGUI(ip + ":" + port, soc));
    }
}

class MessageGUI implements Runnable {
    public MessageGUI(String windowName, Socket server) {
        JFrame window = new JFrame(windowName);
        window.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        window.setSize(500, 300);
        window.setLayout(new BorderLayout());
        window.setVisible(true);

        MessageReceive mr = new MessageReceive(server);
        mr.setEditable(false);
        mr.setBackground(new Color(0, 0, 0));
        mr.setForeground(new Color(0, 255, 0));
        mr.setVisible(true);
        new Thread(mr).start();
        window.add(mr, BorderLayout.CENTER);

        DataOutputStream dos = null;
        try {
            dos = new DataOutputStream(server.getOutputStream());
        } catch(Exception e) {
            System.out.println("Error creating output stream to server: " + e.getLocalizedMessage());
        }

        JTextField input = new JTextField();
        input.addActionListener(new MessageSend(server, input, dos));
        input.setBackground(new Color(0, 0, 0));
        input.setForeground(new Color(0, 255, 0));
        window.add(input, BorderLayout.PAGE_END);

        System.out.println("Displaying connection.");
    }
    public void run() {}
}

class MessageReceive extends JTextArea implements Runnable {
    protected Socket server;
    public MessageReceive(Socket server) {
        this.server = server;
    }
    public void run() {
        DataInputStream dis = null;
        int bytes;
        try {
            dis = new DataInputStream(server.getInputStream());
        } catch(Exception e) {
            System.out.println("Error connecting server: " + e.getLocalizedMessage());
        }
        this.append("Connected.\n");
        while (true) {
            try {
                while ((bytes = dis.read()) != -1) this.append(String.valueOf((char) bytes));
            } catch(Exception e) {
                System.out.println("Error reading from server: " + e.getLocalizedMessage());
                return;
            }
        }
    }
}

class MessageSend implements ActionListener {
    protected Socket server;
    protected JTextField input;
    protected DataOutputStream dos = null;
    public MessageSend(Socket server, JTextField input, DataOutputStream dos) {
        this.server = server;
        this.input = input;
        this.dos = dos;
    }
    public void actionPerformed(ActionEvent ae) {
        try {
            dos.writeBytes(input.getText() + "\n");
            input.setText("");
        } catch(Exception e) {
            System.out.println("Error writing to server output stream: " + e.getLocalizedMessage());
        }
    }
}

      

Here is the code for the server:

import java.io.*;
import java.net.*;
import java.util.*;

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer ms = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        DatagramSocket ds = null;
        try {
            ds = new DatagramSocket(4);
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ds).start();
            new MessageConnectionOut(client, ds).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionOut(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ds.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionIn(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ds.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

      

UPDATE:

I tried to replace all DatagramSockets with MulticastSockets and add it to the group when I declared it, MessageServer.main (). The same problem occured.

Multicast code:

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer msgsrv = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        MulticastSocket ms = null;
        try {
            ms = new MulticastSocket(4);
            ms.joinGroup(InetAddress.getByName("225.65.65.65"));
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ms).start();
            new MessageConnectionOut(client, ms).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionOut(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ms.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
                System.out.println("SENT_TO:" + this.getName());
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionIn(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ms.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

      

+3


source to share


1 answer


This sample can help you.

There are 2 threads for the server.

  • One for reading UDP messages. I used 2 different ports as I just want to avoid messages being read by the same process. I don't have two machines to test. Tested on my localhost.
  • Another stream will broadcast the UDP messages received by the read stream.

There is a thread safe list that acts between threads as data synchronization. The received data is added to the list. Broadcast protocol poll the list for data if there is any broadcast, and sleep for 500 microseconds. Themes are created with the help of the artist.

private final static String INET_ADDR = "224.0.0.3";
private final static int PORT1 = 8888;
private final static int PORT2 = 8889;
private static List<String> threadSafeList = null;

public static void main(String[] args) throws UnknownHostException, InterruptedException {
    threadSafeList = new CopyOnWriteArrayList<String>();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(new Sender(InetAddress.getByName(INET_ADDR), PORT1));
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR), PORT2));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            //Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf, buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                threadSafeList.add(msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address " + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        while (true) {
            try (DatagramSocket serverSocket = new DatagramSocket()) {
                for (Iterator<String> it = threadSafeList.iterator(); !threadSafeList.isEmpty() && it.hasNext(); ) {

                    String i = it.next();
                    String msg = "Sent message no " + i;

                    // Create a packet that will contain the data
                    // (in the form of bytes) and send it.
                    DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(), msg.getBytes().length, this.addr, this.port);
                    serverSocket.send(msgPacket);

                    threadSafeList.remove(i);
                    System.out.println("Server sent packet with msg: " + msg);
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            try {
                System.out.println("going for sleep"); 
                Thread.currentThread().sleep(500);
                System.out.println("going for sleeping"); 
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

      

The design can be changed by modifying the sender flow creation. Whenever a receiver thread receives a message, create a sender thread and broadcast and terminate that thread. You can use a reusable thread pool instead of the fixed thread pool used in this example. And you can pass the message as an argument when creating the sender stream (so the list might not be needed at all) and send . I have a code.

    public static void main(String[] args) throws UnknownHostException,
        InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR),
            PORT2, executorService));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

      

And inner classes,

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;
    private ExecutorService executorService;

    public Receiver(InetAddress inetAddress, int port,
            ExecutorService executorService) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.executorService = executorService;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                executorService.submit(new Sender(InetAddress
                        .getByName(INET_ADDR), PORT1, msg));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

      

and



    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;
    private String message;

    public Sender(InetAddress inetAddress, int port, String message)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.message = message;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        try {
            DatagramSocket serverSocket = new DatagramSocket();
            String msg = "Sent message no " + message;

            // Create a packet that will contain the data
            // (in the form of bytes) and send it.
            DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(),
                    msg.getBytes().length, this.addr, this.port);
            serverSocket.send(msgPacket);

            System.out.println("Server sent packet with msg: " + msg);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

      

The client has 2 streams,

  • One for reading the broadcaster's messages.
  • Another to send 5 messages in a loop. Once it completes, the thread will shut down.

There is no communication here, so there is no safe stream list.

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver port " + this.port);
        byte[] buf = new byte[256];

        try (MulticastSocket clientSocket = new MulticastSocket(this.port)) {
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);
            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

      

AND

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        try {
            DatagramSocket serverSocket = new DatagramSocket();

            for (int i = 0; i < 5; i++) {

                System.out.println("inside loop");
                String msg = "Sent message no 2" + i;

                // Create a packet that will contain the data
                // (in the form of bytes) and send it.
                DatagramPacket msgPacket = new DatagramPacket(
                        msg.getBytes(), msg.getBytes().length, this.addr,
                        this.port);
                System.out.println("Before sending to socket");
                serverSocket.send(msgPacket);

                System.out.println("Server sent packet with msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

      

This sample code example is further expanded .

The code should be configured correctly.

0


source







All Articles