0%

Stream流编程


外部迭代与内部迭代

1
2
3
4
5
6
7
8
9
10
11
12
13
public class StreamDemo {
public static void main(String[] args) {
int[] nums = {1, 2, 3};
int res1 = 0;
for(int i = 0; i < nums.length; i++){
res1 += nums[i];
}
System.out.println("结果是:" + res1);

int res2 = IntStream.of(nums).sum();
System.out.println("结果是:" + res2);
}
}

中间操作/终止操作和惰性操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class StreamDemo {
public static void main(String[] args) {
int[] nums = {1, 2, 3};
// map就是中间操作(返回stream的操作)
// sum就是终止操作
int res = IntStream.of(nums).map(StreamDemo::doubleNum).sum();
System.out.println("结果是:" + res);
System.out.println("惰性求值就是在终止操作没有执行的情况下,中间操作不会执行");
IntStream.of(nums).map(StreamDemo::doubleNum);
}

public static int doubleNum(int i){
System.out.println("进入了doubleNum静态方法");
return 2 * i;
}
}

然后我们可以看到输出如下:

1
2
3
4
5
进入了doubleNum静态方法
进入了doubleNum静态方法
进入了doubleNum静态方法
结果是:12
惰性求值就是在终止操作没有执行的情况下,中间操作不会执行

因为我们有三个数,所以执行了三次,但是因为第二次的是惰性求值所以没有执行静态方法。


Stream流编程的创建

这里写图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class StreamDemo1 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();

//从集合创建
list.stream();
list.parallelStream();

// 从数组创建
Arrays.stream(new int[]{1,2,3});

// 创建数字流
IntStream.of(1, 2, 3);
IntStream.rangeClosed(1, 10);

// 从random中创建一个无限流
new Random().ints().limit(10);

// 自定义创建
Random random = new Random();
Stream.generate(() -> random.nextInt()).limit(20);
}
}

stream中间操作

这里写图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class StreamDemo2 {
public static void main(String[] args) {
String str = "my name is 007";

// 把每个单词中单词长度大于2的长度调用出来
Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(s -> s.length()).forEach(System.out::println);

// flatMap A->B 属性是一个集合,最终得到所有的A元素中的所有B属性
// intStream/longStream并不是Stream的子类,所以要使用装箱 boxed
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(
i -> System.out.println((char)i.intValue())
);

System.out.println("-------- start peek --------");
// peek用于debug,是一个中间操作,和forEach是个终止操作
Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);

System.out.println("-------- start random --------");
// limit的使用,主要用于无限流
new Random().ints().filter(i -> i > 100 && i < 10000).limit(10).forEach(System.out::println);
}
}


stream流终止操作

这里写图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class StreamDemo3 {
public static void main(String[] args) {
String str = "my name is 007";

// 使用并行流
str.chars().parallel().forEach(i -> System.out.println((char)i));

// 使用并行流顺序打印
str.chars().parallel().forEachOrdered(i -> System.out.println((char)i));

// 收集到list
List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList());
System.out.println(list);

// 使用Reduce拼接字符串
Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
System.out.println(letters.orElse(""));

// 使用初始化值的reduce
String reduce = Stream.of(str.split(" ")).reduce("", (s1, s2) -> s1 + "|" + s2);
System.out.println(reduce);

// 使用初始化值的reduce
Integer reduce1 = Stream.of(str.split(" ")).map(String::length).reduce(0, Integer::sum);
System.out.println(reduce1);

Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
System.out.println(max.get());

OptionalInt first = new Random().ints().findFirst();
System.out.println(first);
}
}


并行流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class StreamDemo4 {
public static void main(String[] args) throws InterruptedException {
// 串行
IntStream.range(1, 100).peek(StreamDemo4::debug1).count();

// 并行,并行流默认使用线程池ForkJoinPool.commonPool-worker,默认大小为当前机器的cpu个数,可以使用
// System.setProperty"java.util.concurrent.ForkJoinPool.common.parallelism "20");
// 来设置默认线程数
IntStream.range(1, 100).parallel().peek(StreamDemo4::debug2).count();

// 多次调用以最后一次为准
IntStream.range(1, 100).parallel().peek(StreamDemo4::debug1).sequential().peek(StreamDemo4::debug2).count();

// 使用自己的线程池,防止都是用默认线程然后导致的线程阻塞
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(0, 100).parallel().peek(StreamDemo4::debug1).count());
pool.shutdown();
pool.wait();

}

