File Coverage

blib/lib/Net/Clacks/Client.pm
Criterion Covered Total %
statement 50 650 7.6
branch 0 326 0.0
condition 0 105 0.0
subroutine 17 48 35.4
pod 29 29 100.0
total 96 1158 8.2


line stmt bran cond sub pod time code
1             package Net::Clacks::Client;
2             #---AUTOPRAGMASTART---
3 1     1   17 use 5.020;
  1         3  
4 1     1   5 use strict;
  1         2  
  1         15  
5 1     1   4 use warnings;
  1         2  
  1         19  
6 1     1   4 use diagnostics;
  1         2  
  1         4  
7 1     1   23 use mro 'c3';
  1         2  
  1         4  
8 1     1   33 use English;
  1         2  
  1         5  
9 1     1   301 use Carp;
  1         2  
  1         48  
10             our $VERSION = 22;
11 1     1   5 use autodie qw( close );
  1         1  
  1         5  
12 1     1   230 use Array::Contains;
  1         2  
  1         38  
13 1     1   5 use utf8;
  1         1  
  1         4  
14 1     1   34 use Encode qw(is_utf8 encode_utf8 decode_utf8);
  1         1  
  1         35  
15             #---AUTOPRAGMAEND---
16              
17 1     1   499 use IO::Socket::IP;
  1         27254  
  1         4  
18             #use IO::Socket::UNIX;
19 1     1   906 use Time::HiRes qw[sleep usleep];
  1         1159  
  1         7  
20 1     1   548 use Sys::Hostname;
  1         844  
  1         42  
21 1     1   368 use IO::Select;
  1         1435  
  1         55  
22 1     1   663 use IO::Socket::SSL;
  1         40418  
  1         6  
23 1     1   579 use MIME::Base64;
  1         581  
  1         6340  
