news 2026/6/9 16:45:21

kotlin协程-冷数据流Flow

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
kotlin协程-冷数据流Flow

种一颗树的最好时机是十年前,其次是现在。
学习也一样。
跟着霍老师的《深入理解 Kotlin 携程》学习一下协程。

一点前言

随着RxJava的流行,响应式编程模型逐步深入人心。Flow就是kotlin协程与响应式编程模型结合的产物。

认识Flow

我们从序列生成器开始

valints=sequence{(10..30).forEach{yield(it)}}

这里如果希望在元素之间加个延时怎么办?因为受restrictsSuspension注解的约束,delay函数不能再SequenceScope的扩展成员中被调用

假设序列生成器不受这个限制,调用delay函数会导致后续的执行流程的线程发生变化,外部的调用者发现在访问ints的下一个元素的时候居然还会有切换线程的副作用。不仅如此,通过制定调度器来限定序列创建所在的线程同样是不可以的,我们甚至没有办法为它设置协程上下文。
那么我们来看一下Flow。

valintFlow=flow{(1..3).forEach{emit(it)delay(1000)}}

Flow也可以设定它运行时所使用的调度器:

intFlow.flowOn(Dispatchers.IO).collect{println(it)}

最终消费intFlow需要调用collect函数。

冷数据流

在Flow创建出来之后,不消费则不生产,多次消费则多次生产,生产和消费总是相对应的。

suspendfunmain(){valintFlow=flow{(1..3).forEach{emit(it)delay(1000)}}intFlow.flowOn(Dispatchers.IO).collect{println(it)}intFlow.flowOn(Dispatchers.IO).collect{println(it)}}

这里会输出两次“123”

异常处理

Flow的异常处理也比较直接,直接调用catch函数即可。需要注意的是,catch函数只能捕获它上游的异常,并且,当我们没有调用catch函数时,未捕获的异常会在消费时抛出。当然了,我们可以使用onCompletion来进行FLow完成时的逻辑

suspendfunmain(){flow{emit(1)throwArithmeticException("div 0")}.catch{t:Throwable->println("caught error :$t")}.onCompletion{t:Throwable?->println("finally.")}.flowOn(Dispatchers.Default).collect{value->println(value)}}

onCompletion类似于try ... catch ... finally中的finally。这套处理机制的设计初衷是确保Flow操作中异常的透明,因此我们不能或者禁止这样写:

flow{try{emit(1)throwArithmeticException("Div 0")}catch(e:ArithmeticException){println("caught error:$e")}finally{println("finally")}}

末端操作符

collect是最基本的末端操作符,还有其他末端操作符,大体分为两类

  • 集合类型转换操作符,包括toList、toSet等
  • 聚合操作符,包括将Flow规约到单值的reduce、fold等操作;还有获得单个元素的操作符,包括single、singleOrNull、first等

由于Flow的消费端一定需要运行在协程中,因此末端操作符都是挂起函数。

分离Flow的消费和触发

我们还可以通过onEach来做到这一点,这样消费的具体操作就不需要与末端操作符放到一起,collect函数可以放到其他任意位置调用

funcreateFlow()=flow<Int>{(1..10).forEach{emit(it)delay(1000)}}.onEach{println(it)}suspendfunmain(){GlobalScope.launch{createFlow().collect()}delay(20*1000)}

需要注意一下,Flow并没有提供取消操作,想要取消Flow,只需要取消它所在的协程即可。

其他Flow的创建方式

当我们使用flow{...}来创建Flow时,无法随意切换调度器,因为emit函数不是线程安全的。想要在生成元素时切换调度器,就必须使用channelFlow函数来创建Flow:

channelFlow{send(1)withContext(Dispatchers.IO){send(2)}}

此外,我们可以通过集合矿建来创建Flow:

suspendfunmain(){listOf(1,2,3,4).asFlow().collect{value->println(value)}setOf(1,2,3,4).asFlow().collect{value->println(value)}flowOf(1,2,3,4).collect{value->println(value)}}

背压

只要是响应式编程,就一定会有背压问题,背压问题在生产者的生产速率高于消费者的处理速率情况下出现。为了保证数据不丢失,我们可以添加一个制定容量的buffer。但这只是治标不治本的方法,随着时间的推移,还是会造成时间上的积压。
出现背压问题的根本原因是生产者和消费者的速速率不匹配,除了直接优化消费者的性能外,我们还可以采取一些取舍的手段。

