Introduction to RxJS and Observables

This post is 4 years old. (Or older!) Code samples may not work, screenshots may be missing and links could be broken. Although some of the content may be relevant please take it with a pinch of salt.

The big quesiton being asked by a lot of developers these days: What is RxJS and what are observables? These concepts are of essential imporatance especially if someone is working with a popular JavaScript framework such as Angular since both RxJS and observables are being heavily used in it.

RxJS

The official definition of RxJS from their website is that "RxJS is a library for reactive programming using observables".

This library was originally developed by Microsoft - now an open source project which is developed by a bunch of developers from a variety of organistions including Microsoft and Google, as well as other contributors.

RxJS for better asynchronous programming

RxJS allows developers to work with asynchronous code and it also uses a popular and well-known software desing pattern: the observer pattern.

The Observer pattern uses an object (which is referred to as the subject) to which multiple observers can connect and state changes are automatically propagated from the subject to all the observers. I recommend that you get familiar with at least the "GoF (Gang of Four) patterns" (which also includes the Observer pattern).

Translating the above to JavaScript world, think about an event stream (or a subject, which is the same) - data coming from an API would be a great example. We can now subscribe to the subject - in other words, we can subscribe to this stream - and get notified when data arrives from the API and act on it - for example append data in a table. And the great thing is that we'll keep on receiving this data until we unsubscribe from the stream or until the stream emits a completed event.

Remember the most important thing behind RxJS: everything is a stream - you need to be in this mindset and remember this mantra in order to fully understand RxJS.

Why RxJS?

You may be wondering, why can't we just use promises to work with asynchronous data streams, like the API mentioned earlier? There are two reasons that we can discuss right off the bat:

  1. Even though promises are very good at handling asynchronicity either natively or via an implementation such as the Fetch API, a promise is not cancellable. This means that once we start a request, that request must be fulfilled or rejected - there's no way that we can cancel a promise.

  2. Even though when Promises are used we are still making use of the JavaScript event loop. How? Think about the callbacks that we use when a promise gets resolved - those get placed to the event loop and placed back to the call stack when they resolve.

RxJS allows us to work with asynchronous data streams with much more flexibility that promises give us. Just remember that everything is a stream when reading the next section.

Observable

A stream can be defined as a sequence of events that get emitted from a source. The source can be anything, it really doesn't matter.

Streams can emit three things: values (e.g. return data from an API), an error (e.g. 401 Unauthorized API access) and finally a 'completed' or 'done' signal indicating that the stream has finished.

In RxJS an observer subscribes to an observable - or simply put, to a stream. This observer is than able to react on items/values that the observable emits.

Subscribe

We mentioned the term subscribe before - essentially when we are listening on the stream (observable) we are subscribing to that stream and we'll keep on receiving values until either we unsubscribe or the stream emits a completed message.

Working with Observables

Let's take a look at a few examples on creating and subscribing to Observables.

Firsts things first, in order to start working with observables we need to make sure to bring in the RxJS library - there are multiple ways of doing this, whether you want to use them in the browser or Node.js - pick the one that's the most convenient for your use-case.

Observables can be created from single and multiple values, starting from a single string to an array of values, events - such as a button click, to a promise or even by manually creating them:

const Rx = require('rxjs/Rx');
const observable1 = Rx.Observable.of(1);
const observable2 = Rx.Observable.of(['hello', 'rxjs']);
const observable3 = Rx.Observable.fromEvent(
document.querySelector('button'),
'click'
);
const observable4 = Rx.Observable.fromPromise(fetch('/api'));

const observable5 = Rx.Observable.create((observer) => {
observer.next('first emitted value');
observer.next('second emitted value');
observer.complete();
});

// subscribe to any of them
observable1.subscribe((response) => console.log(response)); // 1
observable5.subscribe((response) => console.log(response)); // first emitted value, second emitted value

As you can see once the observable has been created we can go ahead and subscribe to it and listen for values.

Let's take a look at a more practical example and consume an API using the Fetch API.

Remember the Fetch API returns a promise, so does response.json() that we can use to parse the response body as JSON. Also remember that we just mentioned that an Observable can be created from a Promise as well.

const requestStream = Rx.Observable.of('https://swapi.co/api/people/1');
const responseStream = requestStream.flatMap((requestUrl) =>
Rx.Observable.fromPromise(
fetch(requestUrl).then((response) => response.json())
)
);

responseStream.subscribe((response) => console.log(response.name));

In the above example we create an observable out of a simple string which we later on pass to the Fetch API. The parsed JSON body is also returned as an observable (we create the observable from the Promise returned by response.json()) and we store that in the variable responseStream. Finally we subscribe to responseStream and simply log the name of a Star Wars character out to the console.

Observables have other benefits that we'll take a look at later - they allow us to help the control of event flow through observables, apply operations on them as well as transform values passed through the observables.