How do I implement a custom mailbox using a Disruptor?

I have some experience with the LMAX Disruptor and would really like to implement a custom Akka mailbox with it.

Are there any recommendations? Is it possible? What are the limitations of Akka Actor mailboxes?


1 answer


As stated here, you only need to implement a few methods. Of course, you have to write and read messages directly through a pointer into the ring buffer. You should also keep the following in mind:

  • a disruptor usually preallocates a lot of memory, so using one disruptor per actor is a bad idea. Instead, you can use one router actor (with the disruptor inside) in combination with BalancingPool.

  • if you want different message types to be consumed differently, or separate consumers for logging, repair, etc., you must pass a different RingBufferPointer instance (or something like it) as a parameter to each mailbox (with the same start value for logging, different start values for different message types), but still use the same Disruptor. Different mailboxes will therefore reference the same disruptor.

  • you will lose low-level control over message creation, retrieval, etc., so there is no batch allocation by default.

  • you can also use the history kept in the ring to restore a failed actor's state (in preRestart or in the supervisor).
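The points about sharing one ring between several mailboxes and replaying its history can be sketched in plain Scala. This is only a minimal, single-threaded illustration: SharedRing and Cursor are hypothetical names invented for this example, not Akka or LMAX Disruptor classes, and a real Disruptor relies on sequences and memory barriers that are omitted here.

```scala
// Hypothetical sketch: one preallocated ring shared by several independent readers.
// Not thread-safe; assumes a single writer and no reader lagging by more than `capacity`.
object RingSketch {
  final class SharedRing[A <: AnyRef](capacity: Int) {
    require(capacity > 0 && (capacity & (capacity - 1)) == 0, "capacity must be a power of two")
    private val slots = new Array[AnyRef](capacity) // allocated once, shared by all cursors
    private val mask = capacity - 1
    private var writeSeq = 0L // next sequence number to publish

    // Stores the message in the next slot and returns its sequence number.
    def publish(a: A): Long = {
      slots((writeSeq & mask).toInt) = a
      writeSeq += 1
      writeSeq - 1
    }
    def published: Long = writeSeq
    def read(seq: Long): A = slots((seq & mask).toInt).asInstanceOf[A]

    // Each consumer (mailbox, logger, ...) gets its own cursor with its own start value.
    def newCursor(start: Long = 0L): Cursor[A] = new Cursor[A](this, start)
  }

  final class Cursor[A <: AnyRef](ring: SharedRing[A], val start: Long) {
    private var next = start
    def hasNext: Boolean = next < ring.published
    def poll(): Option[A] =
      if (hasNext) { val a = ring.read(next); next += 1; Some(a) } else None
    // A fresh cursor at the original start value re-reads the history,
    // e.g. to rebuild state after a restart.
    def replay(): Cursor[A] = ring.newCursor(start)
  }

  def main(args: Array[String]): Unit = {
    val ring = new SharedRing[String](8)
    val logger = ring.newCursor() // reads everything from the beginning
    val worker = ring.newCursor() // independent position in the same ring
    ring.publish("a"); ring.publish("b")
    println(worker.poll())          // advancing the worker does not move the logger
    println(logger.poll())
    println(worker.replay().poll()) // history is still available from the start value
  }
}
```

Because every cursor only stores a position, adding another consumer costs a few bytes rather than another preallocated buffer, which is the reason for sharing one disruptor across mailboxes.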

What LMAX says:

It works in a different way from more conventional approaches, so you use it in a slightly different way than you might be used to. For example, applying the pattern to your system is not as simple as replacing all of your queues with a magic ring buffer. We've got code samples to guide you, a growing number of blogs and articles giving an overview of how it works, the technical paper goes into some detail as you would expect, and the performance tests give examples of how to use the Disruptor: http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

And here is a short comparison of Queues / Disruptors / Actors



In pseudo-scala code, it would be something like:

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MailboxType, MessageQueue, ProducesMessageQueue}
import com.typesafe.config.Config

// RingBuffer and Pointer are placeholders for your own Disruptor wrapper;
// they are not Akka or LMAX classes.
object MyUnboundedMailbox {
  val buffer = new RingBuffer()

  class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      // Allocate one element and set it. If you want different message types,
      // allocate a large block up front and block when it runs out (so as not to
      // interfere with other messages); the queue then has to be bounded.
      writerPointer.allocate(() => handle)
    }
    def dequeue(): Envelope = readerPointer.poll()
    def numberOfMessages: Int = writerPointer - readerPointer // should be synchronized
    def hasMessages: Boolean = readerPointer != writerPointer // should be synchronized
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { }
  }

  trait MyUnboundedMessageQueueSemantics

}

class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
  with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._
  final override def create(owner: Option[ActorRef],
                            system: Option[ActorSystem]): MessageQueue = {
    // you may use another strategy here based on owner (you can access its name and path),
    // for example allocating the same pointers for the same prefixes in the name or path
    val pointer = buffer.newPointer
    val read = pointer.copy
    val write = pointer.copy
    new MyMessageQueue(pointer, read, write)
  }
}

      

You can use the unchanged MyMessageQueue.startPointer to access the message log during failure recovery (you can also look at Akka's Event Sourcing for an analogy).

The UnboundedQueue approach does not guarantee message delivery here, because a very old undelivered message can be overwritten by a new one once the ring wraps around, so you might need a BoundedQueue like the one here.
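To make the overwrite problem concrete, here is a minimal sketch of a bounded ring that refuses a new message instead of overwriting an unread one. BoundedRing is a hypothetical name used only for this illustration; it is single-threaded and not an Akka or Disruptor API. What a real mailbox should do with a rejected message (block, drop it, or send it to dead letters) is a separate decision that this sketch leaves open.

```scala
// Hypothetical sketch: a ring that never overwrites undelivered messages.
// Not thread-safe; a real mailbox needs proper synchronization.
object BoundedRingSketch {
  final class BoundedRing[A <: AnyRef](capacity: Int) {
    require(capacity > 0, "capacity must be positive")
    private val slots = new Array[AnyRef](capacity)
    private var writeSeq = 0L // total number of accepted messages
    private var readSeq = 0L  // total number of delivered messages

    def size: Int = (writeSeq - readSeq).toInt

    // Returns false when every slot still holds an undelivered message.
    def offer(a: A): Boolean =
      if (size == capacity) false
      else {
        slots((writeSeq % capacity).toInt) = a
        writeSeq += 1
        true
      }

    // Returns the oldest undelivered message, if any, and frees its slot.
    def poll(): Option[A] =
      if (readSeq == writeSeq) None
      else {
        val i = (readSeq % capacity).toInt
        val a = slots(i).asInstanceOf[A]
        slots(i) = null
        readSeq += 1
        Some(a)
      }
  }

  def main(args: Array[String]): Unit = {
    val ring = new BoundedRing[String](2)
    println(ring.offer("a")) // accepted
    println(ring.offer("b")) // accepted, ring is now full
    println(ring.offer("c")) // rejected instead of overwriting "a"
    println(ring.poll())     // frees one slot
    println(ring.offer("c")) // accepted now
  }
}
```

The trade-off is that producers now have to handle back-pressure, whereas the unbounded variant stays fast for producers but can silently lose the oldest messages.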
