RxJava — Reactive Extensions for Java

RxJava is an implementation of Reactive Extensions (ReactiveX) for the Java environment. It combines functional and reactive techniques into an elegant approach to event-driven programming.

Functional programming is the process of building software by composing pure functions, avoiding shared state, mutable data, and side-effects.

Reactive programming is an asynchronous, event-based programming paradigm concerned with data streams and the propagation of change.

There are two key types for reactive programming: Observable and Observer. An observable emits items; an observer consumes those items.

Observer interface methods

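  • onSubscribe: called once when the observer subscribes; it receives the Disposable that can be used to cancel the subscription
  • onNext: called each time the observable emits a new item
  • onComplete: called when the observable finishes without an error
  • onError: called when the observable terminates with an error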
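As a minimal sketch of the four callbacks listed above, the following class subscribes a hand-written Observer and records the order in which its methods are called. It assumes RxJava 2.x (the io.reactivex packages) on the classpath; the class name ObserverSketch is made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class ObserverSketch {

    // Subscribes a hand-written Observer and records every callback it receives.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Observer<String> observer = new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
            @Override public void onNext(String item)       { log.add("onNext " + item); }
            @Override public void onError(Throwable e)      { log.add("onError " + e.getMessage()); }
            @Override public void onComplete()              { log.add("onComplete"); }
        };

        // just() emits synchronously on the calling thread, so the log is complete on return.
        Observable.just("a", "b").subscribe(observer);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A stream ends with either onComplete or onError, never both, which is why the log here contains no onError entry.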

A common headache: the backpressure problem. In RxJava it is easy to get into a situation in which an Observable emits items more rapidly than an operator or subscriber can consume them. This raises the question of what to do with the growing backlog of unconsumed items.

If backpressure is not handled, the stream may fail with a MissingBackpressureException, or the application may run out of memory with an OutOfMemoryError.

Creating Observable

Observable Types:

Type | Description
Flowable<T> | Emits 0 or n items and terminates with a success or an error event. Supports backpressure, which makes it possible to control how fast a source emits items.
Observable<T> | Emits 0 or n items and terminates with a success or an error event. No support for backpressure.
Single<T> | Emits either a single item or an error event. The reactive version of a method call.
Maybe<T> | Succeeds with a single item, completes with no item, or fails with an error. The reactive version of an Optional.
Completable | Either completes with a success or fails with an error event. It never emits items. The reactive version of Runnable.

Convenience methods to create observables:

  • .just(): produces an Observable that emits the item or items passed to it.
  • .fromIterable(): produces an Observable that emits each item of an Iterable.
  • .fromArray()
  • .from…()
  • .interval()
  • .range…()
  • .repeat()
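A few of the factory methods above can be sketched as follows. This assumes RxJava 2.x; the class name CreationSketch and the sample values are illustrative only. Each method collects the emitted items into a list with toList().blockingGet() so the result is easy to inspect.

```java
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
final class CreationSketch {

    // range(start, count) emits count consecutive integers beginning at start.
    static List<Integer> fromRange() {
        return Observable.range(1, 3).toList().blockingGet();
    }

    // fromIterable() emits every element of the given Iterable.
    static List<String> fromIterable() {
        return Observable.fromIterable(Arrays.asList("x", "y")).toList().blockingGet();
    }

    // repeat(n) resubscribes to the source so that its items are emitted n times in total.
    static List<String> repeated() {
        return Observable.just("a").repeat(3).toList().blockingGet();
    }

    // interval() emits 0, 1, 2, ... on a timer; take(3) stops it after three ticks.
    static List<Long> firstTicks() {
        return Observable.interval(10, TimeUnit.MILLISECONDS).take(3).toList().blockingGet();
    }
}
```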

Subscription and Disposing

To receive the data emitted by an observable, subscribe to it via .subscribe() or .subscribeWith().

Listeners or subscribers are usually not supposed to listen forever, so dispose of them via .dispose() to avoid memory leaks.

When working with multiple subscriptions, use CompositeDisposable. Add the Disposable returned by each subscribe call to the composite, then call CompositeDisposable.dispose() or CompositeDisposable.clear() to dispose of them all. After clear() the composite can be reused; after dispose() it cannot.
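The following sketch illustrates collecting subscriptions in a CompositeDisposable and releasing them together. It assumes RxJava 2.x; DisposalSketch is a made-up name, and Observable.never() merely stands in for a long-lived source.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

// Hypothetical demo class, not part of RxJava.
final class DisposalSketch {

    // Returns {activeBefore, disposedAfterClear, reusableAfterClear, rejectsAfterDispose}.
    static boolean[] run() {
        CompositeDisposable disposables = new CompositeDisposable();

        // never() neither emits nor terminates, so these subscriptions stay active until disposed.
        Disposable first = Observable.never().subscribe();
        Disposable second = Observable.never().subscribe();
        disposables.add(first);
        disposables.add(second);
        boolean activeBefore = !first.isDisposed() && !second.isDisposed();

        // clear() disposes everything it holds but keeps the container usable.
        disposables.clear();
        boolean disposedAfterClear = first.isDisposed() && second.isDisposed();
        boolean reusableAfterClear = !disposables.isDisposed();

        // dispose() also marks the container itself as disposed; later add() calls
        // return false and dispose the added item right away.
        disposables.dispose();
        boolean rejectsAfterDispose = !disposables.add(Observable.never().subscribe());

        return new boolean[] {activeBefore, disposedAfterClear, reusableAfterClear, rejectsAfterDispose};
    }
}
```

On Android, clear() is a common choice in lifecycle callbacks that can run more than once, since the same container can be reused afterwards.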

Single Caching

When working with observables, repeating an asynchronous call on every subscription is often unnecessary, especially for expensive work such as a web request. Use the cache() method so that the Single instance keeps the result of its first subscription and replays it to later subscribers.

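Regular:
Single<List<Todo>> todosSingle = Single.create(..);
todosSingle.subscribe(...);   // the source runs again for every subscription

Cached:
Single<List<Todo>> todosSingle = Single.create(..);
Single<List<Todo>> cachedSingle = todosSingle.cache();
cachedSingle.subscribe(...);  // the source runs once; later subscribers get the stored result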
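To make the difference observable, here is a small sketch that counts how often the source runs with and without cache(). It assumes RxJava 2.x; SingleCacheSketch is a made-up name, and the counter is a stand-in for an expensive request.

```java
import io.reactivex.Single;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
final class SingleCacheSketch {

    // Returns {executions for two plain subscriptions, executions for two cached subscriptions}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();

        // Stand-in for an expensive request: it only counts how often it is executed.
        Single<String> source = Single.fromCallable(() -> {
            calls.incrementAndGet();
            return "todos";
        });

        source.subscribe();
        source.subscribe();
        int plainCalls = calls.get();                 // one execution per subscription

        Single<String> cached = source.cache();
        cached.subscribe();
        cached.subscribe();
        int cachedCalls = calls.get() - plainCalls;   // only the first subscription reaches the source

        return new int[] {plainCalls, cachedCalls};
    }
}
```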

RxAndroid provides a scheduler that runs code on the Android main thread. It can also create a scheduler that runs on a given Android Handler. With these schedulers, you can define an observable that does its work on a background thread and posts its results to the main thread. This makes it possible, for example, to replace AsyncTask implementations with RxJava.

Here is a sample for AndroidSchedulers.mainThread() and Schedulers.io() :

serverDownloadObservable
        .subscribeOn(Schedulers.io())                // do the download on an I/O thread
        .observeOn(AndroidSchedulers.mainThread())   // deliver the results on the main thread
        .subscribe(integer -> {
            updateTheUserInterface(integer);
            view.setEnabled(true);
        });

Schedulers

Schedulers play a major role in multithreading in Android applications. A scheduler decides the thread on which a particular piece of code runs.

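  • Schedulers.io(): for I/O-bound, non-CPU-intensive work such as network calls and file or database access
  • AndroidSchedulers.mainThread(): for access to the Android main thread / UI thread
  • Schedulers.computation(): for CPU-intensive work such as processing large data sets or bitmaps
  • Schedulers.newThread(): starts a new thread for each unit of work
  • Schedulers.single(): runs work sequentially on one shared background thread
  • Schedulers.immediate(): runs work immediately on the current thread (RxJava 1.x only; RxJava 2.x offers Schedulers.trampoline() instead)
  • Schedulers.trampoline(): queues work on the current thread and runs it in FIFO order
  • Schedulers.from(): wraps a given Executor as a scheduler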

Operator / Transformation

Operators modify the items emitted by an Observable before an observer receives them.

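  • filter: emits only the items that pass a predicate
  • map: transforms each item by applying a function to it
  • scan: applies an accumulator function to each item and emits every intermediate result
  • groupBy: splits the source into several Observables, one per key
  • Conditional operators: defaultIfEmpty, takeWhile, skipUntil, etc.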
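A short sketch of some of the operators listed above, chained on a small range of integers. It assumes RxJava 2.x; OperatorSketch and the sample numbers are illustrative only.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class OperatorSketch {

    // 1..6 -> keep even numbers -> multiply by 10 -> running total.
    static List<Integer> filterMapScan() {
        return Observable.range(1, 6)
                .filter(n -> n % 2 == 0)       // 2, 4, 6
                .map(n -> n * 10)              // 20, 40, 60
                .scan((sum, n) -> sum + n)     // 20, 60, 120
                .toList()
                .blockingGet();
    }

    // takeWhile() passes items along until the predicate first fails, then completes.
    static List<Integer> takeWhileSmall() {
        return Observable.range(1, 5).takeWhile(n -> n < 3).toList().blockingGet();
    }

    // defaultIfEmpty() emits a fallback item when the source completes without emitting.
    static List<Integer> defaultForEmpty() {
        return Observable.<Integer>empty().defaultIfEmpty(-1).toList().blockingGet();
    }
}
```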

Testing

A Flowable can be tested with io.reactivex.subscribers.TestSubscriber: create the test object and pass it to subscribeWith().

Non-backpressured Observable, Single, Maybe and Completable can be tested with io.reactivex.observers.TestObserver.

The TestScheduler class is very useful when testing time-based operators (e.g. timeout(), buffer(), window()).

TestObserver<Object> testObserver = new TestObserver<>();
todoObservable.subscribeWith(testObserver);
testObserver.assertError(NullPointerException.class);
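For time-based operators, a sketch along the following lines shows how a TestScheduler lets a test move virtual time forward instead of waiting. It assumes RxJava 2.x; VirtualTimeSketch is a made-up name.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
final class VirtualTimeSketch {

    // Drives a one-second interval with virtual time and returns what was emitted.
    static List<Long> run() {
        TestScheduler scheduler = new TestScheduler();

        // test() subscribes a TestObserver and returns it.
        TestObserver<Long> observer =
                Observable.interval(1, TimeUnit.SECONDS, scheduler).test();

        observer.assertNoValues();                       // nothing happens until time moves
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);    // three virtual seconds pass instantly
        observer.assertValues(0L, 1L, 2L);

        return observer.values();
    }
}
```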

Flatmap vs Map Operators

Map modifies each item emitted by a source Observable and emits the modified item.

FlatMap, SwitchMap, and ConcatMap also apply a function to each emitted item, but instead of returning a modified item, the function returns an Observable, which can itself emit items. Those items are then flattened into the output stream.

FlatMap is used to map over asynchronous operations.

ConcatMap works like flatMap but preserves the order of the source items: it waits for each inner Observable to complete before subscribing to the next one.

flatMap: [B1, A1, B2, C1, B3, A2, C2, C3, A3]
concatMap: [A1, A2, A3, B1, B2, B3, C1, C2, C3]
switchMap: [A1, B1, B2, C1, C2, C3] – all previously generated intermediate streams are terminated.
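The three orderings can be reproduced with a sketch like the one below, in which every letter is mapped to a timed inner stream of three numbered items. It assumes RxJava 2.x; FlatteningSketch is a made-up name, and a TestScheduler supplies virtual time. Because the source here emits A, B and C at the same instant, switchMap cancels the streams for A and B before they emit anything, so only the C items appear; the listing above shows a source whose items arrive further apart.

```java
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
final class FlatteningSketch {

    // operator is one of "flatMap", "concatMap" or "switchMap".
    static List<String> collect(String operator) {
        TestScheduler scheduler = new TestScheduler();

        // Each letter becomes a stream such as A1, A2, A3 with one item per virtual second.
        Function<String, Observable<String>> numbered = letter ->
                Observable.interval(1, TimeUnit.SECONDS, scheduler)
                        .take(3)
                        .map(i -> letter + (i + 1));

        Observable<String> letters = Observable.just("A", "B", "C");
        Observable<String> result;
        switch (operator) {
            case "flatMap":   result = letters.flatMap(numbered);   break; // inner streams interleave
            case "concatMap": result = letters.concatMap(numbered); break; // one inner stream at a time
            case "switchMap": result = letters.switchMap(numbered); break; // only the latest inner stream
            default: throw new IllegalArgumentException("unknown operator: " + operator);
        }

        TestObserver<String> observer = result.test();
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS);   // enough virtual time for every stream
        return observer.values();
    }
}
```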

Parallelizing Requests across Multiple Threads

  1. Use flatMap: wrap each item in its own Observable and call subscribeOn() on it with the desired scheduler.
Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

  2. Use parallel() with Flowable.

Flowable<Integer> vals = Flowable.range(1, 10);

vals.parallel()
        .runOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
        .sequential()
        .subscribe(val -> System.out.println(val));

Zip Operator

The zip operator strictly pairs items emitted by two or more observables. It waits until every source has emitted an item, then combines those items into one.
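A minimal sketch of this pairing, assuming RxJava 2.x; ZipSketch and the sample values are illustrative only. The third name has no partner, so it is never emitted.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class ZipSketch {

    // Pairs the i-th name with the i-th number; the result ends with the shorter source.
    static List<String> run() {
        Observable<String> names = Observable.just("a", "b", "c");
        Observable<Integer> numbers = Observable.just(1, 2);

        return Observable.zip(names, numbers, (name, number) -> name + number)
                .toList()
                .blockingGet();
    }
}
```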



Non-blocking: asynchronous execution is supported, and the observer can unsubscribe at any point.

Blocking: all onNext observer calls are synchronous, and it is not possible to unsubscribe in the middle of an event stream.

Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);
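The contrast between the two styles can be sketched as follows, assuming RxJava 2.x; BlockingSketch is a made-up name. The blocking call holds the calling thread until the item arrives, whereas the non-blocking subscription returns a Disposable at once and can be cancelled while the stream is still running.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
final class BlockingSketch {

    // blockingFirst() parks the calling thread until the delayed item has been delivered.
    static String blockingRead() {
        return Observable.just("Hello")
                .delay(50, TimeUnit.MILLISECONDS)
                .blockingFirst();
    }

    // subscribe() returns immediately; the endless interval is cancelled mid-stream.
    static boolean disposeMidStream() {
        Disposable subscription = Observable.interval(10, TimeUnit.MILLISECONDS).subscribe();
        subscription.dispose();
        return subscription.isDisposed();
    }
}
```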

Hot and cold Observables

A cold observable starts emitting its sequence only when an observer subscribes, so each observer receives the whole sequence from the beginning. Examples: the results of a database query, a file retrieval, or a web request.

A hot observable emits items regardless of whether anyone is subscribed, often from the moment it is created, so an observer that subscribes later may miss earlier items. Examples: mouse and keyboard events, system events, or stock prices.
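The difference shows up when a second observer subscribes late, as in the sketch below. It assumes RxJava 2.x; HotColdSketch is a made-up name, and a PublishSubject stands in for a hot source such as UI events.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class HotColdSketch {

    // Cold: the sequence is replayed from the start for every subscriber.
    static List<Integer> coldSecondSubscriber() {
        Observable<Integer> cold = Observable.range(1, 3);
        cold.test();                     // a first subscriber consumes 1, 2, 3
        return cold.test().values();     // a second subscriber still receives all three items
    }

    // Hot: items emitted before the subscription are gone.
    static List<Integer> hotLateSubscriber() {
        PublishSubject<Integer> hot = PublishSubject.create();
        hot.onNext(1);                   // nobody is subscribed yet, so this item is lost
        TestObserver<Integer> late = hot.test();
        hot.onNext(2);
        hot.onNext(3);
        return late.values();
    }
}
```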

How to Avoid Backpressure Problems

by Operators – Throttling – Emit an item

Operators like sample( ) or throttleLast( ), throttleFirst( ), and throttleWithTimeout( ) or debounce( ) allow you to regulate the rate at which an Observable emits items.

The sample operator periodically “dips” into the sequence and emits only the most recently emitted item during each dip.

The throttleFirst operator is similar, but emits not the most recently emitted item, but the first item that was emitted after the previous “dip”.

The debounce operator emits only those items from the source Observable that are not followed by another item within a specified duration.
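The three behaviours can be compared on one fixed timeline, as in this sketch. It assumes RxJava 2.x; ThrottlingSketch and the timings are made up, and a TestScheduler supplies virtual time. The items 1, 2, 3 and 4 are emitted at 0, 30, 60 and 130 ms, and every operator uses a 100 ms period.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

// Hypothetical demo class, not part of RxJava.
final class ThrottlingSketch {

    // Plays the fixed timeline through the given operator and returns what comes out.
    static List<Integer> drive(
            BiFunction<Observable<Integer>, Scheduler, Observable<Integer>> operator) {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Integer> source = PublishSubject.create();
        TestObserver<Integer> observer = operator.apply(source, scheduler).test();

        source.onNext(1);                                     // t = 0 ms
        scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
        source.onNext(2);                                     // t = 30 ms
        scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
        source.onNext(3);                                     // t = 60 ms
        scheduler.advanceTimeBy(70, TimeUnit.MILLISECONDS);
        source.onNext(4);                                     // t = 130 ms
        scheduler.advanceTimeBy(170, TimeUnit.MILLISECONDS);  // t = 300 ms
        return observer.values();
    }

    static List<Integer> sampled() {        // latest item at each 100 ms tick
        return drive((src, s) -> src.sample(100, TimeUnit.MILLISECONDS, s));
    }

    static List<Integer> throttledFirst() { // first item, then nothing for 100 ms
        return drive((src, s) -> src.throttleFirst(100, TimeUnit.MILLISECONDS, s));
    }

    static List<Integer> debounced() {      // only items followed by 100 ms of silence
        return drive((src, s) -> src.debounce(100, TimeUnit.MILLISECONDS, s));
    }
}
```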

by Operators – Buffers and Windows – Emit a list of items

You can also use an operator like buffer( ) or window( ) to collect items from the over-producing Observable and then emit them, less-frequently, as collections (or Observables) of items. 

Buffer : You could, for example, close and emit a buffer of items from the bursty Observable periodically, at a regular interval of time.

Window is similar to buffer, but instead of lists it periodically emits Observable windows of items, for example at a regular interval of time.
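A sketch of both operators follows, assuming RxJava 2.x; BufferWindowSketch is a made-up name. To keep the result deterministic it groups by item count rather than by time; the time-based overloads, such as buffer(long, TimeUnit), follow the same pattern.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class BufferWindowSketch {

    // buffer(3) collects items into lists of up to three and emits each list as one item.
    static List<List<Integer>> buffered() {
        return Observable.range(1, 7).buffer(3).toList().blockingGet();
    }

    // window(3) emits an Observable per group instead; here every window is summed.
    static List<Integer> windowSums() {
        return Observable.range(1, 7)
                .window(3)
                .flatMapSingle(window -> window.reduce(0, Integer::sum))
                .toList()
                .blockingGet();
    }
}
```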

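by establishing a “Reactive Pull” backpressure mechanism for the subscriber, or by callstack blocking

With reactive pull, the subscriber requests only as many items as it is ready to handle. Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable).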

If the Observable, all of the operators that operate on it, and the observer that is subscribed to it, are all operating in the same thread, this effectively establishes a form of backpressure by means of callstack blocking.
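Reactive pull itself can be sketched with a Flowable whose subscriber asks for items in small batches. This assumes RxJava 2.x; ReactivePullSketch is a made-up name, and a TestSubscriber plays the role of the slow consumer.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class ReactivePullSketch {

    // Returns the items received so far after each request.
    static List<List<Integer>> run() {
        // test(0) subscribes with an initial demand of zero, so nothing is emitted yet.
        TestSubscriber<Integer> subscriber = Flowable.range(1, 10).test(0);

        List<List<Integer>> snapshots = new ArrayList<>();
        snapshots.add(new ArrayList<>(subscriber.values()));

        subscriber.request(3);   // the source may now emit exactly three items
        snapshots.add(new ArrayList<>(subscriber.values()));

        subscriber.request(2);   // and two more
        snapshots.add(new ArrayList<>(subscriber.values()));

        return snapshots;
    }
}
```

The source never runs ahead of the demand, so no backlog can build up between it and the subscriber.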

Implementing Custom Operator 

Sequence operator: implement Operator<T> and apply it with .lift(MyOperator<T>) to emit altered T items.

Transformational operator: implement Transformer<Cls1, Cls2> and apply it with .compose(MyTransformer<Cls1, Cls2>) to emit Cls2 items transformed from Cls1 items.
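A transformer for compose() might look like the sketch below. It assumes RxJava 2.x, where the interface for Observable is named ObservableTransformer; TransformerSketch, evenLabels() and the label format are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class TransformerSketch {

    // A reusable chain fragment: keeps even numbers and turns them into labels.
    static ObservableTransformer<Integer, String> evenLabels() {
        return upstream -> upstream
                .filter(n -> n % 2 == 0)
                .map(n -> "even:" + n);
    }

    // compose() applies the transformer to the whole stream at assembly time.
    static List<String> run() {
        return Observable.range(1, 4)
                .compose(evenLabels())
                .toList()
                .blockingGet();
    }
}
```

Packaging a group of operators this way lets several streams share it without repeating the chain.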

Error Handling Operators

  • onErrorResumeNext( ) — instructs an Observable to emit a sequence of items if it encounters an error
  • onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error
  • onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
  • retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
  • retryWhen( ) — if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

Exceptions that cannot be delivered to an observer are passed to the global error handler installed with RxJavaPlugins.setErrorHandler(Consumer<Throwable>). This includes errors raised by parallel work after the stream has already terminated.
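A few of the recovery operators listed above can be sketched as follows, assuming RxJava 2.x; ErrorHandlingSketch, the failing source, and the fallback values are made up for illustration.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
final class ErrorHandlingSketch {

    // A source that emits 1 and 2 and then fails.
    static Observable<Integer> failing() {
        return Observable.just(1, 2)
                .concatWith(Observable.<Integer>error(new IllegalStateException("boom")));
    }

    // onErrorReturn() replaces the error with one fallback item and completes.
    static List<Integer> withFallbackItem() {
        return failing().onErrorReturn(error -> -1).toList().blockingGet();
    }

    // onErrorResumeNext() switches to another sequence when the error arrives.
    static List<Integer> withFallbackSequence() {
        return failing().onErrorResumeNext(Observable.just(10, 20)).toList().blockingGet();
    }

    // retry() resubscribes after an error; this source succeeds on its third attempt.
    static String withRetry() {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.fromCallable(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("flaky");
            }
            return "ok after " + attempts.get() + " attempts";
        }).retry(5).blockingFirst();
    }
}
```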

Subjects

A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by re-emitting them, and it can also emit new items.

  • Publish Subject
    It emits only the items that the source Observable emits after the time of subscription.
  • Replay Subject
    It emits all the items of the source Observable, regardless of when the subscriber subscribes.
  • Behavior Subject
    It emits the most recently emitted item and all the subsequent items of the source Observable when an observer subscribes to it.
  • Async Subject
    It emits only the last value of the source Observable, and only after that source Observable completes.
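The four subject types can be compared by subscribing late to each of them, as in this sketch. It assumes RxJava 2.x; SubjectSketch is a made-up name. Items 1 and 2 are pushed before the observer subscribes, and item 3 plus completion afterwards.

```java
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.AsyncSubject;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class SubjectSketch {

    // Returns what an observer sees when it subscribes after items 1 and 2 were emitted.
    static List<Integer> lateSubscriber(Subject<Integer> subject) {
        subject.onNext(1);
        subject.onNext(2);
        TestObserver<Integer> late = subject.test();
        subject.onNext(3);
        subject.onComplete();
        return late.values();
    }

    static List<Integer> publish()  { return lateSubscriber(PublishSubject.<Integer>create()); }
    static List<Integer> replay()   { return lateSubscriber(ReplaySubject.<Integer>create()); }
    static List<Integer> behavior() { return lateSubscriber(BehaviorSubject.<Integer>create()); }
    static List<Integer> async()    { return lateSubscriber(AsyncSubject.<Integer>create()); }
}
```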

When should I use a subject?

When all of the following are true:

  • you don’t have an observable or anything that can be converted into one.
  • you require a hot observable.
  • the scope of your observable is a type. (e.g., it’s exposed as a public property and backed by a field)
  • you don’t need to define a similar event and no similar event already exists.

RxRelays

A Relay is a Subject without the ability to call onComplete or onError.

Extra

Fetching from a cache or network

Get a value from the cache if it exists; otherwise, fetch it from the network.

Maybe<List<Event>> source1 = cacheRepository.getEventsFeed(...);
Single<List<Event>> source2 = networkRepository.getEventsFeed(...);
Maybe<List<Event>> source = Maybe.concat(source1, source2.toMaybe()).firstElement();

References

  1. https://www.baeldung.com/rxjava-tutorial
  2. https://github.com/ReactiveX/RxJava/wiki/Backpressure
  3. http://www.vogella.com/tutorials/RxJava/article.html
  4. https://stackoverflow.com/questions/30219877/rxjava-android-how-to-use-the-zip-operator
  5. http://reactivex.io/documentation/subject.html
