File Coverage

blib/lib/Net/Clacks/Client.pm
Criterion Covered Total %
statement 56 752 7.4
branch 0 326 0.0
condition 0 111 0.0
subroutine 19 50 38.0
pod 29 29 100.0
total 104 1268 8.2


line stmt bran cond sub pod time code
1             package Net::Clacks::Client;
2             #---AUTOPRAGMASTART---
3 1     1   16 use 5.020;
  1         3  
4 1     1   5 use strict;
  1         7  
  1         15  
5 1     1   4 use warnings;
  1         2  
  1         31  
6 1     1   6 use diagnostics;
  1         1  
  1         5  
7 1     1   36 use mro 'c3';
  1         2  
  1         3  
8 1     1   20 use English;
  1         2  
  1         4  
9 1     1   325 use Carp;
  1         2  
  1         52  
10             our $VERSION = 23;
11 1     1   6 use autodie qw( close );
  1         2  
  1         6  
12 1     1   234 use Array::Contains;
  1         2  
  1         39  
13 1     1   6 use utf8;
  1         1  
  1         4  
14 1     1   34 use Encode qw(is_utf8 encode_utf8 decode_utf8);
  1         2  
  1         47  
15 1     1   6 use feature 'signatures';
  1         1  
  1         56  
16 1     1   4 no warnings qw(experimental::signatures);
  1         2  
  1         35  
17             #---AUTOPRAGMAEND---
18              
19 1     1   505 use IO::Socket::IP;
  1         27294  
  1         5  
20             #use IO::Socket::UNIX;
21 1     1   852 use Time::HiRes qw[sleep usleep];
  1         1143  
  1         5  
22 1     1   560 use Sys::Hostname;
  1         850  
  1         41  
23 1     1   390 use IO::Select;
  1         1366  
  1         41  
24 1     1   668 use IO::Socket::SSL;
  1         40664  
  1         7  
25 1     1   623 use MIME::Base64;
  1         576  
  1         7901  
