Reactive Programming in Kotlin
上QQ阅读APP看书,第一时间看更新

Subscribing and disposing

So, we have Observable (the thing that should be observed upon) and we have Observer (that should observe); now what? How to connect them? Observable and Observer are like an input device (be it keyboard or mouse) and the computer, we need something to connect them (even wireless input devices have some connectivity channels, be it Bluetooth or Wi-Fi).

The subscribe operator serves the purpose of the media by connecting an Observable to Observer. We can pass one to three methods (onNext, onComplete, onError) to the subscribe operator, or we can pass an instance of the Observer interface to the subscribe operator to get the Observable connected with an Observer.

So, let's take a look at the following example now:

    fun main(args: Array<String>) { 
      val observable:Observable<Int> = Observable.range(1,5)//1 
 
      observable.subscribe({//2 
        //onNext method 
        println("Next $it") 
      },{ 
        //onError Method 
        println("Error ${it.message}") 
      },{ 
        //onComplete Method 
        println("Done") 
     }) 
 
     val observer: Observer<Int> = object : Observer<Int> {//3 
        override fun onComplete() { 
          println("All Completed") 
        } 
 
        override fun onNext(item: Int) { 
          println("Next $item") 
        } 
 
        override fun onError(e: Throwable) { 
          println("Error Occurred ${e.message}") 
        } 
 
        override fun onSubscribe(d: Disposable) { 
          println("New Subscription ") 
        } 
    } 
 
    observable.subscribe(observer) 
  } 

In this example, we have created Observable instance (on comment 1) and used it twice with different overload subscribe operators. On comment 2, we have passed three methods as arguments to the subscribe method. The first parameter is the onNext method, the second one is the onError method, and last, onComplete. On comment 2, we have passed an instance of the Observer interface.

The output can be easily predicted as follows:

So, we have got the concepts of subscribing, and we can do it now. What if you want to stop the emissions after some period of subscription? There must be a way, right? So let's inspect this.

Remember the onSubscribe method of Observer? There was a parameter on that method that we have not talked about yet. While you subscribe, if you pass the methods instead of the Observer instance, then the subscribe operator will return an instance of Disposable, or if you use an instance of Observer, then you will get the instance of Disposable in the parameter of the onSubscribe method.

You can use the instance of the Disposable interface to stop emissions at any given time. Let's take a look at this example:

    fun main(args: Array<String>) { 
      runBlocking { 
        val observale:Observable<Long> = 
Observable.interval(100,TimeUnit.MILLISECONDS)//1 val observer:Observer<Long> = object : Observer<Long> { lateinit var disposable:Disposable//2 override fun onSubscribe(d: Disposable) { disposable = d//3 } override fun onNext(item: Long) { println("Received $item") if(item>=10 && !disposable.isDisposed) {//4 disposable.dispose()//5 println("Disposed") } } override fun onError(e: Throwable) { println("Error ${e.message}") } override fun onComplete() { println("Complete") } } observale.subscribe(observer) delay(1500)//6 } }

I hope you remember the Observable.interval factory method, from just few pages ago in this chapter. This method takes two parameters describing the interval period and time unit, then, it prints integers sequentially, starting from 0. Observable created with interval never completes and never stops until you stop them or the program stops execution. I thought it will be the perfect fit in this scenario, as here we want to stop the Observable midway.

So, in this example on comment 1, we created an Observable with the Observable.interval factory method that will emit an integer after each 100 millisecond interval.

On comment 2, I have declared a lateinit var disposable of type Disposable (lateinit means the variable will get initialized at a later point of time). On comment 3, inside the onSubscribe method, we will assign the received parameter value to the disposable variable.

We intend to stop the execution after the sequence reaches 10, that is, after 10 is emitted, the emission should be stopped immediately. To achieve that, we placed a check inside the onNext method, where we are checking if the value of the emitted item is equal to or greater than 10, and if the emission is not already stopped (disposed), then we will dispose the emission (comment 5).

Here is the output:

Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
Disposed

From the output, we can see that no integer got emitted after the disposable.dispose() method was called, although the execution waited 500 milliseconds more (100*10=1000 milliseconds to print sequence until 10, and we called the delay method with 1500, thus 500 milliseconds after emitting 10).

If you are curious to know the Disposable interface, then the following is the definition:

    interface Disposable { 
      /** 
      * Dispose the resource, the operation should be idempotent. 
      */ 
      fun dispose() 
      /** 
      * Returns true if this resource has been disposed. 
      * @return true if this resource has been disposed 
      */ 
      val isDisposed:Boolean 
    } 

It has one property that denotes if the emission is already notified to stop (disposed) and a method to notify the emission to stop (dispose).