Advanced Streams

2024 02 01 head

The Java Stream API is a powerful and simple to understand set of tools for processing sequences of elements. The standard collections were retrofitted with the stream() method, which allows us to convert any collection to a stream.

Modern Java code has almost no loops and conditional statements. It relies on the Stream API and the functional programming style to implement algorithms.

When used properly, it allows us to reduce a huge amount of boilerplate code, create more readable programs, and improve productivity [1, 2].

The majority of Java developers I encountered are still not familiar with how to apply the Stream API when writing code.

Introductory Java courses do not extensively cover the Stream API. Most code examples are still written in an ancient imperative style. The usage of the Stream API and Optional is not covered in detail. Too much time is used to explain the basics of the various loops and conditional statements instead of teaching modern concepts.

Java developers should never return null from a method. Instead, they should return an empty collection or an optional.

You are truly using a functional programming style if some of your methods have lambda expressions as parameters or return values.

Creating Streams

There are many ways to create a stream instance of different sources. Once created, the instance will not modify its source, therefore allowing the creation of multiple instances from a single source.

public Stream<String> streamOf(List<String> list) {
    return list == null || list.isEmpty() ? Stream.empty() : list.stream();                 (1)
}

public Stream<String> streamOf(String[] array) {
    return array == null || array.length == 0 ? Stream.empty() : Arrays.stream(array);      (2)
}

IntStream intStream = IntStream.range(1, 3);                                                (3)
LongStream longStream = LongStream.rangeClosed(1, 3);

StreamSupport.stream(iterable.spliterator(), false);                                        (4)
1 The stream() method is used to convert a collection to a stream.
2 The Arrays.stream() method is used to convert an array to a stream.
3 The range() method is used to create a stream of integer or long values.
4 The StreamSupport.stream() method is used to convert an iterable to a stream.

Map and Filter

2024 02 01 flatmap

The map() method is used to transform one object into another by applying a transformation function. It produces a new stream after applying the function to each element of the original stream. The new stream could be of a different type.

The filter() method is used to eliminate elements based on a condition. It produces a new stream that contains elements of the original stream that pass a given test specified by a Predicate.

The flatMap() method is used to transform one object into another by applying a function that returns a stream.

A stream can hold complex data structures like Stream<List<String>>. In cases like this, flatMap() helps us to flatten the data structure to simplify further operations.

Collectors

There are three variations of this method, which differ by their signatures and returning types. They can have the following parameters:

Identity

the initial value for an accumulator, or a default value if a stream is empty and there is nothing to accumulate

Accumulator

a function which specifies the logic for the aggregation of elements. As the accumulator creates a new value for every step of reducing, the quantity of new values equals the stream’s size and only the last value is useful. This is not very good for the performance.

Combiner

a function which aggregates the results of the accumulator. We only call combiner in a parallel mode to reduce the results of accumulators from different threads.

The reduction of a stream can also be executed by another terminal operation, the collect() method. It accepts an argument of the type Collector, which specifies the mechanism of reduction. There are already created, predefined collectors for most common operations. They can be accessed with the help of the Collectors type.

In this section, we will use the following List as a source for all streams:

// Copy Converting a stream to the Collection (Collection, List or Set):
List<Product> productList = List.of(new Product(23, "potatoes"), new Product(14, "orange"),
                        new Product(13, "lemon"), Product(23, "bread"), new Product(13, "sugar"));

// Copy Reducing to String:
List<String> collectorCollection = productList.stream().map(Product::getName).collect(Collectors.toList());

// Copy The joiner() method can have from one to three parameters (delimiter, prefix, suffix):
String listToString = productList.stream().map(Product::getName)
                                          .collect(Collectors.joining(", ", "[", "]"));

The most convenient thing about using joiner() is that the developer does not need to check if the stream reaches its end to apply the suffix and not to apply a delimiter. Collector will take care of that.

Processing the average value for all numeric elements of the stream:

// Copy Processing the sum of all numeric elements for the stream:
double averagePrice = productList.stream() .collect(Collectors.averagingInt(Product::getPrice));