24              
25             sub new {
26 0     0 1   my ($class, $server, $port, $username, $password, $clientname, $iscaching) = @_;
27 0           my $self = bless {}, $class;
28              
29 0 0 0       if(!defined($server) || !length($server)) {
30 0           croak("server not defined!");
31             }
32 0 0 0       if(!defined($port) || !length($port)) {
33 0           croak("port not defined!");
34             }
35 0 0 0       if(!defined($username) || !length($username)) {
36 0           croak("username not defined!");
37             }
38 0 0 0       if(!defined($password) || !length($password)) {
39 0           croak("password not defined!");
40             }
41 0 0 0       if(!defined($clientname) || !length($clientname)) {
42 0           croak("clientname not defined!");
43             }
44              
45 0           $self->{server} = $server;
46 0           $self->{port} = $port;
47              
48 0           $self->init($username, $password, $clientname, $iscaching);
49              
50 0           return $self;
51             }
52              
53             sub newSocket {
54 0     0 1   my ($class, $socketpath, $username, $password, $clientname, $iscaching) = @_;
55 0           my $self = bless {}, $class;
56              
57 0 0 0       if(!defined($socketpath) || !length($socketpath)) {
58 0           croak("socketpath not defined!");
59             }
60 0 0 0       if(!defined($username) || !length($username)) {
61 0           croak("username not defined!");
62             }
63 0 0 0       if(!defined($password) || !length($password)) {
64 0           croak("password not defined!");
65             }
66 0 0 0       if(!defined($clientname) || !length($clientname)) {
67 0           croak("clientname not defined!");
68             }
69              
70 0           my $udsloaded = 0;
71 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
72 0           require IO::Socket::UNIX;
73 0           $udsloaded = 1;
74             };
75 0 0         if(!$udsloaded) {
76 0           croak("Specified a unix domain socket, but i couldn't load IO::Socket::UNIX!");
77             }
78              
79 0           $self->{socketpath} = $socketpath;
80              
81 0           $self->init($username, $password, $clientname, $iscaching);
82              
83 0           return $self;
84             }
85              
86             sub init {
87 0     0 1   my ($self, $username, $password, $clientname, $iscaching) = @_;
88              
89 0 0 0       if(!defined($username) || $username eq '') {
90 0           croak("Username not defined!");
91             }
92 0 0 0       if(!defined($password) || $password eq '') {
93 0           croak("Password not defined!");
94             }
95              
96 0 0 0       if(!defined($clientname || $clientname eq '')) {
97 0           croak("Clientname not defined!");
98             }
99 0           $self->{clientname} = $clientname;
100              
101 0           $self->{authtoken} = encode_base64($username, '') . ':' . encode_base64($password, '');
102              
103 0 0         if(!defined($iscaching)) {
104 0           $iscaching = 0;
105             }
106 0           $self->{iscaching} = $iscaching;
107              
108 0 0         if($self->{iscaching}) {
109 0           $self->{cache} = {};
110             }
111              
112 0           $self->{needreconnect} = 1;
113 0           $self->{inlines} = [];
114 0           $self->{firstconnect} = 1;
115              
116 0           $self->{memcached_compatibility} = 0;
117              
118             $self->{remembrancenames} = [
119 0           'Ivy Bdubs',
120             'Terry Pratchett',
121             'Sven Guckes',
122             ];
123 0           $self->{remembranceinterval} = 3600; # One hour
124 0           $self->{nextremembrance} = time + $self->{remembranceinterval};
125              
126 0           $self->reconnect();
127              
128 0           return;
129             }
130              
131             sub reconnect {
132 0     0 1   my ($self) = @_;
133              
134 0 0         if(defined($self->{socket})) {
135 0           delete $self->{socket};
136             }
137              
138 0 0         if(!$self->{firstconnect}) {
139             # Not our first connection (=real reconnect).
140             # wait a short random time before reconnecting. In case all
141             # clients got disconnected, we want to avoid having all clients reconnect
142             # at the exact same time
143 0           my $waittime = rand(4000)/1000;
144 0           sleep($waittime);
145             }
146              
147 0           my $socket;
148 0 0 0       if(defined($self->{server}) && defined($self->{port})) {
    0          
149             $socket = IO::Socket::IP->new(
150             PeerHost => $self->{server},
151             PeerPort => $self->{port},
152 0 0         Type => SOCK_STREAM,
153             ) or croak("Failed to connect to Clacks TCP message service: $ERRNO");
154             } elsif(defined($self->{socketpath})) {
155             $socket = IO::Socket::UNIX->new(
156             Peer => $self->{socketpath},
157 0 0         Type => SOCK_STREAM,
158             ) or croak("Failed to connect to Clacks Unix Domain Socket message service: $ERRNO");
159             } else {
160 0           croak("Neither TCP nor Unix domain socket specified. Don't know where to connect to.");
161             }
162              
163             #binmode($socket, ':bytes');
164 0           $socket->blocking(0);
165              
166              
167 0 0         if(ref $socket ne 'IO::Socket::UNIX') {
168             # ONLY USE SSL WHEN RUNNING OVER THE NETWORK
169             # There is simply no point in running it over a local socket.
170 0 0         IO::Socket::SSL->start_SSL($socket,
171             SSL_verify_mode => SSL_VERIFY_NONE,
172             ) or croak("Can't use SSL: " . $SSL_ERROR);
173             }
174              
175 0           $self->{socket} = $socket;
176 0           $self->{selector} = IO::Select->new($self->{socket});
177 0           $self->{failcount} = 0;
178 0           $self->{lastping} = time;
179 0           $self->{inbuffer} = '';
180 0           $self->{incharbuffer} = [];
181 0           $self->{outbuffer} = '';
182 0           $self->{serverinfo} = 'UNKNOWN';
183 0           $self->{needreconnect} = 0;
184 0           $self->{firstline} = 1;
185 0           $self->{headertimeout} = time + 15;
186              
187             # Do *not* nuke "inlines" array, since it may hold "QUIT" messages that the client wants to handle, for example, to re-issue
188             # "LISTEN" commands.
189             # $self->{inlines} = ();
190              
191 0 0         if($self->{firstconnect}) {
192 0           $self->{firstconnect} = 0;
193             } else {
194 0           push @{$self->{inlines}}, "RECONNECTED";
  0            
195             }
196              
197             # Startup "handshake". As everything else, this is asyncronous, both server and
198             # client send their respective version strings and then wait to recieve their counterparts
199             # Also, this part is REQUIRED, just to make sure we actually speek to CLACKS protocol
200 0           $self->{outbuffer} .= 'CLACKS ' . $self->{clientname} . "\r\n";
201 0           $self->{outbuffer} .= 'OVERHEAD A ' . $self->{authtoken} . "\r\n";
202 0           $self->doNetwork();
203              
204 0           return;
205             }
206              
207             sub activate_memcached_compat {
208 0     0 1   my ($self) = @_;
209              
210 0           $self->{memcached_compatibility} = 1;
211 0           return;
212             }
213              
214             sub getRawSocket {
215 0     0 1   my ($self) = @_;
216              
217 0 0         if($self->{needreconnect}) {
218 0           $self->reconnect();
219             }
220              
221 0           return $self->{socket};
222             }
223              
224             sub doNetwork {
225 0     0 1   my ($self, $readtimeout) = @_;
226              
227 0 0         if(!defined($readtimeout)) {
228             # Don't wait
229 0           $readtimeout = 0;
230             }
231              
232 0 0         if($self->{needreconnect}) {
233 0           $self->reconnect();
234             }
235              
236 0 0 0       if($self->{nextremembrance} && time > $self->{nextremembrance}) {
237             # A person is not dead while their name is still spoken.
238 0           $self->{nextremembrance} = time + $self->{remembranceinterval} + int(rand($self->{remembranceinterval} / 10));
239 0           my $neverforget = $self->{remembrancenames}->[rand @{$self->{remembrancenames}}];
  0            
240 0           $self->{outbuffer} .= 'OVERHEAD GNU ' . $neverforget . "\r\n";
241             }
242              
243             # doNetwork interleaves handling incoming and outgoing traffic.
244             # This is only relevant on slow links.
245             #
246             # It returns even if the outgoing or incoming buffers are not empty
247             # (meaning that partially buffered data can exists). This way we use the
248             # available bandwidth without blocking unduly the application (we assume it's a realtime
249             # application with multiple things going on at the same time)-
250             #
251             # The downside of this is that doNetwork() needs to be called on a regular basis and sending
252             # and recieving might be delayed until the next cycle. This delay can be minimized by simply
253             # not transfering huge values over clacks, but instead using it the way it was intended to be used:
254             # Small variables can be SET directly by clacks, huge datasets should be stored in the
255             # database and the recievers only NOTIFY'd that a change has taken place.
256             #
257             # The big exception here is the here is the ClacksCache part of the story. These functions
258             # call doNetwork() in a loop until the outbuffer is empty. And depending on requirement, they
259             # KEEP on calling doNetwork() until the answer is recieved. This makes ClacksCache functions
260             # syncronous and causes some delay in the calling function. But doing these asyncronous will
261             # cause more headaches, maybe even leading up to insanity. Believe me, i tried, but those pink
262             # elephants stomping about my rubber-padded room are such a distraction...
263              
264 0           my $workCount = 0;
265              
266 0 0         if(length($self->{outbuffer})) {
267 0           my $brokenpipe = 0;
268 0           my $writeok = 0;
269 0           my $written;
270 0           eval {
271 0     0     local $SIG{PIPE} = sub { $brokenpipe = 1; };
  0            
272 0           $written = syswrite($self->{socket}, $self->{outbuffer});
273 0           $writeok = 1;
274             };
275              
276 0 0 0       if($brokenpipe || !$writeok) {
277 0           $self->{needreconnect} = 1;
278 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
279 0           return;
280             }
281              
282 0 0 0       if(defined($written) && $written) {
283 0           $workCount += $written;
284 0 0         if(length($self->{outbuffer}) == $written) {
285 0           $self->{outbuffer} = '';
286             } else {
287 0           $self->{outbuffer} = substr($self->{outbuffer}, $written);
288             }
289             }
290              
291             }
292              
293             {
294 0           my $select = IO::Select->new($self->{socket});
  0            
295 0           my @temp;
296 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
297 0           @temp = $self->{selector}->can_read($readtimeout);
298             };
299 0 0         if(scalar @temp == 0) {
300             # Timeout
301 0           return $workCount;
302             }
303             }
304              
305 0           my $totalread = 0;
306 0           while(1) {
307 0           my $buf;
308 0           my $readok = 0;
309 0           eval {
310 0           sysread($self->{socket}, $buf, 10_000); # Read in at most 10kB at once
311 0           $readok = 1;
312             };
313 0 0         if(!$readok) {
314 0           $self->{needreconnect} = 1;
315 0           push @{$self->{inlines}}, "TIMEOUT";
  0            
316 0           return;
317             }
318 0 0 0       if(defined($buf) && length($buf)) {
319 0           $totalread += length($buf);
320             #print STDERR "+ $buf\n--\n";
321 0           push @{$self->{incharbuffer}}, split//, $buf;
  0            
322 0           next;
323             }
324 0           last;
325             }
326            
327             # Check if we actually got data after checking with can_read() first
328 0 0         if($totalread) {
329 0           $self->{failcount} = 0;
330             } else {
331             # This should normally not happen, but thanks to SSL, it sometimes does
332             # We ignore single instances of those but disconnect if many happen in a row
333 0           $self->{failcount}++;
334 0           sleep(0.05);
335            
336 0 0         if($self->{failcount} > 5) {
337 0           $self->{needreconnect} = 1;
338 0           return;
339             }
340             }
341 0           while(@{$self->{incharbuffer}}) {
  0            
342 0           my $char = shift @{$self->{incharbuffer}};
  0            
343 0           $workCount++;
344 0 0         if($char eq "\r") {
    0          
345 0           next;
346             } elsif($char eq "\n") {
347 0 0         if($self->{inbuffer} eq 'NOP') { # Just drop "No OPerations" packets, only used by server to
348             # verify that the connection is still active
349             #$self->{firstline}
350 0           $self->{inbuffer} = '';
351 0           next;
352             }
353              
354 0 0         if($self->{firstline}) {
355 0 0         if($self->{inbuffer} !~ /^CLACKS\ /) {
356             # Whoops, not a clacks server or something gone wrong with the protocol
357 0           $self->{needreconnect} = 1;
358 0           return 0;
359             } else {
360 0           $self->{firstline} = 0;
361             }
362             }
363              
364 0           push @{$self->{inlines}}, $self->{inbuffer};
  0            
365 0           $self->{inbuffer} = '';
366             } else {
367 0           $self->{inbuffer} .= $char;
368             }
369             }
370              
371 0 0 0       if($self->{firstline} && $self->{headertimeout} < time) {
372 0           $self->{needreconnect} = 1;
373 0           return 0;
374             }
375              
376 0           return $workCount;
377             }
378              
379             my %overheadflags = (
380             A => "auth_token", # Authentication token
381             O => "auth_ok", # Authentication OK
382             F => "auth_failed", # Authentication FAILED
383              
384             E => 'error_message', # Server to client error message
385              
386             C => "close_all_connections",
387             D => "discard_message",
388             G => "forward_message",
389             I => "set_interclacks_mode", # value: true/false, disables 'G' and 'U'
390             M => "informal_message", # informal message
391             N => "no_logging",
392             S => "shutdown_service", # value: positive number (number in seconds before shutdown). If interclacks clients are present, should be high
393             # enough to flush all buffers to them
394             U => "return_to_sender",
395             Z => "no_flags", # Only sent when no other flags are set
396             );
397              
398             sub getNext {
399 0     0 1   my ($self) = @_;
400              
401             # Recieve next incoming message (if any)
402              
403             restartgetnext:
404 0           my $line = shift @{$self->{inlines}};
  0            
405              
406 0 0         if(!defined($line)) {
407 0           return;
408             }
409              
410 0           my %data;
411             #print STDERR "> $line\n";
412 0 0         if($line =~ /^NOTIFY\ (.+)/) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
413 0           %data = (
414             type => 'notify',
415             name => $1,
416             );
417             } elsif($line =~ /^SET\ (.+?)\=(.*)/) {
418 0           %data = (
419             type => 'set',
420             name => $1,
421             data => $2,
422             );
423             } elsif($line =~ /^CLACKS\ (.+)/) {
424 0           %data = (
425             type => 'serverinfo',
426             data => $1,
427             );
428             } elsif($line =~ /^DEBUG\ (.+?)\=(.*)/) {
429 0           %data = (
430             type => 'debug',
431             host => $1,
432             command => $2,
433             );
434             } elsif($line =~ /^QUIT/) {
435 0           %data = (
436             type => 'disconnect',
437             data => 'quit',
438             );
439 0           $self->{needreconnect} = 1;
440             } elsif($line =~ /^TIMEOUT/) {
441 0           %data = (
442             type => 'disconnect',
443             data => 'timeout',
444             );
445 0           $self->{needreconnect} = 1;
446             } elsif($line =~ /^RECONNECTED/) {
447 0           %data = (
448             type => 'reconnected',
449             data => 'send your LISTEN requests again',
450             );
451             } elsif($line =~ /^OVERHEAD\ (.+?)\ (.+)/) {
452             # Minimal handling of OVERHEAD flags
453 0           my ($flags, $value) = ($1, $2);
454 0           my @flagparts = split//, $flags;
455 0           my %parsedflags;
456 0           foreach my $key (sort keys %overheadflags) {
457 0 0         if(contains($key, \@flagparts)) {
458 0           $parsedflags{$overheadflags{$key}} = 1;
459             } else {
460 0           $parsedflags{$overheadflags{$key}} = 0;
461             }
462             }
463              
464 0 0         if($parsedflags{auth_ok}) {
    0          
    0          
    0          
465             #print STDERR "Clacks AUTH OK\n";
466 0           goto restartgetnext; # try the next message
467             } elsif($parsedflags{error_message}) {
468 0           %data = (
469             type => 'error_message',
470             data => $value,
471             );
472             } elsif($parsedflags{auth_failed}) {
473 0           croak("Clacks Authentication failed!");
474             } elsif($parsedflags{informal_message}) {
475 0 0         if($parsedflags{forward_message}) {
476 0           %data = (
477             type => 'informal',
478             data => $value,
479             );
480             }
481 0 0         if($parsedflags{return_to_sender}) {
482 0           my $uturn = 'OVERHEAD M';
483 0 0         if($parsedflags{no_logging}) {
484 0           $uturn .= 'N';
485             }
486 0           $uturn .= ' ' . $value;
487 0           $self->{outbuffer} .= $uturn;
488             }
489             }
490              
491             } else {
492             # UNKNOWN, ignore
493 0           goto restartgetnext; # try the next message
494             }
495              
496 0 0         if(!defined($data{type})) {
497 0           return;
498             }
499              
500 0           $data{rawline} = $line;
501              
502 0           return \%data;
503             }
504              
505              
506             sub ping {
507 0     0 1   my ($self) = @_;
508              
509 0 0         if($self->{lastping} < (time - 120)) {
510             # Only send a ping every 120 seconds or less
511 0           $self->{outbuffer} .= "PING\r\n";
512 0           $self->{lastping} = time;
513             }
514              
515 0           return;
516             }
517              
518             sub disablePing {
519 0     0 1   my ($self) = @_;
520              
521 0           $self->{outbuffer} .= "NOPING\r\n";
522              
523 0           return;
524             }
525              
526              
527             sub notify {
528 0     0 1   my ($self, $varname) = @_;
529              
530 0 0 0       if(!defined($varname) || !length($varname)) {
531 0           carp("varname not defined!");
532 0           return;
533             }
534              
535 0 0         if($self->{needreconnect}) {
536 0           $self->reconnect;
537             }
538              
539 0           $self->{outbuffer} .= "NOTIFY $varname\r\n";
540              
541 0 0         if($self->{memcached_compatibility}) {
542 0           while(1) {
543 0           $self->doNetwork();
544 0 0         if($self->{needreconnect}) {
545             # Nothing we can do, really...
546 0           return;
547             }
548 0 0         last if(!length($self->{outbuffer}));
549 0           usleep(1000);
550             }
551 0           $self->autohandle_messages();
552             }
553              
554 0           return;
555             }
556              
557             sub set { ## no critic (NamingConventions::ProhibitAmbiguousNames)
558 0     0 1   my ($self, $varname, $value, $forcesend) = @_;
559              
560 0 0 0       if(!defined($varname) || !length($varname)) {
561 0           carp("varname not defined!");
562 0           return;
563             }
564 0 0         if(!defined($value)) {
565 0           carp("value not defined!");
566 0           return;
567             }
568              
569 0 0         if(!defined($forcesend)) {
570 0           $forcesend = 0;
571             }
572              
573 0 0         if($self->{needreconnect}) {
574 0           $self->reconnect;
575             }
576              
577             # Handle caching to lower output volumne
578 0 0 0       if($self->{iscaching} && !$forcesend && defined($self->{cache}->{$varname}) && $self->{cache}->{$varname} eq $value) {
      0        
      0        
579             # Already the same value send
580 0           return;
581             }
582              
583 0 0         if($self->{iscaching}) {
584 0           $self->{cache}->{$varname} = $value;
585             }
586              
587 0           $self->{outbuffer} .= "SET $varname=$value\r\n";
588              
589 0 0         if($self->{memcached_compatibility}) {
590 0           while(1) {
591 0           $self->doNetwork();
592 0 0         if($self->{needreconnect}) {
593             # Nothing we can do, really...
594 0           return;
595             }
596 0 0         last if(!length($self->{outbuffer}));
597 0           usleep(1000);
598             }
599              
600 0           $self->autohandle_messages();
601             }
602              
603 0           return;
604             }
605              
606             sub listen { ## no critic (Subroutines::ProhibitBuiltinHomonyms)
607 0     0 1   my ($self, $varname) = @_;
608              
609 0 0 0       if(!defined($varname) || !length($varname)) {
610 0           carp("varname not defined!");
611 0           return;
612             }
613              
614 0 0         if($self->{needreconnect}) {
615 0           $self->reconnect;
616             }
617              
618 0           $self->{outbuffer} .= "LISTEN $varname\r\n";
619              
620 0           return;
621             }
622              
623             sub unlisten {
624 0     0 1   my ($self, $varname) = @_;
625              
626 0 0 0       if(!defined($varname) || !length($varname)) {
627 0           carp("varname not defined!");
628 0           return;
629             }
630              
631 0 0         if($self->{needreconnect}) {
632 0           $self->reconnect;
633             }
634              
635 0           $self->{outbuffer} .= "UNLISTEN $varname\r\n";
636              
637 0           return;
638             }
639              
640             sub setMonitormode {
641 0     0 1   my ($self, $active) = @_;
642              
643 0 0         if($self->{needreconnect}) {
644 0           $self->reconnect;
645             }
646              
647 0 0 0       if(!defined($active) || !$active) {
648 0           $self->{outbuffer} .= "UNMONITOR\r\n";
649             } else {
650 0           $self->{outbuffer} .= "MONITOR\r\n";
651             }
652              
653 0           return;
654             }
655              
656             sub getServerinfo {
657 0     0 1   my ($self) = @_;
658              
659 0           return $self->{serverinfo};
660             }
661              
662             # ---------------- ClackCache handling --------------------
663             # ClacksCache handling always implies doNetwork()
664             # Also, we do NOT use the caching system used for SET
665             sub store {
666 0     0 1   my ($self, $varname, $value) = @_;
667              
668 0 0 0       if(!defined($varname) || !length($varname)) {
669 0           carp("varname not defined!");
670 0           return;
671             }
672 0 0         if(!defined($value)) {
673 0           carp("value not defined!");
674 0           return;
675             }
676              
677 0 0         if($self->{needreconnect}) {
678 0           $self->reconnect;
679             }
680              
681 0           $self->{outbuffer} .= "STORE $varname=$value\r\n";
682 0           while(1) {
683 0           $self->doNetwork();
684 0 0         if($self->{needreconnect}) {
685             # Nothing we can do, really...
686 0           return;
687             }
688 0 0         last if(!length($self->{outbuffer}));
689 0           usleep(1000);
690             }
691              
692 0 0         if($self->{memcached_compatibility}) {
693 0           $self->autohandle_messages();
694             }
695              
696 0           return;
697             }
698              
699             sub retrieve {
700 0     0 1   my ($self, $varname) = @_;
701              
702 0 0 0       if(!defined($varname) || !length($varname)) {
703 0           carp("varname not defined!");
704 0           return;
705             }
706              
707 0           my $value;
708              
709 0 0         if($self->{needreconnect}) {
710 0           $self->reconnect;
711             }
712              
713 0           $self->{outbuffer} .= "RETRIEVE $varname\r\n";
714              
715             # Make sure we send everything
716 0           while(1) {
717 0           $self->doNetwork();
718 0 0         if($self->{needreconnect}) {
719             # Nothing we can do, really...
720 0           return;
721             }
722 0 0         last if(!length($self->{outbuffer}));
723             }
724              
725             # Now, wait for the answer
726 0           my $answerline;
727 0           while(1) {
728 0           $self->doNetwork(0.5);
729 0 0         if($self->{needreconnect}) {
730             # Nothing we can do, really...
731 0           return;
732             }
733 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
734 0 0 0       if($self->{inlines}->[$i] =~ /^RETRIEVED\ $varname/ || $self->{inlines}->[$i] =~ /^NOTRETRIEVED\ $varname/) {
735             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
736 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
737 0           last;
738             }
739             }
740 0 0         last if(defined($answerline));
741             }
742              
743 0 0         if($answerline =~ /^RETRIEVED\ (.+?)\=(.*)/) {
744 0           my ($key, $val) = ($1, $2);
745 0 0         if($key ne $varname) {
746 0           print STDERR "Retrieved clacks key $key does not match requested varname $varname!\n";
747 0           return;
748             }
749 0           return $val;
750             }
751              
752             # No matching key
753 0           return;
754             }
755              
756             sub remove {
757 0     0 1   my ($self, $varname) = @_;
758              
759 0 0 0       if(!defined($varname) || !length($varname)) {
760 0           carp("varname not defined!");
761 0           return;
762             }
763              
764 0 0         if($self->{needreconnect}) {
765 0           $self->reconnect;
766             }
767              
768 0           $self->{outbuffer} .= "REMOVE $varname\r\n";
769 0           while(1) {
770 0           $self->doNetwork();
771 0 0         if($self->{needreconnect}) {
772             # Nothing we can do, really...
773 0           return;
774             }
775 0 0         last if(!length($self->{outbuffer}));
776 0           usleep(1000);
777             }
778              
779 0 0         if($self->{memcached_compatibility}) {
780 0           $self->autohandle_messages();
781             }
782              
783 0           return;
784             }
785              
786             sub increment {
787 0     0 1   my ($self, $varname, $stepsize) = @_;
788              
789 0 0 0       if(!defined($varname) || !length($varname)) {
790 0           carp("varname not defined!");
791 0           return;
792             }
793              
794 0 0         if($self->{needreconnect}) {
795 0           $self->reconnect;
796             }
797              
798 0 0         if(!defined($stepsize)) {
799 0           $self->{outbuffer} .= "INCREMENT $varname\r\n";
800             } else {
801 0           $stepsize = 0 + $stepsize;
802 0           $self->{outbuffer} .= "INCREMENT $varname=$stepsize\r\n";
803             }
804              
805 0           while(1) {
806 0           $self->doNetwork();
807 0 0         if($self->{needreconnect}) {
808             # Nothing we can do, really...
809 0           return;
810             }
811 0 0         last if(!length($self->{outbuffer}));
812 0           usleep(1000);
813             }
814              
815 0 0         if($self->{memcached_compatibility}) {
816 0           $self->autohandle_messages();
817             }
818              
819 0           return;
820             }
821              
822             sub decrement {
823 0     0 1   my ($self, $varname, $stepsize) = @_;
824              
825 0 0 0       if(!defined($varname) || !length($varname)) {
826 0           carp("varname not defined!");
827 0           return;
828             }
829              
830 0 0         if($self->{needreconnect}) {
831 0           $self->reconnect;
832             }
833              
834 0 0         if(!defined($stepsize)) {
835 0           $self->{outbuffer} .= "DECREMENT $varname\r\n";
836             } else {
837 0           $stepsize = 0 + $stepsize;
838 0           $self->{outbuffer} .= "DECREMENT $varname=$stepsize\r\n";
839             }
840 0           while(1) {
841 0           $self->doNetwork();
842 0 0         if($self->{needreconnect}) {
843             # Nothing we can do, really...
844 0           return;
845             }
846 0 0         last if(!length($self->{outbuffer}));
847 0           usleep(1000);
848             }
849              
850 0 0         if($self->{memcached_compatibility}) {
851 0           $self->autohandle_messages();
852             }
853              
854 0           return;
855             }
856              
857             sub clearcache {
858 0     0 1   my ($self) = @_;
859              
860 0 0         if($self->{needreconnect}) {
861 0           $self->reconnect;
862             }
863              
864 0           $self->{outbuffer} .= "CLEARCACHE\r\n";
865 0           while(1) {
866 0           $self->doNetwork();
867 0 0         if($self->{needreconnect}) {
868             # Nothing we can do, really...
869 0           return;
870             }
871 0 0         last if(!length($self->{outbuffer}));
872 0           usleep(1000);
873             }
874              
875 0 0         if($self->{memcached_compatibility}) {
876 0           $self->autohandle_messages();
877             }
878              
879 0           return;
880             }
881              
882             sub keylist {
883 0     0 1   my ($self) = @_;
884              
885 0 0         if($self->{needreconnect}) {
886 0           $self->reconnect;
887             }
888              
889 0           my $value;
890              
891 0           $self->{outbuffer} .= "KEYLIST\r\n";
892              
893             # Make sure we send everything
894 0           while(1) {
895 0           $self->doNetwork();
896 0 0         if($self->{needreconnect}) {
897             # Nothing we can do, really...
898 0           return;
899             }
900 0 0         last if(!length($self->{outbuffer}));
901 0           usleep(1000);
902             }
903              
904             # Now, wait for the answer
905 0           my $liststartfound = 0;
906 0           my $listendfound = 0;
907 0           while(1) {
908 0           $self->doNetwork(0.5);
909 0 0         if($self->{needreconnect}) {
910             # Nothing we can do, really...
911 0           return;
912             }
913 0           $liststartfound = 0;
914 0           $listendfound = 0;
915 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
916 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTSTART/) {
917 0           $liststartfound = 1;
918 0           next;
919             }
920 0 0         next unless($liststartfound);
921 0 0         if($self->{inlines}->[$i] =~ /^KEYLISTEND/) {
922 0           $listendfound = 1;
923 0           last;
924             }
925             }
926 0 0         last if($listendfound);
927             }
928              
929             # Now, grab the keys from inlines buffer
930 0           my @keys;
931 0           my $idx = 0;
932 0           my $listfound = 0;
933 0           while($idx < scalar @{$self->{inlines}}) {
  0            
934 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTSTART/) {
935             # Just remove this line
936 0           splice @{$self->{inlines}}, $idx, 1;
  0            
937 0           $listfound = 1;
938 0           next;
939             }
940              
941 0 0         if(!$listfound) {
942 0           $idx++;
943 0           next;
944             }
945              
946 0 0         if($self->{inlines}->[$idx] =~ /^KEYLISTEND/) {
947             # End of list
948 0           last;
949             }
950              
951 0 0         if($self->{inlines}->[$idx] =~ /^KEY\ (.+)/) {
952 0           push @keys, $1;
953             # Don't increment $idx, but the rest of the array one element down
954 0           splice @{$self->{inlines}}, $idx, 1;
  0            
955             } else {
956 0           $idx++;
957             }
958             }
959              
960 0 0         if($self->{memcached_compatibility}) {
961 0           $self->autohandle_messages();
962             }
963              
964 0           return @keys;
965             }
966              
967             sub clientlist {
968 0     0 1   my ($self) = @_;
969              
970 0 0         if($self->{needreconnect}) {
971 0           $self->reconnect;
972             }
973              
974 0           my $value;
975              
976 0           $self->{outbuffer} .= "CLIENTLIST\r\n";
977              
978             # Make sure we send everything
979 0           while(1) {
980 0           $self->doNetwork();
981 0 0         if($self->{needreconnect}) {
982             # Nothing we can do, really...
983 0           return;
984             }
985 0 0         last if(!length($self->{outbuffer}));
986 0           usleep(1000);
987             }
988              
989             # Now, wait for the answer
990 0           my $liststartfound = 0;
991 0           my $listendfound = 0;
992 0           while(1) {
993 0           $self->doNetwork(0.5);
994 0 0         if($self->{needreconnect}) {
995             # Nothing we can do, really...
996 0           return;
997             }
998 0           $liststartfound = 0;
999 0           $listendfound = 0;
1000 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
1001 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTSTART/) {
1002 0           $liststartfound = 1;
1003 0           next;
1004             }
1005 0 0         next unless($liststartfound);
1006 0 0         if($self->{inlines}->[$i] =~ /^CLIENTLISTEND/) {
1007 0           $listendfound = 1;
1008 0           last;
1009             }
1010             }
1011 0 0         last if($listendfound);
1012             }
1013              
1014             # Now, grab the keys from inlines buffer
1015 0           my @keys;
1016 0           my $idx = 0;
1017 0           my $listfound = 0;
1018 0           while($idx < scalar @{$self->{inlines}}) {
  0            
1019 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTSTART/) {
1020             # Just remove this line
1021 0           splice @{$self->{inlines}}, $idx, 1;
  0            
1022 0           $listfound = 1;
1023 0           next;
1024             }
1025              
1026 0 0         if(!$listfound) {
1027 0           $idx++;
1028 0           next;
1029             }
1030              
1031 0 0         if($self->{inlines}->[$idx] =~ /^CLIENTLISTEND/) {
1032             # End of list
1033 0           last;
1034             }
1035              
1036 0 0         if($self->{inlines}->[$idx] =~ /^CLIENT\ (.+)/) {
1037 0           my @parts = split/\;/, $1;
1038 0           my %data;
1039 0           foreach my $part (@parts) {
1040 0           my ($datakey, $datavalue) = split/\=/, $part, 2;
1041             #$datakey = lc $datakey;
1042 0           $data{lc $datakey} = $datavalue;
1043             }
1044 0           push @keys, \%data;
1045             # Don't increment $idx, but the rest of the array one element down
1046 0           splice @{$self->{inlines}}, $idx, 1;
  0            
1047             } else {
1048 0           $idx++;
1049             }
1050             }
1051              
1052 0 0         if($self->{memcached_compatibility}) {
1053 0           $self->autohandle_messages();
1054             }
1055              
1056 0           return @keys;
1057             }
1058              
1059             sub flush {
1060 0     0 1   my ($self, $flushid) = @_;
1061              
1062 0 0 0       if(!defined($flushid) || $flushid eq '') {
1063 0           $flushid = 'AUTO' . int(rand(1_000_000)) . int(rand(1_000_000));
1064             }
1065              
1066 0 0         if($self->{needreconnect}) {
1067 0           $self->reconnect;
1068             }
1069              
1070 0           $self->{outbuffer} .= "FLUSH $flushid\r\n";
1071              
1072             # Make sure we send everything
1073 0           while(1) {
1074 0           $self->doNetwork();
1075 0 0         if($self->{needreconnect}) {
1076             # Nothing we can do, really...
1077 0           return;
1078             }
1079 0 0         last if(!length($self->{outbuffer}));
1080             }
1081              
1082             # Now, wait for the answer
1083 0           my $answerline;
1084 0           while(1) {
1085 0           $self->doNetwork(0.5);
1086 0 0         if($self->{needreconnect}) {
1087             # Nothing we can do, really...
1088 0           return;
1089             }
1090 0           for(my $i = 0; $i < scalar @{$self->{inlines}}; $i++) {
  0            
1091 0 0         if($self->{inlines}->[$i] =~ /^FLUSHED\ $flushid/) {
1092             # Remove the answer from in in-queue directly (out of sequence), because we don't need in in the getNext function
1093 0           $answerline = splice @{$self->{inlines}}, $i, 1;
  0            
1094 0           last;
1095             }
1096             }
1097 0 0         last if(defined($answerline));
1098             }
1099              
1100 0           return;
1101             }
1102              
1103              
1104             sub autohandle_messages {
1105 0     0 1   my ($self) = @_;
1106              
1107 0           $self->doNetwork();
1108              
1109 0           while((my $line = $self->getNext())) {
1110 0 0         if($line->{type} eq 'disconnect') {
1111 0           $self->{needreconnect} = 1;
1112             }
1113             }
1114              
1115 0           return;
1116             }
1117              
1118             # ---------------- ClackCache handling --------------------
1119              
1120             sub sendRawCommand {
1121 0     0 1   my ($self, $command) = @_;
1122              
1123 0 0 0       if(!defined($command) || !length($command)) {
1124 0           carp("command not defined!");
1125 0           return;
1126             }
1127              
1128 0 0         if($self->{needreconnect}) {
1129 0           $self->reconnect;
1130             }
1131              
1132 0           $self->{outbuffer} .= $command . "\r\n";
1133              
1134 0           return;
1135             }
1136              
1137             # setAndStore combines the SET and STORE command into the SETANDSTORE server command. This is mostly done
1138             # for optimizing interclacks connections
1139             # Other clients will only get a SET notification, but the server also runs a STORE operation
1140             sub setAndStore {
1141 0     0 1   my ($self, $varname, $value, $forcesend) = @_;
1142              
1143 0 0 0       if(!defined($varname) || !length($varname)) {
1144 0           carp("varname not defined!");
1145 0           return;
1146             }
1147              
1148 0 0         if(!defined($value)) {
1149 0           carp("value not defined!");
1150 0           return;
1151             }
1152              
1153 0 0         if(!defined($forcesend)) {
1154 0           $forcesend = 0;
1155             }
1156              
1157 0           $self->{outbuffer} .= "SETANDSTORE $varname=$value\r\n";
1158              
1159 0 0         if($self->{memcached_compatibility}) {
1160 0           while(1) {
1161 0           $self->doNetwork();
1162 0 0         if($self->{needreconnect}) {
1163             # Nothing we can do, really...
1164 0           return;
1165             }
1166 0 0         last if(!length($self->{outbuffer}));
1167 0           usleep(1000);
1168             }
1169              
1170 0           $self->autohandle_messages();
1171             }
1172              
1173 0           return;
1174             }
1175              
1176             sub disconnect {
1177 0     0 1   my ($self) = @_;
1178              
1179 0 0         if($self->{needreconnect}) {
1180             # We are not connected, just do nothing
1181 0           return;
1182             }
1183              
1184 0           $self->flush();
1185 0           $self->{outbuffer} .= "QUIT\r\n";
1186 0           my $endtime = time + 1; # Wait a maximum of one second to send
1187 0           while(1) {
1188 0 0         last if(time > $endtime);
1189 0           $self->doNetwork();
1190 0 0         last if(!length($self->{outbuffer}));
1191 0           sleep(0.05);
1192             }
1193 0           sleep(0.5); # Wait another half second for the OS to flush the socket
1194              
1195 0           delete $self->{socket};
1196 0           $self->{needreconnect} = 1;
1197              
1198 0           return;
1199             }
1200              
1201             sub DESTROY {
1202 0     0     my ($self) = @_;
1203              
1204             # Try to disconnect cleanly, but socket might already be DESTROYed, so catch any errors
1205 0           eval {
1206 0           $self->disconnect();
1207             };
1208              
1209 0           return;
1210             }
1211              
1212             1;
1213             __END__