Reactive programming for Java, revisited

Abstract

Rewritten on 15 January 2015
In this post I’ll talk about how callbacks hurt the readability of asynchronous Java programs. I’ll also discuss an implementation of method pointers for Java which helps transform a callback-style program into a linear program flow, greatly increasing readability while the program remains 100% asynchronous.

On reactive programming in Java

The traditional JEE thread-per-request model apparently does not scale well, because its threads spend most of their time idle, waiting on some blocking operation such as a database query, a filesystem operation or an external web service. Large enterprise programs therefore need a great many threads, which put significant strain on the operating system’s memory and task scheduler.

Clever people came up with the idea of writing asynchronous programs: instead of having a thread wait for an I/O operation, the program creates a callback object, stores in it all the information needed to process the results of the I/O operation, hands that object to the I/O code and continues with something else.

The problem of blocking resources is tackled with new APIs which require callbacks. Thus, instead of waiting for a blocking call…

ResultSet rs = stmt.executeQuery("SELECT * FROM USERS");

… we could write something different with a non-blocking JDBC API such as the async-mysql-connector I have written about previously:

ResultSetCallback rsCallback = new ResultSetCallback() {
  public void onResultSet(ResultSet rs) {
    while (rs.hasNext()) {
      rs.next();
      System.out.println(rs.getLong(1) + " " + rs.getString(2) + " " + rs.getString(3) + " " + rs.getTimestamp(4));
    }
  }
};
st.executeQuery("select * from test", rsCallback);

The benefit here is that the call won’t block and the thread is free to do something else. Once the database returns the result set, the callback will handle the results.

Callbacks

Callbacks are the point where the synchronous and asynchronous programming styles meet. And callbacks are ugly. While the previous example is reasonably succinct, things get messy as soon as we add a second query. Plus: we need the data in the remainder of the program, but how do we get it out of the callback? Callbacks lead to nested control structures, heavy code indentation and confusion about variable scopes.
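To make the nesting concrete, here is a minimal sketch of two dependent queries in callback style. `QueryCallback` and `FakeAsyncDb` are hypothetical stand-ins invented for this example (they are not the async-mysql-connector API), and the fake database answers immediately on the calling thread so the sketch stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback type, invented for this sketch.
interface QueryCallback {
    void onResult(String row);
}

// Hypothetical stand-in for a non-blocking database client. It answers
// immediately on the calling thread to keep the example self-contained.
class FakeAsyncDb {
    void query(String sql, QueryCallback callback) {
        callback.onResult("row for [" + sql + "]");
    }
}

class NestedCallbacksDemo {
    // The results have to be smuggled out of the callbacks via a shared list.
    static List<String> run(final FakeAsyncDb db) {
        final List<String> out = new ArrayList<String>();
        db.query("select id from users", new QueryCallback() {
            public void onResult(final String user) {
                // The second query depends on the first, so it nests inside it.
                db.query("select * from orders", new QueryCallback() {
                    public void onResult(String order) {
                        out.add(user);
                        out.add(order);
                    }
                });
            }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new FakeAsyncDb()));
    }
}
```

With a truly asynchronous client the list returned by `run` would still be empty when the method returns, which is exactly the "how do we get the data out" problem.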

Futures?

Reactive programming is many things to many people; to me it is the return to a linear program notation by using references to future values.

The Java API hides some of the problems with the use of Futures, but futures don’t do away with the conceptual mismatch: callbacks are the necessary and only bridge that joins a sequential program with components that return asynchronous data.
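As an illustration of that bridge, the sketch below wraps a callback-style call in a `java.util.concurrent.CompletableFuture` (Java 8). `UserCallback` and `CallbackAuthService` are hypothetical types made up for this example, and the service replies synchronously for brevity. The callback does not disappear; it is merely written once, inside the adapter.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback contract with a success and an error path.
interface UserCallback {
    void success(String user);
    void error(Exception e);
}

// Hypothetical callback-style service; it replies synchronously for brevity.
class CallbackAuthService {
    void authenticate(String login, String password, UserCallback cb) {
        if ("secret".equals(password)) {
            cb.success(login);
        } else {
            cb.error(new IllegalArgumentException("bad credentials"));
        }
    }
}

class FutureBridge {
    // Adapts the callback API to a future: the callback completes the future.
    static CompletableFuture<String> authenticate(CallbackAuthService service,
                                                  String login, String password) {
        final CompletableFuture<String> future = new CompletableFuture<String>();
        service.authenticate(login, password, new UserCallback() {
            public void success(String user) { future.complete(user); }
            public void error(Exception e) { future.completeExceptionally(e); }
        });
        return future;
    }
}
```

Callers of `FutureBridge.authenticate` only ever see the future, but every asynchronous component that speaks callbacks needs an adapter of this kind.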

In an ideal world, the entire JRE and every library would support futures. Let’s look at a simple example of a login form. A listener on the submit button listens for clicks. It then calls an asynchronous service and passes along a callback. Once the service call returns, the callback is called and user information is shown on a form.

Form form = ...;
AuthService service = ...;
form.button.addClickListener(new ClickListener() {
  public void onClick() {
    String login = form.getLogin();
    String password = form.getPassword();
    service.authenticate(login, password, new Callback() {
      public void success(User user) {
        form.show(user);
      }

      public void error(Exception e) {
        form.error(e);
      }
    });
  }
});

This example contains two asynchronous structures: a click listener on a button and a network callback on a remote service.

A 100% reactive API where both the UI components and the remote service API support futures would look like this:

Form form = ...;
AuthService service = ...;
Future login = form.getLogin();
Future password = form.getPassword();
Future user = service.authenticate(login, password);
form.show(user);

The form doesn’t really return strings; it returns futures which will resolve once the submit button is clicked. When the service’s authenticate method is invoked, it won’t dispatch a network request until both login and password are resolved (thus: not before the submit button is clicked). The form will be given a future on a User object, but it won’t update the view until the service has finished the network request. Hence, a thoroughly asynchronous procedure has been expressed as a genuinely sequential program.

However, in reality, we need to mix reactive and non-reactive APIs all the time, and that’s when callbacks pop up everywhere.
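For comparison, the flow described above can be approximated with Java 8’s `CompletableFuture`. This is only a sketch under assumptions: `LinearLoginSketch`, `authenticate` and `loginFlow` are hypothetical names, and the service resolves immediately instead of going over the network. Note that the combinators still take lambdas, which are small callbacks in disguise.

```java
import java.util.concurrent.CompletableFuture;

class LinearLoginSketch {
    // Hypothetical service call: resolves to a display name for the user.
    static CompletableFuture<String> authenticate(String login, String password) {
        return CompletableFuture.completedFuture("user:" + login);
    }

    static CompletableFuture<String> loginFlow(CompletableFuture<String> login,
                                               CompletableFuture<String> password) {
        // authenticate() is not called until both inputs have resolved.
        return login
            .thenCombine(password, (l, p) -> authenticate(l, p))
            .thenCompose(f -> f); // flatten the nested future
    }

    public static void main(String[] args) {
        CompletableFuture<String> login = new CompletableFuture<>();
        CompletableFuture<String> password = new CompletableFuture<>();
        CompletableFuture<String> user = loginFlow(login, password);

        System.out.println(user.isDone()); // prints "false": nothing submitted yet
        login.complete("george");          // simulates the submit click
        password.complete("secret");
        System.out.println(user.join());   // prints "user:george"
    }
}
```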

The reactive-programming library

I’m experimenting on GitHub with an approach that aims to allow a linear programming style by using adapters and conventions which make callback code look like linear code. Callbacks are inherently predictable constructs: they have an “onSuccess” (or equivalent) method and, maybe, an “onError” method. They are passed as references to asynchronous components and their methods are called once results are available.

The library has similarities to other reactive libraries: there are promises and there are callbacks. It differs however from other approaches in that it uses conventions which allow methods to be used directly as callbacks instead of callback objects:

FunctionPointer onUserAvailable(Promise user, Form form) {
  FunctionPointer fp = new FunctionPointer(this, user);
  if (user.isAvailable()) {
    form.showUserName(user.fullName);
  }
  return fp;
}

void loginUser() {
  String login = form.getLogin();
  String password = form.getPassword();
  Promise user = service.getUser(login, password);
  user.invokeWhenAvailable(onUserAvailable(user, form));
}

This may not look like much at first sight, but we actually managed to have the method “onUserAvailable” called once the “user” promise is resolved, without any callbacks! Now what exactly happens here?

Function pointers are a little bit of magic. They are like bookmarks into code which can be called later on (I’ll disappoint Scheme fans here: this little trick falls way short of full continuation support). A simple example of just function pointers, without any reactive semantics:

FunctionPointer sayHi(String name) {
  System.out.println("Hello " + name);
  return new FunctionPointer(this, name);
}

...

FunctionPointer fp = sayHi("george"); // prints "Hello george"
fp.invoke(); // prints "Hello george", again

This example will actually print “Hello george” twice. That is a bit annoying, but we’ll see later that in the reactive programming context this can be worked around easily.

Under the hood, Java doesn’t support function pointers so we have to be creative. What we want is a reference to a method and its arguments, and we can get that by calling the method and returning a function pointer. The function pointer will grab the current stack, make a note of the calling method (which is the method we want to get a pointer to), record the current object (“this”) and any method arguments and package everything into a reference. By calling “invoke” on that function pointer, we can call the entire method any time later just as it was at the moment of the recording.

Thus, any method that returns a FunctionPointer can be used instead of a callback.
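The mechanism described above can be sketched in a few lines. This is an assumption-laden illustration, not the library’s code: `SketchFunctionPointer` and `Greeter` are hypothetical names, and the method lookup is deliberately naive (it matches by name and parameter count only, and looks only at methods declared directly on the target’s class).

```java
import java.lang.reflect.Method;

// Sketch only: the FunctionPointer in the library may work differently.
class SketchFunctionPointer {
    private final Object target;
    private final Object[] args;
    private final Method method;

    SketchFunctionPointer(Object target, Object... args) {
        this.target = target;
        this.args = args;
        // Frame 0 is this constructor; frame 1 is the method that called it,
        // which is the method we want a pointer to.
        String caller = new Throwable().getStackTrace()[1].getMethodName();
        this.method = find(target.getClass(), caller, args.length);
        this.method.setAccessible(true);
    }

    // Naive lookup by name and parameter count; overloads are not handled.
    private static Method find(Class<?> type, String name, int arity) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(name) && m.getParameterTypes().length == arity) {
                return m;
            }
        }
        throw new IllegalStateException("no method " + name + "/" + arity);
    }

    // Re-runs the recorded method on the recorded object with the recorded arguments.
    Object invoke() {
        try {
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}

class Greeter {
    int calls = 0; // counts how often sayHi has run

    SketchFunctionPointer sayHi(String name) {
        calls++;
        System.out.println("Hello " + name);
        return new SketchFunctionPointer(this, name);
    }
}
```

Calling `new Greeter().sayHi("george")` runs the method once, and calling `invoke()` on the returned pointer runs it a second time with the same argument.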

If you dig into the API you’ll notice that function pointers also implement the Promise interface, hence they can contain values. A trivial example of where that might come in handy:

FunctionPointer sum(Promise b1, Promise b2) {
  FunctionPointer fp = new FunctionPointer(this, b1, b2);
  if (b1.isAvailable() && b2.isAvailable()) {
    fp.set(b1.get() + b2.get());
  }
  return fp;
}

...

Promise balance1 = bank.queryAccountBalance(12345);
Promise balance2 = bank.queryAccountBalance(67890);
Promise sum = balance1.invokeWhenAvailable(sum(balance1, balance2));
reactiveForm.showAccountBalanceSum(sum);

It is important to note that the method behind a function pointer will be called multiple times in its life cycle: at least once during construction and possibly again later, once promises are resolved. That is why it is important to check, as in the example, that any arguments that are promises have been resolved. The callback method can return a function pointer and resolve it at any time with a new value, just as in the example.
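The "called several times, so guard on availability" pattern can be demonstrated without any stack tricks. The sketch below uses a hypothetical `SketchPromise` with plain `Runnable` listeners in place of the library’s Promise and FunctionPointer types; all names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal promise; not the library's Promise interface.
class SketchPromise<T> {
    private T value;
    private boolean available;
    private final List<Runnable> listeners = new ArrayList<Runnable>();

    boolean isAvailable() { return available; }

    T get() {
        if (!available) throw new IllegalStateException("not resolved yet");
        return value;
    }

    void set(T newValue) {
        value = newValue;
        available = true;
        // Re-run everything that was waiting on this promise.
        for (Runnable listener : new ArrayList<Runnable>(listeners)) {
            listener.run();
        }
    }

    void whenAvailable(Runnable listener) {
        if (available) listener.run(); else listeners.add(listener);
    }
}

class SumSketch {
    static int attempts = 0; // how often sum() has been entered

    // Entered once eagerly and again whenever an input resolves,
    // hence the availability guard.
    static void sum(SketchPromise<Integer> b1, SketchPromise<Integer> b2,
                    SketchPromise<Integer> result) {
        attempts++;
        if (b1.isAvailable() && b2.isAvailable() && !result.isAvailable()) {
            result.set(b1.get() + b2.get());
        }
    }

    static SketchPromise<Integer> sumOf(final SketchPromise<Integer> b1,
                                        final SketchPromise<Integer> b2) {
        final SketchPromise<Integer> result = new SketchPromise<Integer>();
        Runnable retry = new Runnable() {
            public void run() { sum(b1, b2, result); }
        };
        sum(b1, b2, result); // first call: inputs are usually unresolved
        b1.whenAvailable(retry);
        b2.whenAvailable(retry);
        return result;
    }
}
```

With two unresolved balances, `sumOf` enters `sum` once up front and once more per resolved input; only the last entry finds both values and sets the result.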

Resources

[1] async-mysql-connector
[2] reactive-programming
