线程

线程创建

线程ID只有在它所属的进程上下文中才有意义。

1
2
3
4
5
6
#include<pthread.h>
int pthread_equal(pthread_t tid1, pthread_t tid2);
int pthread_self(void); //调用线程获取自身的线程ID
int pthread_create(pthread_t *restrict tidp,
    const pthread_attr_t *restrict attr,
    void *(*start_rtn)(void *), void *restrict arg);

新创建的线程ID会被设置为tidp指向的内存单元。attr参数用于定制各种不同的线程属性。新创建的线程从start_rtn函数的地址开始运行,该函数只有一个无类型指针参数arg。

每个线程都提供errno的副本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include "apue.h"
#include <pthread.h>

pthread_t ntid;

void printids(const char *s)
{
    pid_t      pid;
    pthread_t  tid;

    pid = getpid();
    tid = pthread_self();
    printf("%s pid %u tid %u (0x%x)\n", s, (unsigned int)pid,
      (unsigned int)tid, (unsigned int)tid);
}

void* thr_fn(void *arg)
{
    printids("new thread: ");
    return((void *)0);
}

int main(void)
{
    int     err;
    err = pthread_create(&ntid, NULL, thr_fn, NULL);
    if (err != 0)
        err_quit("can't create thread: %s\n", strerror(err));
    printids("main thread:");
    sleep(1);
    exit(0);
}

线程终止

1
2
void pthread_exit(void *rval_ptr);
int pthread_join(pthread_t thread, void **rval_ptr);

pthread_join调用线程将一直阻塞,直到指定线程调用pthread_exit,从启动例程中返回或者被取消。如果线程从它的启动例程返回,则rval_ptr包含返回码。如果线程被取消,rval_ptr指定的内存单元就设置为PTHREAD_CANCELED

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include "apue.h"
#include <pthread.h>

void *thr_fn1(void *arg)
{
    printf("thread 1 returning\n");
    return((void *)1);
}

void *thr_fn2(void *arg)
{
    printf("thread 2 exiting\n");
    pthread_exit((void *)2);
}

int main(void)
{
    int         err;
    pthread_t   tid1, tid2;
    void        *tret;

    err = pthread_create(&tid1, NULL, thr_fn1, NULL);
    if (err != 0)
        err_quit("can't create thread 1: %s\n", strerror(err));
    err = pthread_create(&tid2, NULL, thr_fn2, NULL);
    if (err != 0)
        err_quit("can't create thread 2: %s\n", strerror(err));
    err = pthread_join(tid1, &tret);
    if (err != 0)
        err_quit("can't join with thread 1: %s\n", strerror(err));
    printf("thread 1 exit code %d\n", (int)tret);
    err = pthread_join(tid2, &tret);
    if (err != 0)
        err_quit("can't join with thread 2: %s\n", strerror(err));
    printf("thread 2 exit code %d\n", (int)tret);
    exit(0);
}

$ ./a.out
thread 1 returning
thread 2 exiting
thread 1 exit code 1
thread 2 exit code 2

线程可以调用pthread_cancel函数来请求取消同一进程的其他线程。但是线程可以选择忽略取消或者控制如何被取消。pthread_cancel并不等待线程终止,它仅仅提出请求。

1
2
3
4
5
6
int pthread_cancel(pthread_t tid);
//线程清理处理程序
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);
//分离线程
int pthread_detach(pthread_t tid);

线程同步

当一个线程可以修改的变量,其他线程也可以读取或修改该变量时,就会存在访问数据不一致的问题。

当两个或多个线程试图同时在同一时间修改同一变量时,也存在数据访问不一致问题。

互斥量

互斥量使用pthread_mutex_t数据类型表示,在使用互斥量前,必须首先对它进行初始化

  • 可以把它设置为常量PTHREAD_MUTEX_INITIALIZER(只适用于静态分配的互斥量)。
  • 也可以调用pthread_mutex_init函数初始化。在释放内存前需要调用pthread_mutex_destroy
1
2
3
4
#include<pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
        const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

调用pthread_mutex_lock对互斥量进行上锁后,调用线程将阻塞直到互斥量被解锁🔓。如果不希望线程被阻塞,可以使用pthread_mutex_trylock尝试对线程加锁,如果不能锁住互斥量,则返回EBUSY

