小米集团2024年报

小米2024年财报

小米集团产品分类

  • 手机
  • IoT
  • 家电(空调,冰箱,洗衣机)
  • 互联网服务
  • 电动汽车

整体表现

指标 数值(人民币) 同比增长率 备注
集团总收入 3,659亿 35.0% 创历史新高
手机×AIoT分部收入 3,332亿 22.9%
智能电动汽车等创新业务分部收入 328亿 -
集团经调整净利润 272亿 41.3% 创历史新高,含创新业务亏损62亿元
智能电动汽车等创新业务经调整净亏损 62亿 - 于集团经调整净利润内包含此项亏损。

研发

研发支出241亿人民币,研发人员21190人,占员工总数48.5%。

评:

  1. 对比其他互联网公司,小米的研发人员和员工总数相对较低。对比小米业务线,未来研发人员或许会大幅增加。

手机

  • 2024年,小米在全球56个国家和地区的智能手机出货量排名前三,在69个国家和地区的智能手机出货量排名前五。
    非洲、东南亚和中东的智能手机市场份额均实现显著增长。
  • 在中国大陆地区,2024年高端智能手机(零售价人民币3,000元及以上)出货量在整体智能手机出货量中的占比达到23.3%,同比增长3.0个百分点。
  • 据Canalys数据,2024年小米在全球智能手机出货量中排名第三,市场份额为13.8%,同比增长1.0个百分点。
  • 根据Canalys数据,2024年第四季度,小米在中国大陆的市场份额同比增长3.0个百分点至15.8%。

评:

  1. 小米手机的大陆销量仅占总出货量的23.3%。即便未来中国持续处于通缩状态,小米手机的营收依旧可以依靠全球业务支撑。
  2. 按区域划分,手机销售量和互联网服务中的活跃用户相吻合

IoT

  • AIoT平台已连接的IoT设备数(不包括智能手机、平板及笔记本电脑)增长至904.6百万,同比增长22.3%。
  • 拥有五件及以上连接至我们AIoT平台的设备(不包括智能手机、平板及笔记本电脑)用户数达到18.3百万,同比增长26.1%。
  • 2024年12月,米家APP的月活跃用户数同比增长17.5%至100.8百万,小爱同学的月活跃用户数同比增长12.0%至137.1百万。
  • 平板业务在2024年延续高增长态势。根据 Canalys 数据,小米的全球平板产品出货量同比增长 73.1%,是全球前五厂商中
    增速最快的,全球出货量排名前五,中国大陆排名前三。
  • 小米在可穿戴产品领域保持领先优势。根据 Canalys 数据,2024年,小米的可穿戴腕带设备在全球和中国大陆地区均排名第二,TWS
    耳机在中国大陆地区出货量排名第一。
  • 小米耳机功能更新:降噪,AI功能(录音,翻译,同传)。

评:

  1. 结合金山云的业务增长和万卡集群的建设:
    1. 两年100亿次唤醒 金山云为小爱同学提供顶级技术支持
    2. GPU万卡集群搭建中,金山云股价涨超14%,小米及金山生态需求即将放量

家电(空调,冰箱,洗衣机)

产品 出货量 同比增速 备註
空调 超过 680 万台 超过 50% 创历史新高
冰箱 超过 270 万台 超过 30% 创历史新高
洗衣机 超过 190 万台 超过 45% 创历史新高
  • 米家中央空调Pro搭载双缸压缩机,并且支持超一级能效。智能化方面,米家中央空调Pro搭载米家灵云智控引擎,控温更稳更准,提高舒适度。
  • 空调、智能电视、冰箱、洗衣机、热水器、智能门锁6大核心品类,完成送拆装一站式服务升级,覆盖全国2,898个区县