public static void debug1(int i){
System.out.println(Thread.currentThread().getName() + " debug1 " + i);
}

public static void debug2(int i){
System.err.println(Thread.currentThread().getName() + " debug2 " + i);
}
}

收集器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class StreamDemo5 {
public static void main(String[] args) {
List<Student> list = Arrays.asList(
new Student("zhangsan1", 11, Sex.MALE, Gender.ONE),
new Student("zhangsan2", 12, Sex.FEMAlE, Gender.TWO),
new Student("zhangsan3", 13, Sex.MALE, Gender.THREE),
new Student("zhangsan4", 14, Sex.FEMAlE, Gender.ONE),
new Student("zhangsan5", 15, Sex.MALE, Gender.TWO),
new Student("zhangsan6", 16, Sex.FEMAlE, Gender.THREE),
new Student("zhangsan7", 17, Sex.MALE, Gender.ONE)
);

// 可以使用多种方式创建多种的集合
List<Integer> ages = list.stream().map(Student::getAge).collect(Collectors.toList());
Set<Integer> agesSet1 = list.stream().map(Student::getAge).collect(Collectors.toSet());
Set<Integer> agesSet2 = list.stream().map(Student::getAge).collect(Collectors.toCollection(TreeSet::new));
System.out.println("所有学生的年龄是:" + ages);

// 获取学生的年龄汇总信息
IntSummaryStatistics studentList = list.stream().collect(Collectors.summarizingInt(Student::getAge));
System.out.println("学生的汇总信息是:" + studentList);

// 分块
Map<Boolean, List<Student>> sex = list.stream().collect(Collectors.partitioningBy(s -> s.getSex() == Sex.MALE));
System.out.println("按照性别分块的结果是:" + sex);

// 分组
Map<Gender, List<Student>> groups = list.stream().collect(Collectors.groupingBy(Student::getGender));
System.out.println("按照班级分组的结果是:" + groups);

// 获取每个班级的人数
Map<Gender, Long> nums = list.stream().collect(Collectors.groupingBy(Student::getGender, Collectors.counting()));
System.out.println("每个班级的人数结果是:" + nums);

}
}

class Student{
private String name;
private Integer age;
private Sex sex;
private Gender gender;

public Student(String name, Integer age, Sex sex, Gender gender){
this.name = name;
this.age = age;
this.sex = sex;
this.gender = gender;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

public Sex getSex() {
return sex;
}

public void setSex(Sex sex) {
this.sex = sex;
}

public Gender getGender() {
return gender;
}

public void setGender(Gender gender) {
this.gender = gender;
}

@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", age=" + age +
", sex=" + sex +
", gender=" + gender +
'}';
}
}

enum Sex{
MALE, FEMAlE
}

enum Gender{
ONE, TWO, THREE, FOUR, FIVE
}

Stream运行机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 验证Stream运行机制
*
* 1.所有操作都是链式调用,一个元素之迭代一次
* 2.每一个中间操作返回一个新的流,流里面有一个属性sourceStage执行同一个地方,就是Head
* 3.Head -> nextStage -> nextStage -> ... -> null
* 4.如果多个无状态操作与有状态操作交错存在,,则每个有状态操作会把之前的无状态操作段单独处理
* 5.并行状态下,有状态的中间操作不一定能并行操作
* 6.parallel()操作和sequential()都是中间操作(也是但会stream)但是他们不创建流,他们只修改我们的Head的并行标志
*/
public class StreamDemo6 {
public static void main(String[] args) {
Random random = new Random();
Stream.generate(() -> random.nextInt())
//产生五百个数据
.limit(500)
// 第一个无状态操作
.peek(s -> print("peek:" + s))
// 第二个无状态操作
.filter(s -> {
print("filter:" + s);
return s > 10000;
})
// 终止操作
.count();
}

private static void print(String s) {
System.out.println(s);
}
}