1
2
3
4
#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <stdlib.h>
#include <pthread.h>

struct foo {
    int             f_count;
    pthread_mutex_t f_lock;
    int             f_id;
    /* ... more stuff here ... */
};

struct foo * foo_alloc(int id) /* allocate the object */
{
    struct foo *fp;

    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        /* ... continue initialization ... */
    }
    return(fp);
}

void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    if (--fp->f_count == 0) { /* last reference */
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&fp->f_lock);
    }
}
死锁
  • 如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入死锁状态。
  • 如果一个线程试图锁住另一个线程以相反的顺序锁住的互斥量时,会陷入死锁状态。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <stdlib.h>
#include <pthread.h>

#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)

struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo {
    int             f_count; /* protected by hashlock */
    pthread_mutex_t f_lock;
    struct foo     *f_next; /* protected by hashlock */
    int             f_id;
    /* ... more stuff here ... */
};

struct foo *foo_alloc(int id) /* allocate the object */
{
    struct foo  *fp;
    int         idx;

    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        idx = HASH(id);
        pthread_mutex_lock(&hashlock);
        fp->f_next = fh[idx];
        fh[idx] = fp;
        pthread_mutex_lock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        /* ... continue initialization ... */
        pthread_mutex_unlock(&fp->f_lock);
    }
    return(fp);
}

void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&hashlock);
    fp->f_count++;
    pthread_mutex_unlock(&hashlock);
}

struct foo *foo_find(int id) /* find a existing object */
{
    struct foo  *fp;

    pthread_mutex_lock(&hashlock);
    for (fp = fh[HASH(idx)]; fp != NULL; fp = fp->f_next) {
        if (fp->f_id == id) {
            fp->f_count++;
            break;
        }
    }
    pthread_mutex_unlock(&hashlock);
    return(fp);
}

void foo_rele(struct foo *fp) /* release a reference to the object */
{
    struct foo  *tfp;
    int         idx;

    pthread_mutex_lock(&hashlock);
    if (--fp->f_count == 0) { /* last reference, remove from list */
        idx = HASH(fp->f_id);
        tfp = fh[idx];
        if (tfp == fp) {
            fh[idx] = fp->f_next;
        } else {
            while (tfp->f_next != fp)
                tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&hashlock);
    }
}

pthread_mutex_timedlock互斥量原语允许绑定线程阻塞时间。

1
2
3
4
#include<pthread.h>
#include<time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
        const struct timespec *restrict tsptr);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "apue.h"
#include <pthread.h>

int main(void)
{
    int err;
    struct timespec tout;
    struct tm *tmp;
    char buf[64];
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    pthread_mutex_lock(&lock);
    printf("mutex is locked\n");
    clock_gettime(CLOCK_REALTIME, &tout);
    tmp = localtime(&tout.tv_sec);
    strftime(buf,sizeof(buf),"%r",tmp);
    printf("current time is %s\n", buf);
    tout.tv_sec += 10;
    err = pthread_mutex_timedlock(&lock, &tout);
    clock_gettime(CLOCK_REALTIME, &tout);
    tmp = localtime(&tout.tv_sec);
    strftime(buf,sizeof(buf),"%r",tmp);
    printf("the time is now %s\n", buf);
    if(err == 0)
        printf("mutex locked again!\n");
    else
        printf("can't lock mvtex again:%s\n",strerror(err));
    exit(0);
}

读写锁

当读写锁是写加锁状态时,在这个锁被解锁前,所有试图对这个锁加锁的线程都会被阻塞。

当读写锁是读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权。但是任何希望以写模式对此锁进行加锁的线程都会阻塞,知道所有的线程释放它们的读锁为止。

当读写锁处于读模式锁住状态,而这时有一个线程试图以写模式获取锁时,读写锁通常会阻塞随后的读模式锁请求,这样可避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足。

PTHREAD_RWLOCK_INITIALIZER常量可以对静态分配的读写锁进行初始化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#include<pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restricy rwlock,
        const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
        const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
        const struct timespec *restrict tsptr);
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
#include <stdlib.h>
#include <pthread.h>

