Kotlin中的序列(Sequences)

除集合之外,kotlin的标准库中还提供了另一种容器——序列(Sequence<T>)。序列同样提供了Iterable
同的函数,但实现的是另一种方法进行多步骤的集合处理。如果你熟悉Java,其实就是Java8版本中Stream的翻版。

Sequences和Iterable的区别

当Iterable的处理包含了多个步骤时,它们会优先执行每个处理步骤完成并返回其结果——中间集合,后续
的操作在此集合的基础上执行。而序列的多步骤处理会延迟执行,仅当请求整个处理链的结果时才会进行
实际的计算。而且操作顺序也不同:Sequence对每个元素逐个执行所有步骤,反过来,Iterable 完成整个
集合的每个步骤,然后进行下一步。

一个具体的例子来解释SequencesIterable的区别是什么:先对filter操作中,取出长度大于字符,再
map转化成字符的长度,最后用take取出前两位。

1.集合Iterable的方式

1
2
3
4
5
6
7
8
val numbers = listOf("five", "four", "three", "two", "one")
numbers.filter {
println("filter:$it")
it.length > 3
}.map {
println("map->$it");
it.length
}.take(2)

输出如下:
image.png
从输出日志可以看出,先把filter的操作执行完,再执行map,最后取出集合中前两位。每个步骤中间都会生成一个集合,后续的操作都依据此集合。

2.Sequences的多步骤处理:

1
2
3
4
5
6
7
8
val numbers = listOf("five","four", "three", "two", "one")
val numberSequences = numbers.asSequence().filter {
println("filter:$it")
it.length > 3
}.map {
println("map->$it");
it.length
}.take(2)

如果只是这么一段代码,因为Sequences延迟执行得特新,不会有任何输出。因为对Sequences执行
map、filer、take等操作时返回得结果还个Sequences。需要调用Sequencesiterator()方法才会执行。
我这里在最后面加上如下代码:

1
println(numberSequences.toList())

输出如下:

从日志明显能看出Sequences在多步骤处理时是对每个元素逐个操作。

序列操作 :基本原理是惰性求值,也就是说在进行中间操作的时候,是不会产生中间数据结果的,只有等到进行末端操作的时候才会进行求值。也就是上述例子中的每个数据元素都是先执行 filter 操作,接着马上执行 map 操作,最后执行take操作。然后下一个元素也是先执行 filter 操作,接着马上执行 map 操作,再take操作。然而普通集合是所有元素都完执行 filter 后的数据存起来,然后从存储数据集中又所有的元素执行 map 操作存起来的原理。
集合普通操作 :针对每一次操作都会产生新的中间结果,也就是上述例子中的 filter 操作完后会把原始数据集循环遍历一次得到最新的数据集存放在新的集合中,然后进行 map 操作,遍历上一次 filter 新集合中数据元素,最后得到最新的数据集又存在一个新的集合中。

Sequences 原理源码完全解析

1
2
3
4
5
6
7
8
9
fun main(args: Array<String>) {
val numbers = listOf("five", "four", "three", "two", "one")
val numberSequences = numbers.asSequence().filter {
it.length > 3
}.map {
println("map->$it");
it.length
}.take(2)
}

通过decompile上述例子的源码会发现,使用序列不管进行多少中间操作都是共享同一个迭代器中的数据,想知道共享同一个迭代器的原理吗?那看内部源码是如何实现的吧。

使用Sequences反编译源码

1
2
3
4
5
6
7
8
9
10
11
public static final void main(@NotNull String[] args) {
Intrinsics.checkNotNullParameter(args, "args");
List numbers = CollectionsKt.listOf(new String[]{"five", "four", "three", "two", "one"});
Sequence var2 = SequencesKt.take(SequencesKt.map(SequencesKt.filter(CollectionsKt.asSequence((Iterable)numbers), (Function1)null.INSTANCE), (Function1)null.INSTANCE), 2);
boolean var3 = false;
boolean var4 = false;
int var6 = false;
List var7 = SequencesKt.toList(var2);
boolean var8 = false;
System.out.println(var7);
}

抽取里面关键的部分:

1
Sequence var2 = SequencesKt.take(SequencesKt.map(SequencesKt.filter(CollectionsKt.asSequence((Iterable)numbers), (Function1)null.INSTANCE), (Function1)null.INSTANCE), 2);

这段代码很长,给它分以下段。

val asSequence=CollectionsKt.asSequence((Iterable)numbers)//part 1
val filterSequence=SequencesKt.filter(asSequence, (Function1)null.INSTANCE)//part 2
val mapSequence=SequencesKt.map(filterSequence, (Function1)null.INSTANCE)//part 3
val takeSequence=SequencesKt.take(mapSequence,2)//part 4

第一部分:
调用Iterable的中的asSequence扩展方法将集合转换成Sequence对象:

