文章

2025/4/22 生产者&消费者并发模型

Producer 和 Consumer 共享一个缓冲区

  • Producer (生产数据):如果缓冲区有空位,放入;否则等待
  • Consumer (消费数据):如果缓冲区有数据,取走;否则等待

example(from jyy)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
void T_produce() {

while (1) {

retry:

mutex_lock(&lk);

int ready = (depth < n);

mutex_unlock(&lk);

if (!ready)

goto retry;

  

// assert(depth < n);

  

mutex_lock(&lk);

printf("(");

depth++;

mutex_unlock(&lk);

}

}

  

void T_consume() {

while (1) {

retry:

mutex_lock(&lk);

int ready = (depth > 0);

mutex_unlock(&lk);

if (!ready)

goto retry;

  

// assert(depth > 0);

  

mutex_lock(&lk);

printf(")");

depth--;

mutex_unlock(&lk);

}

}

使用条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void T_produce() {

while (1) {

mutex_lock(&lk);

  

if (!CAN_PRODUCE) {

cond_wait(&cv, &lk);

// This is subtle. Seemingly "more efficient"

// implementation is dangerous for newbies.

// Also not correct changing "if" to "while".

}

  

printf("(");

depth++;

  

cond_signal(&cv);

mutex_unlock(&lk);

}

}

  

void T_consume() {

while (1) {

mutex_lock(&lk);

  

if (!CAN_CONSUME) {

cond_wait(&cv, &lk);

}

  

printf(")");

depth--;

  

cond_signal(&cv);

mutex_unlock(&lk);

}

}

广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void T_produce() {

while (1) {

mutex_lock(&lk);

  

while (!CAN_PRODUCE) {

cond_wait(&cv, &lk);

// We are here if the thread is being waked up, with

// the mutex being acquired. Then we check once again,

// and move out of the loop if CAN_PRODUCE holds.

}

  

// We still hold the mutex--and we check again.

assert(CAN_PRODUCE);

  

printf("(");

depth++;

  

cond_broadcast(&cv);

mutex_unlock(&lk);

}

}

  

void T_consume() {

while (1) {

mutex_lock(&lk);

  

while (!CAN_CONSUME) {

cond_wait(&cv, &lk);

}

  

printf(")");

depth--;

  

cond_broadcast(&cv);

mutex_unlock(&lk);

}

}
本文由作者按照 CC BY 4.0 进行授权