news 2026/3/16 14:04:12

kotlin协程-热数据通道Channel

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
kotlin协程-热数据通道Channel

种一颗树的最好时机是十年前,其次是现在。
学习也一样。
跟着霍老师的《深入理解 Kotlin 携程》学习一下协程。

直奔主题,认识 Channel

Channel 实际上就是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信

suspendfunmain(){valchannel=Channel<Int>()valproducer=GlobalScope.launch{vari=0while(true){delay(1000)channel.send(i++)}}valconsumer=GlobalScope.launch{while(true){valelement=channel.receive()println(element)}}producer.join()consumer.join()}

上述的代码中构造了两个协程 producer 和 consumer,我么呢没有为它们明确制定调度器,所以他们的调度器都是默认的。其中 producer 中每隔 1 秒向 Channel 发送一个整数,而 consumer 中一致在读取 channel 来获取数据并打印,显然发送端比接收端更慢,在没有可以读取的值时,receive 是挂起的,直到有新元素到达,这么看来 receive 一定是一个挂起函数,那么 send 呢?

Channel 的容量

我们查看 send 方法的声明,发现它也是挂起函数。那么发送端为什么要挂起?上面也提到,Channel 实际上就是一个队列,队列中一定存在缓冲区,一旦这个缓冲区满了,一直没有人调用 receive 并取走元素,send 就要挂起,等待接收者取走元素后再写入 Channel。

publicfun<E>Channel(capacity:Int=RENDEZVOUS):Channel<E>=when(capacity){RENDEZVOUS->RendezvousChannel()UNLIMITED->LinkedListChannel()CONFLATED->ConflatedChannel()BUFFERED->ArrayChannel(CHANNEL_DEFAULT_CAPACITY)else->ArrayChannel(capacity)}

我们构造 Cahnnel 的时候调用了一个名为 Channle 的函数,但它不是 Channel 的构造函数。在 Kotlin 中,经常定义一个顶级函数来伪装成同名类型的构造器,这本质上是工厂函数。这里有一个 Int 类型的 capacity 参数,默认值为 RENDEZVOUS。
这时候如果不调用 receive,send 就会一直挂起等待。

UNLIMITED比较好理解,没有限制,来者不拒。
CONFLATED这个名字可能有迷惑性,字面意思是合并,但实际上这个函数的效果是只保留最后一个元素,也就是说缓冲区只有一个元素大小,每次有新元素到来,都会覆盖掉旧元素。
BUFFERED效果类似于 ArrayBlockingQueue,接收一个值作为缓冲区容量大小。

迭代 Channel

我们在发送和读取的时候写了一个while(true)的死循环,因为需要不断地进行读写操作。这里我们可以直接获取一个 Channel 的 Iterator

valconsumer=GlobalScope.launch{valiter=channel.iterator()while(iter.hasNext()){valelement=iter.next()println(element)delay(1000)}}

其中 iter.hasNext()是挂起函数,在判断是否有下一个元素的时候就需要去 Channel 中读取元素了。当然也可以for ... in ..:

valconsumer=GlobalScope.launch{for(elementinchannel){println(element)delay(1000)}}

produce 和 actor

来看两个便捷的构造生产者和消费这的 api,我们可以通过produce来启动移动生产者协程,并返回一个ReceiveChannel,其他协程就可以通过这个 channel 来获取数据了。
同样的,我们可以通过actor来启动消费者协程,并返回一个SendChannel,其他协程就可以通过这个 channel 来发送数据了。

valreceiveChannel:ReceiveChannel<Int>=GlobalScope.produce{repeat(100){delay(100)send(it)}}valsendChannel:SendChannel<Int>=GlobalScope.actor{while(true){valelement=receive()println(element)}}

Channel 的关闭

以上面的produce方法为例,我们可以看到最终返回的是一个ProducerCoroutine,它的定义如下:

privateclassProducerCoroutine<E>(parentContext:CoroutineContext,channel:Channel<E>):ChannelCoroutine<E>(parentContext,channel,true,active=true),ProducerScope<E>{overridevalisActive:Booleanget()=super.isActiveoverridefunonCompleted(value:Unit){_channel.close()}overridefunonCancelled(cause:Throwable,handled:Boolean){valprocessed=_channel.close(cause)if(!processed&&!handled)handleCoroutineException(context,cause)}}

