swoole —— 从新定义PHPphp
swoole 的进程之间有两种通讯方式,一种是消息队列(queue),另外一种是管道(pipe),对swoole_process 的研究在swoole中显得尤其重要。html
预备知识前端
IO多路复用sql
swoole 中的io多路复用表现为底层的 epoll进程模型,在C语言中表现为 epoll 函数。thinkphp
Event loop 事件循环安全
swoole 对 epoll 实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)微信
swoole_processswoole
咱们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。架构
接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通讯、定时器等使用,实际状况使用封装好的swoole_server来实现task任务队列池会更方便。微信公众平台
假若有个定时投递的任务队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
<?php
/**
* 动态进程池,相似fpm
* 动态新建进程
* 有初始进程数,最小进程数,进程不够处理时候新建进程,不超过最大进程数
*/
// 一个进程定时投递任务
/**
* 1. tick
* 2. process及其管道通信
* 3. event loop 事件循环
*/
class
processPool
{
private
$pool
;
/**
* @var swoole_process[] 记录全部worker的process对象
*/
private
$workers
= [];
/**
* @var array 记录worker工做状态
*/
private
$used_workers
= [];
/**
* @var int 最小进程数
*/
private
$min_woker_num
= 5;
/**
* @var int 初始进程数
*/
private
$start_worker_num
= 10;
/**
* @var int 最大进程数
*/
private
$max_woker_num
= 20;
/**
* 进程闲置销毁秒数
* @var int
*/
private
$idle_seconds
= 5;
/**
* @var int 当前进程数
*/
private
$curr_num
;
/**
* 闲置进程时间戳
* @var array
*/
private
$active_time
= [];
public
function
__construct()
{
$this
->pool =
new
swoole_process(
function
() {
// 循环创建worker进程
for
(
$i
= 0;
$i
<
$this
->start_worker_num;
$i
++) {
$this
->createWorker();
}
echo
'初始化进程数:'
.
$this
->curr_num . PHP_EOL;
// 每秒定时往闲置的worker的管道中投递任务
swoole_timer_tick(1000,
function
(
$timer_id
) {
static
$count
= 0;
$count
++;
$need_create
= true;
foreach
(
$this
->used_workers
as
$pid
=>
$used
) {
if
(
$used
== 0) {
$need_create
= false;
$this
->workers[
$pid
]->write(
$count
.
' job'
);
// 标记使用中
$this
->used_workers[
$pid
] = 1;
$this
->active_time[
$pid
] = time();
break
;
}
}
foreach
(
$this
->used_workers
as
$pid
=>
$used
)
// 若是全部worker队列都没有闲置的,则新建一个worker来处理
if
(
$need_create
&&
$this
->curr_num <
$this
->max_woker_num) {
$new_pid
=
$this
->createWorker();
$this
->workers[
$new_pid
]->write(
$count
.
' job'
);
$this
->used_workers[
$new_pid
] = 1;
$this
->active_time[
$new_pid
] = time();
}
// 闲置超过一段时间则销毁进程
foreach
(
$this
->active_time
as
$pid
=>
$timestamp
) {
if
((time() -
$timestamp
) >
$this
->idle_seconds &&
$this
->curr_num >
$this
->min_woker_num) {
// 销毁该进程
if
(isset(
$this
->workers[
$pid
]) &&
$this
->workers[
$pid
]
instanceof
swoole_process) {
$this
->workers[
$pid
]->write(
'exit'
);
unset(
$this
->workers[
$pid
]);
$this
->curr_num =
count
(
$this
->workers);
unset(
$this
->used_workers[
$pid
]);
unset(
$this
->active_time[
$pid
]);
echo
"{$pid} destroyed\n"
;
break
;
}
}
}
echo
"任务{$count}/{$this->curr_num}\n"
;
if
(
$count
== 20) {
foreach
(
$this
->workers
as
$pid
=>
$worker
) {
$worker
->write(
'exit'
);
}
// 关闭定时器
swoole_timer_clear(
$timer_id
);
// 退出进程池
$this
->pool->
exit
(0);
exit
();
}
});
});
$master_pid
=
$this
->pool->start();
echo
"Master $master_pid start\n"
;
while
(
$ret
= swoole_process::wait()) {
$pid
=
$ret
[
'pid'
];
echo
"process {$pid} existed\n"
;
}
}
/**
* 建立一个新进程
* @return int 新进程的pid
*/
public
function
createWorker()
{
$worker_process
=
new
swoole_process(
function
(swoole_process
$worker
) {
// 给子进程管道绑定事件
swoole_event_add(
$worker
->pipe,
function
(
$pipe
)
use
(
$worker
) {
$data
= trim(
$worker
->read());
if
(
$data
==
'exit'
) {
$worker
->
exit
(0);
exit
();
}
echo
"{$worker->pid} 正在处理 {$data}\n"
;
sleep(5);
// 返回结果,表示空闲
$worker
->write(
"complete"
);
});
});
$worker_pid
=
$worker_process
->start();
// 给父进程管道绑定事件
swoole_event_add(
$worker_process
->pipe,
function
(
$pipe
)
use
(
$worker_process
) {
$data
= trim(
$worker_process
->read());
if
(
$data
==
'complete'
) {
// 标记为空闲
// echo "{$worker_process->pid} 空闲了\n";
$this
->used_workers[
$worker_process
->pid] = 0;
}
});
// 保存process对象
$this
->workers[
$worker_pid
] =
$worker_process
;
// 标记为空闲
$this
->used_workers[
$worker_pid
] = 0;
$this
->active_time[
$worker_pid
] = time();
$this
->curr_num =
count
(
$this
->workers);
return
$worker_pid
;
}
}
new
processPool();
|