File Coverage

blib/lib/Lock/Server.pm
Criterion Covered Total %
statement 264 292 90.4
branch 47 64 73.4
condition 35 62 56.4
subroutine 35 38 92.1
pod 6 6 100.0
total 387 462 83.7


line stmt bran cond sub pod time code
1             package Lock::Server;
2              
3             =head1 NAME
4              
5             Lock::Server - Light-weight RESTful socket based resource locking manager.
6              
7             =head1 DESCRIPTION
8              
9             This creates a socket server that handles lock and
10             unlock requests. The lock requests only return once a lock is
11             obtained or a timeout has occurred. A lock may only be locked
12             for a specific amount of time before the lock is timed out.
13              
14             The protocol used is RESTful HTTP, though the helper class wraps
15             that. It uses the GET verb with the following paths:
16              
17             * CHECK/key - returns 1 if the key in question is currently locked
18             returns 0 if not
19              
20             * LOCK/key/requester - returns lock expire time or 0
21             if there was an error
22              
23             * UNLOCK/key/requester - returns 1 if the unlock went as expected,
24             0 otherwise
25              
26             * VERIFY/key/requester - returns 1 if the key is locked to the
27             requester and did not time out and 0
28             otherwise.
29             * PING - returns 1 if the server is active
30              
31             * SHUTDOWN - stops this LockServer
32              
33             This does not do deadlock detection, relying on the timeouts to
34             prevent the system from getting in a hopelessly tangled state.
35             Care should be taken, as with any resource locking system, with
36             the use of Lock::Server. Adjust the timeouts for what makes sense
37             with the system you are designing. The lock requests return with the
38             time that the lock will expire.
39              
40             =head1 SYNOPSIS
41              
42             use Lock::Server;
43             use Lock::Server::Client;
44              
45             my $lockServer = new Lock::Server( {
46             lock_timeout => 10, #seconds. default is 3
47             lock_attempt_timeout => 12, #seconds. default is 4
48             port => 888, #default is 8004
49             host => 'localhost', #default 127.0.0.1
50             } );
51              
52             if( my $childPid = $lockServer->start ) {
53             print "Lock server started in child process $childPid\n";
54             }
55              
56             my $optional_args = { reconnect_attempts => 3, time_between_attempts => 1 };
57             my $lockClient_A = $lockServer->client( "CLIENT_A", $optional_args );
58             my $lockClient_B =
59             new Lock::Server::Client( "CLIENT_B", 'localhost', 888, $optional_args );
60              
61             if( $lockClient_A->lock( "KEYA" ) ) {
62             print "Lock Successful for locker A and KEYA\n";
63             } else {
64             print "Could not obtain lock in 12 seconds.\n";
65             }
66              
67             # The lock on KEYA held by Client A times out after 10 seconds.
68             # Client B waits until it can obtain the lock.
69             if( $lockClient_B->lock( "KEYA" ) ) {
70             print "Lock Successful for Client B lock 'KEYA'\n";
71             } else {
72             print "Could not obtain lock in 12 seconds.\n";
73             }
74              
75             # KEYA is now freed by Client B. The next locker
76             # attempting to lock KEYA will then obtain the lock.
77             if( $lockClient_B->unlock( "KEYA" ) ) {
78             print "Unlock Successful\n";
79             }
80              
81             if( $lockServer->stop ) {
82             print "Lock server shut down.\n";
83             }
84              
85             =head1 METHODS
86              
87             =cut
88              
89 9     9   17496 use strict;
  9         9  
  9         198  
90 9     9   27 use warnings;
  9         9  
  9         171  
91 9     9   27 no warnings 'uninitialized';
  9         27  
  9         261  
92              
93 9     9   4455 use Data::Dumper;
  9         57420  
  9         396  
94              
95 9     9   4410 use Socket;
  9         25794  
  9         3501  
96 9     9   3924 use IO::Socket::INET;
  9         158337  
  9         36  
