使用协程Channel实现事件总线

咱们开发项目的时候,为了方便组件之间的通讯,使代码更加简洁,耦合性更低,须要引入事件总线。事件总线的库咱们一般会选择EventBus或者基于Rxjava的RxBus,如今随着jetpack里LiveData的推出,也出现了基于LiveData实现的事件总线库。 那么,除了这些,还有没有其余的实现事件总线的方法呢?在使用协程的过程当中,发现协程中的Channel用到了生产者消费者模式,那么能够使用Channel实现事件总线吗?接下来试一下。html

Channel

Channel是属于kotlin协程,要使用须要在项目中引入协程库java

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"
复制代码

Channel相似Java里的BlockingQueue,producer生产事件发送到Channel,consumer从Channel里取出事件进行消费。android

kotlin官网文档提供了Channel的用法git

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")

复制代码

建立Channel,使用Channel.send 发送事件 ,使用Channel.receive 接收事件,要注意的是Channel事件的接收和发送都须要在协程里调用。github

使用Channel实现事件总线

  1. 首先建立ChannelBus单例类
class ChannelBus private constructor() {
	
	    private var channel: Channel<Events> = Channel(Channel.BUFFERED)
	    
	    companion object {
	        val instance: ChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
	            ChannelBus()
	        }
	    }
	}
复制代码
  1. 而后建立一个数据类,这个数据类是用来包装消费者。其中context表示协程执行时的上下文;event是一个挂起的方法,用来消费事件;jobList用来保存消费事件的job。
data class ChannelConsumer(
	    val context: CoroutineContext,
	    val event: suspend (event: Events) -> Unit,
	    var jobList: MutableList<Job> = mutableListOf()
	)
复制代码
  1. 消费者。咱们建立一个Map用来保存消费者,在这个receive方法中,key表示存储该消费者时的键,onEvent是一个挂起函数,表示当咱们接收到事件时的处理方法,是一个lambda函数,而context为协程上下文,表示这个lambda函数在哪一个线程执行,这地方默认在Dispatchers.Main主线程执行。咱们将传入的参数构形成一个ChannelConsumer对象而后保存在Map中。
private val consumerMap = ConcurrentHashMap<String, ChannelConsumer>()

	fun receive( key: String, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
	) {
	    consumerMap[key] = ChannelConsumer(context, onEvent)
	}
复制代码
  1. 发送事件。调用这个方法就是发送事件,在协程里把传入的事件经过Channel发送出去。
fun send(event: Events) {
	     GlobalScope.launch {
	         channel.send(event)
	     }
	}
复制代码
  1. 消费事件。这是个生产者-消费者模式,当 Channel为空时会挂起,当有新事件时,事件会从Channel中取出,咱们在这里进行分发。咱们遍历Map,获得每一个ChannelConsumer,因而就能够处理事件e,这里直接经过launch方法启动协程,协程的上下文 it.value.context就是receive方法传入的contextit.value.event(e)就是receive方法传入的lambda函数,esend方法传入的event,launch方法返回一个job,咱们把这个job添加到ChannelConsumerjobList里。
init {
	    GlobalScope.launch {
	        for (e in channel) {
	            consumerMap.entries.forEach {
                    it.value.jobList.add(launch(it.value.context) {
                        it.value.event(e)
                    })
	            }
	        }
	    }
	}
复制代码
  1. 最后取消订阅时移除消费者。remove方法中,咱们经过传入的key在Map中获得ChannelConsumer,而后循环jobList并取消每个job,避免内存泄漏,最后移除消费者。
fun remove(key: String) {
        consumerMap[key]?.jobList?.forEach {
            it.cancel()
        }
        consumerMap.remove(key)
    }
复制代码
使用方法

因此咱们在项目里能够这么用api

  1. 注册事件消费者。由于默认在主线程,因此能够直接进行UI操做。
override fun onCreate(savedInstanceState: Bundle?) {
        ......
        ChannelBus.instance.receive("key",Dispatchers.Main,{
            activity_main_text.text = it.name
        }) 
		......
    }
复制代码

能够简写成以下方式网络

override fun onCreate(savedInstanceState: Bundle?) {
        ......
        ChannelBus.instance.receive("key") {
            activity_main_text.text = it.name
        }
		......
    }
复制代码

由于传入的是suspend函数,因此若是要进行耗时操做,能够直接执行,只须要把context参数传入IO线程Dispatchers.IO就好了,而后使用withContext函数切回主线程,再进行UI操做。async

override fun onCreate(savedInstanceState: Bundle?) {
       	......
        ChannelBus.instance.receive("key", Dispatchers.IO) {
            val s = httpRequest()	//IO线程,耗时操做
            withContext(Dispatchers.Main) {	//切回UI线程
                activity_sticky_text.text = s	//更改UI
            }

        }
    }

	//网络请求
    private fun httpRequest(): String {
        val url = URL("https://api.github.com/users/LGD2009")
        val urlConnection = url.openConnection() as HttpURLConnection
        urlConnection.let {
            it.connectTimeout = 5000
            it.requestMethod = "GET"
        }
        urlConnection.connect()
        if (urlConnection.responseCode != 200) {
            return "请求url失败"
        } else {
            val inputStream: InputStream = urlConnection.inputStream
            return inputStream.bufferedReader().use { it.readText() }
        }
    }
