PHP多进程编程

从一个fork开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
switch ($pid = pcntl_fork()){
case -1:
die('fork failed');
break;
case 0:
$n = 3;
$msg = "child pid:".getmypid().PHP_EOL;
break;
default:
$n = 5;
$msg = "parent pid:".getmypid()." child pid is: ".$pid. PHP_EOL;
break;
}
for($i = 0; $i < $n; $i++){
echo $msg;
sleep(1);
}

功能很简单——调用pcntl_fork创建一个子进程,子进程和父进程各自输出n个消息结束

从这个例子可以看到:

  1. pcntl_fork是个非常特殊的函数:被调用一次,但返回两次(子进程的返回值是0,父进程的返回值是新子进程的进程ID)
  2. 子进程是父进程的副本——复制了父进程的数据空间、堆、栈等,但不是在fork之后立即就复制了,而是使用了写时复制,即当父子进程对其行进行写操作时,才会复制出来一份副本

另外,这个程序有一个小BUG——会产生僵尸进程

僵尸进程

运行上面的程序,通过ps(1)命令可以看到子进程的STAT变成了Z+,COMMAND变成了defunct(一些系统可能是zombie)

1
2
3
4
5
6
7
8
# ps -axj
PPID PID STAT UID TIME COMMAND
14 292 S+ 0 0:00 php zombie.php
292 293 S+ 0 0:00 php zombie.php
# ps -axj
PPID PID STAT UID TIME COMMAND
14 292 S+ 0 0:00 php zombie.php
292 293 Z+ 0 0:00 [php] <defunct>

一个已经终止,但是其父进程尚未对其进行善后处理(获取终止子进程的有关信息,释放它仍占用的资源)的进程被称为僵尸进程 ——APUE

僵尸进程主要问题是资源没有释放,如何防止僵尸进程的产生呢?
–只需要在父进程里加上wait函数(pcntl_wait/pcntl_waitpid)即可

wait(pcntl_wait&pcntl_waitpid)

wait函数挂起当前进程的执行直到一个子进程退出或接收到一个信号要求中断当前进程或调用一个信号处理函数。 如果一个子进程在调用此函数时已经退出(俗称僵尸进程),此函数立刻返回。子进程使用的所有系统资源将被释放。 –PHP手册

pcntl_wait等同于以-1作为参数pid 的值并且没有options参数来调用pcntl_waitpid

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$arr = range(0, 10);
foreach($arr as $x) {
switch ($pid = pcntl_fork()){
case -1:
die('fork failed');
break;
case 0:
print "fork child: ".$x." and pid = ".getmypid().PHP_EOL;
sleep(1);
exit;
default:
print "parent,the child is:".$pid.PHP_EOL;
pcntl_waitpid($pid,$status);
break;
}
}
print "Done \n";

这里有一点要注意的是:如果在循环语句中创建子进程,子进程执行完毕一定不能再继续执行循环(本例中执行完直接exit),否则子进程又会继续创建子进程,如此反复,把系统的资源耗尽

在这个程序里,我故意在子进程里放了一个sleep,这样在执行的时候我们可以看到,子进程是一个接一个创建出来的(只有当上一个子程序终止后,父进程才会继续创建)

这样就似乎与我们使用多进程的初衷相违,如何能同时创建多个子进程并行执行呢

并行执行(非阻塞wait)

变成串行的原因是wait函数阻塞了父进程,只要通过参数(WNOHANG、WUNTRACED)设为非阻塞就行了

  • WNOHANG 如果没有子进程退出立刻返回
  • WUNTRACED 子进程已经退出并且其状态未报告时返回

这两个选项需要系统支持wait3,否则不会有任何效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$arr = range(0, 10);
foreach($arr as $x) {
if ($pid = pcntl_fork()){
$childs[] = $pid;
pcntl_waitpid($pid,$status,WNOHANG | WUNTRACED);
} elseif (0 == $pid){
print "fork child: ".$x." and pid = ".getmypid().PHP_EOL;
exit;
}
}
while(count($childs)){
foreach($childs as $k => $pid){
pcntl_waitpid($pid,$status);
if (pcntl_wifexited($status)){
unset($childs[$k]);
}
}
}
print "Done \n";

最后那段while循环主要是为了防止产生僵尸进程

信号

当任务量比较小的时候,我们可以一次性创建所有子进程来执行;但是,当任务量非常大的时候,就需要通过控制同时运行的进程数,来进行批量处理了
控制运行的进程数有很多种方法,信号无疑是最方便的一种

安装信号处理器(pcntl_signal)

想实现这样的需求,就需求在父进程里维护一个子进程的计数器,在fork成功后增加计数,子进程退出的时候,减少计数

当子进程退出的时候,内核会向父进程发送SIGCHLD信号

由于SIGCHLD是一个异步的信号,首先我们要安装一个SIGCHLD的信号处理器

1
2
3
4
5
6
7
8
function sig_hander($sig){
global $cur;
if (SIGCHLD == $sig){
echo 'SIGCHLD'.PHP_EOL;
$cur--;
}
}
pcntl_signal(SIGCHLD, "sig_hander");

