RxJS - Observable subscription to watch a single variable, complete on change
I am trying to create an observable that watches a variable in my application. As soon as the variable changes to null, all subscribers should be notified and their subscriptions completed. I assumed the takeUntil operator would work best, but I seem to be doing something wrong.
export default class service {
  constructor($window) {
    'ngInject'
    this._currentObject = new BehaviorSubject(null)
  }
  open(config){
    this._currentObject.next(config)
    return this._currentObject.asObservable().takeUntil(null)
  }
  close(){
    this._currentObject.next(null)
  }
}
So for any caller of the public open method, I want to return a new Observable that they can subscribe to, so they know when the object has changed back to null. That happens when something elsewhere in the application calls the close method.
service.open({
  prop: 'test',
  prop2: 'right',
}).subscribe(s => {
  // fires when currentObject is set back to null
  // automatically completes
})
When I try this, I get an error from takeUntil: You provided 'null' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
I'm not even sure if this is a decent approach, but I wondered if someone could take a look and give me some advice. Thanks!
takeUntil takes an observable as its argument, not a value, and keeps taking values from the source until that observable emits a value.
takeWhile is most likely the operator you want, since it takes a predicate:
const source = new Rx.BehaviorSubject(null);
source
  .takeWhile((value, index) => (value !== null) || (index === 0))
  .subscribe(
    (value) => console.log(value),
    undefined,
    () => console.log("completed; changed back to null")
  );
source.next(1);
source.next(2);
source.next(3);
source.next(null);
The snippet runs against the RxJS 5 bundle (https://unpkg.com/rxjs@5/bundles/Rx.min.js), which provides the Rx global.
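The index === 0 clause in the predicate matters because a BehaviorSubject replays its current value to every new subscriber, so the first value the subscriber in the answer's snippet sees is the initial null. As a rough, dependency-free illustration, the sketch below applies the same predicate to a plain array standing in for the emitted values. takeWhileArray is a hypothetical helper written for this example, not an RxJS function, and the emissions array is an assumption about what that subscriber receives.

```javascript
// Hypothetical helper (not part of RxJS): mimics takeWhile on a plain array.
// Returns the values that would be passed on, stopping at the first value
// for which the predicate returns false.
function takeWhileArray(values, predicate) {
  const taken = [];
  for (let index = 0; index < values.length; index++) {
    if (!predicate(values[index], index)) {
      break; // an observable would complete at this point
    }
    taken.push(values[index]);
  }
  return taken;
}

// Assumed sequence seen by the subscriber: the replayed initial null,
// then 1, 2, 3, then the null that signals "closed".
const emissions = [null, 1, 2, 3, null];

// Predicate from the answer: let the very first value through even if null.
const withIndexCheck = takeWhileArray(
  emissions,
  (value, index) => value !== null || index === 0
);

// Without the index check, the replayed initial null stops everything.
const withoutIndexCheck = takeWhileArray(emissions, (value) => value !== null);

console.log(withIndexCheck);    // [ null, 1, 2, 3 ]
console.log(withoutIndexCheck); // []
```

In the service from the question, open calls next(config) before returning the observable, so a new subscriber should see config first rather than null. Under that assumption a predicate of value !== null alone would probably be enough there. Note also that takeWhile does not pass the final null on to the next handler, so the "closed" notification arrives through the completion callback.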