struct job {
    struct job *j_next;
    struct job *j_prev;
    pthread_t   j_id;   /* tells which thread handles this job */
    /* ... more stuff here ... */
};

struct queue {
    struct job      *q_head;
    struct job      *q_tail;
    pthread_rwlock_t q_lock;
};

/*
* Initialize a queue.
*/
int queue_init(struct queue *qp)
{
    int err;

    qp->q_head = NULL;
    qp->q_tail = NULL;
    err = pthread_rwlock_init(&qp->q_lock, NULL);
    if (err != 0)
        return(err);
    /* ... continue initialization ... */
    return(0);
}

/*
 * Insert a job at the head of the queue.
 */
void job_insert(struct queue *qp, struct job *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);
    jp->j_next = qp->q_head;
    jp->j_prev = NULL;
    if (qp->q_head != NULL)
        qp->q_head->j_prev = jp;
    else
        qp->q_tail = jp;      /* list was empty */
    qp->q_head = jp;
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
 * Append a job on the tail of the queue.
 */
void job_append(struct queue *qp, struct job *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);
    jp->j_next = NULL;
    jp->j_prev = qp->q_tail;
    if (qp->q_tail != NULL)
        qp->q_tail->j_next = jp;
    else
        qp->q_head = jp;   /* list was empty */
    qp->q_tail = jp;
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
 * Remove the given job from a queue.
 */
void job_remove(struct queue *qp, struct job *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);
    if (jp == qp->q_head) {
        qp->q_head = jp->j_next;
        if (qp->q_tail == jp)
            qp->q_tail = NULL;
    } else if (jp == qp->q_tail) {
        qp->q_tail = jp->j_prev;
        if (qp->q_head == jp)
            qp->q_head = NULL;
    } else {
        jp->j_prev->j_next = jp->j_next;
        jp->j_next->j_prev = jp->j_prev;
    }
    pthread_rwlock_unlock(&qp->q_lock);
}
/*
 * Find a job for the given thread ID.
 */
struct job * job_find(struct queue *qp, pthread_t id)
{
    struct job *jp;

    if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
        return(NULL);

    for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
         if (pthread_equal(jp->j_id, id))
             break;

    pthread_rwlock_unlock(&qp->q_lock);
    return(jp);
}

条件变量

条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。

条件本身是由互斥量保护的。线程在改变条件状态之前必须首先锁住互斥量。其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定以后才能计算条件。

常量PTHREAD_COND_INITIALIZER赋给静态分配的条件变量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#include<pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond,
        const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *restrict cond,
        pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
        pthread_mutex_t *restrict mutex,
        const struct timespec *restrict tsptr);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <pthread.h>

struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void process_msg(void)
{
    struct msg *mp;

    for (;;) {
        pthread_mutex_lock(&qlock);
        while (workq == NULL)
            pthread_cond_wait(&qready, &qlock);
        mp = workq;
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);
        /* now process the message mp */
    }
}

void enqueue_msg(struct msg *mp)
{
    pthread_mutex_lock(&qlock);
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);
}

自旋锁

自旋锁不是通过休眠使进程阻塞,而是在获取锁之前一直处于忙等待(自旋)阻塞状态。

自旋锁通常作为底层原语用于实现其他类型的锁。在用户层,自旋锁并不是非常有用。

1
2
3
4
5
6
#include<pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

屏障

屏障(barrier)是用户协调多个线程并行的同步机制。

1
2
3
4
5
6
#include<pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
        const pthread_barrierattr_t *restrict attr,
        unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_wait(pthread_barrier_t *barrier);
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>

#define NTHR 8
#define NUMNUM 8000000L
#define TNUM (NUMNUM/NTHR)

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t b;

#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t, 
                int (*)(const void *, const void *));
#endif

int complong(const void *arg1, const void *arg2)
{
    long l1 = *(long *)arg1;
    long l2 = *(long *)arg2;

    if(l1 == l2)
        return 0;
    else if(l1 < l2)
        return -1;
    else
        return 1;
}

void *thr_fn(void *arg)
{
    long idx = (long)arg;

    heapsort(&nums[idx], TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);
    /* ... */
    return ((void *)0);
}

