Stream
Stream是在Java SE 8 API添加的用于增强集合的操作接口,可以让你以一种声明的方式处理集合数据。将要处理的集合看作一种流的创建者,将集合内部的元素转换为流并且在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选,排序,聚合等。元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。Stream的继承关系图如下,且容我慢慢抽丝剥茧细细道来。
过滤,转换,聚合,归约
Stream.of("one", "two", "three", "four")
.filter(e -> e.length() > 3)
.peek(e -> System.out.println("Filtered value: " + e))
.map(String::toUpperCase)
.peek(e -> System.out.println("Mapped value: " + e))
.collect(Collectors.toList());
在没有Stream之前,我们对集合数据的处理到多是外部遍历,然后做数据的聚合用算,排序,merge等等。这属于OO思想,在引入Java SE 8引入FP之后,FP的操作可以提高Java程序员的生产力,,基于类型推断的lambda表达式可以 让程序员写出高效率、干净、简洁的代码。可以避免冗余的代码。根据给定的集合操作通过
stream()
方法创建初始流,配合map()
,flatMap()
,filter()
对集合数据进行过滤,转换。api调用我这里就不多说了。直接从源码入手,看上图最核心的就是类为AbstractPipeline
,ReferencePipeline
和Sink
接口.AbstractPipeline
抽象类是整个Stream中流水线的高度抽象了源头sourceStage
,上游previousStage
,下游nextStage
,定义evaluate
结束方法,而ReferencePipeline
则是抽象了过滤,转换,聚合,归约等功能,每一个功能的添加实际上可以理解为卷心菜,菜心就是源头,每一次加入一个功能就相当于重新长出一片叶子包住了菜心,最后一个功能集成完毕之后整颗卷心菜就长大了。而Sink
接口呢负责把整个流水线串起来,然后在执行聚合,归约时候调AbstractPipeline
抽象类的evaluate
结束方法,根据是否是并行执行,调用不同的结束逻辑,如果不是并行方法则执行terminalOp.evaluateSequential
否则就执行terminalOp.evaluateParallel
,非并行执行模式下则是执行的是AbstractPipeline
抽象类的wrapAndCopyInto
方法去调用copyInto
,调用前会先执行一下wrapSink
,用于剥开这个我们在流水线上产生的卷心菜。从下游向上游去遍历AbstractPipeline
,然后包装到Sink,然后在copyInto
方法内部迭代执行对应的方法。最后完成调用,
并行执行实际上是构建一个ForkJoinTask
并执行invoke
去提交到ForkJoinPool
线程池。
BaseStream
流的基本接口,该接口制定流可以支持无序,顺序,并行的,Stream实现了BaseStream接口。
- Iterator
iterator(); 外部迭代器
- Spliterator
spliterator(); 用于创建一个内部迭代器
- isParallel
用于判断该stream是否是并行的
- S sequential();
标识该stream创建是顺序执行的
- S parallel();
标识该stream创建是并行的,需要使用
ForkJoinPool
- S unordered();
标识该stream创建是无序的
- S onClose(Runnable closeHandler);
当stream关闭的时候执行一个方法回调去关闭流。
PipelineHelper
该抽象类主要定义了操作管道的核心方法,并且能收集到流管道内的所有信息。如通过
TerminalOp#evaluateParallel
用于执行并行流操作,通过TerminalOp#evaluateSequential
执行顺序流的操作。
- abstract StreamShape getSourceShape();
用于定义该流的中元素的原型,返回一个枚举值,用于切片操作
limit
或者skip
枚举值取值范围 {REFERENCE:引用类型元素,INT_VALUE:int类型元素,LONG_VALUE:long类型元素,DOUBLE_VALUE:double类型元素}
-
abstract int getStreamAndOpFlags();
用于获取流的中元素的原型和所有操作的组合,
Stream
中所有的定义流类型和操作的指令都包含在`StreamOpFlag``枚举类中。先看下补码 掩码的运算位掩码的常用CRUD操作 a&~b: 清除标志位b; a|b : 添加标志位b; a&b : 取出标志位b; a^b : 取出a与b的不同部分; 下面是对应流的标志位对应的表。 /* * Characteristics belong to certain types, see the Type enum. Bit masks for * the types are constructed as per the following table: * * DISTINCT SORTED ORDERED SIZED SHORT_CIRCUIT * SPLITERATOR 01 01 01 01 00 * STREAM 01 01 01 01 00 * OP 11 11 11 10 01 * TERMINAL_OP 00 00 10 00 01 * UPSTREAM_TERMINAL_OP 00 00 10 00 00 * * 01 = set/inject SET_BITS=0b01设置指令 * 10 = clear CLEAR_BITS=0b10清除指令 * 11 = preserve PRESERVE_BITS=0b11保存指令 */ 构造函数 private StreamOpFlag(int position, MaskBuilder maskBuilder) { this.maskTable = maskBuilder.build(); // Two bits per flag position *= 2; this.bitPosition = position; this.set = SET_BITS << position; this.clear = CLEAR_BITS << position; this.preserve = PRESERVE_BITS << position; }
-
枚举值分析
- StreamOpFlag.DISTINCT
DISTINCT(0,set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP))
output:StreamOpFlag.DISTINCT:
StreamOpFlag(maskTable=,
bitPosition=0, set=1, clear=2, preserve=3)【0x00000001】->[Spliterator.DISTINCT]> ok,我们知道了StreamOpFlag.DISTINCT的[设置]偏移位是1,16进制表示:0x00000001。当getStreamAndOpFlags返回的包含`IS_DISTINCT`也就是0x00000001就表示对于流中遇到的X,Y元素{@code!x.equals(y)}。对应的是包含`Spliterator.DISTINCT`,标识该stream已经是distinct的了。 - StreamOpFlag.SIZED > ``` SIZED(3,set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)) output:StreamOpFlag.SIZED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=2, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=6, set=64, clear=128, preserve=192)【0x00000040】->[Spliterator.SIZED]
表示遍历或拆分前从
estimateSize()
返回的值的特征值表示一个有限大小,在没有修改源结构的情况下,该值表示完整遍历流中元素数量的精确值,如果stream没有SIZED|SUBSIZED属性,则可以将estimateSize返回为Long.MAX_VALUE.这说明这个stream的estimateSize计算很复杂或本身就是一个infinite的steam流。这样设置后,性能上会差一些,但是,不会对sorted方法产生影响。如果要对流进行并行操作,实现自定义的Spliterator
时,则需要重写trySplit()
方法和long estimateSize()
方法。- StreamOpFlag.SORTED
SORTED(1,set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP))
output:StreamOpFlag.SORTED: StreamOpFlag(maskTable=, bitPosition=2, set=4, clear=8, preserve=12) 【0x00000004】->[Spliterator.SORTED]>表示流里顺序遵循定义的排序顺序。如果包含该属性,方法`getComparator()`返回关联的比较器,或者返回null,如果设置了该属性并且,方法`getComparator()`返回null,这表明改流已经排好序了,如果方法`getComparator()`返回不为null,那么在`fromCharacteristics`方法处,该SORTED属性会被取消掉。如果流里面的所有元素都是实现了Comparable,那排序顺序就是按它们的自然顺序,在`sorted(x->{...})`方法执行可以传一个lambda进去。如果有值传输进去,那么都回按照该lambda对该流进行排序。 - StreamOpFlag.ORDERED > ``` ORDERED(2, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP).clear(Type.UPSTREAM_TERMINAL_OP)) output:StreamOpFlag.ORDERED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=2, UPSTREAM_TERMINAL_OP=2}, bitPosition=4, set=16, clear=32, preserve=48)【0x00000010】->[Spliterator.ORDERED]
表示该流中的元素已经定义了顺序。包含了ORDERED属性,是拆分器保证
trySplit
拆分元素的强制前置条件,tryAdvance
方法也会按定义了的顺序逐个元素进行拆分,forEachRemaining
方法也按定义了的顺序执行内部迭代操作。- StreamOpFlag.SHORT_CIRCUIT
SHORT_CIRCUIT(12,set(Type.OP).set(Type.TERMINAL_OP)) output:StreamOpFlag.SHORT_CIRCUIT: StreamOpFlag(maskTable={SPLITERATOR=0, STREAM=0, OP=1, TERMINAL_OP=1, UPSTREAM_TERMINAL_OP=0}, bitPosition=24, set=16777216, clear=33554432, preserve=50331648)【0x01000000】->[表示操作可能使流短路]
表示操作可能使流短路
-
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
将此时间的管道内的元素应用到提供的
Spliterator
,并将结果发送到提供的接收器sink里
- abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
用于输出返回值的大小。
- abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
用于将从
Spliterator
获得的元素推入提供的接收器中Sink
。如果已知流管道中有短路阶段(包含StreamOpflag#SHORT_CURRENT),则在每个元素之后执行一下Sink#cancellationRequested()
,如果返回请求true,则执行终止。这个方法被实现之后需要遵守Sink的协议即:Sink#begin->Sink#accept->Sink->end
- abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
用于将从
Spliterator
获得的元素推入提供的接收器中Sink
。在每个元素之后执行一下Sink#cancellationRequested()
,如果返回请求true,则执行终止。这个方法被实现之后需要遵守Sink的协议即:Sink#begin->Sink#accept->Sink->end
- abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
该方法主要用于包装sink,从下游向上游去遍历
AbstractPipeline
,然后包装到一个Sink内,用于然后在copyInto
方法内部迭代执行对应的方法。
- abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,IntFunction<P_OUT[]> generator);
用于构造一个节点Builder,转换为数组去处理数组类型和PipelineHelper定义的输出类型一样。
- abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<P_OUT[]> generator);
该方法将源拆分器应用到管道内的所有元素。针对数组处理。如果管道没有中间(
filter,map
)操作,并且源由一个节点支持(源头),则该节点将被返回(内部遍历然后返回)。这减少了由有状态操作和返回数组的终端操作组成的管道的复制.例如:stream.sorted().toArray();该方法对应到AbstractPipeline
内部,代码如下:
@Override
@SuppressWarnings("unchecked")
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
AbstractPipeline
“管道”类的抽象基类,是流接口及其原始专门化的核心实现。用来表示流管道的初始部分,封装流源和零个或多个中间操作。对于顺序流和没有状态中间操作的并行流、并行流,管道中数据的处理是在一次“阻塞”所有操作的过程中完成的也就是最后才去处理。对于具有状态操作的并行流,执行被分成多个段,其中每个状态操作标记一个段的结束,每个段被单独评估,结果被用作下一个段的输入。上述所有情况,都是达到终端操作才开始处理源数据。
- AbstractPipeline(Supplier extends Spliterator>> source,int sourceFlags, boolean parallel)
创建源Source stage 第一个参数指定一个Supplier接口(工厂模式,只能生成Spliterator<?>的对象,根据传入的lambda实现,
<? extends Spliterator<?
泛型的PECS原则了解一下。)
- AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel)
创建源Source stage 第一个参数制定这个拆分器,和上面的构造方式一样,直接分析一下这个方法:
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
创建Stream 源阶段的时候
previousStage
为null
,this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
用于设置当前阶段的标识位。this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
添加源阶段的对流的操作标识,这个combinedFlags
是流在整个管道内部所有操作的合集,在最后的规约操作的时候去解析出来。
- AbstractPipeline(AbstractPipeline, E_IN, ?> previousStage, int opFlags)
根据上游创建下游
Pipeline
。
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
this.sourceStage = previousStage.sourceStage;
,用于上游和下游关联,this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
将上游的操作标识位添加到本阶段的操作标识位中。depth
记录整个管道的中间操作数。
- final
R evaluate(TerminalOp<E_OUT, R> terminalOp)
进行终端汇聚计算。执行最终的计算,得到结果,根据是否是并行执行,调用不同的结束逻辑,如果不是并行方法则执行
terminalOp.evaluateSequential
否则就执行terminalOp.evaluateParallel
。
- final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)
处理流转换数组。
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (isParallel() && previousStage != null && opIsStateful()) {
depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
return evaluate(sourceSpliterator(0), true, generator);
}
}
转换数组的时候,如果是并行流并且不是源阶段,而且调用过
sorted
||limit
||skip
||distinct
这些有状态的操作之后,这里是个模版方法调用。实际上是通过调用DistinctOps
||SortedOps
||SliceOps
这些实现的opEvaluateParallel
方法,提交到ForkJoin线程池来转换数组。串行执行的时候直接执行evaluate(sourceSpliterator(0), true, generator);
- evaluate(sourceSpliterator(0), true, generator);
具体的执行方法,用于吧管道内部的输出结果放到Node中。
@Override
@SuppressWarnings("unchecked")
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
@Override
final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<P_OUT[]> generator) {
return Nodes.collect(helper, spliterator, flattenTree, generator);
}
// Nodes.collect方法
public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<P_OUT[]> generator) {
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size >= 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
if (size >= MAX_ARRAY_SIZE)
throw new IllegalArgumentException(BAD_SIZE);
P_OUT[] array = generator.apply((int) size);
new SizedCollectorTask.OfRef<>(spliterator, helper, array).invoke();
return node(array);
} else {
Node<P_OUT> node = new CollectorTask.OfRef<>(helper, generator, spliterator).invoke();
return flattenTree ? flatten(node, generator) : node;
}
}
如果是源是并行流的情况,以
ReferencePipeline
引用管道来看主要执行的是return Nodes.collect(helper, spliterator, flattenTree, generator);
,该collect方法内部根据切割器有无Spliterator.SUBSIZED
确定了生成的Node的长度,主要工作是创建一个Task提交到线程池。然后调用invoke拿到结果。示例代码Arrays.asList("2","22","222").parallelStream().skip(2).toArray();
整个流程如下:
串行执行示例代码Arrays.asList("2","22","222").stream().skip(2).toArray();
整个流程如下:
- final Spliterator<E_OUT> sourceStageSpliterator()
获取Stream源头设置的拆分器,如果设置有则返回并且把源拆分器置空,如果有Supplier则调用get方法返回拆分器并且把源拆分器置空。
- public final S sequential()
设置为串行流 ,设置源的paraller属性为false。终态方法不允许重写
- public final S sequential()
设置为并行流 ,设置源的paraller属性为true。终态方法不允许重写
- public void close()
关闭管道的方法,在关闭的时候会把管道使用标志设置为false,拆分器设置为null,如果源的回调关闭Job存在不为null时则invoker这个回调Job。
- public S onClose(Runnable closeHandler)
用于注册关闭的回调job,在调用close的时候用于去执行这个回调job。
- public Spliterator<E_OUT> spliterator()
和
sourceStageSpliterator
方法一样的功能,只不过不是终态方法,可以重写用于自定义的拓展。
- public final boolean isParallel()
用于盘带你当前管道是否是并行流。
- final int getStreamFlags()
获取流的标志和Stream的包含的所有操作。
- private Spliterator<?> sourceSpliterator(int terminalFlags) {
获取源拆分器,和
sourceStageSpliterator
方法一样的功能,针对是并行流时候,并且是创建Stream阶段的话有中间状态,会组合流标志和操作构建拆分器。如果传入的操作码不等于0,那么则添加到拆分器的操作码中。
- final StreamShape getSourceShape()
输出Stream源的类型。(引用 OR int OR Double OR Long)
- final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator)
获取期望的size,如果拆分器如果有SIZE标志,调用拆分器的getExactSizeIfKnown方法,否则返回-1。
- final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)
封装整个管道的阶段,包装在Sink中。把每一个阶段串联起来。包装在Sink内部的
downstream
.
wrapAndCopyInto代码执行流程如下:
内部迭代
Stream
基于访问者模式对集合数据进行内部迭代,实际上使用的就是Spliterator
,实现内部迭代基于Spliterator
接口的forEachRemaining
默认方法,针对数组类型的集合主要是调用的Spliterators
类重写的这个方法。细心的同学可能已经心中有疑问了,在外部迭代的时候我们知道回去校验是否遍历的同时对集合数据进行修改了,回去校验ConcurrentModificationException
,这里却并没有校验。很简单因为内部迭代直接把集合转为数组进行遍历也就是源头的数据,没办法修改所以是不可变的数据就不去校验ConcurrentModificationException
异常了。
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
}
在这里的代码我们可以看出来,就是对每一个源头的集合元素执行
action. accept(a[i])
方法,我们在上面的copyInto
方法中得知了forEachRemaining
执行的就是我们整个流水线上的按照添加功能顺序的每一个lambda表达式。应为当执行action. accept(a[i])
方法时候调用执行了下游的方法downstream.accept(mapper.apply(u));
这就相当于源头第一个元素先执行的第一个任务的结果是第二个人物的入参,以此类推。