复制代码
  1. 发送事件
ChannelBus.instance.send(Events.EVENT_1)
复制代码
  1. 最后取消订阅
override fun onDestroy() {
   		......
        ChannelBus.instance.remove("key")
    }
复制代码

自动取消

上面的方法中,注册消费者以后每次都须要手动取消,那么可不能够自动取消呢?这里就须要用到Lifecycle了。 咱们使 ChannelBus 继承 LifecycleObserver,并重载receive方法和remove方法。 重载receive方法,其中key换成LifecycleOwner,而后调用lifecycleOwner.lifecycle.addObserver(this)将当前的ChannelBus做为观察者添加进去。ide

class ChannelBus private constructor() : LifecycleObserver {

    private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()
    
	......
	
    fun receive( lifecycleOwner: LifecycleOwner, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
    ) {
        lifecycleOwner.lifecycle.addObserver(this)
        lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
    }

}
复制代码

而后重载remove方法,key换成了LifecycleOwner,添加注解。由于如今Activity和Fragment都继承了LifecycleOwner,当Activity和Fragment运行destroy销毁时,当前观察者就会观察到并调用这个方法。函数

@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun remove(lifecycleOwner: LifecycleOwner) {
        lifecycleOwnerMap[lifecycleOwner]?.jobList?.forEach {
            it.cancel()
        }
        lifecycleOwnerMap.remove(lifecycleOwner)
    }
复制代码

因此,咱们在Activity或Fragment里注册消费者时只需

override fun onCreate(savedInstanceState: Bundle?) {
        ......
        ChannelBus.instance.receive(this) {
            activity_main_text.text = it.name
        }
		......
    }
复制代码

当Activity或Fragment销毁时会自动取消注册。

粘性事件

有时候咱们可能须要在消费者订阅的时候能收到以前发送的某些事件,这时候就须要用到粘性事件。简单的实现思路是保存事件,在注册消费者时候发送事件。 建立List保存粘性事件,并添加移除粘性事件的方法。

private val stickyEventsList = mutableListOf<Events>()
 
    fun removeStickEvent(event: Events) {
        stickyEventsList.remove(event)
    }
复制代码

改造send方法,增长一个 Boolean 类型的参数,用来指明是不是粘性的,固然,默认值是false。若是是true,则把事件存入List。

fun send(event: Events, isSticky: Boolean = false) {
        GlobalScope.launch {
            if (isSticky) {
                stickyEventsList.add(event)
            }
            channel.send(event)
        }
    }
复制代码

添加接收粘性事件的消费者方法。

fun receiveSticky( lifecycleOwner: LifecycleOwner, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
    ) {
        lifecycleOwner.lifecycle.addObserver(this)
        lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
        stickyEventsList.forEach { e ->
            lifecycleOwnerMap[lifecycleOwner]?.jobList?.add(GlobalScope.launch(context) {
                onEvent(e)
            })
        }
    }
复制代码

BroadcastChannel

上面的文章中,同一个事件只能取一次,为了发送到多个消费者,因此使用Map保存,而后依次发送。而BroadcastChannel则能够有多个接收端。

因此,若是用BroadcastChannel来实现则更为简单。 建立BroadcastChannel对象

@ExperimentalCoroutinesApi
class BroadcastChannelBus private constructor() : LifecycleObserver {

    private val broadcastChannel: BroadcastChannel<Events> = BroadcastChannel(Channel.BUFFERED)
    private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()

    companion object {
        val instance: BroadcastChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
            BroadcastChannelBus()
        }
    }
复制代码

数据类jobList改为job,再增长一个receiveChannel

data class ChannelConsumer(
        val context: CoroutineContext,
        val event: suspend (event: Events) -> Unit,
        val job: Job?,
        val receiveChannel: ReceiveChannel<Events>
    )
复制代码

发送方法不须要改变,receive方法须要更改。 经过val receiveChannel = broadcastChannel.openSubscription()订阅,job和receiveChannel保存到数据类。

fun receive( lifecycleOwner: LifecycleOwner, context: CoroutineContext = Dispatchers.Main, onEvent: suspend (event: Events) -> Unit
    ) {
        lifecycleOwner.lifecycle.addObserver(this)
        val receiveChannel = broadcastChannel.openSubscription()
        val job = GlobalScope.launch(context) {
            for (e in receiveChannel) {
                onEvent(e)
            }
        }
        lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent,job,receiveChannel)
    }
复制代码

因此,最后取消订阅时,关闭receiveChannel并取消任务。

@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun remove(lifecycleOwner: LifecycleOwner) {
        lifecycleOwnerMap[lifecycleOwner]?.receiveChannel?.cancel()
        lifecycleOwnerMap[lifecycleOwner]?.job?.cancel()
        lifecycleOwnerMap.remove(lifecycleOwner)
    }
复制代码

不过,须要注意的是这个api如今是实验性功能,在后续版本的更新中可能会改变。

总结

这篇文章主要是抛砖引玉,使用Channel换一种实现事件总线的思路。实现功能只有一个文件,Demo已上传到github,项目名ChannelBus,你们能够在使用时根据具体的需求进行修改。

kotlin Channel

破解 Kotlin 协程(9) - Channel 篇

Implementing an Event Bus With RxJava - RxBus

相关文章
相关标签/搜索