File Coverage

blib/lib/Net/Clacks/Client.pm
Criterion Covered Total %
statement 56 752 7.4
branch 0 326 0.0
condition 0 111 0.0
subroutine 19 50 38.0
pod 29 29 100.0
total 104 1268 8.2


line stmt bran cond sub pod time code
1             package Net::Clacks::Client;
2             #---AUTOPRAGMASTART---
3 1     1   29 use 5.020;
  1         3  
4 1     1   5 use strict;
  1         3  
  1         19  
5 1     1   5 use warnings;
  1         2  
  1         34  
6 1     1   5 use diagnostics;
  1         2  
  1         10  
7 1     1   39 use mro 'c3';
  1         2  
  1         9  
8 1     1   25 use English;
  1         2  
  1         8  
9 1     1   485 use Carp;
  1         2  
  1         65  
10             our $VERSION = 24;
11 1     1   5 use autodie qw( close );
  1         2  
  1         8  
12 1     1   385 use Array::Contains;
  1         3  
  1         47  
13 1     1   5 use utf8;
  1         3  
  1         6  
14 1     1   36 use Encode qw(is_utf8 encode_utf8 decode_utf8);
  1         2  
  1         54  
15 1     1   6 use feature 'signatures';
  1         2  
  1         87  
16 1     1   5 no warnings qw(experimental::signatures);
  1         2  
  1         49  
17             #---AUTOPRAGMAEND---
18              
19 1     1   624 use IO::Socket::IP;
  1         32111  
  1         5  
20             #use IO::Socket::UNIX;
21 1     1   1221 use Time::HiRes qw[sleep usleep];
  1         1500  
  1         5  
22 1     1   747 use Sys::Hostname;
  1         956  
  1         48  
23 1     1   424 use IO::Select;
  1         1527  
  1         72  
24 1     1   802 use IO::Socket::SSL;
  1         45475  
  1         11  
25 1     1   839 use MIME::Base64;
  1         806  
  1         8285  