void merge()
{
    long idx[NTHR];
    long i, minidx, sidx, num;

    for(i = 0; i<NTHR; i++)
        idx[i] = i * TNUM;
    for(sidx = 0; sidx < NUMNUM; sidx++)
    {
        num = LONG_MAX;
        for(i = 0; i < NTHR; i++)
        {
            if((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num))
            {
                num = nums[idx[i]];
                minidx = i;
            }
        }
        snums[sidx] = nums[idx[minidx]];
        idx[minidx]++;
    }
}

int main()
{
    unsigned long       i;
    struct timeval      start, end;
    long long           startusec, endusec;
    double              elapsed;
    int                 err;
    pthread_t           tid;

    srandom(1);
    for(i = 0; i < NUMNUM; i++)
        nums[i] = random();

    gettimeeeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR+1);
    for(i = 0; i < NTHR; i++)
    {
        err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
        if(err != 0)
            err_exit(err, "can't create thread");
    }
    pthread_barrier_wait(&b);
    merge();
    gettimeofday(&end, NULL);

    startusec = start.tv_sec * 1000000 + start.tv_usec;
    endusec = end.tv_sec * 1000000 + end.tv_usec;
    elapsed = (double)(endusec - startusec) / 1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    for(i = 0; i < NUMNUM; i++)
        printf("%ld\n", snums[i]);
    exit(0);
}

线程控制

线程属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <pthread.h>

int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t   *attr);

int pthread_attr_getdetachstate(const pthread_attr_t *restrict attr,
            int *detachstate);
//PTHREAD_CREATE_DETACHED, PTHREAD_CREATE_JOINABLE
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);

int pthread_attr_getstack(const pthread_attr_t *restrict attr,
            void **restrict stackaddr, size_t *restrict stacksize);
int pthread_attr_setstack(const pthread_attr_t *attr,
            void *stackaddr, size_t* stacksize);

int pthread_attr_getstacksize(const pthread_attr_t *restrict attr,
            size_t *restrict stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);

int pthread_attr_getguardsize(const pthread_attr_t *restrict attr,
            size_t *restrict guardsize);
int pthread_attr_setguardsize(pthread_attr_t *attr, size_t guardsize);
名称 描述
detachstate 线程分离状态属性
guardsize 线程栈末尾的警戒缓冲区大小(字节数)
stackaddr 线程栈的最低地址
stacksize 线程栈的最小长度(字节数)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#include "apue.h"
#include <pthread.h>

int makethread(void *(*fn)(void *), void *arg)
{
    int             err;
    pthread_t       tid;
    pthread_attr_t  attr;

    err = pthread_attr_init(&attr);
    if (err != 0)
        return(err);
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err == 0)
        err = pthread_create(&tid, &attr, fn, arg);
    pthread_attr_destroy(&attr);
    return(err);
}

同步属性

互斥量属性

在进程中,多个线程可以访问同一个同步对象,这是默认行为,这种情况下,进程共享互斥量属性需设置为PTHREAD_PROCESS_PRIVATE

如果进行共享互斥量属性设置为PTHREAD_PROCESS_SHARED,从多个进程彼此之间共享的内存数据块中分配的互斥量就可以用于这些进程的同步。

互斥量健壮属性与在多个进程间共享的互斥量有关。当持有互斥量的进程终止时,互斥量处于锁定状态,恢复起来很困难,其他阻塞这个锁的进程将会一直阻塞下去。

健壮属性取值:

  • PTHREAD_MUTEX_STALLED默认值,持有互斥量的进程终止时不需要采取特别的动作。
  • PTHREAD_MUTEX_ROBUST,该属性时调用线程获取pthread_mutex_lock锁时,该锁被另一个进程持有,但该进程终止并没有对锁进行解释时,此线程阻塞,从pthread_mutex_lock返回值为EOWNERDEAD而不是0。

如果应用状态无法恢复,在线程对互斥量解锁以后,该互斥量将处于永久不可用状态。pthread_mutex_consistent指明与该互斥量相关的状态在互斥量解锁之前是一致的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
#include <pthread.h>

int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
//进程共享属性
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *restrict 
                attr,int *restrict pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr,
                int pshared);
//健壮属性
int pthread_mutexattr_getrobust(const pthread_mutexattr_t *restrict 
                attr,int *restrict robust);
