Observable takeUntil Behaves Unexpectedly

I'm trying to implement a helper method on observables that returns a new observable which emits values only until a timeout is reached:

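import rx.lang.scala.Observable
import scala.concurrent.duration._

implicit class ObservableOps[T](obs: Observable[T]) {

  def timedOut(totalSec: Long): Observable[T] = {
    require(totalSec >= 0)
    // Emits once the timeout has elapsed; takeUntil then stops obs.
    val timeOut = Observable.interval(totalSec.seconds)
      .filter(_ > 0)
      .take(1)
    obs.takeUntil(timeOut)
  }
}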

      

I wrote a test for it that creates an observable which emits its first value long after the timeout. However, the resulting observable still seems to include that late value:

test("single value too late for timeout") {
  val obs = Observable({Thread.sleep(8000); 1})
  val list = obs.timedOut(1).toBlockingObservable.toList
  assert(list === List())
}

      

The test fails with the message "List(1) did not equal List()". What am I doing wrong?


1 answer


I suspect your Thread.sleep(8000) is actually blocking your main thread. Have you tried adding a println after val obs in your test to see whether it prints immediately when the test starts?

What happens is that the declaration of obs blocks your program for 8 seconds. Only then do you create a new observable with timedOut, so timedOut sees the emitted value as soon as it is invoked.
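To illustrate why the sleep runs before any timeout can start, here is a minimal sketch in plain Scala, with no RxScala involved. It assumes that the Observable(...) factory used in the question takes its items as ordinary (strict) arguments. The names strictItems and deferredItem are hypothetical helpers invented for this illustration and are not part of any library.

```scala
// Illustration only: strictItems and deferredItem are hypothetical helpers,
// not RxScala APIs.
object EagerVsDeferred {

  // Strict varargs: each argument expression is fully evaluated by the
  // caller before this method body runs.
  def strictItems[T](items: T*): () => Seq[T] = () => items

  // By-name parameter: the expression is evaluated only when the returned
  // function is invoked.
  def deferredItem[T](item: => T): () => Seq[T] = () => Seq(item)

  def main(args: Array[String]): Unit = {
    var evaluated = false

    val strict = strictItems({ evaluated = true; 1 })
    println(s"strict, before use: evaluated = $evaluated")   // true

    evaluated = false
    val deferred = deferredItem({ evaluated = true; 1 })
    println(s"deferred, before use: evaluated = $evaluated") // false

    deferred()
    println(s"deferred, after use: evaluated = $evaluated")  // true
  }
}
```

In the strict case the block (which stands in for the Thread.sleep(8000) in the question) has already run by the time the source object exists, so anything that starts a timer afterwards can only see a value that is already available.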



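With rx-scala 0.23.0, your timedOut method works, except that filter(_ > 0) should be removed. Observable.interval does not emit immediately: its first value, 0, arrives only after one full period, and the filter drops that value, so the timeout would fire only after the second period.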

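// Requires: import scala.concurrent.duration._
// The value arrives before the 1-second timeout, so it is kept.
val obs1 = Observable.just(42).delay(900.millis)
val list1 = obs1.timedOut(1).toBlocking.toList
println(list1) // prints List(42)

// The value arrives after the 1-second timeout, so it is dropped.
val obs2 = Observable.just(42).delay(1100.millis)
val list2 = obs2.timedOut(1).toBlocking.toList
println(list2) // prints List()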

      