1
2
3
4
5
6
7
8
public fun <T> Iterable<T>.asSequence(): Sequence<T> {
return Sequence { this.iterator() }//外部集合的Iterator迭代器对象
}

@kotlin.internal.InlineOnly
public inline fun <T> Sequence(crossinline iterator: () -> Iterator<T>): Sequence<T> = object : Sequence<T> {
override fun iterator(): Iterator<T> = iterator()
}

通过外部传入集合的Iterator迭代器对象,再实例化一个SequenceSequence是一个接口,内部有个 iterator () 抽象函数返回一个迭代器对象,然后把传入迭代器对象作为Sequence内部的迭代器,也就是相当于给迭代器加了Sequence`序列的外壳,核心迭代器还是由外部传入的迭代器对象,有点偷梁换柱的概念。

第2部分:
在第一部分中把集合转换成Sequence,接下来就是filter操作,也就是调用Sequencefilter扩展方法:

1
2
3
public fun <T> Sequence<T>.filter(predicate: (T) -> Boolean): Sequence<T> {
return FilteringSequence(this, true, predicate)
}

该方法会返回FilteringSequence对象,该对象的构造器有三个参数,其中第一个接收的是个Sequence,第三个参数是个predicatelambda表达式。FilteringSequence本身继承至Sequence,至于其它细节先不急,待会再看。

第3部分:
调用Sequencemap扩展方法:

1
2
3
public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R> {
return TransformingSequence(this, transform)
}

和第二部分很像,只不过该方法会返回TransformingSequence对象,它同样也继承至Sequence

第4部分:
调用Sequencetake扩展方法:

1
2
3
4
5
6
7
8
public fun <T> Sequence<T>.take(n: Int): Sequence<T> {
require(n >= 0) { "Requested element count $n is less than zero." }
return when {
n == 0 -> emptySequence()//1
this is DropTakeSequence -> this.take(n)//2
else -> TakeSequence(this, n)//3 我们此时知道传入Sequence是TransformingSequence,所有走着分支
}
}

因为第三部分返回的是TransformingSequence,所有走分支3并返回TakeSequence对象,它同样也继承至Sequence

这里同时出现了FilteringSequenceTransformingSequenceTakeSequence三个序列对象,它们的构
造器都会接收一个Sequence类型的对象,并且都继承至Sequence。那们先看一下Sequence这个接口:

1
2
3
4
5
6
7
8
public interface Sequence<out T> {
/**
* Returns an [Iterator] that returns the values from the sequence.
*
* Throws an exception if the sequence is constrained to be iterated once and `iterator` is invoked the second time.
*/
public operator fun iterator(): Iterator<T>
}

回想一下序列的执行流程,先是通过调用Iterable<T>asSequence()扩展方法,传入一个Iterable<T>
象,并通过该对象实例的iterator()拿到迭代器对象,并把该迭代器对象传入Sequence中,然后调用不同
的扩展操作符反复实现了Sequence接口的不同的子类对象,而这些子对象会实现不同的iterator()函数,
返回一个新的迭代器对象,但是迭代器的数据则来源原始的迭代器。

接下来我们就以TakeSequence为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
internal class TakeSequence<T>(
private val sequence: Sequence<T>,
private val count: Int
) : Sequence<T>, DropTakeSequence<T> {

init {
require(count >= 0) { "count must be non-negative, but was $count." }
}

override fun drop(n: Int): Sequence<T> = if (n >= count) emptySequence() else SubSequence(sequence, n, count)
override fun take(n: Int): Sequence<T> = if (n >= count) this else TakeSequence(sequence, n)

/**
* 重写iterator方法,返回一个新的迭代器
*/
override fun iterator(): Iterator<T> = object : Iterator<T> {
var left = count
val iterator = sequence.iterator()//取出构造器传入的Sequence中的迭代器

override fun next(): T {
if (left == 0)
throw NoSuchElementException()
left--
return iterator.next()//共享迭代器中的数据
}

override fun hasNext(): Boolean {
return left > 0 && iterator.hasNext()//判断是否还有数据
}
}
}

源码总结

序列内部的实现原理是采用状态设计模式,根据不同的操作符的扩展函数,实例化对应的 Sequence 子类
对象,每个子类对象重写了 Sequence 接口中的 iterator () 抽象方法,内部实现根据传入的迭代器对象中的
数据元素,加以变换、过滤、合并等操作,返回一个新的迭代器对象。这就能解释为什么序列中工作原理
是逐个元素执行不同的操作,而不是像普通集合所有元素先执行 A 操作,再所有元素执行 B 操作。这是因
为序列内部始终维护着一个迭代器,当一个元素被迭代的时候,就需要依次执行 A,B,C 各个操作后,如果
此时没有执行iterator()操作,那么值将会存储在 C 的迭代器中,依次执行,等待原始集合中共享的数据被
迭代完毕,或者不满足某些条件终止迭代,最后取出 C 迭代器中的数据即可。

参考资料: