File Coverage

lib/Tie/Array/QueueExpire.pm
Criterion Covered Total %
statement 28 30 93.3
branch n/a
condition n/a
subroutine 10 10 100.0
pod n/a
total 38 40 95.0


line stmt bran cond sub pod time code
1             package Tie::Array::QueueExpire;
2             ###########################################################
3             # Tie::Array::QueueExpire package
4             # Gnu GPL2 license
5             #
6             #
7             # Fabrice Dulaunoy
8             ###########################################################
9             # ChangeLog:
10             #
11             ###########################################################
12              
13             =head1 NAME
14              
15             Tie::Array::QueueExpire - Introduction
16              
17              
18             Tie an ARRAY over a SQLite DB with expiration of elements
19             Revision: 1.03
20              
21             =head1 SYNOPSIS
22              
23             use Tie::Array::QueueExpire;
24             use Data::Dumper;
25             my $t = tie( my @myarray, "Tie::Array::QueueExpire", '/tmp/db_test.bdb' );
26             push @myarray, int rand 1000;
27            
28             # normal ARRAY function
29             my $data = shift @myarray;
30             my $data = pop @myarray;
31             print "this elem exists\n" if (exists( $myarray[6]));
32             print "size = ". scalar( @myarray )."\n";
33            
34             # using the PUSH with an extra parameter to put the new element in futur
35             # also return the key of the inserted value
36             for ( 1 .. 10 )
37             {
38             say "t=".time.' '. int (($t->PUSH( $_ . ' ' . time, 10 ))/1000);
39             sleep 1;
40             }
41             sleep 10;
42             # Get the expired elements ( 7 seconds before now )
43             my $ex = $t->EXPIRE( 7 );
44            
45             # Get the expired elements
46             my @EXP = @{$t->EXPIRE($exp)};
47             # Get and delete the expired elements ( 20 seconds before now )
48             $ex = $t->EXPIRE(20,1);
49             my @EXP = @{$t->EXPIRE($exp,1)};
50            
51             # fetch element
52             # in scalar context return the value
53             # in array context return in first element the key and in second, the value
54             my $a =$t->FETCH(6);
55             my @b = $t->FETCH(6);
56             # the normal array fetch is always in scalar mode
57             my @c=$myarray[6];
58             say Dumper( $a );
59             say Dumper( \@b );
60             say Dumper( \@c );
61             # a convenient way to get all the elements from the array directly by the object
62             my @all = $t->SLICE();
63            
64             =head1 DESCRIPTION
65              
66             Tie an ARRAY over a TokyCabinet Btree DB and allow to get or deleted expired data;
67            
68             This module require Time::HiRes, TokyoCabinet (database and perl module.)
69             The insertion is ms unique ( 0.001 seconds )
70            
71             The normal ARRAY function present are
72            
73             push PUSH ( the object call allow to PUSH data with a specific expiration offset )
74             pop POP ( the object call return when called in ARRAY context an array with [ key, value ] )
75             shift SHIFT ( the object call return when called in ARRAY context an array with [ key, value ] )
76             exists EXISTS
77             scalar FETCHSIZE
78             clear
79             unshift ( but put data 1 micro-second before the first entry)
80             DESTROY
81              
82             The following function is not completely iplemented.
83            
84             splice SPLICE (no replacement allowed and the object call return when called in ARRAY context an array with [ key, value ] )
85              
86            
87             The following function are not implemented.
88            
89             extend
90             store
91             STORESIZE
92              
93             The following function are specific of this module.
94            
95             EXPIRE
96             PUSH
97             FETCH
98             SLICE
99             SPLICE
100             CLEAR
101            
102             =cut
103              
104 1     1   15150 use 5.008008;
  1         4  
  1         44  
105 1     1   7 use strict;
  1         2  
  1         39  
106 1     1   5 use warnings;
  1         7  
  1         55  
107 1     1   927 use Tie::Array;
  1         1265  
  1         29  