int pthread_mutexattr_getrobust(pthread_mutexattr_t *restrict attr,
                int robust);

int pthread_mutex_consistent(pthread_mutex_t *mutex);
//类型属性
int pthread_mutexattr_gettype(const pthread_mutexattr_t *restrict 
                attr, int *restrict type);
int pthread_mutexattr_settype(pthread_mutexattr_ *attr, int type);
互斥量类型属性 没有解锁时重新加锁? 不占用锁时解锁? 在已解锁时解锁?
PTHREAD_MUTEX_NORMAL 死锁 未定义 未定义
PTHREAD_MUTEX_ERRORCHECK 返回错误 返回错误 返回错误
PTHREAD_MUTEX_RECURSIVE 允许 返回错误 返回错误
PTHREAD_MUTEX_DDDEFAULT 未定义 未定义 未定义

读写锁属性

读写锁支持的唯一属性是进程共享属性。与互斥量进程共享属性一样。

1
2
3
4
5
6
7
8
9
#include <pthread.h>

int pthread_rwlockattr_init(pthread_rwlockattr_t *attr);
int pthread_rwlockattr_destroy(pthread_rwlockattr_t *attr);

int pthread_rwlockattr_getpshared(const pthread_rwlockattr_t *
                    restrict attr,int *restrict pshared);
int pthread_rwlockattr_setpshared(pthread_rwlockattr_t *attr,
                    int pshared);

条件变量属性

条件变量支持两个属性:进程共享属性和时钟属性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
#include <pthread.h>

int pthread_condattr_init(pthread_condattr_t *attr);
int pthread_condattr_destroy(pthread_condattr_t *attr);

int pthread_condattr_getpshared(const pthread_condattr_t *
                    restrict attr,int *restrict pshared);
int pthread_condattr_setpshared(pthread_condattr_t *attr,
                    int pshared);

int pthread_condattr_getclock(const pthread_condattr_t *
            restrict attr,clockid_t *restrict clock_id);
int pthread_condattr_setclock(pthread_condattr_t *attr,
            clockid_t clock_id);

屏障属性

目前屏障属性只有进程共享属性。

1
2
3
4
5
6
7
8
#include<pthread.h>
int pthread_barrierattr_init(pthread_barrierattr_t *attr);
int pthread_barrierattr_destroy(pthread_barrierattr_t *attr);
int pthread_barrierattr_getpshared(const 
            pthread_barrierattr_t *restrict attr, 
            int *restrict pshared);
int pthread_barrierattr_setpshared(pthread_barrierattr_t 
            *attr,int pshared);

重入

一个函数在相同的时间点可以被多个线程安全地调用,就称该函数是线程安全的。

线程特定数据

线程特定数据(thread-specific data)也称为线程私有数据(thread-private data),是存储和查询某个特定线程相关数据的一种机制。

线程私有数据是关于每个线程可以访问它自己单独的数据副本,而不需要担心与其它线程的同步访问问题。

一个进程中的所有线程都可以访问这个进程的整个地址空间。除了使用寄存器以外,一个线程没有办法阻止另一个线程访问它的数据。线程特定数据也不例外。但管理线程特定数据的函数可以提高线程间的数据独立性,使线程不容易访问到其它线程的特定数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#include <pthread.h>

int pthread_key_create(pthread_key_t *keyp, void (*destructor)(void *));
int pthread_key_delete(pthread_key_t *key);

pthread_once_t initflag = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *initflag, void (*initfn)(void));

void *pthread_getspecific(pthread_key_t key);
int pthread_setspecific(pthread_key_t key, const void *value);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <limits.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>

static pthread_key_t key;
static pthread_once_t init_done = PTHREAD_ONCE_INIT;
pthread_mutex_t env_mutex = PTHREAD_MUTEX_INITIALIZER;

extern char **environ;

static void thread_init(void)
{
    pthread_key_create(&key, free);
}

