管道

管道是一种特殊的进程间通信机制,它可以用于将一个进程的输出直接传递给另一个进程的输入,从而实现它们之间的数据传输。

匿名管道

pipe 系统调用

pipe是一个系统调用,用于创建一个管道(pipe)。它的原型如下:

int pipe(int pipefd[2]);

pipe系统调用用于创建一个匿名管道(pipe),它是一种特殊的通信机制,用于在两个相关的进程之间进行单向的数据传输。管道具有固定的读端和写端,数据从写端流入管道,从读端流出。

以下是一些关于pipe系统调用的要点:

  1. pipefd 是一个长度为2的整型数组,用于存储管道的读端和写端的文件描述符。pipefd[0]表示管道的读端,pipefd[1]表示管道的写端。

  2. 调用 pipe 系统调用将创建一个新的管道,并将其读端和写端的文件描述符分别存储在 pipefd[0]pipefd[1]中。

  3. 管道是一种 半双工 的通信机制,数据只能在一个方向上流动。一端用于写入数据,另一端用于读取数据。

  4. 管道中的数据是以字节流的形式传输的,没有固定的消息边界。数据写入管道后,可以从另一端读取出来,按照写入的顺序进行读取。

  5. 管道的大小是有限的,通常是几千字节。当管道已满时,继续写入数据会导致写操作阻塞,直到有足够的空间可用为止。类似地,如果 管道为空时进行读取操作,读操作也会阻塞

  6. 管道是由内核维护的,可以在父子进程之间传递数据。父进程创建管道后,可以通过fork系统调用创建子进程,子进程继承了父进程的管道文件描述符,从而实现进程间的通信。

下面是使用 pipe 创建一个匿名管道的例子:

int main(void)
{
    int fds[2];
    if(pipe(fds) != 0)
    {
        perror("pipe error!\n");
        return 1;
    }
    
    char buff[1024];
    while (fgets(buff, sizeof(buff), stdin) != NULL) // 从标准输入读数据
    {
        int len = strlen(buff);

        // 写数据到管道
        if(write(fds[1], buff, len) != len)
        {
            perror("Buffer overflow\n");
            return 1;
        }

        memset(buff, 0x00, sizeof(buff));
        // 从管道读数据
        if(read(fds[0], buff, len) != len)
        {
            perror("read error\n");
            return 1;
        }
    
        // 打印到标准输出
        if(fputs(buff, stdout) != len)
        {
            perror("fputs error\n");
            return 1;
        }
    }
}

以文件描述符(fd)的角度理解管道

使用匿名管道进行进程间通信

匿名管道通常用于父子进程间的通信

int main(void)
{
    char buffer[1024];
    int fds[2];
    if (pipe(fds) != 0)
    {
        perror("pipe error!\n");
        return -1;
    }

    pid_t pid = fork();
    if (pid < 0)
    {
        perror("fork error!\n");
        return -1;
    }
    else if (pid == 0) // 子进程写数据
    {
        for(int i = 0; i < 3; ++i) // 从标准输入读三个数据,并发送到父进程
        {
            printf("Now child process[%d] is running...\n", getpid());
            printf("child[%d]: enter the data you want to send to dad process\n", getpid());
            fgets(buffer, sizeof(buffer), stdin);

            close(fds[0]);                         // 关闭读端
            write(fds[1], buffer, strlen(buffer)); // 向管道写数据
            sleep(3);
        }

        printf("Now child process[%d] is quitting...\n", getpid());
        exit(0);
    }
    else
    {
        printf("Now dad process[%d] is running...\n", getpid());
        while (1)
        {
            printf("Waitting child[%d] process...\n", pid);
            memset(buffer, 0x00, sizeof(buffer));
            close(fds[1]); // 关闭写端
            if(read(fds[0], buffer, sizeof(buffer)) == 0)
                break;

            printf("Dad process[%d] recived message from child process: ", getpid());
            printf("%s", buffer);
        }
        wait(NULL);     
        printf("Now dad process[%d] is quitting...\n", getpid());
    }
    return 0;
}