我们发现在它的完成取消方法中都会调用_channel.close的方法。也正是这样,Channel 才被称为热数据流。这里有一点需要注意:对千一个Channe如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时候它的isClosedForSend会立即返回true,而由Channel缓冲区的存在, 这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后isClosedForReceive才会返回true.

BroadcastChannel

在实际环境中,经常会出现一个发送对应多个接收的情况。这里我们就需要 BroadcastChannel 了。

valbroadcastChannel=BroadcastChannel<Int>(Channel.BUFFERED)valproducer=GlobalScope.launch{List(3){delay(1000)broadcastChannel.send(it)}}List(3){index->GlobalScope.launch{valreceiveChannel=broadcastChannel.openSubscription()for(iinreceiveChannel){println("[#$index] received$i")}}}.joinAll()

这里有个细节需要注意一下,如果把发送端的dealy(100)去掉,可能会出现部分元素收不到或者完全收不到的情况,这是因为BroadcastChannel在发送的时候没有订阅者,这条消息就被丢弃了。
我们也可以通过普通的 Channel 进行转换:

valchannel=Channel<Int>()channel.broadcast(3)

这里需要注意一下,BroadcastChannel被标记为过时了,可以使用SharedFlowStateFlow代替。channel.broadcast()方法也被标记为过时,也是使用SharedFlow来代替


以上

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/15 10:33:45

AI魔法剪辑:用Python代码自动生成震撼电影预告片的秘密武器

目录 引言&#xff1a;当代码遇见创意&#xff0c;剪辑革命悄然来临 一、智能剪辑的技术基石&#xff1a;为什么选择Python&#xff1f; 1.1 Python在多媒体处理中的独特优势 1.2 核心技术栈深度解析 二、智能剪辑的核心魔法&#xff1a;算法原理大揭秘 2.1 场景检测的智能…

作者头像 李华
网站建设 2026/3/15 4:06:30

大模型辅助的细粒度知识图谱构建用于机器人故障诊断

文章摘要随着工业机器人在制造业的快速部署,先进维护技术需求日益凸显。本研究提出一种基于大型语言模型(LLM)辅助的数据增强方法,解决维护文本中嵌套实体识别难题和工业数据标注稀缺问题,构建更细粒度的故障诊断知识图谱,在小样本场景下平均F1值提升达8.25%。阅读原文或https:…

作者头像 李华
网站建设 2026/3/14 13:16:48

强烈安利!继续教育必用TOP8 AI论文网站测评

强烈安利&#xff01;继续教育必用TOP8 AI论文网站测评 2025年继续教育AI论文工具测评&#xff1a;精准匹配学习与研究需求 在继续教育的背景下&#xff0c;越来越多的学习者需要撰写高质量的论文以提升学术能力或满足课程要求。然而&#xff0c;面对繁重的写作任务和复杂的格…

作者头像 李华
网站建设 2026/3/15 13:59:42

电影《匿杀》票房破亿 黄晓明以复杂人物切入犯罪悬疑叙事

2025年12月31日&#xff0c;由柯汶利执导的犯罪悬疑大片《匿杀》在全国院线正式上映&#xff0c;影片上映第二日票房就突破亿元大关&#xff0c;在跨年档多部影片同台竞争的市场环境下&#xff0c;《匿杀》取得这一成绩&#xff0c;显示出影片在悬疑犯罪类型中的市场吸引力&…

作者头像 李华
网站建设 2026/3/15 13:58:26

AI应用架构师实战:基于Kubeflow的企业AI工具链搭建

好的,作为一名资深软件工程师和技术博主,我很乐意为你撰写一篇关于“AI应用架构师实战:基于Kubeflow的企业AI工具链搭建”的技术博客文章。 我将采用“问题解决型文章”的结构,因为这个主题非常适合一步步引导读者完成一个复杂的系统搭建过程。 标题:AI应用架构师实战:基…

作者头像 李华
网站建设 2026/3/15 13:28:10

CSS 预处理器:Sass的基本用法、核心特性 - 详解

一句话总结&#xff1a; Sass CSS 变量 函数 逻辑 模块化&#xff0c;是现代前端开发不可或缺的样式编程工具。 https://sass-lang.com.cn/guide/ 一、什么是 Sass&#xff1f; Sass 是一种 CSS 预处理器&#xff08;CSS Preprocessor&#xff09;&#xff0c;它扩展了…

作者头像 李华