Observables

What is an Observable?

To quote the spec directly:

The Observable type can be used to model push-based data sources such as DOM events, timer intervals, and sockets.

They are:

  • Compositional: Observables can be composed with higher-order combinators.
  • Lazy: Observables do not start emitting data until an observer has subscribed.

Implementations

There are various implementations of the Observable spec. Some of them include:

Frint uses RxJS, which is widely adopted by the JavaScript community. Our examples would demonstrate this particular library below.

Conventions

It is common practice to end your variable names with a $ sign if it represents an Observable.

const name$ = Rx.Observable.of('Frint');

Basic usage

Create an Observable:

const colors$ = Rx.Observable.create(function (observer) {
  observer.next('red');
  observer.next('green');
  observer.next('blue');

  // observer.error('something wrong here...');

  observer.complete();
});

Subscribe to it:

const subscription = colors$.subscribe({
  next: function (color) {
    console.log(color); // `red`, `green`, `blue`
  },
  error: function (error) {
    console.log(error); // never fires
  },
  complete: function() {
    console.log('completed'); // once it is completed
  }
});

If you only care about the emitted values, then your subscription code can be shortened:

const subscription = colors$.subscribe(function (color) {
  console.log(color);
});

To unsubscribe:

subscription.unsubscribe();

Examples

From known values

const numbers$ = Rx.Observable.of(1, 2, 3);

numbers$.subscribe(function (number) {
  console.log(number);
});

// Prints these numbers sequentially:
//
//   1
//   2
//   3

Interval

const interval$ = Rx.Observable.interval(1000); // every second

interval$.subscribe(function (x) {
  console.log(x); // `x` is the nth time interval$ has triggered
});

// Prints `x` every second

From DOM event

const clicks$ = Rx.Observable.fromEvent(document, 'click');

clicks$.subscribe(function (mouseEvent) {
  console.log('clicked!');
});

// Prints `clicked`, every time the document is clicked

Operators

RxJS also comes with various operators that can help manage your Observables.

Filter

Let's say you have an Observable that keeps emitting an integer every 100ms, but you only want values which are divisible by 10.

Like: 0, 10, 20, 30, ...more.

const interval$ = Rx.Observable.interval(100);

interval$
  .filter(x => x % 10 === 0) // we are filtering out unwanted values here
  .subscribe(x => console.log(x));

Map

We can even map emitted values to something else:

const houses$ = Rx.Observable.of('gryffindor', 'slytherin', 'ravenclaw', 'hufflepuff');

houses$
  .map(x => x.toUpperCase()) // uppercasing the house names
  .subscribe(x => console.log(x));

Merge

Merging two Observables:

const numbers$ = Rx.Observable.of(1, 2, 3);
const colors$ = Rx.Observable.of('red', 'green', 'blue');
const houses$ = Rx.Observable.of('gryffindor', 'slytherin', 'ravenclaw', 'hufflepuff');

numbers$
  .merge(colors$)
  .merge(houses$)
  .subscribe(x => console.log(x)); // emits all values from 3 observables, one by one

Alternatively, you could merge them all as follows:

const merged$ = Observable.merge(
  numbers$,
  colors$,
  houses$
);

Scan

Let's say we want to merge all the three observables from above example, and generate a final Object which holds the most recent value of each of them:

{
  number: 3,
  color: 'blue',
  house: 'hufflepuff'
}

We can use the scan operator to achieve that. The operator scan is for Observables, what reduce is to arrays in JavaScript.

const merged$ = Rx.Observable.merge(
  numbers$.map(x => ({ number: x })), // map `1` to `{number: 1}`
  colors$.map(x => ({ color: x })), // map `red` to `{color: "red"}`
  houses$.map(x => ({ house: x })) // map `gryffindor` to `{house: "gryffindor"}`
);

merged$
  // we keep merging the previous object with new emitted object
  .scan((acc, curr) => {
    return {
      ...acc,
      ...curr,
    };
  }, {
    number: 'n/a',
    color: 'n/a',
    house: 'n/a'
  });

Lettable operators

Since RxJS v5.5, it has introduced lettable operators so that only the operators your application needs can be bundled, reducing overall bundle size.

You are highly recommended to use RxJS this way:

import { Observable } from 'rxjs/Observable'; // Rx.Observable
import { Subject } from 'rxjs/Subject'; // Rx.Subject

import { of } from 'rxjs/observable/of'; // Rx.Observable.of
import { interval } from 'rxjs/observable/interval'; // Rx.Observable.interval

import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';

The imported Observable will not make any operators available in its instance (like map, filter) automatically. And you need to pipe them yourself:

const numbers$ = of(1, 2, 3, 4, 5);

const evenMultipliedBy10$ = numbers$
  .pipe(
    filter(x => x % 2 === 0), // keep only even numbers
    map(x => x * 10) // multiply by 10
  );

evenMultipliedBy10$.subscribe(x => console.log(x));
// outputs: 20, 40

Further reading