Introduction to RxJS and Observables
Older Article
This article was published 9 years ago. Some information may be outdated or no longer applicable.
A lot of developers are asking the same question: what is RxJS, and what are observables? These concepts matter especially if you’re working with Angular, which leans on both heavily.
RxJS
The official definition from their website: “RxJS is a library for reactive programming using observables.”
Microsoft originally built this library. It’s now open source, maintained by developers from Microsoft, Google, and a bunch of other contributors.
RxJS for better asynchronous programming
RxJS helps you work with asynchronous code. It builds on a well-known design pattern: the observer pattern.
The Observer pattern uses an object (the subject) that multiple observers connect to. State changes propagate from the subject to all observers automatically. I’d recommend getting familiar with at least the “GoF (Gang of Four) patterns”, which include the Observer pattern.
Translating that to JavaScript: think of an event stream (or subject). Data coming from an API is a good example. You can subscribe to that stream and get notified when data arrives, then act on it (say, appending rows to a table). You’ll keep receiving data until you unsubscribe or the stream emits a completed event.
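To make the subject/observer relationship concrete, here is a minimal sketch in plain JavaScript, without RxJS. The names `Subject`, `attach`, and `setState` are made up for illustration and are not the RxJS API; the point is only that one state change reaches every attached observer.

```javascript
// Minimal observer pattern: one subject, many observers.
// `Subject`, `attach`, and `setState` are illustrative names, not RxJS APIs.
class Subject {
  constructor() {
    this.observers = [];
    this.state = undefined;
  }

  // Register a callback that should hear about every state change.
  attach(observer) {
    this.observers.push(observer);
  }

  // Change the state and push the new value to every attached observer.
  setState(nextState) {
    this.state = nextState;
    this.observers.forEach((observer) => observer(nextState));
  }
}

const tableRows = [];
const seen = [];

const apiData = new Subject();
apiData.attach((row) => tableRows.push(row)); // e.g. append a row to a table
apiData.attach((row) => seen.push(`received ${row.name}`));

apiData.setState({ name: 'Luke' });
apiData.setState({ name: 'Leia' });

console.log(tableRows.length); // 2
console.log(seen); // [ 'received Luke', 'received Leia' ]
```

Neither observer asks the subject for data. The subject pushes each change to them, which is the behaviour RxJS builds on.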
The most important thing to remember about RxJS: everything is a stream. Hold onto that idea. It’s the key to understanding the whole library.
Why RxJS?
You might be wondering: why not just use promises for asynchronous data streams? Two reasons jump out immediately:
- Promises are great at handling asynchronicity, whether you use them natively or through the Fetch API. But a promise can’t be cancelled. Once you start a request, it must be fulfilled or rejected. There’s no backing out.
- Even with Promises, you’re still using the JavaScript event loop. The callbacks you attach to a promise are queued when it settles and only run once the call stack is empty. A promise also settles just once, so it delivers a single value rather than a sequence of them.
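The two points above can be seen in a few lines of plain JavaScript. This is only a small sketch: the `log` array and the `pending` variable are names chosen for the example.

```javascript
const log = [];

// The promise is already resolved, yet its callback does not run right away.
const pending = Promise.resolve('promise callback');
pending.then((value) => log.push(value));

// This synchronous line runs first; the .then callback is queued and only
// runs once the current call stack has emptied.
log.push('synchronous code');

// A later callback sees both entries in order. Note that `pending` exposes
// no cancel() method: once created, it will settle and call its callbacks.
pending.then(() => console.log(log)); // [ 'synchronous code', 'promise callback' ]
```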
RxJS gives you much more flexibility than promises when working with asynchronous data streams. Keep that “everything is a stream” mantra in mind as you read on.
Observable
A stream is a sequence of events emitted from a source. The source can be anything.
Streams emit three things: values (e.g. data from an API), errors (e.g. a 401 Unauthorised API response), and a ‘completed’ or ‘done’ signal indicating the stream has finished.
In RxJS, an observer subscribes to an observable (a stream). The observer then reacts to whatever values the observable emits.
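As a rough illustration of those three kinds of signal, here is a hand-rolled sketch in plain JavaScript. `MiniObservable` is a hypothetical class written for this example, not the RxJS `Observable`, though the observer it accepts has the same `next`, `error`, and `complete` shape.

```javascript
// Hand-rolled sketch; `MiniObservable` is a hypothetical name, not the RxJS class.
class MiniObservable {
  constructor(producer) {
    this.producer = producer; // function that drives the observer
  }

  subscribe(observer) {
    let finished = false;
    // Wrap the observer so nothing is delivered after error or complete.
    this.producer({
      next: (value) => {
        if (!finished) observer.next(value);
      },
      error: (err) => {
        if (!finished) {
          finished = true;
          observer.error(err);
        }
      },
      complete: () => {
        if (!finished) {
          finished = true;
          observer.complete();
        }
      },
    });
  }
}

const events = [];
const recorder = {
  next: (value) => events.push(`value: ${value}`),
  error: (err) => events.push(`error: ${err.message}`),
  complete: () => events.push('completed'),
};

const people = new MiniObservable((observer) => {
  observer.next('Luke');
  observer.next('Leia');
  observer.complete();
  observer.next('ignored'); // dropped: the stream already completed
});

const failing = new MiniObservable((observer) => {
  observer.next('partial data');
  observer.error(new Error('401 Unauthorised'));
});

people.subscribe(recorder);
failing.subscribe(recorder);

console.log(events);
// [ 'value: Luke', 'value: Leia', 'completed',
//   'value: partial data', 'error: 401 Unauthorised' ]
```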
Subscribe
When you listen to a stream (an observable), you’re subscribing to it. You’ll keep receiving values until you unsubscribe or the stream emits a completed message.
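The sketch below shows both ways a listener stops receiving values. It is plain JavaScript, and `createManualStream` is a hypothetical helper invented for this example rather than anything RxJS provides.

```javascript
// `createManualStream` is a hypothetical helper for illustration only.
function createManualStream() {
  const listeners = new Set();
  let completed = false;

  return {
    subscribe(onValue) {
      if (completed) return { unsubscribe() {} };
      listeners.add(onValue);
      // The returned subscription is the handle used to stop listening.
      return { unsubscribe: () => listeners.delete(onValue) };
    },
    emit(value) {
      listeners.forEach((listener) => listener(value));
    },
    complete() {
      completed = true;
      listeners.clear(); // nobody receives anything after completion
    },
  };
}

const stream = createManualStream();
const first = [];
const second = [];

const firstSubscription = stream.subscribe((value) => first.push(value));
stream.subscribe((value) => second.push(value));

stream.emit('a');
firstSubscription.unsubscribe(); // the first listener opts out
stream.emit('b');
stream.complete(); // the stream finishes for everyone
stream.emit('c');

console.log(first); // [ 'a' ]
console.log(second); // [ 'a', 'b' ]
```

The first listener misses 'b' because it unsubscribed. Neither listener sees 'c' because the stream had already completed.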
Working with Observables
Let’s look at some examples of creating and subscribing to Observables.
First, bring in the RxJS library. There are multiple ways of doing this, whether for browser or Node.js. Pick whatever suits your setup.
Observables can be created from all sorts of things: a single value, an array of values, events (like a button click), a promise, or built by hand:
// RxJS 5 import style; RxJS 6+ exports creation functions such as of and fromEvent from 'rxjs'
const Rx = require('rxjs/Rx');

// from a single value
const observable1 = Rx.Observable.of(1);
// from an array (emitted as one value; Rx.Observable.from would emit each element)
const observable2 = Rx.Observable.of(['hello', 'rxjs']);
// from DOM events (browser only)
const observable3 = Rx.Observable.fromEvent(
  document.querySelector('button'),
  'click'
);
// from a promise
const observable4 = Rx.Observable.fromPromise(fetch('/api'));
// built by hand
const observable5 = Rx.Observable.create((observer) => {
  observer.next('first emitted value');
  observer.next('second emitted value');
  observer.complete();
});

// subscribe to any of them
observable1.subscribe((response) => console.log(response)); // 1
observable5.subscribe((response) => console.log(response)); // first emitted value, second emitted value
Once the observable exists, subscribe to it and listen for values.
Let’s try something more practical: consuming an API with the Fetch API.
The Fetch API returns a promise, and so does response.json() for parsing the response body as JSON. We just mentioned that an Observable can be created from a Promise too.
// a stream that emits a single URL
const requestStream = Rx.Observable.of('https://swapi.co/api/people/1');

// for each URL: fetch it, parse the body as JSON, and flatten the result into this stream
const responseStream = requestStream.flatMap((requestUrl) =>
  Rx.Observable.fromPromise(
    fetch(requestUrl).then((response) => response.json())
  )
);

responseStream.subscribe((response) => console.log(response.name));
We create an observable that emits a single URL string. flatMap hands that URL to the Fetch API, wraps the resulting promise (which resolves with the parsed JSON body) in an observable, and flattens its value into responseStream. We subscribe to responseStream and log a Star Wars character’s name to the console.
Observables have other benefits we’ll explore later. They let you control event flow, apply operations, and transform values as they pass through.