プロセス間通信 (inter process communication)

2プロセス間で同期してデータの送受信。

C言語

メインプロセス〜サブプロセス(メインプロセスから起動)間では、標準入出力による通信も可能。

UNIX ソケットによるプロセス間通信 (C言語)

Server プロセスが listen しているソケットへ、Client プロセスが connect する例。

まずは Server プロセス。ポイントは socket 〜 bind 〜 listen でサーバーソケットを用意し、accept で得たファイルディスクリプタで read / write するだけ。

以下のコード例はちょっとだけ長くなっているが、ポイントは上記の通り簡単。自前のプロトコルやバッファリング、マルチセッション対応のためのスレッド化で色々やっているだけ。

シングルセッションで良いならばスレッドは不要だが、その他は必要になる。

以下のコード例でのプロトコルは、次の通り。

このサンプルコードはファイル /tmp/mySocket を破壊するので注意。

// Server

#include <pthread.h>     // pthread_create
#include <stdio.h>       // printf
#include <stdlib.h>      // atoi
#include <sys/socket.h>  // socket, connect
#include <sys/un.h>      // struct sockaddr_un
#include <unistd.h>      // read, write

#define SOCK_PATH "/tmp/mySocket"
#define BUF_SIZE 99

// fd へ data を len バイト出力。
// 出力したバイト数を返す。
int transmit(int fd, char *data, int len)
{
	char sizeBuf[3];

	if (len > 99) {
		return -1;
	}

	// 最初の2バイトは長さ
	sprintf(sizeBuf, "%2d", len);
	if (write(fd, sizeBuf, 2) != 2) {
		return -1;
	}

	// 本体
	if (write(fd, data, len) != len) {
		return -1;
	}

	return len;
}

// fd から読んで buf へ。
// 読んだバイト数を返す。
int receive(int fd, char *buf)
{
	int len, rlen, left;
	char *bufPtr;

	// 最初の2バイトは長さ
	len = 2;
	bufPtr = buf;
	while (len) {
		rlen = read(fd, bufPtr, len);
		if (rlen <= 0) {
			break;
		}
		bufPtr += rlen;
		len -= rlen;
	}
	if (rlen <= 0) {
		return -1;
	}
	*bufPtr = '\0';
	len = atoi(buf);
	if (len > BUF_SIZE) {
		return -1;
	}

	// 本体
	bufPtr = buf;
	left = len;
	while (left) {
		rlen = read(fd, bufPtr, left);
		if (rlen <= 0) {
			break;
		}
		bufPtr += rlen;
		left -= rlen;
	}
	if (rlen <= 0) {
		return -1;
	}

	return len;
}

// 1セッション用スレッド
void *subThread(void *param)
{
	int fd = (int)param;
	int len;
	char buf[BUF_SIZE];


	for (;;) {
		// Client からのデータを読んで、
		if ((len = receive(fd, buf)) < 0) {
			return NULL;
		}

		// そのまま Client へ返す
		transmit(fd, buf, len);

		// "end" だったら終了
		if (len == 3 && strncmp(buf, "end", 3) == 0) {
			break;
		}
	}

	return NULL;
}

int main()
{
	struct sockaddr_un addr;
	int sfd, fd;
	socklen_t addrLen;
	pthread_t thread;

	// 前回の実行で作られているソケット用ファイルを削除
	unlink(SOCK_PATH);

	sfd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sfd == -1) {
		return 1;
	}

	addr.sun_family = AF_UNIX;
	strncpy(addr.sun_path, SOCK_PATH, sizeof (addr.sun_path) - 1);

	if (bind(sfd, (struct sockaddr *)&addr, sizeof (struct sockaddr_un))
		== -1) {
		close(sfd);
		return 1;
	}

	if (listen(sfd, 5) == -1) {
		close(sfd);
		return 1;
	}

	for (;;) {
		// Client からの接続を待つ
		fd = accept(sfd, (struct sockaddr *)&addr, &addrLen);
		if (fd == -1) {
			break;
		}

		if (pthread_create(&thread, NULL, subThread, (void *)fd) != 0) {
			close(fd);
			break;
		}
	}

	close(sfd);

	return 0;
}