97 9     9   8937 use POSIX ":sys_wait_h";
  9         36153  
  9         36  
98              
# Module version. Declared with 'our' rather than the obsolete 'use vars'
# pragma; the package variable it names is the same.
our $VERSION = '1.72';

# Global debug switch consulted by _log(): when true, every log message is
# written to STDERR.
# NOTE(review): tracing is switched on by default here - confirm that this is
# intended for released code.
$Lock::Server::DEBUG = 1;
105              
106             =head2 Lock::Server::new( $args )
107              
108             Creates a new lock server for the given optional arguments.
109            
110             Arguments are :
111             * port - port to serve on. Defaults to 8004
112             * lock_timeout - how long a lock should last, in seconds. Defaults to 3
113             * lock_attempt_timeout - how long a requester should
114             wait for a lock, in seconds. Defaults to 4
115             * allow_shutdown - allows a client to shut the server down
116             * reconnect_attempts - if port is busy when starting the server
117             how many retries to connect before giving up and failing startup
118             * time_between_attempts - interval between reconnection attempts
119              
120             =cut
# Constructor. Takes an optional hash reference of settings and returns a
# blessed Lock::Server object. No socket is opened here.
#
# Recognised settings (defaults in parentheses):
#   lock_timeout (3), lock_attempt_timeout (4), host ('127.0.0.1'),
#   port (8004), allow_shutdown (unset), reconnect_attempts (10),
#   time_between_attempts (5)
sub new {
    my( $pkg, $args ) = @_;
    my $class = ref( $pkg ) || $pkg;
    $args ||= {};

    # 'reconnect_attempts' is the documented option name. Earlier code only
    # read the misspelled 'reconnect_attemps', so the documented option was
    # silently ignored; the misspelling is still accepted so that existing
    # callers keep working.
    my $attempts = $args->{reconnect_attempts}
                || $args->{reconnect_attemps}
                || 10;

    return bless {
        lock_timeout          => $args->{lock_timeout}          || 3,
        lock_attempt_timeout  => $args->{lock_attempt_timeout}  || 4,
        host                  => $args->{host}                  || '127.0.0.1',
        port                  => $args->{port}                  || 8004,
        allow_shutdown        => $args->{allow_shutdown},
        _pids                 => {},   # pids of forked waiter processes
        _id2pid               => {},   # locker id -> pid of its waiter process
        _locks                => {},   # key -> queue (array ref) of locker ids
        _locker_counts        => {},   # locker id -> { key -> expiry time }
        attempts              => $attempts,
        time_between_attempts => $args->{time_between_attempts} || 5,
    }, $class;
} #new
139              
140              
141             =head2 client( lockername )
142              
143             Returns a client with the given name that can send lock and unlock requests for keys.
144              
145             =cut
# Builds a Lock::Server::Client with the given locker name, pointed at the
# host and port this server object was configured with. $args is passed
# through to the client constructor unchanged.
sub client {
    my $self = shift;
    my( $name, $args ) = @_;

    my( $host, $port ) = @{$self}{ 'host', 'port' };
    return Lock::Server::Client->new( $name, $host, $port, $args );
}
150              
151             =head2 ping
152              
153             Returns '1' if this lock server is up and running
154              
155             =cut
# Creates a throw-away client named "PING" and returns whatever that
# client's ping method reports.
sub ping {
    my $self  = shift;
    my $probe = $self->client( "PING" );
    return $probe->ping;
}
159              
160             =head2 stop
161              
162             Kills the lock server, breaking off any connections that are waiting for a lock.
163              
164             =cut
# Stops the lock server.
#
# Closes this object's copy of the listener socket (if any) and, when a
# server pid was recorded by start(), sends it an INT signal and waits a
# short while for it to exit.
#
# Returns 1 if a server pid was known and signalled, 0 otherwise (in which
# case $self->{error} is set to "No lock server running").
sub stop {
    my $self = shift;

    _log( " with '$self->{listener_socket}' socket" );
    $self->{listener_socket}->close if $self->{listener_socket};

    my $pid = $self->{server_pid};
    unless( $pid ) {
        $self->{error} = "No lock server running";
        return 0;
    }

    $self->{error} = "Sending INT signal to lock server of pid '$pid'";
    _log( " Killing lock server proc $pid" );
    kill 'INT', $pid;

    # A single non-blocking waitpid right after the kill almost never finds
    # the child already dead, which leaves a zombie behind. Poll for a
    # bounded time (about two seconds) instead of blocking forever.
    for my $try ( 1 .. 20 ) {
        # waitpid gives 0 while the child is still running, the pid once it
        # has been reaped, and -1 if it is not (or no longer) our child.
        if( waitpid( $pid, WNOHANG ) != 0 ) {
            # forget the pid so a later stop() cannot signal an unrelated
            # process that happens to reuse it
            delete $self->{server_pid};
            last;
        }
        select( undef, undef, undef, 0.1 );
    }

    _log( " STOP DONE" );
    return 1;
}
186              
187             =head2 start
188              
189             Starts the lock server in a child process, opening up a
190             tcpip socket and returning the child pid or 0 if there
191             was an error.
192              
193             =cut
# Opens the listener socket and forks a child process that runs the server
# loop.
#
# In the parent: records the child pid in $self->{server_pid} and returns
# it. If the fork fails, sets $self->{error}, closes the socket and
# returns 0.
# In the child: never returns; runs _run_loop and then exits.
# Dies if the listener socket could not be created.
sub start {
    my $self = shift;
    my $sock = $self->_create_listener_socket;
    $self->{listener_socket} = $sock;
    die "Unable to open lockserver socket $@,$! " unless $sock;

    my $pid = fork;

    # fork returns undef on failure. Without this check the failure would
    # be mistaken for "I am the child" and the calling process itself would
    # turn into the server and eventually exit.
    unless( defined $pid ) {
        $self->{error} = "Unable to fork lock server process : $!";
        $sock->close;
        delete $self->{listener_socket};
        return 0;
    }

    if( $pid ) {
        # parent
        $self->{server_pid} = $pid;
        return $pid;
    }

    # child process
    $0 = "LockServer";
    $self->_run_loop( $sock );
    exit;
} #start
211              
212             =head2 run
213              
214             Runs the lock server in the current process. This call does not return; the process exits once the server loop ends.
215              
216             =cut
# Runs the server loop in the calling process (no fork). Creates the
# listener socket, remembers it on the object, serves requests until the
# loop ends and then exits the process. Dies if the socket cannot be made.
sub run {
    my $self = shift;

    my $listener = $self->{listener_socket} = $self->_create_listener_socket;
    $listener or die "Unable to open lockserver socket $@,$! ";

    $self->_run_loop( $listener );
    exit;
} #run
225              
# Creates the listening TCP socket on $self->{host}:$self->{port}.
#
# If the first attempt fails it is retried up to $self->{attempts} more
# times, sleeping (retry number * $self->{time_between_attempts}) seconds
# before each retry. On success, INT and TERM handlers are installed that
# close the socket, signal all recorded waiter processes and exit.
#
# Returns the socket, or 0 (with $self->{error} set) if it could not be
# opened.
sub _create_listener_socket {
    my $self = shift;

    my $listener_socket;
    my $failures = 0;

    until( $listener_socket ) {
        $listener_socket = IO::Socket::INET->new(
            Listen    => 10,
            Type      => SOCK_STREAM,
            LocalAddr => "$self->{host}:$self->{port}",
        );
        last if $listener_socket;

        # out of retries: give up without announcing a retry that will
        # never happen
        last if ++$failures > $self->{attempts};

        # report the configured retry limit rather than a hard-coded 10
        print STDERR "Unable to open the lock server socket $@, $!. Retry $failures of $self->{attempts}\n";
        sleep $failures*$self->{time_between_attempts};
    }
    unless( $listener_socket ) {
        $self->{error} = "Unable to open socket on port '$self->{port}' : $! $@\n";
        _log( "unable to start lock server : $@ $!." );
        return 0;
    }

    # if this is cancelled, make sure all child procs are killed too
    $SIG{TERM} = $SIG{INT} = sub {
        _log( "lock server : got INT signal. Shutting down." );
        $listener_socket && $listener_socket->close;

        kill 'INT', keys %{ $self->{_pids} };

        # collect every child that has already exited
        while( (my $kidpid = waitpid( -1, WNOHANG ) ) > 0 ) {
            _log( " Killed $kidpid" );
        }
        $self->{_pids} = {};
        _log( "lock server : got INT signal. EXITING." );
        exit;
    };
    setsockopt( $listener_socket, SOL_SOCKET, SO_KEEPALIVE, 1 );
    return $listener_socket;
} #_create_listener_socket
268              
# Main server loop. Accepts connections on the listener socket one at a
# time, reads the request line, skips any header lines up to the first
# blank line and dispatches on the command found in the request path:
#
#   GET /CHECK/key            -> _check
#   GET /LOCK/key/requester   -> _lock
#   GET /UNLOCK/key/requester -> _unlock
#   GET /VERIFY/key/requester -> _verify
#   GET /PING                 -> replies "1"
#   GET /SHUTDOWN             -> replies "1" and stops, if allow_shutdown is set
#
# The loop ends when accept returns false, e.g. after the listener socket
# has been closed.
sub _run_loop {
    my( $self, $listener_socket ) = @_;
    while( my $connection = $listener_socket->accept ) {
        my $req = <$connection>;
        $req =~ s/\s+$//s;

        # An ordinary HTTP client ends the request line with a protocol
        # version ("GET /LOCK/key/id HTTP/1.1"). Strip it so it does not end
        # up as part of the key or the locker id.
        $req =~ s{\s+HTTP/\d+(?:\.\d+)?\z}{};

        _log( "lock server : incoming request : '$req'" );
        # could have headers, but ignore those. Find \n\n
        while( my $data = <$connection> ) {
            chomp $data;
            last unless $data =~ /\S/;
        }

        # Skip the five characters of the "GET /" prefix. A request that is
        # shorter than that (or missing, if the peer hung up) has no path.
        my $path = length( $req ) >= 5 ? substr( $req, 5 ) : '';
        my( $cmd, $key, $locker_id ) = split( '/', $path );

        if( $cmd eq 'CHECK' ) {
            $self->_check( $connection, $key );
        } elsif( $cmd eq 'LOCK' ) {
            $self->_lock( $connection, $key, $locker_id );
        } elsif( $cmd eq 'UNLOCK' ) {
            $self->_unlock( $connection, $key, $locker_id );
        } elsif( $cmd eq 'VERIFY' ) {
            $self->_verify( $connection, $key, $locker_id );
        } elsif( $cmd eq 'PING' ) {
            print $connection "1\n";
            $connection->close;
        } elsif( $cmd eq 'SHUTDOWN') {
            if( $self->{allow_shutdown}) {
                print $connection "1\n";
                $connection->close;
                # stop() closes the listener socket, which ends this loop
                $self->stop;
            } else {
                _log( "lock server : got shutdown request but not configured to allow it" );
                $connection->close;
            }
        } else {
            _log( "lock server : did not understand command '$cmd'" );
            $connection->close;
        }
    } #while still getting connections
} #_run_loop
308              
# Handles a CHECK request: replies "1" if anybody currently holds (or is
# queued for) the key and "0" if not, then closes the connection. Lockers
# at the front of the key's queue whose expiry time has passed are dropped
# first.
sub _check {
    my( $self, $connection, $key_to_check ) = @_;
    _log( "locker server check for key '$key_to_check'" );

    my $queue = ( $self->{_locks}{$key_to_check} ||= [] );
    my $now   = time;

    # discard expired lockers from the front of the queue
    while( @$queue ) {
        last unless $now > $self->{_locker_counts}{$queue->[0]}{$key_to_check};

        _log( "lock server _check : '$key_to_check' timed out for locker '$queue->[0]'" );

        # forget the locker entirely if this was the only key it had an
        # expiry recorded for, otherwise just this key
        my $only_key = ( 1 == keys %{ $self->{_locker_counts}{$queue->[0]} } );
        if( $only_key ) {
            delete $self->{_locker_counts}{$queue->[0]};
        } else {
            delete $self->{_locker_counts}{$queue->[0]}{$key_to_check};
        }
        shift @$queue;
    }

    my $answer = @$queue ? "1\n" : "0\n";
    print {$connection} $answer;
    $connection->close;
}
337              
# Writes a debug line to STDERR, tagged with the module name and the
# current process id. Does nothing unless $Lock::Server::DEBUG is true.
sub _log {
    my( $text ) = @_;
    my $tagged = "($$) $text";
    print STDERR "Lock::Server : $tagged\n" if $Lock::Server::DEBUG;
}
343              
# Handles a LOCK request for $key_to_lock on behalf of $locker_id.
#
# Expired lockers are first dropped from the front of the key's queue. The
# requester is then appended to the queue. If it is the only entry it gets
# the lock at once and the expiry time is written to the connection.
# Otherwise a waiter process is forked that holds the connection open: it
# replies with the expiry time when it receives HUP, or with "0" once
# lock_attempt_timeout seconds have passed.
#
# Replies "0" immediately if the request is malformed, if the requester is
# already queued for this key, or if the waiter process cannot be forked.
sub _lock {
    my( $self, $connection, $key_to_lock, $locker_id ) = @_;

    _log( "lock request : for '$locker_id' and key '$key_to_lock'" );

    # a request without a key or a locker id cannot be tracked; refuse it
    # rather than queueing an empty name
    unless( length( $key_to_lock ) && length( $locker_id ) ) {
        _log( "lock request error. missing key or locker id" );
        print $connection "0\n";
        $connection->close;
        return;
    }

    $self->{_locks}{$key_to_lock} ||= [];
    my $lockers = $self->{_locks}{$key_to_lock};
    #check for timed out lockers
    my $t = time;

    while( @$lockers && $t > $self->{_locker_counts}{$lockers->[0]}{$key_to_lock} ) {
        _log( "lock '$key_to_lock' timed out for locker '$lockers->[0]'" );
        if( 1 == keys %{ $self->{_locker_counts}{$lockers->[0]} } ) {
            delete $self->{_locker_counts}{$lockers->[0]};
        } else {
            delete $self->{_locker_counts}{$lockers->[0]}{$key_to_lock};
        }
        shift @$lockers;
    }

    if( 0 < (grep { $_ eq $locker_id } @$lockers) ) {
        _log( "lock request error. '$locker_id' already in the lock queue" );
        print $connection "0\n";
        $connection->close;
        return;
    }

    # store when this times out
    my $timeout_time = time + $self->{lock_timeout};
    $self->{_locker_counts}{$locker_id}{$key_to_lock} = $timeout_time;
    push @$lockers, $locker_id;

    _log( "lock request : there are now ".scalar(@$lockers)." lockers" );
    if( @$lockers > 1 ) {
        my $pid = fork;

        # fork returns undef on failure. Treating that as "I am the child"
        # would make the server itself sleep and then exit, so undo the
        # queue entry and refuse the request instead.
        unless( defined $pid ) {
            _log( "lock request failed : unable to fork waiter process : $!" );
            pop @$lockers;
            delete $self->{_locker_counts}{$locker_id}{$key_to_lock};
            delete $self->{_locker_counts}{$locker_id}
                unless keys %{ $self->{_locker_counts}{$locker_id} };
            print $connection "0\n";
            $connection->close;
            return;
        }

        if( $pid ) {
            # parent
            $self->{_id2pid}{$locker_id} = $pid;
            $self->{_pids}{$pid} = 1;
            _log( "lock request : parent process associating '$locker_id' with pid '$pid' ".scalar(@$lockers)." lockers" );
        } else {
            # child: keeps the connection open until told what to answer
            $0 = "LockServer processing request";
            $SIG{INT} = sub {
                _log( "lock request : child got INT, exiting." );
                $connection->close;
                exit;
            };
            $SIG{HUP} = sub {
                _log( "lock request : child got HUP, so is now locked. exiting" );
                $connection->print( "$timeout_time\n" );
                $connection->close;
                exit;
            };
            _log( "lock request : child ready to wait" );
            sleep $self->{lock_attempt_timeout};
            _log( "lock request failed : child timed out" );
            print $connection "0\n";
            $connection->close;
            exit;
        }
    } else {
        _log( "lock request : no need to invoke more processes. locking" );
        print $connection "$timeout_time\n";
        $connection->close;
    }
} #_lock
410              
# Handles an UNLOCK request. Only the locker at the front of the key's
# queue may unlock it.
#
# On success the locker is removed from the queue, its bookkeeping is
# cleaned up, the waiter process of the next locker in the queue (if any)
# is sent HUP, and "1" is written to the connection. Otherwise "0" is
# written. The connection is closed in both cases.
sub _unlock {
    my( $self, $connection, $key_to_unlock, $locker_id ) = @_;
    _log( "unlock request : key '$key_to_unlock' for locker '$locker_id'" );

    $self->{_locks}{$key_to_unlock} ||= [];
    my $lockers = $self->{_locks}{$key_to_unlock};

    # The queue must be non-empty: comparing an empty queue's (undefined)
    # head with a missing locker id would otherwise count as a match and
    # report a successful unlock of nothing.
    if( @$lockers && $lockers->[0] eq $locker_id ) {
        shift @$lockers;
        delete $self->{_locker_counts}{$locker_id}{$key_to_unlock};
        if( 0 == scalar(keys %{$self->{_locker_counts}{$locker_id}}) ) {
            _log( "unlock : remove information about '$locker_id'" );
            delete $self->{_id2pid}{$locker_id};
            delete $self->{_locker_counts}{$locker_id};
        }
        _log( "unlocking '$locker_id'" );
        if( @$lockers ) {
            my $next_locker_id = $lockers->[0];
            my $pid = $self->{_id2pid}{$next_locker_id};
            if( $pid ) {
                _log( "unlock : next locker in queue is '$next_locker_id'. Sending kill signal to its pid '$pid'" );
                kill 'HUP', $pid;
            } else {
                # Never signal an undefined pid: it would be taken as pid 0,
                # which addresses the whole process group, this server
                # included.
                _log( "unlock : next locker in queue is '$next_locker_id' but it has no waiting process to notify" );
            }
        } else {
            _log( "unlock : now no one waiting on a lock for key '$key_to_unlock'" );
        }
        _log( "unlock : done, informing connection" );
        print $connection "1\n";
        $connection->close;
    } else {
        _log( "unlock error : Wrong locker_id to unlock for unlock for locker '$locker_id' and key '$key_to_unlock'. The locker_id must be the one at the front of the queue" );
        # "Wrong locker_id to unlock. The locker_id must be the one at the front of the queue";
        print $connection "0\n";
        $connection->close;
    }
} #_unlock
445              
# Handles a VERIFY request: replies "1" if $locker_id is at the front of
# the queue for $key_to_check after expired lockers have been dropped, and
# "0" otherwise. The connection is closed afterwards.
sub _verify {
    my( $self, $connection, $key_to_check, $locker_id ) = @_;

    _log( "verify : locker server check for key '$key_to_check' for locker '$locker_id'" );

    $self->{_locks}{$key_to_check} ||= [];
    my $lockers = $self->{_locks}{$key_to_check};

    #check for timed out lockers
    my $t = time;
    while( @$lockers && $t > $self->{_locker_counts}{$lockers->[0]}{$key_to_check} ) {
        _log( "verify: '$key_to_check' timed out for locker '$lockers->[0]'" );
        if( 1 == keys %{ $self->{_locker_counts}{$lockers->[0]} } ) {
            delete $self->{_locker_counts}{$lockers->[0]};
        } else {
            delete $self->{_locker_counts}{$lockers->[0]}{$key_to_check};
        }
        shift @$lockers;
    }

    # Require a non-empty queue: with nobody holding the key, the undefined
    # queue head must not compare equal to a missing locker id.
    if( @$lockers && $lockers->[0] eq $locker_id ) {
        print $connection "1\n";
    } else {
        print $connection "0\n";
    }
    $connection->close;
}
473              
474              
475              
476             =head1 Helper package
477              
478             =head2 NAME
479              
480             Lock::Server::Client - client for locking server.
481              
482             =head2 DESCRIPTION
483              
484             Sends request to a Lock::Server to lock, unlock and check locks.
485              
486             =head2 METHODS
487              
488             =cut
489             package Lock::Server::Client;
490              
491 9     9   36 use strict;
  9         9  
  9         180  