26              
27 0     0 1   sub new($class, $server, $port, $username, $password, $clientname, $iscaching = 0) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
28 0           my $self = bless {}, $class;
29              
30 0 0 0       if(!defined($server) || !length($server)) {
31 0           croak("server not defined!");
32             }
33 0 0 0       if(!defined($port) || !length($port)) {
34 0           croak("port not defined!");
35             }
36 0 0 0       if(!defined($username) || !length($username)) {
37 0           croak("username not defined!");
38             }
39 0 0 0       if(!defined($password) || !length($password)) {
40 0           croak("password not defined!");
41             }
42 0 0 0       if(!defined($clientname) || !length($clientname)) {
43 0           croak("clientname not defined!");
44             }
45              
46 0           $self->{server} = $server;
47 0           $self->{port} = $port;
48              
49 0           $self->init($username, $password, $clientname, $iscaching);
50              
51 0           return $self;
52             }
53              
54 0     0 1   sub newSocket($class, $socketpath, $username, $password, $clientname, $iscaching = 0) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
55 0           my $self = bless {}, $class;
56              
57 0 0 0       if(!defined($socketpath) || !length($socketpath)) {
58 0           croak("socketpath not defined!");
59             }
60 0 0 0       if(!defined($username) || !length($username)) {
61 0           croak("username not defined!");
62             }
63 0 0 0       if(!defined($password) || !length($password)) {
64 0           croak("password not defined!");
65             }
66 0 0 0       if(!defined($clientname) || !length($clientname)) {
67 0           croak("clientname not defined!");
68             }
69              
70 0           my $udsloaded = 0;
71 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
72 0           require IO::Socket::UNIX;
73 0           $udsloaded = 1;
74             };
75 0 0         if(!$udsloaded) {
76 0           croak("Specified a unix domain socket, but i couldn't load IO::Socket::UNIX!");
77             }
78              
79 0           $self->{socketpath} = $socketpath;
80              
81 0           $self->init($username, $password, $clientname, $iscaching);
82              
83 0           return $self;
84             }
85              
86 0     0 1   sub init($self, $username, $password, $clientname, $iscaching) {
  0            
  0            
  0            
  0            
  0            
  0            
87 0 0 0       if(!defined($username) || $username eq '') {
88 0           croak("Username not defined!");
89             }
90 0 0 0       if(!defined($password) || $password eq '') {
91 0           croak("Password not defined!");
92             }
93              
94 0 0 0       if(!defined($clientname || $clientname eq '')) {
95 0           croak("Clientname not defined!");
96             }
97 0           $self->{clientname} = $clientname;
98              
99 0           $self->{authtoken} = encode_base64($username, '') . ':' . encode_base64($password, '');
100              
101 0 0         if(!defined($iscaching)) {
102 0           $iscaching = 0;
103             }
104 0           $self->{iscaching} = $iscaching;
105              
106 0 0         if($self->{iscaching}) {
107 0           $self->{cache} = {};
108             }
109              
110 0           $self->{needreconnect} = 1;
111 0           $self->{inlines} = [];
112 0           $self->{firstconnect} = 1;
113              
114 0           $self->{memcached_compatibility} = 0;
115              
116             $self->{remembrancenames} = [
117 0           'Ivy Bdubs',
118             'Terry Pratchett',
119             'Sven Guckes',
120             ];
121 0           $self->{remembranceinterval} = 3600; # One hour
122 0           $self->{nextremembrance} = time + $self->{remembranceinterval};
123              
124 0           $self->reconnect();
125              
126 0           return;
127             }
128              
129 0     0 1   sub reconnect($self) {
  0            
  0            
130 0 0         if(defined($self->{socket})) {
131 0           delete $self->{socket};
132             }
133              
134 0 0         if(!$self->{firstconnect}) {
135             # Not our first connection (=real reconnect).
136             # wait a short random time before reconnecting. In case all
137             # clients got disconnected, we want to avoid having all clients reconnect
138             # at the exact same time
139 0           my $waittime = rand(4000)/1000;
140 0           sleep($waittime);
141             }
142              
143 0           my $socket;
144 0 0 0       if(defined($self->{server}) && defined($self->{port})) {
    0          
145             $socket = IO::Socket::IP->new(
146             PeerHost => $self->{server},
147             PeerPort => $self->{port},
148 0 0         Type => SOCK_STREAM,
149             ) or croak("Failed to connect to Clacks TCP message service: $ERRNO");
150             } elsif(defined($self->{socketpath})) {
151             $socket = IO::Socket::UNIX->new(
152             Peer => $self->{socketpath},
153 0 0         Type => SOCK_STREAM,
154             ) or croak("Failed to connect to Clacks Unix Domain Socket message service: $ERRNO");
155             } else {
156 0           croak("Neither TCP nor Unix domain socket specified. Don't know where to connect to.");
157             }
158              
159             #binmode($socket, ':bytes');
160 0           $socket->blocking(0);
161              
162              
163 0 0         if(ref $socket ne 'IO::Socket::UNIX') {
164             # ONLY USE SSL WHEN RUNNING OVER THE NETWORK
165             # There is simply no point in running it over a local socket.
166 0 0         IO::Socket::SSL->start_SSL($socket,
167             SSL_verify_mode => SSL_VERIFY_NONE,
168             ) or croak("Can't use SSL: " . $SSL_ERROR);
169             }
170              
171 0           $self->{socket} = $socket;
172 0           $self->{selector} = IO::Select->new($self->{socket});
173 0           $self->{failcount} = 0;
174 0           $self->{lastping} = time;
175 0           $self->{inbuffer} = '';
176 0           $self->{incharbuffer} = [];
177 0           $self->{outbuffer} = '';
178 0           $self->{serverinfo} = 'UNKNOWN';
179 0           $self->{needreconnect} = 0;
180 0           $self->{firstline} = 1;
181 0           $self->{headertimeout} = time + 15;
182              
183             # Do *not* nuke "inlines" array, since it may hold "QUIT" messages that the client wants to handle, for example, to re-issue
184             # "LISTEN" commands.
185             # $self->{inlines} = ();
186              
187 0 0         if($self->{firstconnect}) {
188 0           $self->{firstconnect} = 0;
189             } else {
190 0           push @{$self->{inlines}}, "RECONNECTED";
  0            
191             }
192              
193             # Startup "handshake". As everything else, this is asyncronous, both server and
194             # client send their respective version strings and then wait to recieve their counterparts
195             # Also, this part is REQUIRED, just to make sure we actually speek to CLACKS protocol
196 0           $self->{outbuffer} .= 'CLACKS ' . $self->{clientname} . "\r\n";
197 0           $self->{outbuffer} .= 'OVERHEAD A ' . $self->{authtoken} . "\r\n";
198 0           $self->doNetwork();
199              
200 0           return;
201             }
202              
203 0     0 1   sub activate_memcached_compat($self) {
  0            
  0            
204 0           $self->{memcached_compatibility} = 1;
205 0           return;
206             }
207              
208 0     0 1   sub getRawSocket($self) {
  0            
  0            
209 0 0         if($self->{needreconnect}) {
210 0           $self->reconnect();
211             }
212              
213 0           return $self->{socket};
214             }
215              
216 0     0 1   sub doNetwork($self, $readtimeout = 0) {
  0            
  0            
  0            
217 0 0         if(!defined($readtimeout)) {
218             # Don't wait
219 0           $readtimeout = 0;
220             }
221              
222 0 0         if($self->{needreconnect}) {
223 0           $self->reconnect();
224             }
225              
226 0 0 0       if($self->{nextremembrance} && time > $self->{nextremembrance}) {
227             # A person is not dead while their name is still spoken.
228 0           $self->{nextremembrance} = time + $self->{remembranceinterval} + int(rand($self->{remembranceinterval} / 10));
229 0           my $neverforget = $self->{remembrancenames}->[rand @{$self->{remembrancenames}}];
  0            
230 0           $self->{outbuffer} .= 'OVERHEAD GNU ' . $neverforget . "\r\n";
231             }
232              
233             # doNetwork interleaves handling incoming and outgoing traffic.
234             # This is only relevant on slow links.
235             #
236             # It returns even if the outgoing or incoming buffers are not empty
237             # (meaning that partially buffered data can exists). This way we use the
238             # available bandwidth without blocking unduly the application (we assume it's a realtime
239             # application with multiple things going on at the same time)-
240             #
241             # The downside of this is that doNetwork() needs to be called on a regular basis and sending
242             # and recieving might be delayed until the next cycle. This delay can be minimized by simply
243             # not transfering huge values over clacks, but instead using it the way it was intended to be used:
244             # Small variables can be SET directly by clacks, huge datasets should be stored in the
245             # database and the recievers only NOTIFY'd that a change has taken place.
246             #
247             # The big exception here is the here is the ClacksCache part of the story. These functions
248             # call doNetwork() in a loop until the outbuffer is empty. And depending on requirement, they
249             # KEEP on calling doNetwork() until the answer is recieved. This makes ClacksCache functions
250             # syncronous and causes some delay in the calling function. But doing these asyncronous will
251             # cause more headaches, maybe even leading up to insanity. Believe me, i tried, but those pink
252             # elephants stomping about my rubber-padded room are such a distraction...
253              
254 0           my $workCount = 0;
255              
256 0 0         if(length($self->{outbuffer})) {
257 0           my $brokenpipe = 0;
258 0           my $writeok = 0;
259 0           my $written;
260 0           eval {
261 0     0     local $SIG{PIPE} = sub { $brokenpipe = 1; };
  0            
262 0           $written = syswrite($self->{socket}, $self->{outbuffer});
263 0           $writeok = 1;
264             };
265              
266 0 0 0       if($brokenpipe || !$writeok) {
267 0           $self->{needreconnect} = 1;
268 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
269 0           return;
270             }
271              
272 0 0 0       if(defined($written) && $written) {
273 0           $workCount += $written;
274 0 0         if(length($self->{outbuffer}) == $written) {
275 0           $self->{outbuffer} = '';
276             } else {
277 0           $self->{outbuffer} = substr($self->{outbuffer}, $written);
278             }
279             }
280              
281             }
282              
283             {
284 0           my $select = IO::Select->new($self->{socket});
  0            
285 0           my @temp;
286 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
287 0           @temp = $self->{selector}->can_read($readtimeout);
288             };
289 0 0         if(scalar @temp == 0) {
290             # Timeout
291 0           return $workCount;
292             }
293             }
294              
295 0           my $totalread = 0;
296 0           while(1) {
297 0           my $buf;
298 0           my $readok = 0;
299 0           eval {
300 0           sysread($self->{socket}, $buf, 10_000); # Read in at most 10kB at once
301 0           $readok = 1;
302             };
303 0 0         if(!$readok) {
304 0           $self->{needreconnect} = 1;
305 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
306 0           return;
307             }
308 0 0 0       if(defined($buf) && length($buf)) {
309 0           $totalread += length($buf);
310             #print STDERR "+ $buf\n--\n";
311 0           push @{$self->{incharbuffer}}, split//, $buf;
  0            
312 0           next;
313             }
314 0           last;
315             }
316            
317             # Check if we actually got data after checking with can_read() first
318 0 0         if($totalread) {
319 0           $self->{failcount} = 0;
320             } else {
321             # This should normally not happen, but thanks to SSL, it sometimes does
322             # We ignore single instances of those but disconnect if many happen in a row
323 0           $self->{failcount}++;
324 0           sleep(0.05);
325            
326 0 0         if($self->{failcount} > 5) {
327 0           $self->{needreconnect} = 1;
328 0           return;
329             }
330             }
331 0           while(@{$self->{incharbuffer}}) {
  0            
332 0           my $char = shift @{$self->{incharbuffer}};
  0            
333 0           $workCount++;
334 0 0         if($char eq "\r") {
    0          
335 0           next;
336             } elsif($char eq "\n") {
337 0 0         if($self->{inbuffer} eq 'NOP') { # Just drop "No OPerations" packets, only used by server to
338             # verify that the connection is still active
339             #$self->{firstline}
340 0           $self->{inbuffer} = '';
341 0           next;
342             }
343              
344 0 0         if($self->{firstline}) {
345 0 0         if($self->{inbuffer} !~ /^CLACKS\ /) {
346             # Whoops, not a clacks server or something gone wrong with the protocol
347 0           $self->{needreconnect} = 1;
348 0           return 0;
349             } else {
350 0           $self->{firstline} = 0;
351             }
352             }
353              
354 0           push @{$self->{inlines}}, $self->{inbuffer};
  0            
355 0           $self->{inbuffer} = '';
356             } else {
357 0           $self->{inbuffer} .= $char;
358             }
359             }
360              
361 0 0 0       if($self->{firstline} && $self->{headertimeout} < time) {
362 0           $self->{needreconnect} = 1;
363 0           return 0;
364             }
365              
366 0           return $workCount;
367             }
368              
369             my %overheadflags = (
370             A => "auth_token", # Authentication token
371             O => "auth_ok", # Authentication OK
372             F => "auth_failed", # Authentication FAILED
373              
374             E => 'error_message', # Server to client error message
375              
376             C => "close_all_connections",
377             D => "discard_message",
378             G => "forward_message",
379             I => "set_interclacks_mode", # value: true/false, disables 'G' and 'U'
380             M => "informal_message", # informal message
381             N => "no_logging",
382             S => "shutdown_service", # value: positive number (number in seconds before shutdown). If interclacks clients are present, should be high
383             # enough to flush all buffers to them
384             U => "return_to_sender",
385             Z => "no_flags", # Only sent when no other flags are set
386             );
387              
388 0     0 1   sub getNext($self) {
  0            
  0            
389             # Recieve next incoming message (if any)
390              
391             restartgetnext:
392 0           my $line = shift @{$self->{inlines}};
  0            
393              
394 0 0         if(!defined($line)) {
395 0           return;
396             }
397              
398 0           my %data;
399             #print STDERR "> $line\n";
400 0 0         if($line =~ /^NOTIFY\ (.+)/) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
401 0           %data = (
402             type => 'notify',
403             name => $1,
404             );
405             } elsif($line =~ /^SET\ (.+?)\=(.*)/) {
406 0           %data = (
407             type => 'set',
408             name => $1,
409             data => $2,
410             );
411             } elsif($line =~ /^CLACKS\ (.+)/) {
412 0           %data = (
413             type => 'serverinfo',
414             data => $1,
415             );
416             } elsif($line =~ /^DEBUG\ (.+?)\=(.*)/) {
417 0           %data = (
418             type => 'debug',
419             host => $1,
420             command => $2,
421             );
422             } elsif($line =~ /^QUIT/) {
423 0           %data = (
424             type => 'disconnect',
425             data => 'quit',
426             );
427 0           $self->{needreconnect} = 1;
428             } elsif($line =~ /^TIMEOUT/) {
429 0           %data = (
430             type => 'disconnect',
431             data => 'timeout',
432             );
433 0           $self->{needreconnect} = 1;
434             } elsif($line =~ /^RECONNECTED/) {
435 0           %data = (
436             type => 'reconnected',
437             data => 'send your LISTEN requests again',
438             );
439             } elsif($line =~ /^OVERHEAD\ (.+?)\ (.+)/) {
440             # Minimal handling of OVERHEAD flags
441 0           my ($flags, $value) = ($1, $2);
442 0           my @flagparts = split//, $flags;
443 0           my %parsedflags;
444 0           foreach my $key (sort keys %overheadflags) {
445 0 0         if(contains($key, \@flagparts)) {
446 0           $parsedflags{$overheadflags{$key}} = 1;
447             } else {
448 0           $parsedflags{$overheadflags{$key}} = 0;
449             }
450             }
451              
452 0 0         if($parsedflags{auth_ok}) {
    0          
    0          
    0          
453             #print STDERR "Clacks AUTH OK\n";
454 0           goto restartgetnext; # try the next message
455             } elsif($parsedflags{error_message}) {
456 0           %data = (
457             type => 'error_message',
458             data => $value,
459             );
460             } elsif($parsedflags{auth_failed}) {
461 0           croak("Clacks Authentication failed!");
462             } elsif($parsedflags{informal_message}) {
463 0 0         if($parsedflags{forward_message}) {
464 0           %data = (
465             type => 'informal',
466             data => $value,
467             );
468             }
469 0 0         if($parsedflags{return_to_sender}) {
470 0           my $uturn = 'OVERHEAD M';
471 0 0         if($parsedflags{no_logging}) {
472 0           $uturn .= 'N';
473             }
474 0           $uturn .= ' ' . $value;
475 0           $self->{outbuffer} .= $uturn;
476             }
477             }
478              
479             } else {
480             # UNKNOWN, ignore
481 0           goto restartgetnext; # try the next message
482             }
483              
484 0 0         if(!defined($data{type})) {
485 0           return;
486             }
487              
488 0           $data{rawline} = $line;
489              
490 0           return \%data;
491             }
492              
493              
494 0     0 1   sub ping($self) {
  0            
  0            
495 0 0         if($self->{lastping} < (time - 120)) {
496             # Only send a ping every 120 seconds or less
497 0           $self->{outbuffer} .= "PING\r\n";
498 0           $self->{lastping} = time;
499             }
500              
501 0           return;
502             }
503              
504 0     0 1   sub disablePing($self) {
  0            
  0            
505 0           $self->{outbuffer} .= "NOPING\r\n";
506              
507 0           return;
508             }
509              
510              
511 0     0 1   sub notify($self, $varname) {
  0            
  0            
  0            
512 0 0 0       if(!defined($varname) || !length($varname)) {
513 0           carp("varname not defined!");
514 0           return;
515             }
516              
517 0 0         if($self->{needreconnect}) {
518 0           $self->reconnect;
519             }
520              
521 0           $self->{outbuffer} .= "NOTIFY $varname\r\n";
522              
523 0 0         if($self->{memcached_compatibility}) {
524 0           while(1) {
525 0           $self->doNetwork();
526 0 0         if($self->{needreconnect}) {
527             # Nothing we can do, really...
528 0           return;
529             }
530 0 0         last if(!length($self->{outbuffer}));
531 0           usleep(1000);
532             }
533 0           $self->autohandle_messages();
534             }
535              
536 0           return;
537             }
538              
539 0     0 1   sub set($self, $varname, $value, $forcesend = 0) { ## no critic (NamingConventions::ProhibitAmbiguousNames)
  0            
  0            
  0            
  0            
  0            
540 0 0 0       if(!defined($varname) || !length($varname)) {
541 0           carp("varname not defined!");
542 0           return;
543             }
544 0 0         if(!defined($value)) {
545 0           carp("value not defined!");
546 0           return;
547             }
548              
549 0 0         if(!defined($forcesend)) {
550 0           $forcesend = 0;
551             }
552              
553 0 0         if($self->{needreconnect}) {
554 0           $self->reconnect;
555             }
556              
557             # Handle caching to lower output volumne
558 0 0 0       if($self->{iscaching} && !$forcesend && defined($self->{cache}->{$varname}) && $self->{cache}->{$varname} eq $value) {
      0        
      0        
559             # Already the same value send
560 0           return;
561             }
562              
563 0 0         if($self->{iscaching}) {
564 0           $self->{cache}->{$varname} = $value;
565             }
566              
567 0           $self->{outbuffer} .= "SET $varname=$value\r\n";
568              
569 0 0         if($self->{memcached_compatibility}) {
570 0           while(1) {
571 0           $self->doNetwork();
572 0 0         if($self->{needreconnect}) {
573             # Nothing we can do, really...
574 0           return;
575             }
576 0 0         last if(!length($self->{outbuffer}));
577 0           usleep(1000);
578             }
579              
580 0           $self->autohandle_messages();
581             }
582              
583 0           return;
584             }
585              
586 0     0 1   sub listen($self, $varname) { ## no critic (Subroutines::ProhibitBuiltinHomonyms)
  0            
  0            
  0            
587 0 0 0       if(!defined($varname) || !length($varname)) {
588 0           carp("varname not defined!");
589 0           return;
590             }
591              
592 0 0         if($self->{needreconnect}) {
593 0           $self->reconnect;
594             }
595              
596 0           $self->{outbuffer} .= "LISTEN $varname\r\n";
597              
598 0           return;
599             }
600              
601 0     0 1   sub unlisten($self, $varname) {
  0            
  0            
  0            
602 0 0 0       if(!defined($varname) || !length($varname)) {
603 0           carp("varname not defined!");
604 0           return;
605             }
606              
607 0 0         if($self->{needreconnect}) {
608 0           $self->reconnect;
609             }
610              
611 0           $self->{outbuffer} .= "UNLISTEN $varname\r\n";
612              
613 0           return;
614             }
615              
616 0     0 1   sub setMonitormode($self, $active) {
  0            
  0            
  0            
617 0 0         if($self->{needreconnect}) {
618 0           $self->reconnect;
619             }
620              
621 0 0 0       if(!defined($active) || !$active) {
622 0           $self->{outbuffer} .= "UNMONITOR\r\n";
623             } else {
624 0           $self->{outbuffer} .= "MONITOR\r\n";
625             }
626              
627 0           return;
628             }
629              
630 0     0 1   sub getServerinfo($self) {
  0            
  0            
631 0           return $self->{serverinfo};
632             }
633              
634             # ---------------- ClackCache handling --------------------
635             # ClacksCache handling always implies doNetwork()
636             # Also, we do NOT use the caching system used for SET
637 0     0 1   sub store($self, $varname, $value) {
  0            
  0            
  0            
  0            
638 0 0 0       if(!defined($varname) || !length($varname)) {
639 0           carp("varname not defined!");
640 0           return;
641             }
642 0 0         if(!defined($value)) {
643 0           carp("value not defined!");
644 0           return;
645             }
646              
647 0 0         if($self->{needreconnect}) {
648 0           $self->reconnect;
649             }
650              
651 0           $self->{outbuffer} .= "STORE $varname=$value\r\n";
652 0           while(1) {
653 0           $self->doNetwork();
654 0 0         if($self->{needreconnect}) {
655             # Nothing we can do, really...
656 0           return;
657             }
658 0 0         last if(!length($self->{outbuffer}));
659 0           usleep(1000);
660             }
661              
662 0 0         if($self->{memcached_compatibility}) {
663 0           $self->autohandle_messages();
664             }
665              
666 0           return;
667             }
668              
669 0     0 1   sub retrieve($self, $varname) {
  0            
  0            
  0            
670 0 0 0       if(!defined($varname) || !length($varname)) {
671 0           carp("varname not defined!");
672 0           return;
673             }
674              
675 0           my $value;
676              
677 0 0         if($self->{needreconnect}) {
678 0           $self->reconnect;
679             }
680              
681 0           $self->{outbuffer} .= "RETRIEVE $varname\r\n";
682              
683             # Make sure we send everything
684 0           while(1) {
685 0           $self->doNetwork();
686 0 0         if($self->{needreconnect}) {
687             # Nothing we can do, really...
688 0           return;
689             }
690 0 0         last if(!length($self->{outbuffer}));
691             }
692              
693             # Now, wait for the answer
694 0           my $answerline;
695 0           while(1) {
696 0           $self->doNetwork(0.5);
697 0 0         if($self->{needreconnect}) {
698             # Nothing we can do, really...
699 0           return;
700             }
701 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
702 0 0 0       if($self->{inlines}->[$i] =~ /^RETRIEVED\ $varname/ || $self->{inlines}->[$i] =~ /^NOTRETRIEVED\ $varname/) {
703             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
704 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
705 0           last;
706             }
707             }
708 0 0         last if(defined($answerline));
709             }
710              
711 0 0         if($answerline =~ /^RETRIEVED\ (.+?)\=(.*)/) {
712 0           my ($key, $val) = ($1, $2);
713 0 0         if($key ne $varname) {
714 0           print STDERR "Retrieved clacks key $key does not match requested varname $varname!\n";
715 0           return;
716             }
717 0           return $val;
718             }
719              
720             # No matching key
721 0           return;
722             }
723              
724 0     0 1   sub remove($self, $varname) {
  0            
  0            
  0            
725 0 0 0       if(!defined($varname) || !length($varname)) {
726 0           carp("varname not defined!");
727 0           return;
728             }
729              
730 0 0         if($self->{needreconnect}) {
731 0           $self->reconnect;
732             }
733              
734 0           $self->{outbuffer} .= "REMOVE $varname\r\n";
735 0           while(1) {
736 0           $self->doNetwork();
737 0 0         if($self->{needreconnect}) {
738             # Nothing we can do, really...
739 0           return;
740             }
741 0 0         last if(!length($self->{outbuffer}));
742 0           usleep(1000);
743             }
744              
745 0 0         if($self->{memcached_compatibility}) {
746 0           $self->autohandle_messages();
747             }
748              
749 0           return;
750             }
751              
752 0     0 1   sub increment($self, $varname, $stepsize = '') {
  0            
  0            
  0            
  0            
753 0 0 0       if(!defined($varname) || !length($varname)) {
754 0           carp("varname not defined!");
755 0           return;
756             }
757              
758 0 0         if($self->{needreconnect}) {
759 0           $self->reconnect;
760             }
761              
762 0 0 0       if(!defined($stepsize) || $stepsize eq '') {
763 0           $self->{outbuffer} .= "INCREMENT $varname\r\n";
764             } else {
765 0           $stepsize = 0 + $stepsize;
766 0           $self->{outbuffer} .= "INCREMENT $varname=$stepsize\r\n";
767             }
768              
769 0           while(1) {
770 0           $self->doNetwork();
771 0 0         if($self->{needreconnect}) {
772             # Nothing we can do, really...
773 0           return;
774             }
775 0 0         last if(!length($self->{outbuffer}));
776 0           usleep(1000);
777             }
778              
779 0 0         if($self->{memcached_compatibility}) {
780 0           $self->autohandle_messages();
781             }
782              
783 0           return;
784             }
785              
786 0     0 1   sub decrement($self, $varname, $stepsize = '') {
  0            
  0            
  0            
  0            
787 0 0 0       if(!defined($varname) || !length($varname)) {
788 0           carp("varname not defined!");
789 0           return;
790             }
791              
792 0 0         if($self->{needreconnect}) {
793 0           $self->reconnect;
794             }
795              
796 0 0 0       if(!defined($stepsize) || $stepsize eq '') {
797 0           $self->{outbuffer} .= "DECREMENT $varname\r\n";
798             } else {
799 0           $stepsize = 0 + $stepsize;
800 0           $self->{outbuffer} .= "DECREMENT $varname=$stepsize\r\n";
801             }
802 0           while(1) {
803 0           $self->doNetwork();
804 0 0         if($self->{needreconnect}) {
805             # Nothing we can do, really...
806 0           return;
807             }
808 0 0         last if(!length($self->{outbuffer}));
809 0           usleep(1000);
810             }
811              
812 0 0         if($self->{memcached_compatibility}) {
813 0           $self->autohandle_messages();
814             }
815              
816 0           return;
817             }
818              
819 0     0 1   sub clearcache($self) {
  0            
  0            
820 0 0         if($self->{needreconnect}) {
821 0           $self->reconnect;
822             }
823              
824 0           $self->{outbuffer} .= "CLEARCACHE\r\n";
825 0           while(1) {
826 0           $self->doNetwork();
827 0 0         if($self->{needreconnect}) {
828             # Nothing we can do, really...
829 0           return;
830             }
831 0 0         last if(!length($self->{outbuffer}));
832 0           usleep(1000);
833             }
834              
835 0 0         if($self->{memcached_compatibility}) {
836 0           $self->autohandle_messages();
837             }
838              
839 0           return;
840             }
841              
842 0     0 1   sub keylist($self) {
  0            
  0            
843 0 0         if($self->{needreconnect}) {
844 0           $self->reconnect;
845             }
846              
847 0           my $value;
848              
849 0           $self->{outbuffer} .= "KEYLIST\r\n";
850              
851             # Make sure we send everything
852 0           while(1) {
853 0           $self->doNetwork();
854 0 0         if($self->{needreconnect}) {
855             # Nothing we can do, really...
856 0           return;
857             }
858 0 0         last if(!length($self->{outbuffer}));
859 0           usleep(1000);
860             }
861              
862             # Now, wait for the answer
863 0           my $liststartfound = 0;
864 0           my $listendfound = 0;
865 0           while(1) {
866 0           $self->doNetwork(0.5);
867 0 0         if($self->{needreconnect}) {
868             # Nothing we can do, really...
869 0           return;
870             }
871 0           $liststartfound = 0;
872 0           $listendfound = 0;
873 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
874 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTSTART/) {
875 0           $liststartfound = 1;
876 0           next;
877             }
878 0 0         next unless($liststartfound);
879 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTEND/) {
880 0           $listendfound = 1;
881 0           last;
882             }
883             }
884 0 0         last if($listendfound);
885             }
886              
887             # Now, grab the keys from inlines buffer
888 0           my @keys;
889 0           my $idx = 0;
890 0           my $listfound = 0;
891 0           while($idx < scalar @{$self->{inlines}}) {
  0            
892 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTSTART/) {
893             # Just remove this line
894 0           splice @{$self->{inlines}}, $idx, 1;
  0            
895 0           $listfound = 1;
896 0           next;
897             }
898              
899 0 0         if(!$listfound) {
900 0           $idx++;
901 0           next;
902             }
903              
904 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTEND/) {
905             # End of list
906 0           last;
907             }
908              
909 0 0         if($self->{inlines}->[$idx] =~ /^KEY\ (.+)/) {
910 0           push @keys, $1;
911             # Don't increment $idx, but the rest of the array one element down
912 0           splice @{$self->{inlines}}, $idx, 1;
  0            
913             } else {
914 0           $idx++;
915             }
916             }
917              
918 0 0         if($self->{memcached_compatibility}) {
919 0           $self->autohandle_messages();
920             }
921              
922 0           return @keys;
923             }
924              
925 0     0 1   sub clientlist($self) {
  0            
  0            
926 0 0         if($self->{needreconnect}) {
927 0           $self->reconnect;
928             }
929              
930 0           my $value;
931              
932 0           $self->{outbuffer} .= "CLIENTLIST\r\n";
933              
934             # Make sure we send everything
935 0           while(1) {
936 0           $self->doNetwork();
937 0 0         if($self->{needreconnect}) {
938             # Nothing we can do, really...
939 0           return;
940             }
941 0 0         last if(!length($self->{outbuffer}));
942 0           usleep(1000);
943             }
944              
945             # Now, wait for the answer
946 0           my $liststartfound = 0;
947 0           my $listendfound = 0;
948 0           while(1) {
949 0           $self->doNetwork(0.5);
950 0 0         if($self->{needreconnect}) {
951             # Nothing we can do, really...
952 0           return;
953             }
954 0           $liststartfound = 0;
955 0           $listendfound = 0;
956 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
957 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTSTART/) {
958 0           $liststartfound = 1;
959 0           next;
960             }
961 0 0         next unless($liststartfound);
962 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTEND/) {
963 0           $listendfound = 1;
964 0           last;
965             }
966             }
967 0 0         last if($listendfound);
968             }
969              
970             # Now, grab the keys from inlines buffer
971 0           my @keys;
972 0           my $idx = 0;
973 0           my $listfound = 0;
974 0           while($idx < scalar @{$self->{inlines}}) {
  0            
975 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTSTART/) {
976             # Just remove this line
977 0           splice @{$self->{inlines}}, $idx, 1;
  0            
978 0           $listfound = 1;
979 0           next;
980             }
981              
982 0 0         if(!$listfound) {
983 0           $idx++;
984 0           next;
985             }
986              
987 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTEND/) {
988             # End of list
989 0           last;
990             }
991              
992 0 0         if($self->{inlines}->[$idx] =~ /^CLIENT\ (.+)/) {
993 0           my @parts = split/\;/, $1;
994 0           my %data;
995 0           foreach my $part (@parts) {
996 0           my ($datakey, $datavalue) = split/\=/, $part, 2;
997             #$datakey = lc $datakey;
998 0           $data{lc $datakey} = $datavalue;
999             }
1000 0           push @keys, \%data;
1001             # Don't increment $idx, but the rest of the array one element down
1002 0           splice @{$self->{inlines}}, $idx, 1;
  0            
1003             } else {
1004 0           $idx++;
1005             }
1006             }
1007              
1008 0 0         if($self->{memcached_compatibility}) {
1009 0           $self->autohandle_messages();
1010             }
1011              
1012 0           return @keys;
1013             }
1014              
1015 0     0 1   sub flush($self, $flushid = '') {
  0            
  0            
  0            
1016 0 0 0       if(!defined($flushid) || $flushid eq '') {
1017 0           $flushid = 'AUTO' . int(rand(1_000_000)) . int(rand(1_000_000));
1018             }
1019              
1020 0 0         if($self->{needreconnect}) {
1021 0           $self->reconnect;
1022             }
1023              
1024 0           $self->{outbuffer} .= "FLUSH $flushid\r\n";
1025              
1026             # Make sure we send everything
1027 0           while(1) {
1028 0           $self->doNetwork();
1029 0 0         if($self->{needreconnect}) {
1030             # Nothing we can do, really...
1031 0           return;
1032             }
1033 0 0         last if(!length($self->{outbuffer}));
1034             }
1035              
1036             # Now, wait for the answer
1037 0           my $answerline;
1038 0           while(1) {
1039 0           $self->doNetwork(0.5);
1040 0 0         if($self->{needreconnect}) {
1041             # Nothing we can do, really...
1042 0           return;
1043             }
1044 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
1045 0 0         if($self->{inlines}->[$i] =~ /^FLUSHED\ $flushid/) {
1046             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
1047 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
1048 0           last;
1049             }
1050             }
1051 0 0         last if(defined($answerline));
1052             }
1053              
1054 0           return;
1055             }
1056              
1057              
1058 0     0 1   sub autohandle_messages($self) {
  0            
  0            
1059 0           $self->doNetwork();
1060              
1061 0           while((my $line = $self->getNext())) {
1062 0 0         if($line->{type} eq 'disconnect') {
1063 0           $self->{needreconnect} = 1;
1064             }
1065             }
1066              
1067 0           return;
1068             }
1069              
1070             # ---------------- ClackCache handling --------------------
1071              
1072 0     0 1   sub sendRawCommand($self, $command) {
  0            
  0            
  0            
1073 0 0 0       if(!defined($command) || !length($command)) {
1074 0           carp("command not defined!");
1075 0           return;
1076             }
1077              
1078 0 0         if($self->{needreconnect}) {
1079 0           $self->reconnect;
1080             }
1081              
1082 0           $self->{outbuffer} .= $command . "\r\n";
1083              
1084 0           return;
1085             }
1086              
1087             # setAndStore combines the SET and STORE command into the SETANDSTORE server command. This is mostly done
1088             # for optimizing interclacks connections
1089             # Other clients will only get a SET notification, but the server also runs a STORE operation
1090 0     0 1   sub setAndStore($self, $varname, $value, $forcesend = 0) {
  0            
  0            
  0            
  0            
  0            
1091 0 0 0       if(!defined($varname) || !length($varname)) {
1092 0           carp("varname not defined!");
1093 0           return;
1094             }
1095              
1096 0 0         if(!defined($value)) {
1097 0           carp("value not defined!");
1098 0           return;
1099             }
1100              
1101 0 0         if(!defined($forcesend)) {
1102 0           $forcesend = 0;
1103             }
1104              
1105 0           $self->{outbuffer} .= "SETANDSTORE $varname=$value\r\n";
1106              
1107 0 0         if($self->{memcached_compatibility}) {
1108 0           while(1) {
1109 0           $self->doNetwork();
1110 0 0         if($self->{needreconnect}) {
1111             # Nothing we can do, really...
1112 0           return;
1113             }
1114 0 0         last if(!length($self->{outbuffer}));
1115 0           usleep(1000);
1116             }
1117              
1118 0           $self->autohandle_messages();
1119             }
1120              
1121 0           return;
1122             }
1123              
1124 0     0 1   sub disconnect($self) {
  0            
  0            
1125 0 0         if($self->{needreconnect}) {
1126             # We are not connected, just do nothing
1127 0           return;
1128             }
1129              
1130 0           $self->flush();
1131 0           $self->{outbuffer} .= "QUIT\r\n";
1132 0           my $endtime = time + 1; # Wait a maximum of one second to send
1133 0           while(1) {
1134 0 0         last if(time > $endtime);
1135 0           $self->doNetwork();
1136 0 0         last if(!length($self->{outbuffer}));
1137 0           sleep(0.05);
1138             }
1139 0           sleep(0.5); # Wait another half second for the OS to flush the socket
1140              
1141 0           delete $self->{socket};
1142 0           $self->{needreconnect} = 1;
1143              
1144 0           return;
1145             }
1146              
1147 0     0     sub DESTROY($self) {
  0            
  0            
1148             # Try to disconnect cleanly, but socket might already be DESTROYed, so catch any errors
1149 0           eval {
1150 0           $self->disconnect();
1151             };
1152              
1153 0           return;
1154             }
1155              
1156             1;
1157             __END__