Kotlin互斥锁(Mutex):协程的线程安全守护神

作者:稀有猿诉日期:2025/10/14

本文译自「Kotlin Mutex: Thread-Safe Concurrency for Coroutines」,原文链接carrion.dev/en/posts/ko…,由Ignacio Carrión发布于2025年10月3日。

kotlin_mutex.webp

使用 Kotlin 协程构建并发应用程序时,保护共享的可变状态至关重要。虽然传统的 Java 同步工具(例如 synchronized 块和 ReentrantLock)可以正常工作,但它们会阻塞线程,并且与协程的挂起模型不兼容。因此,引入 Mutex——一个协程友好的同步原语,它提供互斥而不阻塞线程。

本指南探讨了何时使用 Mutex、最佳实践以及它与其他并发控制机制的比较。

TL;DR:省流版本的建议

  • 当需要保护多个协程访问的共享可变状态时,请使用 Mutex
  • 在协程代码中,优先使用 Mutex 而不是 synchronized,以避免阻塞线程。
  • 使用 mutex.withLock { } 自动获取和释放锁。
  • 对于更复杂的状态管理场景,请考虑使用 ActorStateFlow
  • 对于简单的计数器,请改用 AtomicIntegerAtomicReference
  • 如果需要将并发访问限制为多个许可,请使用 Semaphore
  • 如果不使用 withLock,请始终在 finally 块中释放锁。

什么是互斥锁?

Mutex(互斥)是 kotlinx.coroutines 中的同步原语,用于确保同一时间只有一个协程可以执行临界区。与阻塞线程的传统锁不同,Mutex 会暂停协程,从而使线程可以自由地执行其他工作。

基本结构:

1import kotlinx.coroutines.sync.Mutex
2import kotlinx.coroutines.sync.withLock
3
4val mutex = Mutex()
5
6suspend fun protectedOperation() {
7    mutex.withLock {
8        // Critical section - only one coroutine at a time
9        // Modify shared state safely here
10    }
11}
12

关键特性:

  • 非阻塞:暂停协程而不是阻塞线程
  • 公平:默认按先进先出顺序授予访问权限
  • 不重入安全:持有锁的协程无法再次获取锁(防止死锁)
  • 轻量级:比线程阻塞锁更高效

互斥锁的核心用例

最常见的用例——确保对共享变量的安全访问:

1class CounterService {
2    private var counter = 0
3    private val mutex = Mutex()
4    
5    suspend fun increment() {
6        mutex.withLock {
7            counter++
8        }
9    }
10    
11    suspend fun getCount(): Int {
12        return mutex.withLock {
13            counter
14        }
15    }
16}
17

2. 协调资源访问

当多个协程需要对某个资源进行独占访问时:

1class FileWriter(private val file: File) {
2    private val mutex = Mutex()
3    
4    suspend fun appendLine(line: String) {
5        mutex.withLock {
6            file.appendText("$line\n")
7        }
8    }
9}
10

3.确保顺序执行

即使操作是并发触发的,也必须按顺序执行:

1class OrderProcessor {
2    private val mutex = Mutex()
3    private val orders = mutableListOf<Order>()
4    
5    suspend fun processOrder(order: Order) {
6        mutex.withLock {
7            // Ensure orders are processed sequentially
8            orders.add(order)
9            validateOrder(order)
10            persistOrder(order)
11        }
12    }
13}
14

4. 线程安全的延迟初始化

在挂起上下文中实现线程安全的延迟初始化:

1class DatabaseConnection {
2    private var connection: Connection? = null
3    private val mutex = Mutex()
4    
5    suspend fun getConnection(): Connection {
6        if (connection != null) return connection!!
7        
8        return mutex.withLock {
9            // Double-check inside lock
10            connection ?: createConnection().also { connection = it }
11        }
12    }
13    
14    private suspend fun createConnection(): Connection {
15        delay(1000) // Simulate connection setup
16        return Connection()
17    }
18}
19

最佳实践

1. 始终使用 withLock

即使发生异常,withLock 也会自动处理锁的获取和释放:

1// ✅ Good: Automatic cleanup
2mutex.withLock {
3    dangerousOperation()
4}
5
6// ❌ Bad: Manual management, error-prone
7mutex.lock()
8try {
9    dangerousOperation()
10} finally {
11    mutex.unlock()
12}
13

2. 保持临界区较小

尽量减少锁的持有时间以减少争用:

