Implementing a Turnstile-like Operator with RxJava

I need help implementing a turnstile-like operator in RxJava (RxScala). I've been thinking about this for quite some time, but I seem to be stuck.

The function type must be as follows:

def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]

      

The idea is that the operator should behave much like a real turnstile. There are people who arrive (the queue), and there is the turnstile, which is either ready to admit a single new person (a true element in the turnstile; you can think of it as a token inserted into the turnstile) or closed (a false element in the turnstile, which cancels the previous token). Only one person can pass through the turnstile for each true element.

In addition, inserting several tokens in a row (several true elements in the turnstile) without a person passing through is the same as inserting only one token; the turnstile does not count tokens.

In other words, the turnstile is initially closed. When a true element appears in it, it opens for one person. When a person arrives, they pass through (to the output) and the turnstile closes again. If a false element appears in the turnstile, the turnstile also closes.

queue       ----A---B-------------C--D--
turnstile   --T--------T--T-T-T-T------T
            ============================
output      ----A------B----------C----D

      

Marble diagram showing an open turnstile waiting for person A, then person B waiting for the turnstile to open, then several tokens in a row that count as a single token for person C, while person D has to wait for a new token again.

----A----B--
--T---T-F-T-
============
----A-----B-

      

Marble diagram showing how a false element in the turnstile closes the turnstile again.
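The rules above can be replayed tick by tick in plain Java, which makes it easy to check a marble diagram by hand. This is only a sketch and not RxJava code: the class name MarbleTurnstile and the method simulate are made up for illustration, and it assumes that a token arriving in the same tick as a person is handled first (the diagrams never show that case).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration only; it is not part of RxJava.
final class MarbleTurnstile {

    // 'T' is a true token, 'F' a false token, '-' means nothing happens.
    // Any other character on the queue line is a person.
    static String simulate(String queue, String turnstile) {
        int length = Math.max(queue.length(), turnstile.length());
        Deque<Character> waiting = new ArrayDeque<>();
        boolean open = false; // the turnstile starts closed
        StringBuilder output = new StringBuilder();

        for (int tick = 0; tick < length; tick++) {
            char person = tick < queue.length() ? queue.charAt(tick) : '-';
            char token = tick < turnstile.length() ? turnstile.charAt(tick) : '-';

            if (token == 'T') open = true;  // repeated tokens still mean "open once"
            if (token == 'F') open = false; // cancels a pending token
            if (person != '-') waiting.addLast(person);

            if (open && !waiting.isEmpty()) {
                output.append(waiting.removeFirst()); // one person passes...
                open = false;                         // ...and the turnstile closes
            } else {
                output.append('-');
            }
        }
        return output.toString();
    }

    public static void main(String[] args) {
        // Queue and turnstile lines of the second diagram.
        System.out.println(simulate("----A----B--", "--T---T-F-T-")); // prints ----A-----B-
    }
}
```

People who arrive while the turnstile is closed are kept in arrival order, which is what the first diagram requires for person B.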

Any help is appreciated. I think the only way to implement this without writing a custom operator would be to use the zip operator somehow, because it is probably the only operator that makes elements from one sequence wait for elements from another (or is there another one I don't know of?). But I need some of the turnstile elements not to be zipped, depending on whether they ended up paired with a person or not...
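To see why zip on its own is not enough, it helps to model its pairing rule: zip pairs the n-th person with the n-th token and emits as soon as both have arrived, so unused tokens are effectively counted. The sketch below is plain Java rather than RxJava; the class name ZipModel, the method zipEmissionTicks and the tick values are invented for illustration, and the token array is assumed to contain only true tokens.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; it is not RxJava code.
final class ZipModel {

    // Returns the tick at which each person would come out of a zip-based
    // turnstile: the later of the n-th person and the n-th token.
    static List<Integer> zipEmissionTicks(int[] personTicks, int[] tokenTicks) {
        List<Integer> result = new ArrayList<>();
        int pairs = Math.min(personTicks.length, tokenTicks.length);
        for (int i = 0; i < pairs; i++) {
            result.add(Math.max(personTicks[i], tokenTicks[i]));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up ticks: three tokens are inserted before anyone arrives.
        int[] people = {5, 6};
        int[] tokens = {1, 2, 3};
        System.out.println(zipEmissionTicks(people, tokens)); // prints [5, 6]
    }
}
```

With zip both people get through, at ticks 5 and 6, because the second and third tokens were saved up. The turnstile described above would let only the first person through at tick 5 and make the second one wait for a new token.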

I think this is an interesting problem, and I would be very interested to see a good solution.

2 answers


So, I think I have a cleaner, fully Rx solution. This was actually a pretty fun problem. If it works for your needs, I think it turned out really nifty, although it took quite a while to get there.

Unfortunately I don't know Scala, so you'll have to deal with my Java 8 lambdas. :D

The whole implementation:

public static Observable<String> getTurnstile(final Observable<String> queue, final Observable<Boolean> tokens) {
    return queue.publish(sharedQueue ->
            tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.empty()));
}

      



What happens here is that we use publish to turn the queue of people into a shared Observable that we can subscribe to multiple times. Inside it we use switchMap on the tokens, which means that whenever the switchMap function returns a new Observable, the previous one is discarded and the new one is subscribed to. Every time the token is true, it makes a new subscription to the queue of people (several trues in a row are fine, since each one cancels the old subscription). When the token is false, it just returns an empty Observable so it doesn't waste time.

And some (passing) test cases:

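import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.TestSubject;

import java.util.concurrent.TimeUnit;

@RunWith(JUnit4.class)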
public class TurnstileTest {
    private final TestScheduler scheduler = new TestScheduler();
    private final TestSubscriber<String> output = new TestSubscriber<>();

    private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
    private final TestSubject<String> queue = TestSubject.create(scheduler);

    @Before
    public void setup() {
        Turnstile.getTurnstile(queue, tokens).subscribe(output);
    }

    @Test
    public void allowsOneWithTokenBefore() {
        tokens.onNext(true, 0);
        queue.onNext("Bill", 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void tokenBeforeIsCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(false, 1);
        queue.onNext("Bill", 2);

        assertNonePassed();
    }

    @Test
    public void tokensBeforeAreCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        tokens.onNext(false, 3);
        queue.onNext("Bill", 4);

        assertNonePassed();
    }

    @Test
    public void eventualPassThroughAfterFalseTokens() {
        tokens.onNext(false, 0);
        queue.onNext("Bill", 1);
        tokens.onNext(false, 2);
        tokens.onNext(false, 3);
        queue.onNext("Jane", 4);
        queue.onNext("Bob", 5);
        tokens.onNext(true, 6);
        tokens.onNext(true, 7);
        tokens.onNext(false, 8);
        tokens.onNext(false, 9);
        queue.onNext("Phil", 10);
        tokens.onNext(false, 11);
        tokens.onNext(false, 12);
        tokens.onNext(true, 13);

        assertPassedThrough("Bill", "Jane", "Bob");
    }

    @Test
    public void allowsOneWithTokenAfter() {
        queue.onNext("Bill", 0);
        tokens.onNext(true, 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        queue.onNext("Bill", 3);
        tokens.onNext(true, 4);
        tokens.onNext(true, 5);
        queue.onNext("Jane", 6);
        queue.onNext("John", 7);

        assertPassedThrough("Bill", "Jane");
    }

    @Test
    public void noneShallPassWithoutToken() {
        queue.onNext("Jane", 0);
        queue.onNext("John", 1);

        assertNonePassed();
    }

    private void closeSubjects() {
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        scheduler.triggerActions();
        tokens.onCompleted();
        queue.onCompleted();
        scheduler.triggerActions();
    }

    private void assertNonePassed() {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList());
    }

    private void assertPassedThrough(final String... names) {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList(names));
    }
}

      

Let me know if you find any edge cases that don't work with this, especially any real-time issues, since the tests obviously run in a controlled environment.



OK, I found one solution, inspired by Dave Sexton's comment. In the end I didn't use zip, as I just couldn't figure out how to do it that way.

I basically implemented the turnstile as a state machine with three state variables: whether it is locked or not, the queue of items waiting to pass through the turnstile, and the last item that passed through the turnstile (these are collected at the end to get the actual output).



The input to the state machine is a stream of transition requests, merged from two input streams: a stream of lock/unlock requests and a stream of items to pass through the turnstile. I simply process the transitions with scan and then collect the passed items from the resulting states.

/** sample elements from queue through turnstile, one at a time
*
* @param queue source of elements to pass through the turnstile.
* @param turnstile For every `true` in the turnstile pass one element through from the queue
* @tparam T type of the elements
* @return the source of queue elements passing through the turnstile
*/
def queueThroughTurnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T] = {
  import scala.collection.immutable.Queue

  case class State(isOpen: Boolean, elementsInQueue: Queue[T], maybeLastEmittedElement: Option[T])
  sealed abstract class Transition
  case object Lock extends Transition
  case object Unlock extends Transition
  case class Element(element: T) extends Transition

  val initialState = State(isOpen = false, Queue.empty, None)

  queue.map(element ⇒ Element(element))
    .merge(turnstile map (unlock ⇒ if (unlock) Unlock else Lock))
    .scan(initialState) { case (State(isOpen, elementsInQueue, _), transition) ⇒ transition match {
    case Lock ⇒ State(isOpen = false, elementsInQueue, None)
    case Unlock ⇒ {
      if (elementsInQueue.isEmpty)
        State(isOpen = true, elementsInQueue, None)
      else {
        val (firstElement, newQueue) = elementsInQueue.dequeue
        State(isOpen = false, newQueue, Some(firstElement))
      }
    }
    case Element(newElement) ⇒ {
      if (isOpen) {
        if (elementsInQueue.isEmpty)
          State(isOpen = false, Queue.empty, Some(newElement))
        else {
          val (firstElement, newQueue) = elementsInQueue.dequeue
          State(isOpen = false, newQueue enqueue newElement, Some(firstElement))
        }  
      } else {
        State(isOpen = false, elementsInQueue enqueue newElement, None)
      }
    }
  }
  }.collect { case State(_, _, Some(lastEmittedElement)) ⇒ lastEmittedElement}
}
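For readers who are more at home in Java, the same state machine can be sketched without any Rx types. This is a rough port under stated assumptions, not the code from the answer: the names TurnstileMachine, Event, State, step and run are invented, elements are fixed to String to keep it short, a plain list stands in for the merged stream, and the loop in run plays the role of scan followed by collect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical Java port for illustration only; it uses no RxJava types.
final class TurnstileMachine {

    // One event of the merged input: either a token or a person.
    static final class Event {
        final Boolean token; // null when the event carries a person
        final String person; // null when the event carries a token

        private Event(Boolean token, String person) {
            this.token = token;
            this.person = person;
        }

        static Event token(boolean value) { return new Event(value, null); }
        static Event person(String name) { return new Event(null, name); }
    }

    // Immutable state, mirroring the three fields of the Scala State class.
    static final class State {
        final boolean isOpen;
        final List<String> waiting;
        final String lastEmitted; // null plays the role of None

        State(boolean isOpen, List<String> waiting, String lastEmitted) {
            this.isOpen = isOpen;
            this.waiting = Collections.unmodifiableList(waiting);
            this.lastEmitted = lastEmitted;
        }
    }

    // One transition: the function that scan would apply to each event.
    static State step(State state, Event event) {
        List<String> waiting = new ArrayList<String>(state.waiting);
        boolean open = state.isOpen;
        if (event.token != null) {
            open = event.token;        // Unlock on true, Lock on false
        } else {
            waiting.add(event.person); // a new person joins the queue
        }
        if (open && !waiting.isEmpty()) {
            String first = waiting.remove(0);
            return new State(false, waiting, first); // one passes, turnstile closes
        }
        return new State(open, waiting, null);
    }

    // Runs all events through step and keeps the people that were emitted.
    static List<String> run(List<Event> events) {
        State state = new State(false, new ArrayList<String>(), null);
        List<String> passed = new ArrayList<String>();
        for (Event event : events) {
            state = step(state, event);
            if (state.lastEmitted != null) {
                passed.add(state.lastEmitted);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                Event.person("Bill"), Event.token(true), Event.person("Bob"));
        System.out.println(run(events)); // prints [Bill]
    }
}
```

The Element case is folded into a single rule here (enqueue first, then let the head of the queue pass if the turnstile is open), which gives the same result as the two branches in the Scala version.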

      