如果没有 close(fds[1]); 这句话,会发生什么?

在我们的代码中,是子进程先退出,父进程后退出

当子进程写入完数据后,父进程在读取数据时可能会一直阻塞。因为管道的写端仍然保持打开,父进程会一直等待子进程写入更多数据,但实际上子进程已经退出并关闭了写端。所以,父进程无法检测到管道的结束,会一直阻塞在读取操作上。

这体现在子进程退出后,父进程不会退出,而是阻塞等待子进程继续写入数据

同样的,如果修改代码,父进程先退出,子进程后退出,并且没有 close(fds[0]); 这句话,子进程也会阻塞等待父进程读取数据

管道是一种临界资源

当子进程在写数据时,如果有进程也写数据的话,就有可能造成数据不一致,管道通过 互斥 机制保证了资源的互斥访问

当子进程还没有写数据,或者 sleep 时,父进程就像 sleep 了一样,因为管道内部实现了 同步 机制,保证只有管道有数据时,才能读取数据,否则必须阻塞等待

操作系统通过互斥锁保护管道的读写操作,以及使用内核缓冲区来隔离读写操作,从而实现了在管道读取或写入时不被其他进程干扰的机制

SIGPIPE 信号

当一个进程关闭了管道的读端之后,如果还有进程尝试向管道写入数据,就会产生 SIGPIPE 信号

SIGPIPE 信号的 默认行为是终止进程

如果进程希望忽略 SIGPIPE 信号,可以使用 signal 函数(下面有例子)

void sigHandler(int signum)  // 信号处理函数
{
    if (signum == SIGPIPE) {
        printf("Received SIGPIPE signal\n");
    }
}

int main(void)
{
    signal(SIGPIPE, sigHandler);
    
    int pip[2];
    pipe(pip);
    close(pip[0]); // 关闭读端
    write(pip[1], "111", 3); // 尝试写数据
}

此外,虽然向一个关闭写端的管道写入数据,会发生写入失败,按理来说会产生 SIGPIPE 信号,但实际上并不会

命名管道

匿名管道最大的限制就是:只能用于具有亲缘关系的进程通信

如果要实现两个完全不相关的进程之间的通信,就必须用到命名管道了

命名管道实际上是一个 FIFO 类型的文件

创建命名管道

可以在终端中使用 mkfifo 指令来创建一个命名管道:

Sky_Lee@SkyLeeMBP test % mkfifo fifo
Sky_Lee@SkyLeeMBP test % ls  
cfile           client          fifo            server          test            test.txt
cfile.c         client.c        makefile        server.c        test.cpp

查看 fifo 的详细信息:

也可以在程序中创建一个命名管道:

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode);

其中:

  • pathname 是创建的命名管道的路径和名称。
  • mode 是创建的命名管道的权限。

使用命名管道进行进程间通信

父子进程
// 将 temp.txt 的数据写入管道
void writeFifo(void)
{
    int pipeFd = open("fifo", O_WRONLY);
    int fileFd = open("temp.txt", O_RDONLY);
    if (pipeFd == -1 || fileFd == -1)
    {
        perror("open error\n");
        exit(-1);
    }

    char buff[1024];
    int bytes;
    while ((bytes = read(fileFd, buff, sizeof(buff))) > 0)
    {
        write(pipeFd, buff, bytes);
    }
    close(fileFd);
    close(pipeFd);
    printf("[%d]Write fifo complete\n", getpid());
}

// 读取管道数据,并打印到标准输出
void readFifo(void)
{
    int fifoFd = open("fifo", O_RDONLY);
    if(fifoFd == -1)
    {
        perror("open fifo error!\n");
        exit(-1);
    }
    char buff[1024];
    int bytes;
    while ((bytes = read(fifoFd, buff, sizeof(buff))) != 0)
    {
        printf("%s\n", buff);
    }
    close(fifoFd);
    printf("[%d]Read fifo complete\n", getpid());
}

