Starting Android App development with RxJava 2.0

Upgrading to the latest release of a library is usually as simple as changing the version number in your dependency configuration, but for RxJava users, making the switch to RxJava 2.0 isn’t quite so straightforward.

For version 2.0, RxJava has been completely rewritten on top of the new Reactive Streams specification, and while its operators remain largely unchanged, RxJava 2.0 overhauls some pretty fundamental parts of the RxJava workflow, including maintaining subscriptions and handling the long-standing problem of backpressure.

In this article I’m going to cover all the major breaking changes you need to be aware of when migrating from RxJava 1.0 to RxJava 2.0. And, if you’re new to RxJava, then I’ll also be outlining the RxJava fundamentals, so you can start your RxJava journey with the latest release of this powerful Reactive Programming library.

RxJava 2.0 fundamentals

RxJava is a JVM-compatible library that provides an efficient, structured way of working with asynchronous streams of real-time data in a reactive programming style.

The RxJava 2.0 library is particularly useful in Android development, as mobile apps tend to be asynchronous by nature. At any one time, an Android app may be monitoring a network connection for any updates that it can incorporate into its user interface (UI), while pulling information from a database, and responding to any user input events that occur. RxJava gives you a way of writing code that can react to all these different events as they happen, without having to write a tonne of callbacks.

The RxJava workflow consists of a stream, reactive objects that consume this stream and operators that transform the data being emitted by each stream. You implement this workflow using the following components:

1. An Observable

An Observable is an object that emits zero or more items, calling onNext() each time it emits an item. By default, an Observable doesn’t start emitting data until it’s been assigned an Observer.

Once an Observable has emitted all of its data, it terminates by calling either:

  • onComplete. The operation was a success, and the Observable has no more items to emit. Note that in RxJava 1.0, onComplete was onCompleted.
  • onError. Processing onNext() resulted in an exception. If an onError() occurs, then the Observable passes this error down the chain to its assigned Observer, which is then responsible for handling this error. While you can create an Observer without defining an action for onError, this can result in errors going unhandled, and therefore isn’t recommended.
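To make this lifecycle concrete, here is a minimal sketch of a full Observer that records every callback it receives. It assumes the RxJava 2 artifact is on the classpath and runs on a plain JVM rather than inside an Activity; the class name ObserverLifecycleSketch and the record() helper are made up for illustration. Note that an RxJava 2.0 Observer also receives an onSubscribe() call before any items arrive.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava
final class ObserverLifecycleSketch {

    // Subscribes a full Observer to the source and records each callback in order
    static List<String> record(Observable<String> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                events.add("onSubscribe");
            }

            @Override
            public void onNext(String item) {
                events.add("onNext: " + item);
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // A stream that finishes normally ends with onComplete
        System.out.println(record(Observable.just("One", "Two")));
        // A failing stream ends with onError and never calls onComplete
        System.out.println(record(Observable.error(new IllegalStateException("boom"))));
    }
}
```

A stream terminates with either onComplete or onError, never both, which is why the second call records no onComplete event.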

2. An Observer

As soon as you assign an Observer to an Observable, it starts listening for emissions from that Observable. It’s possible for an Observable to have multiple Observers.

3. Operators

RxJava supports a large collection of operators that you can use to modify, combine and compose the data being emitted by an Observable. For example, here we’re applying the map operator to an Observable that emits strings:

Observable<String> caps = name.map(s -> s.toUpperCase());

In addition to transforming data, you can use RxJava’s operators to create multi-threaded applications. Here we’re creating an Observable that executes on a new thread:

Observable<String> nameOnNewThread =
       name.subscribeOn(Schedulers.newThread());

If you do perform work on any thread other than Android’s main UI thread, you can use the observeOn operator to send the result of this work back to the main thread. The easiest way to achieve this is to use the RxAndroid library:

dependencies {  
...  
...  
...  
  compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}

The RxAndroid library provides the AndroidSchedulers.mainThread scheduler, which you can use to send the results of an Observable to your app’s main UI thread, in a single line of code:

.observeOn(AndroidSchedulers.mainThread())

Applying an operator to an Observable almost always returns another Observable, so you can perform complex, multi-step data transformations by chaining multiple operators together.
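As a rough illustration of such a chain, the sketch below combines subscribeOn, two transforming operators and observeOn. It assumes RxJava 2 is on the classpath and runs on a plain JVM, so Schedulers.single() stands in for AndroidSchedulers.mainThread(); the class and method names are invented for this example, and blockingGet() is only there so the demo can return its result.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Hypothetical demo class, not part of RxJava
final class OperatorChainSketch {

    static List<String> shortNamesInCaps() {
        return Observable.just("Testing", "One", "Two", "Three")
                // Run the upstream work on a background thread
                .subscribeOn(Schedulers.io())
                // Keep only the strings of three characters or fewer
                .filter(s -> s.length() <= 3)
                // Transform each remaining item
                .map(String::toUpperCase)
                // Assumption: stand-in for AndroidSchedulers.mainThread() on a plain JVM
                .observeOn(Schedulers.single())
                // Collect every emission into a single list
                .toList()
                // Block the calling thread only so this demo can return the list
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(shortNamesInCaps()); // prints [ONE, TWO]
    }
}
```

In an Android app you would normally end the chain with subscribe() rather than blockingGet(), so the main thread is never blocked.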

Adding RxJava 2.0 to Android Studio

To start working with the RxJava 2.0 library, open your module-level build.gradle file and add the latest release of RxJava 2.0 as a project dependency:

dependencies {
...
...
...
   compile 'io.reactivex.rxjava2:rxjava:2.1.5'
}

If you’re migrating from RxJava 1.0, this dependency probably looks very different from what you were expecting, as RxJava 2.0 has a completely different set of Maven coordinates compared to RxJava 1.0. This change also affects RxJava 2.0’s import statements:

import io.reactivex.Observable;

Compared to RxJava 1.0’s:

import rx.Observable;

These different package names give you the flexibility to use RxJava 1.x and RxJava 2.x code side-by-side in the same project, which makes it easier to migrate your existing projects to RxJava 2.0. Just add the RxJava 2.0 dependency and you can start using the new features straight away, without having to immediately update all of your existing RxJava 1.0 code to target RxJava 2.0.

However, including both versions of the RxJava library in a project will increase the size of your APK, so while it’s possible to use both libraries side-by-side, this shouldn’t be a long-term strategy, and you should still make a point of updating your legacy code to use RxJava 2.0.

Adding Java 8.0 support

Implementing an Observer can sometimes be a clunky process, so I’ll be using lambda expressions to help keep the amount of boilerplate code under control.

Although you can use all of RxJava 2.0’s features without having to write a single lambda expression, if you want to use the code samples in this article then you’ll need to update your project to use Java 8.0:

android {
    compileSdkVersion 26
    buildToolsVersion "26.0.1"
    defaultConfig {
        applicationId "com.jessicathornsby.myapplication"
        minSdkVersion 26
        targetSdkVersion 26
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }

//Add the following block of code//
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
...
}

Create an RxJava 2.0 app

Let’s create a simple Observable, using the Observable.just() method:

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       //Emit four strings, and log each one as it arrives//
       Observable<String> source =
               Observable.just("Testing", "One", "Two", "Three");
       source.subscribe(s -> Log.e(TAG, "RECEIVED: " + s));
   }
}

Run this project on your physical Android device or Android Virtual Device (AVD), and it’ll print each emission to Android Studio’s Logcat.

At the moment, this Observer is simply receiving the sequence exactly as it was emitted, but you could also transform this data using one or more operators. Here we’re using the map() operator to convert each string to an integer representing its length:

Observable<String> source =
        Observable.just("Testing", "One", "Two", "Three");

//Create an Observable<Integer> that’s derived from the original Observable<String>//

Observable<Integer> count = source.map(String::length);
count.subscribe(s -> Log.e(TAG, "RECEIVED: " + s));

This logs the length of each string in turn: RECEIVED: 7, RECEIVED: 3, RECEIVED: 3 and RECEIVED: 5.

It’s possible to subscribe multiple Observers to the same Observable:

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       {

           Observable<String> source =
                   Observable.just("Testing", "One", "Two", "Three");
           source.subscribe(s -> Log.e(TAG, "FIRST OBSERVER RECEIVED: " + s));
           Observable<Integer> count = source.map(String::length);
           count.subscribe(s -> Log.e(TAG, "SECOND OBSERVER RECEIVED: " + s));
 
       }
   }
}

When you run this code, the first Observer receives the entire data set before the second Observer starts receiving data. This is because most Observables are cold by default, which means they replay the same data set to each Observer in turn.

If you want an Observable to send each emission to all of its assigned Observers simultaneously, then you’ll need to create a hot Observable, and one method is to use a ConnectableObservable.

It’s important to note that ConnectableObservable doesn’t start sending data to its Observers automatically, so once all of your Observers are in place you’ll need to give your Observable the go-ahead by calling the connect() method.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       ConnectableObservable<String> source =
               Observable.just("Testing", "One", "Two", "Three")
                       .publish();
       source.subscribe(s -> Log.e(TAG, "FIRST OBSERVER RECEIVED: " + s));
       Observable<Integer> count = source.map(String::length);
       count.subscribe(s -> Log.e(TAG, "SECOND OBSERVER RECEIVED: " + s));

       //Nothing is emitted until connect() is called//
       source.connect();
   }
}

This time each emission is sent to both Observers before the next one is emitted, so the FIRST OBSERVER and SECOND OBSERVER log lines alternate.

Creating more Observables

When it comes to creating Observables, Observable.create() isn’t your only option. RxJava 2.0 supports a long list of convenience methods, including:

  • Observable.just(). Converts any object into an Observable, by acting as a wrapper around other data types.
    Observable<String> observable = Observable.just("Hello World!");
    
  • Observable.fromArray(). Converts an array into an Observable stream.
    final String[] myString = {"One", "Two", "Three", "Four"};
    final Observable<String> observable = Observable.fromArray(myString);
    
  • Observable.range(). Emits a range of sequential integers.
    Observable<Integer> observable = Observable.range(0, 5);
    
  • Observable.interval(). Emits a sequence of ascending Long values, starting at 0, at an interval specified by you.
    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
    
  • Observable.empty(). Creates an Observable that emits nothing and then calls onComplete(). An empty Observable is basically RxJava’s version of null, although Observable.empty() won’t result in a NullPointerException.
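The following sketch exercises the convenience methods from the list above and collects what each one emits. It assumes RxJava 2 is on the classpath and runs on a plain JVM; the class and method names are hypothetical, and the blocking calls are only there so the results can be inspected.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava
final class FactoryMethodsSketch {

    static List<String> fromArrayItems() {
        String[] items = {"One", "Two", "Three", "Four"};
        // Each array element becomes one emission
        return Observable.fromArray(items).toList().blockingGet();
    }

    static List<Integer> rangeItems() {
        // range(start, count): five sequential values beginning at 0
        return Observable.range(0, 5).toList().blockingGet();
    }

    static List<Long> firstThreeTicks() {
        // interval() never completes on its own, so take(3) ends the stream after three ticks
        return Observable.interval(10, TimeUnit.MILLISECONDS)
                .take(3)
                .toList()
                .blockingGet();
    }

    static boolean emptyEmitsNothing() {
        // isEmpty() reports true when the source completes without emitting
        return Observable.empty().isEmpty().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromArrayItems());    // [One, Two, Three, Four]
        System.out.println(rangeItems());        // [0, 1, 2, 3, 4]
        System.out.println(firstThreeTicks());   // [0, 1, 2]
        System.out.println(emptyEmitsNothing()); // true
    }
}
```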

RxJava 2.0 also has a couple of important Observable variants.

Maybe

‘Maybe’ is a new base reactive type introduced in RxJava 2. A Maybe represents an Observable that may emit an item, an error, or nothing at all – hence the name ‘Maybe!’

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Maybe;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override   
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       Maybe.just("Hello World")
               .subscribe(s -> Log.e(TAG, s),
                       throwable -> Log.e(TAG, "error"));

   }
}
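To see all three possible outcomes, a Maybe can be subscribed to with a third callback that fires when it completes without a value. The sketch below assumes RxJava 2 on the classpath and a plain JVM; MaybeSketch and describe() are names invented for this illustration.

```java
import io.reactivex.Maybe;

// Hypothetical demo class, not part of RxJava
final class MaybeSketch {

    // Returns a short description of whichever callback the Maybe invokes
    static String describe(Maybe<String> maybe) {
        StringBuilder outcome = new StringBuilder();
        maybe.subscribe(
                value -> outcome.append("success: ").append(value),
                error -> outcome.append("error: ").append(error.getMessage()),
                () -> outcome.append("complete without a value"));
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(Maybe.just("Hello World")));
        System.out.println(describe(Maybe.empty()));
        System.out.println(describe(Maybe.error(new IllegalStateException("boom"))));
    }
}
```

Exactly one of the three callbacks runs for any given Maybe: a Maybe that emits an item does not also signal completion.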

Single

A Single is an Observable that either completes successfully by emitting a single item (again, the clue’s in the name) or fails by emitting an error.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Single;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       Single.just("Hello World")
               .subscribe(s -> Log.e(TAG, s));
   }
}
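The example above only supplies a success callback, so a failing Single would have nowhere to deliver its error. Here is a hedged sketch of both outcomes, assuming RxJava 2 on the classpath and a plain JVM; SingleSketch, parse() and describe() are illustrative names rather than library APIs.

```java
import io.reactivex.Single;

// Hypothetical demo class, not part of RxJava
final class SingleSketch {

    // fromCallable defers the work until subscription and routes any exception to onError
    static Single<Integer> parse(String text) {
        return Single.fromCallable(() -> Integer.parseInt(text));
    }

    // Returns a short description of whichever callback the Single invokes
    static String describe(Single<Integer> single) {
        StringBuilder outcome = new StringBuilder();
        single.subscribe(
                value -> outcome.append("success: ").append(value),
                error -> outcome.append("error: ").append(error.getClass().getSimpleName()));
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(parse("42")));   // success: 42
        System.out.println(describe(parse("oops"))); // error: NumberFormatException
    }
}
```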

Flowables and backpressure

By default, RxJava operates a push-based workflow, where the Observable pushes its data downstream to its assigned Observer(s). This push-based workflow can cause a problem if the source Observable emits items too quickly for the downstream Observer to process, resulting in a backlog of unconsumed items that take up precious space in the device’s memory.

To help combat this problem, RxJava 2.0 introduced a Flowable class that allows you to control backpressure, by telling the source to emit data at a pace that the downstream Observers can process.

RxJava 1.0’s Observables attempted to combine the functionality of a “standard” Observable and the functionality that’s now offered via a Flowable, but in RxJava 2.0 there’s a very clear distinction between the two:

  • Observables are no longer backpressured.
  • Flowables are inherently capable of supporting backpressure.

By replacing an Observable with a Flowable, you let the downstream control how many items it receives, rather than having to accept everything the source emits as fast as it’s emitted.

Most of the Observable convenience methods also work with Flowable, so you can create a Flowable in pretty much the same way you’d create an Observable:

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Flowable;
import io.reactivex.subscribers.DisposableSubscriber;
import org.reactivestreams.Subscriber;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       Flowable<String> flowable = Flowable.just("Hello World");
       Subscriber<String> mySubscriber = new DisposableSubscriber<String>() {

           @Override
           public void onNext(String s) {
               Log.e(TAG, "Next");
           }

           @Override
           public void onError(Throwable t) {
               Log.e(TAG, "Error");
           }

           @Override
           public void onComplete() {
               Log.e(TAG, "Completed");
           }
       };

       flowable.subscribe(mySubscriber);
   }
}

When you create a Flowable with Flowable.create(), or convert an Observable with toFlowable(), you specify how you want to control the flow of data by passing one of the following BackpressureStrategy values:

  • BUFFER. Buffers the onNext() values in memory until the downstream can consume them. Note that this can still lead to an OutOfMemoryError.
  • DROP. If the Observer can’t keep up, then drop the most recent onNext() value.
  • LATEST. Keeps only the latest onNext() value, dropping all previous values that the Observer hasn’t consumed.
  • ERROR. Signals a MissingBackpressureException as soon as the downstream can’t keep up.
  • MISSING. onNext() events are written without any buffering or dropping, so the downstream has to deal with any overflow itself.
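To show how two of these strategies differ in practice, the sketch below pushes ten items at a subscriber that only ever requests three. It assumes RxJava 2 on the classpath and a plain JVM; BackpressureStrategySketch and run() are hypothetical names, and the source emits synchronously so the outcome is easy to follow.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subscribers.DisposableSubscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava
final class BackpressureStrategySketch {

    // Emits 1..10 to a subscriber that requests only three items,
    // and records what that subscriber sees under the given strategy
    static List<String> run(BackpressureStrategy strategy) {
        List<String> events = new ArrayList<>();

        Flowable<Integer> source = Flowable.<Integer>create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, strategy);

        source.subscribe(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                // The default onStart() requests Long.MAX_VALUE; ask for three items instead
                request(3);
            }

            @Override
            public void onNext(Integer item) {
                events.add("next " + item);
            }

            @Override
            public void onError(Throwable t) {
                events.add(t.getClass().getSimpleName());
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // DROP silently discards items 4..10, then completes
        System.out.println(run(BackpressureStrategy.DROP));
        // ERROR fails as soon as item 4 arrives without any outstanding demand
        System.out.println(run(BackpressureStrategy.ERROR));
    }
}
```

BUFFER and LATEST would hold on to undelivered items until the subscriber requests more, which is why they are left out of this fixed-demand demo.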

The major disadvantage of backpressure-aware Flowables is that they incur more overhead than Observables, so in the interests of creating a high-performing app you should stick with Observables until backpressure becomes a problem. As a general rule, it’s usually safe to stick with Observables when you’re dealing with fewer than 1,000 emissions, or infrequent events.

Disposable

Processing an Observable’s emissions requires resources, so long-running or infinite Observables are a potential source of memory leaks. Memory leaks always have a negative impact on performance, but they’re a particular problem for devices where memory is restricted to begin with, such as Android smartphones and tablets.

Finite Observables that call onComplete() will typically dispose of themselves, but if you’re working with an Observable that has the potential to run for a significant period of time or even infinitely, you’ll need to explicitly disconnect this Observer from its Observable, which will free up resources ready to be garbage collected.

In RxJava 1.0, the rx.Subscription interface was responsible for unsubscribing an Observer. However, the Reactive-Streams specification uses the word “Subscription” for another purpose, so to avoid a naming conflict RxJava 1.0’s rx.Subscription has essentially become io.reactivex.Disposable in RxJava 2.0. You can now break the connection between an Observable and its assigned Observer, by calling .dispose().

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subscribers.DisposableSubscriber;

public class MainActivity extends AppCompatActivity {

   private static final String TAG = "MainActivity";

   @Override
   protected void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.activity_main);

       Disposable d =
               Flowable.just(1)
                       .subscribeWith(new DisposableSubscriber<Integer>() {

                           @Override
                           public void onNext(Integer integer) {
                               Log.e(TAG, "Next");
                           }

                           @Override
                           public void onError(Throwable t) {
                               Log.e(TAG, "Error");
                           }

                           @Override
                           public void onComplete() {
                               Log.e(TAG, "Completed");
                           }
                       });

       //Break the connection between the Flowable and its subscriber//
       d.dispose();
   }
}
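Disposal matters most for streams that never finish by themselves. As a minimal sketch, assuming RxJava 2 on the classpath and a plain JVM, the example below subscribes to an infinite interval() stream and then disposes of it; DisposeSketch is a made-up name, and in an Android app the dispose() call would more typically live in a lifecycle callback such as onDestroy().

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava
final class DisposeSketch {

    // Returns the isDisposed() state before and after calling dispose()
    static boolean[] disposeInfiniteStream() {
        AtomicInteger received = new AtomicInteger();

        // interval() never calls onComplete(), so it runs until it is disposed
        Disposable disposable = Observable.interval(10, TimeUnit.MILLISECONDS)
                .subscribe(tick -> received.incrementAndGet());

        boolean before = disposable.isDisposed();
        try {
            // Let a few ticks arrive before cutting the stream off
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        disposable.dispose();
        boolean after = disposable.isDisposed();

        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] states = disposeInfiniteStream();
        System.out.println("disposed before: " + states[0] + ", after: " + states[1]);
    }
}
```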

No More Nulls

In version 2.0, RxJava no longer accepts null values. Try to create an Observable that emits a null value, and you’re going to encounter a NullPointerException. For example, both of the following will result in an error:

Observable.just(null);
Single.just(null);

If you do want to use null values in your code, then you can use Optionals in API level 24 and higher.
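One way this might look is sketched below, using java.util.Optional to represent a missing value and then filtering the empty ones out. It assumes RxJava 2 on the classpath and a plain JVM; NullHandlingSketch and its methods are illustrative names only.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.Optional;

// Hypothetical demo class, not part of RxJava
final class NullHandlingSketch {

    // Confirms that passing null to just() fails immediately
    static boolean justNullThrows() {
        try {
            Observable.just((String) null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Wraps possibly-missing values in Optional, then unwraps only the present ones
    static List<String> presentValues() {
        return Observable.just(Optional.of("One"), Optional.<String>empty(), Optional.of("Three"))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(justNullThrows()); // true
        System.out.println(presentValues());  // [One, Three]
    }
}
```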

Wrapping up

In this article we looked at some of the major changes you need to be aware of when making the move from RxJava 1.0 to RxJava 2.0, as well as the RxJava basics you’ll need to know when adding this library to your projects for the first time.

If you want to continue exploring what’s possible with RxJava, then there are a number of additional Android-specific RxJava libraries that are well worth exploring, including RxBinding and RxPermissions. If you have any other recommendations for RxJava libraries, then let us know in the comments below!
