C语言进程间通信:基础篇

06-09-2025

进程间通信(IPC)是指允许进程之间进行通信并同步其执行的机制和方法。默认情况下,进程拥有独立的内存空间且不共享数据。然而实际上,进程经常需要共享数据和资源。此外,它们还需要协调执行以实现预期行为,这被称为进程同步。

匿名管道是最简单的IPC机制之一。它允许相关进程之间的单向通信,通常是父子进程之间。数据只能单向流动(单向通信),只有具有共同祖先的进程才能使用这种机制。它的存在依赖于父进程,并且是基于内存的,因为它在内核内存中使用缓冲区。

具体来说,匿名管道通过pipe()系统调用创建,它返回两个文件描述符:pipefd[0]用于读取,pipefd[1]用于写入。创建管道后,父进程可以使用pipefd[1]向管道写入数据,子进程可以使用pipefd[0]从管道读取数据。需要注意的是,这种管道有缓冲区大小限制(通常为几KB)。

管道设置和进程派生

我们让每个进程P_i创建一个到P_{i+1}的管道。P_0调用create_children

void create_children(int n, int pipes[][2]) {
    for (int i = 0; i < n - 1; i++) {
        pipe(pipes[i]); // 创建管道
        pid_t pid = fork(); // 创建子进程
        if (pid == 0) { // 子进程
            close(pipes[i][1]); // 关闭管道的写入端
            if (i == n - 2) break; // 最后一个子进程退出循环
        } else { // 父进程
            close(pipes[i][0]); // 关闭管道的读取端
            break;
        }
    }
}

在上面的代码中,pipes[][2]是一个管道数组,其中每个管道是一个包含两个文件描述符的数组。第一个文件描述符用于读取,第二个用于写入。每次迭代中,都会创建一个新管道并派生一个子进程。然后子进程关闭写入端(因为它只需要读取),父进程关闭读取端(因为它只需要写入)。最后一个子进程会退出循环。

在命令行(shell)中, 操作符在命令之间创建匿名管道。当我们链接一系列管道时,我们将每个进程的输出作为下一个进程的输入。例如,ls -l grep “txt”会列出当前目录中的所有文件和目录,然后过滤输出只显示以”txt”结尾的文件和目录。单个管道只支持单向通信。两个管道可以实现双向通信。管道没有名称(因此称为匿名管道)。管道是UNIX操作系统最基本的方面之一。它是构建基于进程的并行和分布式应用程序的极其强大的概念。

消息传递 现在让我们看看如何编码/解码消息,以及send_message和receive_message函数。

首先我们如下定义Message结构体:

struct {
    int dest_id;     // 目标进程ID
    int src_id;      // 源进程ID
    char msg[50];    // 实际的消息内容(最多50个字符)
} packet;

其中进程根据dest_id转发消息。

消息发送

发送消息时,我们将执行以下操作:

  • 创建一个包含目标ID、源ID和消息的数据包
  • 安全地将消息复制到数据包中
  • 根据进程ID确定要使用的正确管道
  • 将整个数据包写入管道
void send_message(int process, char *msg, int msg_len, int src_id, int pipes[][2], int n) {
    // 初始化数据包,包含目标、源和消息
    struct { int dest_id; int src_id; char msg[50]; } packet = {process, src_id, {0}};

    // 将消息安全地复制到数据包中(使用strncpy)
    strncpy(packet.msg, msg, 50);
    if (src_id < process && src_id < n - 1)
        write(pipes[src_id][1], &packet, sizeof(packet));
    else if (src_id > process)
        write(pipes[src_id - 1][1], &packet, sizeof(packet));
}

#### 消息接收 接收消息时,我们将执行以下操作:

  • 尝试从该进程可能连接的所有管道读取
  • 对于每个管道:
    • 如果消息是给当前进程的(packet.dest_id == curr_id),则将其复制到输出缓冲区
    • 如果消息是给其他进程的,则将其转发到正确的方向
    • 使用进程ID确定消息转发的方向
void receive_message(char *buffer, int *nread, int buffer_max, int curr_id, int pipes[][2], int n) {
    struct { int dest_id; int src_id; char msg[50]; } packet;

    // 检查该进程可能读取的所有管道
    for (int i = 0; i < n - 1; i++) {
        // 检查该进程是否应该从pipe[i]读取
        if (curr_id == i + 1) {
            // 如果这条消息是给我的,将其复制到缓冲区
            *nread = read(pipes[i][0], &packet, sizeof(packet));
            if (*nread > 0) {
                if (packet.dest_id == curr_id) {
                    strncpy(buffer, packet.msg, buffer_max);
                    *nread = strlen(packet.msg);
                } 
                // 否则,如果需要,将其转发到下一个进程
                else if (packet.dest_id > curr_id && curr_id < n - 1)
                    write(pipes[curr_id][1], &packet, sizeof(packet));
            }
        }

        // 检查该进程是否应该从pipe[i-1]读取(用于向左传递的消息)
        if (curr_id == i && i < n - 1) {
            *nread = read(pipes[i][0], &packet, sizeof(packet));
            if (*nread > 0 && packet.dest_id < curr_id)
                write(pipes[i - 1][1], &packet, sizeof(packet));
        }
    }
}

消息路由

为了路由消息,我们将:

  • 让每条消息包含用于路由的dest_id和src_id
  • 使用进程根据目标ID转发消息
  • 双向通信:
    • 系统允许消息双向流动
    • 每个进程既可以是发送者也可以是接收者
  • 进程链:
    • 进程按线性链排列
    • 每个进程通过管道与其邻居连接
  • 流量控制:
    • 协议确保消息只向目标方向转发
    • 使用进程ID确定方向,防止无限循环

示例流程

  • 进程1想向进程3发送”Hello”
  • 进程1调用send_message(3, “Hello”, 5, 1, pipes, n)
  • 消息被写入进程1和进程2之间的管道
  • 进程2收到消息,发现dest_id = 3(不是给自己的)
  • 进程2将消息转发给进程3
  • 进程3收到消息,发现dest_id = 3(是给自己的),然后处理它

资源清理

最后,我们必须free_resources,因为关闭管道对于防止资源泄漏和确保正确清理资源很重要。

void free_resources(int curr_id, int pipes[][2], int n) {
    // 遍历系统中的所有可能管道
    // n个进程有n-1个管道(因为它们按链式排列)
    for (int i = 0; i < n - 1; i++) {
        // 如果这个进程是pipe[i]的左端(进程i)
        if (curr_id == i) 
            // 关闭pipe[i]的写入端(索引1是写入端)
            close(pipes[i][1]);
        
        // 如果这个进程是pipe[i]的右端(进程i+1)
        if (curr_id == i + 1) 
            // 关闭pipe[i]的读取端(索引0是读取端)
            close(pipes[i][0]);
    }
}

其中:

  • curr_id:当前进程的ID(0到n-1)
  • pipes[][2]:二维数组,每行代表一个管道,包含[read_fd, write_fd]
  • n:系统中的进程总数
  • 为了防止循环,我们需要确保进程只在dest_id != curr_id时转发消息。为了确保收敛,我们使用线性链确保消息通过定向转发准确地到达dest_id一次。