评:

  1. 结合2021小米之家的门店覆盖率:
    1. 小米集团合伙人,中国区、国际部总裁,Redmi品牌总经理卢伟冰在发布会现场宣布,截至2021年10月底,小米之家门店数量将正式突破1万家,
      同时县城市场的覆盖率超过了80%,这是小米新零售发展史上里程碑式的新突破。Source
  2. 冲这个趋势,估计25年就能超越格力,美的,海尔

互联网服务

营收

  • 互联网服务收入达到人民币 341 亿元,较 2023 年的人民币 301 亿元同比增长 13.3%
  • 互联网服务毛利率为 76.6%,较 2023 年的 74.2%同比提升 2.5 个百分点
  • 广告业务营收247亿元

分区域数据

  • 2024年,境外互联网服务收入同比增长 30.0% 至人民币 110 亿元,占整体互联网服务收入的 32.2%,同比提升 4.1 个百分点。
  • 全球月活跃用户数达到702.3百万,同比增长9.5%。
    • 其中,中国大陆月活跃用户数达到172.9百万,同比增长11.1%。
    • 智能电视全球月活跃用户数达到70.7百万,同比增长7.1%。

评:

  1. 营收来源主要为广告业务。对比苹果2025Q1 service revenue 26b美元,苹果的active devices数量为2.35b。
  2. 小米的海外活跃用户占比高达75%

电动汽车

  • 电动汽车收入328亿,电车毛利率18.5%。净亏损62亿元。
  • 布局销售服务网络,中国大陆58个城区,200家汽车销售门店。
  • 2025年预期交付35万台。

评:

  1. 产能爬坡,工厂建设,销售网络建设
    1. 实探小米汽车二期工厂:工人全天赶工,今年底能封顶 2024-10-24
    2. 小米首座智能家电工厂落户武汉,一期项目聚焦空调品类 2024-11-28

Footnotes

互联网公司员工数据

公司名称 员工数量(估计) 研发支出(人民币 亿元) 研发支出(美元 亿元) 收入(人民币 亿元) 收入(美元 亿元) 研发支出占收入比例
阿里巴巴集团 235,216 567.44 82.63 2082 303.16 27.26%
腾讯控股 105,417 640.78 93.39 5552 808.99 11.54%
百度公司 ~150,000 242 34.1 1346 189.6 18.00%
美团 114,731 212.01 29 2767 383 7.66%
字节跳动 >150,000 未知 未知 未知 ~1460
未知

数据来源:Gemini deep research

tech-blog-index

预测技术在美团弹性伸缩场景的探索与应用

从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地

  • title: 从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
  • link: https://mp.weixin.qq.com/s/iY-h-tkklSPSX7gxle9lzA
  • tags: [‘网易’, ‘Apache Doris’, ‘Big Data’, ‘High throughput logging’]
  • date: 2025-02-13

Netflix财报2024Q2总结

Netflix 2024Q2 财报:
https://s22.q4cdn.com/959853165/files/doc_financials/2024/q2/FINAL-Q2-24-Shareholder-Letter.pdf

Financial Data

Metric Q2’23 Q3’23 Q4’23 Q1’24 Q2’24 Q3’24 Forecast
Revenue $8,187 $8,542 $8,833 $9,370 $9,559 $9,727
Y/Y % Growth 2.7% 7.8% 12.5% 14.8% 16.8% 13.9%
Operating Income $1,827 $1,916 $1,496 $2,633 $2,603 $2,730
Operating Margin 22.3% 22.4% 16.9% 28.1% 27.2% 28.1%
Net Income $1,488 $1,677 $938 $2,332 $2,147 $2,237
Diluted EPS $3.29 $3.73 $2.11 $5.28 $4.88 $5.10
Global Streaming Paid Memberships 238.39M 247.15M 260.28M 269.60M 277.65M
Y/Y % Growth 8.0% 10.8% 12.8% 16.0% 16.5%
Global Streaming Paid Net Additions 5.89M 8.76M 13.12M 9.33M 8.05M
Net cash provided by operating activities $1,440 $1,992 $1,663 $2,213 $1,291
Free Cash Flow $1,339 $1,888 $1,581 $2,137 $1,213
Shares (FD) 451.6M 450.0M 444.3M 441.7M 439.7M

