How can I create a class that extends Rx.Observable in TypeScript?

I am already using the most recent version of the rx.js typings from DefinitelyTyped.

And when I try this:

class MyObservable extends Rx.Observable<any> { }

      

I get the error: A class may only extend another class.

Why are Observable, Subject, etc. defined as interfaces instead of classes in rx.d.ts?

And what should I do if I want to create a class that extends Observable or Subject?

PS I want this class to handle certain domain logic, so I need to create a new class instead of directly updating the Observable prototype.

Thanks!



2 answers


I had to solve the same problem for WebRx. As you have already figured out, extending the RxJS Observable with a TypeScript class is not an option, because Observable is exported as an interface. As I mentioned in my comment to Steve Fenton, creating a class that implements Rx.IObservable doesn't take you very far either, because the vast majority of Rx operators are defined on the Rx.Observable interface, which derives from Rx.IObservable. You would pretty much end up rewriting Rx.Observable.

Until there is a better way, I solved the problem by extending the built-in Rx.Observable using prototypal inheritance and exporting the extension via a custom d.ts file:

RxExtension.ts



var RxObsConstructor = (<any> Rx.Observable);   // this hack is necessary because the .d.ts for RxJS declares Observable as an interface

/**
* Creates a read-only observable property with an optional default value from the current (this) observable
* (Note: This is the equivalent of Knockout's ko.computed)
* @param {T} initialValue? Optional initial value, valid until the observable produces a value
* @param {Rx.IScheduler} scheduler? Optional scheduler, defaults to Rx.Scheduler.currentThread
*/
RxObsConstructor.prototype.toProperty = function(initialValue?: any, scheduler?: Rx.IScheduler) {
    scheduler = scheduler || Rx.Scheduler.currentThread;

    // initialize accessor function (read-only)
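    // a regular function expression is used here so that `arguments` refers to
    // the accessor's own call rather than to the enclosing toProperty call
    var accessor: any = function (newVal?: any): any {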
        if (arguments.length > 0) {
            internal.throwError("attempt to write to a read-only observable property");
        }

        if (accessor.sub == null) {
            accessor.sub = accessor._source.connect();
        }

        return accessor.value;
    };

    //////////////////////////////////
    // IUnknown implementation

    accessor.queryInterface = (iid: string) => {
        if (iid === IID.IUnknown ||
            iid === IID.IObservableProperty ||
            iid === IID.IDisposable)
            return true;

        return false;
    };

    //////////////////////////////////
    // IDisposable implementation

    accessor.dispose = () => {
        if (accessor.sub) {
            accessor.sub.dispose();
            accessor.sub = null;
        }
    };

    //////////////////////////////////
    // IObservableProperty<T> implementation

    accessor.value = initialValue;

    // setup observables
    accessor.changedSubject = new Rx.Subject<any>();
    accessor.changed = accessor.changedSubject
        .publish()
        .refCount();

    accessor.changingSubject = new Rx.Subject<any>();
    accessor.changing = accessor.changingSubject
        .publish()
        .refCount();

    accessor.source = this;
    accessor.thrownExceptions = internal.createScheduledSubject<Error>(scheduler, app.defaultExceptionHandler);

    //////////////////////////////////
    // implementation

    var firedInitial = false;

    accessor.sub = this
        .distinctUntilChanged()
        .subscribe(x => {
            // Suppress a non-change between initialValue and the first value
            // from a Subscribe
            if (firedInitial && x === accessor.value) {
                return;
            }

            firedInitial = true;

            accessor.changingSubject.onNext(x);
            accessor.value = x;
            accessor.changedSubject.onNext(x);
        }, x => accessor.thrownExceptions.onNext(x));

    return accessor;
};

      

RxExtension.d.ts

declare module Rx {
    export interface Observable<T> extends IObservable<T> {
        toProperty(initialValue?: T, scheduler?: Rx.IScheduler): wx.IObservableProperty<T>;
    }
}
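
The two files above rely on the same idea: the method is attached to the prototype at runtime, and the interface is reopened so the compiler knows about it. Here is a minimal, self-contained sketch of that pattern. MiniObservable and collect are hypothetical stand-ins invented for illustration, not part of RxJS, so the snippet compiles without rx.d.ts.

```typescript
// Hypothetical stand-in for a library type that is declared as an interface
// plus a constructor value, similar to how rx.d.ts declares Observable.
interface MiniObservable<T> {
    subscribe(onNext: (value: T) => void): void;
}

interface MiniObservableStatic {
    new <T>(values: T[]): MiniObservable<T>;
    prototype: any;
}

// The "library": a plain constructor function produced by an IIFE.
const MiniObservable: MiniObservableStatic = (function () {
    function Ctor(this: any, values: any[]) {
        this.values = values;
    }
    Ctor.prototype.subscribe = function (onNext: (value: any) => void) {
        this.values.forEach((v: any) => onNext(v));
    };
    return Ctor as any;
})();

// Declaration merging: reopen the interface to describe the new member.
interface MiniObservable<T> {
    collect(): T[];
}

// Prototypal extension: attach the implementation at runtime.
MiniObservable.prototype.collect = function <T>(this: MiniObservable<T>): T[] {
    const out: T[] = [];
    this.subscribe(v => { out.push(v); });
    return out;
};

// Every instance now exposes the extension, fully typed.
const collected: number[] = new MiniObservable([1, 2, 3]).collect();
```

The trade-off is that the extension is visible on every instance of the type, so it is worth choosing method names that are unlikely to collide with the library's own.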

      



The base Observable in rx.js cannot be extended because it looks more like a TypeScript module than a class (i.e. it is a singleton).

 var Observable = Rx.Observable = (function () {
     //...
 })();

      



This is why it is modeled as an interface, rather than a class, in the DefinitelyTyped definition. To implement an interface, you must provide a structure that is compatible with the interface. Here's an example for IObservable<T>:

class MyObservable<T> implements Rx.IObservable<T> {
    subscribe(observer: Rx.Observer<T>): Rx.IDisposable;
    subscribe(onNext?: (value: T) => void, onError?: (exception: any) => void, onCompleted?: () => void): Rx.IDisposable;
    subscribe(a?: Rx.IObserver<T> | Function, onError?: (exception: any) => void, onCompleted?: () => void) {
        return null;
    }

    subscribeOnNext(onNext: (value: T) => void, thisArg?: any): Rx.IDisposable {
        return null;
    }

    subscribeOnError(onError: (exception: any) => void, thisArg?: any): Rx.IDisposable {
        return null;
    }

    subscribeOnCompleted(onCompleted: () => void, thisArg?: any): Rx.IDisposable {
        return null;
    }
}
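
The methods above only return null. One way to give them a real body is to wrap an existing source and forward to it, which keeps the domain logic in your own class. The following is a rough sketch under stated assumptions: Disposable, Subscribable, ArraySource and PriceFeed are hypothetical names standing in for Rx.IDisposable, the callback overload of Rx.IObservable<T>.subscribe, and your own types.

```typescript
// Hypothetical stand-ins for Rx.IDisposable and the callback form of
// Rx.IObservable<T>.subscribe, so the sketch compiles without rx.d.ts.
interface Disposable {
    dispose(): void;
}

interface Subscribable<T> {
    subscribe(onNext: (value: T) => void): Disposable;
}

// Hypothetical source that pushes a fixed list of values to the subscriber.
class ArraySource<T> implements Subscribable<T> {
    private values: T[];

    constructor(values: T[]) {
        this.values = values;
    }

    subscribe(onNext: (value: T) => void): Disposable {
        this.values.forEach(v => onNext(v));
        return { dispose() { /* nothing to release in this sketch */ } };
    }
}

// Domain class: implements the interface and delegates to the wrapped source.
class PriceFeed implements Subscribable<number> {
    private source: Subscribable<number>;

    constructor(source: Subscribable<number>) {
        this.source = source;
    }

    subscribe(onNext: (value: number) => void): Disposable {
        // Domain logic lives here: negative prices are dropped before forwarding.
        return this.source.subscribe(price => {
            if (price >= 0) {
                onNext(price);
            }
        });
    }
}

const seen: number[] = [];
const subscription = new PriceFeed(new ArraySource([10, -1, 12]))
    .subscribe(p => { seen.push(p); });
subscription.dispose();
```

As noted in the other answer, a class built this way still does not get the Rx operators for free, since those are defined on Rx.Observable.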

      
