File Coverage

blib/lib/FCGI/ProcManager/Dynamic.pm
Criterion Covered Total %
statement 36 120 30.0
branch 5 54 9.2
condition 0 9 0.0
subroutine 9 15 60.0
pod 7 7 100.0
total 57 205 27.8


line stmt bran cond sub pod time code
1             package FCGI::ProcManager::Dynamic;
2 1     1   7060 use base FCGI::ProcManager;
  1         2  
  1         904  
3              
4             # Copyright (c) 2012, Andrey Velikoredchanin.
5             # This library is free software released under the GNU Lesser General
6             # Public License, Version 3. Please read the important licensing and
7             # disclaimer information included below.
8              
9             # $Id: Dynamic.pm,v 0.6 2012/06/29 11:00:00 Andrey Velikoredchanin $
10              
11 1     1   11686 use strict;
  1         2  
  1         35  
12              
13 1     1   5 use vars qw($VERSION);
  1         6  
  1         40  
14             BEGIN {
15 1     1   14 $VERSION = '0.6';
16             }
17              
18 1     1   4 use POSIX;
  1         2  
  1         4  
19 1     1   1469419 use Time::HiRes qw(usleep);
  1         3338  
  1         7  
20 1     1   1620 use IPC::SysV qw(IPC_PRIVATE IPC_CREAT IPC_NOWAIT IPC_RMID);
  1         3581  
  1         225  
21 1     1   11 use FCGI::ProcManager;
  1         2  
  1         2228  