1// ✅ Good: Lock only for critical section
2suspend fun updateUser(userId: String, name: String) {
3    val validated = validateName(name) // Outside lock
4    
5    mutex.withLock {
6        userCache[userId] = validated // Only this needs protection
7    }
8    
9    notifyObservers(userId) // Outside lock
10}
11
12// ❌ Bad: Holding lock during slow operations
13suspend fun updateUserSlow(userId: String, name: String) {
14    mutex.withLock {
15        val validated = validateName(name) // Slow operation inside lock
16        userCache[userId] = validated
17        notifyObservers(userId) // I/O inside lock
18    }
19}
20

3. 避免嵌套锁

互斥锁不可重入。避免两次获取同一个锁:

1// ❌ Bad: Deadlock!
2suspend fun problematic() {
3    mutex.withLock {
4        helperFunction() // Tries to acquire mutex again
5    }
6}
7
8suspend fun helperFunction() {
9    mutex.withLock {
10        // Will suspend forever
11    }
12}
13
14// ✅ Good: Restructure to avoid nesting
15suspend fun better() {
16    mutex.withLock {
17        helperFunctionUnsafe() // No lock acquisition
18    }
19}
20
21fun helperFunctionUnsafe() {
22    // Assumes caller holds lock
23}
24

4. 优先考虑无锁替代方案

对于简单操作,原子类型速度更快:

1// ✅ Better for simple counters
2class AtomicCounter {
3    private val counter = AtomicInteger(0)
4    
5    fun increment() = counter.incrementAndGet()
6    fun get() = counter.get()
7}
8
9// ❌ Overkill for a simple counter
10class MutexCounter {
11    private var counter = 0
12    private val mutex = Mutex()
13    
14    suspend fun increment() {
15        mutex.withLock { counter++ }
16    }
17}
18

5.文档锁不变量

明确锁保护的对象:

1class UserCache {
2    private val mutex = Mutex() // Protects userMap and lastUpdate
3    private val userMap = mutableMapOf<String, User>()
4    private var lastUpdate = 0L
5    
6    suspend fun updateUser(id: String, user: User) {
7        mutex.withLock {
8            userMap[id] = user
9            lastUpdate = System.currentTimeMillis()
10        }
11    }
12}
13

互斥锁 vs. 其他同步方法

互斥锁 vs. synchronized

1// Traditional synchronized (blocks thread)
2class SynchronizedCounter {
3    private var count = 0
4    
5    @Synchronized
6    fun increment() {
7        count++ // Thread blocked while waiting
8    }
9}
10
11// Mutex (suspends coroutine)
12class MutexCounter {
13    private var count = 0
14    private val mutex = Mutex()
15    
16    suspend fun increment() {
17        mutex.withLock {
18            count++ // Coroutine suspended, thread free
19        }
20    }
21}
22

何时该用哪个:

  • 对于非暂停代码和旧版 Java 互操作,请使用 synchronized
  • 对于暂停函数和基于协程的代码,请使用 Mutex
  • 在协程上下文中,Mutex 效率更高,因为线程不会被阻塞

互斥锁 vs. 信号量

1// Mutex: Only one coroutine at a time
2val mutex = Mutex()
3
4// Semaphore: N coroutines at a time
5val semaphore = Semaphore(permits = 3)
6
7// Example: Rate limiting API calls
8class ApiClient {
9    private val semaphore = Semaphore(5) // Max 5 concurrent requests
10    
11    suspend fun makeRequest(endpoint: String): Response {
12        semaphore.withPermit {
13            return httpClient.get(endpoint)
14        }
15    }
16}
17

何时使用谁:

  • 需要独占访问(单次许可)时使用 Mutex
  • 需要将并发限制为 N 个操作时使用 Semaphore

互斥锁 vs. Actor

1// Mutex: Manual synchronization
2class MutexBasedCache {
3    private val cache = mutableMapOf<String, Data>()
4    private val mutex = Mutex()
5    
6    suspend fun get(key: String) = mutex.withLock { cache[key] }
7    suspend fun put(key: String, value: Data) = mutex.withLock { cache[key] = value }
8}
9
10// Actor: Message-based synchronization
11sealed class CacheMessage
12data class Get(val key: String, val response: CompletableDeferred<Data?>) : CacheMessage()
13data class Put(val key: String, val value: Data) : CacheMessage()
14
15fun CoroutineScope.cacheActor() = actor<CacheMessage> {
16    val cache = mutableMapOf<String, Data>()
17    
18    for (msg in channel) {
19        when (msg) {
20            is Get -> msg.response.complete(cache[msg.key])
21            is Put -> cache[msg.key] = msg.value
22        }
23    }
24}
25

