本文译自「Kotlin Mutex: Thread-Safe Concurrency for Coroutines」,原文链接carrion.dev/en/posts/ko…,由Ignacio Carrión发布于2025年10月3日。
使用 Kotlin 协程构建并发应用程序时,保护共享的可变状态至关重要。虽然传统的 Java 同步工具(例如 synchronized 块和 ReentrantLock)可以正常工作,但它们会阻塞线程,并且与协程的挂起模型不兼容。因此,引入 Mutex——一个协程友好的同步原语,它提供互斥而不阻塞线程。
本指南探讨了何时使用 Mutex、最佳实践以及它与其他并发控制机制的比较。
TL;DR:省流版本的建议
- 当需要保护多个协程访问的共享可变状态时,请使用
Mutex。 - 在协程代码中,优先使用
Mutex而不是synchronized,以避免阻塞线程。 - 使用
mutex.withLock { }自动获取和释放锁。 - 对于更复杂的状态管理场景,请考虑使用
Actor或StateFlow。 - 对于简单的计数器,请改用
AtomicInteger或AtomicReference。 - 如果需要将并发访问限制为多个许可,请使用
Semaphore。 - 如果不使用
withLock,请始终在 finally 块中释放锁。
什么是互斥锁?
Mutex(互斥)是 kotlinx.coroutines 中的同步原语,用于确保同一时间只有一个协程可以执行临界区。与阻塞线程的传统锁不同,Mutex 会暂停协程,从而使线程可以自由地执行其他工作。
基本结构:
1import kotlinx.coroutines.sync.Mutex 2import kotlinx.coroutines.sync.withLock 3 4val mutex = Mutex() 5 6suspend fun protectedOperation() { 7 mutex.withLock { 8 // Critical section - only one coroutine at a time 9 // Modify shared state safely here 10 } 11} 12
关键特性:
- 非阻塞:暂停协程而不是阻塞线程
- 公平:默认按先进先出顺序授予访问权限
- 不重入安全:持有锁的协程无法再次获取锁(防止死锁)
- 轻量级:比线程阻塞锁更高效
互斥锁的核心用例
最常见的用例——确保对共享变量的安全访问:
1class CounterService { 2 private var counter = 0 3 private val mutex = Mutex() 4 5 suspend fun increment() { 6 mutex.withLock { 7 counter++ 8 } 9 } 10 11 suspend fun getCount(): Int { 12 return mutex.withLock { 13 counter 14 } 15 } 16} 17
2. 协调资源访问
当多个协程需要对某个资源进行独占访问时:
1class FileWriter(private val file: File) { 2 private val mutex = Mutex() 3 4 suspend fun appendLine(line: String) { 5 mutex.withLock { 6 file.appendText("$line\n") 7 } 8 } 9} 10
3.确保顺序执行
即使操作是并发触发的,也必须按顺序执行:
1class OrderProcessor { 2 private val mutex = Mutex() 3 private val orders = mutableListOf<Order>() 4 5 suspend fun processOrder(order: Order) { 6 mutex.withLock { 7 // Ensure orders are processed sequentially 8 orders.add(order) 9 validateOrder(order) 10 persistOrder(order) 11 } 12 } 13} 14
4. 线程安全的延迟初始化
在挂起上下文中实现线程安全的延迟初始化:
1class DatabaseConnection { 2 private var connection: Connection? = null 3 private val mutex = Mutex() 4 5 suspend fun getConnection(): Connection { 6 if (connection != null) return connection!! 7 8 return mutex.withLock { 9 // Double-check inside lock 10 connection ?: createConnection().also { connection = it } 11 } 12 } 13 14 private suspend fun createConnection(): Connection { 15 delay(1000) // Simulate connection setup 16 return Connection() 17 } 18} 19
最佳实践
1. 始终使用 withLock
即使发生异常,withLock 也会自动处理锁的获取和释放:
1// ✅ Good: Automatic cleanup 2mutex.withLock { 3 dangerousOperation() 4} 5 6// ❌ Bad: Manual management, error-prone 7mutex.lock() 8try { 9 dangerousOperation() 10} finally { 11 mutex.unlock() 12} 13
2. 保持临界区较小
尽量减少锁的持有时间以减少争用:
1// ✅ Good: Lock only for critical section 2suspend fun updateUser(userId: String, name: String) { 3 val validated = validateName(name) // Outside lock 4 5 mutex.withLock { 6 userCache[userId] = validated // Only this needs protection 7 } 8 9 notifyObservers(userId) // Outside lock 10} 11 12// ❌ Bad: Holding lock during slow operations 13suspend fun updateUserSlow(userId: String, name: String) { 14 mutex.withLock { 15 val validated = validateName(name) // Slow operation inside lock 16 userCache[userId] = validated 17 notifyObservers(userId) // I/O inside lock 18 } 19} 20
3. 避免嵌套锁
互斥锁不可重入。避免两次获取同一个锁:
1// ❌ Bad: Deadlock! 2suspend fun problematic() { 3 mutex.withLock { 4 helperFunction() // Tries to acquire mutex again 5 } 6} 7 8suspend fun helperFunction() { 9 mutex.withLock { 10 // Will suspend forever 11 } 12} 13 14// ✅ Good: Restructure to avoid nesting 15suspend fun better() { 16 mutex.withLock { 17 helperFunctionUnsafe() // No lock acquisition 18 } 19} 20 21fun helperFunctionUnsafe() { 22 // Assumes caller holds lock 23} 24
4. 优先考虑无锁替代方案
对于简单操作,原子类型速度更快:
1// ✅ Better for simple counters 2class AtomicCounter { 3 private val counter = AtomicInteger(0) 4 5 fun increment() = counter.incrementAndGet() 6 fun get() = counter.get() 7} 8 9// ❌ Overkill for a simple counter 10class MutexCounter { 11 private var counter = 0 12 private val mutex = Mutex() 13 14 suspend fun increment() { 15 mutex.withLock { counter++ } 16 } 17} 18
5.文档锁不变量
明确锁保护的对象:
1class UserCache { 2 private val mutex = Mutex() // Protects userMap and lastUpdate 3 private val userMap = mutableMapOf<String, User>() 4 private var lastUpdate = 0L 5 6 suspend fun updateUser(id: String, user: User) { 7 mutex.withLock { 8 userMap[id] = user 9 lastUpdate = System.currentTimeMillis() 10 } 11 } 12} 13
互斥锁 vs. 其他同步方法
互斥锁 vs. synchronized
1// Traditional synchronized (blocks thread) 2class SynchronizedCounter { 3 private var count = 0 4 5 @Synchronized 6 fun increment() { 7 count++ // Thread blocked while waiting 8 } 9} 10 11// Mutex (suspends coroutine) 12class MutexCounter { 13 private var count = 0 14 private val mutex = Mutex() 15 16 suspend fun increment() { 17 mutex.withLock { 18 count++ // Coroutine suspended, thread free 19 } 20 } 21} 22
何时该用哪个:
- 对于非暂停代码和旧版 Java 互操作,请使用
synchronized - 对于暂停函数和基于协程的代码,请使用
Mutex - 在协程上下文中,
Mutex效率更高,因为线程不会被阻塞
互斥锁 vs. 信号量
1// Mutex: Only one coroutine at a time 2val mutex = Mutex() 3 4// Semaphore: N coroutines at a time 5val semaphore = Semaphore(permits = 3) 6 7// Example: Rate limiting API calls 8class ApiClient { 9 private val semaphore = Semaphore(5) // Max 5 concurrent requests 10 11 suspend fun makeRequest(endpoint: String): Response { 12 semaphore.withPermit { 13 return httpClient.get(endpoint) 14 } 15 } 16} 17
何时使用谁:
- 需要独占访问(单次许可)时使用
Mutex - 需要将并发限制为 N 个操作时使用
Semaphore
互斥锁 vs. Actor
1// Mutex: Manual synchronization 2class MutexBasedCache { 3 private val cache = mutableMapOf<String, Data>() 4 private val mutex = Mutex() 5 6 suspend fun get(key: String) = mutex.withLock { cache[key] } 7 suspend fun put(key: String, value: Data) = mutex.withLock { cache[key] = value } 8} 9 10// Actor: Message-based synchronization 11sealed class CacheMessage 12data class Get(val key: String, val response: CompletableDeferred<Data?>) : CacheMessage() 13data class Put(val key: String, val value: Data) : CacheMessage() 14 15fun CoroutineScope.cacheActor() = actor<CacheMessage> { 16 val cache = mutableMapOf<String, Data>() 17 18 for (msg in channel) { 19 when (msg) { 20 is Get -> msg.response.complete(cache[msg.key]) 21 is Put -> cache[msg.key] = msg.value 22 } 23 } 24} 25
何时使用谁:
- 使用
Mutex进行直接方法调用的简单同步 - 对于复杂的状态机或需要消息队列时,使用
Actor - Actor 提供更好的封装性,并且可以处理背压
Mutex 与 StateFlow
1// Mutex: Imperative state management 2class MutexState { 3 private var state = 0 4 private val mutex = Mutex() 5 6 suspend fun updateState(transform: (Int) -> Int) { 7 mutex.withLock { 8 state = transform(state) 9 } 10 } 11} 12 13// StateFlow: Reactive state management 14class FlowState { 15 private val _state = MutableStateFlow(0) 16 val state: StateFlow<Int> = _state.asStateFlow() 17 18 fun updateState(transform: (Int) -> Int) { 19 _state.update(transform) // Thread-safe built-in 20 } 21} 22
何时使用哪个:
- 需要自定义同步逻辑时使用
Mutex - 使用
StateFlow进行内置线程安全的可观察状态 StateFlow更适合 UI 状态和响应式架构
Mutex 与原子类型
1// AtomicInteger: Lock-free for simple operations 2class AtomicCounter { 3 private val counter = AtomicInteger(0) 4 5 fun increment() = counter.incrementAndGet() 6 fun addAndGet(delta: Int) = counter.addAndGet(delta) 7} 8 9// Mutex: For complex operations 10class ComplexCounter { 11 private var counter = 0 12 private var history = mutableListOf<Int>() 13 private val mutex = Mutex() 14 15 suspend fun increment() { 16 mutex.withLock { 17 counter++ 18 history.add(counter) // Multiple operations 19 } 20 } 21} 22
何时使用哪个:
- 使用原子类型进行单变量操作(计数器、标志)
- 需要协调多个变量时使用
Mutex - 原子操作速度更快,但受限于特定操作
常见陷阱
1. 忘记使用 suspend
互斥操作需要暂停:
1// ❌ Won't compile 2fun broken() { 3 mutex.withLock { } // Error: suspend function called in non-suspend context 4} 5 6// ✅ Correct 7suspend fun correct() { 8 mutex.withLock { } 9} 10
2. 长时间操作期间持有锁
1// ❌ Bad: Holding lock during I/O 2suspend fun bad(url: String) { 3 mutex.withLock { 4 val data = httpClient.get(url) // Network call inside lock 5 cache[url] = data 6 } 7} 8 9// ✅ Good: Fetch outside lock 10suspend fun good(url: String) { 11 val data = httpClient.get(url) 12 mutex.withLock { 13 cache[url] = data 14 } 15} 16
3. 假设可重入
1// ❌ Deadlock: Mutex is not reentrant 2suspend fun outer() { 3 mutex.withLock { 4 inner() // Deadlock! 5 } 6} 7 8suspend fun inner() { 9 mutex.withLock { 10 // Never reached 11 } 12} 13
4. 不处理取消
持有锁时务必考虑取消:
1// ✅ Good: withLock handles cancellation 2suspend fun proper() { 3 mutex.withLock { 4 doWork() 5 } // Lock released even on cancellation 6} 7 8// ❌ Risky: Manual lock management 9suspend fun risky() { 10 mutex.lock() 11 try { 12 doWork() // If cancelled here, lock stays acquired 13 } finally { 14 mutex.unlock() 15 } 16} 17
性能考量
- 互斥 vs. synchronized:在协程密集型代码中,互斥更高效,因为线程不会被阻塞
- 争用:高争用会降低性能;考虑分片(为不同的键设置多个锁)
- 锁粒度:更细粒度的锁(更多锁,每个锁保护更少的数据)可减少争用
- 无锁替代方案:对于简单操作,原子类型和
StateFlow速度更快
示例:分片以减少争用:
1class ShardedCache(private val shardCount: Int = 16) { 2 private val mutexes = Array(shardCount) { Mutex() } 3 private val caches = Array(shardCount) { mutableMapOf<String, Data>() } 4 5 private fun shardIndex(key: String) = key.hashCode() and (shardCount - 1) 6 7 suspend fun put(key: String, value: Data) { 8 val index = shardIndex(key) 9 mutexes[index].withLock { 10 caches[index][key] = value 11 } 12 } 13 14 suspend fun get(key: String): Data? { 15 val index = shardIndex(key) 16 return mutexes[index].withLock { 17 caches[index][key] 18 } 19 } 20} 21
真实示例:线程安全的Repository
1class UserRepository( 2 private val api: UserApi, 3 private val database: UserDatabase 4) { 5 private val cache = mutableMapOf<String, User>() 6 private val mutex = Mutex() 7 8 suspend fun getUser(userId: String): User? { 9 // Check cache first (read lock) 10 mutex.withLock { 11 cache[userId]?.let { return it } 12 } 13 14 // Try database (outside lock) 15 database.getUser(userId)?.let { user -> 16 mutex.withLock { 17 cache[userId] = user 18 } 19 return user 20 } 21 22 // Fetch from API (outside lock) 23 return try { 24 val user = api.fetchUser(userId) 25 mutex.withLock { 26 cache[userId] = user 27 database.insertUser(user) 28 } 29 user 30 } catch (e: Exception) { 31 null 32 } 33 } 34 35 suspend fun updateUser(user: User) { 36 mutex.withLock { 37 cache[user.id] = user 38 database.updateUser(user) 39 } 40 } 41 42 suspend fun clearCache() { 43 mutex.withLock { 44 cache.clear() 45 } 46 } 47} 48
测试互斥锁保护的代码
1@Test 2fun `concurrent increments should be thread-safe`() = runTest { 3 val counter = CounterService() 4 5 // Launch 1000 concurrent increments 6 val jobs = List(1000) { 7 launch { 8 counter.increment() 9 } 10 } 11 12 jobs.joinAll() 13 14 // Should be exactly 1000 15 assertEquals(1000, counter.getCount()) 16} 17 18@Test 19fun `mutex prevents race conditions`() = runTest { 20 val cache = mutableMapOf<String, Int>() 21 val mutex = Mutex() 22 23 // Simulate race condition 24 coroutineScope { 25 repeat(100) { 26 launch { 27 mutex.withLock { 28 val current = cache["key"] ?: 0 29 delay(1) // Simulate work 30 cache["key"] = current + 1 31 } 32 } 33 } 34 } 35 36 assertEquals(100, cache["key"]) 37} 38
总结
Mutex 是一个强大的工具,用于在基于协程的应用程序中保护共享可变状态。它提供线程安全的同步,而不会阻塞线程,使其成为并发协程代码的理想选择。
关键要点:
- 使用
withLock进行自动锁管理 - 保持临界区简洁高效
- 适当时考虑更简单的替代方案(例如原子操作、StateFlow)
- 了解何时使用 Mutex 而非其他同步原语
- 始终妥善处理取消操作
记住:最好的同步就是没有同步。尽可能地,设计系统时,通过使用不可变数据结构、消息传递(Actors/Channels)或响应式流(Flow/StateFlow)来完全避免共享可变状态。但是,当你在协程代码中确实需要互斥时,Mutex 是你的最佳选择。
欢迎搜索并关注 公众号「稀有猿诉」 获取更多的优质文章!
保护原创,请勿转载!
《Kotlin互斥锁(Mutex):协程的线程安全守护神》 是转载文章,点击查看原文。