Global Streaming Paid Net Additions增长了8m,Q2股票数环比减少2.6m(由于回购)。

市场排名

根据Nielsen’s The Gauge的Share of US TV Screen Time调查,Youtube占9.9%,Netflix占8.4%排在第二,第三的Amazon Prime Video占3.1%。

streaming, pay TV, film, games, branded advertising总市场份额大概600b美元,Netflix大概占据6%的市场份额。

评:Netflix并没有提及短视频平台tiktok对streaming业务的影响

产品种类

剧和电影

Title Views (Millions)
Bridgerton Season 3 98.5
Queen Charlotte Included in Bridgerton Universe (172M)
Under Paris 90.9
Atlas 79.3
Baby Reindeer 88.4
Heeramandi: The Diamond Bazaar 15.0
Amar Singh Chamkila 8.3
The Roast of Tom Brady 22.6
The Asunta Case 31.3
Crooks 15.7
Queen of Tears 29.3
Hit Man 33.2
City Hunter 16.5

游戏

Netflix特别提及了Grand Theft Auto的火爆

  • Netflix Stories: This is a collection of interactive fiction games based on popular Netflix series and films:
    • Love is Blind
    • Virgin River
    • Perfect Match
    • Emily in Paris (upcoming)
    • Selling Sunset (upcoming)
  • **Squid Game (Upcoming Multiplayer Game):

Netflix并没有给出具体的各个游戏玩家数据

产品

  • 提到了自家的推荐系统:“会持续提升推荐系统,尽管目前已经是行业领先”。
  • 发布了新的TV homepage设计,更新了一些具体的显示和UI设置。
    评:没有公布具体的ab测试数据
  • 重新设计了My Netflix

Partnerships

  • 和厂家合作确保Netflix “easy to find and use”
  • 没有和Disney+还有Max打包销售,Netflix认为自己在市场上有领先地位

广告业务

  • Netflix着重讲述了广告业务的发展,目前45%的新注册用户来自广告tier。Q2,广告tier的用户数增长34%。
  • 提升对广告商的服务
    • 在UK,和Barb合作,使广告客户能更好的”plan campaigns”和”understand their audiences on Netflix”
    • “pause”和”keep watching”广告beta版。客户包括Expedia, Coca-Cola, Ford, L’Oréal and McDonald’s
    • Netflix在自建广告基建,会在2024年晚些时候在加拿大测试,2025年会在其他地区扩大测试规模
    • Netflix也在测试集成Trade Desk, Google DV 360, and Magnite.
  • 2025会增加广告付费用户的规模。而且广告收入也称为很重要的一部分。
  • 建立广告系统需要时间,Netflix并不期待广告收入会成为主要收入来源。
  • Netflix担忧广告业务的增速太快,以至于变现能力不足以应付需求。

Here is a summary of the cash flow and capital structure information found in the sources, presented in Chinese:

Netflix 第二季度现金流和资本结构概述

  • 现金流: Netflix 报告称,2024 年第二季度运营活动产生的净现金为 13 亿美元,而 2023 年同期为 14 亿美元。2024 年第二季度自由
    现金流(FCF)总计 12 亿美元,而去年同期为 13 亿美元。该公司预计,假设汇率没有重大波动,2024 年全年自由现金流约为 60 亿美元。

  • 资本结构: 截至 2024 年第二季度末,Netflix 的总债务为 140 亿美元,现金、现金等价物和短期投资总额为 67 亿美元。 公司在未来 12
    个月内有 18 亿美元的债务到期,计划进行再融资。 此外,Netflix 在 2024 年第二季度回购了 260 万股股票,耗资 16 亿美元,现有股票回购授权
    还有 50 亿美元的额度。

HTTP新method: Query

About: IETF最近的一个关于HTTP新method的proposal

GET为什么不应该有body