何时使用谁:

  • 使用 Mutex 进行直接方法调用的简单同步
  • 对于复杂的状态机或需要消息队列时,使用 Actor
  • Actor 提供更好的封装性,并且可以处理背压

Mutex 与 StateFlow

1// Mutex: Imperative state management
2class MutexState {
3    private var state = 0
4    private val mutex = Mutex()
5    
6    suspend fun updateState(transform: (Int) -> Int) {
7        mutex.withLock {
8            state = transform(state)
9        }
10    }
11}
12
13// StateFlow: Reactive state management
14class FlowState {
15    private val _state = MutableStateFlow(0)
16    val state: StateFlow<Int> = _state.asStateFlow()
17    
18    fun updateState(transform: (Int) -> Int) {
19        _state.update(transform) // Thread-safe built-in
20    }
21}
22

何时使用哪个:

  • 需要自定义同步逻辑时使用 Mutex
  • 使用 StateFlow 进行内置线程安全的可观察状态
  • StateFlow 更适合 UI 状态和响应式架构

Mutex 与原子类型

1// AtomicInteger: Lock-free for simple operations
2class AtomicCounter {
3    private val counter = AtomicInteger(0)
4    
5    fun increment() = counter.incrementAndGet()
6    fun addAndGet(delta: Int) = counter.addAndGet(delta)
7}
8
9// Mutex: For complex operations
10class ComplexCounter {
11    private var counter = 0
12    private var history = mutableListOf<Int>()
13    private val mutex = Mutex()
14    
15    suspend fun increment() {
16        mutex.withLock {
17            counter++
18            history.add(counter) // Multiple operations
19        }
20    }
21}
22

何时使用哪个:

  • 使用原子类型进行单变量操作(计数器、标志)
  • 需要协调多个变量时使用 Mutex
  • 原子操作速度更快,但受限于特定操作

常见陷阱

1. 忘记使用 suspend

互斥操作需要暂停:

1// ❌ Won't compile
2fun broken() {
3    mutex.withLock { } // Error: suspend function called in non-suspend context
4}
5
6// ✅ Correct
7suspend fun correct() {
8    mutex.withLock { }
9}
10

2. 长时间操作期间持有锁

1// ❌ Bad: Holding lock during I/O
2suspend fun bad(url: String) {
3    mutex.withLock {
4        val data = httpClient.get(url) // Network call inside lock
5        cache[url] = data
6    }
7}
8
9// ✅ Good: Fetch outside lock
10suspend fun good(url: String) {
11    val data = httpClient.get(url)
12    mutex.withLock {
13        cache[url] = data
14    }
15}
16

3. 假设可重入

1// ❌ Deadlock: Mutex is not reentrant
2suspend fun outer() {
3    mutex.withLock {
4        inner() // Deadlock!
5    }
6}
7
8suspend fun inner() {
9    mutex.withLock {
10        // Never reached
11    }
12}
13

4. 不处理取消

持有锁时务必考虑取消:

1// ✅ Good: withLock handles cancellation
2suspend fun proper() {
3    mutex.withLock {
4        doWork()
5    } // Lock released even on cancellation
6}
7
8// ❌ Risky: Manual lock management
9suspend fun risky() {
10    mutex.lock()
11    try {
12        doWork() // If cancelled here, lock stays acquired
13    } finally {
14        mutex.unlock()
15    }
16}
17

性能考量

  • 互斥 vs. synchronized:在协程密集型代码中,互斥更高效,因为线程不会被阻塞
  • 争用:高争用会降低性能;考虑分片(为不同的键设置多个锁)
  • 锁粒度:更细粒度的锁(更多锁,每个锁保护更少的数据)可减少争用
  • 无锁替代方案:对于简单操作,原子类型和 StateFlow 速度更快

示例:分片以减少争用:

1class ShardedCache(private val shardCount: Int = 16) {
2    private val mutexes = Array(shardCount) { Mutex() }
3    private val caches = Array(shardCount) { mutableMapOf<String, Data>() }
4    
5    private fun shardIndex(key: String) = key.hashCode() and (shardCount - 1)
6    
7    suspend fun put(key: String, value: Data) {
8        val index = shardIndex(key)
9        mutexes[index].withLock {
10            caches[index][key] = value
11        }
12    }
13    
14    suspend fun get(key: String): Data? {
15        val index = shardIndex(key)
16        return mutexes[index].withLock {
17            caches[index][key]
18        }
19    }
20}
21

真实示例:线程安全的Repository

