进程间通信(InterProcess Communication, IPC)

  • 匿名管道
  • 有名管道
  • 信号
  • 消息队列
  • 共享内存
  • 信号量
  • 套接字

管道

匿名管道

  • 历史上,管道是半双工的(即数据只能在一个方向上流动)。现在某些系统提供全双工管道,但是为了最佳可移植性,不应预先假定系统支持全双工管道。
  • 匿名管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。

pipe函数创建管道,fd[0]为读而打开,fd[1]为写而打开。

1
2
#include<unsitd.h>
int pipe(int fd[2]);

通常,进程会先调用pipe,接着调用fork,从而创建从父进程到子进程的IPC通道。

fork之后做什么取决于我们想要的数据流的方向。对于从父进程到子进程的管道,父进程关闭管道的读端(fd[0]),子进程关闭写端(fd[1])。状态结构如下图所示。

  • 当读(read)一个写端已被关闭的管道时,在所有数据都被读取后,read返回0,表示文件结束。
  • 如果写(write)一个读端已被关闭的管道,则产生信号SIGPIPE。如果忽略该信号或者捕捉该信号并从其处理程序返回,则write返回-1,errno设置为EPIPE。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include "apue.h"
int main(void)
{
    int     n;
    int     fd[2];
    pid_t   pid;
    char    line[MAXLINE];

    if (pipe(fd) < 0)
        err_sys("pipe error");
    if ((pid = fork()) < 0) {
        err_sys("fork error");
    } else if (pid > 0) {       /* parent */
        close(fd[0]);
        write(fd[1], "hello world\n", 12);
    } else {                /* child */
        close(fd[1]);
        n = read(fd[0], line, MAXLINE);
        write(STDOUT_FILENO, line, n);
    }
    exit(0);
}

popen函数:创建一个管道,fork一个子进程,关闭未使用的管道端,执行一个shell运行命令,然后等待命令终止。

1
2
3
#include<stdio.h>
FILE *popen(const char *cmdstring, const char *type);
int pclose(FILE *fp);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "apue.h"
#include <sys/wait.h>

#define PAGER   "${PAGER:-more}" /* environment variable, or default */

int main(int argc, char *argv[])
{
    char    line[MAXLINE];
    FILE    *fpin, *fpout;

    if (argc != 2)
        err_quit("usage: a.out <pathname>");
    if ((fpin = fopen(argv[1], "r")) == NULL)
        err_sys("can't open %s", argv[1]);

    if ((fpout = popen(PAGER, "w")) == NULL)
        err_sys("popen error");

    /* copy argv[1] to pager */
    while (fgets(line, MAXLINE, fpin) != NULL) {
        if (fputs(line, fpout) == EOF)
            err_sys("fputs error to pipe");
    }
    if (ferror(fpin))
        err_sys("fgets error");
    if (pclose(fpout) == -1)
        err_sys("pclose error");
    exit(0);
}

FIFO(命名管道)

FIFO可以在不相关的进程间交换数据。

1
2
3
#include<sys/stat.h>
int mkfifo(const char *path, mode_t mode);
int mkfifoat(int fd, const char *path, mode_t mode);

当使用mkfifo创建一个FIFO时,需要用open来打开它。当open一个FIFO时,非阻塞标志(O_NONBLOCK)会产生如下影响:

  • 没指定O_NONBLOCK是,只读open要阻塞到某个其它进程为写而打开这个FIFO为止。
  • 指定了O_NONBLOCK时,只读open立即返回。如果没有进程为读而打开一个FIFO,那么只写open将返回-1,并将errno设置成ENXIO。

若write一个尚无进程为读而打开的FIFO,则产生信号SIGPIPE。若某个FIFO的最后一个写进程关闭了该FIFO,则将为该FIFO的读进程产生一个文件结束标志。

XSI IPC

每个内核中的IPC结构(消息队列,信号量或共享内存)都用一个非负整数的标识符(identifier)加以引用。

ftok函数通关一个路径名和项目id产生一个键。

1
2
#include<sys/ipc.h>
key_t ftok(const char *path, int id);

XSI IPC为每一个IPC结构关联了一个ipc_perm结构。该结构规定了权限和所有者,它至少包含下列成员:

1
2
3
4
5
6
7
8
struct ipc_perm {
    uid_t  uid;  /* owner's effective user id */
    gid_t  gid;  /* owner's effective group id */
    uid_t  cuid; /* creator's effective user id */
    gid_t  cgid; /* creator's effective group id */
    mode_t mode; /* access modes */
    //...
};
权限
用户读 0400
用户写(更改) 0200
组读 0040
组写(更改) 0020
其它读 0004
其它写(更改) 0002

在linux中可以运行ipcs -l来显示IPC相关的限制。

消息队列

消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。

smgget用于创建一个新队列或打开一个现有队列。msgsnd将新消息添加到队列尾端。每个消息包含一个正的长整型类型的字段,一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传递给msgsndmsgrcv用于从队列中读取消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#include<sys/msg.h>

struct msqid_ds {
    struct ipc_perm  msg_perm;     /* see Section 15.6.2 */
    msgqnum_t        msg_qnum;     /* # of messages on queue */
    msglen_t         msg_qbytes;   /* max # of bytes on queue */
    pid_t            msg_lspid;    /* pid of last msgsnd() */
    pid_t            msg_lrpid;    /* pid of last msgrcv() */
    time_t           msg_stime;    /* last-msgsnd() time */
    time_t           msg_rtime;    /* last-msgrcv() time */
    time_t           msg_ctime;    /* last-change time */
    //...
};

int msgget(key_t key, int flag);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);

信号量

信号量是一个计数器,用于为多个进程提供对共享数据对象的访问。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#include<sys/sem.h>
int semget(key_t key, int nsems, int flag);
int semctl(int semid, int semnum, int cmd, .../* union semun arg */);

union semun{
    int                 val;
    struct semid_ds     *buf;
    unsigned short      *array;
};

int semop(int semid, struct sembuf semoparray[], size_t nops);

struct sembuf{
    unsigned short  sem_num;
    short           sem_op;
    short           sem_flg;
};

共享内存

1
2
3
4
5
#include<sys/shm.h>
int shmget(key_t key, size_t size, int flag);
int shmctl(int shmid, int cmd, struct shmid_ds* buf);
void *shmat(int shmid, const void *addr, int flag);
int shmdt(const void *addr);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include "apue.h"
#include <sys/shm.h>

#define ARRAY_SIZE  40000
#define MALLOC_SIZE 100000
#define SHM_SIZE    100000
#define SHM_MODE    0600    /* user read/write */

char    array[ARRAY_SIZE];  /* uninitialized data = bss */

int main(void)
{
    int     shmid;
    char    *ptr, *shmptr;

    printf("array[] from %lx to %lx\n", (unsigned long)&array[0],
                    (unsigned long)&array[ARRAY_SIZE]);
    printf("stack around %lx\n", (unsigned long)&shmid);

    if ((ptr = malloc(MALLOC_SIZE)) == NULL)
        err_sys("malloc error");
    printf("malloced from %lx to %lx\n", (unsigned long)ptr,
                    (unsigned long)ptr+MALLOC_SIZE);

    if ((shmid = shmget(IPC_PRIVATE, SHM_SIZE, SHM_MODE)) < 0)
        err_sys("shmget error");
    if ((shmptr = shmat(shmid, 0, 0)) == (void *)-1)
        err_sys("shmat error");
    printf("shared memory attached from %lx to %lx\n",
        (unsigned long)shmptr, (unsigned long)shmptr+SHM_SIZE);

    if (shmctl(shmid, IPC_RMID, 0) < 0)
        err_sys("shmctl error");

    exit(0);
}

POSIX信号量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
#include<semaphore.h>
sem_t *sem_open(const char *name, int oflag, .../* mode_t mode,
        unsigned int value */);
int sem_close(sem_t *sem);
int sem_unlink(const char *name);
int sem_trywait(sem_t *sem);
int sem_wait(sem_t *sem);
#include<time.h>
int sem_timedwait(sem_t *restrict sem,
        const struct timespec *restrict tsptr);
int sem_post(sem_t *sem);
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_getvalue(sem_t *restrict sem, int *restrict valp);