File Coverage

blib/lib/NetSDS/Queue.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             #===============================================================================
2             #
3             # FILE: Queue.pm
4             #
5             # DESCRIPTION: NetSDS Queue API
6             #
7             # AUTHOR: Michael Bochkaryov (Rattler), <misha@rattler.kiev.ua>
8             # COMPANY: Net.Style
9             # CREATED: 12.07.2009 11:41:24 UTC
10             #===============================================================================
11              
12             =head1 NAME
13              
14             NetSDS::Queue - simple API to MemcacheQ powered queue
15              
16             =head1 SYNOPSIS
17              
18             use NetSDS::Queue;
19             use Data::Dumper;
20              
21             my $q = NetSDS::Queue->new( server => '10.0.0.5:18181' );
22              
23             # Push messages to queue
24             $q->push('myq', { id => 1, text => 'one'});
25             $q->push('myq', { id => 2, text => 'two'});
26             $q->push('myq', { id => 3, text => 'three'});
27              
28             # Fetch messages from queue
29             while ( my $data = $q->pull('myq') ) {
30             print Dumper($data);
31             }
32              
33             =head1 DESCRIPTION
34              
35             C<NetSDS::Queue> module provides simple API to NetSDS queue.
36              
37             Low level messaging is based on fast and reliable MemcacheQ server.
38             It use BerkeleyDB for persistance and Memchache protocol over TCP
39             or Unix sockets.
40              
41             Every message is converted to JSON and then stored as Base64 string.
42              
43             =cut
44              
45             package NetSDS::Queue;
46              
47 2     2   179523 use 5.8.0;
  2         7  
  2         96  
48 2     2   13 use strict;
  2         4  
  2         200  
49 2     2   18 use warnings;
  2         8  
  2         64  
50              
51 2     2   2380 use Cache::Memcached::Fast;
  2         17970  
  2         75  
52 2     2   1079 use NetSDS::Util::Convert;
  0            
  0            
53             use JSON;
54              
55             use base qw(NetSDS::Class::Abstract);
56              
57             use version; our $VERSION = "0.032";
58              
59             #===============================================================================
60             #
61              
62             =head1 CLASS API
63              
64             =over
65              
66             =item B<new(%params)> - class constructor
67              
68             The following parameters accepted:
69              
70             * server - address to MemcacheQ queue server (host:port)
71              
72             * max_size - maximum size of message allowed (default is 4096 bytes)
73              
74             my $queue = NetSDS::Queue->new(server => '192.168.0.1:12345');
75              
76             Default server address is 127.0.0.1:22201
77              
78             =cut
79              
80             #-----------------------------------------------------------------------
81             sub new {
82              
83             my ( $class, %params ) = @_;
84              
85             my $this = $class->SUPER::new();
86              
87             # Set server (default is 127.0.0.1:22201)
88             my $server = '127.0.0.1:22201';
89             if ( $params{'server'} ) {
90             $server = $params{'server'};
91             }
92              
93             # Set message size limitation
94             $this->{max_size} = 4096;
95             if ( $params{'max_size'} ) {
96             $this->{max_size} = $params{'max_size'};
97             }
98             $this->mk_accessors('max_size');
99              
100             # Initialize memcacheq handler
101             $this->{handler} = Cache::Memcached::Fast->new(
102             {
103             servers => [$server],
104             connect_timeout => 5,
105             }
106             );
107              
108             # Create accessors
109             $this->mk_accessors('handler');
110              
111             if ( $this->handler ) {
112             return $this;
113             } else {
114             return undef;
115             }
116              
117             } ## end sub new
118              
119             #***********************************************************************
120              
121             =item B<push($queue, $data)> - push message to queue
122              
123             $queue->push('msgq', $my_data);
124              
125             =cut
126              
127             #-----------------------------------------------------------------------
128              
129             sub push {
130              
131             my ( $this, $queue, $data ) = @_;
132              
133             my $push_data = _encode($data);
134              
135             # Check if data for push is not more than max_size
136             if ( bytes::length($push_data) > $this->max_size() ) {
137             return $this->error( "Cant insert message bigger than max_size (" . $this->max_size . ")" );
138             }
139             return $this->handler->set( $queue, $push_data );
140              
141             }
142              
143             #***********************************************************************
144              
145             =item B<pull($queue)> - fetch message from queue
146              
147             my $data = $queue->pull('msgq');
148              
149             =cut
150              
151             #-----------------------------------------------------------------------
152              
153             sub pull {
154              
155             my ( $this, $queue ) = @_;
156              
157             return _decode( $this->handler->get($queue) );
158              
159             }
160              
161             sub _encode {
162              
163             my ($struct) = @_;
164             return conv_str_base64( encode_json($struct) );
165             }
166              
167             sub _decode {
168              
169             my ($string) = @_;
170              
171             if ($string) {
172             return decode_json( conv_base64_str($string) );
173             } else {
174             return undef;
175             }
176             }
177              
178             1;
179              
180             __END__
181              
182             =back
183              
184             =head1 EXAMPLES
185              
186             See files in C<samples> catalog.
187              
188             =head1 BUGS
189              
190             Unknown yet
191              
192             =head1 SEE ALSO
193              
194             http://memcachedb.org/memcacheq/ - MemcacheQ server
195              
196             http://openhack.ru/Cache-Memcached-Fast - Perl XS API to Memcached servers
197              
198             =head1 TODO
199              
200             None
201              
202             =head1 AUTHOR
203              
204             Michael Bochkaryov <misha@rattler.kiev.ua>
205              
206             =head1 LICENSE
207              
208             Copyright (C) 2008-2009 Michael Bochkaryov
209              
210             This program is free software; you can redistribute it and/or modify
211             it under the terms of the GNU General Public License as published by
212             the Free Software Foundation; either version 2 of the License, or
213             (at your option) any later version.
214              
215             This program is distributed in the hope that it will be useful,
216             but WITHOUT ANY WARRANTY; without even the implied warranty of
217             MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
218             GNU General Public License for more details.
219              
220             You should have received a copy of the GNU General Public License
221             along with this program; if not, write to the Free Software
222             Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
223              
224             =cut
225              
226