How do I get the latest changed values from an IObservable<IObservable<T>>?
My system has many status values: connection status, CPU load, registered users, etc. All of these events are merged into a single observable stream.
I want to build an admin utility that displays the current system status, including all of these counters.
How do I create an observable that emits the most recent value of every counter each time one of them changes?
Here is the marble diagram I have in mind:
s1 (cpu): -s1_v1----s1_v1---s1_v2
s2 (users count): --s2_v1--s2_v1---------s2_v2
s3 (some cat purr/sec): ----s3_v1----s3_v1----s3_v1
flatten sequence: s1_v1-s2_v1-s3_v1-s2_v1-s1_v1-s3_v1-s1_v2-s3_v1-s2_v2
desired result:
s1_v1|s1_v1|s1_v1|s1_v2|s1_v2
s2_v1|s2_v1|s2_v1|s2_v2
s3_v1|s3_v1|s3_v1
So far, I have this implementation:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public class StatusImplementation
{
    public static IObservable<IDictionary<TKey, TValue>> Status<TKey, TValue>(
        params IObservable<KeyValuePair<TKey, TValue>>[] observables)
    {
        // Suppress consecutive duplicate values from each source.
        var uniqueObservables = observables
            .Select(x => x.Publish().RefCount().DistinctUntilChanged());
        return Observable.Create<IDictionary<TKey, TValue>>(o =>
        {
            var compositeDisposable = new CompositeDisposable();
            var dictionary = new Dictionary<TKey, TValue>();
            foreach (var uniqueObservable in uniqueObservables)
            {
                var disposable = uniqueObservable.Subscribe(x =>
                {
                    // Copy before writing so snapshots already emitted are never mutated.
                    // The indexer adds or updates, so a repeated key cannot throw as Add would.
                    var newDictionary = new Dictionary<TKey, TValue>(dictionary);
                    newDictionary[x.Key] = x.Value;
                    dictionary = newDictionary;
                    o.OnNext(dictionary);
                });
                compositeDisposable.Add(disposable);
            }
            return compositeDisposable;
        });
    }
}
And here's a usage example:
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
    .Select(x => new KeyValuePair<string, long>("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
    .Select(x => new KeyValuePair<string, long>("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
    .Select(x => new KeyValuePair<string, long>("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
StatusImplementation.Status(f1, f2, f3)
    .Select(x => string.Join(", ", x.ToList()))
    .Dump("\tstatus");
combined.Dump("normal");
And the Dump function (from Lee Campbell's excellent book):
public static void Dump<T>(this IObservable<T> source, string name)
{
    source.Subscribe(
        i => Console.WriteLine("{0}-->{1}", name, i),
        ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
        () => Console.WriteLine("{0} completed", name));
}
So the question is: is there a better way to implement this functionality? Perhaps not using a dictionary inside an observable?
Thanks.
So, if you start with your combined observable, which can be built from any number of source observables, you can do this:
var query =
    combined
        .Scan(
            new Dictionary<string, long>() as IDictionary<string, long>,
            (d, kvp) =>
            {
                var d2 = new Dictionary<string, long>(d) as IDictionary<string, long>;
                d2[kvp.Key] = kvp.Value;
                return d2;
            });
This produces a new dictionary for each value emitted by the combined observable. Each dictionary is a separate instance; if the same instance were returned every time, you would be mutating a dictionary that subscribers already hold, which could cause threading problems.
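If copying the whole dictionary on every event is a concern, the same Scan idea can be sketched with an immutable dictionary instead. This is only a sketch under assumptions: it swaps in ImmutableDictionary from System.Collections.Immutable (not used in the question), and ToStatus is a hypothetical helper name, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Reactive.Linq;

public static class StatusExtensions
{
    // Hypothetical helper: folds a stream of key/value updates into
    // a stream of snapshots holding the latest value seen per key.
    public static IObservable<ImmutableDictionary<TKey, TValue>> ToStatus<TKey, TValue>(
        this IObservable<KeyValuePair<TKey, TValue>> source)
    {
        return source.Scan(
            ImmutableDictionary<TKey, TValue>.Empty,
            // SetItem returns a new dictionary; the previous snapshot is left untouched.
            (snapshot, kvp) => snapshot.SetItem(kvp.Key, kvp.Value));
    }
}
```

With the question's variables, combined.ToStatus() would then yield one snapshot per incoming event, and snapshots already handed to subscribers can never change underneath them.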
You can use Observable.CombineLatest, which emits the latest values from all observables every time any of them produces a new value. Then you don't need a dictionary. Note that it emits nothing until every source has produced at least one value.
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
    .Select(x => new KeyValuePair<string, long>("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
    .Select(x => new KeyValuePair<string, long>("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
    .Select(x => new KeyValuePair<string, long>("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
Observable.CombineLatest(f1, f2, f3)
    .Select(x => string.Join(", ", x.ToList()))
    .Dump("\tstatus");
combined.Dump("normal");
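One behaviour of CombineLatest that matters for a status display is that it stays silent until every source has produced at least one value. The sketch below uses subjects in place of the timers so the order of events is deterministic; CombineLatestDemo, cpu and users are illustrative names, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestDemo
{
    public static List<string> Run()
    {
        var cpu = new Subject<long>();
        var users = new Subject<long>();
        var output = new List<string>();

        // Passing an array selects the overload that yields an IList<long> per emission.
        var sources = new IObservable<long>[] { cpu, users };
        using (Observable.CombineLatest(sources)
            .Select(values => string.Join(", ", values))
            .Subscribe(line => output.Add(line)))
        {
            cpu.OnNext(10);   // nothing emitted yet: users has not produced a value
            users.OnNext(3);  // first emission: "10, 3"
            cpu.OnNext(20);   // second emission: "20, 3"
        }
        return output;
    }
}
```

If the display should show partial status before every counter has reported, the Scan-based approach emits from the first event onwards.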