多线程编程
学习《Linux高性能服务器编程》第十四章多线程编程,里面介绍了各种Linux编程中多线程编程的内容,为了印象深刻一些,多动手多实践,所以记下这个笔记。
这一章分为创建线程和结束线程、线程属性、Posix信号量、互斥锁、条件变量。
创建线程和结束线程
pthread_create
创建一个线程使用pthread_create
函数
1 2 3
| #include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
|
thread
参数是新线程的标识符,后续pthread_*
函数通过它来引入新线程。
其类型pthread_r
定义如下
1 2 3
|
typedef unsigned long int pthread_t;
|
pthread_t
是一个整型类型。实际上,Linux 上几乎所有的资源标识符都是一个整型数,比如socket
、各种System V IPC
标识符等。
attr
参数用于设置新线程的属性。给它NULL
表示使用默认线程属性。线程拥有众多属性。
start_routine
和arg
参数分别指定新线程将运行的函数以及其参数。
pthread_create
成功时返回0,失败时返回错误码。一个用户可以打开的线程数量不能超过RLIMIT_NPROC
软资源限制。此外,系统上所有用户能创建的线程总数也不得超过/proc/sys/kernel/threads-max
内核参数所定义的值。
eg:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <pthread.h>
void sys_err(const char *str) { perror(str); exit(1); }
void *thread_func(void *arg) { int p = *(int *)arg; printf("--I'm the thread: pid = %d, tid= %lu, arg is %d\n", getpid(), pthread_self(), p);
return NULL; }
int main(int argc, char *argv[]) { int ret; pthread_t tid;
int p = 12; ret = pthread_create(&tid, NULL, thread_func, (void *)&p); if (ret != 0) { sys_err("pthread_create error"); }
sleep(2); printf("main: I'm Main, pid = %d, tid= %lu\n", getpid(), pthread_self());
return 0; }
|
pthread_exit
线程一旦被创建好,内核就可以调度内核线程来执行start_routine
函数指针所指向的函数了。线程函数在结束时最好调用如下函数,以确保安全、干净地退出:
1 2 3
| #include <pthread.h>
void pthread_exit(void *retval);
|
pthread_exit
函数通过retval
参数向线程的回收者传递其退出信息。它执行完之后不会返回到调用者,而且永远不会失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <pthread.h>
void sys_err(const char *str) { perror(str); exit(1); }
void func(void) { pthread_exit(NULL);
return; }
void *thread_func(void *arg) { int p = *(int *)arg; if (p == 2) { pthread_exit(NULL); } printf("--I'm the thread: pid = %d, tid= %lu, arg is %d\n", getpid(), pthread_self(), p); return NULL; }
int main(int argc, char *argv[]) { int i; int ret; pthread_t tid;
int p = 2; ret = pthread_create(&tid, NULL, thread_func, (void *)&p); if (ret != 0) { sys_err("pthread_create error"); }
sleep(2);
printf("main: I'm Main, pid = %d, tid= %lu\n", getpid(), pthread_self());
return 0; }
|
pthread_join
一个进程中的所有线程都可以调用pthread_join
函数来回收其他线程(前提是目标线程是可回收的),即等待其他线程结束,这类似于回收进程的wait
和 waitpid
系统调用。pthrcad_join
的定义如下:
1 2 3
| #include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
|
thread
参数是目标线程的标识符,retval
参数则是目标线程返回的退出信息。该函数会一直阻塞,直到被回收的线程结束为止。该函数成功时返回0,失败则返回错误码。可能的错误码如下表所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <pthread.h>
void sys_err(const char *str) { perror(str); exit(1); }
void *tfn(void *arg) { return (void *)74; }
int main(int argc, char *argv[]) { pthread_t tid; int *retval;
int ret = pthread_create(&tid, NULL, tfn, NULL); if (ret != 0) sys_err("pthread_create error");
ret = pthread_join(tid, (void **)&retval); if (ret != 0) sys_err("pthread_join error"); printf("child thread exit with %d\n", (void *)retval);
pthread_exit(NULL); }
|
pthread_cancel
有时候我们希望能够取消线程,可以通过pthread_cancel
函数实现
1 2 3
| #include <pthread.h>
int pthread_cancel(pthread_t thread);
|
thread
参数是目标线程的标识符。该函数成功时返回0,失败则返回错误码。不过,接收到取消请求的目标线程可以决定是否允许被取消以及如何取消,这分别由如下两个函数完成:
1 2 3 4
| #include <pthread.h>
int pthread_setcancelstate(int state, int *oldstate); int pthread_setcanceltype(int type, int *oldtype);
|
这两个函数的第一个参数分别用于设置线程的取消状态(是否允许取消)和取消类型(如何取消),第二个参数则分别记录线程原来的取消状态和取消类型。state
参数有两个可选值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <pthread.h>
void *tfn(void *arg) { while (1) { printf("thread: pid = %d, tid = %lu\n", getpid(), pthread_self()); sleep(1); }
return NULL; }
int main(int argc, char *argv[]) { pthread_t tid;
int ret = pthread_create(&tid, NULL, tfn, NULL); if (ret != 0) { fprintf(stderr, "pthread_create error:%s\n", strerror(ret)); exit(1); }
printf("main: pid = %d, tid = %lu\n", getpid(), pthread_self());
sleep(5);
ret = pthread_cancel(tid); if (ret != 0) { fprintf(stderr, "pthread_cancel error:%s\n", strerror(ret)); exit(1); }
printf("after 5s tid = %lu is cancel\n", tid);
return 0; }
|
线程属性
pthread_attr_t
结构体定义了一套完整的线程属性,如下所示:
1 2 3 4 5
| union pthread_attr_t { char __size[__SIZEOF_PTHREAD_ATTR_T]; long int __align; };
|
各种线程属性全部包含在一个字符数组中。线程库定义了一系列函数来操作pthread_attr_t
类型的变量,以方便我们获取和设置线程属性。这些函数包括:
1 2 3 4 5 6 7 8 9 10 11 12 13
| #include <pthread.h>
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr); int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate); int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate); int pthread_attr_setstackaddr(pthread_attr_t *attr, void *stackaddr); int pthread_attr_getstackaddr(const pthread_attr_t *attr, void **stackaddr); int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize); int pthread_attr_getstacksize(const pthread_attr_t *attr, size_t *stacksize); ......
|
线程属性:
detachstate
,线程的脱离状态。它有PTHREAD_CREATE_JOINABLE
和PTHREAD_CREATE_DETACH
两个可选值。前者指定线程是可以被回收的,后者使调用线程脱离与进程中其他线程的同步。脱离了与其他线程同步的线程称为“脱离线程”。脱离线程在退出时将自行释放其占用的系统资源。线程创建时该属性的默认值是PTHREAD_CREATE_JOINABLE
。此外,我们也可以使用pthread_detach
函数直接将线程设置为脱离线程。
stackaddr
和stacksize
,线程堆栈的起始地址和大小。一般来说,我们不需要自己来管理线程堆栈,因为Linux默认为每个线程分配了足够的堆栈空间(一般是8 MB)。我们可以使用ulimt -s命令来查看或修改这个默认值。
guardsize
,保护区域大小。如果guardsize
大于0,则系统创建线程的时候会在其堆栈的尾部额外分配guardsize
字节的空间,作为保护堆栈不被错误地覆盖的区域。如果guardsize
等于0,则系统不为新创建的线程设置堆栈保护区。如果使用者通过pthread_attr_setstackaddr
或pthread_attr_setstack
函数手动设置线程的堆栈,则guardsize
属性将被忽略。
schedparam
,线程调度参数。其类型是sched_param
结构体。该结构体目前还只有一个整型类型的成员:sched_priority
,该成员表示线程的运行优先级。
schedpolicy
,线程调度策略。该属性有SCHED_FIFO
、SCHED_RR
和SCHED_OTHER
三个可选值,其中SCHED_OTHER
是默认值。SCHED_RR
表示采用轮转算法(round-robin)调度,SCHED_FIFO
表示使用先进先出的方法调度,这两种调度方法都具备实时调度功能,但只能用于以超级用户身份运行的进程。
inheritsched
,是否继承调用线程的调度属性。该属性有PTHREAD_INHERIT_SCHED
和PTHREAD_EXPLICIT_SCHED
两个可选值。前者表示新线程沿用其创建者的线程调度参数,这种情况下再设置新线程的调度参数属性将没有任何效果。后者表示调用者要明确地指定新线程的调度参数。
scope
,线程间竞争CPU的范围,即线程优先级的有效范围。POSIX标准定义了该属性的PTHREAD_SCOPE_SYSTEM
和PTHREAD_SCOPE_PROCESS
两个可选值,前者表示目标线程与系统中所有线程一起竞争CPU的使用,后者表示目标线程仅与其他隶属于同一进程的线程竞争CPU的使用。目前Linux只支持PTHREAD_SCOPE_SYSTEM
这一种取值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h>
void *tfn(void *arg) { int n = 3;
while (n--) { printf("thread count %d\n", n); sleep(1); } int ret = 42; pthread_exit((void *)&ret); }
int main(int argc, char *argv[]) { pthread_t tid; void *tret; int err;
pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&tid, &attr, tfn, NULL); pthread_attr_destroy(&attr); while (1) { sleep(1); err = pthread_join(tid, &tret); if (err != 0) fprintf(stderr, "thread_join error: %s\n", strerror(err)); else fprintf(stderr, "thread exit code %d\n", *(int *)tret); }
return 0; }
|
POSIX信号量
多线程访问相同的资源时,也存在同步的问题,所以也需要同步机制。这里的POSIX信号量就是线程同步的机制之一。POSIX信号量和多进程编程(System V IPC)当中的信号量语义上是相同的,但是并不能保证能够进行混用,所以在线程当中还是使用POSIX比较好。
POSIX信号量函数的名字都以sem_
开头,并不像大多数线程函数那样以pthread_
开头。常用的POSIX信号量函数是下面5个:
1 2 3 4 5 6 7
| #include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); int sem_destroy(sem_t *sem); int sem_wait(sem_t *sem); int sem_trywait(sem_t *sem); int sem_post(sem_t *sem);
|
这些函数的第一个参数sem指向被操作的信号量。
sem_init
函数用于初始化一个未命名的信号量(POSIX信号量API支持命名信号量,不过在书中不讨论它)。pshared
参数指定信号量的类型。如果其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量就可以在多个进程之间共享。value
参数指定信号量的初始值。此外,初始化一个已经被初始化的信号量将导致不可预期的结果。
sem_destroy
函数用于销毁信号量,以释放其占用的内核资源。如果销毁一个正被其他线程等待的信号量,则将导致不可预期的结果。
sem_wait
函数以原子操作的方式将信号量的值减1。如果信号量的值为0,则sem_wait
将被阻塞,直到这个信号量具有非0值。
sem_trywait
与sem_wait
函数相似,不过它始终立即返回,而不论被操作的信号量是否具有非0值,相当于sem_wait
的非阻塞版本。当信号量的值非0时,sem_trywait
对信号量执行减1操作。当信号量的值为0时,它将返回-1并设置errno
为EAGAIN
。
sem_post
函数以原子操作的方式将信号量的值加1。当信号量的值大于0时,其他正在调用sem_wait
等待信号量的线程将被唤醒。
使用信号量完成的生产者消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
#include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <stdio.h> #include <semaphore.h>
#define NUM 5
int queue[NUM]; sem_t blank_number, product_number;
void *producer(void *arg) { int i = 0;
while (1) { sem_wait(&blank_number); queue[i] = rand() % 1000 + 1; printf("----Produce---%d\n", queue[i]); sem_post(&product_number);
i = (i + 1) % NUM; sleep(rand() % 1); } }
void *consumer(void *arg) { int i = 0;
while (1) { sem_wait(&product_number); printf("-Consume---%d\n", queue[i]); queue[i] = 0; sem_post(&blank_number);
i = (i + 1) % NUM; sleep(rand() % 3); } }
int main(int argc, char *argv[]) { pthread_t pid, cid;
sem_init(&blank_number, 0, NUM); sem_init(&product_number, 0, 0);
pthread_create(&pid, NULL, producer, NULL); pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL); pthread_join(cid, NULL);
sem_destroy(&blank_number); sem_destroy(&product_number);
return 0; }
|
互斥锁
互斥锁(也称互斥量)可以用于保护关键代码段,以确保其独占式的访问,这有点像一个二进制信号量。当进入关键代码段时,我们需要获得互斥锁并将其加锁,这等价于二进制信号量的Р操作﹔当离开关键代码段时,我们需要对互斥锁解锁,以唤醒其他等待该互斥锁的线程,这等价于二进制信号量的V操作。
互斥锁基础API
POSIX互斥锁的相关函数主要有如下5个:
这些函数的第一个参数mutex
指向要操作的目标互斥锁,互斥锁的类型是pthread_mutex_t
结构体。
pthread_mutex_init
函数用于初始化互斥锁。mutexattr
参数指定互斥锁的属性。如果将它设置为NULL,则表示使用默认属性。我们将在下一小节讨论互斥锁的属性。除了这个函数外,我们还可以使用如下方式来初始化一个互斥锁:
1
| pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
|
宏PTHREAD_MUTEX_INITIALIZER
实际上只是把互斥锁的各个字段都初始化为0。pthread_mutex_destroy
函数用于销毁互斥锁,以释放其占用的内核资源。销毁一个已经加锁的互斥锁将导致不可预期的后果。pthread_mutex_lock
函数以原子操作的方式给一个互斥锁加锁。如果目标互斥锁已经被锁上,pthread_mutex_lock
调用将阻塞,直到该互斥锁的占有者将其解锁。
pthread_mutex_trylock
与pthread_mutex_lock
函数类似,不过它始终立即返回,而不论被操作的互斥锁是否已经被加锁,相当于pthread_mutex_lock
的非阻塞版本。当目标互斥锁未被加锁时,pthread_mutex_trylock
对互斥锁执行加锁操作。当互斥锁已经被加锁时,pthread_mutex_trylock
将返回错误码EBUSY
。需要注意的是,这里讨论的pthread_mutex_lock
和pthread_mutex_trylock
的行为是针对普通锁而言的。后面我们将看到,对于其他类型的锁而言,这两个加锁函数会有不同的行为。
pthread_mutex_unlock
函数以原子操作的方式给一个互斥锁解锁。如果此时有其他线程正在等待这个互斥锁,则这些线程中的某一个将获得它。
上面这些函数成功时返回0,失败则返回错误码。
互斥锁属性
pthread_mutexattr_t
结构体定义了一套完整的互斥锁属性。线程库提供了一系列函数来操作pthread_mutexattr_t
类型的变量,以方便我们获取和设置互斥锁属性。这里我们列出其中一些主要的函数:
本书只讨论互斥锁的两种常用属性:pshared
和type
。互斥锁属性pshared
指定是否允许跨进程共享互斥锁,其可选值有两个:
PTHREAD_PROCESS_SHARED
。互斥锁可以被跨进程共享。
PTHREAD_PROCESS_PRIVATE
。互斥锁只能被和锁的初始化线程隶属于同一个进程的线程共享。
互斥锁属性type指定互斥锁的类型。Linux支持如下4种类型的互斥锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| #include <stdio.h> #include <string.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h>
pthread_mutex_t mutex;
void err_thread(int ret, char *str) { if (ret != 0) { fprintf(stderr, "%s:%s\n", str, strerror(ret)); pthread_exit(NULL); } }
void *tfn(void *arg) { srand(time(NULL));
while (1) {
pthread_mutex_lock(&mutex); printf("hello "); sleep(rand() % 3); printf("world\n"); pthread_mutex_unlock(&mutex);
sleep(rand() % 3); }
return NULL; }
int main(int argc, char *argv[]) { int flag = 5; pthread_t tid; srand(time(NULL));
pthread_mutex_init(&mutex, NULL); pthread_create(&tid, NULL, tfn, NULL); while (flag--) {
pthread_mutex_lock(&mutex);
printf("HELLO "); sleep(rand() % 3); printf("WORLD\n"); pthread_mutex_unlock(&mutex);
sleep(rand() % 3); } pthread_cancel(tid); pthread_join(tid, NULL);
pthread_mutex_destroy(&mutex);
return 0; }
|
条件变量
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。
条件变量的相关函数主要有如下5个:
这些函数的第一个参数cond
指向要操作的目标条件变量,条件变量的类型是pthread_cond_t
结构体。
pthread_cond_init
函数用于初始化条件变量。cond_attr
参数指定条件变量的属性。如果将它设置为NULL,则表示使用默认属性。条件变量的属性不多,而和互斥锁的属性类型相似。除了pthread_cond_init
函数外,我们还可以使用如下方式来初始化一个条件变量:
1
| pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
|
宏PTHREAD_COND_INITIALIZER
实际上只是把条件变量的各个字段都初始化为0。
pthread_cond_destroy
函数用于销毁条件变量,以释放其占用的内核资源。销毁一个正在被等待的条件变量将失败并返回EBUSY
。
pthread_cond_broadcast
函数以广播的方式唤醒所有等待目标条件变量的线程。
pthread_cond_signal
函数用于唤醒一个等待目标条件变量的线程。至于哪个线程将被唤醒,则取决于线程的优先级和调度策略。有时候我们可能想唤醒一个指定的线程,但pthread
没有对该需求提供解决方法。不过我们可以间接地实现该需求:定义一个能够唯一表示目标线程的全局变量,在唤醒等待条件变量的线程前先设置该变量为目标线程,然后采用广播方式唤醒所有等待条件变量的线程,这些线程被唤醒后都检查该变量以判断被唤醒的是否是自己,如果是就开始执行后续代码,如果不是则返回继续等待。
pthread_cond_wait
函数用于等待目标条件变量。mutex
参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait
操作的原子性。在调用pthread_cond_wait
前,必须确保互斥锁mutex
已经加锁,否则将导致不可预期的结果。pthread_cond_wait
函数执行时,首先把调用线程放入条件变量的等待队列中,然后将互斥锁mutex
解锁。可见,从pthread_cond_wait
开始执行到其调用线程被放入条件变量的等待队列之间的这段时间内,pthread_cond_signal
和pthread_cond_broadcast
等函数不会修改条件变量。换言之,pthread_cond_wait
函数不会错过目标条件变量的任何变化。当pthread_cond_wait
函数成功返回时,互斥锁mutex
将再次被锁上。
上面这些函数成功时返回0,失败则返回错误码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <stdio.h>
struct msg { struct msg *next; int num; };
struct msg *head;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER; pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void *consumer(void *p) { struct msg *mp;
while (1) { pthread_mutex_lock(&lock); while (head == NULL) { pthread_cond_wait(&has_product, &lock); } mp = head; head = mp->next; pthread_mutex_unlock(&lock);
printf("-Consume %lu---%d\n", pthread_self(), mp->num); free(mp); sleep(rand() % 5); } }
void *producer(void *p) { struct msg *mp;
while (1) { mp = (msg *)malloc(sizeof(struct msg)); mp->num = rand() % 1000 + 1; printf("-Produce ---------------------%d\n", mp->num);
pthread_mutex_lock(&lock); mp->next = head; head = mp; pthread_mutex_unlock(&lock);
pthread_cond_signal(&has_product); sleep(rand() % 5); } }
int main(int argc, char *argv[]) { pthread_t pid, cid; srand(time(NULL));
pthread_create(&pid, NULL, producer, NULL); pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL); pthread_join(cid, NULL);
return 0; }
|