news 2026/2/1 2:34:13

学习笔记:Kotlin Flow 操作符

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
学习笔记:Kotlin Flow 操作符

Kotlin Flow 操作符

以下文章来源于RockByte ,作者RockByte.

十分钟速览 Kotlin Flow 操作符

一对一转换

map

Flow中发出的每个元素转换为新元素,实现一对一映射。

funmain()=runBlocking{flowOf("Kotlin","Flow").map{"Length of '$it' is${it.length}"}.collect{println(it)}}// output:// Length of 'Kotlin' is 6// Length of 'Flow' is 4

适用场景:需要将每个项转换为不同类型或格式,例如在ViewModel中将Date对象格式化为可显示的String


filter

仅允许满足给定条件的元素通过。

funmain()=runBlocking{(1..5).asFlow().filter{it%2==0}.collect{println(it)}}// output:// 2// 4

适用场景:根据条件丢弃不需要的值,例如过滤空搜索词或无效数据。


take

限制性操作符,仅发出Flow的前n个元素,随后取消Flow执行。

funmain()=runBlocking{(1..10).asFlow().take(3).collect{println(it)}}// output:// 1// 2// 3

适用场景:只需要有限数量的项,例如分页加载,或从一系列操作中取第一个有效结果。


累积值

reduce

终端操作符,从Flow的第一个元素开始累积值,并对当前累加器与每个元素执行操作。若Flow为空则抛出异常。

funmain()=runBlocking{valsum=(1..3).asFlow().reduce{accumulator,value->accumulator+value}println(sum)}// output:// 6

fold

类似于reduce,但fold需要一个初始值,是更安全的选择。

即使Flow为空也会返回初始值。

funmain()=runBlocking{valsum=(1..3).asFlow().fold(100){accumulator,value->accumulator+value}println(sum)}// output:// 106

runningReduce / scan

中间操作符,在每个元素处理时都发出当前累积值。

中间操作符不会触发Flow的收集,所以需要collect触发Flow的收集,而reduce这种终端操作符会触发Flow的收集。

scan是更通用的版本,支持指定初始种子值。

funmain()=runBlocking{println("runningReduce:")(1..3).asFlow().runningReduce{accumulator,value->accumulator+value}.collect{println(it)}println("scan:")(1..3).asFlow().scan(0){accumulator,value->accumulator+value}.collect{println(it)}}// output:// runningReduce:// 1// 3// 6// scan:// 0// 1// 3// 6

注意:他们会返回每一步的结果,而reduce/fold只返回最终结果。

适用场景:计算累计总和、跟踪进度,或在状态机中展示状态历史。


发出多个值

transform

高度灵活的操作符,可为每个输入元素发出零个、一个或多个值,对输出流有更强控制力。

funmain()=runBlocking{(1..2).asFlow().transform{emit("Item:$it")if(it%2!=0){emit("...is an odd number")}emit("Square:${it*it}")}.collect{println(it)}}// output:// Item: 1// ...is an odd number// Square: 1// Item: 2// Square: 4

适用场景:执行复杂转换、引入副作用(如日志记录),或根据单个输入有条件地发出多个值。


扁平化嵌套

flatMapConcat

将每个元素转换为一个Flow,然后依次连接这些Flow,只有当前一个Flow完成后,下一个才开始。

fungetNumbersFlow(id:Int):Flow<String>=flow{delay(100)emit("First-$id")delay(100)emit("Second-$id")}funmain()=runBlocking{(1..2).asFlow().flatMapConcat{id->getNumbersFlow(id)}.collect{println(it)}}// output:// First-1// Second-1// First-2// Second-2

仔细看,第一个流的每个数字都会参与到第二个流中。

适用场景:顺序敏感的操作,例如依次上传多个文件,或执行依赖型网络请求。


flatMapMerge

并发合并由转换函数生成的多个Flow,可通过concurrency参数控制并发数量。

fungetNumbersFlow(id:Int):Flow<String>=flow{delay(100)emit("First-$id")delay(100)emit("Second-$id")}suspendfunmain(){(1..2).asFlow().flatMapMerge{id->getNumbersFlow(id)}.collect{println(it)}}// output:// First-1// First-2// Second-2// Second-1

从结果上注意区分flatMapConcatflatMapMerge不会保证合并的顺序。

适用场景:顺序无关的并行操作,例如同时从多个数据源获取数据。


flatMapLatest

当新元素发出时,立即取消上一个元素对应的Flow