// Copy The methods averagingXX(), summingXX() and summarizingXX() can work with primitives
// (int, long, double) and with their wrapper classes (Integer, Long, Double).
// One more powerful feature of these methods is providing the mapping.
// As a result, the developer does not need to use an additional map() operation before the collect() method.
int summingPrice = productList.stream() .collect(Collectors.summingInt(Product::getPrice));

// Grouping of stream’s elements according to the specified function:
// Copy In the example above, the stream was reduced to the Map, which groups all products by their price.
Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
                                .collect(Collectors.groupingBy(Product::getPrice));

The Collectors class provides a set of predefined collectors.

To iterate over a collection and apply a function to each element, use the forEach() convenience method. You do not need to create a stream for this purpose.

Gatherers

The Java Stream API was released with Java 8 in March 2014 and has given us a fundamentally new tool for processing data streams.

However, the limited set of intermediate operations – filter, map, flatMap, mapMulti, distinct, sorted, peak, limit, skip, takeWhile, and dropWhile – means that the Stream API cannot express more complex data transformations

A set of default gatherers are provided in the java.util.stream.Gatherers class:

fold

is a stateful many-to-one gatherer which constructs an aggregate incrementally and emits that aggregate when no more input elements exist.

mapConcurrent

is a stateful one-to-one gatherer which invokes a supplied function for each input element concurrently, up to a supplied limit.

scan

is a stateful one-to-one gatherer which applies a supplied function to the current state and the current element to produce the next element, which it passes downstream.

windowFixed

is a stateful many-to-many gatherer which groups input elements into lists of a supplied size, emitting the windows downstream when they are full.

windowSliding

is a stateful many-to-many gatherer which groups input elements into lists of a supplied size. After the first window, each subsequent window is created from a copy of its predecessor by dropping the first element and appending the next element from the input stream.

Gatherers support composition via the andThen(Gatherer) method, which joins two gatherers where the first produces elements that the second can consume.

This enables the creation of sophisticated gatherers by composing simpler ones, just like function composition.

Semantically, source.gather(a).gather(b).gather(c).collect(…​) is equivalent to source.gather(a.andThen(b).andThen(c)).collect(…​)

Parallel Streams

Before Java 8, parallelization was complex. The emergence of the ExecutorService and the ForkJoin simplified a developer’s life a little bit, but it was still worth remembering how to create a specific executor and how to run it. Java 8 introduced a way of accomplishing parallelism in a functional style.

The API allows us to create parallel streams, which perform operations in a parallel mode. When the source of a stream is a Collection or an array, it can be achieved with the help of the parallelStream().

Under the hood, Stream API automatically uses the ForkJoin framework to execute operations in parallel. By default, the common thread pool will be used, and there is no way to assign some custom thread pool to it.

Lessons Learnt

Modern Java applications extensively use streams. Beware modern Java code is quite different from Java code written beginning of this millennium [3, 4].

The source code is almost free of loops and conditional statements. The code is more readable and maintainable.

The new gatherers simplify this functional and declarative programming style. Custom gatherers and collectors are seldom written. Most of the time, the predefined gatherers and collectors are sufficient.

This extension of the Stream API allows experienced Java developers to write custom gatherers and collectors.

Modern IDE like IntelliJ provides reasonable support to debug complex stream expressions. I seldom need to activate the debugger. The declarative style tremendously reduces the risks of errors in the application.

The IDE provides refactoring support to convert loops to streams.

References

[1] D. Farley, Modern Software Engineering. Pearson Education, Limited, 2022 [Online]. Available: https://www.amazon.com/dp/B09GG6XKS4

[2] J. Bloch, Effective Java, Third. Addison-Wesley Professional, 2017 [Online]. Available: https://www.amazon.com/dp/B078H61SCH

[3] N. Ford, Functional Thinking: Paradigm Over Syntax. O’Reilly Media [Online]. Available: https://www.amazon.com/dp/B00LEX6SP8

[4] V. Subramaniam, Functional Programming In Java Harnessing The Power Of Java 8 Lambda Expressions. The Pragmatic Programmers, 2014 [Online]. Available: https://www.amazon.com/dp/B0CJL7VKFL