根据 HTTP Spec

1
A payload within a GET request message has no defined semantics

GET request的payload是没有定义的,可以理解为C语言中的undefined behavior,文章中还提到了,
GET bodies从不被 Fetch 支持,变成了虽然支持,但是会警告开发者尽量不要用。

因为是 undefined behavior,所以 GET 请求中的 Body,理论上来讲可以被随意处理,像HackerNews
上这个 Comment 所说的:

1
2
3
4
5
... 
GET with a body is iffy. If you owned the entire stack it might be doable.
...
Proxies or firewalls can just drop the message and be completely HTTP compliant.
...

GET的body是具有不确定性的,Load Balancer理论上是可以直接丢掉Body内容的。

新Method “QUERY”有什么好处

Safe and Idempotent

Proposal Section 2
里提到的最重要的好处是safe和Idempotent。

对比POST,QUERY不应改变服务器上的内容,也就是只读请求,而且QUERY同样也应该是幂等的,
“以相同的请求调用这个接口一次和调用这个接口多次,对系统产生的影响是相同的”。

Caching

Proposal Section 2.1 所提到的,QUERY支持HTTP caching的标准

综合来看,QUERY相当于是一个支持了Body的GET。对比传统通过POST来传Body实现复杂查询,有一些优势。

Thoughts

HTTP定义的这些方法在当前真的符合各个不同应用的需求吗,HTTP是否应该在它这一层去定义这些方法,
是否应该由一层只处理单向或双向的消息通讯协议, 其余的逻辑,由更上层的协议去做。
像GraphQL在HTTP POST之上定义自己的Query和Mutation逻辑。

References:

  1. Request bodies in GET requests
  2. The HTTP QUERY Method
  3. Hackernews Thread: Request bodies in GET requests

Kotlin Coroutine LockFreeTaskQueue 源码注释

Lock Free Task Queue

Kotlin Coroutine 的Lock Free Task Queue的注释

常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Suppress("PrivatePropertyName", "MemberVisibilityCanBePrivate")
internal companion object {
const val INITIAL_CAPACITY = 8

// 因为LockFreeTaskQueue用一个Atomic Long存储queue的state,来确保每次更新state都是原子操作
// - head: 30 bit,用来记录circular queue的head index
// - tail: 30 bit,用来记录circular queue的tail index
// - frozen: 1 bit,indicate当前的queue是否被冻结
// - closed: 1 bit, indicate当前的queue是否被close
const val CAPACITY_BITS = 30

const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS) - 1
const val HEAD_SHIFT = 0

// Head Mask: 用来从Long state里获取head的Int值
const val HEAD_MASK = MAX_CAPACITY_MASK.toLong() shl HEAD_SHIFT
const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS

// Tail Mask: 用来从Long state里获取tail的Int值
const val TAIL_MASK = MAX_CAPACITY_MASK.toLong() shl TAIL_SHIFT

const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS

// Frozen Mask: 用来从Long state里获取frozen的flag
const val FROZEN_MASK = 1L shl FROZEN_SHIFT
const val CLOSED_SHIFT = FROZEN_SHIFT + 1

// Closed Mask: 用来从Long state里获取closed的flag
const val CLOSED_MASK = 1L shl CLOSED_SHIFT

const val MIN_ADD_SPIN_CAPACITY = 1024

@JvmField val REMOVE_FROZEN = Symbol("REMOVE_FROZEN")

// Add to queue的enum状态

// 成功Add to the queue
const val ADD_SUCCESS = 0

// 失败:queue已经为frozen状态
const val ADD_FROZEN = 1

// 失败:queue已经被close
const val ADD_CLOSED = 2

infix fun Long.wo(other: Long) = this and other.inv()

// (this wo HEAD_MASK): 抹除Long state里head的值
// (newHead.toLong() shl HEAD_SHIFT): 获取一个新的Long,并写入新的head值
fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)

