To quote the spec directly:
The Observable type can be used to model push-based data sources such as DOM events, timer intervals, and sockets.
They are:
There are various implementations of the Observable spec. Some of them include:
Frint uses RxJS, which is widely adopted by the JavaScript community. The examples below use this library.
It is common practice to end a variable name with a $ sign if it represents an Observable.
const name$ = Rx.Observable.of('Frint');
Create an Observable:
const colors$ = Rx.Observable.create(function (observer) {
  observer.next('red');
  observer.next('green');
  observer.next('blue');
  // observer.error('something wrong here...');
  observer.complete();
});
Subscribe to it:
const subscription = colors$.subscribe({
  next: function (color) {
    console.log(color); // `red`, `green`, `blue`
  },
  error: function (error) {
    console.log(error); // never fires
  },
  complete: function () {
    console.log('completed'); // once it is completed
  }
});
If you only care about the emitted values, then your subscription code can be shortened:
const subscription = colors$.subscribe(function (color) {
  console.log(color);
});
To unsubscribe:
subscription.unsubscribe();
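To make the subscribe/unsubscribe contract more concrete, here is a minimal, dependency-free sketch of an Observable-like object. It is not how RxJS is implemented, and it leaves out error and completion handling. The names createObservable, closed, and emit are hypothetical and exist only for this illustration.

```javascript
// Hypothetical, simplified Observable factory (not the RxJS implementation).
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      // Hand the producer a guarded observer that goes silent once closed.
      producer({
        next(value) {
          if (!closed) observer.next(value);
        },
      });
      return {
        unsubscribe() {
          closed = true;
        },
      };
    },
  };
}

// `emit` lets us push values by hand, standing in for a timer or DOM event.
let emit;
const ticks$ = createObservable(function (observer) {
  emit = (value) => observer.next(value);
});

const received = [];
const subscription = ticks$.subscribe({
  next: (value) => received.push(value),
});

emit(1);
emit(2);
subscription.unsubscribe();
emit(3); // ignored: the subscription is closed

console.log(received); // [ 1, 2 ]
```

Values pushed after unsubscribe() never reach the observer, which is the behaviour you rely on when cleaning up subscriptions.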
const numbers$ = Rx.Observable.of(1, 2, 3);
numbers$.subscribe(function (number) {
  console.log(number);
});
// Prints these numbers sequentially:
//
// 1
// 2
// 3
const interval$ = Rx.Observable.interval(1000); // every second
interval$.subscribe(function (x) {
  console.log(x); // `x` is the nth time interval$ has triggered
});
// Prints `x` every second
const clicks$ = Rx.Observable.fromEvent(document, 'click');
clicks$.subscribe(function (mouseEvent) {
  console.log('clicked!');
});
// Prints `clicked`, every time the document is clicked
RxJS also comes with various operators that can help manage your Observables.
Let's say you have an Observable that keeps emitting an integer every 100ms, but you only want values which are divisible by 10.
For example: 0, 10, 20, 30, and so on.
const interval$ = Rx.Observable.interval(100);
interval$
  .filter(x => x % 10 === 0) // we are filtering out unwanted values here
  .subscribe(x => console.log(x));
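If you are curious what an operator like filter does conceptually, the sketch below may help. It is a hand-written approximation, not the RxJS source: an operator takes a source and returns a new observable-like object that forwards only some of the values. The helper fromValues and this simplified filter are assumptions made for the example.

```javascript
// Hypothetical helper: a synchronous source that emits the given values.
const fromValues = (...values) => ({
  subscribe(next) {
    values.forEach((value) => next(value));
  },
});

// Simplified filter: returns a NEW source that forwards matching values only.
const filter = (predicate) => (source) => ({
  subscribe(next) {
    source.subscribe((value) => {
      if (predicate(value)) next(value);
    });
  },
});

const numbers$ = fromValues(0, 5, 10, 15, 20);
const tens$ = filter((x) => x % 10 === 0)(numbers$);

const kept = [];
tens$.subscribe((x) => kept.push(x));

console.log(kept); // [ 0, 10, 20 ]
```

Note that numbers$ itself is left untouched; the operator only wraps it.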
We can even map emitted values to something else:
const houses$ = Rx.Observable.of('gryffindor', 'slytherin', 'ravenclaw', 'hufflepuff');
houses$
  .map(x => x.toUpperCase()) // uppercasing the house names
  .subscribe(x => console.log(x));
Merging two Observables:
const numbers$ = Rx.Observable.of(1, 2, 3);
const colors$ = Rx.Observable.of('red', 'green', 'blue');
const houses$ = Rx.Observable.of('gryffindor', 'slytherin', 'ravenclaw', 'hufflepuff');
numbers$
  .merge(colors$)
  .merge(houses$)
  .subscribe(x => console.log(x)); // emits all values from 3 observables, one by one
Alternatively, you could merge them all as follows:
const merged$ = Rx.Observable.merge(
  numbers$,
  colors$,
  houses$
);
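The idea behind merging can be sketched without RxJS: subscribe to every source and forward whatever arrives, in arrival order. The code below is a simplified illustration under that assumption, with hypothetical helpers createSource and merge. It ignores completion, errors, and unsubscription, all of which the real operator handles.

```javascript
// Hypothetical helper: a source we can push values into by hand.
function createSource() {
  const listeners = [];
  return {
    subscribe(next) {
      listeners.push(next);
    },
    emit(value) {
      listeners.forEach((next) => next(value));
    },
  };
}

// Simplified merge: subscribe to every source and forward every value.
function merge(...sources) {
  return {
    subscribe(next) {
      sources.forEach((source) => source.subscribe(next));
    },
  };
}

const numbers$ = createSource();
const colors$ = createSource();

const merged = [];
merge(numbers$, colors$).subscribe((x) => merged.push(x));

numbers$.emit(1);
colors$.emit('red');
numbers$.emit(2);
colors$.emit('green');

console.log(merged); // [ 1, 'red', 2, 'green' ]
```

The merged output follows the order in which the sources emit, so values from different sources interleave when they arrive at different times.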
Let's say we want to merge all three observables from the example above and generate a final object that holds the most recent value of each of them:
{
  number: 3,
  color: 'blue',
  house: 'hufflepuff'
}
We can use the scan operator to achieve that. The scan operator is to Observables what reduce is to arrays in JavaScript.
const merged$ = Rx.Observable.merge(
  numbers$.map(x => ({ number: x })), // map `1` to `{number: 1}`
  colors$.map(x => ({ color: x })), // map `red` to `{color: "red"}`
  houses$.map(x => ({ house: x })) // map `gryffindor` to `{house: "gryffindor"}`
);
merged$
  // we keep merging the previous object with new emitted object
  .scan((acc, curr) => {
    return {
      ...acc,
      ...curr,
    };
  }, {
    number: 'n/a',
    color: 'n/a',
    house: 'n/a'
  });
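The comparison with reduce can be made concrete using plain arrays. In the sketch below, reduce returns only the final accumulated object, while a scan-like helper returns every intermediate accumulation, which is roughly what scan emits over time. The helper scanArray and the sample data are hypothetical and only serve this illustration.

```javascript
// Sample "emissions", standing in for values arriving from merged$.
const emitted = [{ number: 1 }, { color: 'red' }, { house: 'gryffindor' }];
const seed = { number: 'n/a', color: 'n/a', house: 'n/a' };

// Same accumulator as in the scan example: merge the new object into the old.
const mergeObjects = (acc, curr) => ({ ...acc, ...curr });

// reduce gives us only the final result.
const finalState = emitted.reduce(mergeObjects, seed);

// Hypothetical scan-like helper: collects every intermediate accumulation.
function scanArray(values, reducer, initial) {
  const results = [];
  let acc = initial;
  for (const value of values) {
    acc = reducer(acc, value);
    results.push(acc);
  }
  return results;
}

const steps = scanArray(emitted, mergeObjects, seed);

console.log(steps[0]); // { number: 1, color: 'n/a', house: 'n/a' }
console.log(steps[1]); // { number: 1, color: 'red', house: 'n/a' }
console.log(finalState); // { number: 1, color: 'red', house: 'gryffindor' }
```

The last entry of steps equals finalState, which is why scan is a good fit for keeping a running "latest state" of a stream.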
RxJS v5.5 introduced lettable operators (later renamed pipeable operators), so that only the operators your application needs are bundled, reducing the overall bundle size.
We highly recommend using RxJS this way:
import { Observable } from 'rxjs/Observable'; // Rx.Observable
import { Subject } from 'rxjs/Subject'; // Rx.Subject
import { of } from 'rxjs/observable/of'; // Rx.Observable.of
import { interval } from 'rxjs/observable/interval'; // Rx.Observable.interval
import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';
The imported Observable will not automatically make any operators (like map or filter) available on its instances. You need to pipe them yourself:
const numbers$ = of(1, 2, 3, 4, 5);
const evenMultipliedBy10$ = numbers$
  .pipe(
    filter(x => x % 2 === 0), // keep only even numbers
    map(x => x * 10) // multiply by 10
  );
evenMultipliedBy10$.subscribe(x => console.log(x));
// outputs: 20, 40
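Conceptually, pipe is function composition: each operator is a function that takes a source and returns a new one, and pipe feeds the source through them from left to right. The following dependency-free sketch shows that idea. The observable factory and the simplified of, filter, and map here are hand-written stand-ins for illustration, not the RxJS implementations.

```javascript
// Hypothetical minimal observable with a pipe method (not the RxJS source).
function observable(subscribe) {
  return {
    subscribe,
    // Feed this observable through each operator, left to right.
    pipe(...operators) {
      return operators.reduce((source, operator) => operator(source), this);
    },
  };
}

// Simplified stand-ins for the creation function and operators used above.
const of = (...values) =>
  observable((next) => values.forEach((value) => next(value)));

const filter = (predicate) => (source) =>
  observable((next) =>
    source.subscribe((value) => {
      if (predicate(value)) next(value);
    })
  );

const map = (project) => (source) =>
  observable((next) => source.subscribe((value) => next(project(value))));

const results = [];
of(1, 2, 3, 4, 5)
  .pipe(
    filter((x) => x % 2 === 0), // keep only even numbers
    map((x) => x * 10) // multiply by 10
  )
  .subscribe((x) => results.push(x));

console.log(results); // [ 20, 40 ]
```

Because operators are plain functions rather than methods on the Observable prototype, a bundler can drop the ones you never import.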