Reactive Programming — From A Real-Life Example

Ryan Zheng
7 min read · Sep 8, 2021

The key to reactive programming is to analyze data dependencies and event dependencies rather than the underlying runtime environment. The runtime environment can be a virtual thread, a coroutine, or a native thread, and it can be swapped out. The data dependencies, however, never change: they are bound to the business itself.

In this article, we will use simple examples to demonstrate step by step what the above statement means, and how we can transform existing code into reactive code.

Example: John, please open the door when the doorbell rings.

The above sentence can be broken down into two objects, and one action.

Two Objects: John and Door

One Action: John opens the door when he receives the doorbell signal

The door-opening action depends on the doorbell signal. Here the doorbell signal is the data. The signal is generated at the door, so the door can be considered the data publisher. The signal is delivered to John. John is interested in the signal, so John can be viewed as the data subscriber.

Now we model these relationships with Java classes, using Project Reactor for the implementation. If you don’t know how to use Project Reactor, please read this article before going further: https://medium.com/swlh/reactive-streaming-reactive-system-92e157b75a28

Version One

Door: the data publisher

protected static class Door extends Mono<DoorSignal> implements Subscription {
    private CoreSubscriber<? super DoorSignal> actual;

    @Override
    public void subscribe(CoreSubscriber<? super DoorSignal> actual) {
        this.actual = actual;
        actual.onSubscribe(this);
    }

    @Override
    public void request(long l) {
        actual.onNext(DoorSignal.DOOR_RING);
    }

    @Override
    public void cancel() {
        // do nothing
    }
}

DoorSignal: the data

public enum DoorSignal {
    DOOR_RING
}

John: the DoorSignal.DOOR_RING subscriber

protected static class John implements Subscriber<DoorSignal> {
    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand
    }

    @Override
    public void onNext(DoorSignal doorBellSignal) {
        if (doorBellSignal == DoorSignal.DOOR_RING) {
            System.out.println("open the door");
        }
    }

    @Override
    public void onError(Throwable throwable) {
        // not used in this example
    }

    @Override
    public void onComplete() {
        // not used in this example
    }
}

Testing the previous code

public static void main(String[] args) {
    Door door = new Door();
    John john = new John();
    door.subscribe(john);
}

Let’s imagine a more complicated case, in which the DOOR_RING is generated when a delivery person knocks on the door. After the delivery person knocks on the door, he waits for John to open the door. John will need to pause what he is doing, walk to the door, and perform the open action. After the door is opened, the delivery person enters.

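The following event timeline describes the relationships: the delivery person knocks on the door (DOOR_RING), John walks to the door and opens it (DOOR_OPEN), and the delivery person enters.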

We can consider that the delivery person is waiting for the door open signal. John is publishing a door open signal. So the delivery person is the subscriber, John is the publisher. The data is the door open signal.

Version Two

DoorSignal: the data

public enum DoorSignal {
    DOOR_RING, DOOR_OPEN
}

John: the DoorSignal.DOOR_RING subscriber and DoorSignal.DOOR_OPEN publisher

public static class John extends Mono<DoorSignal> implements Subscriber<DoorSignal> {
    private CoreSubscriber<? super DoorSignal> next;

    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(1);
    }

    @SneakyThrows // Lombok: rethrows the InterruptedException from Thread.sleep
    @Override
    public void onNext(DoorSignal doorRingSignal) {
        Thread.sleep(5000); // wait 5 seconds to simulate John doing other things
        next.onNext(DoorSignal.DOOR_OPEN);
    }

    @Override
    public void onError(Throwable throwable) {
        // not used in this example
    }

    @Override
    public void onComplete() {
        // not used in this example
    }

    @Override
    public void subscribe(CoreSubscriber<? super DoorSignal> actual) {
        next = actual;
    }
}

Door: the data publisher

public static class Door extends Mono<DoorSignal> implements Subscription {
    private CoreSubscriber<? super DoorSignal> actual;

    @Override
    public void subscribe(CoreSubscriber<? super DoorSignal> actual) {
        this.actual = actual;
        actual.onSubscribe(this);
    }

    @Override
    public void request(long l) {
        /* do nothing, the DOOR_RING is not generated here anymore,
           the signal is generated when the delivery person knocks on the door */
    }

    @Override
    public void cancel() {
        // do nothing
    }
}

The DeliveryPerson: the DoorSignal.DOOR_OPEN signal subscriber

public static class DeliveryPerson implements Subscriber<DoorSignal> {
    public void knockDoor(final Door door) {
        System.out.println("door knocked");
        door.actual.onNext(DoorSignal.DOOR_RING); // generate the DoorSignal.DOOR_RING signal
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        // do nothing
    }

    @Override
    public void onNext(DoorSignal doorSignal) {
        if (doorSignal == DoorSignal.DOOR_OPEN) {
            System.out.println("enter the door");
        }
    }

    @Override
    public void onError(Throwable throwable) {
        // not used in this example
    }

    @Override
    public void onComplete() {
        // not used in this example
    }
}

Testing the new code

public static void main(final String[] args) {
    Door door = new Door();
    John john = new John();
    DeliveryPerson deliveryPerson = new DeliveryPerson();
    door.subscribe(john);
    john.subscribe(deliveryPerson);
    deliveryPerson.knockDoor(door); // the knock triggers the overall process
}

Optimization

There is too much boilerplate code in the previous implementation.

  • The data subscriber always implements the Subscriber interface, even when only onNext is used.
  • The data publisher always extends Mono, and there is no real business-related code in the Door class.

In the DeliveryPerson class, the only customized method is onNext. It would make sense to have a template subscriber that takes a callback function as its argument. Indeed, Mono provides the following method, which takes a callback and internally constructs a LambdaMonoSubscriber, so DeliveryPerson does not have to implement the Subscriber interface.

public final Disposable subscribe(Consumer<? super T> consumer)
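
To illustrate the idea of a template subscriber that takes a callback, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces instead of Project Reactor, so it runs without extra dependencies. CallbackSubscriber and SingleValuePublisher are hypothetical names invented for this sketch; Reactor's own LambdaMonoSubscriber does more than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical sketch, not Reactor code: a subscriber built from one callback.
class CallbackSubscriberSketch {

    enum DoorSignal { DOOR_RING, DOOR_OPEN }

    /** Template subscriber: the caller supplies only the onNext callback. */
    static final class CallbackSubscriber<T> implements Flow.Subscriber<T> {
        private final Consumer<? super T> onNextCallback;

        CallbackSubscriber(Consumer<? super T> onNextCallback) {
            this.onNextCallback = onNextCallback;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override
        public void onNext(T item) {
            onNextCallback.accept(item);
        }

        @Override
        public void onError(Throwable throwable) { /* ignored in this sketch */ }

        @Override
        public void onComplete() { /* nothing to clean up */ }
    }

    /** Publisher that emits a single value synchronously on the first request. */
    static final class SingleValuePublisher<T> implements Flow.Publisher<T> {
        private final T value;

        SingleValuePublisher(T value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return;
                    }
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }

        /** Convenience overload with the same shape as subscribe(Consumer). */
        void subscribe(Consumer<? super T> consumer) {
            subscribe(new CallbackSubscriber<T>(consumer));
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        SingleValuePublisher<DoorSignal> john =
                new SingleValuePublisher<>(DoorSignal.DOOR_OPEN);
        // Only the business logic is written by the caller.
        john.subscribe(signal -> log.add("enter the door after " + signal));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The lambda resolves to the Consumer overload because Flow.Subscriber has four abstract methods and is therefore not a functional interface.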

Version Three

DeliveryPerson:

public static class DeliveryPerson {
    public void knockDoor(final Door door) {
        System.out.println("door knocked");
        door.actual.onNext(DoorSignal.DOOR_RING);
    }

    public void enterDoor(final DoorSignal signal) {
        if (signal == DoorSignal.DOOR_OPEN) {
            System.out.println("delivery person enter door");
        }
    }
}

The deliveryPerson subscribing part becomes:

john.subscribe(signal -> deliveryPerson.enterDoor(signal));

John:

public static class John extends Mono<DoorSignal> {
    private CoreSubscriber<? super DoorSignal> next;

    // No "throws" clause: the InterruptedException is handled below, and a
    // checked exception would not compile inside a Consumer lambda.
    public void openDoor(final DoorSignal signal) {
        try {
            if (signal == DoorSignal.DOOR_RING) {
                Thread.sleep(5000); // wait 5 seconds to simulate John doing other things
                next.onNext(DoorSignal.DOOR_OPEN);
            }
        } catch (InterruptedException e) {
            next.onError(e);
        }
    }

    @Override
    public void subscribe(CoreSubscriber<? super DoorSignal> actual) {
        next = actual;
    }
}

The John subscribing part becomes:

door.subscribe(signal -> john.openDoor(signal));

The Door class now consists entirely of boilerplate code with no business meaning. Also, the subscription john.subscribe(signal -> deliveryPerson.enterDoor(signal)) shows that the enterDoor action is really just waiting for a signal; it has nothing to do with the door. In the example, enterDoor waits for DoorSignal.DOOR_OPEN, which John's openDoor action produces in response to the DoorSignal.DOOR_RING generated by knockDoor. So we can make knockDoor return Mono.just(DoorSignal.DOOR_RING) and openDoor return Mono.just(DoorSignal.DOOR_OPEN) directly.

Version Four

DeliveryPerson

public static class DeliveryPerson {
    public Mono<DoorSignal> knockDoor() {
        System.out.println("delivery person knocking door");
        return Mono.just(DoorSignal.DOOR_RING);
    }

    public void enterDoor() {
        System.out.println("delivery person enter door");
    }
}

John: the same changes

public static class John {
    public Mono<DoorSignal> openDoor() {
        try {
            Thread.sleep(5000);
            return Mono.just(DoorSignal.DOOR_OPEN);
        } catch (InterruptedException e) {
            return Mono.error(e);
        }
    }
}

The new subscription part

John john = new John();
DeliveryPerson deliveryPerson = new DeliveryPerson();
deliveryPerson.knockDoor().flatMap(signal -> {
    if (signal == DoorSignal.DOOR_RING)
        return john.openDoor();
    else
        return Mono.error(new Exception("signal is wrong"));
}).subscribe(signal -> {
    if (signal == DoorSignal.DOOR_OPEN)
        deliveryPerson.enterDoor();
});

The Door class is no longer needed; the initial analysis included redundant information. The final code looks much more like what the timeline diagram describes.

The Imperative Version Of The Example

public static void main(final String[] args) {
    John john = new John();
    DeliveryPerson deliveryPerson = new DeliveryPerson();
    DoorSignal doorRingSignal = deliveryPerson.knockDoor();
    if (doorRingSignal == DoorSignal.DOOR_RING) {
        DoorSignal doorOpenSignal = john.openDoor();
        if (doorOpenSignal == DoorSignal.DOOR_OPEN) {
            deliveryPerson.enterDoor();
        }
    }
}

public enum DoorSignal {
    DOOR_RING, DOOR_OPEN, DOOR_ERROR
}

public static class John {
    public DoorSignal openDoor() {
        try {
            Thread.sleep(5000);
            return DoorSignal.DOOR_OPEN;
        } catch (InterruptedException e) {
            return DoorSignal.DOOR_ERROR;
        }
    }
}

public static class DeliveryPerson {
    public DoorSignal knockDoor() {
        System.out.println("delivery person knocking door");
        return DoorSignal.DOOR_RING;
    }

    public void enterDoor() {
        System.out.println("delivery person enter door");
    }
}

We can see that there is only a slight difference between the imperative version and the reactive version.

In the imperative version, the following logic

if (doorRingSignal == DoorSignal.DOOR_RING) {
    DoorSignal doorOpenSignal = john.openDoor();

uses the return value of deliveryPerson.knockDoor. We can consider deliveryPerson.knockDoor the data publisher (it publishes a return value), and the logic after it the data subscriber (it depends on the return value).

The reactive version simply wraps the return value in a Mono (the publisher), to which the dependent logic subscribes. This conversion is intuitive and makes sense.

Which part of code can be converted to reactive code?

Answer: If the function can finish immediately, there is no need to use a data publisher. However, if the data arrives at some point in the future, we can convert the code into reactive code.

The following cases are good candidates for conversion to reactive code.

  • Multi-threaded synchronization code. When some logic depends on the completion of other logic running in another thread, we can convert it into reactive code to make better use of the threads.
  • Code that uses Future or CompletableFuture can be converted to reactive code.
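
To make the second case concrete, here is a sketch of what the door example might look like when written with CompletableFuture (Java 9+). It is not taken from the original implementation: the class name FutureDoorSketch, the log parameter, and the 100 ms delay are assumptions made for illustration. The shape of the chain mirrors the flatMap and subscribe calls of Version Four, which is why such code tends to translate to reactive code quite directly.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the door example expressed with CompletableFuture.
class FutureDoorSketch {

    enum DoorSignal { DOOR_RING, DOOR_OPEN }

    // The knock is available immediately, so the future is already completed.
    static CompletableFuture<DoorSignal> knockDoor(List<String> log) {
        log.add("delivery person knocking door");
        return CompletableFuture.completedFuture(DoorSignal.DOOR_RING);
    }

    // John needs some time. The delayed executor stands in for Thread.sleep
    // without blocking the calling thread (100 ms is an arbitrary choice).
    static CompletableFuture<DoorSignal> openDoor(List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("John opens the door");
            return DoorSignal.DOOR_OPEN;
        }, CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    // thenCompose plays the role of flatMap, thenAccept the role of subscribe.
    static CompletableFuture<Void> deliver(List<String> log) {
        return knockDoor(log)
            .thenCompose(signal -> {
                if (signal == DoorSignal.DOOR_RING) {
                    return openDoor(log);
                }
                return CompletableFuture.<DoorSignal>failedFuture(
                        new IllegalStateException("signal is wrong"));
            })
            .thenAccept(signal -> {
                if (signal == DoorSignal.DOOR_OPEN) {
                    log.add("delivery person enter door");
                }
            });
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        deliver(log).join(); // block only here, at the edge of the program
        log.forEach(System.out::println);
    }
}
```

One difference worth keeping in mind: a CompletableFuture starts its work as soon as it is created, while a Mono does nothing until something subscribes to it. Reactor's Mono.fromFuture can bridge existing futures into a reactive chain.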

Comparison Between Mono and JavaScript Promise

In JavaScript, there is a Promise object. Each then call returns another promise. The then logic is triggered by the resolution of the promise it is attached to.

const myPromise = new Promise(function(resolve, reject) {});
myPromise.then(function(value){}).then(function(value){})

In Project Reactor, data is pushed to the subscriber when it becomes available. The publisher might need some time to finish preparing the data, just as a promise might need some time to resolve.

In our previous code, each intermediate step is triggered by the completion of the previous step, until the last subscriber is called.

deliveryPerson.knockDoor().flatMap(signal -> {
    if (signal == DoorSignal.DOOR_RING)
        return john.openDoor();
    else
        return Mono.error(new Exception("signal is wrong"));
}).subscribe(signal -> {
    if (signal == DoorSignal.DOOR_OPEN)
        deliveryPerson.enterDoor();
});

This is similar to the promise.then execution order in JavaScript.

In fact, the frontend also uses an event loop to execute JavaScript callbacks. When the backend switches to an event loop, it faces the same problem, so similar language constructs are designed to solve it.
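
The same ordering can be observed on the JVM without any library. The sketch below is an illustration only, using the JDK's CompletableFuture as a stand-in for a promise; the class name ThenOrderSketch and the log messages are made up for this example. The callbacks are registered first and run only once the future is completed, which corresponds to calling resolve.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: callbacks run only after the "promise" is resolved.
class ThenOrderSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> promise = new CompletableFuture<>();

        // Register the chain before any value exists.
        promise
            .thenApply(value -> {
                log.add("first then: " + value);
                return value + "!";
            })
            .thenAccept(value -> log.add("second then: " + value));

        log.add("before resolve");
        promise.complete("DOOR_OPEN"); // comparable to resolve("DOOR_OPEN")
        log.add("after resolve");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here the two callbacks run synchronously inside complete(), on the completing thread, so they appear between "before resolve" and "after resolve". JavaScript differs on this point: then callbacks are always scheduled asynchronously, even when the promise is already resolved.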

Isn’t the backend also learning from the frontend?

Ryan Zheng

I am a software developer who is keen to know how things work