108 1     1   5893 use Time::HiRes qw{ gettimeofday };
  1         3750  
  1         7  
109             require Exporter;
110              
111 1     1   271 use Carp;
  1         3  
  1         97  
112 1     1   12234 use DBI;
  1         22724  
  1         104  
113 1     1   1625 use DBD::SQLite;
  1         12552  
  1         48  
114              
115 1     1   11 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK);
  1         2  
  1         183  
116              
117             $VERSION = '1.04';
118             # use Data::Dumper;
119              
120             our @ISA = qw( Exporter Tie::StdArray );
121              
122             =head1 Basic ARRAY functions
123            
124             I< >
125            
126             =head2 tie
127            
128            
129             Tie an array over a TokyoCabinet DB
130             my $t = tie( my @myarray, "Tie::Array::QueueExpire", '/tmp/db_test.bdb' );
131             The fist parameter if the TokyoCabinet file used (or created)
132             Four optional parameter are allowed
133             In place two, a flag to serialize the data in the DB
134             In place three, an octal MASK allow to set the permission of the DB created
135             The default permission is 0600 (-rw-------)
136             In place four a parameter to delete the DB file, if present, at start
137             The default value is 0 (don't delete the file)
138            
139             =cut
140              
141             sub TIEARRAY
142             {
143             my $class = $_[0];
144             my %data;
145             $data{ _file } = $_[1];
146             $data{ _serialize } = $_[2] || 0;
147             $data{ _mode } = $_[3] || 0600;
148             $data{ _delete_on_start } = $_[4] || 0;
149              
150             my $serialiser;
151             if ( $data{ _serialize } )
152             {
153 1     1   2439 use Data::Serializer;
  0            
  0            
154             $serialiser = Data::Serializer->new( compress => 0 );
155             $data{ _serialize } = $serialiser;
156             }
157             my $dbfile = $data{ _file };
158             unlink $dbfile if ( -f $dbfile && $data{ _delete_on_start } );
159             my $dbh = DBI->connect( "dbi:SQLite:dbname=$dbfile", "", "" );
160             my $sql_list_table = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;";
161             my $ary_ref = $dbh->selectall_hashref( $sql_list_table, 'name' );
162             unless ( exists( $ary_ref->{ 'queue' } ) )
163             {
164             my $sql = "CREATE TABLE queue( key text , val text );";
165             my $res = $dbh->do( $sql );
166             }
167             my $mode = $data{ _mode };
168             chmod $mode, $data{ _file };
169             $data{ _bdb } = $dbh;
170             bless \%data, $class;
171             return \%data;
172             }
173              
174             =head2 FETCH
175            
176             Retrieve a specific key from the array
177             my $data = $myarray[6];
178             or
179             my $data = $t->FETCH(6);
180             or
181             my @data = $t->FETCH(6);
182             where
183             $data[0] = insertion key
184             and
185             $data[1] = value
186            
187             =cut
188              
189             sub FETCH
190             {
191             my $self = shift;
192             my $key = shift || 0;
193             if ( $key < 0 )
194             {
195             $key += $self->FETCHSIZE();
196             }
197             $key++;
198             my $dbh = $self->{ _bdb };
199             my $sql_view = "CREATE TEMP VIEW head_fetch AS SELECT * FROM queue ORDER BY key LIMIT $key;";
200             my $res = $dbh->do( $sql_view );
201             my $count = $dbh->selectall_arrayref( "SELECT COUNT(*) FROM head_fetch;" );
202             if ( ( $count->[0][0] ) < $key )
203             {
204             $res = $dbh->do( "DROP VIEW head_fetch " );
205             if ( wantarray )
206             {
207             return undef, undef;
208             }
209             else
210             {
211             return undef;
212             }
213             }
214             my $sql = "SELECT * FROM head_fetch ORDER BY key DESC LIMIT 1;";
215             my $row = $dbh->selectall_arrayref( $sql );
216             my $ticks = $row->[0][0];
217             my $val = $row->[0][1];
218             $val = $self->__deserialize__( $val ) if ( $self->{ _serialize } );
219             $res = $dbh->do( "DROP VIEW head_fetch " );
220             if ( wantarray )
221             {
222             return $ticks, $val;
223             }
224             else
225             {
226             return $val;
227             }
228             }
229              
230             =head2 FETCHSIZE
231            
232             Get the size of the array
233             my $data = scalar(@myarray);
234            
235             =cut
236              
237             sub FETCHSIZE
238             {
239             my $self = shift;
240             my $dbh = $self->{ _bdb };
241             my $row = $dbh->selectrow_hashref( "select count(*) from queue;" );
242             return $row->{ 'count(*)' };
243             }
244              
245             =head2 PUSH
246            
247             Add an element at the end of the array
248             push @myarray , 45646;
249             or
250             $t->PUSH( 'some text' );
251             it is also possible to add an elemnt with a offset expiration
252             $t->PUSH( 'some text in futur' , 10 );
253             add element in the array to be expired in 10 seconds
254             if the offset is negative, add the expiration in past
255            
256             =cut
257              
258             sub PUSH
259             {
260             my $self = shift;
261             my $value = shift;
262             my $time = shift || 0;
263             my $dbh = $self->{ _bdb };
264             $value = $self->__serialize__( $value ) if ( $self->{ _serialize } );
265             my ( $sec, $usec ) = gettimeofday;
266             $sec += $time if ( $time != 0 );
267             my $key = sprintf( "%010d%06d", $sec, $usec );
268             my $sql = "INSERT INTO queue ( key , val ) VALUES ( '$key','$value' );";
269             my $res = $dbh->do( $sql );
270             return $key;
271             }
272              
273             =head2 EXISTS
274            
275             Test if en element in the array exist
276             print "element exists\n" if (exits $myarray[5]);
277             return the insertion key
278            
279             =cut
280              
281             sub EXISTS
282             {
283             my $self = shift;
284             my $key = shift;
285             my $dbh = $self->{ _bdb };
286             return ( $self->FETCH( $key ) );
287             }
288              
289             =head2 POP
290            
291             Extract the latest element from the array (the youngest)
292             my $data = pop @myarray;
293             or
294             my $data = $t->POP();
295             or
296             my @data = $t->POP();
297             where
298             $data[0] = insertion key
299             and
300             $data[1] = value
301             =cut
302              
303             sub POP
304             {
305             my $self = shift;
306             my $dbh = $self->{ _bdb };
307             my $sql = "SELECT * FROM queue ORDER BY key DESC LIMIT 1;";
308             my $row = $dbh->selectall_arrayref( $sql );
309             my $ticks = $row->[0][0];
310             my $val = $row->[0][1];
311             $val = $self->__deserialize__( $val ) if ( $self->{ _serialize } );
312             my $sql_del = "DELETE FROM queue WHERE key = $ticks;";
313             my $res = $dbh->do( $sql_del );
314             if ( wantarray )
315             {
316             return $ticks, $val;
317             }
318             else
319             {
320             return $val;
321             }
322             }
323              
324             =head2 SHIFT
325            
326             Extract the first element from the array (the oldest)
327             my $data = shift @myarray;
328             or
329             my $data = $t->SHIFT();
330             or
331             my @data = $t->SHIFT();
332             where
333             $data[0] = insertion key
334             and
335             $data[1] = value
336             =cut
337              
338             sub SHIFT
339             {
340             my $self = shift;
341             my $dbh = $self->{ _bdb };
342             my $sql = "SELECT * FROM queue ORDER BY key LIMIT 1;";
343             my $row = $dbh->selectall_arrayref( $sql );
344             my $ticks = $row->[0][0];
345             my $val = $row->[0][1];
346             $val = $self->__deserialize__( $val ) if ( $self->{ _serialize } );
347             my $sql_del = "DELETE FROM queue WHERE key = $ticks;";
348             my $res = $dbh->do( $sql_del );
349             if ( wantarray )
350             {
351             return $ticks, $val;
352             }
353             else
354             {
355             return $val;
356             }
357             }
358              
359             =head2 UNSHIFT
360            
361             Add an element in the front of the array
362             unshift @myarray , 45646;
363             UNSHIFT data 1 mili-second before the first item
364            
365             =cut
366              
367             sub UNSHIFT
368             {
369             my $self = shift;
370             my $value = shift;
371             my $dbh = $self->{ _bdb };
372             my ( $k, $val ) = $self->FETCH( 0 );
373             my $sec = substr $k, 0, 10;
374             my $usec = substr $k, 10;
375             $usec--;
376             my $key = sprintf( "%010d%06d", $sec, $usec );
377             $value = $self->__serialize__( $value ) if ( $self->{ _serialize } );
378             my $sql = "INSERT INTO queue ( key , val ) VALUES ( '$key','$value' );";
379             my $res = $dbh->do( $sql );
380             return $key;
381             }
382              
383             =head2 CLEAR
384            
385             Delete all element in the array
386             $t->CLEAR;
387            
388             =cut
389              
390             sub CLEAR
391             {
392             my $self = shift;
393             my $dbh = $self->{ _bdb };
394             my $sql_del = "DELETE FROM queue;";
395             my $res = $dbh->do( $sql_del );
396             return $res;
397             }
398              
399             =head2 DESTROY
400            
401             Normal destructor call when untied the array
402             Normaly never called by user
403            
404             =cut
405              
406             sub DESTROY
407             {
408             my $self = shift;
409             my $dbh = $self->{ _bdb };
410             $dbh->disconnect;
411             }
412              
413             =head1 Specific functions from this module
414              
415             I< >
416              
417             =head2 SPLICE
418            
419             SPLICE don't allow a list replacement
420             because the insert order is made by time.
421             in scalar context return the latest element
422             in array context return all the elements selected
423             my @tmp = splice @myarray, 5 ,3;
424             or
425             my @res = $t->SPLICE( 1 , 7 );
426             =cut
427              
428             sub SPLICE
429             {
430             my $self = shift;
431             my $offset = shift || 0;
432             my $length = shift || 0;
433             my $dbh = $self->{ _bdb };
434             if ( $length == 0 )
435             {
436             $length = $self->FETCHSIZE();
437             }
438             if ( $offset < 0 )
439             {
440             $offset += $self->FETCHSIZE();
441             }
442             my $key = $offset + $length;
443             my $sql_view = "CREATE TEMP VIEW head_splice AS SELECT * FROM queue ORDER BY key LIMIT $key;";
444             my $res = $dbh->do( $sql_view );
445             # my $count = $dbh->selectall_arrayref( "SELECT COUNT(*) FROM head_splice;" );
446              
447             my $sql = "SELECT * FROM head_splice ORDER BY key DESC LIMIT $length;";
448             my ( $start, undef ) = $self->FETCH( $offset );
449             my ( $end, undef ) = $self->FETCH( $key );
450             if ( wantarray )
451             {
452             my $row = $dbh->selectall_arrayref( $sql );
453             $res = $dbh->do( "DROP VIEW head_splice " );
454             my $sql_del = "DELETE FROM queue WHERE key >= $start AND key < $end;";
455             my $res = $dbh->do( $sql_del );
456             return sort { $a->[0] <=> $b->[0] } @{ $row };
457             }
458             my $row = $dbh->selectcol_arrayref( $sql, { Columns => [2] } );
459             $res = $dbh->do( "DROP VIEW head_splice " );
460             my $sql_del = "DELETE FROM queue WHERE key >= $start AND key < $end;";
461             $res = $dbh->do( $sql_del );
462             my @REVERSED = reverse @$row;
463             return \@REVERSED;
464             }
465              
466             =head2 SLICE
467            
468             SLICE like SPLICE but don't delete elements
469             in scalar context return the latest element
470             in array context return all the elements selected
471            
472             my @res = $t->SPLICE( 1 , 7 );
473            
474             =cut
475              
476             sub SLICE
477             {
478             my $self = shift;
479             my $offset = shift || 0;
480             my $length = shift || 0;
481             my $dbh = $self->{ _bdb };
482             if ( $length == 0 )
483             {
484             $length = $self->FETCHSIZE();
485             }
486             if ( $offset < 0 )
487             {
488             $offset += $self->FETCHSIZE();
489             }
490             my $key = $offset + $length;
491             my $sql_view = "CREATE TEMP VIEW head_slice AS SELECT * FROM queue ORDER BY key LIMIT $key;";
492             my $res = $dbh->do( $sql_view );
493             # my $count = $dbh->selectall_arrayref( "SELECT COUNT(*) FROM head_slice;" );
494             my $sql = "SELECT * FROM head_slice ORDER BY key DESC LIMIT $length;";
495             if ( wantarray )
496             {
497             my $row = $dbh->selectall_arrayref( $sql );
498             $res = $dbh->do( "DROP VIEW head_slice " );
499             return sort { $a->[0] <=> $b->[0] } @{ $row };
500             }
501             my $row = $dbh->selectcol_arrayref( $sql, { Columns => [2] } );
502             $res = $dbh->do( "DROP VIEW head_slice " );
503             my @REVERSED = reverse @$row;
504             return \@REVERSED;
505             }
506              
507             =head2 EXPIRE
508            
509             Get the elements expired in the array.
510             my @ALL = $t->EXPIRE( 1207840028) ;
511             return a refernce to an array with all the expired value.
512            
513             If a second parameter is provided and not null, the data are also deleted from the array.
514             my @ALL = $t->EXPIRE( 1207840028 , 1 ) ;
515             return a refernce to an array with all the expired value.
516            
517             =cut
518              
519             sub EXPIRE
520             {
521             my $self = shift;
522             my $time = shift;
523             my $to_del = shift || 0;
524             my $dbh = $self->{ _bdb };
525            
526             my ( $sec, $usec ) = gettimeofday;
527             $sec += $time if ( $time != 0 );
528             my $key = sprintf( "%010d%06d", $sec, $usec );
529             my $sql = "SELECT * FROM queue WHERE key <= $key ORDER BY key ;";
530             if ( wantarray )
531             {
532             my $row = $dbh->selectall_arrayref( $sql );
533             my $res = $dbh->do( "DELETE FROM queue WHERE key <= $key;") if ($to_del) ;
534             return sort { $a->[0] <=> $b->[0] } @{ $row };
535             }
536             my $row = $dbh->selectcol_arrayref( $sql, { Columns => [2] } );
537             my $res = $dbh->do( "DELETE FROM queue WHERE key <= $key;") if ($to_del) ;
538             my @REVERSED = @$row;
539             return \@REVERSED;
540            
541             }
542              
543             =head1 Functions not Implemented
544              
545             I< >
546              
547              
548             =head2 EXTEND
549            
550             Not implemented because not signifiant for a expiration queue
551            
552             =cut
553              
554             sub EXTEND { carp "no EXTEND function"; }
555              
556             =head2 STORE
557            
558             Not implemented because not signifiant for a expiration queue
559            
560             =cut
561              
562             sub STORE { carp "no STORE function"; }
563              
564             =head2 STORESIZE
565            
566             Not implemented because not signifiant for a expiration queue
567            
568             =cut
569              
570             sub STORESIZE { carp "no STORESIZE function"; }
571              
572             sub __serialize__
573             {
574             my $self = shift;
575             my $val = shift;
576             my $serializer = $self->{ _serialize };
577             return $serializer->serialize( $val ) if $val;
578             return $val;
579             }
580              
581             sub __deserialize__
582             {
583             my $self = shift;
584             my $val = shift;
585             my $serializer = $self->{ _serialize };
586             return $serializer->deserialize( $val ) if $val;
587             return $val;
588             }
589             1;
590             __END__