1class UserRepository(
2    private val api: UserApi,
3    private val database: UserDatabase
4) {
5    private val cache = mutableMapOf<String, User>()
6    private val mutex = Mutex()
7    
8    suspend fun getUser(userId: String): User? {
9        // Check cache first (read lock)
10        mutex.withLock {
11            cache[userId]?.let { return it }
12        }
13        
14        // Try database (outside lock)
15        database.getUser(userId)?.let { user ->
16            mutex.withLock {
17                cache[userId] = user
18            }
19            return user
20        }
21        
22        // Fetch from API (outside lock)
23        return try {
24            val user = api.fetchUser(userId)
25            mutex.withLock {
26                cache[userId] = user
27                database.insertUser(user)
28            }
29            user
30        } catch (e: Exception) {
31            null
32        }
33    }
34    
35    suspend fun updateUser(user: User) {
36        mutex.withLock {
37            cache[user.id] = user
38            database.updateUser(user)
39        }
40    }
41    
42    suspend fun clearCache() {
43        mutex.withLock {
44            cache.clear()
45        }
46    }
47}
48

测试互斥锁保护的代码

1@Test
2fun `concurrent increments should be thread-safe`() = runTest {
3    val counter = CounterService()
4    
5    // Launch 1000 concurrent increments
6    val jobs = List(1000) {
7        launch {
8            counter.increment()
9        }
10    }
11    
12    jobs.joinAll()
13    
14    // Should be exactly 1000
15    assertEquals(1000, counter.getCount())
16}
17
18@Test
19fun `mutex prevents race conditions`() = runTest {
20    val cache = mutableMapOf<String, Int>()
21    val mutex = Mutex()
22    
23    // Simulate race condition
24    coroutineScope {
25        repeat(100) {
26            launch {
27                mutex.withLock {
28                    val current = cache["key"] ?: 0
29                    delay(1) // Simulate work
30                    cache["key"] = current + 1
31                }
32            }
33        }
34    }
35    
36    assertEquals(100, cache["key"])
37}
38

总结

Mutex 是一个强大的工具,用于在基于协程的应用程序中保护共享可变状态。它提供线程安全的同步,而不会阻塞线程,使其成为并发协程代码的理想选择。

关键要点

  • 使用 withLock 进行自动锁管理
  • 保持临界区简洁高效
  • 适当时考虑更简单的替代方案(例如原子操作、StateFlow)
  • 了解何时使用 Mutex 而非其他同步原语
  • 始终妥善处理取消操作

记住:最好的同步就是没有同步。尽可能地,设计系统时,通过使用不可变数据结构、消息传递(Actors/Channels)或响应式流(Flow/StateFlow)来完全避免共享可变状态。但是,当你在协程代码中确实需要互斥时,Mutex 是你的最佳选择。

欢迎搜索并关注 公众号「稀有猿诉」 获取更多的优质文章!

保护原创,请勿转载!


Kotlin互斥锁(Mutex):协程的线程安全守护神》 是转载文章,点击查看原文


相关推荐


Coze源码分析-资源库-编辑数据库-前端源码-核心逻辑与接口
lypzcgf2025/10/13

编辑数据库逻辑 1. 表单验证系统 文件位置:frontend/packages/data/memory/database-v2-base/src/components/base-info-modal/index.tsx 编辑数据库表单的验证规则: // 数据库名称验证规则 const nameValidationRules = [ { required: true, whitespace: true, message: I18n.t('database_name_c


C++ 中 rfind 方法详解
oioihoii2025/10/11

rfind 是 C++ 字符串类 std::string 和 std::wstring 的成员函数,用于从字符串的末尾向前搜索指定的子字符串或字符。 函数原型 // 搜索整个字符串 size_type rfind(const basic_string& str, size_type pos = npos) const noexcept; size_type rfind(const CharT* s, size_type pos = npos) const; size_type rfind(Cha


HTML 元素帮助手册
hubenchang05152025/10/9

#HTML 元素帮助手册 转载自 MDN #主根元素 元素描述<html>表示一个 HTML 文档的根(顶级元素),所以它也被称为根元素。所有其它元素必须是此元素的后代。 #文档元数据 元素描述<base>指定用于一个文档中包含的所有相对 URL 的根 URL。一份中只能有一个该元素。<head>包含文档相关的配置信息(元数据),包括文档的标题、脚本和样式表等。<link>指定当前文档与外部资源的关系。该元素最常用于链接 CSS,此外也可以被用来创建站点图标(比如“favicon”样式图标和


Rust语言简介
xqlily2025/10/8