次に Client プロセス。ポイントは socket でクライアントソケットを用意し、connect で Server へ接続して read / write するだけ。

以下のコード例では Server へ "foo", "bar", "end" と3回データを送り、その都度 Server からの返事を print して、終了している。上の Server は Client からのデータをそのまま返すので、"foo", "bar", "end" がそのまま print されることになる。

上の Server はマルチセッションに対応しているので、この Client を非同期にめったやたらに起動すると、それなりに非同期に動作し、各セッションは正しくやりとりされる。

ちなみに Server がマルチセッション非対応であっても各セッションは正しくやりとりされる。ただ、先着順に1セッションずつ処理されるようになり、多くの場合に各セッションには空き時間があって、後の Client は不当に待たされることになる。

// Client

#include <stdio.h>       // printf
#include <stdlib.h>      // atoi
#include <sys/socket.h>  // socket, connect
#include <sys/un.h>      // struct sockaddr_un
#include <unistd.h>      // read, write

#define SOCK_PATH "/tmp/mySocket"
#define BUF_SIZE 99

char buf[BUF_SIZE];

// fd へ data を len バイト出力。
// 出力したバイト数を返す。
int transmit(int fd, char *data, int len)
{
	// 上記 Server の同名関数と同じ内容。
}

// fd から読んで buf へ。
// 読んだバイト数を返す。
int receive(int fd, char *buf)
{
	// 上記 Server の同名関数と同じ内容。
}

// fd へ str を出力し、返事を待って返事を print する。
void transmitAndReceiveAndPrint(int fd, char *str)
{
	int len = strlen(str);

	if (transmit(fd, str, len) >= 0 &&
		(len = receive(fd, buf)) >= 0) {
		printf("%.*s\n", len, buf);
	}
}

int main()
{
	struct sockaddr_un addr;
	int sfd;


	sfd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sfd == -1) {
		return 1;
	}

	addr.sun_family = AF_UNIX;
	strncpy(addr.sun_path, SOCK_PATH, sizeof (addr.sun_path) - 1);

	// Server へ接続

	if (connect(sfd, (struct sockaddr *)&addr,
				sizeof (struct sockaddr_un)) == -1) {
		close(sfd);
		return 1;
	}

	// Server と通信

	transmitAndReceiveAndPrint(sfd, "foo");
	transmitAndReceiveAndPrint(sfd, "bar");
	transmitAndReceiveAndPrint(sfd, "end");

	close(sfd);

	return 0;
}

メニューに戻る

セマフォと共有メモリによるプロセス間通信 (C言語)

Client から Server へデータを送る例。

セマフォの取り扱いにはクセがあるが、それさえ越えれば、例えばバイナリセマフォとして利用するのは簡単。

共有メモリも初期化にクセがあるが、読み書きは malloc した領域と同じようにポインタでアクセスするだけ。

以下のサンプルコードは、Client から Server へデータを送り、Server はそれを print するもの。データは共有メモリで送り、セマフォで同期を取る。共有メモリ上のデータフォーマットは、先頭バイトがデータ長で、その後にデータを書くようにした。

まずはサーバーのコード。

このサンプルコードはファイル /tmp/mySem と /tmp/myMap を破壊するので注意。

// Server

#include <stdio.h>   // printf
#include <string.h>  // memcmp

#include "mylib.h"
// ↑
// 以下で使っているセマフォや共有メモリを扱う関数の宣言。
// この内容と、関数定義は後に示す。