int main(void)
{
    pid_t pid = fork();
    if(pid < 0)
    {
        perror("fork error\n");
        return -1;
    }
    else if(pid == 0)
    {
        printf("[%d]About write to fifo...\n", getpid());
        writeFifo();
        exit(0);
    }
    else
    {
        printf("[%d]About read from fifo...\n", getpid());
        readFifo();
        wait(NULL); // 避免僵尸进程
    }
}
server And client(不相关进程)

server.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#define O_FIFO_ERROR 114
#define RD_FIFO_SUCCESS 115
#define RD_FIFO_ERROR 116

int readFromFifo(void);

int main(void)
{
    int status = readFromFifo();
    exit(status);
}

int readFromFifo(void)
{
    int fifoFd = open("fifo", O_RDONLY);
    if(fifoFd == -1)
    {
        perror("open fifoFd error!\n");
        return O_FIFO_ERROR;
    }

    char buff[1024];
    int bytes;
    while (1)
    {
        printf("[%d]Waiting for the client to make a request...\n", getpid());
        bytes = read(fifoFd, buff, sizeof(buff));
        if(bytes > 0)
            printf("[%d]Server recived data from client: %s", getpid(), buff);
        else if(bytes == 0)
        {
            printf("[%d]The client has exited and the server is about shutting down...\n", getpid());
            sleep(1);
            break;
        }
        else
        {
            perror("Read fifo error!\n");
            return RD_FIFO_ERROR;
        }
    }
    close(fifoFd);
    return RD_FIFO_SUCCESS;
}

client.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#define O_FIFO_ERROR 114
#define WR_FIFO_SUCCESS 115
#define WR_FIFO_ERROR 116

int writeToFifo(void);

int main(void)
{
    printf("[%d]Waiting for the server...\n", getpid());   
    int status = writeToFifo();
    exit(status);
}

int writeToFifo(void)
{
    int fifoFd = open("fifo", O_WRONLY);
    if(fifoFd == -1)
    {
        perror("open fifoFd error!\n");
        return O_FIFO_ERROR;
    }

    char buff[1024];
    int bytes;
    while (1)
    {
        printf("[%d]Please enter the data you want to send to server: ", getpid());
        fflush(stdout);
        memset(buff, 0x00, sizeof(buff));
        int readBytes = read(1, buff, sizeof(buff));

        printf("[%d]Sending...\n", getpid());
        sleep(1);
        bytes = write(fifoFd, buff, sizeof(buff));
        if(bytes == -1)
        {
            perror("Write fifo error!\n");
            return WR_FIFO_ERROR;
        }

        printf("[%d]Success!\n", getpid());
    }
    close(fifoFd);
    return WR_FIFO_SUCCESS;
}

运行结果:

小细节:

在这段代码中,fflush(stdout); 语句的作用是刷新标准输出缓冲区,确保前面的输出立即显示在终端上。如果没有这句话,可能会导致输出被缓冲起来,直到遇到换行符或者缓冲区满才会显示出来。

如果没有使用 fflush(stdout);,在执行到输入提示信息的时候,输出可能不会立即显示在终端上,而是留存在缓冲区中。这意味着用户可能无法立即看到输入提示信息,而需要等到缓冲区满或者遇到换行符时,才会将提示信息显示出来。

这可能会导致用户的输入体验不太好,因为他们无法及时看到输入提示。而加上 fflush(stdout); 这句话可以解决这个问题,确保输出能够及时显示在终端上。

命名管道的本质

命名管道文件(FIFO)在文件系统中以文件的形式存在,但它并不存储数据在磁盘中。相反,它用作进程间通信的通道,数据从一个进程通过管道写入,并从另一个进程通过管道读取。

