File Coverage

blib/lib/Net/MemcacheQ.pm
Criterion Covered Total %
statement 18 85 21.1
branch 0 40 0.0
condition 0 3 0.0
subroutine 6 16 37.5
pod 5 5 100.0
total 29 149 19.4


line stmt bran cond sub pod time code
1             #########
2             # Author: rmp
3             # Last Modified: $Date$
4             # Id: $Id$
5             # Source: $Source$
6             # $HeadURL$
7             #
8             package Net::MemcacheQ;
9 2     2   39246 use strict;
  2         3  
  2         55  
10 2     2   10 use warnings;
  2         3  
  2         49  
11 2     2   3603 use IO::Socket::INET;
  2         46488  
  2         16  
12 2     2   2884 use Readonly;
  2         11797  
  2         104  
13 2     2   13 use Carp;
  2         5  
  2         105  
14 2     2   865 use English qw(-no_match_vars);
  2         2349  
  2         15  
15              
16             Readonly::Scalar our $DEFAULT_HOST => '127.0.0.1';
17             Readonly::Scalar our $DEFAULT_PORT => 22_201;
18              
19             our $DEBUG = 0;
20             our $DEBUG_INFO = 1;
21             our $VERSION = '1.04';
22              
23             sub new {
24 0     0 1   my ($class, $ref) = @_;
25              
26 0 0         if(!$ref) {
27 0           $ref = {};
28             }
29              
30 0           bless $ref, $class;
31 0           return $ref;
32             }
33              
34             sub _host {
35 0     0     my ($self) = @_;
36 0 0         if($self->{host}) {
37 0           return $self->{host};
38             }
39 0           return $DEFAULT_HOST;
40             }
41              
42             sub _port {
43 0     0     my ($self) = @_;
44 0 0         if($self->{port}) {
45 0           return $self->{port};
46             }
47 0           return $DEFAULT_PORT;
48             }
49              
50             sub _sock {
51 0     0     my ($self) = @_;
52              
53 0 0         if($self->{_sock}) {
54 0           return $self->{_sock};
55             }
56              
57 0 0         $self->{_sock} = IO::Socket::INET->new(
58             PeerAddr => $self->_host,
59             PeerPort => $self->_port,
60             Proto => 'tcp',
61             ) or croak $EVAL_ERROR;
62 0           return $self->{_sock};
63             }
64              
65             sub _request {
66 0     0     my ($self, $txt) = @_;
67              
68 0           my $sock = $self->_sock;
69 0 0         ($DEBUG == $DEBUG_INFO) and carp q[Socket connected];
70              
71 0 0         print {$sock} $txt or croak $EVAL_ERROR;
  0            
72 0 0         ($DEBUG == $DEBUG_INFO) and carp qq[Sent '$txt'];
73              
74 0           my $response = q[];
75              
76 0 0         ($DEBUG == $DEBUG_INFO) and carp q[Going to read response];
77 0           while(my $buf = <$sock>) {
78 0 0         ($DEBUG == $DEBUG_INFO) and carp qq[Read '$buf'];
79 0           $buf =~ s/[\r\n]+$//smx;
80 0 0         ($DEBUG == $DEBUG_INFO) and carp qq[Processed '$buf'];
81              
82 0 0         if($buf =~ /^STAT/smx) {
    0          
83             #########
84             # retain the rest of the line
85             #
86 0           $buf =~ s/^.*?\s//smx;
87 0 0         if(!ref $response) {
88 0           $response = [];
89             }
90 0           push @{$response}, $buf;
  0            
91              
92             } elsif($buf =~ /^VALUE/smx) {
93             #########
94             # retain the expected number of bytes from the next line onwwards
95             #
96 0           my ($size) = $buf =~ /(\d+)$/smx;
97 0           my $tmp = q[];
98              
99 0           while(my $buf2 = <$sock>) {
100 0 0         ($DEBUG == $DEBUG_INFO) and carp qq[Read '$buf2'];
101 0 0         if($buf2 =~ /^END/smx) {
102 0           last;
103             }
104              
105 0           $tmp .= $buf2;
106             }
107 0           $response = substr $tmp, 0, $size;
108 0           $buf = 'END';
109             }
110              
111 0 0 0       if($buf eq 'END' ||
112             $buf eq 'STORED') {
113 0           last;
114             }
115             }
116              
117 0 0         ($DEBUG == $DEBUG_INFO) and carp q[Finished request];
118              
119 0           return $response;
120             }
121              
122             sub queues {
123 0     0 1   my ($self) = @_;
124 0           my $response = $self->_request("stats queue\r\n");
125 0 0         if(!$response) {
126 0           $response = [];
127             }
128 0           return $response;
129             }
130              
131             sub delete_queue {
132 0     0 1   my ($self, $queuename) = @_;
133 0           my $response = $self->_request("delete $queuename\r\n");
134 0           return $response;
135             }
136              
137             sub push { ## no critic (Homonym)
138 0     0 1   my ($self, $queuename, $message) = @_;
139 0           my $len = length $message;
140 0           return $self->_request("set $queuename 0 0 $len\r\n$message\r\n");
141             }
142              
143             sub shift { ## no critic (Homonym)
144 0     0 1   my ($self, $queuename) = @_;
145 0           return $self->_request("get $queuename\r\n");
146             }
147              
148             sub DESTROY {
149 0     0     my ($self) = @_;
150 0 0         if($self->{_sock}) {
151 0           $self->{_sock}->close;
152 0           delete $self->{_sock};
153             }
154 0           return 1;
155             }
156              
157             1;
158             __END__
159              
160             =head1 NAME
161              
162             Net::MemcacheQ
163              
164             =head1 VERSION
165              
166             $LastChangedRevision$
167              
168             =head1 SYNOPSIS
169              
170             my $oNMQ = Net::MemcacheQ->new({
171             host => '192.168.0.1',
172             port => 22202,
173             });
174              
175             $oNMQ->push('myqueue', '{"some data":"abcdefg"}');
176              
177             my $message = $oNMQ->shift('myqueue');
178              
179             =head1 DESCRIPTION
180              
181             MemcacheQ implements a BerkeleyDB-backed FIFO message queue service
182             serviced using the Memcache protocol. Net::MemcacheQ provides a simple
183             interface against a single memcacheq instance.
184              
185             For more information about MemcacheQ, please see:
186             http://memcachedb.org/memcacheq/
187              
188             =head1 SUBROUTINES/METHODS
189              
190             =head2 new - constructor
191              
192             my $oNMQ = Net::MemcacheQ->new({...});
193              
194             Optional arguments:
195             host => 'localhost' # memcacheq server hostname
196             port => 22201 # memcacheq server port
197              
198             =head2 queues - arrayref of queue names
199              
200             my $arQueueNames = $oNMQ->queues();
201              
202             =head2 delete_queue - delete a queue, messages and all
203              
204             $oNMQ->delete_queue($sQueueName);
205              
206             =head2 push - push a message onto a given queue
207              
208             $oNMQ->push($sQueueName, $sQueueMessage);
209              
210             =head2 shift - pull a message from a given queue
211              
212             my $sMessage = $oNMQ->shift($sQueueName);
213              
214             =head2 DESTROY - disconnect socket on destruction
215              
216             =head1 DIAGNOSTICS
217              
218             =head1 CONFIGURATION AND ENVIRONMENT
219              
220             Debugging messages are available by setting:
221              
222             $Net::MemcacheQ::DEBUG = $Net::MemcacheQ::DEBUG_INFO;
223              
224             =head1 DEPENDENCIES
225              
226             =over
227              
228             =item strict
229              
230             =item warnings
231              
232             =item IO::Socket::INET
233              
234             =item Readonly
235              
236             =item Carp
237              
238             =item English -no_match_vars
239              
240             =back
241              
242             =head1 INCOMPATIBILITIES
243              
244             =head1 BUGS AND LIMITATIONS
245              
246             See those of memcacheq, in particular about message size.
247              
248             =head1 AUTHOR
249              
250             $Author: Roger Pettett$
251              
252             =head1 LICENSE AND COPYRIGHT
253              
254             This program is free software: you can redistribute it and/or modify
255             it under the terms of the GNU General Public License as published by
256             the Free Software Foundation, either version 3 of the License, or
257             (at your option) any later version.
258              
259             This program is distributed in the hope that it will be useful,
260             but WITHOUT ANY WARRANTY; without even the implied warranty of
261             MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
262             GNU General Public License for more details.
263              
264             You should have received a copy of the GNU General Public License
265             along with this program. If not, see <http://www.gnu.org/licenses/>.
266              
267             =cut