492 9     9   54 use warnings;
  9         9  
  9         216  
493 9     9   27 no warnings 'uninitialized';
  9         9  
  9         243  
494              
495 9     9   27 use IO::Socket::INET;
  9         36  
  9         27  
496              
497             =head3 new( lockername, host, port )
498              
499             Creates a client object with the given name for the host and port.
500              
501             =cut
# Constructor for a lock client.
#
#   $lockerName - name this client identifies itself with (required)
#   $host       - server host, defaults to '127.0.0.1'
#   $port       - server port, defaults to '8004'
#   $args       - optional hash reference with reconnect_attempts
#                 (default 3) and time_between_attempts (default 3)
#
# Dies with "Must supply locker name" if no name is given.
sub new {
    my( $pkg, $lockerName, $host, $port, $args ) = @_;
    die "Must supply locker name" unless $lockerName;

    $host ||= '127.0.0.1';
    $port ||= '8004';
    $args ||= {};

    # 'reconnect_attempts' is the documented option name. Earlier code only
    # read the misspelled 'reconnect_attemps', so the documented option was
    # silently ignored; the misspelling is still accepted so that existing
    # callers keep working.
    my $attempts = $args->{reconnect_attempts}
                || $args->{reconnect_attemps}
                || 3;

    my $class = ref( $pkg ) || $pkg;
    return bless {
        host                  => $host,
        port                  => $port,
        name                  => $lockerName,
        attempts              => $attempts,
        time_between_attempts => $args->{time_between_attempts} || 3,
    }, $class;
} #new
518              
# Opens a TCP connection to the configured host and port and returns it
# with the ':utf8' layer applied.
#
# An optional argument overrides $self->{attempts} as the retry limit.
# After a failed attempt the client sleeps
# (time_between_attempts * attempt number) seconds before trying again.
# Dies with "Could not connect" when no connection could be made.
sub _get_sock {
    my( $self, $override ) = @_;
    my $attempts = $override || $self->{attempts};

    # try a few times, then give up
    my( $sock, $count );
    while( 1 ) {
        last if $sock;
        last if $count++ > $attempts;

        $sock = IO::Socket::INET->new( "$self->{host}:$self->{port}" );
        next if $sock;
        next if $count > $attempts;

        sleep $self->{time_between_attempts}*($count);
    }
    $sock or die "Could not connect : $@";
    binmode $sock, ':utf8';
    return $sock;
}
533              
534             =head3 isLocked( key )
535              
536             Returns true if the key is locked by anyone.
537              
538             =cut
# Sends a CHECK request for $key and returns the server's one-line reply
# with the trailing newline removed.
sub isLocked {
    my $self = shift;
    my( $key ) = @_;

    my $sock = $self->_get_sock;
    print {$sock} "GET /CHECK/$key\n\n";

    my $resp = readline( $sock );
    $sock->close;

    chomp $resp;
    return $resp;
}
549              
550             =head3 lockedByMe( key )
551              
552             Returns true if the key is locked by this client or
553             anyone with the name of this client. The name was given in the constructor.
554              
555             =cut
# Sends a VERIFY request for $key under this client's name and returns the
# server's one-line reply with the trailing newline removed.
sub lockedByMe {
    my $self = shift;
    my $key  = shift;

    my $request = "GET /VERIFY/$key/$self->{name}\n\n";

    my $sock = $self->_get_sock;
    print {$sock} $request;
    my $resp = readline( $sock );
    $sock->close;

    chomp $resp;
    return $resp;
}
566              
567             =head3 lock( key )
568              
569             Attempt to get the lock for the given key. Returns true if the lock
570             was obtained.
571              
572             =cut
# Sends a LOCK request for $key under this client's name and returns the
# server's one-line reply with the trailing newline removed. The read
# blocks until the server answers.
sub lock {
    my( $self, $key ) = @_;

    my $sock = $self->_get_sock;
    my $request = "GET /LOCK/$key/$self->{name}\n\n";
    print {$sock} $request;

    my $resp = readline( $sock );
    $sock->close;
    chomp $resp;

    return $resp;
}
583              
584             =head3 unlock( key )
585              
586             Attempt to unlock the given key. Returns true if the
587             key was locked to this client ( or something with the same name ).
588              
589             =cut
# Sends an UNLOCK request for $key under this client's name and returns
# the server's one-line reply with the trailing newline removed.
sub unlock {
    my $self = shift;
    my( $key ) = @_;

    my $sock = $self->_get_sock;
    print {$sock} "GET /UNLOCK/$key/$self->{name}\n\n";

    my $resp = readline( $sock );
    $sock->close;
    chomp $resp;
    return $resp;
}
599              
# Checks whether the lock server answers.
#
# Sends a PING request, giving up after $timeout seconds (default 3).
# Returns the server's reply with the trailing newline removed, or '0' if
# no connection could be made, the timeout expired, or the server closed
# the connection without answering.
sub ping {
    my( $self, $timeout ) = @_;

    $timeout //= 3;

    local $SIG{ALRM} = sub { die "ALARM\n" };
    alarm $timeout;
    my $resp;
    eval {
        # a single connection attempt only
        my $sock = $self->_get_sock( 1 );
        $sock->print( "GET /PING\n\n" );
        $resp = <$sock>;
        alarm 0;
        $sock->close;
    };

    # Always cancel the alarm. If the eval died for some other reason (for
    # example a failed connection) the timer would still be armed, and it
    # would fire after the local handler above has been removed.
    alarm 0;

    # an unanswered ping reads as undef; report it as '0'
    $resp = '0' unless defined $resp;
    chomp $resp;
    return $resp;
} #ping
619              
# Asks the lock server to shut down.
#
# Sends a SHUTDOWN request, giving up after $timeout seconds (default 3).
# The server's reply is not read. Returns the error text if connecting or
# sending failed (including "ALARM\n" on timeout) and an empty string
# otherwise.
sub shutdown {
    my( $self, $timeout ) = @_;

    $timeout //= 3;

    local $SIG{ALRM} = sub { die "ALARM\n" };
    alarm $timeout;
    eval {
        # a single connection attempt only
        my $sock = $self->_get_sock( 1 );
        $sock->print( "GET /SHUTDOWN\n\n" );
        alarm 0;
        $sock->close;
    };
    # capture the error straight away, before anything can overwrite $@
    my $error = $@;

    # Always cancel the alarm: if the eval died before reaching its own
    # "alarm 0" the timer would otherwise fire after the local handler
    # above has been removed.
    alarm 0;

    return $error;
} #shutdown
635              
636              
637             1;
638              
639              
640             __END__