26              
27 0     0 1   sub new($class, $server, $port, $username, $password, $clientname, $iscaching = 0) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
28 0           my $self = bless {}, $class;
29              
30 0 0 0       if(!defined($server) || !length($server)) {
31 0           croak("server not defined!");
32             }
33 0 0 0       if(!defined($port) || !length($port)) {
34 0           croak("port not defined!");
35             }
36 0 0 0       if(!defined($username) || !length($username)) {
37 0           croak("username not defined!");
38             }
39 0 0 0       if(!defined($password) || !length($password)) {
40 0           croak("password not defined!");
41             }
42 0 0 0       if(!defined($clientname) || !length($clientname)) {
43 0           croak("clientname not defined!");
44             }
45              
46 0           $self->{server} = $server;
47 0           $self->{port} = $port;
48              
49 0           $self->init($username, $password, $clientname, $iscaching);
50              
51 0           return $self;
52             }
53              
54 0     0 1   sub newSocket($class, $socketpath, $username, $password, $clientname, $iscaching = 0) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
55 0           my $self = bless {}, $class;
56              
57 0 0 0       if(!defined($socketpath) || !length($socketpath)) {
58 0           croak("socketpath not defined!");
59             }
60 0 0 0       if(!defined($username) || !length($username)) {
61 0           croak("username not defined!");
62             }
63 0 0 0       if(!defined($password) || !length($password)) {
64 0           croak("password not defined!");
65             }
66 0 0 0       if(!defined($clientname) || !length($clientname)) {
67 0           croak("clientname not defined!");
68             }
69              
70 0           my $udsloaded = 0;
71 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
72 0           require IO::Socket::UNIX;
73 0           $udsloaded = 1;
74             };
75 0 0         if(!$udsloaded) {
76 0           croak("Specified a unix domain socket, but i couldn't load IO::Socket::UNIX!");
77             }
78              
79 0           $self->{socketpath} = $socketpath;
80              
81 0           $self->init($username, $password, $clientname, $iscaching);
82              
83 0           return $self;
84             }
85              
86 0     0 1   sub init($self, $username, $password, $clientname, $iscaching) {
  0            
  0            
  0            
  0            
  0            
  0            
87 0 0 0       if(!defined($username) || $username eq '') {
88 0           croak("Username not defined!");
89             }
90 0 0 0       if(!defined($password) || $password eq '') {
91 0           croak("Password not defined!");
92             }
93              
94 0 0 0       if(!defined($clientname || $clientname eq '')) {
95 0           croak("Clientname not defined!");
96             }
97 0           $self->{clientname} = $clientname;
98              
99 0           $self->{authtoken} = encode_base64($username, '') . ':' . encode_base64($password, '');
100              
101 0 0         if(!defined($iscaching)) {
102 0           $iscaching = 0;
103             }
104 0           $self->{iscaching} = $iscaching;
105              
106 0 0         if($self->{iscaching}) {
107 0           $self->{cache} = {};
108             }
109              
110 0           $self->{needreconnect} = 1;
111 0           $self->{inlines} = [];
112 0           $self->{firstconnect} = 1;
113              
114 0           $self->{memcached_compatibility} = 0;
115              
116             $self->{remembrancenames} = [
117 0           'Ivy Bdubs',
118             'Terry Pratchett',
119             'Sven Guckes',
120             'Sheila', # faithful four-legged family member of @NightStorm_KPC
121             ];
122 0           $self->{remembranceinterval} = 3600; # One hour
123 0           $self->{nextremembrance} = time + $self->{remembranceinterval};
124              
125 0           $self->reconnect();
126              
127 0           return;
128             }
129              
130 0     0 1   sub reconnect($self) {
  0            
  0            
131 0 0         if(defined($self->{socket})) {
132 0           delete $self->{socket};
133             }
134              
135 0 0         if(!$self->{firstconnect}) {
136             # Not our first connection (=real reconnect).
137             # wait a short random time before reconnecting. In case all
138             # clients got disconnected, we want to avoid having all clients reconnect
139             # at the exact same time
140 0           my $waittime = rand(4000)/1000;
141 0           sleep($waittime);
142             }
143              
144 0           my $socket;
145 0 0 0       if(defined($self->{server}) && defined($self->{port})) {
    0          
146             $socket = IO::Socket::IP->new(
147             PeerHost => $self->{server},
148             PeerPort => $self->{port},
149 0 0         Type => SOCK_STREAM,
150             ) or croak("Failed to connect to Clacks TCP message service: $ERRNO");
151             } elsif(defined($self->{socketpath})) {
152             $socket = IO::Socket::UNIX->new(
153             Peer => $self->{socketpath},
154 0 0         Type => SOCK_STREAM,
155             ) or croak("Failed to connect to Clacks Unix Domain Socket message service: $ERRNO");
156             } else {
157 0           croak("Neither TCP nor Unix domain socket specified. Don't know where to connect to.");
158             }
159              
160             #binmode($socket, ':bytes');
161 0           $socket->blocking(0);
162              
163              
164 0 0         if(ref $socket ne 'IO::Socket::UNIX') {
165             # ONLY USE SSL WHEN RUNNING OVER THE NETWORK
166             # There is simply no point in running it over a local socket.
167 0 0         IO::Socket::SSL->start_SSL($socket,
168             SSL_verify_mode => SSL_VERIFY_NONE,
169             ) or croak("Can't use SSL: " . $SSL_ERROR);
170             }
171              
172 0           $self->{socket} = $socket;
173 0           $self->{selector} = IO::Select->new($self->{socket});
174 0           $self->{failcount} = 0;
175 0           $self->{lastping} = time;
176 0           $self->{inbuffer} = '';
177 0           $self->{incharbuffer} = [];
178 0           $self->{outbuffer} = '';
179 0           $self->{serverinfo} = 'UNKNOWN';
180 0           $self->{needreconnect} = 0;
181 0           $self->{firstline} = 1;
182 0           $self->{headertimeout} = time + 15;
183              
184             # Do *not* nuke "inlines" array, since it may hold "QUIT" messages that the client wants to handle, for example, to re-issue
185             # "LISTEN" commands.
186             # $self->{inlines} = ();
187              
188 0 0         if($self->{firstconnect}) {
189 0           $self->{firstconnect} = 0;
190             } else {
191 0           push @{$self->{inlines}}, "RECONNECTED";
  0            
192             }
193              
194             # Startup "handshake". As everything else, this is asyncronous, both server and
195             # client send their respective version strings and then wait to recieve their counterparts
196             # Also, this part is REQUIRED, just to make sure we actually speek to CLACKS protocol
197 0           $self->{outbuffer} .= 'CLACKS ' . $self->{clientname} . "\r\n";
198 0           $self->{outbuffer} .= 'OVERHEAD A ' . $self->{authtoken} . "\r\n";
199 0           $self->doNetwork();
200              
201 0           return;
202             }
203              
204 0     0 1   sub activate_memcached_compat($self) {
  0            
  0            
205 0           $self->{memcached_compatibility} = 1;
206 0           return;
207             }
208              
209 0     0 1   sub getRawSocket($self) {
  0            
  0            
210 0 0         if($self->{needreconnect}) {
211 0           $self->reconnect();
212             }
213              
214 0           return $self->{socket};
215             }
216              
217 0     0 1   sub doNetwork($self, $readtimeout = 0) {
  0            
  0            
  0            
218 0 0         if(!defined($readtimeout)) {
219             # Don't wait
220 0           $readtimeout = 0;
221             }
222              
223 0 0         if($self->{needreconnect}) {
224 0           $self->reconnect();
225             }
226              
227 0 0 0       if($self->{nextremembrance} && time > $self->{nextremembrance}) {
228             # A person is not dead while their name is still spoken.
229 0           $self->{nextremembrance} = time + $self->{remembranceinterval} + int(rand($self->{remembranceinterval} / 10));
230 0           my $neverforget = $self->{remembrancenames}->[rand @{$self->{remembrancenames}}];
  0            
231 0           $self->{outbuffer} .= 'OVERHEAD GNU ' . $neverforget . "\r\n";
232             }
233              
234             # doNetwork interleaves handling incoming and outgoing traffic.
235             # This is only relevant on slow links.
236             #
237             # It returns even if the outgoing or incoming buffers are not empty
238             # (meaning that partially buffered data can exists). This way we use the
239             # available bandwidth without blocking unduly the application (we assume it's a realtime
240             # application with multiple things going on at the same time)-
241             #
242             # The downside of this is that doNetwork() needs to be called on a regular basis and sending
243             # and recieving might be delayed until the next cycle. This delay can be minimized by simply
244             # not transfering huge values over clacks, but instead using it the way it was intended to be used:
245             # Small variables can be SET directly by clacks, huge datasets should be stored in the
246             # database and the recievers only NOTIFY'd that a change has taken place.
247             #
248             # The big exception here is the here is the ClacksCache part of the story. These functions
249             # call doNetwork() in a loop until the outbuffer is empty. And depending on requirement, they
250             # KEEP on calling doNetwork() until the answer is recieved. This makes ClacksCache functions
251             # syncronous and causes some delay in the calling function. But doing these asyncronous will
252             # cause more headaches, maybe even leading up to insanity. Believe me, i tried, but those pink
253             # elephants stomping about my rubber-padded room are such a distraction...
254              
255 0           my $workCount = 0;
256              
257 0 0         if(length($self->{outbuffer})) {
258 0           my $brokenpipe = 0;
259 0           my $writeok = 0;
260 0           my $written;
261 0           eval {
262 0     0     local $SIG{PIPE} = sub { $brokenpipe = 1; };
  0            
263 0           $written = syswrite($self->{socket}, $self->{outbuffer});
264 0           $writeok = 1;
265             };
266              
267 0 0 0       if($brokenpipe || !$writeok) {
268 0           $self->{needreconnect} = 1;
269 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
270 0           return;
271             }
272              
273 0 0 0       if(defined($written) && $written) {
274 0           $workCount += $written;
275 0 0         if(length($self->{outbuffer}) == $written) {
276 0           $self->{outbuffer} = '';
277             } else {
278 0           $self->{outbuffer} = substr($self->{outbuffer}, $written);
279             }
280             }
281              
282             }
283              
284             {
285 0           my $select = IO::Select->new($self->{socket});
  0            
286 0           my @temp;
287 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
288 0           @temp = $self->{selector}->can_read($readtimeout);
289             };
290 0 0         if(scalar @temp == 0) {
291             # Timeout
292 0           return $workCount;
293             }
294             }
295              
296 0           my $totalread = 0;
297 0           while(1) {
298 0           my $buf;
299 0           my $readok = 0;
300 0           eval {
301 0           sysread($self->{socket}, $buf, 10_000); # Read in at most 10kB at once
302 0           $readok = 1;
303             };
304 0 0         if(!$readok) {
305 0           $self->{needreconnect} = 1;
306 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
307 0           return;
308             }
309 0 0 0       if(defined($buf) && length($buf)) {
310 0           $totalread += length($buf);
311             #print STDERR "+ $buf\n--\n";
312 0           push @{$self->{incharbuffer}}, split//, $buf;
  0            
313 0           next;
314             }
315 0           last;
316             }
317            
318             # Check if we actually got data after checking with can_read() first
319 0 0         if($totalread) {
320 0           $self->{failcount} = 0;
321             } else {
322             # This should normally not happen, but thanks to SSL, it sometimes does
323             # We ignore single instances of those but disconnect if many happen in a row
324 0           $self->{failcount}++;
325 0           sleep(0.05);
326            
327 0 0         if($self->{failcount} > 5) {
328 0           $self->{needreconnect} = 1;
329 0           return;
330             }
331             }
332 0           while(@{$self->{incharbuffer}}) {
  0            
333 0           my $char = shift @{$self->{incharbuffer}};
  0            
334 0           $workCount++;
335 0 0         if($char eq "\r") {
    0          
336 0           next;
337             } elsif($char eq "\n") {
338 0 0         if($self->{inbuffer} eq 'NOP') { # Just drop "No OPerations" packets, only used by server to
339             # verify that the connection is still active
340             #$self->{firstline}
341 0           $self->{inbuffer} = '';
342 0           next;
343             }
344              
345 0 0         if($self->{firstline}) {
346 0 0         if($self->{inbuffer} !~ /^CLACKS\ /) {
347             # Whoops, not a clacks server or something gone wrong with the protocol
348 0           $self->{needreconnect} = 1;
349 0           return 0;
350             } else {
351 0           $self->{firstline} = 0;
352             }
353             }
354              
355 0           push @{$self->{inlines}}, $self->{inbuffer};
  0            
356 0           $self->{inbuffer} = '';
357             } else {
358 0           $self->{inbuffer} .= $char;
359             }
360             }
361              
362 0 0 0       if($self->{firstline} && $self->{headertimeout} < time) {
363 0           $self->{needreconnect} = 1;
364 0           return 0;
365             }
366              
367 0           return $workCount;
368             }
369              
370             my %overheadflags = (
371             A => "auth_token", # Authentication token
372             O => "auth_ok", # Authentication OK
373             F => "auth_failed", # Authentication FAILED
374              
375             E => 'error_message', # Server to client error message
376              
377             C => "close_all_connections",
378             D => "discard_message",
379             G => "forward_message",
380             I => "set_interclacks_mode", # value: true/false, disables 'G' and 'U'
381             M => "informal_message", # informal message
382             N => "no_logging",
383             S => "shutdown_service", # value: positive number (number in seconds before shutdown). If interclacks clients are present, should be high
384             # enough to flush all buffers to them
385             U => "return_to_sender",
386             Z => "no_flags", # Only sent when no other flags are set
387             );
388              
389 0     0 1   sub getNext($self) {
  0            
  0            
390             # Recieve next incoming message (if any)
391              
392             restartgetnext:
393 0           my $line = shift @{$self->{inlines}};
  0            
394              
395 0 0         if(!defined($line)) {
396 0           return;
397             }
398              
399 0           my %data;
400             #print STDERR "> $line\n";
401 0 0         if($line =~ /^NOTIFY\ (.+)/) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
402 0           %data = (
403             type => 'notify',
404             name => $1,
405             );
406             } elsif($line =~ /^SET\ (.+?)\=(.*)/) {
407 0           %data = (
408             type => 'set',
409             name => $1,
410             data => $2,
411             );
412             } elsif($line =~ /^CLACKS\ (.+)/) {
413 0           %data = (
414             type => 'serverinfo',
415             data => $1,
416             );
417             } elsif($line =~ /^DEBUG\ (.+?)\=(.*)/) {
418 0           %data = (
419             type => 'debug',
420             host => $1,
421             command => $2,
422             );
423             } elsif($line =~ /^QUIT/) {
424 0           %data = (
425             type => 'disconnect',
426             data => 'quit',
427             );
428 0           $self->{needreconnect} = 1;
429             } elsif($line =~ /^TIMEOUT/) {
430 0           %data = (
431             type => 'disconnect',
432             data => 'timeout',
433             );
434 0           $self->{needreconnect} = 1;
435             } elsif($line =~ /^RECONNECTED/) {
436 0           %data = (
437             type => 'reconnected',
438             data => 'send your LISTEN requests again',
439             );
440             } elsif($line =~ /^OVERHEAD\ (.+?)\ (.+)/) {
441             # Minimal handling of OVERHEAD flags
442 0           my ($flags, $value) = ($1, $2);
443 0           my @flagparts = split//, $flags;
444 0           my %parsedflags;
445 0           foreach my $key (sort keys %overheadflags) {
446 0 0         if(contains($key, \@flagparts)) {
447 0           $parsedflags{$overheadflags{$key}} = 1;
448             } else {
449 0           $parsedflags{$overheadflags{$key}} = 0;
450             }
451             }
452              
453 0 0         if($parsedflags{auth_ok}) {
    0          
    0          
    0          
454             #print STDERR "Clacks AUTH OK\n";
455 0           goto restartgetnext; # try the next message
456             } elsif($parsedflags{error_message}) {
457 0           %data = (
458             type => 'error_message',
459             data => $value,
460             );
461             } elsif($parsedflags{auth_failed}) {
462 0           croak("Clacks Authentication failed!");
463             } elsif($parsedflags{informal_message}) {
464 0 0         if($parsedflags{forward_message}) {
465 0           %data = (
466             type => 'informal',
467             data => $value,
468             );
469             }
470 0 0         if($parsedflags{return_to_sender}) {
471 0           my $uturn = 'OVERHEAD M';
472 0 0         if($parsedflags{no_logging}) {
473 0           $uturn .= 'N';
474             }
475 0           $uturn .= ' ' . $value;
476 0           $self->{outbuffer} .= $uturn;
477             }
478             }
479              
480             } else {
481             # UNKNOWN, ignore
482 0           goto restartgetnext; # try the next message
483             }
484              
485 0 0         if(!defined($data{type})) {
486 0           return;
487             }
488              
489 0           $data{rawline} = $line;
490              
491 0           return \%data;
492             }
493              
494              
495 0     0 1   sub ping($self) {
  0            
  0            
496 0 0         if($self->{lastping} < (time - 120)) {
497             # Only send a ping every 120 seconds or less
498 0           $self->{outbuffer} .= "PING\r\n";
499 0           $self->{lastping} = time;
500             }
501              
502 0           return;
503             }
504              
505 0     0 1   sub disablePing($self) {
  0            
  0            
506 0           $self->{outbuffer} .= "NOPING\r\n";
507              
508 0           return;
509             }
510              
511              
512 0     0 1   sub notify($self, $varname) {
  0            
  0            
  0            
513 0 0 0       if(!defined($varname) || !length($varname)) {
514 0           carp("varname not defined!");
515 0           return;
516             }
517              
518 0 0         if($self->{needreconnect}) {
519 0           $self->reconnect;
520             }
521              
522 0           $self->{outbuffer} .= "NOTIFY $varname\r\n";
523              
524 0 0         if($self->{memcached_compatibility}) {
525 0           while(1) {
526 0           $self->doNetwork();
527 0 0         if($self->{needreconnect}) {
528             # Nothing we can do, really...
529 0           return;
530             }
531 0 0         last if(!length($self->{outbuffer}));
532 0           usleep(1000);
533             }
534 0           $self->autohandle_messages();
535             }
536              
537 0           return;
538             }
539              
540 0     0 1   sub set($self, $varname, $value, $forcesend = 0) { ## no critic (NamingConventions::ProhibitAmbiguousNames)
  0            
  0            
  0            
  0            
  0            
541 0 0 0       if(!defined($varname) || !length($varname)) {
542 0           carp("varname not defined!");
543 0           return;
544             }
545 0 0         if(!defined($value)) {
546 0           carp("value not defined!");
547 0           return;
548             }
549              
550 0 0         if(!defined($forcesend)) {
551 0           $forcesend = 0;
552             }
553              
554 0 0         if($self->{needreconnect}) {
555 0           $self->reconnect;
556             }
557              
558             # Handle caching to lower output volumne
559 0 0 0       if($self->{iscaching} && !$forcesend && defined($self->{cache}->{$varname}) && $self->{cache}->{$varname} eq $value) {
      0        
      0        
560             # Already the same value send
561 0           return;
562             }
563              
564 0 0         if($self->{iscaching}) {
565 0           $self->{cache}->{$varname} = $value;
566             }
567              
568 0           $self->{outbuffer} .= "SET $varname=$value\r\n";
569              
570 0 0         if($self->{memcached_compatibility}) {
571 0           while(1) {
572 0           $self->doNetwork();
573 0 0         if($self->{needreconnect}) {
574             # Nothing we can do, really...
575 0           return;
576             }
577 0 0         last if(!length($self->{outbuffer}));
578 0           usleep(1000);
579             }
580              
581 0           $self->autohandle_messages();
582             }
583              
584 0           return;
585             }
586              
587 0     0 1   sub listen($self, $varname) { ## no critic (Subroutines::ProhibitBuiltinHomonyms)
  0            
  0            
  0            
588 0 0 0       if(!defined($varname) || !length($varname)) {
589 0           carp("varname not defined!");
590 0           return;
591             }
592              
593 0 0         if($self->{needreconnect}) {
594 0           $self->reconnect;
595             }
596              
597 0           $self->{outbuffer} .= "LISTEN $varname\r\n";
598              
599 0           return;
600             }
601              
602 0     0 1   sub unlisten($self, $varname) {
  0            
  0            
  0            
603 0 0 0       if(!defined($varname) || !length($varname)) {
604 0           carp("varname not defined!");
605 0           return;
606             }
607              
608 0 0         if($self->{needreconnect}) {
609 0           $self->reconnect;
610             }
611              
612 0           $self->{outbuffer} .= "UNLISTEN $varname\r\n";
613              
614 0           return;
615             }
616              
617 0     0 1   sub setMonitormode($self, $active) {
  0            
  0            
  0            
618 0 0         if($self->{needreconnect}) {
619 0           $self->reconnect;
620             }
621              
622 0 0 0       if(!defined($active) || !$active) {
623 0           $self->{outbuffer} .= "UNMONITOR\r\n";
624             } else {
625 0           $self->{outbuffer} .= "MONITOR\r\n";
626             }
627              
628 0           return;
629             }
630              
631 0     0 1   sub getServerinfo($self) {
  0            
  0            
632 0           return $self->{serverinfo};
633             }
634              
635             # ---------------- ClackCache handling --------------------
636             # ClacksCache handling always implies doNetwork()
637             # Also, we do NOT use the caching system used for SET
638 0     0 1   sub store($self, $varname, $value) {
  0            
  0            
  0            
  0            
639 0 0 0       if(!defined($varname) || !length($varname)) {
640 0           carp("varname not defined!");
641 0           return;
642             }
643 0 0         if(!defined($value)) {
644 0           carp("value not defined!");
645 0           return;
646             }
647              
648 0 0         if($self->{needreconnect}) {
649 0           $self->reconnect;
650             }
651              
652 0           $self->{outbuffer} .= "STORE $varname=$value\r\n";
653 0           while(1) {
654 0           $self->doNetwork();
655 0 0         if($self->{needreconnect}) {
656             # Nothing we can do, really...
657 0           return;
658             }
659 0 0         last if(!length($self->{outbuffer}));
660 0           usleep(1000);
661             }
662              
663 0 0         if($self->{memcached_compatibility}) {
664 0           $self->autohandle_messages();
665             }
666              
667 0           return;
668             }
669              
670 0     0 1   sub retrieve($self, $varname) {
  0            
  0            
  0            
671 0 0 0       if(!defined($varname) || !length($varname)) {
672 0           carp("varname not defined!");
673 0           return;
674             }
675              
676 0           my $value;
677              
678 0 0         if($self->{needreconnect}) {
679 0           $self->reconnect;
680             }
681              
682 0           $self->{outbuffer} .= "RETRIEVE $varname\r\n";
683              
684             # Make sure we send everything
685 0           while(1) {
686 0           $self->doNetwork();
687 0 0         if($self->{needreconnect}) {
688             # Nothing we can do, really...
689 0           return;
690             }
691 0 0         last if(!length($self->{outbuffer}));
692             }
693              
694             # Now, wait for the answer
695 0           my $answerline;
696 0           while(1) {
697 0           $self->doNetwork(0.5);
698 0 0         if($self->{needreconnect}) {
699             # Nothing we can do, really...
700 0           return;
701             }
702 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
703 0 0 0       if($self->{inlines}->[$i] =~ /^RETRIEVED\ $varname/ || $self->{inlines}->[$i] =~ /^NOTRETRIEVED\ $varname/) {
704             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
705 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
706 0           last;
707             }
708             }
709 0 0         last if(defined($answerline));
710             }
711              
712 0 0         if($answerline =~ /^RETRIEVED\ (.+?)\=(.*)/) {
713 0           my ($key, $val) = ($1, $2);
714 0 0         if($key ne $varname) {
715 0           print STDERR "Retrieved clacks key $key does not match requested varname $varname!\n";
716 0           return;
717             }
718 0           return $val;
719             }
720              
721             # No matching key
722 0           return;
723             }
724              
725 0     0 1   sub remove($self, $varname) {
  0            
  0            
  0            
726 0 0 0       if(!defined($varname) || !length($varname)) {
727 0           carp("varname not defined!");
728 0           return;
729             }
730              
731 0 0         if($self->{needreconnect}) {
732 0           $self->reconnect;
733             }
734              
735 0           $self->{outbuffer} .= "REMOVE $varname\r\n";
736 0           while(1) {
737 0           $self->doNetwork();
738 0 0         if($self->{needreconnect}) {
739             # Nothing we can do, really...
740 0           return;
741             }
742 0 0         last if(!length($self->{outbuffer}));
743 0           usleep(1000);
744             }
745              
746 0 0         if($self->{memcached_compatibility}) {
747 0           $self->autohandle_messages();
748             }
749              
750 0           return;
751             }
752              
753 0     0 1   sub increment($self, $varname, $stepsize = '') {
  0            
  0            
  0            
  0            
754 0 0 0       if(!defined($varname) || !length($varname)) {
755 0           carp("varname not defined!");
756 0           return;
757             }
758              
759 0 0         if($self->{needreconnect}) {
760 0           $self->reconnect;
761             }
762              
763 0 0 0       if(!defined($stepsize) || $stepsize eq '') {
764 0           $self->{outbuffer} .= "INCREMENT $varname\r\n";
765             } else {
766 0           $stepsize = 0 + $stepsize;
767 0           $self->{outbuffer} .= "INCREMENT $varname=$stepsize\r\n";
768             }
769              
770 0           while(1) {
771 0           $self->doNetwork();
772 0 0         if($self->{needreconnect}) {
773             # Nothing we can do, really...
774 0           return;
775             }
776 0 0         last if(!length($self->{outbuffer}));
777 0           usleep(1000);
778             }
779              
780 0 0         if($self->{memcached_compatibility}) {
781 0           $self->autohandle_messages();
782             }
783              
784 0           return;
785             }
786              
787 0     0 1   sub decrement($self, $varname, $stepsize = '') {
  0            
  0            
  0            
  0            
788 0 0 0       if(!defined($varname) || !length($varname)) {
789 0           carp("varname not defined!");
790 0           return;
791             }
792              
793 0 0         if($self->{needreconnect}) {
794 0           $self->reconnect;
795             }
796              
797 0 0 0       if(!defined($stepsize) || $stepsize eq '') {
798 0           $self->{outbuffer} .= "DECREMENT $varname\r\n";
799             } else {
800 0           $stepsize = 0 + $stepsize;
801 0           $self->{outbuffer} .= "DECREMENT $varname=$stepsize\r\n";
802             }
803 0           while(1) {
804 0           $self->doNetwork();
805 0 0         if($self->{needreconnect}) {
806             # Nothing we can do, really...
807 0           return;
808             }
809 0 0         last if(!length($self->{outbuffer}));
810 0           usleep(1000);
811             }
812              
813 0 0         if($self->{memcached_compatibility}) {
814 0           $self->autohandle_messages();
815             }
816              
817 0           return;
818             }
819              
820 0     0 1   sub clearcache($self) {
  0            
  0            
821 0 0         if($self->{needreconnect}) {
822 0           $self->reconnect;
823             }
824              
825 0           $self->{outbuffer} .= "CLEARCACHE\r\n";
826 0           while(1) {
827 0           $self->doNetwork();
828 0 0         if($self->{needreconnect}) {
829             # Nothing we can do, really...
830 0           return;
831             }
832 0 0         last if(!length($self->{outbuffer}));
833 0           usleep(1000);
834             }
835              
836 0 0         if($self->{memcached_compatibility}) {
837 0           $self->autohandle_messages();
838             }
839              
840 0           return;
841             }
842              
843 0     0 1   sub keylist($self) {
  0            
  0            
844 0 0         if($self->{needreconnect}) {
845 0           $self->reconnect;
846             }
847              
848 0           my $value;
849              
850 0           $self->{outbuffer} .= "KEYLIST\r\n";
851              
852             # Make sure we send everything
853 0           while(1) {
854 0           $self->doNetwork();
855 0 0         if($self->{needreconnect}) {
856             # Nothing we can do, really...
857 0           return;
858             }
859 0 0         last if(!length($self->{outbuffer}));
860 0           usleep(1000);
861             }
862              
863             # Now, wait for the answer
864 0           my $liststartfound = 0;
865 0           my $listendfound = 0;
866 0           while(1) {
867 0           $self->doNetwork(0.5);
868 0 0         if($self->{needreconnect}) {
869             # Nothing we can do, really...
870 0           return;
871             }
872 0           $liststartfound = 0;
873 0           $listendfound = 0;
874 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
875 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTSTART/) {
876 0           $liststartfound = 1;
877 0           next;
878             }
879 0 0         next unless($liststartfound);
880 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTEND/) {
881 0           $listendfound = 1;
882 0           last;
883             }
884             }
885 0 0         last if($listendfound);
886             }
887              
888             # Now, grab the keys from inlines buffer
889 0           my @keys;
890 0           my $idx = 0;
891 0           my $listfound = 0;
892 0           while($idx < scalar @{$self->{inlines}}) {
  0            
893 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTSTART/) {
894             # Just remove this line
895 0           splice @{$self->{inlines}}, $idx, 1;
  0            
896 0           $listfound = 1;
897 0           next;
898             }
899              
900 0 0         if(!$listfound) {
901 0           $idx++;
902 0           next;
903             }
904              
905 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTEND/) {
906             # End of list
907 0           last;
908             }
909              
910 0 0         if($self->{inlines}->[$idx] =~ /^KEY\ (.+)/) {
911 0           push @keys, $1;
912             # Don't increment $idx, but the rest of the array one element down
913 0           splice @{$self->{inlines}}, $idx, 1;
  0            
914             } else {
915 0           $idx++;
916             }
917             }
918              
919 0 0         if($self->{memcached_compatibility}) {
920 0           $self->autohandle_messages();
921             }
922              
923 0           return @keys;
924             }
925              
926 0     0 1   sub clientlist($self) {
  0            
  0            
927 0 0         if($self->{needreconnect}) {
928 0           $self->reconnect;
929             }
930              
931 0           my $value;
932              
933 0           $self->{outbuffer} .= "CLIENTLIST\r\n";
934              
935             # Make sure we send everything
936 0           while(1) {
937 0           $self->doNetwork();
938 0 0         if($self->{needreconnect}) {
939             # Nothing we can do, really...
940 0           return;
941             }
942 0 0         last if(!length($self->{outbuffer}));
943 0           usleep(1000);
944             }
945              
946             # Now, wait for the answer
947 0           my $liststartfound = 0;
948 0           my $listendfound = 0;
949 0           while(1) {
950 0           $self->doNetwork(0.5);
951 0 0         if($self->{needreconnect}) {
952             # Nothing we can do, really...
953 0           return;
954             }
955 0           $liststartfound = 0;
956 0           $listendfound = 0;
957 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
958 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTSTART/) {
959 0           $liststartfound = 1;
960 0           next;
961             }
962 0 0         next unless($liststartfound);
963 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTEND/) {
964 0           $listendfound = 1;
965 0           last;
966             }
967             }
968 0 0         last if($listendfound);
969             }
970              
971             # Now, grab the keys from inlines buffer
972 0           my @keys;
973 0           my $idx = 0;
974 0           my $listfound = 0;
975 0           while($idx < scalar @{$self->{inlines}}) {
  0            
976 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTSTART/) {
977             # Just remove this line
978 0           splice @{$self->{inlines}}, $idx, 1;
  0            
979 0           $listfound = 1;
980 0           next;
981             }
982              
983 0 0         if(!$listfound) {
984 0           $idx++;
985 0           next;
986             }
987              
988 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTEND/) {
989             # End of list
990 0           last;
991             }
992              
993 0 0         if($self->{inlines}->[$idx] =~ /^CLIENT\ (.+)/) {
994 0           my @parts = split/\;/, $1;
995 0           my %data;
996 0           foreach my $part (@parts) {
997 0           my ($datakey, $datavalue) = split/\=/, $part, 2;
998             #$datakey = lc $datakey;
999 0           $data{lc $datakey} = $datavalue;
1000             }
1001 0           push @keys, \%data;
1002             # Don't increment $idx, but the rest of the array one element down
1003 0           splice @{$self->{inlines}}, $idx, 1;
  0            
1004             } else {
1005 0           $idx++;
1006             }
1007             }
1008              
1009 0 0         if($self->{memcached_compatibility}) {
1010 0           $self->autohandle_messages();
1011             }
1012              
1013 0           return @keys;
1014             }
1015              
1016 0     0 1   sub flush($self, $flushid = '') {
  0            
  0            
  0            
1017 0 0 0       if(!defined($flushid) || $flushid eq '') {
1018 0           $flushid = 'AUTO' . int(rand(1_000_000)) . int(rand(1_000_000));
1019             }
1020              
1021 0 0         if($self->{needreconnect}) {
1022 0           $self->reconnect;
1023             }
1024              
1025 0           $self->{outbuffer} .= "FLUSH $flushid\r\n";
1026              
1027             # Make sure we send everything
1028 0           while(1) {
1029 0           $self->doNetwork();
1030 0 0         if($self->{needreconnect}) {
1031             # Nothing we can do, really...
1032 0           return;
1033             }
1034 0 0         last if(!length($self->{outbuffer}));
1035             }
1036              
1037             # Now, wait for the answer
1038 0           my $answerline;
1039 0           while(1) {
1040 0           $self->doNetwork(0.5);
1041 0 0         if($self->{needreconnect}) {
1042             # Nothing we can do, really...
1043 0           return;
1044             }
1045 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
1046 0 0         if($self->{inlines}->[$i] =~ /^FLUSHED\ $flushid/) {
1047             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
1048 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
1049 0           last;
1050             }
1051             }
1052 0 0         last if(defined($answerline));
1053             }
1054              
1055 0           return;
1056             }
1057              
1058              
1059 0     0 1   sub autohandle_messages($self) {
  0            
  0            
1060 0           $self->doNetwork();
1061              
1062 0           while((my $line = $self->getNext())) {
1063 0 0         if($line->{type} eq 'disconnect') {
1064 0           $self->{needreconnect} = 1;
1065             }
1066             }
1067              
1068 0           return;
1069             }
1070              
1071             # ---------------- ClackCache handling --------------------
1072              
1073 0     0 1   sub sendRawCommand($self, $command) {
  0            
  0            
  0            
1074 0 0 0       if(!defined($command) || !length($command)) {
1075 0           carp("command not defined!");
1076 0           return;
1077             }
1078              
1079 0 0         if($self->{needreconnect}) {
1080 0           $self->reconnect;
1081             }
1082              
1083 0           $self->{outbuffer} .= $command . "\r\n";
1084              
1085 0           return;
1086             }
1087              
1088             # setAndStore combines the SET and STORE command into the SETANDSTORE server command. This is mostly done
1089             # for optimizing interclacks connections
1090             # Other clients will only get a SET notification, but the server also runs a STORE operation
1091 0     0 1   sub setAndStore($self, $varname, $value, $forcesend = 0) {
  0            
  0            
  0            
  0            
  0            
1092 0 0 0       if(!defined($varname) || !length($varname)) {
1093 0           carp("varname not defined!");
1094 0           return;
1095             }
1096              
1097 0 0         if(!defined($value)) {
1098 0           carp("value not defined!");
1099 0           return;
1100             }
1101              
1102 0 0         if(!defined($forcesend)) {
1103 0           $forcesend = 0;
1104             }
1105              
1106 0           $self->{outbuffer} .= "SETANDSTORE $varname=$value\r\n";
1107              
1108 0 0         if($self->{memcached_compatibility}) {
1109 0           while(1) {
1110 0           $self->doNetwork();
1111 0 0         if($self->{needreconnect}) {
1112             # Nothing we can do, really...
1113 0           return;
1114             }
1115 0 0         last if(!length($self->{outbuffer}));
1116 0           usleep(1000);
1117             }
1118              
1119 0           $self->autohandle_messages();
1120             }
1121              
1122 0           return;
1123             }
1124              
1125 0     0 1   sub disconnect($self) {
  0            
  0            
1126 0 0         if($self->{needreconnect}) {
1127             # We are not connected, just do nothing
1128 0           return;
1129             }
1130              
1131 0           $self->flush();
1132 0           $self->{outbuffer} .= "QUIT\r\n";
1133 0           my $endtime = time + 1; # Wait a maximum of one second to send
1134 0           while(1) {
1135 0 0         last if(time > $endtime);
1136 0           $self->doNetwork();
1137 0 0         last if(!length($self->{outbuffer}));
1138 0           sleep(0.05);
1139             }
1140 0           sleep(0.5); # Wait another half second for the OS to flush the socket
1141              
1142 0           delete $self->{socket};
1143 0           $self->{needreconnect} = 1;
1144              
1145 0           return;
1146             }
1147              
1148 0     0     sub DESTROY($self) {
  0            
  0            
1149             # Try to disconnect cleanly, but socket might already be DESTROYed, so catch any errors
1150 0           eval {
1151 0           $self->disconnect();
1152             };
1153              
1154 0           return;
1155             }
1156              
1157             1;
1158             __END__