管道
管道是一种特殊的进程间通信机制,它可以用于将一个进程的输出直接传递给另一个进程的输入,从而实现它们之间的数据传输。
匿名管道
pipe 系统调用
pipe是一个系统调用,用于创建一个管道(pipe)。它的原型如下:
int pipe(int pipefd[2]);
pipe系统调用用于创建一个匿名管道(pipe),它是一种特殊的通信机制,用于在两个相关的进程之间进行单向的数据传输。管道具有固定的读端和写端,数据从写端流入管道,从读端流出。
以下是一些关于pipe系统调用的要点:
-
pipefd是一个长度为2的整型数组,用于存储管道的读端和写端的文件描述符。pipefd[0]表示管道的读端,pipefd[1]表示管道的写端。 -
调用
pipe系统调用将创建一个新的管道,并将其读端和写端的文件描述符分别存储在pipefd[0]和pipefd[1]中。 -
管道是一种 半双工 的通信机制,数据只能在一个方向上流动。一端用于写入数据,另一端用于读取数据。
-
管道中的数据是以字节流的形式传输的,没有固定的消息边界。数据写入管道后,可以从另一端读取出来,按照写入的顺序进行读取。
-
管道的大小是有限的,通常是几千字节。当管道已满时,继续写入数据会导致写操作阻塞,直到有足够的空间可用为止。类似地,如果 管道为空时进行读取操作,读操作也会阻塞。
-
管道是由内核维护的,可以在父子进程之间传递数据。父进程创建管道后,可以通过
fork系统调用创建子进程,子进程继承了父进程的管道文件描述符,从而实现进程间的通信。
下面是使用 pipe 创建一个匿名管道的例子:
int main(void)
{
int fds[2];
if(pipe(fds) != 0)
{
perror("pipe error!\n");
return 1;
}
char buff[1024];
while (fgets(buff, sizeof(buff), stdin) != NULL) // 从标准输入读数据
{
int len = strlen(buff);
// 写数据到管道
if(write(fds[1], buff, len) != len)
{
perror("Buffer overflow\n");
return 1;
}
memset(buff, 0x00, sizeof(buff));
// 从管道读数据
if(read(fds[0], buff, len) != len)
{
perror("read error\n");
return 1;
}
// 打印到标准输出
if(fputs(buff, stdout) != len)
{
perror("fputs error\n");
return 1;
}
}
}
以文件描述符(fd)的角度理解管道
使用匿名管道进行进程间通信
匿名管道通常用于父子进程间的通信
int main(void)
{
char buffer[1024];
int fds[2];
if (pipe(fds) != 0)
{
perror("pipe error!\n");
return -1;
}
pid_t pid = fork();
if (pid < 0)
{
perror("fork error!\n");
return -1;
}
else if (pid == 0) // 子进程写数据
{
for(int i = 0; i < 3; ++i) // 从标准输入读三个数据,并发送到父进程
{
printf("Now child process[%d] is running...\n", getpid());
printf("child[%d]: enter the data you want to send to dad process\n", getpid());
fgets(buffer, sizeof(buffer), stdin);
close(fds[0]); // 关闭读端
write(fds[1], buffer, strlen(buffer)); // 向管道写数据
sleep(3);
}
printf("Now child process[%d] is quitting...\n", getpid());
exit(0);
}
else
{
printf("Now dad process[%d] is running...\n", getpid());
while (1)
{
printf("Waitting child[%d] process...\n", pid);
memset(buffer, 0x00, sizeof(buffer));
close(fds[1]); // 关闭写端
if(read(fds[0], buffer, sizeof(buffer)) == 0)
break;
printf("Dad process[%d] recived message from child process: ", getpid());
printf("%s", buffer);
}
wait(NULL);
printf("Now dad process[%d] is quitting...\n", getpid());
}
return 0;
}
如果没有 close(fds[1]); 这句话,会发生什么?
在我们的代码中,是子进程先退出,父进程后退出
当子进程写入完数据后,父进程在读取数据时可能会一直阻塞。因为管道的写端仍然保持打开,父进程会一直等待子进程写入更多数据,但实际上子进程已经退出并关闭了写端。所以,父进程无法检测到管道的结束,会一直阻塞在读取操作上。
这体现在子进程退出后,父进程不会退出,而是阻塞等待子进程继续写入数据
同样的,如果修改代码,父进程先退出,子进程后退出,并且没有 close(fds[0]); 这句话,子进程也会阻塞等待父进程读取数据
管道是一种临界资源
当子进程在写数据时,如果有进程也写数据的话,就有可能造成数据不一致,管道通过 互斥 机制保证了资源的互斥访问
当子进程还没有写数据,或者 sleep 时,父进程就像 sleep 了一样,因为管道内部实现了 同步 机制,保证只有管道有数据时,才能读取数据,否则必须阻塞等待
操作系统通过互斥锁保护管道的读写操作,以及使用内核缓冲区来隔离读写操作,从而实现了在管道读取或写入时不被其他进程干扰的机制
SIGPIPE 信号
当一个进程关闭了管道的读端之后,如果还有进程尝试向管道写入数据,就会产生 SIGPIPE 信号
SIGPIPE 信号的 默认行为是终止进程
如果进程希望忽略 SIGPIPE 信号,可以使用 signal 函数(下面有例子)
void sigHandler(int signum) // 信号处理函数
{
if (signum == SIGPIPE) {
printf("Received SIGPIPE signal\n");
}
}
int main(void)
{
signal(SIGPIPE, sigHandler);
int pip[2];
pipe(pip);
close(pip[0]); // 关闭读端
write(pip[1], "111", 3); // 尝试写数据
}
此外,虽然向一个关闭写端的管道写入数据,会发生写入失败,按理来说会产生 SIGPIPE 信号,但实际上并不会
命名管道
匿名管道最大的限制就是:只能用于具有亲缘关系的进程通信
如果要实现两个完全不相关的进程之间的通信,就必须用到命名管道了
命名管道实际上是一个 FIFO 类型的文件
创建命名管道
可以在终端中使用 mkfifo 指令来创建一个命名管道:
Sky_Lee@SkyLeeMBP test % mkfifo fifo
Sky_Lee@SkyLeeMBP test % ls
cfile client fifo server test test.txt
cfile.c client.c makefile server.c test.cpp
查看 fifo 的详细信息:
也可以在程序中创建一个命名管道:
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
其中:
pathname是创建的命名管道的路径和名称。mode是创建的命名管道的权限。
使用命名管道进行进程间通信
父子进程
// 将 temp.txt 的数据写入管道
void writeFifo(void)
{
int pipeFd = open("fifo", O_WRONLY);
int fileFd = open("temp.txt", O_RDONLY);
if (pipeFd == -1 || fileFd == -1)
{
perror("open error\n");
exit(-1);
}
char buff[1024];
int bytes;
while ((bytes = read(fileFd, buff, sizeof(buff))) > 0)
{
write(pipeFd, buff, bytes);
}
close(fileFd);
close(pipeFd);
printf("[%d]Write fifo complete\n", getpid());
}
// 读取管道数据,并打印到标准输出
void readFifo(void)
{
int fifoFd = open("fifo", O_RDONLY);
if(fifoFd == -1)
{
perror("open fifo error!\n");
exit(-1);
}
char buff[1024];
int bytes;
while ((bytes = read(fifoFd, buff, sizeof(buff))) != 0)
{
printf("%s\n", buff);
}
close(fifoFd);
printf("[%d]Read fifo complete\n", getpid());
}
int main(void)
{
pid_t pid = fork();
if(pid < 0)
{
perror("fork error\n");
return -1;
}
else if(pid == 0)
{
printf("[%d]About write to fifo...\n", getpid());
writeFifo();
exit(0);
}
else
{
printf("[%d]About read from fifo...\n", getpid());
readFifo();
wait(NULL); // 避免僵尸进程
}
}
server And client(不相关进程)
server.c:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define O_FIFO_ERROR 114
#define RD_FIFO_SUCCESS 115
#define RD_FIFO_ERROR 116
int readFromFifo(void);
int main(void)
{
int status = readFromFifo();
exit(status);
}
int readFromFifo(void)
{
int fifoFd = open("fifo", O_RDONLY);
if(fifoFd == -1)
{
perror("open fifoFd error!\n");
return O_FIFO_ERROR;
}
char buff[1024];
int bytes;
while (1)
{
printf("[%d]Waiting for the client to make a request...\n", getpid());
bytes = read(fifoFd, buff, sizeof(buff));
if(bytes > 0)
printf("[%d]Server recived data from client: %s", getpid(), buff);
else if(bytes == 0)
{
printf("[%d]The client has exited and the server is about shutting down...\n", getpid());
sleep(1);
break;
}
else
{
perror("Read fifo error!\n");
return RD_FIFO_ERROR;
}
}
close(fifoFd);
return RD_FIFO_SUCCESS;
}
client.c:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define O_FIFO_ERROR 114
#define WR_FIFO_SUCCESS 115
#define WR_FIFO_ERROR 116
int writeToFifo(void);
int main(void)
{
printf("[%d]Waiting for the server...\n", getpid());
int status = writeToFifo();
exit(status);
}
int writeToFifo(void)
{
int fifoFd = open("fifo", O_WRONLY);
if(fifoFd == -1)
{
perror("open fifoFd error!\n");
return O_FIFO_ERROR;
}
char buff[1024];
int bytes;
while (1)
{
printf("[%d]Please enter the data you want to send to server: ", getpid());
fflush(stdout);
memset(buff, 0x00, sizeof(buff));
int readBytes = read(1, buff, sizeof(buff));
printf("[%d]Sending...\n", getpid());
sleep(1);
bytes = write(fifoFd, buff, sizeof(buff));
if(bytes == -1)
{
perror("Write fifo error!\n");
return WR_FIFO_ERROR;
}
printf("[%d]Success!\n", getpid());
}
close(fifoFd);
return WR_FIFO_SUCCESS;
}
运行结果:
小细节:
在这段代码中,
fflush(stdout);语句的作用是刷新标准输出缓冲区,确保前面的输出立即显示在终端上。如果没有这句话,可能会导致输出被缓冲起来,直到遇到换行符或者缓冲区满才会显示出来。如果没有使用
fflush(stdout);,在执行到输入提示信息的时候,输出可能不会立即显示在终端上,而是留存在缓冲区中。这意味着用户可能无法立即看到输入提示信息,而需要等到缓冲区满或者遇到换行符时,才会将提示信息显示出来。这可能会导致用户的输入体验不太好,因为他们无法及时看到输入提示。而加上
fflush(stdout);这句话可以解决这个问题,确保输出能够及时显示在终端上。
命名管道的本质
命名管道文件(FIFO)在文件系统中以文件的形式存在,但它并不存储数据在磁盘中。相反,它用作进程间通信的通道,数据从一个进程通过管道写入,并从另一个进程通过管道读取。
当创建一个命名管道文件时,它会在文件系统中占据一定的存储空间,通常以零长度的文件形式存在。这个文件在磁盘上占据一些磁盘空间,但它的作用是提供进程间通信的机制,而不是存储数据本身。
当一个进程向管道写入数据时,数据会被暂存于内核内存中的管道缓冲区中。接收数据的进程从缓冲区中读取数据。
在这个过程中,管道缓冲区起到了临时存储数据的作用,但它不是普通的文件存储。它是一种在内存中维护的缓冲区,用于暂存数据以供进程间通信使用。
总结起来,命名管道文件在文件系统中以文件形式存在,但并不实际存储数据。数据是通过内核内存中的管道缓冲区进行传递。
System V 进程间通信
共享内存
相较于管道通信,共享内存是最快的进程间通信方式,一旦内存映射到共享它的进程的内存空间,这些进程间的数据传递就 不再涉及 OS 内核,也就不需要通过内核来交换数据,提升数据交换的效率
创建共享内存的步骤
-
创建共享内存对象:首先,需要调用
shmget()函数创建一个共享内存对象。这个函数会分配一块共享内存区域,并返回一个唯一的标识符(共享内存ID)用于后续的操作。 -
连接共享内存:接下来,使用
shmat()函数将进程与共享内存区域进行连接。这个函数将共享内存区域映射到进程的虚拟地址空间,使得进程可以直接访问该内存区域。 -
使用共享内存:一旦连接成功,进程就可以像访问普通内存一样使用共享内存区域。进程可以读取和写入共享内存区域中的数据,实现进程间的数据共享。
-
分离共享内存:当进程不再需要访问共享内存时,需要使用
shmdt()函数将共享内存从进程的虚拟地址空间中分离。这个操作不会删除共享内存区域,只是断开了进程与共享内存的连接。 -
删除共享内存对象:最后,当不再需要共享内存区域时,可以调用
shmctl()函数以及指定的命令来删除共享内存对象。这个操作会释放共享内存区域,并且其他进程将无法再连接到该共享内存。
注意:共享内存并不提供同步机制 ,因此在使用共享内存时需要额外考虑进程间的同步问题。
ftok 函数
ftok 函数用于生成一个唯一的键值(key)用于 System V IPC(进程间通信)机制中的消息队列、信号量集和共享内存的创建和访问。
函数原型如下:
key_t ftok(const char *pathname, int proj_id);
pathname:一个指向存在的文件的路径名的字符串。该文件的存在与内容无关,只用于生成唯一的键值。proj_id:一个用户指定的整数,通常为一个正整数。它用于 进一步确保生成的键值的唯一性 。在不同的项目或场景中,可以使用不同的proj_id。
ftok 函数根据给定的 pathname 和 proj_id 生成一个 32 位的键值。该键值是根据 pathname 文件的 st_dev(设备 ID)和 st_ino(inode 号)字段进行计算的。因此,只要 pathname 和 proj_id 保持不变,生成的键值将是唯一的。
shmget 函数
shmget 函数的主要功能是 创建一个新的共享内存段或获取一个已存在的共享内存段的标识符 。
函数原型如下:
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
-
key:使用ftok函数生成。不同的进程通过相同的key值来访问同一块共享内存区域。 -
size:指定需要创建的共享内存段的大小(字节数),由于申请时需要对齐,因此一般设置为内存分页大小的整数倍 -
shmflg:共享内存的权限标志,常见的有:IPC_CREAT:如果指定的共享内存段不存在,则创建一个新的共享内存段。如果共享内存段已存在,则忽略此标志。IPC_EXCL:与IPC_CREAT结合使用时,如果指定的共享内存段已存在,则返回错误 。如果共享内存段不存在,则创建一个新的共享内存段。SHM_RDONLY:以只读模式打开共享内存段。进程只能读取共享内存的内容,不能修改。SHM_RND:将共享内存大小舍入到系统页面大小的整数倍。
函数返回值:
- 成功:返回共享内存段的标识符(共享内存ID)。
- 失败:返回 -1,并设置
errno变量以指示错误类型。
前面讲 ftok 函数时,提到:只要 pathname 和 proj_id 保持不变,生成的键值将是唯一的。,因此,如果不同的进程使用相同的 pathname 和 proj_id,就能保证 key 标识的唯一性,进而保证通信的进程看到的是同一份资源
例如:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
void getmem(int flag)
{
// 生成一个唯一的键值(key),用于 System V IPC
key_t memKey = ftok("test.txt", 114514);
if(memKey < 0)
{
perror("ftok error!\n");
exit(-1);
}
// 创建一个新的独占的共享内存段,确保只有一个进程可以使用它
int memID = shmget(memKey, 4096, flag);
if(memID < 0)
{
perror("shmget error!\n");
exit(-1);
}
printf("[%d]: %d %d\n", getpid(), memKey, memID);
// sleep(10);
}
int main(void)
{
int pid = fork();
if(pid == 0)
{
getmem(IPC_CREAT);
}
else if(pid > 0)
{
getmem(IPC_CREAT | IPC_EXCL);
}
return 0;
}
运行结果:
可以看到,父子进程的共享内存的 ID 是一致的
还可以使用终端指令查看当前系统存在的共享内存:
可以看到,与之前的运行结果一致
shmat 函数
shmat 函数用于将共享内存区域 附加 到调用进程的地址空间,使得进程可以访问共享内存中的数据。它的函数原型如下:
void *shmat(int shmid, const void *shmaddr, int shmflg);
shmid:共享内存标识符,是由shmget函数返回的共享内存标识符。shmaddr:指定共享内存附加到进程地址空间的地址。通常将其设置为 NULL,由系统自动选择适当的地址。shmflg:附加标志,用于指定共享内存的附加方式。 通常设置为 0,采取默认的方式进行附加操作。 常用的标志有SHM_RDONLY(以只读方式附加共享内存)和SHM_RND(将shmaddr地址进行舍入)。
shmat 函数将共享内存区域映射到调用进程的地址空间,并 返回一个指向共享内存区域的指针,该指针可用于对共享内存进行读写操作。
shmdt 函数
shmdt 函数用于将共享内存从调用进程的地址空间中分离,使得进程无法再访问共享内存中的数据。它的函数原型如下:
int shmdt(const void *shmaddr);
shmaddr:指向共享内存区域的指针,该指针是由shmat函数返回的。
调用 shmdt 函数后,共享内存区域将从进程的地址空间中分离,但 不会删除共享内存区域本身 。其他仍然附加该共享内存的进程仍然可以访问共享内存中的数据。
注意: 即使所有进程都将共享内存分离,即没有进程再附加该共享内存,该共享内存仍会一直存在
shmctl 函数
shmctl 函数用于 控制 共享内存的操作,包括获取共享内存信息、修改共享内存的权限和删除共享内存等。它的函数原型如下:
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
shmid:共享内存标识符,由shmget函数返回的标识符。cmd:控制命令,用于指定要执行的操作。常用的命令有:IPC_STAT:获取共享内存的信息,并将结果保存在buf指向的结构体中。IPC_SET:设置共享内存的权限,使用buf指向的结构体中的信息。IPC_RMID:删除共享内存。
buf:指向shmid_ds结构体的指针,用于传递或接收共享内存的信息。
使用 shmctl 函数可以实现以下操作:
- 获取共享内存的信息:通过指定
cmd为IPC_STAT,可以获取共享内存的当前状态信息,如共享内存的大小、创建者的用户 ID 和组 ID、访问权限等。 - 修改共享内存的权限:通过指定
cmd为IPC_SET,可以修改共享内存的访问权限。需要提供一个shmid_ds结构体,其中包含要修改的权限信息。 - 删除共享内存:通过指定
cmd为IPC_RMID,可以删除指定的共享内存。删除后,所有附加该共享内存的进程都无法再访问共享内存中的数据。
shmctl 函数的返回值:
- 成功:返回 0。表示函数执行成功。
- 失败:返回 -1。表示函数执行失败。
注意: 删除共享内存并 不会立即释放 该共享内存的空间,而是 在所有附加该共享内存的进程都将其分离后才会释放。
实例
// header.h
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include "string.h"
#define FILE_PATH "test.txt"
#define PROJ_ID 114514
/**
* @brief 分配一块共享内存
*
* @param size 共享内存的字节数
* @param flag 共享内存的权限标志
* @return int 共享内存的 id(-1为分配错误)
*/
int allocMem(size_t size, int flag);
int destroyMem(int memID);
// function.c
#include "header.h"
int allocMem(size_t size, int flag)
{
int memkey = ftok(FILE_PATH, PROJ_ID);
if(memkey < 0)
{
perror("allocMem: ftok error!\n");
return -1;
}
int memID = shmget(memkey, size, flag);
if(memID < 0)
{
perror("allocMem: shmget error!\n");
return -1;
}
return memID;
}
int destroyMem(int memID)
{
if(shmctl(memID, IPC_RMID, NULL) < 0)
{
perror("destroyMem: shmctl error!\n");
return -1;
}
return 0;
}
// server.c
#include "header.h"
int main(void)
{
int memID = allocMem(4096, IPC_CREAT | IPC_EXCL | 0666);
if(memID == -1)
exit(-1);
char * memAddress = shmat(memID, NULL, 0); // 采用默认附加方式
printf("Starting server...\n");
sleep(3);
int refreshTime;
printf("Please enter refresh time(second): ");
fflush(stdout);
scanf("%d", &refreshTime);
// 等待 client 超时的次数,超过 10 次就退出服务器
int cnt = 0;
while (cnt < 10)
{
sleep(refreshTime);
fflush(stdout);
if(memAddress[0] == '\0')
{
++cnt;
printf("Waitting...\n");
}
else
{
printf("Recived data from client: %s", memAddress);
cnt = 0;
}
memAddress[0] = '\0';
}
printf("About quitting server...\n");
shmdt(memAddress);
sleep(2); // 等待 client 分离
destroyMem(memID);
return 0;
}
// client
#include "header.h"
int main(void)
{
int memID = allocMem(4096, IPC_CREAT);
if(memID == -1)
exit(-1);
char * memAddress = shmat(memID, NULL, 0); // 采用默认附加方式
printf("Starting client...\n");
sleep(3);
char buffer[4096];
int loops;
printf("Please enter the num of messages you want to send: ");
fflush(stdout);
scanf("%d", &loops);
while (loops--)
{
printf("Please enter the data you want to send to server: ");
fflush(stdout);
int bytes = read(0, buffer, sizeof(buffer));
buffer[bytes] = '\0';
printf("Sending...\n");
sleep(1);
strcpy(memAddress, buffer);
printf("Success!\n");
}
printf("About quitting client...\n");
shmdt(memAddress);
sleep(2);
return 0;
}
运行结果:
注意,在运行时,应该先打开服务器端,再打开用户端,如果先打开用户端,服务器端会发现已经有共享内存,进而报错(原因是 IPC_CREAT | IPC_EXCL | 0666)