Java Akka Streams/Sources

From Wikibooks, open books for an open world

Analysis of the Iterable interface

An Iterable<T> is a good starting point for understanding Akka Streams. The following shows the Iterable<T> interface and its companion Iterator<E> as defined in Java 8, omitting default methods.

public interface Iterable<T> {
    Iterator<T> iterator();
}
public interface Iterator<E> {
    boolean hasNext();
    E next();
}

We could easily create an anonymous inline implementation that returns the integers from 1 to 10.

Iterable<Integer> numbersOneToTen = new Iterable<Integer>() {

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {

            int i = 0;

            @Override
            public boolean hasNext() {
                return i < 10;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return ++i;
            }
        };
    }
};

The Iterable<Integer> numbersOneToTen represents a blueprint for producing the sequence of numbers from 1 to 10. The Iterable<T> only represents potential; the actual work of emitting the integers is yet to be done. We can unleash that potential by putting the Iterable<T> in a for-loop.

for(Integer i : numbersOneToTen) {
    System.out.println(i);
}

The for-loop is semantically equivalent to manually traversing an Iterator<T>. Calling the iterator() method turns the blueprint of the Iterable<T> into a running traversal, represented by an Iterator<T> instance. The blueprint can be used many times to create many Iterator<T> instances, but each Iterator<T> instance can be traversed only once.

Iterator<Integer> iterator = numbersOneToTen.iterator();
while(iterator.hasNext()) {
    Integer i = iterator.next();
    System.out.println(i);
}

This is a pull model: the driving thread asks for each element and blocks synchronously until next() returns it.
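The reuse rule for blueprints and iterators can be checked with a small JDK-only sketch. The class name IterableReuseSketch and the helper drain are made up for illustration and are not part of the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class IterableReuseSketch {

    // Hypothetical helper: pulls every remaining element out of the iterator,
    // leaving it exhausted afterwards.
    static List<Integer> drain(Iterator<Integer> iterator) {
        List<Integer> seen = new ArrayList<>();
        while (iterator.hasNext()) {
            seen.add(iterator.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        Iterable<Integer> numbers = Arrays.asList(1, 2, 3);

        Iterator<Integer> first = numbers.iterator();
        System.out.println(drain(first));              // [1, 2, 3]
        System.out.println(drain(first));              // [] because the iterator is used up
        System.out.println(drain(numbers.iterator())); // [1, 2, 3] from a fresh iterator
    }
}
```

The Iterable itself is never consumed; only the Iterator<T> instances it hands out are.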

We can make the Iterable<T> unbounded, so that it returns the sequence of all positive integers (in practice, the int counter overflows once it passes Integer.MAX_VALUE).

Iterable<Integer> positiveIntegers = new Iterable<Integer>() {

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {

            int i = 0;
            
            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Integer next() {
                return ++i;
            }
        };
    }
};

Traversing positiveIntegers to the end would loop forever, so the consumer must decide when to stop.

for(Integer i : positiveIntegers) {
    if (i > 10) {
        break;
    }
    System.out.println(i);
}
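Instead of breaking out of the loop, the bound can also be moved into the blueprint itself. The following is only a sketch: take is a hypothetical helper written for this example, and positiveIntegers is rebuilt as a method so the block is self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class BoundedIterableSketch {

    // Unbounded blueprint 1, 2, 3, ... mirroring positiveIntegers from the text.
    static Iterable<Integer> positiveIntegers() {
        return () -> new Iterator<Integer>() {
            int i = 0;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Integer next() {
                return ++i;
            }
        };
    }

    // Hypothetical helper: wraps a blueprint so that every traversal
    // stops after at most `limit` elements.
    static <T> Iterable<T> take(Iterable<T> source, int limit) {
        return () -> {
            Iterator<T> inner = source.iterator();
            return new Iterator<T>() {
                int emitted = 0;

                @Override
                public boolean hasNext() {
                    return emitted < limit && inner.hasNext();
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    emitted++;
                    return inner.next();
                }
            };
        };
    }

    public static void main(String[] args) {
        List<Integer> collected = new ArrayList<>();
        for (Integer i : take(positiveIntegers(), 10)) {
            collected.add(i);
        }
        System.out.println(collected); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Because take returns another Iterable<T>, the bounded sequence is still a blueprint and can be traversed more than once.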

Understanding Iterable<T> serves as a good first step to understanding other streaming technologies. But take note of the limitations:

  • Single threaded.
  • Iterator<T> is generally not thread safe.
  • The code blocks while waiting for the next element.
  • The Iterator<T> is idle between calls to next().

Turning Iterable<T> into Source<T, NotUsed>

There are many implementations of the Iterable<T> interface. Our first usage of Akka Streams will be converting an Iterable<T> into a Source<T, NotUsed>, and then running that source with a materializer.

There are plenty of helper methods on the akka.stream.javadsl.Source class for the creation of sources.

Iterable<Integer> numbersOneToTen = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Source<Integer, NotUsed> source = Source.from(numbersOneToTen);

Take a look at the generic types in the akka.stream.javadsl.Source<Out, Mat> signature. The type Out is the element type, Integer in the code above. But what does the type Mat represent, and why is it NotUsed? The short answer for now is that Mat is the type of the materialized value. Since we are not using the materialized value, the type is NotUsed.

You will see that a Source<T, M> is a blueprint, much like an Iterable<T> is a blueprint, and that running the blueprint not only processes values of type T but also produces a materialized value of type M.

Akka Streams is naturally multi-threaded. The traversal of the stream is scheduled on a thread pool managed by the Akka system. The materialized value is constructed when the source is run as part of a graph. This will become relevant later, but for our simple Iterable<Integer> there is no meaningful materialized value when converting to an akka.stream.javadsl.Source<T, M>, so the type M is simply given the placeholder NotUsed.

NotUsed is a class that has a singleton instance with no methods or properties of its own. Should you ever need this NotUsed singleton, you can get it by calling akka.NotUsed.getInstance().

Running a Source<T, M>

Akka Streams uses the Actor System to process sources within graphs.

We need an akka.stream.ActorMaterializer, which in turn requires an akka.actor.ActorSystem. Although the Actor System is strongly related, our focus for this book is Akka Streams, so we will simply show you a shortcut for obtaining both. Much more consideration should be given in a production environment. As a rule, you should only have one ActorSystem shared by all your Akka Streams.

The following is a good basis for experimentation with JUnit.

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import akka.stream.Materializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public abstract class AkkaStreamTesting  {

    protected static ActorSystem system;
    protected static Materializer materializer;

    @BeforeClass
    public static void beforeClass() {
        system = ActorSystem.create();
        ActorMaterializerSettings settings = ActorMaterializerSettings.create(system);
        materializer = ActorMaterializer.create(settings, system);
    }

    @AfterClass
    public static void afterClass() throws Exception {
        system.terminate();
    }

}

Once you have a Materializer, you can easily run a source.

Iterable<Integer> numbersOneToTen = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Source<Integer, NotUsed> source = Source.from(numbersOneToTen);
CompletionStage<Done> futureDone = source.runForeach(System.out::println, materializer);
futureDone.toCompletableFuture().join();

You should take a moment to appreciate the above code. Without much effort, processing of the source is done on a thread pool managed by the ActorSystem. Notice how runForeach(...) returns a future. You could do other work on the calling thread while your Akka Stream is running. To coordinate execution, the future is joined, thereby blocking the calling thread until the Akka Stream has been fully processed.
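The start, keep working, then join pattern is not specific to Akka. As a rough JDK-only sketch of it, CompletableFuture.runAsync can stand in for the running stream. This is an assumption made for illustration and says nothing about how Akka schedules its work; processAsync and JoinPatternSketch are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

class JoinPatternSketch {

    // Hypothetical stand-in for running a stream: copies every element into
    // `sink` on a JDK pool thread and returns a stage that completes afterwards.
    static CompletionStage<Void> processAsync(Iterable<Integer> elements, List<Integer> sink) {
        return CompletableFuture.runAsync(() -> {
            for (Integer element : elements) {
                sink.add(element);
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> sink = new CopyOnWriteArrayList<>();
        CompletionStage<Void> futureDone = processAsync(Arrays.asList(1, 2, 3, 4, 5), sink);

        // The calling thread is free to do unrelated work here.
        System.out.println("calling thread keeps working");

        // Block until the asynchronous processing has finished.
        futureDone.toCompletableFuture().join();
        System.out.println(sink);
    }
}
```

Only after join() returns is it safe to assume that every element has been processed.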

At this point you may assume that all Akka Streams provide a CompletionStage<Done>. Although this is a common pattern, the CompletionStage<Done> is actually just the materialized value of this particular stream. We could write equivalent code that handles the materialized value more explicitly.

Iterable<Integer> numbersOneToTen = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Source<Integer, NotUsed> source = Source.from(numbersOneToTen);
Sink<Integer, CompletionStage<Done>> foreach = Sink.foreach(System.out::println);
RunnableGraph<CompletionStage<Done>> graph = source.toMat(foreach, Keep.right());
CompletionStage<Done> futureDone = graph.run(materializer);
futureDone.toCompletableFuture().join();

Getting a little ahead of ourselves, note the Sink<Integer, CompletionStage<Done>> foreach and the intermediate RunnableGraph<CompletionStage<Done>> that has the generic type of the materialized value. Only because our materialized value is CompletionStage<Done> do we get futureDone when running the graph.