valsearchQuery=flowOf("search","search with new term").onEach{delay(200)}funsearchApi(query:String):Flow<String>=flow{emit("Searching for '$query'...")delay(500)// 模拟网络延迟emit("Results for '$query'")}suspendfunmain(){searchQuery.flatMapLatest{query->searchApi(query)}.collect{println(it)}}// output:// Searching for 'search'...// Searching for 'search with new term'...// Results for 'search with new term'

适用场景:实时搜索功能,或任何只需关注最新事件结果的场景。


上下文与缓冲

flowOn

更改用于执行上游FlowCoroutineContext,是Flow中切换调度器的正确方式。

funheavyWork():Flow<Int>=flow{println("Starting heavy work on${Thread.currentThread().name}")for(i in1..3){// Simulate CPU-intensive workThread.sleep(100)emit(i)}}funmain()=runBlocking{heavyWork().flowOn(Dispatchers.IO)// Upstream runs on IO dispatcher.collect{println("Collected$iton${Thread.currentThread().name}")}// Downstream runs on the collector's context (e.g., Main)}// output:// Starting heavy work on DefaultDispatcher-worker-1// Collected 1 on main// Collected 2 on main// Collected 3 on main

buffer

通过解耦生产者与消费者实现并发执行:生产者将项放入缓冲区,消费者从中取出。

suspendfunmain(){valtime=measureTimeMillis{flow{for(i in1..3){delay(200)// Simulate slow emissionemit(i)}}.buffer()// With buffer, the total time is closer to the slow collector's time.collect{delay(300)// Simulate slow collectionprintln(it)}}println("Collected in$timems")}// output:// 1// 2// 3// Collected in 1172 ms

得益于buffer,最后整个数据的收集时间要小于(200 + 300) * 3

使用场景:当生产者与消费者处理速度不一致时,提升性能。


conflate

一种缓冲形式,当收集器处理太慢时会丢弃中间值,确保始终获取最新值。

suspendfunmain(){flow{for(i in1..5){delay(100)emit(i)}}.conflate().collect{value->println("Started processing$value")delay(300)println("Finished processing$value")}}// output:// Started processing 1// Finished processing 1// Started processing 3// Finished processing 3// Started processing 5// Finished processing 5

适用场景:UI 更新中无需显示中间状态,如股票行情或 GPS 位置更新。


collectLatest

终端操作符,当新值发出时,取消对前一个值的收集逻辑。

suspendfunmain(){(1..3).asFlow().onEach{delay(100)}.collectLatest{value->println("Collecting$value")delay(300)println("Finished collecting$value")}}// output:// Collecting 1// Collecting 2// Collecting 3// Finished collecting 3

注意看这里的结果,Finished collecting 只收集了最后一次的值,一定要注意这个特性。

适用场景:某项操作耗时较长,且应在新项到达时被取消,例如将用户输入保存到数据库。


合并

zip

等待两个Flow各自发出一项后进行组合。任一源Flow结束,结果Flow即结束。

suspendfunmain(){valflowA=(1..3).asFlow()valflowB=flowOf("A","B","C","D")flowA.zip(flowB){number,letter->"$number$letter"}.collect{println(it)}}// output:// 1A// 2B// 3C

combine

组合两个Flow的最新值。只要任一源Flow发出新值(且双方至少各发出过一次),就会触发一次发射。

suspendfunmain(){valflowA=(1..3).asFlow().onEach{delay(100)}valflowB=flowOf("A","B").onEach{delay(150)}flowA.combine(flowB){number,letter->"$number$letter"}.collect{println(it)}}// output:// 1A// 2A// 3A// 3B

适用场景:响应多个数据源的变化。


merge

将多个Flow合并为一个,按发出顺序交错输出所有值。

suspendfunmain(){valflowA=flowOf("A1","A2").onEach{delay(100)}valflowB=flowOf("B1","B2").onEach{delay(50)}merge(flowA,flowB).collect{println(it)}}// output:// B1// A1// B2// A2

适用场景:将来自不同 UI 组件的多个事件流合并为单一处理流。


错误与完成处理

catch

捕获上游Flow(即catch之前的操作符)中发生的异常,但不捕获下游收集器中的异常。

suspendfunmain(){flow{emit(1)throwRuntimeException("Error!")}.catch{e->println("Caught:${e.message}")emit(-1)// Emit a fallback value}.collect{println(it)}// Emits 1, then -1}// output:// 1// Caught: Error!// -1

适用场景:优雅地处理错误、提供默认值或记录失败信息。


onCompletion

Flow完成时(无论成功或异常)执行指定操作。成功时causenull

suspendfunmain(){(1..3).asFlow().onCompletion{cause->if(cause!=null)println("Flow completed with error")elseprintln("Flow completed successfully")}.collect{println(it)}}// output:// 1// 2// 3// Flow completed successfully

retryWhen

在发生异常时根据谓词(包含异常原因和重试次数)决定是否重试。

suspendfunmain(){varattemptCount=0flow{emit(1)if(attemptCount<2){attemptCount++throwRuntimeException("Transient error")}emit(2)}.retryWhen{cause,attempt->println("Attempt$attempt: Retrying due to${cause.message}")delay(100)// Add a delay before retryingattempt<2// Retry up to 2 times}.catch{println("Caught final error:${it.message}")}.collect{println(it)}}// output:// 1// Attempt 0: Retrying due to Transient error// 1// Attempt 1: Retrying due to Transient error// 1// 2

retryWhen的回调有两个参数:

  • cause:导致Flow失败的异常(Throwable类型)。

  • attempt:当前是第几次重试,从0开始计数。


工具与副作用

onEach

Flow中每个元素执行指定操作,但不修改元素本身。

suspendfunmain(){(1..3).asFlow().onEach{println("About to process$it")}.map{it*it}.collect{println("Processed value:$it")}}// output:// About to process 1// Processed value: 1// About to process 2// Processed value: 4// About to process 3// Processed value: 9

适用场景:用于日志、调试或埋点等副作用场景,在不改变数据的前提下观察Flow


debounce

过滤在指定超时内被新值取代的值,仅发出“突发”中的最后一个值。

suspendfunmain(){flow{emit(1)delay(90)emit(2)delay(90)emit(3)delay(500)emit(4)delay(90)emit(5)}.debounce(100).collect{println(it)}}// output:// 3// 5

适用场景:处理快速用户输入(如搜索框),避免每次按键都触发 API 请求。


distinctUntilChanged

抑制与前一个值相同的重复发射。

suspendfunmain(){flowOf(1,1,2,2,1,3).distinctUntilChanged().collect{println(it)}}// output:// 1// 2// 1// 3

适用场景:防止 UI 因状态未变而进行不必要的重组或更新。

核心决策指南

  • 数据转换。用于修改流中的值:mapfilterdistinctUntilChanged
  • 累积值。用于跟踪状态或计算持续结果:scanrunningReducefoldreduce
  • 从单一输入发出多个值。用于自定义发射逻辑:transform
  • 处理嵌套或动态Flow。根据内部Flow行为选择:flatMapConcat(顺序执行)、flatMapMerge(并发执行)、flatMapLatest(取消旧任务,保留最新)。
  • 性能与背压控制。用于优化收集效率与响应性:bufferconflatecollectLatestflowOn
  • 合并多个流。用于组合多个数据源:zip(按顺序配对)、combine(组合最新值)、merge(交错合并)。
  • 错误处理与完成逻辑。用于应对异常和生命周期事件:catchonCompletionretryWhen
  • 调试与副作用。用于插入日志或副作用操作:onEach
  • 处理高频发射。用于抑制过快的发射频率:debounce
  • 触发Flow执行。终端操作符:collectcollectLatestfirstsingletoListreducefold
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/29 15:23:57

concurrentHashMap原理

concurrentHashMap的是为了解决HashMap在并发环境中出现的线程安全问题&#xff0c;同时也优化了HashTable在高并发中存在的性能问题&#xff0c;让其性能更接近于HashMap。高并发问题HashMap1.数据丢失问题2.JDK1.7采用头插法&#xff0c;会导致链表成环&#xff0c;抛出Concu…

作者头像 李华
网站建设 2026/1/30 0:04:14

FPC电路板先贴补强还是先SMT?正确顺序你选对了吗?

明明设计没问题&#xff0c;但SMT贴片后板子却弯曲起翘、元件浮起、甚至板子报废&#xff1f;这&#xff01;可能是补强贴合顺序埋下的雷&#xff01;FPC设计中&#xff0c;补强贴合顺序是最容易被忽视却又至关重要的环节。搞错顺序&#xff0c;轻则导致板子无法做SMT&#xff…

作者头像 李华
网站建设 2026/1/30 4:25:39

模仿文风能力,早已迭代升级

在内容创作场景中&#xff0c;“以稿写稿”的文风模仿写作&#xff0c;已经成为职场高效撰稿的写稿方法。凭借之前过稿的文章内容。无论是优秀范文&#xff0c;还是自己此前成功通过审核的稿件&#xff0c;都能作为与现有写作任务高度契合的参考范本。在类似的业务场景下&#…

作者头像 李华
网站建设 2026/1/31 3:02:04

中央空调改时间控制启停:西门子1200PLC与TP900触摸屏模拟仿真程序博途V16

中央空调改时间控制启停西门子1200PLC和TP900触摸屏模拟仿真程序博途V16【手把手玩转中央空调时间控制】最近在车间搞了个挺有意思的改造——用西门子1200PLC配TP900触摸屏实现中央空调的定时启停。今天就把实战过程扒给大家看看&#xff0c;连仿真都给你跑通了&#xff0c;记得…

作者头像 李华
网站建设 2026/1/30 13:31:05

AI Agent开发的10个致命错误,99%开发者都踩过

随着AI技术日新月异的进步&#xff0c;越来越多的企业和开发者开始着手开发自己的AI Agent&#xff08;智能代理&#xff09;。这些AI Agent可以在各种领域提供支持&#xff0c;从自动化办公到客户服务、从数据分析到智能推荐&#xff0c;几乎无所不包。然而&#xff0c;开发AI…

作者头像 李华
网站建设 2026/1/30 2:12:27

【供应链Agent需求预测终极指南】:揭秘AI驱动下精准预测的5大核心算法

第一章&#xff1a;供应链Agent需求预测的演进与挑战 随着人工智能与大数据技术的深度融合&#xff0c;供应链中的需求预测已从传统的统计模型逐步演进为基于智能Agent的动态预测系统。这类系统能够自主感知市场变化、学习历史模式并协同上下游节点做出实时响应&#xff0c;极大…

作者头像 李华