Measuring response time using observables
Observable A data representing incoming requests ...
interface IRequest
{
string Id { get; }
}
IObservable<IRequest> requests;
and an observable B representing the responses ...
IObservable<string> responses;
I need to measure the time it takes to get a response. My current solution looks like below. This works, but I'm not sure if it can be simplified or implemented in a more streamlined manner. I'm also worried about the deviation between the actual response time ( [GEN]
) and the observed response time ( [OBS]
), which becomes quiet if generatedResponseTime
set to sub 50ms.
class Program
{
public class Request
{
public string Id { get; set; }
}
static void Main(string[] args)
{
var generatedResponseTime = 100;
var responseSub = new Subject<string>();
IObservable<Request> requests = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x=> new Request { Id = x.ToString("d") });
IObservable<string> responses = responseSub.AsObservable();
var responseTime = requests
.Where(x => !string.IsNullOrEmpty(x.Id))
.Select(req => new { Id = req.Id, Start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() })
.SelectMany(req => responses
.Where(reqId => reqId == req.Id)
.Select(_ => new { RequestId = req.Id, ResponseTime = (int)(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - req.Start) })
.Take(1))
.Publish()
.RefCount();
requests.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(req =>
{
Console.WriteLine($"[GEN] {req.Id} = {generatedResponseTime} ms");
Thread.Sleep(generatedResponseTime);
responseSub.OnNext(req.Id);
});
responseTime.Subscribe(x => Console.WriteLine($"[OBS] {x.RequestId} = {x.ResponseTime} ms"));
Console.ReadLine();
}
}
source to share
Here's an alternative that uses some of the built-in operators. Also has no problem with multiple subscriptions in your solution:
var responseTime2 = requests
.Timestamp()
.Join(responses.Timestamp(),
request => Observable.Never<Unit>(), //timeout criteria
response => Observable.Empty<Unit>(),
(request, response) => (request: request, response: response)
)
.Where(t => t.request.Value.Id == t.response.Value)
.Select(t => t.response.Timestamp.ToUnixTimeMilliseconds() - t.request.Timestamp.ToUnixTimeMilliseconds())
If you had any timeout criteria in the request, then from a performance point of view it would be helpful to point out where the comment is. Otherwise, the performance of the combination Join/Where
will degrade over time.
source to share