File Coverage

blib/lib/Cache/Memcached/Queue.pm
Criterion Covered Total %
statement 19 193 9.8
branch 0 90 0.0
condition 0 64 0.0
subroutine 7 19 36.8
pod 6 7 85.7
total 32 373 8.5


line stmt bran cond sub pod time code
1             #! /usr/bin/perl
2              
3             package Cache::Memcached::Queue;
4 1     1   14642 use Moose;
  1         308530  
  1         6  
5 1     1   4895 use Carp qw/confess cluck/;
  1         2  
  1         97  
6 1     1   7 use feature qw/say switch/;
  1         7  
  1         107  
7 1     1   793 use Cache::Memcached::Fast;
  1         4226  
  1         27  
8 1     1   459 use Data::Serializer;
  1         1980  
  1         24  
9 1     1   529 use Data::Dumper;
  1         4898  
  1         84  
10              
11             BEGIN {
12 1     1   1915 our $VERSION = '0.1.8';
13             }
14              
15             has config_file => ( is => 'rw' );
16              
17             has memcached => ( is => 'rw' );
18              
19             has 'last' => ( is => 'rw' );
20              
21             has first => ( is => 'rw' );
22              
23             has memcached_servers => (
24             is => 'rw',
25             isa => 'Cache::Memcached'
26             );
27              
28             has name => ( is => 'rw',
29             isa => 'Str',
30             default => 'CMQID' );
31              
32             has id => (
33             is => 'rw',
34             required => 'id'
35             );
36              
37              
38             has qid => (
39             is => 'rw',
40             isa => 'Str',
41             );
42              
43             has max_enq => (
44             is => 'rw',
45             default => 0,
46             );
47              
48             has servers => (
49             is => 'rw',
50             default => sub { return ['localhost:11211'] },
51             );
52              
53             has size => ( is => 'rw' );
54              
55             has serialize => (
56             is => 'rw',
57             isa => 'Int',
58             default => 0,
59             );
60              
61             has serializer => (
62             is => 'rw',
63             default => sub {
64             return Data::Serializer->new(
65             serializer => 'Storable',
66             compress => 1,
67             );
68             }
69             );
70              
71              
72              
73              
74              
75              
76              
77              
78              
79             sub BUILD {
80 0     0 0   my ( $self, ) = @_;
81 0 0         $self->memcached(
82             Cache::Memcached::Fast->new( { servers => $self->servers } ) )
83             or confess "Can't load from memcached!";
84 0           my $name = $self->name;
85 0 0         $name .= '_' if $name !~ /\_$/;
86 0           $self->qid($name . $self->id);
87 0           undef $name;
88 0           $self->load;
89 0           return $self;
90             }
91              
92              
93              
94              
95              
96              
97              
98              
99             sub load {
100 0     0 1   my ($self,$flag) = @_;
101 0 0         $flag = 0 if !defined($flag);
102 0           my ( $ok, $id ) = ( 0, $self->id );
103 0 0 0       if ( !defined($id) || !$id ) {
104 0           confess "You must define an id!";
105             }
106             else {
107 0 0         $id .= '_' if $id !~ /\_$/;
108 0           my $qid = $self->name . '_' . $self->id . '_';
109 0           $self->qid($qid);
110 0           my ( $first, $last, $size, $name ) =
111             ( $qid . 'first', $qid . 'last', $qid . 'size', $qid . 'name', );
112              
113             #This queue already exists?
114 0           my $real_first = $self->memcached->get($first);
115 0 0 0       confess "Fatal error! Can't load or create queue! Check memcached server!" if $flag and !defined($real_first);
116 0 0         if ( defined($real_first) ) {
117 0           $self->first( $self->memcached->get($first) );
118 0           $self->last( $self->memcached->get($last) );
119 0           $self->size( $self->memcached->get($size) );
120 0 0         $self->name( $self->memcached->get($name) ) if !defined $self->name;
121 0           $self->qid($qid);
122 0           $ok = 1;
123             }
124             else {
125 0           say q[Queue '] . $self->qid . q[' doesn't exists! Creating...];
126 0           $self->memcached->set($qid . 'LOCKED',$$,0);
127 0           $self->memcached->set($name,$self->name,0);
128 0           $self->memcached->set($first,$self->qid . '1',0,0);
129 0           $self->memcached->set($last,$self->qid . '1',0,0);
130 0           $self->memcached->set($size,0,0);
131 0           $self->memcached->set($qid . 'LOCKED',0,0);
132 0           say q[Queue '] . $self->qid . q[' was created!];
133 0           $self->load(1);
134             }
135             }
136 0           return $ok;
137             }
138              
139              
140              
141              
142              
143              
144              
145              
146             sub enq {
147 0     0 1   my ( $self, $parameters ) = @_;
148 0           my ( $ok, $expire, ) = ( 0, undef, undef );
149 0 0         if(!defined($parameters)){
150 0           say 'No value was defined to enqueue!';
151             }
152             else {
153 0           my $value = undef;
154 0 0         if(ref($parameters) eq ''){
    0          
155 0   0       $value = $parameters // '';
156             }
157             elsif(!defined($parameters->{value})){
158 0   0       $value = $parameters || '';
159             }
160             else {
161 0   0       $value = $parameters->{value} || '';
162             }
163              
164             #checar se é necessário a serialização
165 0 0         if(ref($value)){
166             #serializar
167 0           my $serialized = $self->serializer->serialize($value);
168 0           $value = $serialized;
169 0           undef $serialized;
170             }
171 0           $self->load;
172 0 0 0       if(!$self->_is_locked || $self->_unlock){
173 0           $self->_lock;
174 0   0       my $size = $self->size // 0;
175             #checando se a fila esta cheia
176 0 0 0       if($self->max_enq > 0 && $self->size >= $self->max_enq){
177 0           say "Queue is full!";
178             }
179             else {
180 0 0 0       my $last = $1 if $self->last =~ /_(\d+)$/ // 1;
181             #checando se last == first e se existe algum valor
182 0           my $first_value = $self->memcached->get($self->first);
183 0 0         if( $first_value) {
184 0           $last++;
185             }
186 0           $size++;
187 0           my $new_last = $self->qid . $last;
188 0           $self->last($new_last);
189            
190 0           $self->memcached->set($new_last,$value,0);
191             }
192 0           $self->size($size);
193 0           $self->_save(['last','size']);
194 0 0         $self->_unlock if($self->_is_locked);
195             }
196             }
197 0           return $ok;
198             }
199              
200              
201              
202              
203              
204              
205              
206             sub deq {
207 0     0 1   my ( $self, ) = @_;
208 0           my ( $last_item,$value ) = ( undef,undef );
209 0           $self->load;
210 0 0 0       if(!$self->_is_locked || $self->_unlock ){
211 0           $self->_lock;
212 0           my $size = $self->size;
213 0 0         if(!$size){
214 0           say 'Queue is empty!';
215             }
216             else {
217 0 0 0       my $first = $1 if $self->first =~ /_(\d+)$/ // 1;
218 0   0       $value = $self->memcached->get($self->first) // '';
219 0 0         if($value =~ /^\^.*?Storable/i){
220 0           my $unserialized = $self->serializer->deserialize($value);
221 0           $value = $unserialized;
222 0           undef $unserialized;
223             }
224 0           $self->memcached->delete($self->first);
225 0 0         if($self->last ne $self->first){
226 0           $first++;
227 0           $self->first($self->qid . $first);
228 0 0         $size-- if($size > 0);
229             }
230             else {
231 0           $size = 0;
232 0           $self->first($self->qid . '1',0);
233 0           $self->last($self->qid . '1',0);
234 0           $self->_save(['last']);
235             }
236             }
237 0           $self->size($size);
238 0           $self->_save(['first','size']);
239 0 0         $self->_unlock if($self->_is_locked);
240             }
241 0   0       return $value // '';
242             }
243              
244              
245              
246              
247              
248              
249             sub show {
250 0     0 1   my ( $self, ) = @_;
251 0           while(!$self->_lock){
252 0           $self->load;
253 0           sleep .3;
254             }
255 0 0 0       my $first = $1 if $self->first =~ /_(\d+)$/ // 1;
256 0 0 0       my $last = $1 if $self->last =~ /_(\d+)$/ // 1;
257 0           foreach my $i($first..$last){
258 0           my $value = $self->memcached->get($self->qid . $i);
259 0           say "$i - $value";
260             }
261 0           $self->_unlock;
262             }
263              
264              
265              
266              
267              
268             sub cleanup {
269 0     0 1   my ( $self, ) = @_;
270 0           $self->load;
271             $self->iterate(sub {
272 0     0     my $index = shift;
273 0           $self->memcached->delete($index);
274 0           });
275             }
276              
277              
278              
279              
280              
281             sub _save {
282 0     0     my ( $self, $parameters ) = @_;
283 0           my $last = $self->last;
284 0           my $ok = 0;
285              
286 0 0         if ( ref($parameters) !~ /ARRAY/ ) {
287 0           confess "The parameters to save data MUST BE AN ARRAYREF";
288             }
289 0           foreach my $k ( @{$parameters} ) {
  0            
290 0 0         if ( $k !~ /^name|first|last|size|max_enq|qid$/ ) {
291 0           confess "The parameter '$k' is invalid!";
292             }
293             else {
294 0           my $index = $self->qid . $k;
295 0 0         if ( !$self->memcached->set( $index, $self->{$k},0 ) ) {
296 0           confess "Memcached can't set a value!";
297             }
298             else {
299 0           $ok = 1;
300             }
301             }
302             }
303 0           return $ok;
304             }
305              
306              
307              
308              
309              
310             sub iterate {
311 0     0 1   my ( $self, $action, $action_params ) = @_;
312 0           $self->load;
313 0 0 0       if( (!defined($action) || !$action ) ||
    0 0        
    0 0        
      0        
314             (defined($action) && ref($action) !~ /CODE/)
315             ){
316 0           confess "'action' MUST be a CODE reference!";
317             }
318             elsif(defined($action_params) && ref($action_params) !~ /ARRAY/){
319 0           confess "'action_parameters' MUST be Array";
320             }
321             elsif($self->size == 0){
322 0           say STDERR "Queue '" . $self->qid . "' is empty!";
323             }
324             else {
325 0 0         my $first_index = $1 if $self->first =~ /(\d+)$/;
326 0 0         my $last_index = $1 if $self->last =~ /(\d+)$/;
327 0           say "The queue is " . $self->name;
328 0           foreach my $i($first_index .. $last_index){
329             #mounting index for memcached
330 0           my $mc_index = $self->qid;
331 0 0         $mc_index .= '_' if $mc_index !~ /_$/;
332 0           $mc_index .= $i;
333 0           my $value = $self->memcached->get($mc_index);
334 0 0         if(!defined($value)){
335 0           confess "An error occured trying make a 'get' operation. No value found for '$mc_index' index";
336             }
337 0           $action->($mc_index,$value,$action_params);
338             }
339             }
340             }
341              
342              
343              
344              
345              
346              
347              
348             sub _lock {
349 0     0     my ($self,$pid,$lock_pid) = (shift,$$,0);
350 0           $self->load;
351 0           my $qid = $self->qid;
352 0 0 0       confess "Panic! No 'qid'!" if (!defined($qid) || !$qid);
353 0           my $lock_idx = $qid . 'LOCKED';
354 0           $lock_pid = $self->_is_locked($lock_idx);
355 0 0         if(!$lock_pid){
356 0           my $rs = $self->memcached->set($lock_idx,$pid,0);
357 0 0         confess "Memcached server can't write!" if !defined($rs);
358 0           $lock_pid = $pid;
359             }
360             else {
361 0           say "is already locked!";
362 0           $lock_pid = 0;
363             }
364 0           $self->load;
365 0   0       return $lock_pid || 0;
366             }
367              
368              
369              
370              
371              
372              
373             sub _unlock {
374 0     0     my ($self,$pid,$ok) = (shift,$$,0);
375 0           $self->load;
376 0           my $qid = $self->qid;
377 0 0 0       confess "Panic! No 'qid'!" if (!defined($qid) || !$qid);
378 0           my $lock_idx = $qid . 'LOCKED';
379 0           my $lock_pid = $self->_is_locked($lock_idx);
380 0 0 0       if($lock_pid && $lock_pid == $pid){
    0 0        
381 0           my $rs = $self->memcached->set($lock_idx,0,0);
382 0 0         confess "Memcached can't write!" if !defined($rs);
383 0           $ok = 1;
384             }
385             elsif($lock_pid && $lock_pid != $pid){
386 0           say "Is locked by another process! $lock_pid";
387              
388             }
389 0           $self->load;
390 0           return $ok;
391             }
392              
393              
394              
395              
396              
397              
398             sub _is_locked {
399 0     0     my ($self,$lock_idx) = @_;
400 0 0         $lock_idx = 0 if !defined $lock_idx;
401 0           my $found = 0;
402              
403             # confess "Parameter 'lock_idx' is mandatory!" if (!defined($lock_idx) || !$lock_idx);
404 0 0 0       if(!defined($lock_idx) || !$lock_idx){
405 0           $lock_idx = $self->qid . 'LOCKED';
406             }
407 0           my $lock_pid = $self->memcached->get($lock_idx); #this pid locked the queue!
408             # $lock_pid = 0 if $$ == $lock_pid;
409             # foreach my $p(@{$t->table}){
410             # if($p->pid == $lock_pid){
411             # $found = $p->pid;
412             # last;
413             # }
414             # }
415             # $lock_pid = 0 if !$found;
416 0           return $lock_pid ;
417             }
418              
419              
420              
421              
422              
423             __PACKAGE__->meta->make_immutable;
424              
425             =head1 NAME
426              
427             Cache::Memcached::Queue - Simple and elegant way to persist queues on Memcached
428              
429             =head1 VERSION
430              
431             Version 0.1.8
432              
433             unstable version
434              
435             =cut
436              
437             =head1 DESCRIPTION
438              
439             The idea is take advantage from Cache::Memcached::Fast module using it as a back-end for
440             queue structures without sockets, extra protocols or extra databases to maintain queues-metadata.
441             All stuff is stored on Memcached! Including metadata.
442              
443              
444             This can be done adding some metadata on Memcached hash structure that controls data on
445             a queue structure(strict FIFO). This metadata defines identification for queues and
446             controls first element, last element, size(number of elements) and lock information
447             following patterns in their names. For stabilish this patterns, it's necessary to define
448             some elements:
449              
450             =over
451              
452             =item * prefix - WARNING! This attribute is deprecated!!! DON'T USE IT!
453              
454             =item * index - WARNING! This attribute is deprecated! DON'T USE IT!
455              
456             =item * name - This is a 'string' that defines a name for your queue;
457              
458             =item * id - It's a unique identifier for your queue and is defined on the 'id' attribute.
459             You can have queues with the same name since you have different ids;
460              
461              
462              
463             =back
464              
465              
466              
467             =head1 SYNOPSIS
468              
469             use common::sense;
470             use Cache::Memcached::Queue;
471             my $q = Cache::Memcached::Queue->new(
472             name => 'foo',
473             id => 1,
474             servers => ['localhost:11211'], #This is default. RTFM ON Cache::Memcached::Fast for more options
475             serialize => 1, #if true, every value on enq will be serialized (Data::Serializer with Storable)
476             #but if complex data is passed(hashes, arrays, objects, etc), this data will be
477             #serialized even serialize attribute is false.
478             );
479              
480            
481             #loading queue
482             $q->load();#load data from Memcached
483              
484             #common operations...
485             $q->enq('fus'); #enqueue 'fus'.
486              
487             $q->enq('goh'); #enqueue 'goh' and this never expires on memcached
488              
489             $q->show; #show all items from queue. In this case: 'goh'. Remember... FIFO(First In First Out).
490              
491             $q->deq; #deqeue 'fus'.
492              
493             $q->show; #show all items from queue. In this case: 'nuke'(first and last element from queue).
494              
495             $q->enq({'fus'=>['goh','dah']}); #enqueue serialize and compact data.
496              
497             $q->cleanup; #cleans everything. From object and from Memcached.
498              
499            
500              
501             =head2 load()
502              
503             Try to load the queue metadata from Memcached. If works, will return true. Otherwise
504             will return false.
505              
506              
507             =head2 enq( HashRef $parameters or SCALAR $value )
508              
509             Try to make a 'enqueue' operation. You can enqueue scalar or complex data(hashes, arrays, objects etc).
510              
511             There is two ways to enqueue:
512              
513             =over
514              
515             =item * common way(RECOMMENDED):
516              
517             my $Bar = 'Bar';
518             my @Array = ('Foo','Bar');
519             my %Hash = ('Foo' => 'Bar');
520             $q->enq('Foo');
521             $q->enq($Bar);
522             $q->enq(\@MyArray);
523             $q->enq(\%MyHash); #since %MyHash doesn't have 'value' and/or 'serialize' as an hash key. This is not treated by module!
524             $q->enq({ some => [{complex => 'data'}],},
525             );
526              
527             Hashes and Arrays must be passed as a reference! ALWAYS!
528              
529             =item * alternative way(NOT RECOMMENDED):
530            
531             $q->enq({value => 'Foo'});
532             $q->enq({value => $Bar});
533             $q->enq({value => \@MyArray});
534             $q->enq({value => \%MyHash});
535             $q->enq({value => { some => [{complex => 'data'}],} );
536              
537             =back
538              
539             If you try to enqueue complex data, it will be serialized. Doesn't matter if serialize attribute or
540             parameter is set to false.
541              
542             If you want to use alternative way, you must know the following valid parameters:
543              
544             =over
545              
546             =item value - A value that presupposes that you want to save
547              
548             =item serialize - If you need the value to be serialized, you must set serialized to true(1).
549              
550             =back
551              
552              
553             Example2: $enq({value => $some_object_or_structure,
554             serialize => 1, });
555              
556              
557             If this work, the method will return true. Otherwise, will return false.
558              
559             You can change serialize parameters setting 'serializer' method too.
560              
561              
562              
563             =head2 deq()
564              
565             Try to make a 'dequeue' operation on Queue. That means the first value
566             of queue will be removed from queue, and the first index pointer from queue will
567             be moved to the next index. If works, returns the 'dequeued'
568             value, otherwise returns undef.
569              
570             There is no parameters
571              
572             Example:
573              
574             my $first_element_of_queue = $q->deq;
575              
576              
577              
578             =head2 show()
579              
580             Try to show the content of queue(the data). This is made finding the 'first'
581             and 'last' pointers, extracting the sequential index, and interate the queue
582             with this indexes, making a 'get' operation from Memcached. If the value
583             exists, it will be showed. If not, a exception will be thrown .
584              
585             There is no parameters
586              
587             Example:
588              
589             say $q->show;
590              
591              
592             =head2 cleanup()
593              
594             Dequeue everything! No parameters! Returns true, if it's all right! Otherwise, returns false/throws an exception
595              
596              
597              
598             =head2 save( ArrayRef $parameters )
599              
600             Internal method to save the queue metadata.
601              
602              
603             =head2 iterate(CODE $action, Array $action_params)
604              
605             That method is a 'handler'. You can treat all values in another subroutine/static method, passing
606             two parameters:
607              
608             =over
609              
610             =item * action: this parameter MUST be a CODE reference. Example:
611              
612             #EX1: $q->iterate(
613             sub {
614             my ($index,$value,$params) = @_;
615             #do something with this!!!
616             }
617              
618             #EX2: $q->iterate( \&somesubroutine,$myparams) ;
619             sub somesubroutine {
620             my ($index,$value,$params) = @_;
621             #do something cool!
622             }
623              
624             =item * action_params: This can be a custom parameters. All yours!
625              
626              
627              
628             =back
629              
630              
631             So, by default, every index and values that are in your queue are passed together with your customized parameters.
632              
633             If you pass everything right, your 'action' will be performed! Otherwise, an exception will be throwed.
634              
635             =cut
636              
637             =head1 AUTHOR
638              
639             Andre Garcia Carneiro, C<< <bang at cpan.org> >>
640              
641             =head1 BUGS
642              
643             The queue lost reference to last element when there is more than one process accessing queue. I'm working on it.
644              
645             Please report any bugs or feature requests to C<bug-cache-memcached-queue at rt.cpan.org>, or through
646             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Cache-memcached-Queue>. I will be notified, and then you'll
647             automatically be notified of progress on your bug as I make changes.
648              
649              
650             =head1 NOTES FOR THIS VERSION
651              
652             =over
653              
654             =item * 'beta' version was change to 'unstable', because multi-processing access is not working well yet.
655              
656             =item * The auto-installer was removed after CPAN request.
657            
658             =item * 'servers' attribute have ['localhost:11211'] as default value;
659              
660             =item * 'serialize' attribute is DEPRECATED. Doesn't work anymore;
661              
662             =item * The new method 'iterator' allows delegate to other subroutine/static method queue data;
663              
664             =item * 'lock' feature is a internal feature that allows have a same queue with multiple processes working on it. (EXPERIMENTAL)
665              
666             =item * 'init' method was removed!
667              
668             =back
669              
670              
671             =head1 TODO
672              
673             =over
674              
675             =item * performance optimization
676              
677             =item * 'priority' support, maybe
678              
679              
680             =back
681              
682             =head1 SUPPORT
683              
684             You can find documentation for this module with the perldoc command.
685              
686             perldoc Cache::Memcached::Queue
687              
688              
689             You can also look for information at:
690              
691             =over 4
692              
693             =item * RT: CPAN's request tracker
694              
695             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Cache-memcached-Queue>
696              
697             =item * AnnoCPAN: Annotated CPAN documentation
698              
699             L<http://annocpan.org/dist/Cache-memcached-Queue>
700              
701             =item * CPAN Ratings
702              
703             L<http://cpanratings.perl.org/d/Cache-memcached-Queue>
704              
705             =item * Search CPAN
706              
707             L<http://search.cpan.org/dist/Cache-memcached-Queue/>
708              
709             =back
710              
711              
712              
713             =head1 LICENSE AND COPYRIGHT
714              
715             Copyright 2013 2014 Andre Garcia Carneiro.
716              
717             This program is free software; you can redistribute it and/or modify it
718             under the terms of either: the GNU General Public License as published
719             by the Free Software Foundation; or the Artistic License.
720              
721             See http://dev.perl.org/licenses/ for more information.
722              
723              
724             =cut
725              
726             1; # End of Cache::Memcached::Queue
727              
728             ## Please see file perltidy.ERR