Rust是一种现代的系统编程语言,由Mozilla基金会开发,并于2010年首次发布。它旨在解决传统语言(如C和C++)中的常见问题,如内存安全错误和并发性挑战,同时保持高性能。Rust强调安全性、速度和并发性,使其在系统开发、嵌入式系统和WebAssembly等领域广受欢迎。下面,我将从核心特点、优势和应用场景入手,逐步介绍Rust,并附上一个简单示例。 核心特点 内存安全:Rust通过独特的“所有权系统”避免空指针解引用、缓冲区溢出等常见错误。例如,编译器在编译时检查内存访问,确保


SpringBoot安全进阶:利用门限算法加固密钥与敏感配置
风象南2025/10/7

一、背景:单点密钥的隐患 在企业信息系统中,密钥是最核心的安全资产。无论是数据库加密、支付签名,还是用户隐私保护,背后都依赖一把"超级钥匙"。 然而,现实中我们常常遇到这些场景: 单点保管风险:某个核心密钥仅由一个运维人员或系统服务持有,一旦泄露或者丢失,整个系统可能崩盘。 操作合规问题:金融或政府系统中,法规往往要求多方共同参与,才能执行高风险操作。 分布式架构挑战:在云环境或多数据中心下,如何既能保证数据安全,又能防止任何一个节点"作恶"? 一句话总结: 👉 一个人掌握所有密钥 = 系统安


纯电汽车emc整改:设计缺陷到合规达标的系统方案|深圳南柯电子
深圳南柯电子2025/10/5

在新能源汽车产业迈入智能化、电动化深水区的当下,电磁兼容性(EMC)已成为决定产品安全与市场竞争力的核心指标。某头部车企曾因电机控制器辐射超标导致整车上市延迟,直接损失超3亿元;某新势力品牌因车载充电机传导骚扰超标引发用户投诉,召回成本高达1.2亿元。这些案例揭示了一个残酷现实:EMC整改不再是产品上市前的“补救措施”,而是贯穿研发、生产、运维全生命周期的系统工程。 一、纯电汽车emc整改的标准为纲:构建EMC合规的“法律底线” 纯电汽车EMC整改需严格遵循国内外双重标准体系。国内以GB/T


零基础从头教学Linux(Day 45)
小白银子2025/10/4

OpenResty介绍与实战 一、概述 OpenResty是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web应用、Web服务和动态网关。 简单地说 OpenResty 的目标是让你的Web服务直接跑在 Nginx 服务内部,充分利用 Nginx 的非阻塞 I/O 模型,不仅仅对 HTTP 客户端请求,甚至于对远程后端诸如 MySQL、PostgreSQL、Me


从汇编角度看C++优化:编译器真正做了什么
oioihoii2025/10/3

我们写的C++代码,对人类来说是清晰的逻辑表达,但对机器来说,只是一串抽象的字符。编译器,特别是像GCC、Clang这样的现代编译器,扮演着“翻译官”兼“优化大师”的角色。它们将高级代码转化为机器指令,并在此过程中,对代码进行脱胎换骨般的重塑,以求达到极致的性能。 今天,我们将深入汇编层面,揭开编译器优化的神秘面纱,看看我们的代码在编译器的“熔炉”中究竟经历了什么。 为什么选择汇编语言? 汇编是机器指令的人类可读形式,是连接高级语言与硬件执行的最直接桥梁。通过查看编译器生成的汇编代码,我们可以:


Manim实现渐变填充特效
databook2025/10/2

本文将介绍如何使用Manim框架实现动态渐变填充特效,通过自定义动画类来控制物体的颜色随时间平滑变化。 1. 实现原理 1.1. 自定义动画类设计 在Manim中,所有动画效果都是通过继承Animation基类并实现相应的方法来创建的。 我们设计了一个名为GradientFillAnimation的类,专门用于实现颜色渐变填充效果: class GradientFillAnimation(Animation): """动态渐变填充动画类""" def __init__(


15:00开始面试,15:06就出来了,问的问题有点变态。。。
测试界晓晓2025/10/2

从小厂出来,没想到在另一家公司又寄了。 到这家公司开始上班,加班是每天必不可少的,看在钱给的比较多的份上,就不太计较了。没想到8月一纸通知,所有人不准加班,加班费不仅没有了,薪资还要降40%,这下搞的饭都吃不起了。 还在有个朋友内推我去了一家互联网公司,兴冲冲见面试官,没想到一道题把我给问死了: 如果模块请求http改为了https,测试方案应该如何制定,修改? 感觉好简单的题,硬是没有答出来,早知道好好看看一大佬软件测试面试宝典了。 通过大数据总结发现,其实软件测试岗的面试都是差不多

首页编辑器站点地图

Copyright © 2025 聚合阅读

License: CC BY-SA 4.0