Stream
A java.util.stream.Stream represents a sequence of elements on which one or more operations can be performed.
- Stream operations are either intermediate or terminal. Terminal operations return a result of a certain type, while intermediate operations return a new stream, so you can chain multiple method calls in a row.
- Streams are created on a source, e.g. a java.util.Collection such as a list or a set. Maps are not supported directly, but you can stream a map's entrySet(), keySet() or values().
- Like the existing I/O “streams” (e.g. InputStream), a stream can be consumed only once.
- Stream operations can be executed either sequentially or in parallel.
- Parallel streams use the common ForkJoinPool for threading, which has 2 caveats:
  - The default size of the pool is the number of logical cores – 1 (good for CPU-intensive tasks but not for I/O-bound tasks).
  - It is a global pool shared by every parallel stream, and by any other task submitted to it, in the same JVM.
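Two of the points above are easy to check in a few lines. The following is a minimal sketch (the class and method names are illustrative and not from the original post): it shows that a second terminal operation on an already consumed stream is rejected with an IllegalStateException, and that a map can still be streamed through its entrySet() view.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative class name; not part of the original post.
class StreamBasicsDemo {

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean reuseFails() {
        Stream<String> s = Stream.of("a", "b", "c");
        s.forEach(x -> { });        // the first terminal operation consumes the stream
        try {
            s.forEach(x -> { });    // second use of the same stream object
            return false;
        } catch (IllegalStateException e) {
            return true;            // the stream has already been operated upon
        }
    }

    // A Map has no stream() method, but its entrySet() is a Collection and does.
    static List<String> keysWithValueAbove(Map<String, Integer> map, int threshold) {
        return map.entrySet().stream()
                .filter(e -> e.getValue() > threshold)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(reuseFails()); // true

        Map<String, Integer> ages = new LinkedHashMap<>(); // keeps insertion order
        ages.put("Sara", 20);
        ages.put("Paul", 32);
        ages.put("Jack", 72);
        System.out.println(keysWithValueAbove(ages, 25)); // [Paul, Jack]
    }
}
```

If the same elements are needed twice, create a fresh stream from the source each time instead of keeping a reference to the stream.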
Key concepts
- Pipelining: Many stream operations return a stream themselves. This allows operations to be chained to form a larger pipeline, which enables certain optimizations such as laziness and short-circuiting. Laziness means that no work is actually done until a terminal operation is invoked.
- Internal iteration: In contrast to collections, which are iterated explicitly (external iteration), stream operations do the iteration behind the scenes for you.
- Focus on computation: Collections are about data and streams are about computations. A stream doesn’t own any data; it only carries the elements of its source through the pipeline.
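Laziness and short-circuiting can be observed by recording which elements a pipeline actually touches. The sketch below is only an illustration (LazinessDemo and its methods are made-up names): it uses peek to log visited elements, once with findFirst as the terminal operation and once with no terminal operation at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Illustrative class name; not part of the original post.
class LazinessDemo {

    // Returns the elements that were visited before findFirst stopped the pipeline.
    static List<Integer> visitedUntilFirstEven(List<Integer> input) {
        List<Integer> visited = new ArrayList<>();
        input.stream()
                .peek(visited::add)         // side effect used only to observe the traversal
                .filter(n -> n % 2 == 0)
                .findFirst();               // short-circuits on the first even number
        return visited;
    }

    // Builds a pipeline but never invokes a terminal operation on it.
    static List<Integer> visitedWithoutTerminalOp(List<Integer> input) {
        List<Integer> visited = new ArrayList<>();
        Stream<Integer> pipeline = input.stream()
                .peek(visited::add)
                .filter(n -> n % 2 == 0);   // only describes the work, nothing runs yet
        return visited;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 3, 4, 5, 6);
        System.out.println(visitedUntilFirstEven(numbers));    // [1, 3, 4]
        System.out.println(visitedWithoutTerminalOp(numbers)); // []
    }
}
```

The elements 5 and 6 are never inspected in the first call, because the sequential pipeline pulls one element at a time and stops as soon as findFirst has its answer.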
Key Stream Operations
- Intermediate operations return a stream, so they can be chained, and they are executed lazily.
  - filter – to drop elements that do not match the predicate
  - map – to transform each element, either to another type or to the same type
  - flatMap – to flatten nested structures, e.g. a list of lists into a single stream of elements
  - peek – to perform an action on each element as it passes through, mainly useful for debugging
  - distinct, sorted, limit – these are a special kind of intermediate operation, the so-called stateful operations. For example, to sort a collection of elements, the operation has to maintain state while ordering them.
- Terminal operations return a concrete type or produce a side effect, and they are executed eagerly.
  - collect – to accumulate elements into a container or a summary (toList, groupingBy, etc.)
  - reduce – to combine elements into a single value
  - forEach – to perform a side effect on each element
  - findFirst – returns an Optional and short-circuits once an element is found
  - anyMatch – returns a boolean, true if at least one element matches the predicate
  - allMatch – returns a boolean, true if every element matches the predicate
  - noneMatch – returns a boolean, true if no element matches the predicate
  - count, min, max, etc.
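To make the split between intermediate and terminal operations concrete, here is a small sketch that combines the stateful operations (distinct, sorted, limit) with a few terminal ones (collect, allMatch, max). The class name, method names and sample words are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustrative class name; not part of the original post.
class OperationsDemo {

    // map -> distinct -> sorted -> limit are intermediate; collect is terminal.
    static List<String> firstDistinctSorted(List<String> words, int n) {
        return words.stream()
                .map(String::toLowerCase)
                .distinct()     // stateful: must remember what it has already seen
                .sorted()       // stateful: must see every element before emitting one
                .limit(n)       // stateful and short-circuiting
                .collect(Collectors.toList());
    }

    // allMatch returns a boolean and stops at the first element that fails the test.
    static boolean allShorterThan(List<String> words, int length) {
        return words.stream().allMatch(w -> w.length() < length);
    }

    // max returns an Optional because the stream may be empty.
    static Optional<String> longest(List<String> words) {
        return words.stream().max(Comparator.comparingInt(String::length));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("pear", "Apple", "apple", "fig", "Pear", "banana");
        System.out.println(firstDistinctSorted(words, 3)); // [apple, banana, fig]
        System.out.println(allShorterThan(words, 6));      // false
        System.out.println(longest(words).get());          // banana
    }
}
```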
Source of Stream
```java
// Existing classes that have a stream flavor:
//   Collection.stream, Collection.parallelStream, Arrays.stream,
//   String.chars, BufferedReader.lines, Pattern.splitAsStream,
//   Random.ints, BitSet.stream

// Factory methods in Stream:
//   Stream.of, Stream.generate, IntStream.range

// Infinite stream; limit turns it into a fixed-size stream
Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);
numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40
```
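A few of the sources listed above, written out as runnable calls. This is a sketch for illustration only; the class name, helper names and sample inputs are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Illustrative class name; not part of the original post.
class StreamSourcesDemo {

    // Pattern.splitAsStream: split a line on whitespace without building an array first.
    static List<String> splitWords(String line) {
        return Pattern.compile("\\s+").splitAsStream(line).collect(Collectors.toList());
    }

    // String.chars: an IntStream of the characters in the string.
    static long countVowels(String s) {
        return s.chars().filter(c -> "aeiou".indexOf(c) >= 0).count();
    }

    // IntStream.range: the upper bound is exclusive.
    static int sumOfRange(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive).sum();
    }

    // Arrays.stream: a primitive int[] becomes an IntStream.
    static int sumArray(int[] values) {
        return Arrays.stream(values).sum();
    }

    // Stream.generate: infinite, so it needs limit to terminate.
    static List<String> repeat(String s, int times) {
        return Stream.generate(() -> s).limit(times).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitWords("to be or not"));    // [to, be, or, not]
        System.out.println(countVowels("stream"));         // 2
        System.out.println(sumOfRange(1, 5));              // 10
        System.out.println(sumArray(new int[] {1, 2, 3})); // 6
        System.out.println(repeat("ab", 3));               // [ab, ab, ab]
    }
}
```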
Code Examples
```java
// Get the unique surnames in uppercase of the first 15 book authors that are 50 years old or over
library.stream()
    .map(book -> book.getAuthor())
    .filter(author -> author.getAge() >= 50)
    .distinct()
    .limit(15)
    .map(Author::getSurnames)
    .map(String::toUpperCase)
    .collect(toList());

// Compute the sum of the ages of all female authors younger than 25
library.stream()
    .map(Book::getAuthor)
    .filter(a -> a.getGender() == Gender.FEMALE)
    .map(Author::getAge)
    .filter(age -> age < 25)
    .reduce(0, Integer::sum);

// More examples of the intermediate operations
List<String> stringCollection = new ArrayList<>();
stringCollection.add("ddd2");
stringCollection.add("aaa2");
stringCollection.add("bbb1");
stringCollection.add("aaa1");
stringCollection.add("bbb3");
stringCollection.add("ccc");
stringCollection.add("bbb2");
stringCollection.add("ddd1");

stringCollection.stream();         // runs in a single thread
stringCollection.parallelStream(); // runs in multiple threads on the common ForkJoinPool

stringCollection
    .stream()
    .map(String::toUpperCase)
    .sorted((a, b) -> b.compareTo(a))
    .forEach(System.out::println);
// "DDD2", "DDD1", "CCC", "BBB3", "BBB2", "BBB1", "AAA2", "AAA1"

// Is there any string that starts with "a"? Replace anyMatch with allMatch or noneMatch for other checks.
boolean anyStartsWithA = stringCollection
    .stream()
    .anyMatch((s) -> s.startsWith("a"));

// reduce is a terminal operation. The result is an Optional holding the reduced value.
Optional<String> reduced = stringCollection
    .stream()
    .sorted()
    .reduce((s1, s2) -> s1 + "#" + s2);

// flatMap: { {'a','b'}, {'c','d'}, {'e','f'} } -> flatMap -> {'a','b','c','d','e','f'}
```
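The flatMap comment at the end of the listing has no code behind it, so here is a minimal sketch of that transformation. The class and method names are illustrative; the second helper is an extra assumption showing the same idea applied to lines of text.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative class name; not part of the original post.
class FlatMapDemo {

    // Each inner list is turned into a stream, and flatMap concatenates those streams.
    static List<String> flatten(List<List<String>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    // Same idea for text: one line maps to many words.
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nested = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d"),
                Arrays.asList("e", "f"));
        System.out.println(flatten(nested));                              // [a, b, c, d, e, f]
        System.out.println(words(Arrays.asList("hello world", "again"))); // [hello, world, again]
    }
}
```

With map instead of flatMap the first result would be a Stream<Stream<String>>, which is rarely what you want.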
Use Stream for in-memory database jobs
```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Note: the Person class (name, gender, age, saving, with a nested Gender enum) is not shown in this post.
public class PersonDS {

    public static List<Person> getPeople() {
        return Arrays.asList(
            new Person("Sara", Person.Gender.FEMALE, 20, 100),
            new Person("Sara", Person.Gender.FEMALE, 22, 200),
            new Person("Bob", Person.Gender.MALE, 20, 1200),
            new Person("Paula", Person.Gender.FEMALE, 32, 300),
            new Person("Paul", Person.Gender.MALE, 32, 400),
            new Person("Jack", Person.Gender.MALE, 2, 0),
            new Person("Jack", Person.Gender.MALE, 72, 120),
            new Person("Jill", Person.Gender.FEMALE, 20, 900)
        );
    }

    // SELECT * WHERE gender = ?
    public static List<Person> getPeople(Person.Gender gender) {
        List<Person> people = getPeople();
        return people.stream()
            .filter(person -> person.getGender() == gender)
            .collect(Collectors.toList());
    }

    // SELECT DISTINCT name WHERE gender = FEMALE
    public static List<String> getDistinctFemaleNames() {
        List<Person> people = getPeople();
        return people.stream()
            .filter(person -> person.getGender() == Person.Gender.FEMALE)
            .map(Person::getName) // Stream<Person> => Stream<String>
            .distinct()
            .collect(Collectors.toList());
    }

    // SELECT name, COUNT(*) GROUP BY name
    public static Map<String, Long> countPeopleByName() {
        List<Person> people = getPeople();
        return people.stream()
            .collect(Collectors.groupingBy(Person::getName, Collectors.counting()));
    }

    // Group by age and keep only the names in each group
    public static Map<Integer, Set<String>> listPeopleByAge() {
        List<Person> people = getPeople();
        return people.stream()
            .collect(Collectors.groupingBy(Person::getAge,
                Collectors.mapping(Person::getName, Collectors.toSet())));
    }

    // The third argument of toMap merges the values of duplicate keys
    public static Map<String, String> listAgesByName() {
        List<Person> people = getPeople();
        return people.stream()
            .collect(Collectors.toMap(p -> p.getName(), p -> p.getAge() + "",
                (age1, age2) -> age1 + ";" + age2));
    }

    // SELECT gender, SUM(saving) GROUP BY gender
    public static Map<Person.Gender, Double> getSavingByGender() {
        List<Person> people = getPeople();
        return people.stream()
            .collect(Collectors.groupingBy(Person::getGender,
                Collectors.summingDouble(Person::getSaving)));
    }

    // partitioningBy always produces exactly two groups: true and false
    public static Map<Boolean, List<Person>> getPeopleByGender() {
        List<Person> people = getPeople();
        return people.stream()
            .collect(Collectors.partitioningBy(p -> p.getGender() == Person.Gender.FEMALE));
    }

    public static void main(String[] args) {
        System.out.println(getPeople(Person.Gender.FEMALE));
        // [Sara -- FEMALE -- 20 -- 100.00, Sara -- FEMALE -- 22 -- 200.00, Paula -- FEMALE -- 32 -- 300.00, Jill -- FEMALE -- 20 -- 900.00]
        System.out.println(countPeopleByName());
        // {Bob=1, Sara=2, Jill=1, Jack=2, Paula=1, Paul=1}
        System.out.println(listPeopleByAge());
        // {32=[Paula, Paul], 2=[Jack], 20=[Bob, Sara, Jill], 22=[Sara], 72=[Jack]}
        System.out.println(listAgesByName());
        // {Bob=20, Sara=20;22, Jill=20, Jack=2;72, Paula=32, Paul=32}
        System.out.println(getDistinctFemaleNames());
        // [Sara, Paula, Jill]
        System.out.println(getSavingByGender());
        // {MALE=1720.0, FEMALE=1500.0}
    }
}
```
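The PersonDS listing relies on a Person class that the post does not show. The version below is a guess at a minimal implementation, reconstructed only from how PersonDS uses it (constructor arguments, getters, the nested Gender enum) and from the printed output format; the field names and the toString format string are assumptions.

```java
import java.util.Locale;

// Hypothetical reconstruction; the original Person class is not included in the post.
class Person {

    enum Gender { MALE, FEMALE }

    private final String name;
    private final Gender gender;
    private final int age;
    private final double saving;

    Person(String name, Gender gender, int age, double saving) {
        this.name = name;
        this.gender = gender;
        this.age = age;
        this.saving = saving;
    }

    String getName() { return name; }

    Gender getGender() { return gender; }

    int getAge() { return age; }

    double getSaving() { return saving; }

    // Assumed format, chosen to match the sample output "Sara -- FEMALE -- 20 -- 100.00".
    @Override
    public String toString() {
        // Locale.ROOT keeps the decimal separator a dot regardless of the default locale.
        return String.format(Locale.ROOT, "%s -- %s -- %d -- %.2f", name, gender, age, saving);
    }
}
```

Person does not override equals and hashCode here, which is fine for PersonDS because it only calls distinct on names (strings), not on Person objects.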
Exception Handling
If you don’t want the exceptions to bubble out and interrupt the control flow of the stream, you’ll have to catch them locally.
```java
List<String> strList = Stream.of("1", "2", "illegal_3", "4", "illegal_5", "6")
        .collect(Collectors.toList());

// Option 1: return null on failure, then filter out the nulls
List<Integer> intList = strList.stream() // works the same with parallelStream()
        .map(x -> {
            try {
                return Integer.parseInt(x);
            } catch (NumberFormatException nfe) {
                System.err.println(nfe.getMessage());
            }
            return null;
        })
        .filter(x -> x != null)
        .collect(Collectors.toList());

// Option 2: map each element to a stream of zero or one values and flatten
intList = strList.stream()
        .flatMap(x -> parseIntStream(x))
        .collect(Collectors.toList());

// Returns an empty stream if the string cannot be parsed
static Stream<Integer> parseIntStream(String s) {
    try {
        return Stream.of(Integer.parseInt(s));
    } catch (NumberFormatException nfe) {
        System.err.println(nfe.getMessage());
    }
    return Stream.empty();
}
```
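A third variant, not in the original listing, is to wrap the result in an Optional so that failure is explicit in the type rather than encoded as null. This is a sketch under that assumption; SafeParseDemo, tryParse and parseAll are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustrative class name; not part of the original post.
class SafeParseDemo {

    // Catches the exception locally and reports failure as an empty Optional.
    static Optional<Integer> tryParse(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException nfe) {
            return Optional.empty();
        }
    }

    // Keeps only the values that parsed successfully, in encounter order.
    static List<Integer> parseAll(List<String> inputs) {
        return inputs.stream()
                .map(SafeParseDemo::tryParse)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("1", "2", "illegal_3", "4", "illegal_5", "6");
        System.out.println(parseAll(strList)); // [1, 2, 4, 6]
    }
}
```

The filter and map pair is used because Optional has no stream() method in Java 8.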
Parallelism Underneath the Hood
With the Fork/Join framework added in Java SE 7, the JDK has an API for efficiently implementing parallel computations. However, parallel code with Fork/Join looks very different from (and is much bigger than) the equivalent serial code, which acts as a barrier to parallelization. Because sequential and parallel streams support the exact same set of operations, users can switch between serial and parallel execution without rewriting their code, which removes this barrier and makes parallelism more accessible and less error-prone. However, all parallel streams share the common ForkJoinPool by default, so one stream running slow or blocking tasks can hold up the others. Keep this in mind before switching a sequential stream to a parallel one.
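The switch between the two modes can be sketched with a single pipeline that takes a flag. The class and method names are illustrative; the example also prints the parallelism of the common pool, whose value depends on the machine it runs on.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Illustrative class name; not part of the original post.
class ParallelSwitchDemo {

    // The pipeline is identical in both modes; only the parallel() call differs.
    static long sumOfSquares(long n, boolean parallel) {
        LongStream range = LongStream.rangeClosed(1, n);
        if (parallel) {
            range = range.parallel(); // tasks now run on the common ForkJoinPool
        }
        return range.map(x -> x * x).sum();
    }

    public static void main(String[] args) {
        // Machine dependent, so no expected value is shown here.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        System.out.println(sumOfSquares(1000, false)); // 333833500
        System.out.println(sumOfSquares(1000, true));  // 333833500
    }
}
```

Both calls give the same result because the mapping function is stateless and the sum is associative. A pipeline with shared mutable state or blocking calls would not be safe to switch this way.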