File Coverage

blib/lib/Cache/Memcached/Queue.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #! /usr/bin/perl
2              
3             package Cache::Memcached::Queue;
4 1     1   31119 use Moose;
  0            
  0            
5             use Carp qw/confess cluck/;
6             use feature qw/say switch/;
7             use Cache::Memcached::Fast;
8             use Data::Serializer;
9             use Proc::ProcessTable;
10             use Data::Dumper;
11              
12             BEGIN {
13             our $VERSION = '0.1.7';
14             }
15              
16             has config_file => ( is => 'rw' );
17              
18             has memcached => ( is => 'rw' );
19              
20             has 'last' => ( is => 'rw' );
21              
22             has first => ( is => 'rw' );
23              
24             has memcached_servers => (
25             is => 'rw',
26             isa => 'Cache::Memcached'
27             );
28              
29             has name => ( is => 'rw',
30             isa => 'Str',
31             default => 'CMQID' );
32              
33             has id => (
34             is => 'rw',
35             required => 'id'
36             );
37              
38              
39             has qid => (
40             is => 'rw',
41             isa => 'Str',
42             );
43              
44             has max_enq => (
45             is => 'rw',
46             default => 0,
47             );
48              
49             has servers => (
50             is => 'rw',
51             default => sub { return ['localhost:11211'] },
52             );
53              
54             has size => ( is => 'rw' );
55              
56             has serialize => (
57             is => 'rw',
58             isa => 'Int',
59             default => 0,
60             );
61              
62             has serializer => (
63             is => 'rw',
64             default => sub {
65             return Data::Serializer->new(
66             serializer => 'Storable',
67             compress => 1,
68             );
69             }
70             );
71              
72              
73              
74              
75              
76              
77              
78              
79              
80             sub BUILD {
81             my ( $self, ) = @_;
82             $self->memcached(
83             Cache::Memcached::Fast->new( { servers => $self->servers } ) )
84             or confess "Can't load from memcached!";
85             my $name = $self->name;
86             $name .= '_' if $name !~ /\_$/;
87             $self->qid($name . $self->id);
88             undef $name;
89             $self->load;
90             return $self;
91             }
92              
93              
94              
95              
96              
97              
98              
99              
100             sub load {
101             my ($self,$flag) = @_;
102             $flag = 0 if !defined($flag);
103             my ( $ok, $id ) = ( 0, $self->id );
104             if ( !defined($id) || !$id ) {
105             confess "You must define an id!";
106             }
107             else {
108             $id .= '_' if $id !~ /\_$/;
109             my $qid = $self->name . '_' . $self->id . '_';
110             $self->qid($qid);
111             my ( $first, $last, $size, $name ) =
112             ( $qid . 'first', $qid . 'last', $qid . 'size', $qid . 'name', );
113              
114             #This queue already exists?
115             my $real_first = $self->memcached->get($first);
116             confess "Fatal error! Can't load or create queue! Check memcached server!" if $flag and !defined($real_first);
117             if ( defined($real_first) ) {
118             $self->first( $self->memcached->get($first) );
119             $self->last( $self->memcached->get($last) );
120             $self->size( $self->memcached->get($size) );
121             $self->name( $self->memcached->get($name) ) if !defined $self->name;
122             $self->qid($qid);
123             $ok = 1;
124             }
125             else {
126             say q[Queue '] . $self->qid . q[' doesn't exists! Creating...];
127             $self->memcached->set($qid . 'LOCKED',$$,0);
128             $self->memcached->set($name,$self->name,0);
129             $self->memcached->set($first,$self->qid . '1',0,0);
130             $self->memcached->set($last,$self->qid . '1',0,0);
131             $self->memcached->set($size,0,0);
132             $self->memcached->set($qid . 'LOCKED',0,0);
133             say q[Queue '] . $self->qid . q[' was created!];
134             $self->load(1);
135             }
136             }
137             return $ok;
138             }
139              
140              
141              
142              
143              
144              
145              
146              
147             sub enq {
148             my ( $self, $parameters ) = @_;
149             my ( $ok, $expire, ) = ( 0, undef, undef );
150             if(!defined($parameters)){
151             say 'No value was defined to enqueue!';
152             }
153             else {
154             my $value = undef;
155             if(ref($parameters) eq ''){
156             $value = $parameters // '';
157             }
158             elsif(!defined($parameters->{value})){
159             $value = $parameters || '';
160             }
161             else {
162             $value = $parameters->{value} || '';
163             }
164              
165             #checar se é necessário a serialização
166             if(ref($value)){
167             #serializar
168             my $serialized = $self->serializer->serialize($value);
169             $value = $serialized;
170             undef $serialized;
171             }
172             $self->load;
173             if(!$self->_is_locked || $self->_unlock){
174             $self->_lock;
175             my $size = $self->size // 0;
176             #checando se a fila esta cheia
177             if($self->max_enq > 0 && $self->size >= $self->max_enq){
178             say "Queue is full!";
179             }
180             else {
181             my $last = $1 if $self->last =~ /_(\d+)$/ // 1;
182             #checando se last == first e se existe algum valor
183             my $first_value = $self->memcached->get($self->first);
184             say "FIRST: " . $self->first;
185             say "FIRST_VALUE: " . Dumper $first_value;
186             if( $first_value) {
187             $last++;
188             }
189             $size++;
190             my $new_last = $self->qid . $last;
191             say "LAST: $last";
192             say "NEWLAST: $new_last";
193             $self->last($new_last);
194            
195             $self->memcached->set($new_last,$value,0);
196             }
197             $self->size($size);
198             $self->_save(['last','size']);
199             $self->_unlock if($self->_is_locked);
200             }
201             }
202             return $ok;
203             }
204              
205              
206              
207              
208              
209              
210              
211             sub deq {
212             my ( $self, ) = @_;
213             my ( $last_item,$value ) = ( undef,undef );
214             $self->load;
215             if(!$self->_is_locked || $self->_unlock ){
216             $self->_lock;
217             my $size = $self->size;
218             if(!$size){
219             say 'Queue is empty!';
220             }
221             else {
222             my $first = $1 if $self->first =~ /_(\d+)$/ // 1;
223             $value = $self->memcached->get($self->first) // '';
224             if($value =~ /^\^.*?Storable/i){
225             my $unserialized = $self->serializer->deserialize($value);
226             $value = $unserialized;
227             undef $unserialized;
228             }
229             $self->memcached->delete($self->first);
230             if($self->last ne $self->first){
231             $first++;
232             $self->first($self->qid . $first);
233             $size-- if($size > 0);
234             }
235             else {
236             $size = 0;
237             $self->first($self->qid . '1',0);
238             $self->last($self->qid . '1',0);
239             $self->_save(['last']);
240             }
241             }
242             $self->size($size);
243             $self->_save(['first','size']);
244             $self->_unlock if($self->_is_locked);
245             }
246             return $value // '';
247             }
248              
249              
250              
251              
252              
253              
254             sub show {
255             my ( $self, ) = @_;
256             while(!$self->_lock){
257             $self->load;
258             sleep .3;
259             }
260             my $first = $1 if $self->first =~ /_(\d+)$/ // 1;
261             my $last = $1 if $self->last =~ /_(\d+)$/ // 1;
262             foreach my $i($first..$last){
263             my $value = $self->memcached->get($self->qid . $i);
264             say "$i - $value";
265             }
266             $self->_unlock;
267             }
268              
269              
270              
271              
272              
273             sub cleanup {
274             my ( $self, ) = @_;
275             }
276              
277              
278              
279              
280              
281             sub _save {
282             my ( $self, $parameters ) = @_;
283             my $last = $self->last;
284             my $ok = 0;
285              
286             if ( ref($parameters) !~ /ARRAY/ ) {
287             confess "The parameters to save data MUST BE AN ARRAYREF";
288             }
289             foreach my $k ( @{$parameters} ) {
290             if ( $k !~ /^name|first|last|size|max_enq|qid$/ ) {
291             confess "The parameter '$k' is invalid!";
292             }
293             else {
294             my $index = $self->qid . $k;
295             if ( !$self->memcached->set( $index, $self->{$k},0 ) ) {
296             confess "Memcached can't set a value!";
297             }
298             else {
299             $ok = 1;
300             }
301             }
302             }
303             return $ok;
304             }
305              
306              
307              
308              
309              
310             sub iterate {
311             my ( $self, $action, $action_params ) = @_;
312             $self->load;
313             if( (!defined($action) || !$action ) ||
314             (defined($action) && ref($action) !~ /CODE/)
315             ){
316             confess "'action' MUST be a CODE reference!";
317             }
318             elsif(defined($action_params) && ref($action_params) !~ /ARRAY/){
319             confess "'action_parameters' MUST be Array";
320             }
321             elsif($self->size == 0){
322             say STDERR "Queue '" . $self->qid . "' is empty!";
323             }
324             else {
325             my $first_index = $1 if $self->first =~ /(\d+)$/;
326             my $last_index = $1 if $self->last =~ /(\d+)$/;
327             say "The queue is " . $self->name;
328             foreach my $i($first_index .. $last_index){
329             #mounting index for memcached
330             my $mc_index = $self->qid;
331             $mc_index .= '_' if $mc_index !~ /_$/;
332             $mc_index .= $i;
333             my $value = $self->memcached->get($mc_index);
334             if(!defined($value)){
335             confess "An error occured trying make a 'get' operation. No value found for '$mc_index' index";
336             }
337             $action->($mc_index,$value,$action_params);
338             }
339             }
340             }
341              
342              
343              
344              
345              
346              
347              
348             sub _lock {
349             my ($self,$pid,$lock_pid) = (shift,$$,0);
350             $self->load;
351             my $qid = $self->qid;
352             confess "Panic! No 'qid'!" if (!defined($qid) || !$qid);
353             my $lock_idx = $qid . 'LOCKED';
354             $lock_pid = $self->_is_locked($lock_idx);
355             if(!$lock_pid){
356             my $rs = $self->memcached->set($lock_idx,$pid,0);
357             confess "Memcached server can't write!" if !defined($rs);
358             $lock_pid = $pid;
359             }
360             else {
361             say "is already locked!";
362             $lock_pid = 0;
363             }
364             $self->load;
365             return $lock_pid || 0;
366             }
367              
368              
369              
370              
371              
372              
373             sub _unlock {
374             my ($self,$pid,$ok) = (shift,$$,0);
375             $self->load;
376             my $qid = $self->qid;
377             confess "Panic! No 'qid'!" if (!defined($qid) || !$qid);
378             my $lock_idx = $qid . 'LOCKED';
379             my $lock_pid = $self->_is_locked($lock_idx);
380             if($lock_pid && $lock_pid == $pid){
381             say 'DEBUG 1';
382             my $rs = $self->memcached->set($lock_idx,0,0);
383             confess "Memcached can't write!" if !defined($rs);
384             $ok = 1;
385             }
386             elsif($lock_pid && $lock_pid != $pid){
387             say 'DEBUG 2';
388             say "Is locked by another process! $lock_pid";
389             }
390             $self->load;
391             return $ok;
392             }
393              
394              
395              
396              
397              
398              
399             sub _is_locked {
400             my ($self,$lock_idx) = @_;
401             $lock_idx = 0 if !defined $lock_idx;
402             # confess "Parameter 'lock_idx' is mandatory!" if (!defined($lock_idx) || !$lock_idx);
403             if(!defined($lock_idx) || !$lock_idx){
404             $lock_idx = $self->qid . 'LOCKED';
405             }
406             my $lock_pid = $self->memcached->get($lock_idx); #this pid locked the queue!
407             # $lock_pid = 0 if $$ == $lock_pid;
408             return $lock_pid ;
409             }
410              
411              
412              
413              
414              
415             __PACKAGE__->meta->make_immutable;
416              
417             =head1 NAME
418              
419             Cache::Memcached::Queue - Simple and elegant way to persist queues on Memcached
420              
421             =head1 VERSION
422              
423             Version 0.1.7
424              
425             unstable version
426              
427             =cut
428              
429             =head1 DESCRIPTION
430              
431             The idea is take advantage from Cache::Memcached::Fast module using it as a back-end for
432             queue structures without sockets, extra protocols or extra databases to maintain queues-metadata.
433             All stuff is stored on Memcached! Including metadata.
434              
435              
436             This can be done adding some metadata on Memcached hash structure that controls data on
437             a queue structure(strict FIFO). This metadata defines identification for queues and
438             controls first element, last element, size(number of elements) and lock information
439             following patterns in their names. For stabilish this patterns, it's necessary to define
440             some elements:
441              
442             =over
443              
444             =item * prefix - This is defined by the 'id_prefix' attribute. Is the real 'name' of queue.
445             Default value is 'CMQID_'.
446              
447             =item * name - This is a 'string' that defines a name for your queue;
448              
449             =item * id - It's a unique identifier for your queue and is defined on the 'id' attribute.
450             You can have queues with the same name since you have different ids;
451              
452             =item * index - Single sequencial number that identifies a position of some element on queue.
453             For example: 'CMQID_1_1' This is the first element from queue 'CMQID' with id 1.
454             OBS: 'CMQID_1_1' is the same element pointed by 'CMQID_1_first'.
455              
456              
457             =back
458              
459              
460              
461             =head1 SYNOPSIS
462              
463             use common::sense;
464             use Cache::Memcached::Queue;
465             my $q = Cache::Memcached::Queue->new(
466             name => 'foo',
467             id => 1,
468             servers => ['localhost:11211'], #This is default. RTFM ON Cache::Memcached::Fast for more options
469             serialize => 1, #if true, every value on enq will be serialized (Data::Serializer with Storable)
470             #but if complex data is passed(hashes, arrays, objects, etc), this data will be
471             #serialized even serialize attribute is false.
472             );
473              
474            
475             #loading queue
476             $q->load();#load data from Memcached
477              
478             #common operations...
479             $q->enq('fus'); #enqueue 'fus'.
480              
481             $q->enq('goh'); #enqueue 'goh' and this never expires on memcached
482              
483             $q->show; #show all items from queue. In this case: 'goh'. Remember... FIFO(First In First Out).
484              
485             $q->deq; #deqeue 'fus'.
486              
487             $q->show; #show all items from queue. In this case: 'nuke'(first and last element from queue).
488              
489             $q->enq({'fus'=>['goh','dah']}); #enqueue serialize and compact data.
490              
491             $q->cleanup; #cleans everything. From object and from Memcached.
492              
493            
494              
495             =head2 load()
496              
497             Try to load the queue metadata from Memcached. If works, will return true. Otherwise
498             will return false.
499              
500              
501             =head2 enq( HashRef $parameters or SCALAR $value )
502              
503             Try to make a 'enqueue' operation. You can enqueue scalar or complex data(hashes, arrays, objects etc).
504              
505             There is two ways to enqueue:
506              
507             =over
508              
509             =item * common way(RECOMMENDED):
510              
511             my $Bar = 'Bar';
512             my @Array = ('Foo','Bar');
513             my %Hash = ('Foo' => 'Bar');
514             $q->enq('Foo');
515             $q->enq($Bar);
516             $q->enq(\@MyArray);
517             $q->enq(\%MyHash); #since %MyHash doesn't have 'value' and/or 'serialize' as an hash key. This is not treated by module!
518             $q->enq({ some => [{complex => 'data'}],},
519             );
520              
521             Hashes and Arrays must be passed as a reference! ALWAYS!
522              
523             =item * alternative way(NOT RECOMMENDED):
524            
525             $q->enq({value => 'Foo'});
526             $q->enq({value => $Bar});
527             $q->enq({value => \@MyArray});
528             $q->enq({value => \%MyHash});
529             $q->enq({value => { some => [{complex => 'data'}],} );
530              
531             =back
532              
533             If you try to enqueue complex data, it will be serialized. Doesn't matter if serialize attribute or
534             parameter is set to false.
535              
536             If you want to use alternative way, you must know the following valid parameters:
537              
538             =over
539              
540             =item value - A value that presupposes that you want to save
541              
542             =item serialize - If you need the value to be serialized, you must set serialized to true(1).
543              
544             =back
545              
546              
547             Example2: $enq({value => $some_object_or_structure,
548             serialize => 1, });
549              
550              
551             If this work, the method will return true. Otherwise, will return false.
552              
553             You can change serialize parameters setting 'serializer' method too.
554              
555              
556              
557             =head2 deq()
558              
559             Try to make a 'dequeue' operation on Queue. That means the first value
560             of queue will be removed from queue, and the first index pointer from queue will
561             be moved to the next index. If works, returns the 'dequeued'
562             value, otherwise returns undef.
563              
564             There is no parameters
565              
566             Example:
567              
568             my $first_element_of_queue = $q->deq;
569              
570              
571              
572             =head2 show()
573              
574             Try to show the content of queue(the data). This is made finding the 'first'
575             and 'last' pointers, extracting the sequential index, and interate the queue
576             with this indexes, making a 'get' operation from Memcached. If the value
577             exists, it will be showed. If not, a exception will be thrown .
578              
579             There is no parameters
580              
581             Example:
582              
583             say $q->show;
584              
585              
586             =head2 cleanup()
587              
588             Dequeue everything! No parameters! Returns true, if it's all right! Otherwise, returns false/throws an exception
589              
590              
591              
592             =head2 save( ArrayRef $parameters )
593              
594             Internal method to save the queue metadata.
595              
596              
597             =head2 iterate(CODE $action, Array $action_params)
598              
599             That method is a 'handler'. You can treat all values in another subroutine/static method, passing
600             two parameters:
601              
602             =over
603              
604             =item * action: this parameter MUST be a CODE reference. Example:
605              
606             #EX1: $q->iterate(
607             sub {
608             my ($index,$value,$params) = @_;
609             #do something with this!!!
610             }
611              
612             #EX2: $q->iterate( \&somesubroutine,$myparams) ;
613             sub somesubroutine {
614             my ($index,$value,$params) = @_;
615             #do something cool!
616             }
617              
618             =item * action_params: This can be a custom parameters. All yours!
619              
620              
621              
622             =back
623              
624              
625             So, by default, every index and values that are in your queue are passed together with your customized parameters.
626              
627             If you pass everything right, your 'action' will be performed! Otherwise, an exception will be throwed.
628              
629             =cut
630              
631             =head1 AUTHOR
632              
633             Andre Garcia Carneiro, C<< <bang at cpan.org> >>
634              
635             =head1 BUGS
636              
637             The queue lost reference to last element when there is more than one process accessing queue. I'm working on it.
638              
639             Please report any bugs or feature requests to C<bug-cache-memcached-queue at rt.cpan.org>, or through
640             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Cache-memcached-Queue>. I will be notified, and then you'll
641             automatically be notified of progress on your bug as I make changes.
642              
643              
644             =head1 NOTES FOR THIS VERSION
645              
646             =over
647              
648             =item * 'beta' version was change to 'unstable', because multi-processing access is not working well yet.
649              
650             =item * The auto-installer was removed after CPAN request.
651            
652             =item * 'servers' attribute have ['localhost:11211'] as default value;
653              
654             =item * 'serialize' attribute is DEPRECATED. Doesn't work anymore;
655              
656             =item * The new method 'iterator' allows delegate to other subroutine/static method queue data;
657              
658             =item * 'lock' feature is a internal feature that allows have a same queue with multiple processes working on it. (EXPERIMENTAL)
659              
660             =item * 'init' method was removed!
661              
662             =back
663              
664              
665             =head1 TODO
666              
667             =over
668              
669             =item * performance optimization
670              
671             =item * 'priority' support, maybe
672              
673              
674             =back
675              
676             =head1 SUPPORT
677              
678             You can find documentation for this module with the perldoc command.
679              
680             perldoc Cache::Memcached::Queue
681              
682              
683             You can also look for information at:
684              
685             =over 4
686              
687             =item * RT: CPAN's request tracker
688              
689             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Cache-memcached-Queue>
690              
691             =item * AnnoCPAN: Annotated CPAN documentation
692              
693             L<http://annocpan.org/dist/Cache-memcached-Queue>
694              
695             =item * CPAN Ratings
696              
697             L<http://cpanratings.perl.org/d/Cache-memcached-Queue>
698              
699             =item * Search CPAN
700              
701             L<http://search.cpan.org/dist/Cache-memcached-Queue/>
702              
703             =back
704              
705              
706              
707             =head1 LICENSE AND COPYRIGHT
708              
709             Copyright 2013 2014 Andre Garcia Carneiro.
710              
711             This program is free software; you can redistribute it and/or modify it
712             under the terms of either: the GNU General Public License as published
713             by the Free Software Foundation; or the Artistic License.
714              
715             See http://dev.perl.org/licenses/ for more information.
716              
717              
718             =cut
719              
720             1; # End of Cache::Memcached::Queue
721              
722             ## Please see file perltidy.ERR