2025/4/22 生产者&消费者并发模型
Producer 和 Consumer 共享一个缓冲区
- Producer (生产数据):如果缓冲区有空位,放入;否则等待
- Consumer (消费数据):如果缓冲区有数据,取走;否则等待
示例(来自 jyy):
/*
 * Producer loop: waits (by spinning) until depth < n, then prints "(" and
 * increments depth.
 *
 * The capacity check and the update are performed under one acquisition of
 * lk. If the lock were released between the check and the update, another
 * thread could change depth in that window and the increment could run while
 * depth == n.
 */
void T_produce() {
    while (1) {
        mutex_lock(&lk);
        if (!(depth < n)) {
            // No free slot: release lk so other threads can make progress,
            // then go around and check again.
            mutex_unlock(&lk);
            continue;
        }
        // lk has been held since the check, so depth < n still holds here.
        printf("(");
        depth++;
        mutex_unlock(&lk);
    }
}
/*
 * Consumer loop: waits (by spinning) until depth > 0, then prints ")" and
 * decrements depth.
 *
 * The emptiness check and the update are performed under one acquisition of
 * lk. If the lock were released between the check and the update, another
 * thread could change depth in that window and the decrement could run while
 * depth == 0.
 */
void T_consume() {
    while (1) {
        mutex_lock(&lk);
        if (!(depth > 0)) {
            // Nothing to take: release lk so other threads can make progress,
            // then go around and check again.
            mutex_unlock(&lk);
            continue;
        }
        // lk has been held since the check, so depth > 0 still holds here.
        printf(")");
        depth--;
        mutex_unlock(&lk);
    }
}
使用条件变量
/*
 * Producer loop using a condition variable: sleeps on cv while CAN_PRODUCE is
 * false, then prints "(" and increments depth.
 */
void T_produce() {
    while (1) {
        mutex_lock(&lk);
        // Being woken up does not mean CAN_PRODUCE is true: another thread
        // may have changed depth before this one ran again. Re-check the
        // predicate after every wakeup instead of testing it once with "if".
        while (!CAN_PRODUCE) {
            cond_wait(&cv, &lk);
        }
        printf("(");
        depth++;
        // Both T_produce and T_consume wait on the same cv. Waking a single
        // waiter could pick one whose predicate is still false, which would
        // just go back to sleep and leave nobody runnable. Wake every waiter
        // and let each one re-check its own predicate.
        cond_broadcast(&cv);
        mutex_unlock(&lk);
    }
}
/*
 * Consumer loop using a condition variable: sleeps on cv while CAN_CONSUME is
 * false, then prints ")" and decrements depth.
 */
void T_consume() {
    while (1) {
        mutex_lock(&lk);
        // Being woken up does not mean CAN_CONSUME is true: another thread
        // may have changed depth before this one ran again. Re-check the
        // predicate after every wakeup instead of testing it once with "if".
        while (!CAN_CONSUME) {
            cond_wait(&cv, &lk);
        }
        printf(")");
        depth--;
        // Both T_produce and T_consume wait on the same cv. Waking a single
        // waiter could pick one whose predicate is still false, which would
        // just go back to sleep and leave nobody runnable. Wake every waiter
        // and let each one re-check its own predicate.
        cond_broadcast(&cv);
        mutex_unlock(&lk);
    }
}
使用广播(cond_broadcast)
/*
 * Producer loop, broadcast variant: blocks on cv until CAN_PRODUCE is true,
 * prints "(", increments depth, then wakes all threads waiting on cv.
 */
void T_produce() {
    for (;;) {
        mutex_lock(&lk);

        // Each return from cond_wait means this thread was woken and holds
        // lk again; the loop condition then re-tests CAN_PRODUCE and only
        // lets the thread through once it is true.
        while (!CAN_PRODUCE) {
            cond_wait(&cv, &lk);
        }

        // lk is still held at this point, so the predicate is checked again.
        assert(CAN_PRODUCE);

        printf("(");
        depth += 1;

        cond_broadcast(&cv);
        mutex_unlock(&lk);
    }
}
/*
 * Consumer loop, broadcast variant: blocks on cv until CAN_CONSUME is true,
 * prints ")", decrements depth, then wakes all threads waiting on cv.
 */
void T_consume() {
    for (;;) {
        mutex_lock(&lk);

        // Re-test the predicate after every wakeup before proceeding.
        while (!CAN_CONSUME) {
            cond_wait(&cv, &lk);
        }

        printf(")");
        depth -= 1;

        cond_broadcast(&cv);
        mutex_unlock(&lk);
    }
}
本文由作者按照 CC BY 4.0 进行授权