Application Layer Load Balancer for Process Pool

We have a legacy monolithic C++ application that acts as a TCP request-response server. It is single-threaded and can only handle one request at a time, so we currently run a fixed pool of such processes to serve multiple clients at the same time.

At peak times, clients experience significant delays because of the volume of requests. Our idea is to solve this by introducing a kind of proxy between the clients and the workers:

[Diagram: clients → proxy → pool of worker processes]

We want the proxy to provide the following features:

  • Application-layer load balancing: route requests to workers based on the request context and the client ID
  • Control and monitoring of the worker processes' life cycle
  • Spawning new worker processes (possibly on other machines) to handle load peaks

We actually want it to behave like an ExecutorService in Java, but with worker processes instead of threads. The current idea is to implement this balancer in Java on top of a Jetty or Tomcat server, with an internal message queue and servlets forwarding requests to the worker processes.
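Roughly, the dispatch model we have in mind is the sketch below: one single-threaded executor per request context, so each worker process still sees only one request at a time. sendToWorker() is just a placeholder for our TCP client code.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ContextDispatcher {

    // one single-threaded executor per context, so the worker bound to that
    // context handles one request at a time, just like the C++ process does
    private final Map<String, ExecutorService> dispatchers = new ConcurrentHashMap<>();

    public Future<byte[]> submit(String context, byte[] request) {
        ExecutorService dispatcher = dispatchers.computeIfAbsent(
                context, c -> Executors.newSingleThreadExecutor());
        return dispatcher.submit(() -> sendToWorker(context, request));
    }

    // placeholder: open a TCP connection to the worker that owns this context,
    // write the request and read the response using the proprietary protocol
    private byte[] sendToWorker(String context, byte[] request) {
        throw new UnsupportedOperationException("protocol-specific client code goes here");
    }
}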

But I'm wondering: are there existing solutions (preferably in Java) that would automate this process? What would be the easiest way to implement such a proxy?

UPDATE:

Regarding what I do with the request context: the problem is that the C++ server is really messy software. Every time it receives a request with a different context, it reloads its internal cache to match that context. For example, if you ask the server for some data in English, it loads its cache for English; if the next request is in French, it reloads the cache again. Obviously, I would like to minimize the number of cache reloads by routing requests more intelligently.
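As an illustration only (the Worker handle below is hypothetical), the routing policy we are after is something like:

import java.util.List;

public class StickyRouting {

    /** Hypothetical handle for a pooled worker process. */
    interface Worker {
        String currentContext(); // the context (e.g. language) its cache is currently loaded for
    }

    /** Prefer an idle worker whose cache already matches the request context. */
    static Worker pickWorker(String context, List<Worker> idleWorkers) {
        for (Worker w : idleWorkers) {
            if (context.equals(w.currentContext())) {
                return w; // no cache reload needed
            }
        }
        // otherwise reuse any idle worker and pay for one cache reload
        return idleWorkers.isEmpty() ? null : idleWorkers.get(0);
    }
}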

The communication protocol is a custom one (TCP/IP based), but it is relatively easy to extract the context part from a message.

Load balancing is currently implemented on the client side: each client is configured with the full list of server nodes and sends requests to them in a round-robin loop. This approach has several problems: connection management on the client side is complex, multiple clients that do not know about each other balance the load poorly, and there is no way to manage the life cycle of a node. We cannot fix these issues by refactoring the clients.
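For reference, each client today does little more than the following simplified sketch (the node addresses are placeholders), which is why any coordinated balancing is impossible:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified view of the current client-side balancing: a plain round-robin over a
// statically configured node list, with no knowledge of other clients or node health.
public class RoundRobinClient {

    private final List<String> nodes = Arrays.asList("worker-1:9000", "worker-2:9000");
    private final AtomicInteger next = new AtomicInteger();

    public String pickNode() {
        return nodes.get(Math.floorMod(next.getAndIncrement(), nodes.size()));
    }
}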

We will most likely end up with a custom forwarding solution, but I'm still wondering whether there are existing products at least for the process-management part. Ideally, this would be a Java application server that can:

  • Create child nodes (other Java processes)
  • Monitor their life cycle
  • Communicate with them over some protocol

Perhaps this functionality already exists in some application server? That would make the problem much easier! (A bare-bones sketch of the spawn-and-monitor part I mean follows below.)
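To be concrete, the spawn-and-monitor part is, at its most bare-bones, something like the ProcessBuilder sketch below (the command line and paths are placeholders); what we are hoping for is an existing product that wraps this together with restart policies and remote hosts.

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

// Bare-bones sketch of spawning and watching one child node with the JDK only.
public class ChildNodeLauncher {

    public static void main(String[] args) throws IOException, InterruptedException {
        Process child = new ProcessBuilder("java", "-jar", "worker-node.jar")
                .directory(new File("/opt/worker"))
                .redirectErrorStream(true)
                .start();

        // crude lifecycle monitoring loop; restart logic would go after the loop
        while (child.isAlive()) {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("child node is still running");
        }
        System.out.println("child node exited with code " + child.exitValue());
    }
}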



4 answers


As far as process management is concerned, you can achieve your goal quite easily by combining the Apache Commons Exec library, which helps you spawn new worker instances, with the Apache Commons Pool library, which manages the running instances.

The implementation is very straightforward, because the keyed pool ensures that an object is used by only one caller at a time, until it is returned to the pool. If you borrow while all existing objects are still checked out, the pool spawns a new instance for you. You can monitor the lifetime of your workers either by adding a watchdog (from Apache Commons Exec), which can kill an instance that has been running too long, or by using the pool itself, for example by calling pool.clearOldest(). You can also see how many requests are being processed (how many workers are active) at any moment by calling pool.getNumActive(). See the GenericKeyedObjectPool javadoc for more.
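For example, per-key limits and idle-worker eviction can be configured up front. A sketch (example values, not tuned; WorkersPool and WorkersFactory are the classes from the full listing further down):

import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

// Sketch of pool tuning: limit workers per context and let the evictor
// shut down workers that have been idle for a while.
class PoolSetup {
    static WorkersPool createPool() {
        GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();
        config.setMaxTotalPerKey(4);                      // at most 4 workers per language
        config.setTimeBetweenEvictionRunsMillis(30_000);  // run the idle evictor every 30 s
        config.setMinEvictableIdleTimeMillis(5 * 60_000); // evict workers idle for 5 minutes

        WorkersPool pool = new WorkersPool(new WorkersFactory());
        pool.setConfig(config);
        return pool;
    }
}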

The implementation can be a single, simple servlet running on a Tomcat server. The servlet creates the pool and simply asks it for a worker by calling pool.borrowObject(parameters). In the parameters you define what characteristics a worker must have in order to process the request (in your case, the parameters should include the language). If no such worker exists (for example, there is no worker for French yet), the pool creates a new one for you. Likewise, if a matching worker exists but is currently processing another request, the pool also creates a new worker (so you end up with two workers for the same language). The worker becomes available for the next request again once pool.returnObject(parameters, instance) is called.



The whole implementation took me less than 200 lines of code (see the complete listing below). The code also handles the situation where a worker process is killed from the outside or crashes (see WorkersFactory.activateObject()).

IMHO, using Apache Camel is not a good option for you: it is too big a tool and is designed as a mediation bus between different message formats. You don't need conversions and you don't need to handle different message formats. Go for a simple solution.

package com.myapp;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Objects;

public class BalancingServlet extends javax.servlet.http.HttpServlet {

    private final WorkersPool workersPool;

    public BalancingServlet() {
        workersPool = new WorkersPool(new WorkersFactory());
    }


    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        response.getWriter().println("Balancing");

        String language = request.getParameter("language");
        String someOtherParam = request.getParameter("other");
        WorkerParameters workerParameters = new WorkerParameters(language, someOtherParam);

        String requestSpecificParam1 = request.getParameter("requestParam1");
        String requestSpecificParam2 = request.getParameter("requestParam2");

        try {
            WorkerInstance workerInstance = workersPool.borrowObject(workerParameters);
            try {
                workerInstance.handleRequest(requestSpecificParam1, requestSpecificParam2);
            } finally {
                // always return the worker, even if handling the request failed
                workersPool.returnObject(workerParameters, workerInstance);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

class WorkerParameters {
    private final String workerLangauge;
    private final String someOtherParam;

    WorkerParameters(String workerLangauge, String someOtherParam) {
        this.workerLangauge = workerLangauge;
        this.someOtherParam = someOtherParam;
    }

    public String getWorkerLangauge() {
        return workerLangauge;
    }

    public String getSomeOtherParam() {
        return someOtherParam;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        WorkerParameters that = (WorkerParameters) o;

        return Objects.equals(this.workerLangauge, that.workerLangauge) && Objects.equals(this.someOtherParam, that.someOtherParam);
    }

    @Override
    public int hashCode() {
        return Objects.hash(workerLangauge, someOtherParam);
    }
}

class WorkerInstance {
    private final Thread thread;
    private WorkerParameters workerParameters;

    public WorkerInstance(final WorkerParameters workerParameters) {
        this.workerParameters = workerParameters;

        // launch the worker process here
        System.out.println("Spawning worker for language: " + workerParameters.getWorkerLangauge());

        // use Commons Exec to spawn your process from the command line, something like:

        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // placeholder command line: replace with the real worker binary
                    String line = "C:/Windows/notepad.exe";
                    final CommandLine cmdLine = CommandLine.parse(line);

                    final DefaultExecutor executor = new DefaultExecutor();
                    executor.setExitValue(0);
//                    ExecuteWatchdog watchdog = new ExecuteWatchdog(60000); // if you want to kill process running too long
//                    executor.setWatchdog(watchdog);

                    int exitValue = executor.execute(cmdLine);
                    System.out.println("process finished with exit code: " + exitValue);
                } catch (IOException e) {
                    throw new RuntimeException("Problem while executing application for language: " + workerParameters.getWorkerLangauge(), e);
                }


            }
        });

        thread.start();


        System.out.println("Process spawned for language: " + workerParameters.getWorkerLangauge());


    }

    public void handleRequest(String someRequestParam1, String someRequestParam2) {
        System.out.println("Handling request for extra params: " + someRequestParam1 + ", " + someRequestParam2);

        // communicate with the worker process here, over TCP or whatever protocol
        // you use, passing the extra parameters: someRequestParam1, someRequestParam2


    }

    public boolean isRunning() {
        return thread.isAlive();
    }


}

class WorkersFactory extends BaseKeyedPooledObjectFactory<WorkerParameters, WorkerInstance> {

    @Override
    public WorkerInstance create(WorkerParameters parameters) throws Exception {
        return new WorkerInstance(parameters);
    }

    @Override
    public PooledObject<WorkerInstance> wrap(WorkerInstance worker) {
        return new DefaultPooledObject<WorkerInstance>(worker);
    }

    @Override
    public void activateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
            throws Exception {
        System.out.println("Activating worker for lang: " + worker.getWorkerLangauge());

        if (!p.getObject().isRunning()) {
            System.out.println("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to be respawned");
            throw new RuntimeException("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to be respawned");
        }
    }

    @Override
    public void passivateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
            throws Exception {
        System.out.println("Passivating worker for lang: " + worker.getWorkerLangauge());
    }

}

class WorkersPool extends GenericKeyedObjectPool<WorkerParameters, WorkerInstance> {

    public WorkersPool(KeyedPooledObjectFactory<WorkerParameters, WorkerInstance> factory) {
        super(factory);
    }
}

      



It sounds like you are looking for a messaging system. Apache Camel offers many components for integrating different protocols and adding custom processing logic (using XML or the Java API), and it implements many of the Enterprise Integration Patterns.

It has an Apache MINA integration, which could be a good starting point to look at as well.
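For illustration only, a content-based TCP route in Camel's Java DSL could look roughly like the sketch below; it assumes the camel-netty component, and the endpoint URIs, the header name and extractContext() are placeholders for your own protocol and topology.

import org.apache.camel.builder.RouteBuilder;

// Rough sketch of a Camel content-based router in front of the workers.
public class WorkerRoutingRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("netty:tcp://0.0.0.0:9000?sync=true&textline=true")
            .process(exchange -> {
                // extract the context (e.g. language) from the proprietary payload
                String payload = exchange.getIn().getBody(String.class);
                exchange.getIn().setHeader("workerContext", extractContext(payload));
            })
            .choice()
                .when(header("workerContext").isEqualTo("fr"))
                    .to("netty:tcp://worker-fr:9001?sync=true&textline=true")
                .otherwise()
                    .to("netty:tcp://worker-en:9001?sync=true&textline=true");
    }

    // placeholder: real parsing depends on the proprietary protocol
    private static String extractContext(String payload) {
        return payload != null && payload.startsWith("FR|") ? "fr" : "en";
    }
}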



It is unclear, though, how you would launch new instances on other computers on the fly. I think you will need at least some kind of agent running on those machines that you can ask to start a new server process.



Since you are stuck with a custom TCP/IP protocol and want content-based routing, you will have to build something yourself.

I think you could take a ready-made integration framework and write adapters for your protocol plus handlers for the routing.

Based on my experience, Apache Camel is the way to go.

As a first design step, I would take:

  • Camel as the integration bus
  • Apache ActiveMQ as the message broker (JMS)
  • MySQL, PostgreSQL or HSQLDB as the database (depending on load and size requirements)
  • An adapter for the TCP/IP protocol that accepts connections from clients and puts the data on JMS
  • A routing component that receives messages from JMS and redirects each one to the desired per-server queue, based on server context, load and queue length (a sketch of this step is given below)
  • An endpoint component that receives a message from JMS, sends it to the real server, receives the response and puts it back on JMS

The role of the Camel server is to wire up all the steps in the flow: it can receive messages from JMS and call Java methods, take the return value of the call and push it back to JMS, and so on. So you do not need to write that plumbing yourself.

The role of JMS is to balance the workload across multiple hosts for each component type: adapters, routers and endpoints.
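As a sketch of just the routing step in this design (the queue names, the workerContext header and the camel-activemq component are assumptions, not a finished design):

import org.apache.camel.builder.RouteBuilder;

// Sketch of the routing component only: the adapter is assumed to have put the raw
// request on requests.in and set a "workerContext" header; queue names are examples.
public class ContextQueueRouter extends RouteBuilder {

    @Override
    public void configure() {
        from("activemq:queue:requests.in")
            .choice()
                .when(header("workerContext").isEqualTo("en"))
                    .to("activemq:queue:worker.en")
                .when(header("workerContext").isEqualTo("fr"))
                    .to("activemq:queue:worker.fr")
                .otherwise()
                    .to("activemq:queue:worker.default");
    }
}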

Open questions remain:

  • Monitoring, and automatically starting pools of nodes on the servers, is a topic that deserves its own discussion.


It is not clear what you want to do by inspecting the request context and the client ID, and how this affects routing. Also, what is the format of the request content? If it is binary rather than text, things can get complicated.

What kind of process control and monitoring are you planning to do?

Even though you compare this to a Java ExecutorService, expanding the problem space from threads to processes is fine, but it grows exponentially once you talk about multiple machines. At that point you are talking about clustering.

This proprietary TCP/IP request/response protocol now needs configurable load balancing. If I am not mistaken, you will also soon have to look at strategies for removing a node and adding more nodes in order to scale the system further.

Which component currently handles mapping a request onto a pooled process? Could that component be made configurable, so that it routes a request to a pooled process or to one node in a set of configured nodes?







