一、概念
协程必须运行在一个线程上,所以要指定调度器。是一个抽象类,Dispatcher是一个标准库中帮我们封装了切换线程的帮助类,可以调度协程在哪类线程上执行。创建协程时,上下文如果没有指定也没有继承到调度器,则会添加一个默认调度器(调度器通过 ContinuationInterceptor 延续体拦截器实现的)。
通过Dispatchers调度,而不是Thread因为不是单纯指定线程。
二、模式
- 由于子协程会继承父协程的上下文,在父协程上指定调度器模式后子协程默认使用这个模式。
- IO 和 DEFAULT 模式共享同一线程池,重用线程起到优化(DEFAULT 切换到 IO 大概率停留在同一线程上),两者对线程数量限制是独立的不会让对方饥饿。最大限度一起使用的话,默认同时活跃的线程数为 64+CPU数。
Dispatcher.Main | 运行于主线程,在 Android 中就是 UI 线程,用来处理一些 UI 交互的轻量级任务。 | 调用 suspend 函数 调用 UI 函数 更新 LiveData |
Dispatcher.Main.immediate | 协程的调度是有成本的,当我们已经处在主线程时,开启一个调度到主线程的子协程,会经历挂起等待恢复,这是不必要的开销,甚至队列很长会导致数据延迟显示(例如 ViewModelScope 就处在Android默认的主线程中,因此上下文中的调度器使用了这个),此时指定为 immediate 就只会在需要的时候调度,否则直接执行。 | 函数被withContext包装在Dispatcher.Main上运行时使用。 |
Dispatcher.IO | 运行于线程池,专为IO阻塞型任务进行了优化。最大线程数为64个,只要没超过且没有空闲线程就一直可开辟新线程执行新任务。 |
数据库 文件读写 网络处理 |
Dispatcher.Default | 运行于线程池,专为CPU密集型计算任务进行了优化。最大线程数为CPU核心个数(但不少于2个),若全在忙碌时新任务无法得到执行。 |
数组排序 Json解析 处理差异判断 计算Bitmap |
Dispatcher.Unconfined | 不改变线程,在启动它的线程执行,在恢复它的线程执行。调度成本最低性能最好,但有处在主线程上调用了阻塞操作的风险。 | 当不需要关心协程在哪个线程上被挂起时使用。 |
三、限制线程数 limitedParallelism()
1.6版本引入。
- 对于Default模式:当有一个开销很大的任务,可能会导致其它使用相同调度器的协程抢不到线程执行权,这个时候就可以用来限制该协程的线程使用数量。
- 对于IO模式:当有一个开销很大的任务,可能会导致阻塞太多线程让其它任务暂停等待,突破默认64个线程的限制加速执行(不显著)。
- 传参将线程限制为1,解决多线程并发修改数据的同步问题。但如果阻塞了它,其它操作都要等待。
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher |
suspend fun main(): Unit = coroutineScope {
//使用默认IO模式
launch {
printTime(Dispatchers.IO) //打印:Dispatchers.IO 花费了: 2038/
}
//使用limitedParallelism增加线程
launch {
val dispatcher = Dispatchers.IO.limitedParallelism(100)
printTime(dispatcher) //打印:LimitedDispatcher@1cc12797 花费了: 1037
}
}
suspend fun printTime(dispatcher: CoroutineDispatcher) {
val time = measureTimeMillis {
coroutineScope {
repeat(100) {
launch(dispatcher) {
Thread.sleep(1000)
}
}
}
}
println("$dispatcher 花费了: $time")
}
四、多线程并发问题
创建10个协程,每个协程执行 i++ 1000次,预期结果 i=10000。
4.1 避免使用共享变量
fun main() = runBlocking {
val deferreds = mutableListOf<Deferred<Int>>()
repeat(10) {
val deferred = async(Dispatchers.Default) {
var i = 0
repeat(1000) { i++ }
return@async i
}
deferreds.add(deferred)
}
var result = 0
deferreds.forEach {
result += it.await()
}
println("i = $result")
}
打印:i = 10000,耗时:77
4.2 使用Java方法(不推荐)
可以使用 Synchronized、Lock、Atomic。由于是线程模型下的阻塞方式,不支持调用挂起函数,会影响协程挂起特性。
4.2.1 使用同步锁
fun main() = runBlocking {
val start = System.currentTimeMillis()
var i = 0
val jobs = mutableListOf<Job>()
@Synchronized
fun add() { i++ }
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) { add() }
}
jobs.add(job)
}
jobs.joinAll()
println("i = $i,耗时:${System.currentTimeMillis() - start}")
}
//打印:i = 10000,耗时:71
4.2.2 使用同步代码块
fun main() = runBlocking {
val start = System.currentTimeMillis()
val lock = Any()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
synchronized(lock) { i++ }
}
}
jobs.add(job)
}
jobs.joinAll()
println("i = $i,耗时:${System.currentTimeMillis() - start}")
}
//打印:i = 10000,耗时:73
4.2.3 使用可重入锁 ReenTrantLock
fun main() = runBlocking {
val start = System.currentTimeMillis()
val lock = ReentrantLock()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
lock.lock()
i++
lock.unlock()
}
}
jobs.add(job)
}
jobs.joinAll()
println("i = $i,耗时:${System.currentTimeMillis() - start}")
}
//打印:i = 10000,耗时:83
4.2.4 使用 AtomicInteger 保证原子性
fun main() = runBlocking {
val start = System.currentTimeMillis()
var i = AtomicInteger(0)
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) { i.incrementAndGet() }
}
jobs.add(job)
}
jobs.joinAll()
println("i = $i,耗时:${System.currentTimeMillis() - start}")
}
//打印:i = 10000,耗时:89
4.3 使用单线程(不推荐)
在没有 limitedParallelism() 的 1.6 版本以前就是这样做的,该方式的问题是容易忘记使用 close() 关闭,以及可能会抵消的使用该线程池(将未使用的线程保持活跃状态却不与其它服务共享这些线程)。
fun main() = runBlocking {
val start = System.currentTimeMillis()
val mySingleDispatcher = Executors.newSingleThreadExecutor {
Thread(it, "我的线程").apply { isDaemon = true }
}.asCoroutineDispatcher()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(mySingleDispatcher) {
repeat(1000) { i++ }
}
jobs.add(job)
}
jobs.joinAll()
println("i = $i,耗时:${System.currentTimeMillis() - start}")
}
//打印:i = 10000,耗时:64
4.4 使用 Mutex
Java方式不支持调用挂起函数,同步锁是阻塞式的会影响协程特性,为此 Kotlin 提供了非阻塞式锁Mutex。使用 mutex.lock() 和 mutex.unlock() 包裹需要同步的计算逻辑就可以实现多线程同步了,但由于包裹内容可能出现的异常使得 unlock() 无法被执行,写在 finally{} 中会很繁琐,因此提供了扩展函数 mutex.withLock{ },本质就是在 finally{ } 中调用了 unlock()。
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T { lock(owner) try { return action() } finally { // 注意,这里并没有 catch 代码块,所以不会捕获异常 unlock(owner) } } |
fun main() = runBlocking {
val start = System.currentTimeMillis()
var i = 0
val mutex = Mutex()
//使用方式一
mutex.lock()
// try {
// repeat(10000) { i++ }
// } catch (e: Exception) {
// e.printStackTrace()
// } finally {
// mutex.unlock()
// }
//使用方式二
mutex.withLock {
try {
repeat(10000) { i++ }
} catch (e: Exception) {
e.printStackTrace()
}
}
println("i = $i,耗时:${System.currentTimeMillis() - start}")
}
//方式一打印:i = 10000,耗时:17
//方式二打印:i = 10000,耗时:17
4.5 使用 Actor
Actor是一个并发同步模型,本质是基于Channel管道消息实现的。文章来源:https://uudwc.com/A/9dBPj
sealed class Msg {
object AddMsg : Msg()
class ResultMsg(val result: CompletableDeferred<Int>) : Msg()
}
@OptIn(ObsoleteCoroutinesApi::class)
fun main() = runBlocking {
val start = System.currentTimeMillis()
val actor = actor<Msg> {
var i = 0
for (msg in channel) {
when (msg) {
is Msg.AddMsg -> i++
is Msg.ResultMsg -> msg.result.complete(i)
}
}
}
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch {
repeat(1000) {
actor.send(Msg.AddMsg)
}
}
jobs.add(job)
}
jobs.joinAll()
val deferred = CompletableDeferred<Int>()
actor.send(Msg.ResultMsg(deferred))
val result = deferred.await()
actor.close()
println("i = $result,耗时:${System.currentTimeMillis() - start}")
}
//打印:i = 10000,耗时:167
4.6 使用Semaphore
Semaphore是协程中的信号量 ,指定通行的数量为1就可以保证并发的数量为1。文章来源地址https://uudwc.com/A/9dBPj