int main()
{
	// セマフォを取得
	// Client の動作を制約するもの(0 番)と、
	// Server の動作を制約するもの(1 番)の2つを作る。
	// semId で特定されるセマフォ集合に2つが含まれる。
	int semId = newSem();
	if (semId < 0) {
		return 1;
	}

	// 2つのセマフォ、両方とも0個の状態にする。
	if (initSem(semId) < 0) {
		delSem(semId);
		return 1;
	}

	// 共有メモリを取得
	char *shAdr = newMMap();
	if (shAdr == NULL) {
		delSem(semId);
		return 1;
	}
	int len;

	*shAdr = 0;

	// "end" が送られてくるまでループ
	while (*shAdr != 3 ||
		   memcmp(shAdr + 1, "end", 3)) {
		// Client が一回動作できるように、Client 制約用 セマフォを +1
		releaseSem(semId, 0);

		// Server 制約用 セマフォを取得。
		// このセマフォは、Client が共有メモリ上にデータを用意した後に
		// +1 する。(後述の Client のコードの 22 行)
		getSem(semId, 1);

		// 共有メモリからデータを取得して表示。
		len = (int)*shAdr;
		printf("%.*s\n", len, shAdr + 1);
	}

	// 共有メモリとセマフォを削除。
	delSem(semId);
	delMMap();

	return 0;
}

続いて Client のサンプルコード。

// Client

#include <string.h>  // strlen, memcpy

#include "mylib.h"

// Server へ str を送る
void transmit(int semId, char *shAdr, char *str)
{
	char len = (char)strlen(str);

	// Client 制約用 セマフォを取得。
	// このセマフォは、Server が受信準備を完了した後に +1 する。
	// (前記 Server のコードの 42 行)
	getSem(semId, 0);

	// 共有メモリへデータ設定。
	memcpy(shAdr, &len, 1);
	memcpy(shAdr + 1, str, len);

	// Server が一回動作できるように、ServerClient 制約用 セマフォを +1
	releaseSem(semId, 1);
}

int main()
{
	// セマフォを取得
	// Client の動作を制約するもの(0 番)と、
	// Server の動作を制約するもの(1 番)の2つを取得する。
	// semId で特定されるセマフォ集合に2つが含まれる。
	int semId = newSem();
	if (semId < 0) {
		return 1;
	}

	// 共有メモリを取得
	char *shAdr = newMMap();
	if (shAdr == NULL) {
		return 1;
	}

	// Server へデータ送信

	transmit(semId, shAdr, "foo");
	transmit(semId, shAdr, "bar");
	transmit(semId, shAdr, "end");

	return 0;
}

Server と Client の main 関数での、セマフォと共有メモリの初期化は似ている。処理の意味としては、Server で新規作成して、Server と Client でこれを取得する、というような処理を行うのだが、ある意味無理矢理共有していて、共有しきれない部分で若干異るコードになっている。

最後に、Server, Client で使っている関数のコード。mylib.h はこんな感じ。

// Library

int newSem(void);
int initSem(int semId);
void delSem(int semId);
void getSem(int semId, int no);
void releaseSem(int semId, int no);

char *newMMap(void);
void delMMap(void);

これらの関数定義は、以下。クセのある部分をこの中に隠してある。

// Library

#include <fcntl.h>     // O_RDWR
#include <sys/ipc.h>   // IPC_*
#include <sys/mman.h>  // mmap
#include <sys/sem.h>   // sem*
#include <sys/stat.h>  // S_*
#include <unistd.h>    // close



// ユニークな key のためのユニークなファイル
#define SEM_FILE "/tmp/mySem"

// セマフォを2個取得
int newSem(void)
{
	// セマフォ用ファイルがなければ作る (IPC_CREAT)。
	// 今回の例では、最初に Server がここをコールした時に作成されて、
	// その後に Client がここをコールした時は作成したものを開く。
	// アクセス権 (0600 のとこ) は、きちんと設定しないと
	// 37 行でエラーになる。
	int fd = open(SEM_FILE, O_CREAT, 0600);
	if (fd == -1) {
		return -1;
	}
	// ファイルディスクリプタは使わないので、閉じておく。
	close(fd);

	// ユニークな key
	key_t key = ftok(SEM_FILE, 1);
	if (key == -1) {
		return -1;
	}

	// セマフォを2個取得。なければ作る (IPC_CREAT)
	int semId = semget(key, 2, IPC_CREAT | 0600);
	if (semId == -1) {
		return -1;
	}

	return semId;
}