// (this wo TAIL_MASK): 抹除Long state里tail的值
// (newHead.toLong() shl HEAD_SHIFT): 获取一个新的Long,并写入新的tail值
fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)

inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
// 从Long state读出head值,并转化为Int
val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()

// 从Long state读出tail值,并转化为Int
val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()

return block(head, tail)
}

// 获取state里的失败原因,如果是Close返回ADD_CLOSED状态,否则返回ADD_FROZEN状态
// FROZEN | CLOSED
fun Long.addFailReason(): Int = if (this and CLOSED_MASK != 0L) ADD_CLOSED else ADD_FROZEN
}

Internal变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private typealias Core<E> = LockFreeTaskQueueCore<E>

internal class LockFreeTaskQueueCore<E : Any>(
// Queue最大存储的值
private val capacity: Int,

// Indicate这个queue是否只有一个consumer,这个field非常重要
// note 2 翻译:
// 当这个queue有多个consumer的时候,这个queue并不是Lock Free
// consumer spins直到producer完成它的操作
private val singleConsumer: Boolean // true when there is only a single consumer (slightly faster)
) {
// Mask: 用来获取queue的实际head index和tail index
private val mask = capacity - 1

// 在queue扩容时,会将新生成的queue存储在这个Atomic Ref里边
// 如果在扩容期间,有别的thread对当前的queue进行操作,在操作完成时,会检查当前的queue的next是否为空
// 如果是null,证明此时没有扩容,写入的atomic Long state成功,queue里的item操作也成功
// 如果不为null,需要将状态更新到新生成的queue中,也就是next ref的queue,
// 更新完成后,需要再次检查next的next是否为空,如果不为空,则说明queue再次扩容,
// 需要重新进行更新,直到更新完成后的next为null。
private val _next = atomic<Core<E>?>(null)

// Atomic Long state,用来记录queue的状态,包括:head index,tail index,是否frozen,是否closed
private val _state = atomic(0L)

// Circular Queue array,生成一个长度为capacity的array,每个item都是一个atomic ref到null
// 这个queue里边存储的是需要执行的任务,consumer会从queue的head读任务出来,producer会将任务加到tail的位置
private val array = atomicArrayOfNulls<Any?>(capacity)

// ...省略代码
}

状态方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 当head index == tail index的时候,queue为空
// Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
val isEmpty: Boolean get() = _state.value.withState { head, tail -> head == tail }

// tail index - head index,然后mask by MAX_CAPACITY_MASK为当前queue的size
val size: Int get() = _state.value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }

// 当前queue是否未关闭状态
fun close(): Boolean {
_state.update { state ->
if (state and CLOSED_MASK != 0L) return true // ok - already closed
if (state and FROZEN_MASK != 0L) return false // frozen -- try next
state or CLOSED_MASK // try set closed bit
}
return true
}

Placeholder类,帮助标记状态的一个内部类

1
2
3
4
5
6
7
// 当扩容时,如果发现array里边有null,说明有别的线程更新了tail index,但还没有写入element,这个时候扩容线程会将array里这个slot
// 设置成Placeholder,来给其他线程在扩容后补写这个element
// Instance of this class is placed into array when we have to copy array, but addLast is in progress --
// it had already reserved a slot in the array (with null) and have not yet put its value there.
// Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
// Internal because of inlining
internal class Placeholder(@JvmField val index: Int)

addLast - producer添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