第一种是conflate,和Channel的Conflate模式一致,新数据覆盖老数据。

suspendfunmain(){flow{List(100){emit(it)}}.conflate().collect{value->println("Collected:$value")delay(100)println("$valuecollected")}}

虽然我们发送了100个元素,但最终只接收到2个,多次运行的结果并不相同。
第二种是collectLasted,只处理最新的数据,区别在于:collectLasted并不会直接用新数据覆盖老数据,而是每一个数据都会处理,只不过如果前一个还没被处理完后一个就来了话,处理前一个数据的逻辑就会被取消。

suspendfunmain(){flow{List(10){emit(it)}}.collectLatest{value->println("Collected:$value")delay(1000)println("$valuecollected")}}

输出

Collected: 0
Collected: 1
Collected: 2
Collected: 3
Collected: 4
Collected: 5
Collected: 6
Collected: 7
Collected: 8
Collected: 9
9 collected

前面的println("Collected: $value")输出了所有结果,后面的println("$value collected")只输出了最后一个结果,因为后面的数据到达时,处理上个数据的操作正好被挂起了。

除此之外,还有mapLatest,flatMapLatest等。

Flow的变换

我们可以使用map来变换Flow的数据

suspendfunmain(){flow{List(5){emit(it)}}.map{it*2}.collect{println(it)}}

输出

0
2
4
6
8

还有按照顺序拼接的flattenConcat,不保证顺序的flattenMerge操作等


以上

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/28 16:40:28

AI魔法剪辑:用Python代码自动生成震撼电影预告片的秘密武器

目录 引言&#xff1a;当代码遇见创意&#xff0c;剪辑革命悄然来临 一、智能剪辑的技术基石&#xff1a;为什么选择Python&#xff1f; 1.1 Python在多媒体处理中的独特优势 1.2 核心技术栈深度解析 二、智能剪辑的核心魔法&#xff1a;算法原理大揭秘 2.1 场景检测的智能…

作者头像 李华
网站建设 2026/5/28 14:31:33

大模型辅助的细粒度知识图谱构建用于机器人故障诊断

文章摘要随着工业机器人在制造业的快速部署,先进维护技术需求日益凸显。本研究提出一种基于大型语言模型(LLM)辅助的数据增强方法,解决维护文本中嵌套实体识别难题和工业数据标注稀缺问题,构建更细粒度的故障诊断知识图谱,在小样本场景下平均F1值提升达8.25%。阅读原文或https:…

作者头像 李华
网站建设 2026/6/7 3:39:23

强烈安利!继续教育必用TOP8 AI论文网站测评

强烈安利&#xff01;继续教育必用TOP8 AI论文网站测评 2025年继续教育AI论文工具测评&#xff1a;精准匹配学习与研究需求 在继续教育的背景下&#xff0c;越来越多的学习者需要撰写高质量的论文以提升学术能力或满足课程要求。然而&#xff0c;面对繁重的写作任务和复杂的格…

作者头像 李华
网站建设 2026/5/28 16:40:15

电影《匿杀》票房破亿 黄晓明以复杂人物切入犯罪悬疑叙事

2025年12月31日&#xff0c;由柯汶利执导的犯罪悬疑大片《匿杀》在全国院线正式上映&#xff0c;影片上映第二日票房就突破亿元大关&#xff0c;在跨年档多部影片同台竞争的市场环境下&#xff0c;《匿杀》取得这一成绩&#xff0c;显示出影片在悬疑犯罪类型中的市场吸引力&…

作者头像 李华
网站建设 2026/6/4 20:57:17

AI应用架构师实战:基于Kubeflow的企业AI工具链搭建

好的,作为一名资深软件工程师和技术博主,我很乐意为你撰写一篇关于“AI应用架构师实战:基于Kubeflow的企业AI工具链搭建”的技术博客文章。 我将采用“问题解决型文章”的结构,因为这个主题非常适合一步步引导读者完成一个复杂的系统搭建过程。 标题:AI应用架构师实战:基…

作者头像 李华
网站建设 2026/5/28 17:24:44

CSS 预处理器:Sass的基本用法、核心特性 - 详解

一句话总结&#xff1a; Sass CSS 变量 函数 逻辑 模块化&#xff0c;是现代前端开发不可或缺的样式编程工具。 https://sass-lang.com.cn/guide/ 一、什么是 Sass&#xff1f; Sass 是一种 CSS 预处理器&#xff08;CSS Preprocessor&#xff09;&#xff0c;它扩展了…

作者头像 李华