// 2個のセマフォの初期数を、両方とも0にする。
int initSem(int semId)
{
	union {
		int val;
		struct semid_ds *buf;
		unsigned short *array;
		struct seminfo *__buf;
	} v;
	v.val = 0;
	if (semctl(semId, 0, SETVAL, v)) {
		return -1;
	}
	if (semctl(semId, 1, SETVAL, v)) {
		return -1;
	}

	return 0;
}

// セマフォ削除
void delSem(int semId)
{
	// セマフォ削除
	semctl(semId, 0, IPC_RMID);
	// 関連ファイルも削除
	unlink(SEM_FILE);
}

// セマフォをひとつ取得。
// 0個しかない場合はブロックされる。
void getSem(int semId, int no)
{
	struct sembuf sem;

	sem.sem_num = no;
	sem.sem_op = -1;
	sem.sem_flg = 0;

	semop(semId, &sem, 1);
} 

// セマフォをひとつ追加。
void releaseSem(int semId, int no)
{
	struct sembuf sem;

	sem.sem_num = no;
	sem.sem_op = 1;
	sem.sem_flg = 0;

	semop(semId, &sem, 1);
} 


#define MMAP_FILE "/tmp/myMap"
#define MMAP_SIZE 100

// 共有メモリを取得
char *newMMap(void)
{
	// 共有メモリ用ファイルがなければ作る (IPC_CREAT)。
	// 今回の例では、最初に Server がここをコールした時に作成されて、
	// その後に Client がここをコールした時は作成したものを開く。
	// アクセス権 (0600 のとこ) は、きちんと設定しないと
	// 124 行でエラーになる。
	int fd = open(MMAP_FILE, O_CREAT | O_RDWR, 0600);
	if (fd == -1) {
		return NULL;
	}

	// ファイルサイズを、共有メモリで使うサイズにする。
	// このサイズを越えてアクセスするとバスエラーとなる。
	if (truncate(MMAP_FILE, MMAP_SIZE) == -1) {
		close(fd);
		return NULL;
	}

	// 共有メモリを取得。なければ作る。
	char *shAdr =
		mmap(0, MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	if (shAdr == (void *)-1) {
		close(fd);
		return NULL;
	}

	return shAdr;
}

// 共有メモリ削除
void delMMap(void)
{
	// 共有メモリ解放
	munmap(0, MMAP_SIZE);
	// 関連ファイルも削除
	unlink(MMAP_FILE);
}

サンプルプログラムの使い方

リンクで Server と Client の二つの実行形式ファイルを作る。どちらも最後のライブラリコードをリンクする。

実行は、まず Server を起動しておいて、別の端末で Client を起動する。同期処理が手抜きなので、そうしないと正しく動作しない。

IPC を利用した開発での注意

セマフォなどの IPC の実体は、UNIX システムグローバルなものとして作られ、きちんと削除しないと残ってしまう。開発中にプログラムダウンした時などに残ってしまい、次回の起動に差し支えることもあるので、これを削除する方法は知っておいた方が良い。

現存する IPC は ipcs コマンドで表示される。今回の例ではセマフォを使っているので、例えば以下のような表示が含まれる。

$ ipcs
...
------ セマフォ配列 --------
キー       semid      所有者    権限     nsems
0x39689244 4194312    your_user 600        2
...

この IPC セマフォを削除するには、ipcrm コマンドで以下のようにする。

$ ipcrm -s 4194312

IPC は他のプログラムが使っていて、ipcs で色々表示される場合がある。それらを削除してしまうと何か問題が出るはずなので、注意が必要。自分のプログラムを起動する前に、ipcs で削除してはいけないものを確認しておく。

メニューに戻る

Python

メインプロセス〜サブプロセス(メインプロセスから起動)間では、標準入出力による通信も可能。