ticks与pcntl_signal_dispatch

仅仅安装处理器是不够的,还要设置一下信号的触发时机

ticks

ticks是cpu时钟周期,可以通过它来设置执行底层语句时的回调函数——这个信号处理器配置使用便可实现异步通知

1
declare(ticks = 1);

ticks的使用非常的简单,只需要使用declare定义一下触发事件的低级语句数数即可,本例设置的是1,即每执行一条低级语句,就会执行一次信号安装器的回调函数,因此ticks是比较低效的,并且在php7中已经废弃掉了,建议使用pcntl_signal_dispatch

pcntl_signal_dispatch

如果看php源码就会发现,ticks最终也是调用pcntl_signal_dispatch。即然如此,何不自己在程序中直接使用pcntl_signal_dispatch呢

1
pcntl_signal_dispatch();

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$max = 8;
$i = $cur = 0;
function sig_hander($sig){
global $cur;
if (SIGCHLD == $sig){
echo 'SIGCHLD'.PHP_EOL;
$cur--;
}
}
pcntl_signal(SIGCHLD, "sig_hander");
$fp = fopen('./test.txt', 'a+');
while(true){
pcntl_signal_dispatch();
if (($cur <= $max) && ($i <= 20)){
if ($pid = pcntl_fork()){
$cur++;
pcntl_waitpid($pid,$status,WNOHANG | WUNTRACED);
} else {
$s = rand(2, 5);
sleep($s);
//echo "child : ",getmypid(),' sleep : ', $s, PHP_EOL;
$line = "child : ".getmypid().' sleep : '. $s. PHP_EOL;
fwrite($fp,$line);
exit;
}
$i++;
}
if (0 === $cur){
fclose($fp);
break;
}
}

进程间通信(IPC)

当执行完任务,我们想拿回子进程处理后的结果,给合并到一起(Map/Reduce),即子进程处理完的结果,要”告诉”父进程,这就要用到进程间通信了

常用的IPC有三个:

  1. 信号量 : 用于管理资源的访问
  2. 共享内存: 用于在程序之间高效地共享数据
  3. 消息队列: 在程序之间传递数据的一种简单方法

可以通过ipcs查看当前系统中的IPC信息

1
2
3
4
5
6
7
8
9
10
root@69f6ffa32d27:/home/root# ipcs
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
------ Semaphore Arrays --------
key semid owner perms nsems
------ Message Queues --------
key msqid owner perms used-bytes messages
1
2
3
# ipcs -s
# ipcs -m
# ipcs -q

这里我们使用的是共享内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$tasks = array();
for($i = 0; $i < 5; $i++){
$tasks[$i] = range($i * 10, ($i+10)*10);
}
foreach($tasks as $task){
$pid = pcntl_fork();
if ($pid){
$childs[] = $pid;
} elseif (0 == $pid){
$shm_id = shmop_open(getmypid(), 'c', 0644, 1024);
$value = array_sum($task);
shmop_write($shm_id,array_sum($task),0);
shmop_close($shm_id);
exit;
}
}
$data = array();
while(count($childs)){
foreach($childs as $k => $pid){
pcntl_waitpid($pid,$status);
if (pcntl_wifexited($status)){
$shm_id = shmop_open($pid, 'a', 0644, 1024);
$data[$pid] = shmop_read($shm_id, 0, 1024);
shmop_close($shm_id);
unset($childs[$k]);
}
}
}
var_dump($data);

守护进程

最后,看一个生成守护进程的例子吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
umask(0);
if (($pid = pcntl_fork()) < 0){
exit('can not fork');
} elseif ($pid != 0){
sleep(6);
exit(0);
}
sleep(5);
posix_setsid();
if (($pid = pcntl_fork()) < 0){
sleep(3);
exit('can not fork');
} elseif ($pid != 0){
exit(0);
}
sleep(5);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ps -axj
PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
14 140 140 14 pts/1 140 S+ 0 0:00 php daemonize.php
140 141 140 14 pts/1 140 S+ 0 0:00 php daemonize.php
29 145 145 29 pts/2 145 R+ 0 0:00 ps -axj
# ps -axj
PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
14 140 140 14 pts/1 140 S+ 0 0:00 php daemonize.php
140 141 141 141 ? -1 Zs 0 0:00 [php] <defunct>
0 146 141 141 ? -1 S 0 0:00 php daemonize.php
29 147 147 29 pts/2 147 R+ 0 0:00 ps -axj
# ps -axj
PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
0 146 141 141 ? -1 S 0 0:00 php daemonize.php
29 149 149 29 pts/2 149 R+ 0 0:00 ps -axj

THE END

好了,到这里这篇这就结束了。因为APUE里已经写的比较细了,所以这里就以”实例+粗知识点”形式将多进程涉及的一些概念及函数走了一遍,录之笔记。欢迎一起沟通学习!

参考

坚持原创技术分享,您的支持将鼓励我继续创作!