进程间通信(InterProcess Communication, IPC)
- 匿名管道
- 有名管道
- 信号
- 消息队列
- 共享内存
- 信号量
- 套接字
管道
匿名管道
- 历史上,管道是半双工的(即数据只能在一个方向上流动)。现在某些系统提供全双工管道,但是为了最佳可移植性,不应预先假定系统支持全双工管道。
- 匿名管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。
pipe
函数创建管道,fd[0]
为读而打开,fd[1]
为写而打开。
1
2
|
#include<unsitd.h>
int pipe(int fd[2]);
|
通常,进程会先调用pipe,接着调用fork,从而创建从父进程到子进程的IPC通道。

fork之后做什么取决于我们想要的数据流的方向。对于从父进程到子进程的管道,父进程关闭管道的读端(fd[0]),子进程关闭写端(fd[1])。状态结构如下图所示。

- 当读(read)一个写端已被关闭的管道时,在所有数据都被读取后,read返回0,表示文件结束。
- 如果写(write)一个读端已被关闭的管道,则产生信号SIGPIPE。如果忽略该信号或者捕捉该信号并从其处理程序返回,则write返回-1,errno设置为EPIPE。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#include "apue.h"
int main(void)
{
int n;
int fd[2];
pid_t pid;
char line[MAXLINE];
if (pipe(fd) < 0)
err_sys("pipe error");
if ((pid = fork()) < 0) {
err_sys("fork error");
} else if (pid > 0) { /* parent */
close(fd[0]);
write(fd[1], "hello world\n", 12);
} else { /* child */
close(fd[1]);
n = read(fd[0], line, MAXLINE);
write(STDOUT_FILENO, line, n);
}
exit(0);
}
|
popen
函数:创建一个管道,fork一个子进程,关闭未使用的管道端,执行一个shell运行命令,然后等待命令终止。
1
2
3
|
#include<stdio.h>
FILE *popen(const char *cmdstring, const char *type);
int pclose(FILE *fp);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#include "apue.h"
#include <sys/wait.h>
#define PAGER "${PAGER:-more}" /* environment variable, or default */
int main(int argc, char *argv[])
{
char line[MAXLINE];
FILE *fpin, *fpout;
if (argc != 2)
err_quit("usage: a.out <pathname>");
if ((fpin = fopen(argv[1], "r")) == NULL)
err_sys("can't open %s", argv[1]);
if ((fpout = popen(PAGER, "w")) == NULL)
err_sys("popen error");
/* copy argv[1] to pager */
while (fgets(line, MAXLINE, fpin) != NULL) {
if (fputs(line, fpout) == EOF)
err_sys("fputs error to pipe");
}
if (ferror(fpin))
err_sys("fgets error");
if (pclose(fpout) == -1)
err_sys("pclose error");
exit(0);
}
|
FIFO(命名管道)
FIFO可以在不相关的进程间交换数据。
1
2
3
|
#include<sys/stat.h>
int mkfifo(const char *path, mode_t mode);
int mkfifoat(int fd, const char *path, mode_t mode);
|
当使用mkfifo
创建一个FIFO时,需要用open
来打开它。当open
一个FIFO时,非阻塞标志(O_NONBLOCK)会产生如下影响:
- 没指定O_NONBLOCK是,只读open要阻塞到某个其它进程为写而打开这个FIFO为止。
- 指定了O_NONBLOCK时,只读open立即返回。如果没有进程为读而打开一个FIFO,那么只写open将返回-1,并将errno设置成ENXIO。
若write一个尚无进程为读而打开的FIFO,则产生信号SIGPIPE。若某个FIFO的最后一个写进程关闭了该FIFO,则将为该FIFO的读进程产生一个文件结束标志。
XSI IPC
每个内核中的IPC结构(消息队列,信号量或共享内存)都用一个非负整数的标识符(identifier)加以引用。
ftok
函数通关一个路径名和项目id产生一个键。
1
2
|
#include<sys/ipc.h>
key_t ftok(const char *path, int id);
|
XSI IPC为每一个IPC结构关联了一个ipc_perm结构。该结构规定了权限和所有者,它至少包含下列成员:
1
2
3
4
5
6
7
8
|
struct ipc_perm {
uid_t uid; /* owner's effective user id */
gid_t gid; /* owner's effective group id */
uid_t cuid; /* creator's effective user id */
gid_t cgid; /* creator's effective group id */
mode_t mode; /* access modes */
//...
};
|
权限 |
位 |
用户读 |
0400 |
用户写(更改) |
0200 |
组读 |
0040 |
组写(更改) |
0020 |
其它读 |
0004 |
其它写(更改) |
0002 |
在linux中可以运行ipcs -l
来显示IPC相关的限制。
消息队列
消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。
smgget
用于创建一个新队列或打开一个现有队列。msgsnd
将新消息添加到队列尾端。每个消息包含一个正的长整型类型的字段,一个非负的长度以及实际数据字节数,所有这些都在将消息添加到队列时,传递给msgsnd
。msgrcv
用于从队列中读取消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#include<sys/msg.h>
struct msqid_ds {
struct ipc_perm msg_perm; /* see Section 15.6.2 */
msgqnum_t msg_qnum; /* # of messages on queue */
msglen_t msg_qbytes; /* max # of bytes on queue */
pid_t msg_lspid; /* pid of last msgsnd() */
pid_t msg_lrpid; /* pid of last msgrcv() */
time_t msg_stime; /* last-msgsnd() time */
time_t msg_rtime; /* last-msgrcv() time */
time_t msg_ctime; /* last-change time */
//...
};
int msgget(key_t key, int flag);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);
|
信号量
信号量是一个计数器,用于为多个进程提供对共享数据对象的访问。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
#include<sys/sem.h>
int semget(key_t key, int nsems, int flag);
int semctl(int semid, int semnum, int cmd, .../* union semun arg */);
union semun{
int val;
struct semid_ds *buf;
unsigned short *array;
};
int semop(int semid, struct sembuf semoparray[], size_t nops);
struct sembuf{
unsigned short sem_num;
short sem_op;
short sem_flg;
};
|
共享内存
1
2
3
4
5
|
#include<sys/shm.h>
int shmget(key_t key, size_t size, int flag);
int shmctl(int shmid, int cmd, struct shmid_ds* buf);
void *shmat(int shmid, const void *addr, int flag);
int shmdt(const void *addr);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#include "apue.h"
#include <sys/shm.h>
#define ARRAY_SIZE 40000
#define MALLOC_SIZE 100000
#define SHM_SIZE 100000
#define SHM_MODE 0600 /* user read/write */
char array[ARRAY_SIZE]; /* uninitialized data = bss */
int main(void)
{
int shmid;
char *ptr, *shmptr;
printf("array[] from %lx to %lx\n", (unsigned long)&array[0],
(unsigned long)&array[ARRAY_SIZE]);
printf("stack around %lx\n", (unsigned long)&shmid);
if ((ptr = malloc(MALLOC_SIZE)) == NULL)
err_sys("malloc error");
printf("malloced from %lx to %lx\n", (unsigned long)ptr,
(unsigned long)ptr+MALLOC_SIZE);
if ((shmid = shmget(IPC_PRIVATE, SHM_SIZE, SHM_MODE)) < 0)
err_sys("shmget error");
if ((shmptr = shmat(shmid, 0, 0)) == (void *)-1)
err_sys("shmat error");
printf("shared memory attached from %lx to %lx\n",
(unsigned long)shmptr, (unsigned long)shmptr+SHM_SIZE);
if (shmctl(shmid, IPC_RMID, 0) < 0)
err_sys("shmctl error");
exit(0);
}
|
POSIX信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#include<semaphore.h>
sem_t *sem_open(const char *name, int oflag, .../* mode_t mode,
unsigned int value */);
int sem_close(sem_t *sem);
int sem_unlink(const char *name);
int sem_trywait(sem_t *sem);
int sem_wait(sem_t *sem);
#include<time.h>
int sem_timedwait(sem_t *restrict sem,
const struct timespec *restrict tsptr);
int sem_post(sem_t *sem);
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_getvalue(sem_t *restrict sem, int *restrict valp);
|