22              
23             =head1 NAME
24              
25             FCGI::ProcManager::Dynamic - extension for FCGI::ProcManager, it can dynamically control number of work processes depending on the load.
26              
27             =head1 SYNOPSIS
28              
29             # In Object-oriented style.
30             use CGI::Fast;
31             use FCGI::ProcManager::Dynamic;
32             my $proc_manager = FCGI::ProcManager::Dynamic->new({
33             n_processes => 8,
34             min_nproc => 8,
35             max_nproc => 32,
36             delta_nproc => 4,
37             delta_time => 60,
38             max_requests => 300
39             });
40             $proc_manager->pm_manage();
41             while ($proc_manager->pm_loop() && (my $cgi = CGI::Fast->new())) {
42             $proc_manager->pm_pre_dispatch();
43             # ... handle the request here ...
44             $proc_manager->pm_post_dispatch();
45             }
46              
47             =head1 DESCRIPTION
48              
49             FCGI::ProcManager::Dynamic the same as FCGI::ProcManager, but it has additional settings and functions for dynamic control of work processes's number.
50              
51             =head1 Addition options
52              
53             =head2 min_nproc
54              
55             The minimum amount of worker processes.
56              
57             =head2 max_nproc
58              
59             The maximum amount of worker processes.
60              
61             =head2 delta_nproc
62              
63             amount of worker processes which will be changed for once in case of their increase or decrease.
64              
65             =head2 delta_time
66              
67             Delta of time from last change of processes's amount, when they will be reduced while lowering of loading.
68              
69             =head2 max_requests
70              
71             Amount of requests for one worker process. If it will be exceeded worker process will be recreated.
72              
73             =head1 Addition functions
74              
75             =head2 pm_loop
76              
77             Function is needed for correct completion of worker process's cycle if max_requests will be exceeded.
78              
79             =head1 BUGS
80              
81             No known bugs, but this does not mean no bugs exist.
82              
83             =head1 SEE ALSO
84              
85             L
86             L
87              
88             =head1 MAINTAINER
89              
90             Andrey Velikoredchanin
91              
92             =head1 AUTHOR
93              
94             Andrey Velikoredchanin
95              
96             =head1 COPYRIGHT
97              
98             FCGI-ProcManager-Dynamic - A Perl FCGI Dynamic Process Manager
99             Copyright (c) 2012, Andrey Velikoredchanin.
100              
101             This library is free software; you can redistribute it and/or
102             modify it under the terms of the GNU Lesser General Public
103             License as published by the Free Software Foundation; either
104             version 3 of the License, or (at your option) any later version.
105              
106             BECAUSE THIS LIBRARY IS LICENSED FREE OF CHARGE, THIS LIBRARY IS
107             BEING PROVIDED "AS IS WITH ALL FAULTS," WITHOUT ANY WARRANTIES
108             OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING, WITHOUT
109             LIMITATION, ANY IMPLIED WARRANTIES OF TITLE, NONINFRINGEMENT,
110             MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE, AND THE
111             ENTIRE RISK AS TO SATISFACTORY QUALITY, PERFORMANCE, ACCURACY,
112             AND EFFORT IS WITH THE YOU. See the GNU Lesser General Public
113             License for more details.
114              
115             You should have received a copy of the GNU Lesser General Public
116             License along with this library; if not, write to the Free Software
117             Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
118              
119             =cut
120              
121             sub pm_manage
122             {
123 1     1 1 1057 my $self = shift;
124              
125 1         3 $self->{USED_PROCS} = 0;
126              
127 1 50       4 if (!defined($self->{min_nproc})) { $self->{min_nproc} = $self->n_processes(); };
  1         2  
128 1 50       58 if (!defined($self->{max_nproc})) { $self->{max_nproc} = 8; };
  1         3  
129 1 50       11 if (!defined($self->{delta_nproc})) { $self->{delta_nproc} = 5; };
  1         3  
130 1 50       4 if (!defined($self->{delta_time})) { $self->{delta_time} = 5; };
  1         2  
131              
132 1         8 $self->{_last_delta_time} = time();
133              
134             # Создает очередь сообщений
135 1 50       11 if (!($self->{ipcqueue} = msgget(IPC_PRIVATE, IPC_CREAT | 0666))) {
136 0         0 die "Cannot create shared message pipe!";
137             };
138              
139 1         90 $self->{USEDPIDS} = {};
140              
141 1         9 $self->SUPER::pm_manage();
142             }
143              
144             sub pm_wait
145             {
146 0     0 1   my $self = shift;
147              
148             # wait for the next server to die.
149 0           my $pid = 0;
150 0           while ($pid >= 0)
151             {
152 0           $pid = waitpid(-1, WNOHANG);
153              
154 0 0         if ($pid > 0)
155             {
156             # notify when one of our servers have died.
157 0           delete($self->{PIDS}->{$pid});
158 0           $self->pm_notify("worker (pid $pid) exited with status ".$?);
159             };
160              
161             # Читаем сообщения
162 0           my $rcvd;
163 0           my $delta_killed = $self->{delta_nproc};
164 0           while (msgrcv($self->{ipcqueue}, $rcvd, 60, 0, IPC_NOWAIT))
165             {
166 0           my ($code, $cpid) = unpack("l! l!", $rcvd);
167 0 0         if ($code eq '1')
    0          
168             {
169 0           $self->{USEDPIDS}->{$cpid} = 1;
170             }
171             elsif ($code eq '2')
172             {
173 0           delete($self->{USEDPIDS}->{$cpid});
174             };
175             };
176              
177             # Сверяем нет-ли в списке загруженных PID уже удаленных и считаем количество используемых
178 0           $self->{USED_PROCS} = 0;
179 0           foreach my $cpid (keys %{$self->{USEDPIDS}})
  0            
180             {
181 0 0         if (!defined($self->{PIDS}->{$cpid}))
182             {
183 0           delete($self->{USEDPIDS}->{$cpid});
184             }
185             else
186             {
187 0           $self->{USED_PROCS}++;
188             };
189             };
190              
191             # Балансировка процессов
192             # Если загружены все процессы, добавляем
193 0 0 0       if ($self->{USED_PROCS} >= $self->{n_processes})
    0          
    0          
    0          
    0          
194             {
195             # Добавляем процессы
196 0 0         my $newnp = (($self->{n_processes} + $self->{delta_nproc}) < $self->{max_nproc})? ($self->{n_processes} + $self->{delta_nproc}):$self->{max_nproc};
197              
198 0 0         if ($newnp != $self->{n_processes})
199             {
200 0           $self->pm_notify("increase workers count to $newnp");
201 0           $self->SUPER::n_processes($newnp);
202 0           $pid = -10;
203 0           $self->{_last_delta_time} = time();
204             };
205             }
206             elsif (($self->{USED_PROCS} < $self->{min_nproc}) && ((time() - $self->{_last_delta_time}) >= $self->{delta_time}))
207 0           {
208             # Если загруженных процессов меньше минимального количества, уменьшаем на delta_nproc до минимального значения
209              
210 0 0         my $newnp = (($self->{n_processes} - $self->{delta_nproc}) > $self->{min_nproc})? ($self->{n_processes} - $self->{delta_nproc}):$self->{min_nproc};
211              
212 0 0         if ($newnp != $self->{n_processes})
213             {
214 0           $self->pm_notify("decrease workers count to $newnp");
215              
216             # В цикле убиваем нужное количество незанятых процессов
217 0           my $i = 0;
218 0           foreach my $dpid (keys %{$self->{PIDS}})
  0            
219             {
220             # Убиваем только если процесс свободен
221 0 0         if (!defined($self->{USEDPIDS}->{$dpid})) {
222 0           $i++;
223 0 0         if ($i <= ($self->{n_processes} - $newnp))
224             {
225 0           $self->pm_notify("kill worker $dpid");
226 0           kill(SIGKILL, $dpid);
227 0           delete($self->{PIDS}->{$dpid});
228             }
229             else
230             {
231 0           last;
232             };
233             };
234             };
235 0           $self->SUPER::n_processes($newnp);
236 0           $self->{_last_delta_time} = time();
237             };
238             }
239             elsif (keys(%{$self->{PIDS}}) < $self->{n_processes})
240 0           {
241             # Если количество процессов меньше текущего - добавляем
242 0           $self->pm_notify("increase workers to ".$self->{n_processes});
243 0           $self->{_last_delta_time} = time();
244 0           $pid = -10;
245             }
246             elsif (keys(%{$self->{PIDS}}) < $self->{min_nproc})
247             {
248             # Если количество процессов меньше минимального - добавляем
249 0           $self->pm_notify("increase workers to minimal ".$self->{min_nproc});
250 0           $self->SUPER::n_processes($self->{min_nproc});
251 0           $self->{_last_delta_time} = time();
252 0           $pid = -10;
253             }
254             elsif ($self->{USED_PROCS} >= ($self->{n_processes} - $self->{delta_nproc}))
255             {
256             # Если количество занятых рабочих процессов больше чем первое меньшее количество процессов относительно текущего, то отдаляем уменьшение процессов на delta_time
257 0           $self->{_last_delta_time} = time();
258             };
259              
260 0 0         if ($pid == 0)
261             {
262 0           usleep(100000);
263             };
264             };
265              
266 0           return $pid;
267             };
268              
269             sub pm_pre_dispatch
270             {
271 0     0 1   my $self = shift;
272 0           $self->SUPER::pm_pre_dispatch();
273              
274 0 0         if (!msgsnd($self->{ipcqueue}, pack("l! l!", 1, $$), IPC_NOWAIT)) {
275 0           print STDERR "Error when execute MSGSND in pm_pre_dispatch\n";
276 0           $self->{msgsenderr} = 1;
277             } else {
278 0           $self->{msgsenderr} = 0;
279             };
280              
281             # Счетчик запросов
282 0 0         if (!defined($self->{requestcount})) {
283 0           $self->{requestcount} = 1;
284             } else {
285 0           $self->{requestcount}++;
286             };
287             };
288              
289             sub pm_post_dispatch
290             {
291 0     0 1   my $self = shift;
292              
293 0 0         if (!$self->{msgsenderr}) {
294 0           msgsnd($self->{ipcqueue}, pack("l! l!", 2, $$), 0);
295             };
296              
297 0           $self->SUPER::pm_post_dispatch();
298              
299             # Если определено максимальное количество запросов и оно превышено - выходим из чайлда
300 0 0 0       if (defined($self->{max_requests}) && ($self->{max_requests} ne '') && ($self->{requestcount} >= $self->{max_requests})) {
      0        
301 0 0         if ($self->{pm_loop_used}) {
302 0           $self->{exit_flag} = 1;
303             } else {
304             # Если в цикле не используется pm_loop - выходим "жестко"
305 0           exit;
306             };
307             };
308             };
309              
310             sub pm_die
311             {
312 0     0 1   my $self = shift;
313              
314 0           msgctl($self->{ipcqueue}, IPC_RMID, 0);
315              
316 0           $self->SUPER::pm_die();
317             };
318              
319             sub pm_loop
320             {
321 0     0 1   my $self = shift;
322              
323 0           $self->{pm_loop_used} = 1;
324              
325 0           return(!($self->{exit_flag}));
326             };
327              
328             sub pm_notify {
329 0     0 1   my ($this,$msg) = @_;
330 0 0         if (defined($msg)) {
331 0           $msg =~ s/\s*$/\n/;
332 0           my $time = POSIX::strftime('%Y-%m-%d %H:%M:%S', localtime(time()));
333 0           print STDERR $time, " - FastCGI: ".$this->role()." (pid $$): ".$msg;
334             };
335             };
336              
337             1;