char *getenv(const char *name)
{
    int     i, len;
    char    *envbuf;

    pthread_once(&init_done, thread_init);
    pthread_mutex_lock(&env_mutex);
    envbuf = (char *)pthread_getspecific(key);
    if (envbuf == NULL) {
        envbuf = malloc(ARG_MAX);
        if (envbuf == NULL) {
            pthread_mutex_unlock(&env_mutex);
            return(NULL);
        }
        pthread_setspecific(key, envbuf);
    }
    len = strlen(name);
    for (i = 0; environ[i] != NULL; i++) {
        if ((strncmp(name, environ[i], len) == 0) &&
          (environ[i][len] == '=')) {
            strcpy(envbuf, &environ[i][len+1]);
            pthread_mutex_unlock(&env_mutex);
            return(envbuf);
        }
    }
    pthread_mutex_unlock(&env_mutex);
    return(NULL);
}

取消选项

1
2
3
4
5
#include <pthread.h>

int pthread_setcancelstate(int state, int *oldstate);
void pthread_testcancel(void);
int pthread_setcanceltype(int type, int *oldtype);

线程和信号

1
2
3
4
5
6
7
#include <signal.h>

int pthread_sigmask(int how, const sigset_t *restrict set,
                 sigset_t *restrict oset);
int sigwait(const sigset_t *restrict set, 
                int *restrict signop);
int pthread_kill(pthread_t thread, int signo);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "apue.h"
#include <pthread.h>

int         quitflag;   /* set nonzero by thread */
sigset_t    mask;

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wait = PTHREAD_COND_INITIALIZER;

void *thr_fn(void *arg)
{
    int err, signo;

    for (;;) {
        err = sigwait(&mask, &signo);
        if (err != 0)
            err_exit(err, "sigwait failed");
        switch (signo) {
        case SIGINT:
            printf("\ninterrupt\n");
            break;

        case SIGQUIT:
            pthread_mutex_lock(&lock);
            quitflag = 1;
            pthread_mutex_unlock(&lock);
            pthread_cond_signal(&wait);
            return(0);

        default:
            printf("unexpected signal %d\n", signo);
            exit(1);
        }
    }
}
int main(void)
{
    int         err;
    sigset_t    oldmask;
    pthread_t   tid;

    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGQUIT);
    if ((err = pthread_sigmask(SIG_BLOCK, &mask, &oldmask)) != 0)
        err_exit(err, "SIG_BLOCK error");

    err = pthread_create(&tid, NULL, thr_fn, 0);
    if (err != 0)
        err_exit(err, "can't create thread");

    pthread_mutex_lock(&lock);
    while (quitflag == 0)
        pthread_cond_wait(&wait, &lock);
    pthread_mutex_unlock(&lock);

    /* SIGQUIT has been caught and is now blocked; do whatever */
    quitflag = 0;

    /* reset signal mask which unblocks SIGQUIT */
    if (sigprocmask(SIG_SETMASK, &oldmask, NULL) < 0)
        err_sys("SIG_SETMASK error");
    exit(0);
}

线程和fork

1
2
3
#include <pthread.h>
int pthread_atfork(void (*prepare)(void), 
        void (*parent)(void), void (*child)(void));
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include "apue.h"
#include <pthread.h>

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;

void prepare(void)
{
    printf("preparing locks...\n");
    pthread_mutex_lock(&lock1);
    pthread_mutex_lock(&lock2);
}
void parent(void)
{
    printf("parent unlocking locks...\n");
    pthread_mutex_unlock(&lock1);
    pthread_mutex_unlock(&lock2);
}

void child(void)
{
    printf("child unlocking locks...\n");
    pthread_mutex_unlock(&lock1);
    pthread_mutex_unlock(&lock2);
}

void *thr_fn(void *arg)
{
    printf("thread started...\n");
    pause();
    return(0);
}

int main(void)
{
    int         err;
    pid_t       pid;
    pthread_t   tid;

#if defined(BSD) || defined(MACOS)
    printf("pthread_atfork is unsupported\n");
#else
    if ((err = pthread_atfork(prepare, parent, child)) != 0)
        err_exit(err, "can't install fork handlers");
    err = pthread_create(&tid, NULL, thr_fn, 0);
    if (err != 0)
        err_exit(err, "can't create thread");
    sleep(2);
    printf("parent about to fork...\n");
    if ((pid = fork()) < 0)
        err_quit("fork failed");
    else if (pid == 0) /* child */
        printf("child returned from fork\n");
    else        /* parent */
        printf("parent returned from fork\n");
#endif
    exit(0);
}

线程和I/O