Observable-Publisher-Subscriber

Let’s first understand the concept through a newspaper subscription. I subscribe to a monthly newspaper: I sign a contract with the company and receive a newspaper every month. The contract records what I have subscribed to. The company that produces the newspapers looks at the contract and sends the product to me each month. I can also unsubscribe through their website. In this example, we have three objects: the company producing the products, the receiver (me), and the contract. The important and subtle object is the contract. It is the contract that connects me and the company.

As another example, suppose my subscription is still monthly, but I don’t actually get a monthly delivery. Instead, the company sends me the goods every two months, two issues at a time, because it wants to save on delivery fees.

How does the company know that it should send me two items every two months? The answer is that the company again looks at the contract and finds its subscriber and goods. We can regard the contract as a trigger or timer: when the end of the month approaches, the contract prompts the company (publisher) to send the goods to me (receiver).

In the two-month case, we can also think of the contract (Subscription) as caching one month’s goods. In the second month, it sends all the cached goods to me at once. So the Subscription can cache objects (buffering).

We use Subscription, Publisher, and Subscriber to represent Contract, Company, and Me.

The Subscription prompts the Publisher to give data to the Subscriber. The Publisher knows how to get data; the Subscriber knows what to do with it. The Publisher could have an interface such as produce(), and the Subscriber could have one interface, next(data: T), to accept data. The Subscription has the publisher and the subscriber as attributes.

interface Publisher<T> { produce(): T }
interface Subscriber<T> { next(data: T): void }

type Subscription<T> = {
  publisher: Publisher<T>,
  subscriber: Subscriber<T>
}

To send data from the publisher to the subscriber, the Subscription can use the publisher to produce data and pass it to the subscriber directly:

subscription.subscriber.next(subscription.publisher.produce())
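To make that one-liner concrete, here is a minimal runnable sketch of the three roles, assuming a synchronous publisher that produces one value per call. The `deliver()` method and the counter-based `produce()` are hypothetical names chosen for illustration; they are not taken from any library.

```typescript
// A publisher knows how to get data.
interface Publisher<T> {
  produce(): T;
}

// A subscriber knows what to do with the data.
interface Subscriber<T> {
  next(data: T): void;
}

// The subscription (the "contract") links the two and triggers each delivery.
class Subscription<T> {
  publisher: Publisher<T>;
  subscriber: Subscriber<T>;

  constructor(publisher: Publisher<T>, subscriber: Subscriber<T>) {
    this.publisher = publisher;
    this.subscriber = subscriber;
  }

  // Hypothetical trigger, e.g. fired at the end of every month.
  deliver(): void {
    this.subscriber.next(this.publisher.produce());
  }
}

let issue = 0;
const newspaperCompany: Publisher<string> = { produce: () => `issue #${++issue}` };
const received: string[] = [];
const me: Subscriber<string> = { next: (data) => received.push(data) };

const contract = new Subscription(newspaperCompany, me);
contract.deliver();
contract.deliver();
console.log(received); // [ 'issue #1', 'issue #2' ]
```

Neither the publisher nor the subscriber knows about the other; only the contract holds both references.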

But how do we achieve caching?

The Subscription can cache one month’s product and send the accumulated items to the subscriber every two months. To do this, it keeps an internal buffer. When it calls the publisher, it does not call subscriber.next(data: T) directly; instead, it puts the data into its buffer. When the sending condition becomes true, the Subscription sends all the buffered objects to the subscriber.

type Subscription<T> = {
  publisher: Publisher<T>,
  subscriber: Subscriber<T>,
  buffer: T[]  // items held back until the sending condition is met
}
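A minimal sketch of the buffering idea, assuming the "sending condition" is simply "the buffer holds `batchSize` items" (two monthly issues per delivery in the newspaper example). `BufferingSubscription`, `collect()`, and `batchSize` are hypothetical names, not part of any library.

```typescript
interface Publisher<T> { produce(): T }
interface Subscriber<T> { next(data: T): void }

class BufferingSubscription<T> {
  publisher: Publisher<T>;
  subscriber: Subscriber<T>;
  buffer: T[] = [];
  batchSize: number;

  constructor(publisher: Publisher<T>, subscriber: Subscriber<T>, batchSize: number) {
    this.publisher = publisher;
    this.subscriber = subscriber;
    this.batchSize = batchSize;
  }

  // Called once per period: pull one item from the publisher into the buffer
  // instead of handing it to the subscriber right away.
  collect(): void {
    this.buffer.push(this.publisher.produce());
    // Sending condition: enough items have accumulated, so flush them all at once.
    if (this.buffer.length >= this.batchSize) {
      for (const item of this.buffer) this.subscriber.next(item);
      this.buffer = [];
    }
  }
}

let month = 0;
const received: number[] = [];
const subscription = new BufferingSubscription<number>(
  { produce: () => ++month },
  { next: (issue) => received.push(issue) },
  2,
);
subscription.collect(); // month 1: buffered, nothing delivered yet
console.log(received.length); // 0
subscription.collect(); // month 2: both issues delivered together
console.log(received); // [ 1, 2 ]
```

The subscriber's `next()` is unchanged; only the Subscription decides when data moves, which is why caching belongs in the contract rather than in either endpoint.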

We also want to be able to cancel the subscription. Canceling a subscription simply means removing the subscriber from the subscription, or giving the subscription a state such as closed.

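class Subscription {
  status: 'ready' | 'closed' = 'ready'
  subscriber: Subscriber | null = null
  // Cancel: mark the contract closed and drop the subscriber reference
  unsubscribe() { this.status = 'closed'; this.subscriber = null }
}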
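A small runnable sketch of cancellation, assuming a string `status` field and a hypothetical `deliver()` trigger that simply does nothing once the contract is closed. The names are illustrative, not a library API.

```typescript
interface Publisher<T> { produce(): T }
interface Subscriber<T> { next(data: T): void }

class Subscription<T> {
  status: "ready" | "closed" = "ready";
  publisher: Publisher<T>;
  subscriber: Subscriber<T> | null;

  constructor(publisher: Publisher<T>, subscriber: Subscriber<T>) {
    this.publisher = publisher;
    this.subscriber = subscriber;
  }

  // Hypothetical trigger; a closed contract no longer asks the publisher for data.
  deliver(): void {
    if (this.status === "closed" || this.subscriber === null) return;
    this.subscriber.next(this.publisher.produce());
  }

  // Cancelling = marking the contract closed and dropping the subscriber reference.
  unsubscribe(): void {
    this.status = "closed";
    this.subscriber = null;
  }
}

const received: string[] = [];
const subscription = new Subscription<string>(
  { produce: () => "newspaper" },
  { next: (data) => received.push(data) },
);
subscription.deliver();
subscription.unsubscribe();
subscription.deliver(); // ignored: the subscription is closed
console.log(received); // [ 'newspaper' ]
```

Dropping the subscriber reference also lets the subscriber be garbage-collected even if someone still holds the subscription.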

But how do we create the Publisher, Subscriber, and Subscription objects? This is where the utility class Observable comes in. Observable takes a publisher as its constructor parameter. It also has a subscribe function that takes a subscriber as a parameter and returns the Subscription object.

Our concept could be implemented with the following code.

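class Observable {
  constructor(publisher) {
    this.publisher = publisher
  }
  subscribe(subscriber) {
    // create the contract that links this publisher to the new subscriber
    const subscription = new Subscription(this.publisher, subscriber)
    // hand the contract to the subscriber, then return it to the caller
    subscriber.onSubscribe(subscription)
    return subscription
  }
}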
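Here is a self-contained TypeScript version of the `Observable` above that actually runs. It assumes a pull-style publisher with `produce()`, a subscriber with `onSubscribe()` and `next()` (mirroring the code above), and a hypothetical `request(n)` method on the Subscription that pulls `n` items. This is a sketch under those assumptions, not any particular library's API.

```typescript
interface Publisher<T> { produce(): T }

interface Subscriber<T> {
  onSubscribe(subscription: Subscription<T>): void;
  next(data: T): void;
}

class Subscription<T> {
  publisher: Publisher<T>;
  subscriber: Subscriber<T> | null;

  constructor(publisher: Publisher<T>, subscriber: Subscriber<T>) {
    this.publisher = publisher;
    this.subscriber = subscriber;
  }

  // Hypothetical trigger: pull n items from the publisher and pass them on.
  request(n: number): void {
    for (let i = 0; i < n; i++) {
      if (this.subscriber === null) return; // cancelled: stop delivering
      this.subscriber.next(this.publisher.produce());
    }
  }

  unsubscribe(): void {
    this.subscriber = null;
  }
}

class Observable<T> {
  publisher: Publisher<T>;

  constructor(publisher: Publisher<T>) {
    this.publisher = publisher;
  }

  // Creates the contract, hands it to the subscriber, and returns it to the caller.
  subscribe(subscriber: Subscriber<T>): Subscription<T> {
    const subscription = new Subscription(this.publisher, subscriber);
    subscriber.onSubscribe(subscription);
    return subscription;
  }
}

let count = 0;
const counter = new Observable<number>({ produce: () => ++count });
const seen: number[] = [];
const subscription = counter.subscribe({
  // The subscriber receives the contract first and decides how much to ask for.
  onSubscribe: (s) => s.request(3),
  next: (value) => seen.push(value),
});
console.log(seen); // [ 1, 2, 3 ]
subscription.unsubscribe();
subscription.request(1); // no effect after unsubscribe
console.log(seen.length); // 3
```

Passing the subscription to `onSubscribe` before returning it is what lets the subscriber drive or cancel the flow from inside its own callbacks.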

The key point is that data always flows from the publisher to the subscription and then to the subscriber, because the Subscription might need to cache data or apply custom scheduling.

(Figure: data flow publisher -> subscription -> subscriber)

Note that the Subscription may pull data from the publisher, or the publisher may push data into the Subscription. These are just different implementation styles.
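To illustrate the push style, here is a sketch in which the publisher is a function that receives the Subscription and pushes values into it, and the Subscription forwards them to the subscriber only while it is open. `PushPublisher`, `PushSubscription`, `push()`, and `subscribe()` are illustrative names assumed for this example.

```typescript
interface Subscriber<T> { next(data: T): void }

// Push style: the publisher is a function that is handed the subscription
// and pushes values into it whenever it has them.
type PushPublisher<T> = (subscription: PushSubscription<T>) => void;

class PushSubscription<T> {
  subscriber: Subscriber<T> | null;

  constructor(subscriber: Subscriber<T>) {
    this.subscriber = subscriber;
  }

  // Called by the publisher; data still flows publisher -> subscription -> subscriber.
  push(data: T): void {
    if (this.subscriber !== null) this.subscriber.next(data);
  }

  unsubscribe(): void {
    this.subscriber = null;
  }
}

function subscribe<T>(publisher: PushPublisher<T>, subscriber: Subscriber<T>): PushSubscription<T> {
  const subscription = new PushSubscription(subscriber);
  publisher(subscription); // the publisher starts pushing immediately
  return subscription;
}

const received: number[] = [];
const subscription = subscribe<number>(
  (s) => { s.push(1); s.push(2); }, // publisher pushes two values synchronously
  { next: (v) => received.push(v) },
);
subscription.unsubscribe();
subscription.push(3); // ignored: nobody is subscribed any more
console.log(received); // [ 1, 2 ]
```

Compared with the pull style, only the direction of the first call changes; the Subscription still sits between publisher and subscriber.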

Data Pipeline

Now consider a situation where we want to add a chain of data transformers before the real observer.

(Figure: data pipeline)

Each transformer can be considered a callback: it takes an input, operates on the data, and returns the result.

type Operator = (data: any) => any
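Since each transformer is just a callback, a chain of transformers can be sketched as a list of functions applied in order. The `applyChain` helper below is an illustrative assumption, not part of any library.

```typescript
// Each operator takes one value and returns the transformed value.
type Operator = (data: any) => any;

// Run a value through every operator, left to right.
function applyChain(operators: Operator[], data: any): any {
  return operators.reduce((value, op) => op(value), data);
}

const chain: Operator[] = [
  (x: number) => x + 2,
  (x: number) => x * 10,
  (x: number) => `value: ${x}`,
];

console.log(applyChain(chain, 1)); // value: 30
```

This shows the transformation logic only; it says nothing yet about *when* each callback runs, which is the question the next paragraph takes up.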

The question then becomes: when is the callback called? In our model, it is always the Subscriber’s next(data: T) that accepts data from the Subscription.

That means the operator callback can be called inside the Subscriber’s next(data: T) function:

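class Subscriber {
  constructor(operator) { this.operator = operator }
  // the operator runs each time the Subscription hands data to next()
  next = (data) => { return this.operator(data) }
}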

Each node in the pipeline can be regarded as subscribing to the result of the previous node, so we can construct an intermediate Subscriber for each node. As we said earlier, data always flows from Publisher to Subscription and then from Subscription to Subscriber, so each intermediate node should also have its own Subscription. Our data pipeline then becomes:

(Figure: subscription pipeline)

The publisher sends data to the Subscription. The Subscription uses the Subscriber (operator) to transform the data, then checks whether there is a Subscription downstream. If there is, it pushes the result of the Subscriber (operator) to the downstream Subscription, and so on until the end of the pipeline. This pattern is very general.
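Below is a minimal sketch of that pipeline, assuming push-style delivery. Each intermediate node pairs a Subscription with an operator: it transforms what it receives and pushes the result to the downstream node, and the last node hands values to the real subscriber. `PipelineNode`, `Sink`, and `buildPipeline` are hypothetical names.

```typescript
type Operator = (data: any) => any;

// Anything that can accept data: an intermediate node or the final subscriber.
interface Sink { next(data: any): void }

// One intermediate node: a subscription plus the operator it applies.
class PipelineNode implements Sink {
  operator: Operator;
  downstream: Sink;

  constructor(operator: Operator, downstream: Sink) {
    this.operator = operator;
    this.downstream = downstream;
  }

  // Transform the incoming value, then push the result one step further down.
  next(data: any): void {
    this.downstream.next(this.operator(data));
  }
}

// Build the chain from the end backwards so each node knows its downstream.
function buildPipeline(operators: Operator[], finalSubscriber: Sink): Sink {
  return operators.reduceRight<Sink>(
    (downstream, op) => new PipelineNode(op, downstream),
    finalSubscriber,
  );
}

const results: string[] = [];
const head = buildPipeline(
  [(x: number) => x + 2, (x: number) => `#${x}`],
  { next: (data) => results.push(data) },
);

// The publisher pushes raw values into the first node.
[10, 20, 30].forEach((x) => head.next(x));
console.log(results); // [ '#12', '#22', '#32' ]
```

Because every node exposes the same `next()` shape as the final subscriber, nodes can be added or removed without changing their neighbours.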

Canceling Subscriptions in a Pipeline

Canceling a subscription in the pipeline is easy: we just break the links between the nodes. Normally the last Subscription is returned to the caller, so we traverse backward from it, removing the parent of each intermediate Subscription, until we reach the Publisher.
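A sketch of that backward walk, assuming each node keeps a `parent` reference to its upstream node (a hypothetical field) and that the first node's `parent` is `null`, standing in for the Publisher. `LinkedSubscription` and `cancelPipeline` are illustrative names.

```typescript
// A pipeline subscription that remembers its upstream parent.
class LinkedSubscription {
  name: string;
  parent: LinkedSubscription | null;
  closed = false;

  constructor(name: string, parent: LinkedSubscription | null) {
    this.name = name;
    this.parent = parent;
  }
}

// Start from the last subscription (the one returned to the caller) and walk upstream,
// closing each node and cutting its link to the parent.
function cancelPipeline(last: LinkedSubscription): string[] {
  const visited: string[] = [];
  let current: LinkedSubscription | null = last;
  while (current !== null) {
    visited.push(current.name);
    current.closed = true;
    const parent: LinkedSubscription | null = current.parent;
    current.parent = null; // break the linkage
    current = parent;
  }
  return visited;
}

const firstNode = new LinkedSubscription("first", null); // parent null = the Publisher side
const mapNode = new LinkedSubscription("map", firstNode);
const lastNode = new LinkedSubscription("last", mapNode);

console.log(cancelPipeline(lastNode)); // [ 'last', 'map', 'first' ]
```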

Now let’s look at how ZenObservable and RxJS resemble the code above.

ZenObservable

type Publisher = (observer)=>void

As shown above, ZenObservable takes a publisher callback as its constructor parameter. The publisher callback takes an observer as input. This observer is actually an instance of SubscriptionObserver, which is just a wrapper around the Subscription object, as you can see below.

// Wraps a Subscription; every call is routed through onNotify(subscription, ...)
class SubscriptionObserver {
  constructor(subscription) { this._subscription = subscription }
  next(value) { onNotify(this._subscription, 'next', value) }
  error(value) { onNotify(this._subscription, 'error', value) }
  complete() { onNotify(this._subscription, 'complete') }
}

What SubscriptionObserver does is use the Subscription to call the real observer, which the Subscription stores as _observer.

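// In zen-observable this is a module-level helper, not a SubscriptionObserver method.
// Simplified: the real helper also handles 'error' and 'complete' and closes the subscription.
function notifySubscription(subscription, type, value) {
  const observer = subscription._observer
  if (type === 'next' && observer.next) observer.next(value)
}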

So ZenObservable’s data flow is the same as the one we described above: from publisher to subscription to subscriber.
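To see that flow end to end without installing the library, here is a heavily simplified, self-contained sketch modelled on the structure described above: a `MiniObservable` (hypothetical name) whose constructor takes a publisher function, a Subscription that stores the real observer, and a `SubscriptionObserver` wrapper that forwards through the Subscription. It leaves out zen-observable's buffering, error handling, and cleanup, so treat it as an illustration of the data flow, not as the library's code.

```typescript
interface Observer<T> {
  next?(value: T): void;
  complete?(): void;
}

class Subscription<T> {
  _observer: Observer<T>;
  _state: "ready" | "closed" = "ready";

  constructor(observer: Observer<T>) {
    this._observer = observer;
  }
}

// Simplified stand-in for the library's internal notify helper.
function notifySubscription<T>(subscription: Subscription<T>, type: "next" | "complete", value?: T): void {
  if (subscription._state === "closed") return;
  const observer = subscription._observer;
  if (type === "next") {
    observer.next?.(value as T);
  } else {
    subscription._state = "closed"; // completion closes the subscription
    observer.complete?.();
  }
}

// The wrapper that the publisher function actually receives.
class SubscriptionObserver<T> {
  _subscription: Subscription<T>;

  constructor(subscription: Subscription<T>) {
    this._subscription = subscription;
  }
  next(value: T): void { notifySubscription(this._subscription, "next", value); }
  complete(): void { notifySubscription(this._subscription, "complete"); }
}

type PublisherFn<T> = (observer: SubscriptionObserver<T>) => void;

class MiniObservable<T> {
  _publisher: PublisherFn<T>;

  constructor(publisher: PublisherFn<T>) {
    this._publisher = publisher;
  }

  subscribe(observer: Observer<T>): Subscription<T> {
    const subscription = new Subscription(observer);
    // publisher -> SubscriptionObserver -> Subscription -> real observer
    this._publisher(new SubscriptionObserver(subscription));
    return subscription;
  }
}

const log: string[] = [];
new MiniObservable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // dropped: the subscription is already closed
}).subscribe({
  next: (v) => log.push(`next ${v}`),
  complete: () => log.push("complete"),
});
console.log(log); // [ 'next 1', 'next 2', 'complete' ]
```

The publisher never touches the real observer; every value passes through the Subscription, which is what allows it to enforce the closed state.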

RxJS Observable

The user-facing interface of RxJS Observable is more or less the same as ZenObservable’s.

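import { from, map } from 'rxjs' // RxJS 7.2+; older versions import map from 'rxjs/operators'
const subscription = from([10, 20, 30]).pipe(map(x => x + 2)).subscribe({ next: (data) => { console.log(data) } }) // logs 12, 22, 32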

In RxJS, there is a Subscriber class. Subscriber extends Subscription and implements the Observer interface. Each Subscriber also has a destination attribute, which holds the next Subscriber or Observer downstream.

(Figure: Subscriber, Observer, and Subscription in RxJS)

Sample code:

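import { Observer, Subscriber } from 'rxjs'
const observer: Observer<number> = { next: (data) => console.log(data), error: (err) => console.error(err), complete: () => {} }
// the observer becomes the Subscriber's destination (this constructor is internal and deprecated in RxJS 7)
const subscriber = new Subscriber(observer)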

In RxJS, the Subscriber plays both roles, Subscription and Subscriber. Our previous pipeline diagram, redrawn for RxJS:

(Figure: the subscription pipeline in RxJS)

Actually, RxJS does not change anything in our previous diagram. All it does is use the Subscriber as both Subscriber and Subscription, so each intermediate Subscriber has the next Subscriber as its destination.
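The following self-contained sketch imitates that arrangement without depending on RxJS: `MiniSubscriber` extends a small `MiniSubscription` base class and forwards values to its `destination`, and `MapSubscriber` is an intermediate Subscriber for a map step. These classes are simplified stand-ins written for illustration; RxJS's real `Subscriber` and `Subscription` also handle errors, completion, and teardown, and differ internally.

```typescript
interface Observer<T> { next(value: T): void }

// Base class: something that can be unsubscribed.
class MiniSubscription {
  closed = false;
  unsubscribe(): void {
    this.closed = true;
  }
}

// A Subscriber is both a Subscription (it can be closed) and an Observer (it has next).
class MiniSubscriber<T> extends MiniSubscription implements Observer<T> {
  destination: Observer<any>;

  constructor(destination: Observer<any>) {
    super();
    this.destination = destination;
  }

  next(value: T): void {
    if (!this.closed) this.destination.next(value);
  }
}

// Intermediate Subscriber for a map step: transform, then forward to the destination.
class MapSubscriber<T, R> extends MiniSubscriber<T> {
  project: (value: T) => R;

  constructor(project: (value: T) => R, destination: Observer<R>) {
    super(destination);
    this.project = project;
  }

  next(value: T): void {
    if (!this.closed) this.destination.next(this.project(value));
  }
}

const output: number[] = [];
const finalObserver: Observer<number> = { next: (v) => output.push(v) };
// Chain: source -> MapSubscriber(x + 2) -> MiniSubscriber -> finalObserver
const last = new MiniSubscriber<number>(finalObserver);
const first = new MapSubscriber<number, number>((x) => x + 2, last);
[10, 20, 30].forEach((x) => first.next(x));
console.log(output); // [ 12, 22, 32 ]
first.unsubscribe();
first.next(40); // ignored: this Subscriber is closed
```

Each object in the chain is simultaneously the Subscription of the step before it and the Subscriber that feeds the step after it, which is exactly the dual role described above.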

Summary

This Publisher, Subscriber, Subscription pattern is not only used in the frontend. It is also used in the Reactor project (Project Reactor) in the Spring ecosystem. That’s why I spent time analyzing it.

Please leave a comment if you find anything incorrect.

I am a software developer who is keen to know how things work