// ADD_CLOSED | ADD_FROZEN | ADD_SUCCESS
fun addLast(element: E): Int {
_state.loop { state ->
// 如果当前状态为Frozen或是closed则返回失败原因
if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add

state.withState { head, tail ->
val mask = this.mask // manually move instance field to local for performance

// 当queue只剩一个空位的时候,freeze并且扩容,因为single consumer的时候,可能会有一个element我们并不能overwrite
// 在removeFirst的时候,因为removeFirst是先更新head index,再拿掉当前的element,所以需要进行extra margin of one element检查
// If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
// so we check for full queue with an extra margin of one element
if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy

// If queue is Multi-Consumer then the consumer could still have not cleared element
// despite the above check for one free slot.
if (!singleConsumer && array[tail and mask].value != null) {
// 如果是multi consumer,这个时候要两种情况
// 1. 当queue的capacity < 1024或整个queue已经用掉了一半的时候,进行扩容
// 2. 否则,spin来等待consumer来拿任务
// 这个case就是之前singleConsumer注释所说的,这个queue并不是lock free,因为这种情况会spin来等待consumer来take这个任务
// There are two options in this situation
// 1. Spin-wait until consumer clears the slot
// 2. Freeze & resize to avoid spinning
// We use heuristic here to avoid memory-overallocation
// Freeze & reallocate when queue is small or more than half of the queue is used
if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
return ADD_FROZEN
}
// otherwise spin
return@loop
}

// 获取新的tail的位置
val newTail = (tail + 1) and MAX_CAPACITY_MASK

// 原子操作,来确保state成功更新,如果compareAndSet返回false,此时state被别的thread更新,则继续loop
// 因为这个library是target multi platform,这个地方的compareAndSet不止是Java的compareAndSet,也有JS和native
if (_state.compareAndSet(state, state.updateTail(newTail))) {

// successfully added
array[tail and mask].value = element

// 因为在set value的时候,有可能别的thread更新了状态,所以需要进行检查
// could have been frozen & copied before this item was set -- correct it by filling placeholder
var cur = this
while(true) {
// 没有frozen,那说明addLast成功执行,exit loop
if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet

// 如果当前的queue被frozen了,证明在当前thread set queue的时候,这个queue被扩容了
// 之前注释next的时候提到过,queue扩容时会生成一个新的queue object,把当前的queue._next指到queue上
// 因为在扩容时,新生成的queue array会判断每个element是否为null,如果不为null则copy,
// 如果为null,扩容线程知道有别的thread更新了tail index但还没来得及写入新的element进array里边

// 如果fillPlaceholder返回null,我们不需要再继续检查了,因为确认写入的element被成功复制到了新的queue里
// 如果返回的是新的queue,那么就说明,扩容线程复制的时候并没有复制当前element,fillPlaceholder进行了补写操作
// 当前element到扩容的array里边,这个时候,我们需要继续loop,因为可能扩容线程扩容之后,又有别的线程进行了扩容
// 这时loop会持续补写直到我们发现不需要array里的值不是Placeholder instance为止
cur = cur.next().fillPlaceholder(tail, element) ?: break
}
return ADD_SUCCESS // added successfully
}
}
}
}

private fun fillPlaceholder(index: Int, element: E): Core<E>? {
// 获取这个index里边存的值
val old = array[index and mask].value
/*
* addLast actions: addLast的操作
* 1) Commit tail slot 更新tail的index,也就是allocate tail的slot
* 2) Write element to array slot 写入element到array里边
* 3) Check for array copy 检查array是否扩容
*
* 在操作过程中如果第2步和第3步过程中,发生了扩容,consumer有可能已经获取了这个element
* If copy happened between 2 and 3 then the consumer might have consumed our element,
* then another producer might have written its placeholder in our slot, so we should
* perform *unique* check that current placeholder is our to avoid overwriting another producer placeholder
* 如果Placeholder里边的index是方法传入的index,这时我们确定这个Placeholder是当前线程的Placeholder,而不是其他producer的
*/
if (old is Placeholder && old.index == index) {
// 因为扩容的时候,扩容线程复制了老的array,此时,当前producer线程并没有写入新的element,这个操作是把element写进扩容后的array
array[index and mask].value = element
// we've corrected missing element, should check if that propagated to further copies, just in case
return this
}
// 返回null,这种情况是扩容线程在扩容的时候,已经看到了producer线程写入的新element
// 这个时候我们不需要做进行next的check,因为之后即便再有扩容,已经写进当前扩容的element都会被复制到之后的扩容array里
// it is Ok, no need for further action
return null
}