当创建一个命名管道文件时,它会在文件系统中占据一定的存储空间,通常以零长度的文件形式存在。这个文件在磁盘上占据一些磁盘空间,但它的作用是提供进程间通信的机制,而不是存储数据本身。

当一个进程向管道写入数据时,数据会被暂存于内核内存中的管道缓冲区中。接收数据的进程从缓冲区中读取数据。

在这个过程中,管道缓冲区起到了临时存储数据的作用,但它不是普通的文件存储。它是一种在内存中维护的缓冲区,用于暂存数据以供进程间通信使用。

总结起来,命名管道文件在文件系统中以文件形式存在,但并不实际存储数据。数据是通过内核内存中的管道缓冲区进行传递。

System V 进程间通信

共享内存

相较于管道通信,共享内存是最快的进程间通信方式,一旦内存映射到共享它的进程的内存空间,这些进程间的数据传递就 不再涉及 OS 内核,也就不需要通过内核来交换数据,提升数据交换的效率

创建共享内存的步骤

  1. 创建共享内存对象:首先,需要调用 shmget() 函数创建一个共享内存对象。这个函数会分配一块共享内存区域,并返回一个唯一的标识符(共享内存ID)用于后续的操作。

  2. 连接共享内存:接下来,使用 shmat() 函数将进程与共享内存区域进行连接。这个函数将共享内存区域映射到进程的虚拟地址空间,使得进程可以直接访问该内存区域。

  3. 使用共享内存:一旦连接成功,进程就可以像访问普通内存一样使用共享内存区域。进程可以读取和写入共享内存区域中的数据,实现进程间的数据共享。

  4. 分离共享内存:当进程不再需要访问共享内存时,需要使用 shmdt() 函数将共享内存从进程的虚拟地址空间中分离。这个操作不会删除共享内存区域,只是断开了进程与共享内存的连接。

  5. 删除共享内存对象:最后,当不再需要共享内存区域时,可以调用 shmctl() 函数以及指定的命令来删除共享内存对象。这个操作会释放共享内存区域,并且其他进程将无法再连接到该共享内存。

注意:共享内存并不提供同步机制 ,因此在使用共享内存时需要额外考虑进程间的同步问题。

ftok 函数

ftok 函数用于生成一个唯一的键值(key)用于 System V IPC(进程间通信)机制中的消息队列、信号量集和共享内存的创建和访问。

函数原型如下:

key_t ftok(const char *pathname, int proj_id);
  • pathname:一个指向存在的文件的路径名的字符串。该文件的存在与内容无关,只用于生成唯一的键值。
  • proj_id:一个用户指定的整数,通常为一个正整数。它用于 进一步确保生成的键值的唯一性 。在不同的项目或场景中,可以使用不同的 proj_id

ftok 函数根据给定的 pathnameproj_id 生成一个 32 位的键值。该键值是根据 pathname 文件的 st_dev(设备 ID)和 st_ino(inode 号)字段进行计算的。因此,只要 pathnameproj_id 保持不变,生成的键值将是唯一的。

shmget 函数

shmget 函数的主要功能是 创建一个新的共享内存段或获取一个已存在的共享内存段的标识符

函数原型如下:

#include <sys/ipc.h>
#include <sys/shm.h>

int shmget(key_t key, size_t size, int shmflg);
  • key:使用 ftok 函数生成。不同的进程通过相同的 key 值来访问同一块共享内存区域。

  • size:指定需要创建的共享内存段的大小(字节数),由于申请时需要对齐,因此一般设置为内存分页大小的整数倍

  • shmflg:共享内存的权限标志,常见的有:

    • IPC_CREAT:如果指定的共享内存段不存在,则创建一个新的共享内存段。如果共享内存段已存在,则忽略此标志。
    • IPC_EXCL:与 IPC_CREAT 结合使用时,如果指定的共享内存段已存在,则返回错误 。如果共享内存段不存在,则创建一个新的共享内存段。
    • SHM_RDONLY:以只读模式打开共享内存段。进程只能读取共享内存的内容,不能修改。
    • SHM_RND:将共享内存大小舍入到系统页面大小的整数倍。

