File Coverage

blib/lib/FCGI/ProcManager/Dynamic.pm
Criterion Covered Total %
statement 22 37 59.4
branch 0 10 0.0
condition n/a
subroutine 8 9 88.8
pod n/a
total 30 56 53.5


line stmt bran cond sub pod time code
1             package FCGI::ProcManager::Dynamic;
2 1     1   3837 use base FCGI::ProcManager;
  1         1  
  1         398  
3              
4             # Copyright (c) 2012, Andrey Velikoredchanin.
5             # This library is free software released under the GNU Lesser General
6             # Public License, Version 3. Please read the important licensing and
7             # disclaimer information included below.
8              
9             # $Id: Dynamic.pm,v 0.6 2012/06/29 11:00:00 Andrey Velikoredchanin $
10              
11 1     1   8201 use strict;
  1         1  
  1         18  
12              
13 1     1   3 use vars qw($VERSION);
  1         2  
  1         31  
14             BEGIN {
15 1     1   12 $VERSION = '0.6';
16             }
17              
18 1     1   3 use POSIX;
  1         1  
  1         2  
19 1     1   1663 use Time::HiRes qw(usleep);
  1         912  
  1         3  
20 1     1   539 use IPC::SysV qw(IPC_PRIVATE IPC_CREAT IPC_NOWAIT IPC_RMID);
  1         707  
  1         80  
21 1     1   5 use FCGI::ProcManager;
  1         0  
  1         904  