removeFistOrNull consumer来拿任务,如果没有任务,则返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

// REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
fun removeFirstOrNull(): Any? {
_state.loop { state ->
// 当前queue已被frozen,发生了扩容,返回这个状态,让调用这个方法的consumer决定该做什么
if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
state.withState { head, tail ->
// 当前queue为空,返回null
if ((tail and mask) == (head and mask)) return null // empty
val element = array[head and mask].value
if (element == null) {
// 如果是single consumer,则producer没有完成加入element
// If queue is Single-Consumer, then element == null only when add has not finished yet
if (singleConsumer) return null // consider it not added yet
// 如果是multi consumer,我们需要spin,来尝试继续获取element,这个是之前说的这个queue会spin
// retry (spin) until consumer adds it
return@loop
}

// 如果当前element是Placeholder,说明producer还没来得及补写扩容之后的queue array,这时我们认为状态是not added yet,所以返回null
// element == Placeholder can only be when add has not finished yet
if (element is Placeholder) return null // consider it not added yet

// 我们这个地方不能直接更新element为null,因为会有一个edge case
// 情况为:当前有两个线程,一个producer线程即将进行扩容,一个consumer线程,
// 假设consumer线程直接把array[head]变成了null,之后producer线程进行扩容,因为扩容时,null的element会被当做是别的producer线程还没来得及写入的element
// 这时扩容线程把这个element变成了placeholder,这回break queue的状态

// 正确操作是先更新head index,如果更新成功,再将array[previous head]设成null,因为这个时候即便producer线程进行扩容,也只会copy新的head到tail的elements

// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
val newHead = (head + 1) and MAX_CAPACITY_MASK
if (_state.compareAndSet(state, state.updateHead(newHead))) {
// Array could have been copied by another thread and it is perfectly fine, since only elements
// between head and tail were copied and there are no extra steps we should take here
array[head and mask].value = null // now can safely put null (state was updated)
return element // successfully removed in fast-path
}
// multi consumer的时候需要spin来确保当前head不被别的consumer拿掉
// Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
if (!singleConsumer) return@loop

// 如果是single consumer的case,之前的compareAndSet失败是因为producer线程进行了扩容
// 这个时候需要从扩容之后的queue里边拿到element,并且更新head
// Single-consumer queue goes to slow-path for remove in case of interference
var cur = this
while (true) {
@Suppress("UNUSED_VALUE")
cur = cur.removeSlowPath(head, newHead) ?: return element
}
}
}
}

private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
_state.loop { state ->
state.withState { head, _ ->
// Extra检查,head值不应该变,如果变化了,说明并不是single consumer,
// 这个时候会报错,因为single consumer的操作和multi consumer操作不一样,assumption不正确queue的操作就不正确
assert { head == oldHead } // "This queue can have only one consumer"

// Frozen状态说明扩容了,这个时候要去point到扩容之后的queue
if (state and FROZEN_MASK != 0L) {
// state was already frozen, so removed element was copied to next
return next() // continue to correct head in next
}
// 原子操作更新head,如果这个时候更新成功,则将head 设成null
// 更新不成功,说明有别的producer线程又进行了扩容,这个时候需要继续loop,直到head成功被更新,也就是没有别的线程扩容
if (_state.compareAndSet(state, state.updateHead(newHead))) {
array[head and mask].value = null // now can safely put null (state was updated)
return null
}
}
}
}

扩容逻辑,在addLast的时候,满足之前提到的条件会进行扩容

扩容实际是copy之前的array里边的元素到新的array里边

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 当前queue的next或是copy,如果有next则说明别的线程扩容了,如果next是null,则进行扩容
fun next(): LockFreeTaskQueueCore<E> = allocateOrGetNextCopy(markFrozen())

// 把当前state标记成frozen,提示其他线程,这个queue已经过期(有新的扩容queue生成)
private fun markFrozen(): Long =
_state.updateAndGet { state ->
if (state and FROZEN_MASK != 0L) return state // already marked
state or FROZEN_MASK
}