函数返回值:

  • 成功:返回共享内存段的标识符(共享内存ID)。
  • 失败:返回 -1,并设置 errno 变量以指示错误类型。

前面讲 ftok 函数时,提到:只要 pathnameproj_id 保持不变,生成的键值将是唯一的。,因此,如果不同的进程使用相同的 pathnameproj_id,就能保证 key 标识的唯一性,进而保证通信的进程看到的是同一份资源

例如:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>

void getmem(int flag)
{
    // 生成一个唯一的键值(key),用于 System V IPC
    key_t memKey = ftok("test.txt", 114514);
    if(memKey < 0)
    {
        perror("ftok error!\n");
        exit(-1);
    }


    // 创建一个新的独占的共享内存段,确保只有一个进程可以使用它
    int memID = shmget(memKey, 4096, flag); 
    if(memID < 0)
    {
        perror("shmget error!\n");
        exit(-1);
    }

    printf("[%d]: %d %d\n", getpid(), memKey, memID);

    // sleep(10);
}

int main(void)
{
    int pid = fork();

    if(pid == 0)
    {
        getmem(IPC_CREAT);
    }
    else if(pid > 0)
    {
        getmem(IPC_CREAT | IPC_EXCL);
    }

    return 0;
}

运行结果:

可以看到,父子进程的共享内存的 ID 是一致的

还可以使用终端指令查看当前系统存在的共享内存:

可以看到,与之前的运行结果一致

shmat 函数

shmat 函数用于将共享内存区域 附加 到调用进程的地址空间,使得进程可以访问共享内存中的数据。它的函数原型如下:

void *shmat(int shmid, const void *shmaddr, int shmflg);
  • shmid:共享内存标识符,是由 shmget 函数返回的共享内存标识符。
  • shmaddr:指定共享内存附加到进程地址空间的地址。通常将其设置为 NULL,由系统自动选择适当的地址。
  • shmflg:附加标志,用于指定共享内存的附加方式。 通常设置为 0,采取默认的方式进行附加操作。 常用的标志有 SHM_RDONLY(以只读方式附加共享内存)和 SHM_RND(将 shmaddr 地址进行舍入)。

shmat 函数将共享内存区域映射到调用进程的地址空间,并 返回一个指向共享内存区域的指针,该指针可用于对共享内存进行读写操作。

shmdt 函数

shmdt 函数用于将共享内存从调用进程的地址空间中分离,使得进程无法再访问共享内存中的数据。它的函数原型如下:

int shmdt(const void *shmaddr);
  • shmaddr:指向共享内存区域的指针,该指针是由 shmat 函数返回的。

调用 shmdt 函数后,共享内存区域将从进程的地址空间中分离,但 不会删除共享内存区域本身 。其他仍然附加该共享内存的进程仍然可以访问共享内存中的数据。

注意: 即使所有进程都将共享内存分离,即没有进程再附加该共享内存,该共享内存仍会一直存在

shmctl 函数

shmctl 函数用于 控制 共享内存的操作,包括获取共享内存信息、修改共享内存的权限和删除共享内存等。它的函数原型如下:

int shmctl(int shmid, int cmd, struct shmid_ds *buf);
  • shmid:共享内存标识符,由 shmget 函数返回的标识符。
  • cmd:控制命令,用于指定要执行的操作。常用的命令有:
    • IPC_STAT:获取共享内存的信息,并将结果保存在 buf 指向的结构体中。
    • IPC_SET:设置共享内存的权限,使用 buf 指向的结构体中的信息。
    • IPC_RMID:删除共享内存。
  • buf:指向 shmid_ds 结构体的指针,用于传递或接收共享内存的信息。

