How to create a durable publisher / subscriber theme using AMQP.Net Lite library in .NET Core named clientId and theme name and title

I'm new to ActiveMQ but I've tried and can create a durable publisher, but I can't set the Client ID because I can't find any properties with the Client ID and can't even google it. It will be of great help if I get some sample code.

Note: Not with NMS protocol. I am using AMQP.Net Lite with ActiveMQ in the .NET Core Web API to create a reliable publisher / subscriber with ClientId.

+3


source to share


1 answer


To create a strong subscription to ActiveMQ or ActiveMQ Artemis, your client needs to do a couple of things.

  • Set a unique "client-ID" for the client using the AMQP "ContainerId" property, which you can see in the code below. The client must use the same container ID every time he connects and re-establishes his long-term subscription.

  • Create a new session.

  • Create a new receiver for the address (in this case, Topic) that you want to subscribe to. The strong subscription source must have an address specified for the Subject address (in ActiveMQ, this is subject: // name). The source also requires an expiray policy set to NEVER, the source must also have a lifetime state set to UNSETTLED_STATE, and the distribution mode is set to COPY.

  • Once the receiver has been created, you can either set an onMessage handler at the beginning, or be called to consume the messages (assuming you've given the broker credit to send you the message).


using System;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using Amqp.Sasl;
using System.Threading;


      



namespace aorg.apache.activemq.examples { class Program { private static string DEFAULT_BROKER_URI = "amqp://localhost:5672"; private static string DEFAULT_CONTAINER_ID = "client-1"; private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription"; private static string DEFAULT_TOPIC_NAME = "test-topic";

static void Main(string[] args) { Console.WriteLine("Starting AMQP durable consumer example."); Console.WriteLine("Creating a Durable Subscription"); CreateDurableSubscription(); Console.WriteLine("Attempting to recover a Durable Subscription"); RecoverDurableSubscription(); Console.WriteLine("Unsubscribe a durable subscription"); UnsubscribeDurableSubscription(); Console.WriteLine("Attempting to recover a non-existent durable subscription"); try { RecoverDurableSubscription(); throw new Exception("Subscription was not deleted."); } catch (AmqpException) { Console.WriteLine("Recover failed as expected"); } Console.WriteLine("Example Complete."); } // Creating a durable subscription involves creating a Receiver with a Source that // has the address set to the Topic name where the client wants to subscribe along // with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the // Distribution Mode set to 'Copy'. The link name of the Receiver represents the // desired name of the Subscription and of course the Connection must carry a container // ID uniqure to the client that is creating the subscription. private static void CreateDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source source = CreateBasicSource(); // Create a Durable Consumer Source. source.Address = DEFAULT_TOPIC_NAME; source.ExpiryPolicy = new Symbol("never"); source.Durable = 2; source.DistributionMode = new Symbol("copy"); ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null); session.Close(); } finally { connection.Close(); } } // Recovering an existing subscription allows the client to ask the remote // peer if a subscription with the given name for the current 'Container ID' // exists. The process involves the client attaching a receiver with a null // Source on a link with the desired subscription name as the link name and // the broker will then return a Source instance if this current container // has a subscription registered with that subscription (link) name. private static void RecoverDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } session.Close(); } finally { connection.Close(); } } // Unsubscribing a durable subscription involves recovering an existing // subscription and then closing the receiver link explicitly or in AMQP // terms the close value of the Detach frame should be 'true' private static void UnsubscribeDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } // Closing the Receiver vs. detaching it will unsubscribe receiver.Close(); session.Close(); } finally { connection.Close(); } } // Creates a basic Source type that contains common attributes needed // to describe to the remote peer the features and expectations of the // Source of the Receiver link. private static Source CreateBasicSource() { Source source = new Source(); // These are the outcomes this link will accept. Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"), new Symbol("amqp:rejected:list"), new Symbol("amqp:released:list"), new Symbol("amqp:modified:list") }; // Default Outcome for deliveries not settled on this link Modified defaultOutcome = new Modified(); defaultOutcome.DeliveryFailed = true; defaultOutcome.UndeliverableHere = false; // Configure Source. source.DefaultOutcome = defaultOutcome; source.Outcomes = outcomes; return source; } }

}

+2


source







All Articles