@jetjs/streams

class Operator

recives values as an observer, manipulates the values and act as an observable for the manipulated values.


Example:

function filter(transform(val): boolean) {
  let so;
  return new ObservableObserver(
    (subscriptionObserver) => { so = subscriptionObserver;  // return function to get notified on termination},
    (value) => {
      let res = transform(value);
      if (!res) {
        so.next(value); // add to stream if transform returned false
      }
    }
  );
}

Observable.of("val-1", "val-2")
 .pipe(filter(str => str.endsWith("-2")))
 .subscribe((str) => console.log("Received: " + str));

 // prints:
 // Received: val-1

Methods

MethodReturnDescription
addObservable(observable)
void
complete()
void
createSimpleOperator(fn)
Operator
error(err)
void
next(value)
void
start(subscription)
void
subscribe(observerOrNext, error, complete)
Subscription
subscribeUpstream()
void

Method Details

  • addObservable(observable) Method

    Parent Observable is added, if a Operator is added to a stream via pipe. This is done to implement the laziness. So this Operator can call #subscribe after it has received a subscription.

    Signature:
    addObservable(observable: Observable<I>): void;
    Returns:
    void
    ParameterTypeDescription
    observable
    Observable

    The observable to get values from

  • complete() Method

    Signature:
    complete(): void;
    Returns:
    void
  • createSimpleOperator(fn) Method

    Helper to implement a simple Operators.

    see Operator.pipe

    Signature:
    static createSimpleOperator<T, O>(fn: (val: T) => O | undefined): Operator<T, O>;
    Returns:
    Operator

    Operator instance

    ParameterTypeDescription
    fn
    (val: T) => O | undefined

    function to transform the value. If undefined is returned, value will be removed from the stream. if an error occur, it will be passed to the #error function

  • error(err) Method

    Signature:
    error(err: Error): void;
    Returns:
    void
    ParameterTypeDescription
    err
    Error
  • next(value) Method

    Signature:
    next(value: I): void;
    Returns:
    void
    ParameterTypeDescription
    value
    I
  • start(subscription) Method

    Signature:
    start(subscription: Subscription): void;
    Returns:
    void
    ParameterTypeDescription
    subscription
    Subscription
  • subscribe(observerOrNext, error, complete) Method

    Signature:
    subscribe(observerOrNext: Observer<O> | NextFn<O>, error?: ErrorFn, complete?: CompleteFn): Subscription;
    Returns:
    Subscription
    ParameterTypeDescription
    observerOrNext
    Observer | NextFn
    error
    ErrorFn
    complete
    CompleteFn
  • subscribeUpstream() Method

    Signature:
    protected subscribeUpstream(): void;
    Returns:
    void