使用 shmctl 函数可以实现以下操作:

  • 获取共享内存的信息:通过指定 cmdIPC_STAT,可以获取共享内存的当前状态信息,如共享内存的大小、创建者的用户 ID 和组 ID、访问权限等。
  • 修改共享内存的权限:通过指定 cmdIPC_SET,可以修改共享内存的访问权限。需要提供一个 shmid_ds 结构体,其中包含要修改的权限信息。
  • 删除共享内存:通过指定 cmdIPC_RMID,可以删除指定的共享内存。删除后,所有附加该共享内存的进程都无法再访问共享内存中的数据。

shmctl 函数的返回值:

  • 成功:返回 0。表示函数执行成功。
  • 失败:返回 -1。表示函数执行失败。

注意: 删除共享内存并 不会立即释放 该共享内存的空间,而是 在所有附加该共享内存的进程都将其分离后才会释放。

实例

// header.h
#pragma once

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include "string.h"

#define FILE_PATH "test.txt"
#define PROJ_ID 114514

/**
 * @brief 分配一块共享内存
 * 
 * @param size 共享内存的字节数
 * @param flag 共享内存的权限标志
 * @return int 共享内存的 id(-1为分配错误)
 */
int allocMem(size_t size, int flag);

int destroyMem(int memID);

// function.c
#include "header.h"

int allocMem(size_t size, int flag)
{
    int memkey = ftok(FILE_PATH, PROJ_ID);
    if(memkey < 0)
    {
        perror("allocMem: ftok error!\n");
        return -1;
    }

    int memID = shmget(memkey, size, flag);
    if(memID < 0)
    {
        perror("allocMem: shmget error!\n");
        return -1;
    }

    return memID;
}

int destroyMem(int memID)
{
    if(shmctl(memID, IPC_RMID, NULL) < 0)
    {
        perror("destroyMem: shmctl error!\n");
        return -1;
    }
    return 0;
}

// server.c
#include "header.h"

int main(void)
{
    int memID = allocMem(4096, IPC_CREAT | IPC_EXCL | 0666);
    if(memID == -1)
        exit(-1);
    char * memAddress = shmat(memID, NULL, 0); // 采用默认附加方式
    printf("Starting server...\n");
    sleep(3);
    int refreshTime;
    printf("Please enter refresh time(second): ");
    fflush(stdout);
    scanf("%d", &refreshTime);
    // 等待 client 超时的次数,超过 10 次就退出服务器
    int cnt = 0; 
    while (cnt < 10)
    {
        sleep(refreshTime);
        fflush(stdout);
        if(memAddress[0] == '\0')
        {
            ++cnt;
            printf("Waitting...\n");
        }
        else 
        {
            printf("Recived data from client: %s", memAddress);
            cnt = 0;
        }
        memAddress[0] = '\0';
    }
    
    printf("About quitting server...\n");
    shmdt(memAddress);
    sleep(2); // 等待 client 分离
    destroyMem(memID);
    return 0;
}

// client
#include "header.h"

int main(void)
{
    int memID = allocMem(4096, IPC_CREAT);
    if(memID == -1)
        exit(-1);
    char * memAddress = shmat(memID, NULL, 0); // 采用默认附加方式
    printf("Starting client...\n");
    sleep(3);
    char buffer[4096];
    int loops;
    printf("Please enter the num of messages you want to send: ");
    fflush(stdout);
    scanf("%d", &loops);

    while (loops--)
    {
        printf("Please enter the data you want to send to server: ");
        fflush(stdout);

        int bytes = read(0, buffer, sizeof(buffer));
        buffer[bytes] = '\0';
        printf("Sending...\n");
        sleep(1);
        strcpy(memAddress, buffer);
        printf("Success!\n");
    }
    
    printf("About quitting client...\n");
    shmdt(memAddress);
    sleep(2);
    return 0;
}

运行结果:

注意,在运行时,应该先打开服务器端,再打开用户端,如果先打开用户端,服务器端会发现已经有共享内存,进而报错(原因是 IPC_CREAT | IPC_EXCL | 0666