22              
23             =head1 NAME
24              
25             FCGI::ProcManager::Dynamic - extension for FCGI::ProcManager, it can dynamically control number of work processes depending on the load.
26              
27             =head1 SYNOPSIS
28              
29             # In Object-oriented style.
30             use CGI::Fast;
31             use FCGI::ProcManager::Dynamic;
32             my $proc_manager = FCGI::ProcManager::Dynamic->new({
33             n_processes => 8,
34             min_nproc => 8,
35             max_nproc => 32,
36             delta_nproc => 4,
37             delta_time => 60,
38             max_requests => 300
39             });
40             $proc_manager->pm_manage();
41             while ($proc_manager->pm_loop() && (my $cgi = CGI::Fast->new())) {
42             $proc_manager->pm_pre_dispatch();
43             # ... handle the request here ...
44             $proc_manager->pm_post_dispatch();
45             }
46              
47             =head1 DESCRIPTION
48              
49             FCGI::ProcManager::Dynamic the same as FCGI::ProcManager, but it has additional settings and functions for dynamic control of work processes's number.
50              
51             =head1 Addition options
52              
53             =head2 min_nproc
54              
55             The minimum amount of worker processes.
56              
57             =head2 max_nproc
58              
59             The maximum amount of worker processes.
60              
61             =head2 delta_nproc
62              
63             amount of worker processes which will be changed for once in case of their increase or decrease.
64              
65             =head2 delta_time
66              
67             Delta of time from last change of processes's amount, when they will be reduced while lowering of loading.
68              
69             =head2 max_requests
70              
71             Amount of requests for one worker process. If it will be exceeded worker process will be recreated.
72              
73             =head1 Addition functions
74              
75             =head2 pm_loop
76              
77             Function is needed for correct completion of worker process's cycle if max_requests will be exceeded.
78              
79             =head1 BUGS
80              
81             No known bugs, but this does not mean no bugs exist.
82              
83             =head1 SEE ALSO
84              
85             L
86             L
87              
88             =head1 MAINTAINER
89              
90             Andrey Velikoredchanin
91              
92             =head1 AUTHOR
93              
94             Andrey Velikoredchanin
95              
96             =head1 COPYRIGHT
97              
98             FCGI-ProcManager-Dynamic - A Perl FCGI Dynamic Process Manager
99             Copyright (c) 2012, Andrey Velikoredchanin.
100              
101             This library is free software; you can redistribute it and/or
102             modify it under the terms of the GNU Lesser General Public
103             License as published by the Free Software Foundation; either
104             version 3 of the License, or (at your option) any later version.
105              
106             BECAUSE THIS LIBRARY IS LICENSED FREE OF CHARGE, THIS LIBRARY IS
107             BEING PROVIDED "AS IS WITH ALL FAULTS," WITHOUT ANY WARRANTIES
108             OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING, WITHOUT
109             LIMITATION, ANY IMPLIED WARRANTIES OF TITLE, NONINFRINGEMENT,
110             MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE, AND THE
111             ENTIRE RISK AS TO SATISFACTORY QUALITY, PERFORMANCE, ACCURACY,
112             AND EFFORT IS WITH THE YOU. See the GNU Lesser General Public
113             License for more details.
114              
115             You should have received a copy of the GNU Lesser General Public
116             License along with this library; if not, write to the Free Software
117             Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
118              
119             =cut
120              
121             sub pm_manage
122             {
123 0     0     my $self = shift;
124              
125 0           $self->{USED_PROCS} = 0;
126              
127 0 0         if (!defined($self->{min_nproc})) { $self->{min_nproc} = $self->n_processes(); };
  0            
128 0 0         if (!defined($self->{max_nproc})) { $self->{max_nproc} = 8; };
  0            
129 0 0         if (!defined($self->{delta_nproc})) { $self->{delta_nproc} = 5; };
  0            
130 0 0         if (!defined($self->{delta_time})) { $self->{delta_time} = 5; };
  0            
131              
132 0           $self->{_last_delta_time} = time();
133              
134             # Создает очередь сообщений
135 0 0         if (!defined($self->{ipcqueue} = msgget(IPC_PRIVATE, IPC_CREAT | 0666))) {
136 0           die "Cannot create shared message pipe!";
137             };
138              
139 0           $self->{USEDPIDS} = {};
140              
141 0           $self->SUPER::pm_manage();
142             }
143              
144             sub pm_wait
145             {
146             my $self = shift;
147              
148             # wait for the next server to die.
149             my $pid = 0;
150             while ($pid >= 0)
151             {
152             $pid = waitpid(-1, WNOHANG);
153              
154             if ($pid > 0)
155             {
156             # notify when one of our servers have died.
157             delete($self->{PIDS}->{$pid});
158             $self->pm_notify("worker (pid $pid) exited with status ".$?);
159             };
160              
161             # Читаем сообщения
162             my $rcvd;
163             my $delta_killed = $self->{delta_nproc};
164             while (msgrcv($self->{ipcqueue}, $rcvd, 60, 0, IPC_NOWAIT))
165             {
166             my ($code, $cpid) = unpack("l! l!", $rcvd);
167             if ($code eq '1')
168             {
169             $self->{USEDPIDS}->{$cpid} = 1;
170             }
171             elsif ($code eq '2')
172             {
173             delete($self->{USEDPIDS}->{$cpid});
174             };
175             };
176              
177             # Сверяем нет-ли в списке загруженных PID уже удаленных и считаем количество используемых
178             $self->{USED_PROCS} = 0;
179             foreach my $cpid (keys %{$self->{USEDPIDS}})
180             {
181             if (!defined($self->{PIDS}->{$cpid}))
182             {
183             delete($self->{USEDPIDS}->{$cpid});
184             }
185             else
186             {
187             $self->{USED_PROCS}++;
188             };
189             };
190              
191             # Балансировка процессов
192             # Если загружены все процессы, добавляем
193             if ($self->{USED_PROCS} >= $self->{n_processes})
194             {
195             # Добавляем процессы
196             my $newnp = (($self->{n_processes} + $self->{delta_nproc}) < $self->{max_nproc})? ($self->{n_processes} + $self->{delta_nproc}):$self->{max_nproc};
197              
198             if ($newnp != $self->{n_processes})
199             {
200             $self->pm_notify("increase workers count to $newnp");
201             $self->SUPER::n_processes($newnp);
202             $pid = -10;
203             $self->{_last_delta_time} = time();
204             };
205             elsif (keys(%{$self->{PIDS}}) < $self->{min_nproc})
206             {
207             # Если количество процессов меньше минимального - добавляем
208             $self->pm_notify("increase workers to minimal ".$self->{min_nproc});
209             $self->SUPER::n_processes($self->{min_nproc});
210             $self->{_last_delta_time} = time();
211             $pid = -10;
212             }
213             elsif (($self->{USED_PROCS} < $self->{min_nproc}) && ((time() - $self->{_last_delta_time}) >= $self->{delta_time}))
214             {
215             # Если загруженных процессов меньше минимального количества, уменьшаем на delta_nproc до минимального значения
216              
217             my $newnp = (($self->{n_processes} - $self->{delta_nproc}) > $self->{min_nproc})? ($self->{n_processes} - $self->{delta_nproc}):$self->{min_nproc};
218              
219             if ($newnp != $self->{n_processes})
220             {
221             $self->pm_notify("decrease workers count to $newnp");
222              
223             # В цикле убиваем нужное количество незанятых процессов
224             my $i = 0;
225             foreach my $dpid (keys %{$self->{PIDS}})
226             {
227             # Убиваем только если процесс свободен
228             if (!defined($self->{USEDPIDS}->{$dpid})) {
229             $i++;
230             if ($i <= ($self->{n_processes} - $newnp))
231             {
232             $self->pm_notify("kill worker $dpid");
233             kill(SIGKILL, $dpid);
234             delete($self->{PIDS}->{$dpid});
235             }
236             else
237             {
238             last;
239             };
240             };
241             };
242             $self->SUPER::n_processes($newnp);
243             $self->{_last_delta_time} = time();
244             };
245             }
246             elsif (keys(%{$self->{PIDS}}) < $self->{n_processes})
247             {
248             # Если количество процессов меньше текущего - добавляем
249             $self->pm_notify("increase workers to ".$self->{n_processes});
250             $self->{_last_delta_time} = time();
251             $pid = -10;
252             }
253             }
254             elsif ($self->{USED_PROCS} >= ($self->{n_processes} - $self->{delta_nproc}))
255             {
256             # Если количество занятых рабочих процессов больше чем первое меньшее количество процессов относительно текущего, то отдаляем уменьшение процессов на delta_time
257             $self->{_last_delta_time} = time();
258             };
259              
260             if ($pid == 0)
261             {
262             usleep(100000);
263             };
264             };
265              
266             return $pid;
267             };
268              
269             sub pm_pre_dispatch
270             {
271             my $self = shift;
272             $self->SUPER::pm_pre_dispatch();
273              
274             if (!msgsnd($self->{ipcqueue}, pack("l! l!", 1, $$), IPC_NOWAIT)) {
275             print STDERR "Error when execute MSGSND in pm_pre_dispatch\n";
276             $self->{msgsenderr} = 1;
277             } else {
278             $self->{msgsenderr} = 0;
279             };
280              
281             # Счетчик запросов
282             if (!defined($self->{requestcount})) {
283             $self->{requestcount} = 1;
284             } else {
285             $self->{requestcount}++;
286             };
287             };
288              
289             sub pm_post_dispatch
290             {
291             my $self = shift;
292              
293             if (!$self->{msgsenderr}) {
294             msgsnd($self->{ipcqueue}, pack("l! l!", 2, $$), 0);
295             };
296              
297             $self->SUPER::pm_post_dispatch();
298              
299             # Если определено максимальное количество запросов и оно превышено - выходим из чайлда
300             if (defined($self->{max_requests}) && ($self->{max_requests} ne '') && ($self->{requestcount} >= $self->{max_requests})) {
301             if ($self->{pm_loop_used}) {
302             $self->{exit_flag} = 1;
303             } else {
304             # Если в цикле не используется pm_loop - выходим "жестко"
305             exit;
306             };
307             };
308             };
309              
310             sub pm_die
311             {
312             my $self = shift;
313              
314             msgctl($self->{ipcqueue}, IPC_RMID, 0);
315              
316             $self->SUPER::pm_die();
317             };
318              
319             sub pm_loop
320             {
321             my $self = shift;
322              
323             $self->{pm_loop_used} = 1;
324              
325             return(!($self->{exit_flag}));
326             };
327              
328             sub pm_notify {
329             my ($this,$msg) = @_;
330             if (defined($msg)) {
331             $msg =~ s/\s*$/\n/;
332             my $time = POSIX::strftime('%Y-%m-%d %H:%M:%S', localtime(time()));
333             print STDERR $time, " - FastCGI: ".$this->role()." (pid $$): ".$msg;
334             };
335             };
336              
337             1;