The Observable
Dive deep into Observables, how they work, and how to build your own
Observables are one of those funny things in the JavaScript community where you either are pretty aware of the ecosystem (often through RxJS) or honestly have never heard of one before reading a random blog post or post on Twitter about it.
I was very much so in the latter camp. I had heard a bit about observables,
thought they were related to the Object.observe
proposal from way back when,
and didn't really know if there was a good use-case for an Observable in my
day-to-day work with React or Node.js.
At the end of the day, an Observable is just another tool in your toolkit and it can be incredibly helpful for modeling certain types of problems. It also isn't too bad to put together an implementation for your own project to better understand how it works under the hood.
In this post, let's take a look at what it takes to build a minimal version of
an Observable
.
Building an Observable
At it's core, an Observable is a function. This function takes in a single argument, an object often called an observer, and uses this object to send roughly three types of events:
observer.next(value)
, give the observer the next value in the sequenceobserver.complete()
, tell the observer that the sequence is completed and no new values will be sentobserver.error(error)
, tell the observer that an error has occurred, this will also stop sending more values
Here's a countdown
function that roughly uses this contract:
We could then call this function with an observer
and receive the values from
the countdown and when the countdown is completed.
And that's, well, kind of it. We will definitely be adding more functionality to this, but at its core this is an Observable. Everything we do after this will build on this fundamental example.
With this example we have:
- A way to create an Observable (through our
countdown
function) - A way to subscribe to an observable (through calling the function with an
observer
) - A way to emit multiple values over time (through
observer.next
) - A way to signal completion or errors (through
observer.error
orobserver.complete
)
For bonus points, we can make this Observable emit values synchronously or asynchronously:
And the best part is that we don't have to change any of our code for subscribing to this Observable, although now we have to worry about cleaning up some things like timers.
Formalizing things a bit
First up, let's formalize how we create and subscribe to an Observable.
We will use a class for creating our Observables, and each class will have a
subscribe
method. Each class will accept a "source", which is copy-and-pasted
from our countdown
function above. Similarly, we "subscribe" to our Observable
in the same way we called countdown
above.
We can use this class to re-create our Countdown Observable and subscribe to it:
This is looking like a great start. Let's try making another Observable using this code.
Building a counter
We mentioned above that Observables can emit values synchronously or asynchronously so let's take advantage of that to build a counter. This counter will start at zero, counting upwards by one every second.
We can subscribe to this Observable using subscribe
:
However, our counter will keep going on and on for as long as the program is running. What happens if we want to stop subscribing to values?
Canceling a subscription
We can update our class definition from above to have subscribe
return a
Subscription
. A Subscription in this case is an object that provides an
unsubscribe
function, and calling this function would "cancel" the
subscription that was created.
And then in our Counter code we could use the returned subscription to unsubscribe.
But what exactly should we be doing when we call unsubscribe
in our
subscribe
method? Really, we're after two things:
- How do we "close" our observable and tell it to stop sending values through "next"
- How do we give our observable source a way to clean-up things like timers when something unsubscribes from the subscription
Rethinking subscribe
Our first step to tackle the two questions above will be to figure out how to remember if our subscription has been closed. We can use this information to stop the observable source from calling "next".
We this change, we use the local variable closed
and set it to true
whenever
subscription.unsubscribe
is called. But how do we use this to stop our source
function from calling next
on the observer?
In reality, there isn't much we can do to change when source
calls next
. H
owever, we can intercept values that it sends to the observer and choose whether
or not to forward them along.
In other words, let's define something called a Sink
. This will have the exact
same shape as our observer, but will include logic for whether or not to share
values emitted from the source with the observer.
With this change, now we can call subscription.unsubscribe
and it will stop
our observer from receiving any additional values 🎉
However, we still need to figure out our clean-up step. In particular, when we cancel our Counter subscription how will we cancel any pending timers that currently exist?
Rethinking our Observable source
When defining our Observable source, let's formalize it by saying that a
Source
is a function that takes in a Sink
and either returns nothing or a
function that is run to "clean-up" the source.
We can then use this definition to update our Counter
Observable:
This update has our source return a function where we clear the active interval
that we have. However, now we have to update our subscribe
method to take
advantage of this clean-up function.
And that's it! We added support for a clean-up function by tracking it with the
local variable cleanup
, and whenever we "close" our subscription we call
runCleanup
to make sure that this function is called once, and only once.
Wrapping up
So there we have it. We made an Observable
class that supports creating and
subscribing to Observables. It's definitely not complete yet, and you'll find
way more robust implementations in libraries like RxJS, but I hope it was
helpful to learn some of the core concepts that go into building this primitive.
If you're curious about more formal definitions for the topics that were touched on in this post, definitely check-out the appendix below!
Appendix
TypeScript
Here are some of the formal types for the concepts talked about in this post