private fun allocateOrGetNextCopy(state: Long): Core<E> {
_next.loop { next ->
// 如果next不为null,期间已有别的线程扩容了
if (next != null) return next // already allocated & copied
// 如果是null,则进行扩容
// 因为在扩容时,可能有别的线程更改了状态,所以compareAndSet失败的时候,next依旧为null,这时进行继续loop check
// 这个地方也是个spin
_next.compareAndSet(null, allocateNextCopy(state))
}
}

private fun allocateNextCopy(state: Long): Core<E> {
// 生成一个新的Queue,容量是之前的两倍
val next = LockFreeTaskQueueCore<E>(capacity * 2, singleConsumer)
state.withState { head, tail ->
var index = head
// 扩容逻辑,range是head index到tail index
// 之前提到过,removeFirst会先更新head index,addLast会先更新tail index
while (index and mask != tail and mask) {
// 如果array[index]不为null,则复制到新的array,如果是null,说明此时producer线程更新完tail index后还没写入array
// 这个时候我们创建一个Placeholder,来给producer线程补写element进扩容后的array
// replace nulls with placeholders on copy
val value = array[index and mask].value ?: Placeholder(index)
next.array[index and next.mask].value = value
index++
}
// 更新状态,reset frozen flag为0
next._state.value = state wo FROZEN_MASK
}
return next
}

最后LockFreeTaskQueue,这里边的逻辑比较straight forward

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
internal open class LockFreeTaskQueue<E : Any>(
singleConsumer: Boolean // true when there is only a single consumer (slightly faster & lock-free)
) {
private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY, singleConsumer))

// Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
val isEmpty: Boolean get() = _cur.value.isEmpty
val size: Int get() = _cur.value.size

fun close() {
// 不断尝试close当前queue,因为可能发生扩容,所以要不断spin来尝试
_cur.loop { cur ->
if (cur.close()) return // closed this copy
_cur.compareAndSet(cur, cur.next()) // move to next
}
}

fun addLast(element: E): Boolean {
_cur.loop { cur ->
// addLast
when (cur.addLast(element)) {
Core.ADD_SUCCESS -> return true
Core.ADD_CLOSED -> return false
// Frozen状态说明有扩容,获取扩容后的queue,这个地方进行了spin
Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
}
}
}

@Suppress("UNCHECKED_CAST")
fun removeFirstOrNull(): E? {
_cur.loop { cur ->
val result = cur.removeFirstOrNull()
if (result !== Core.REMOVE_FROZEN) return result as E?
// Frozen状态说明有扩容,获取扩容后的queue,这个地方进行了spin
_cur.compareAndSet(cur, cur.next())
}
}

// Used for validation in tests only
fun <R> map(transform: (E) -> R): List<R> = _cur.value.map(transform)

// Used for validation in tests only
fun isClosed(): Boolean = _cur.value.isClosed()
}

总结

基本每一个要处理异步编程的library都需要有个queue来维护任务,Kotlin这个coroutine因为要target multiplatform,
写这么个queue要考虑很多case,因为有的platform可能是single consumer,有的又是multi consumer,
为了这个还写了不同的处理逻辑,也算是做到极致了。

对比别的library可能值需要做到MPSC,比如Reactor的 MpscLinkedQueue
,kotlin coroutine的queue的逻辑要复杂得多。

在效率方面,如果是对于target是JVM的,应该跟用线程池实现NIO的效率差不多,我记得之前看扔物线的 Kotlin的协程
提到过这个概念,Kotlin的coroutine并不会比线程池更效率,因为它底层也是这么实现的。

而且从另一个角度看,在JS里,这种在Kotlin维护一个task queue,未必比native的event loop更效率,
因为Kotlin最终是transpile成了JS,而Node里边的event loop是libuv用C实现的。

不知道这个想法对不对。