Spring & RabbitMQ - register queue at runtime

How do I create a new queue associated with the Fanout exchange and start it at runtime? So far I have this:

Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 600000L);

    GenericBeanDefinition runtimeQueueBean = new GenericBeanDefinition();
    runtimeQueueBean.setBeanClass(Queue.class);
    runtimeQueueBean.setLazyInit(false);
    runtimeQueueBean.setAbstract(false);
    runtimeQueueBean.setAutowireCandidate(true);
    ConstructorArgumentValues queueConstrArgs = new ConstructorArgumentValues();
    queueConstrArgs.addIndexedArgumentValue(0, queueName);
    queueConstrArgs.addIndexedArgumentValue(1, true);
    queueConstrArgs.addIndexedArgumentValue(2, false);
    queueConstrArgs.addIndexedArgumentValue(3, false);
    queueConstrArgs.addIndexedArgumentValue(4, arguments);
    runtimeQueueBean.setConstructorArgumentValues(queueConstrArgs);
    this.context.registerBeanDefinition("nejm", runtimeQueueBean);


    GenericBeanDefinition runtimeFanoutExchange = new GenericBeanDefinition();
    runtimeFanoutExchange.setBeanClass(FanoutExchange.class);
    runtimeFanoutExchange.setLazyInit(false);
    runtimeFanoutExchange.setAbstract(false);
    runtimeFanoutExchange.setAutowireCandidate(true);
    ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
    constructorArgumentValues.addIndexedArgumentValue(0, "staticCache");
    runtimeFanoutExchange.setConstructorArgumentValues(constructorArgumentValues);
    this.context.registerBeanDefinition("staticCache", runtimeFanoutExchange);


    GenericBeanDefinition runtimeBinding = new GenericBeanDefinition();
    runtimeBinding.setBeanClass(Binding.class);
    runtimeBinding.setLazyInit(false);
    runtimeBinding.setAbstract(false);
    runtimeBinding.setAutowireCandidate(true);
    constructorArgumentValues = new ConstructorArgumentValues();
    constructorArgumentValues.addIndexedArgumentValue(0, queueName);
    constructorArgumentValues.addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
    constructorArgumentValues.addIndexedArgumentValue(2, "staticCache");
    constructorArgumentValues.addIndexedArgumentValue(3, "");
    runtimeBinding.setConstructorArgumentValues(constructorArgumentValues);
    this.context.registerBeanDefinition("bajnding", runtimeBinding);


    GenericBeanDefinition runtimeMessageListenerAdapter = new GenericBeanDefinition();
    runtimeMessageListenerAdapter.setBeanClass(MessageListenerAdapter.class);
    runtimeMessageListenerAdapter.setLazyInit(false);
    runtimeMessageListenerAdapter.setAbstract(false);
    runtimeMessageListenerAdapter.setAutowireCandidate(true);
    constructorArgumentValues = new ConstructorArgumentValues();
    constructorArgumentValues.addIndexedArgumentValue(0, this);
    constructorArgumentValues.addIndexedArgumentValue(1, new RuntimeBeanReference("jackson2JsonMessageConverter"));
    runtimeMessageListenerAdapter.setConstructorArgumentValues(constructorArgumentValues);
    this.context.registerBeanDefinition("mla2", runtimeMessageListenerAdapter);



    GenericBeanDefinition runtimeContainerExchange = new GenericBeanDefinition();
    runtimeContainerExchange.setBeanClass(SimpleMessageListenerContainer.class);
    runtimeContainerExchange.setLazyInit(false);
    runtimeContainerExchange.setAbstract(false);
    runtimeContainerExchange.setAutowireCandidate(true);
    MutablePropertyValues propertyValues = new MutablePropertyValues();
    propertyValues.addPropertyValue("connectionFactory", new RuntimeBeanReference("connectionFactory"));
    propertyValues.addPropertyValue("queues", new RuntimeBeanReference("nejm"));
    propertyValues.addPropertyValue("messageListener", new RuntimeBeanReference("mla2"));
    runtimeContainerExchange.setPropertyValues(propertyValues);
    this.context.registerBeanDefinition("defqueue", runtimeContainerExchange);

      

The problem is that the queue / exchange is not created at runtime and I have to manually start the listener (unless I call this.context.start () - but I don't know if this is the right way).

My question is, is there a way to magically fire all the generated beans at runtime (something like this.context.refresh () - this exists but doesn't work or seems to be)?

UPDATE:

This is how I am doing it at the moment (this approach works, but I don't know how correct it is)

    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 600000L);
    Queue queue = new Queue(queueName, true, false, false, arguments);

    FanoutExchange exchange = new FanoutExchange("staticCache");

    Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, "staticCache", "", null);

    rabbitAdmin.declareQueue(queue);
    rabbitAdmin.declareExchange(exchange);
    rabbitAdmin.declareBinding(binding);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(this.connectionFactory);
    container.setQueues(queue);
    container.setMessageListener(new MessageListenerAdapter(this, this.converter));

    container.start();

      

+3


source to share


1 answer


You cannot do this. BeanDefinition

and this.context.registerBeanDefinition

are intended for the analysis phase of the application context life cycle.

If the application already exists, the application context will not accept any BeanDefinition

.

Yes, you can declare Queue

it Binding

to be exchanged manually at runtime too. And also you can create SimpleMessageListenerContainer

manually and make it work.



And what's good for you is that you just have to use your classes manually to instantiate. You just need to provide the container environment (for example, inject this.applicationContext

into an object listenerContainer

).

You must use a RabbitAdmin

bean from your own to declare a broker applicationContext

.

On the other hand, there is no reason to start a new one listenerContainer

manually. The existing one can ship with your new one Queue

at runtime.

+1


source







All Articles