File Coverage

lib/Redis/JobQueue.pm
Criterion Covered Total %
statement 101 473 21.3
branch 0 204 0.0
condition 0 149 0.0
subroutine 34 80 42.5
pod 16 16 100.0
total 151 922 16.3


line stmt bran cond sub pod time code
1             package Redis::JobQueue;
2              
3             =head1 NAME
4              
5             Redis::JobQueue - Job queue management implemented using Redis server.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.19
10              
11             =cut
12              
13             #-- Pragmas --------------------------------------------------------------------
14              
15 62     62   8736776 use 5.010;
  62         174  
16 62     62   212 use strict;
  62         70  
  62         1015  
17 62     62   178 use warnings;
  62         73  
  62         2213  
18              
19             # ENVIRONMENT ------------------------------------------------------------------
20              
21             our $VERSION = '1.19';
22              
23 62         2693 use Exporter qw(
24             import
25 62     62   241 );
  62         86  
26             our @EXPORT_OK = qw(
27             DEFAULT_SERVER
28             DEFAULT_PORT
29             DEFAULT_TIMEOUT
30             DEFAULT_CONNECTION_TIMEOUT
31             DEFAULT_OPERATION_TIMEOUT
32             NAMESPACE
33             E_NO_ERROR
34             E_MISMATCH_ARG
35             E_DATA_TOO_LARGE
36             E_NETWORK
37             E_MAX_MEMORY_LIMIT
38             E_JOB_DELETED
39             E_REDIS
40             );
41              
42             #-- load the modules -----------------------------------------------------------
43              
44 62     62   225 use Carp;
  62         70  
  62         2781  
45 62     62   24972 use Data::UUID;
  62         28982  
  62         3127  
46 62         3043 use Digest::SHA1 qw(
47             sha1_hex
48 62     62   25111 );
  62         27749  
49 62         4239 use List::Util qw(
50             min
51             shuffle
52 62     62   288 );
  62         59  
53 62         395 use List::MoreUtils qw(
54             firstidx
55 62     62   25341 );
  62         372501  
56 62     62   27417 use Mouse;
  62         39025  
  62         419  
57 62     62   20109 use Mouse::Util::TypeConstraints;
  62         83  
  62         351  
58 62         3799 use Params::Util qw(
59             _ARRAY0
60             _INSTANCE
61             _NONNEGINT
62             _STRING
63 62     62   30513 );
  62         110278  
64 62     62   35329 use Redis '1.976';
  62         1136674  
  62         1993  
65 62         3707 use Redis::JobQueue::Job qw(
66             STATUS_CREATED
67             STATUS_WORKING
68             STATUS_COMPLETED
69             STATUS_FAILED
70 62     62   22690 );
  62         5596  
71 62         3158 use Redis::JobQueue::Util qw(
72             format_message
73 62     62   18247 );
  62         108  
74 62         3663 use Storable qw(
75             nfreeze
76             thaw
77 62     62   35521 );
  62         150051  
78 62     62   333 use Time::HiRes ();
  62         78  
  62         940  
79 62     62   201 use Try::Tiny;
  62         70  
  62         4436  
80              
81             #-- declarations ---------------------------------------------------------------
82              
83             =head1 SYNOPSIS
84              
85             use 5.010;
86             use strict;
87             use warnings;
88              
89             #-- Common
90             use Redis::JobQueue qw(
91             DEFAULT_SERVER
92             DEFAULT_PORT
93             );
94              
95             my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
96             my $jq = Redis::JobQueue->new( redis => $connection_string );
97              
98             #-- Producer
99             my $job = $jq->add_job(
100             {
101             queue => 'xxx',
102             workload => \'Some stuff',
103             expire => 12*60*60, # 12h,
104             }
105             );
106              
107             #-- Worker
108             sub xxx {
109             my $job = shift;
110              
111             my $workload = ${ $job->workload };
112             # do something with workload;
113              
114             $job->result( 'XXX JOB result comes here' );
115             }
116              
117             while ( $job = $jq->get_next_job(
118             queue => 'xxx',
119             blocking => 1,
120             ) ) {
121             $job->status( 'working' );
122             $jq->update_job( $job );
123              
124             # do my stuff
125             xxx( $job );
126              
127             $job->status( 'completed' );
128             $jq->update_job( $job );
129             }
130              
131             #-- Consumer
132             my $id = $ARGV[0];
133             my $status = $jq->get_job_data( $id, 'status' );
134              
135             if ( $status eq 'completed' ) {
136             # it is now safe to remove it from JobQueue, since it's completed
137             my $job = $jq->load_job( $id );
138              
139             $jq->delete_job( $id );
140             say 'Job result: ', ${ $job->result };
141             } else {
142             say "Job is not complete, has current '$status' status";
143             }
144              
145             To see a brief but working code example of the C
146             package usage look at the L section.
147              
148             Description of the used by C data
149             structures (on Redis server) is provided in L section.
150              
151             =head1 ABSTRACT
152              
153             The C package is a set of Perl modules which
154             allows creation of a simple job queue based on Redis server capabilities.
155              
156             =head1 DESCRIPTION
157              
158             The main features of the package are:
159              
160             =over 3
161              
162             =item *
163              
164             Supports the automatic creation of job queues, job status monitoring,
165             updating the job data set, obtaining a consistent job from the queue,
166             removing jobs, and the classification of possible errors.
167              
168             =item *
169              
170             Contains various reusable components that can be used separately or together.
171              
172             =item *
173              
174             Provides an object oriented API.
175              
176             =item *
177              
178             Support of storing arbitrary job-related data structures.
179              
180             =item *
181              
182             Simple methods for organizing producer, worker, and consumer clients.
183              
184             =back
185              
186             =head3 Atributes
187              
188             =over
189              
190             =item C
191              
192             An id that uniquely identifies the job, scalar.
193              
194             =item C
195              
196             Queue name in which job is placed, scalar.
197              
198             =item C
199              
200             For how long (seconds) job data structures will be kept in memory.
201              
202             =item C
203              
204             Job status, scalar. See L L section
205             for the list of pre-defined statuses.
206             Can be also set to any arbitrary value.
207              
208             =item C, C
209              
210             User-set data structures which will be serialized before stored in Redis server.
211             Suitable for passing large amounts of data.
212              
213             =item C<*>
214              
215             Any other custom-named field passed to L or L method
216             will be stored as metadata scalar in Redis server.
217             Suitable for storing scalar values with fast access
218             (will be serialized before stored in Redis server).
219              
220             =back
221              
222             =cut
223              
224             =head2 EXPORT
225              
226             None by default.
227              
228             The following additional constants, defining defaults for various parameters, are available for export:
229              
230             =over
231              
232             =item C
233              
234             Maximum size of the data stored in C, C: 512MB.
235              
236             =cut
237 62     62   254 use constant MAX_DATASIZE => 512*1024*1024; # A String value can be at max 512 Megabytes in length.
  62         82  
  62         3936  
238              
239             =item C
240              
241             Default address of the Redis server - C<'localhost'>.
242              
243             =cut
244 62     62   233 use constant DEFAULT_SERVER => 'localhost';
  62         86  
  62         2826  
245              
246             =item C
247              
248             Default port of the Redis server - 6379.
249              
250             =cut
251 62     62   222 use constant DEFAULT_PORT => 6379;
  62         70  
  62         2534  
252              
253             =item C
254              
255             Maximum time (in seconds) to wait for a new job from the queue,
256             0 - unlimited.
257              
258             =cut
259 62     62   226 use constant DEFAULT_TIMEOUT => 0; # 0 for an unlimited timeout
  62         74  
  62         2600  
260              
261             =item C
262              
263             Default socket timeout for connection, number of seconds: 0.1 .
264              
265             =cut
266 62     62   222 use constant DEFAULT_CONNECTION_TIMEOUT => 0.1;
  62         90  
  62         2503  
267              
268             =item C
269              
270             Default socket timeout for read and write operations, number of seconds: 1.
271              
272             =cut
273 62     62   238 use constant DEFAULT_OPERATION_TIMEOUT => 1;
  62         66  
  62         2901  
274              
275             =item C
276              
277             Namespace used for keys on the Redis server - C<'JobQueue'>.
278              
279             =cut
280 62     62   291 use constant NAMESPACE => 'JobQueue';
  62         110  
  62         2575  
281              
282             =item Error codes are identified
283              
284             More details about error codes are provided in L section.
285              
286             =back
287              
288             Possible error codes:
289              
290             =cut
291              
292             =over 3
293              
294             =item C
295              
296             0 - No error
297              
298             =cut
299 62     62   233 use constant E_NO_ERROR => 0;
  62         71  
  62         2590  
300              
301             =item C
302              
303             1 - Invalid argument of C or other L.
304              
305             =cut
306 62     62   271 use constant E_MISMATCH_ARG => 1;
  62         78  
  62         2582  
307              
308             =item C
309              
310             2 - Provided data is too large.
311              
312             =cut
313 62     62   230 use constant E_DATA_TOO_LARGE => 2;
  62         60  
  62         2556  
314              
315             =item C
316              
317             3 - Error connecting to Redis server.
318              
319             =cut
320 62     62   205 use constant E_NETWORK => 3;
  62         70  
  62         2570  
321              
322             =item C
323              
324             4 - Command failed because its execution requires more than allowed memory, set in C.
325              
326             =cut
327 62     62   219 use constant E_MAX_MEMORY_LIMIT => 4;
  62         61  
  62         2293  
328              
329             =item C
330              
331             5 - Job's data was removed.
332              
333             =cut
334 62     62   205 use constant E_JOB_DELETED => 5;
  62         82  
  62         2720  
335              
336             =item C
337              
338             6 - Other error on Redis server.
339              
340             =back
341              
342             =cut
343 62     62   253 use constant E_REDIS => 6;
  62         61  
  62         306657  
344              
345             our @ERROR = (
346             'No error',
347             'Invalid argument',
348             'Data is too large',
349             'Error in connection to Redis server',
350             "Command not allowed when used memory > 'maxmemory'",
351             'job was removed prior to use',
352             'Redis error',
353             );
354              
355             our $WAIT_USED_MEMORY = 0; # No attempt to protect against used_memory > maxmemory
356             #our $WAIT_USED_MEMORY = 1;
357              
358             my $_ID_IN_QUEUE_FIELD = '__id_in_queue__';
359              
360             my $NAMESPACE = NAMESPACE;
361              
362             my @job_fields = Redis::JobQueue::Job->job_attributes; # sorted list
363             splice @job_fields, ( firstidx { $_ eq 'meta_data' } @job_fields ), 1;
364             my %job_fnames = map { $_ => 1 } @job_fields;
365              
366             my $uuid = new Data::UUID;
367              
368             my ( $_running_script_name, $_running_script_body );
369             my %lua_script_body;
370             my $lua_id_rgxp = '([^:]+)$';
371              
372             my $lua_body_start = <<"END_BODY_START";
373             local job_id = ARGV[1]
374             local data_fields = { unpack( ARGV, 2 ) }
375              
376             local job_key = '${NAMESPACE}:'..job_id
377             local job_exists = redis.call( 'EXISTS', job_key )
378             local job_data = {}
379              
380             if job_exists == 1 then
381             END_BODY_START
382              
383             my $lua_body_end = <<"END_BODY_END";
384             end
385             return { job_exists, unpack( job_data ) }
386             END_BODY_END
387              
388             # Deletes the job data in Redis server
389             $lua_script_body{delete_job} = <<"END_DELETE_JOB";
390             local job_id = ARGV[1]
391              
392             local job_key = '${NAMESPACE}:'..job_id
393             if redis.call( 'EXISTS', job_key ) == 1 then
394             redis.call( 'LREM', '${NAMESPACE}:queue:'..redis.call( 'HGET', job_key, 'queue' ), 0, redis.call( 'HGET', job_key, '${_ID_IN_QUEUE_FIELD}' ) )
395             return redis.call( 'DEL', job_key )
396             else
397             return nil
398             end
399             END_DELETE_JOB
400              
401             # Adds a job to the queue on the Redis server
402             $lua_script_body{load_job} = <<"END_LOAD_JOB";
403             $lua_body_start
404              
405             for _, field in ipairs( redis.call( 'HKEYS', job_key ) ) do
406             if field ~= '${_ID_IN_QUEUE_FIELD}' then
407             -- return the field names and values for the data fields
408             table.insert( job_data, field )
409             table.insert( job_data, redis.call( 'HGET', job_key, field ) )
410             end
411             end
412              
413             $lua_body_end
414             END_LOAD_JOB
415              
416             # Data of the job is requested from the Redis server
417             $lua_script_body{get_job_data} = <<"END_GET_JOB_DATA";
418             $lua_body_start
419              
420             for _, field in ipairs( data_fields ) do
421             table.insert( job_data, redis.call( 'HGET', job_key, field ) )
422             end
423              
424             $lua_body_end
425             END_GET_JOB_DATA
426              
427             # Gets queue status from the Redis server
428             $lua_script_body{queue_status} = <<"END_QUEUE_STATUS";
429             local queue = ARGV[1]
430             local tm = tonumber( ARGV[2] ) -- a floating seconds since the epoch
431              
432             local queue_key = '${NAMESPACE}:queue:'..queue
433             local queue_status = {}
434              
435             -- if it is necessary to determine the status of a particular queue
436             if queue then
437             -- Queue length
438             queue_status[ 'length' ] = redis.call( 'LLEN', queue_key )
439             -- if the queue is set
440             if queue_status[ 'length' ] > 0 then
441             -- for each item in the queue
442             for _, in_queue_id in ipairs( redis.call( 'LRANGE', queue_key, 0, -1 ) ) do
443             -- select the job ID and determine the value of a field 'created'
444             local created = redis.call( 'HGET', '${NAMESPACE}:'..in_queue_id:match( '^(%S+)' ), 'created' )
445             if created then
446             created = tonumber( created ) -- values are stored as strings (a floating seconds since the epoch)
447             -- initialize the calculated values
448             if not queue_status[ 'max_job_age' ] then
449             queue_status[ 'max_job_age' ] = 0
450             queue_status[ 'min_job_age' ] = 0
451             end
452             -- time of birth
453             if queue_status[ 'max_job_age' ] == 0 or created < queue_status[ 'max_job_age' ] then
454             queue_status[ 'max_job_age' ] = created
455             end
456             if queue_status[ 'min_job_age' ] == 0 or created > queue_status[ 'min_job_age' ] then
457             queue_status[ 'min_job_age' ] = created
458             end
459             end
460             end
461              
462             -- time of birth -> age
463             if queue_status[ 'max_job_age' ] then -- queue_status[ 'min_job_age' ] also ~= 0
464             -- The age of the old job (the lifetime of the queue)
465             queue_status[ 'max_job_age' ] = tm - queue_status[ 'max_job_age' ]
466             -- The age of the younger job
467             queue_status[ 'min_job_age' ] = tm - queue_status[ 'min_job_age' ]
468             -- Duration of queue activity (the period during which new jobs were created)
469             queue_status[ 'lifetime' ] = queue_status[ 'max_job_age' ] - queue_status[ 'min_job_age' ]
470             end
471             end
472             end
473              
474             -- all jobs in the queue, including inactive ones
475             queue_status[ 'all_jobs' ] = 0
476             -- review all job keys on the server, including inactive ones
477             for _, key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:*' ) ) do
478             -- counting on the basic structures of jobs
479             if key:find( '^${NAMESPACE}:${lua_id_rgxp}' ) then
480             -- consider only the structure related to a given queue
481             if redis.call( 'HGET', key, 'queue' ) == queue then
482             queue_status[ 'all_jobs' ] = queue_status[ 'all_jobs' ] + 1
483             end
484             end
485             end
486              
487             local result_status = {}
488             -- memorize the names and values of what was possible to calculate
489             for key, val in pairs( queue_status ) do
490             table.insert( result_status, key )
491             table.insert( result_status, tostring( val ) )
492             end
493              
494             return result_status
495             END_QUEUE_STATUS
496              
497             # Gets a list of job IDs on the Redis server
498             $lua_script_body{get_job_ids} = <<"END_GET_JOB_IDS";
499             local is_queued = tonumber( ARGV[1] )
500             local queues = { unpack( ARGV, 3, 2 + ARGV[2] ) }
501             local statuses = { unpack( ARGV, 3 + ARGV[2] ) }
502              
503             local tmp_ids = {}
504              
505             if is_queued == 1 then -- jobs in the queues
506             -- if not limited to specified queues
507             if #queues == 0 then
508             -- determine the queues still have not served the job
509             for _, queue_key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:queue:*' ) ) do
510             table.insert( queues, queue_key:match( '${lua_id_rgxp}' ) )
511             end
512             end
513             -- view each specified queue
514             for _, queue in ipairs( queues ) do
515             -- for each identifier contained in the queue
516             for _, in_queue_id in ipairs( redis.call( 'LRANGE', '${NAMESPACE}:queue:'..queue, 0, -1 ) ) do
517             -- distinguish and remember the ID of the job
518             tmp_ids[ in_queue_id:match( '^(%S+)' ) ] = 1
519             end
520             end
521             else -- all jobs on the server
522             local all_job_ids = {}
523             for _, key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:*' ) ) do
524             -- considering only the basic structure of jobs
525             if key:find( '^${NAMESPACE}:${lua_id_rgxp}' ) then
526             -- forming a "hash" of jobs IDs
527             all_job_ids[ key:match( '${lua_id_rgxp}' ) ] = 1
528             end
529             end
530             -- if a restriction is set on the queues
531             if #queues > 0 then
532             -- analyze each job on a server
533             for job_id, _ in pairs( all_job_ids ) do
534             -- get the name of the job queue
535             local tmp_queue = redis.call( 'HGET', '${NAMESPACE}:'..job_id, 'queue' )
536             -- associate job queue name with the names of the queues from the restriction
537             for _, queue in ipairs( queues ) do
538             if tmp_queue == queue then
539             -- memorize the appropriate job ID
540             tmp_ids[ job_id ] = 1
541             break
542             end
543             end
544             end
545             else
546             -- if there is no restriction on the queues, then remember all the jobs on the server
547             tmp_ids = all_job_ids
548             end
549             end
550              
551             -- if the restriction is set by the statuses
552             if #statuses > 0 then
553             -- analyze each job from a previously created "hash"
554             for job_id, _ in pairs( tmp_ids ) do
555             -- determine the current status of the job
556             local job_status = redis.call( 'HGET', '${NAMESPACE}:'..job_id, 'status' )
557             -- associate job status with the statuses from the restriction
558             for _, status in ipairs( statuses ) do
559             if job_status == status then
560             -- mark the job satisfying the restriction
561             tmp_ids[ job_id ] = 2 -- Filter by status sign
562             break
563             end
564             end
565             end
566             -- reanalyze each job from a previously created "hash"
567             for job_id, _ in pairs( tmp_ids ) do
568             if tmp_ids[ job_id ] ~= 2 then
569             -- remove unfiltered by status
570             tmp_ids[ job_id ] = nil
571             end
572             end
573             end
574              
575             local job_ids = {}
576             for id, _ in pairs( tmp_ids ) do
577             -- remember identifiers of selected jobs
578             table.insert( job_ids, id )
579             end
580              
581             return job_ids
582             END_GET_JOB_IDS
583              
584             #-- constructor ----------------------------------------------------------------
585              
586             =head2 CONSTRUCTOR
587              
588             =head3 C $server, timeout =E $timeout, check_maxmemory =E $mode, ... )>
589              
590             Creates a new C object to communicate with Redis server.
591             If invoked without any arguments, the constructor C creates and returns
592             a C object that is configured with the default settings and uses
593             local C server.
594              
595             In the parameters of the constructor 'redis' may be either a Redis object
596             or a server string or a hash reference of parameters to create a Redis object.
597              
598             Optional C boolean argument (default is true)
599             defines if attempt is made to find out maximum available memory from Redis.
600              
601             In some cases Redis implementation forbids such request,
602             but setting to false can be used as a workaround.
603              
604             This example illustrates a C call with all the valid arguments:
605              
606             my $coll = Redis::CappedCollection->create(
607             redis => { server => "$server:$port" }, # Redis object
608             # or hash reference to parameters to create a new Redis object.
609             timeout => $timeout, # wait time (in seconds)
610             # for blocking call of get_next_job.
611             # Set 0 for unlimited wait time
612             check_maxmemory => $mode, # boolean argument (default is true)
613             # defines if attempt is made to find out
614             # maximum available memory from Redis.
615             reconnect_on_error => 1, # Boolean argument - default is true and conservative_reconnect is true.
616             # Controls ability to force re-connection with Redis on error.
617             connection_timeout => DEFAULT_CONNECTION_TIMEOUT, # Socket timeout for connection,
618             # number of seconds (can be fractional).
619             # NOTE: Changes external socket configuration.
620             operation_timeout => DEFAULT_OPERATION_TIMEOUT, # Socket timeout for read and write operations,
621             # number of seconds (can be fractional).
622             # NOTE: Changes external socket configuration.
623             );
624              
625             =head3 Caveats related to connection with Redis server
626              
627             =over 3
628              
629             =item *
630              
631             According to L documentation:
632              
633             This module consider that any data sent to the Redis server is a raw octets string,
634             even if it has utf8 flag set.
635             And it doesn't do anything when getting data from the Redis server.
636              
637             =item *
638              
639             Non-serialize-able fields (like status or message) passed in UTF-8 can not be
640             correctly restored from Redis server. To avoid potential data corruction, passing
641             UTF-8 encoded value causes error.
642              
643             =item *
644              
645             L value is used when a L class object is
646             passed to the C constructor without additional C argument.
647              
648             =back
649              
650             This example illustrates C call with all possible arguments:
651              
652             my $jq = Redis::JobQueue->new(
653             redis => "$server:$port", # Connection info for Redis which hosts queue
654             timeout => $timeout, # wait time (in seconds)
655             # for blocking call of get_next_job.
656             # Set 0 for unlimited wait time
657             );
658             # or
659             my $jq = Redis::JobQueue->new(
660             redis => { # The hash reference of parameters
661             # to create a Redis object
662             server => "$server:$port",
663             },
664             );
665              
666             The following examples illustrate other uses of the C method:
667              
668             $jq = Redis::JobQueue->new();
669             my $next_jq = Redis::JobQueue->new( $jq );
670              
671             my $redis = Redis->new( server => "$server:$port" );
672             $next_jq = Redis::JobQueue->new(
673             $redis,
674             timeout => $timeout,
675             );
676             # or
677             $next_jq = Redis::JobQueue->new(
678             redis => $redis,
679             timeout => $timeout,
680             );
681              
682             An invalid argument causes die (C).
683              
684             =cut
685             around BUILDARGS => sub {
686             my $orig = shift;
687             my $class = shift;
688              
689             if ( _INSTANCE( $_[0], 'Redis' ) ) {
690             my $redis = shift;
691             return $class->$orig(
692             # have to look into the Redis object ...
693             redis => $redis->{server},
694             # it is impossible to know from Redis now ...
695             #timeout => $redis->???,
696             _redis => $redis,
697             @_
698             );
699             } elsif ( _INSTANCE( $_[0], 'Test::RedisServer' ) ) {
700             # to test only
701             my $redis = shift;
702             # have to look into the Test::RedisServer object ...
703             my $conf = $redis->conf;
704             $conf->{server} = '127.0.0.1:'.$conf->{port} unless exists $conf->{server};
705             return $class->$orig(
706             redis => $conf->{server},
707             _redis => Redis->new( %$conf ),
708             @_
709             );
710             } elsif ( _INSTANCE( $_[0], __PACKAGE__ ) ) {
711             my $jq = shift;
712             return $class->$orig(
713             redis => $jq->_server,
714             _redis => $jq->_redis,
715             timeout => $jq->timeout,
716             @_
717             );
718             } else {
719             my %args = @_;
720             my $redis = $args{redis};
721             if ( _INSTANCE( $redis, 'Redis' ) ) {
722             delete $args{redis};
723             return $class->$orig(
724             # have to look into the Redis object ...
725             redis => $redis->{server},
726             # it is impossible to know from Redis now ...
727             #timeout => $redis->???,
728             _redis => $redis,
729             %args,
730             );
731             }
732             elsif ( ref( $redis ) eq 'HASH' ) {
733             $args{_use_external_connection} = 0;
734             my $conf = $redis;
735             $conf->{server} = DEFAULT_SERVER.':'.DEFAULT_PORT unless exists $conf->{server};
736             $conf->{reconnect} = 1 unless exists $conf->{reconnect};
737             $conf->{every} = 1000 unless exists $conf->{every}; # 1 ms
738             $conf->{conservative_reconnect} = 1 unless exists $conf->{conservative_reconnect};
739             $conf->{cnx_timeout} = $args{connection_timeout} = $conf->{cnx_timeout} // $args{connection_timeout} // DEFAULT_CONNECTION_TIMEOUT;
740             $conf->{read_timeout} = $args{operation_timeout} = $conf->{read_timeout} // $args{operation_timeout} // DEFAULT_OPERATION_TIMEOUT;
741             $conf->{write_timeout} = $conf->{read_timeout};
742             delete $args{redis};
743             return $class->$orig(
744             redis => $conf->{server},
745             _redis => Redis->new( %$conf ),
746             %args,
747             );
748             } else {
749             return $class->$orig( %args );
750             }
751             }
752              
753             return;
754             };
755              
756             sub BUILD {
757 0     0 1   my ( $self ) = @_;
758              
759 0 0         $self->_redis( $self->_redis_constructor )
760             unless ( $self->_redis );
761             $self->_redis->connect if
762             exists( $self->_redis->{no_auto_connect_on_new} )
763             && $self->_redis->{no_auto_connect_on_new}
764             && !$self->_redis->{sock}
765 0 0 0       ;
      0        
766              
767 0 0         if ( $self->_check_maxmemory ) {
768 0           my ( undef, $max_datasize ) = $self->_call_redis( 'CONFIG', 'GET', 'maxmemory' );
769 0 0         defined( _NONNEGINT( $max_datasize ) )
770             or die $self->_error( E_NETWORK );
771 0 0         $self->max_datasize( min $max_datasize, $self->max_datasize )
772             if $max_datasize;
773             }
774              
775 0           $self->_connection_timeout_trigger( $self->connection_timeout );
776 0           $self->_operation_timeout_trigger( $self->operation_timeout );
777              
778 0           my ( $major, $minor ) = $self->_redis->info->{redis_version} =~ /^(\d+)\.(\d+)/;
779 0 0 0       if ( $major < 2 || $major == 2 && $minor < 8 ) {
      0        
780 0           $self->_error( E_REDIS );
781 0           confess 'Needs Redis server version 2.8 or higher';
782             }
783              
784 0           return;
785             }
786              
787             #-- public attributes ----------------------------------------------------------
788              
789             subtype __PACKAGE__.'::NonNegNum',
790             as 'Num',
791             where { $_ >= 0 },
792             message { format_message( '%s is not a non-negative number!', $_ ) }
793             ;
794              
795             =head2 METHODS
796              
797             The following methods are available for object of the C class:
798              
799             =cut
800              
801             =head3 C
802              
803             Accessor to the C attribute.
804              
805             Returns current value of the attribute if called without an argument.
806              
807             Non-negative integer value can be used to specify a new value of the maximum
808             waiting time for queue (of the L method). Use
809             C = 0 for an unlimited wait time.
810              
811             =cut
812             has 'timeout' => (
813             is => 'rw',
814             isa => 'Redis::JobQueue::Job::NonNegInt',
815             default => DEFAULT_TIMEOUT,
816             );
817              
818             =head3 reconnect_on_error
819              
820             Controls ability to force re-connection with Redis on error.
821              
822             =cut
823             has reconnect_on_error => (
824             is => 'rw',
825             isa => 'Bool',
826             default => 0,
827             );
828              
829             =head3 connection_timeout
830              
831             Controls socket timeout for Redis server connection, number of seconds (can be fractional).
832              
833             NOTE: Changes external socket configuration.
834              
835             =cut
836             has connection_timeout => (
837             is => 'rw',
838             isa => 'Maybe['.__PACKAGE__.'::NonNegNum]',
839             default => undef,
840             trigger => \&_connection_timeout_trigger,
841             );
842              
843             sub _connection_timeout_trigger {
844 0     0     my ( $self, $timeout, $old_timeout ) = @_;
845              
846 0 0 0       return if scalar( @_ ) == 2 && ( !defined( $timeout ) && !defined( $old_timeout ) );
      0        
847              
848 0 0         if ( my $redis = $self->_redis ) {
849 0 0         my $socket = _INSTANCE( $redis->{sock}, 'IO::Socket' ) or confess( 'Bad socket object' );
850             # IO::Socket provides a way to set a timeout on the socket,
851             # but the timeout will be used only for connection,
852             # not for reading / writing operations.
853 0           $socket->timeout( $redis->{cnx_timeout} = $timeout );
854             }
855              
856 0           return;
857             }
858              
859             =head3 operation_timeout
860              
861             Controls socket timeout for Redis server read and write operations, number of seconds (can be fractional).
862              
863             NOTE: Changes external socket configuration.
864              
865             =cut
866             has operation_timeout => (
867             is => 'rw',
868             isa => 'Maybe['.__PACKAGE__.'::NonNegNum]',
869             default => undef,
870             trigger => \&_operation_timeout_trigger,
871             );
872              
873             sub _operation_timeout_trigger {
874 0     0     my ( $self, $timeout, $old_timeout ) = @_;
875              
876 0 0 0       return if scalar( @_ ) == 2 && ( !defined( $timeout ) && !defined( $old_timeout ) );
      0        
877              
878 0 0         if ( my $redis = $self->_redis ) {
879 0 0         my $socket = _INSTANCE( $redis->{sock}, 'IO::Socket' ) or confess( 'Bad socket object' );
880             # IO::Socket::Timeout provides a way to set a timeout
881             # on read / write operations on an IO::Socket instance,
882             # or any IO::Socket::* modules, like IO::Socket::INET.
883 0 0         if ( defined $timeout ) {
884 0           $redis->{write_timeout} = $redis->{read_timeout} = $timeout;
885 0           $redis->_maybe_enable_timeouts( $socket );
886 0           $socket->enable_timeout;
887             } else {
888 0           $redis->{write_timeout} = $redis->{read_timeout} = 0;
889 0           $redis->_maybe_enable_timeouts( $socket );
890 0           $socket->disable_timeout;
891             }
892             }
893              
894 0           return;
895             }
896              
897             =head3 C
898              
899             Provides access to the C attribute.
900              
901             Returns current value of the attribute if called with no argument.
902              
903             Non-negative integer value can be used to specify a new value for the maximum
904             size of data in the attributes of a
905             L object.
906              
907             The check is done before sending data to the module L,
908             after possible processing by methods of module L
909             (attributes L, L
910             and L).
911             It is automatically serialized.
912              
913             The C attribute value is used in the L
914             and data entry job operations on the Redis server.
915              
916             The L uses the smaller of the values of 512MB and
917             C limit from a F file.
918              
919             =cut
920             has 'max_datasize' => (
921             is => 'rw',
922             isa => 'Redis::JobQueue::Job::NonNegInt',
923             default => MAX_DATASIZE,
924             );
925              
926             =head3 C
927              
928             Returns the code of the last identified error.
929              
930             See L section for description of error codes.
931              
932             =cut
933             has 'last_errorcode' => (
934             reader => 'last_errorcode',
935             writer => '_set_last_errorcode',
936             isa => 'Int',
937             default => E_NO_ERROR,
938             );
939              
940             #-- private attributes ---------------------------------------------------------
941              
942             has '_server' => (
943             is => 'rw',
944             init_arg => 'redis',
945             isa => 'Str',
946             default => DEFAULT_SERVER.':'.DEFAULT_PORT,
947             trigger => sub {
948             my $self = shift;
949             $self->_server( $self->_server.':'.DEFAULT_PORT )
950             unless $self->_server =~ /:/;
951             },
952             );
953              
954             has '_redis' => (
955             is => 'rw',
956             isa => 'Maybe[Redis]',
957             default => undef,
958             );
959              
960             has '_transaction' => (
961             is => 'rw',
962             isa => 'Bool',
963             default => undef,
964             );
965              
966             has '_lua_scripts' => (
967             is => 'rw',
968             isa => 'HashRef[Str]',
969             lazy => 1,
970             init_arg => undef,
971             builder => sub { return {}; },
972             );
973              
974             has '_check_maxmemory' => (
975             is => 'ro',
976             init_arg => 'check_maxmemory',
977             isa => 'Bool',
978             default => 1,
979             );
980              
981             has '_use_external_connection' => (
982             is => 'rw',
983             isa => 'Bool',
984             default => 1,
985             );
986              
987             #-- public methods -------------------------------------------------------------
988              
989             =head3 C
990              
991             Returns error message of the last identified error.
992              
993             See L section for more info on errors.
994              
995             =cut
996             sub last_error {
997 0     0 1   my ( $self ) = @_;
998              
999 0           return $ERROR[ $self->last_errorcode ];
1000             }
1001              
1002             =head3 C 1 )>
1003              
1004             Adds a job to the queue on the Redis server.
1005              
1006             The first argument should be either L
1007             object (which is modified by the method) or a reference to a hash representing
1008             L - in the latter case
1009             a new L object is created.
1010              
1011             Returns a L object with a new
1012             unique identifier.
1013              
1014             Job status is set to L.
1015              
1016             C optionally takes arguments in key-value pairs.
1017              
1018             The following example illustrates a C call with all possible arguments:
1019              
1020             my $job_data = {
1021             id => '4BE19672-C503-11E1-BF34-28791473A258',
1022             queue => 'lovely_queue', # required
1023             job => 'strong_job', # optional attribute
1024             expire => 12*60*60,
1025             status => 'created',
1026             workload => \'Some stuff',
1027             result => \'JOB result comes here',
1028             };
1029              
1030             my $job = Redis::JobQueue::Job->new( $job_data );
1031              
1032             my $resulting_job = $jq->add_job( $job );
1033             # or
1034             $resulting_job = $jq->add_job(
1035             $pre_job,
1036             LPUSH => 1,
1037             );
1038              
1039             If used with the optional argument C 1>, the job is placed at the beginnig of
1040             the queue and will be returned by the next call to get_next_job.
1041              
1042             TTL of job data on Redis server is set in accordance with the L
1043             attribute of the L object. Make
1044             sure it's higher than the time needed to process all the jobs in the queue.
1045              
1046             =cut
1047             sub add_job {
1048 0     0 1   my $self = shift;
1049              
1050 0           $self->_error( E_NO_ERROR );
1051 0 0 0       ref( $_[0] ) eq 'HASH' || _INSTANCE( $_[0], 'Redis::JobQueue::Job' )
1052             || confess $self->_error( E_MISMATCH_ARG );
1053 0 0         my $job = _INSTANCE( $_[0], 'Redis::JobQueue::Job' ) ? shift : Redis::JobQueue::Job->new( shift );
1054              
1055 0           my %args = ( @_ );
1056              
1057 0           my ( $id, $key );
1058 0           do {
1059 0           $self->_call_redis( 'UNWATCH' );
1060 0           $key = $self->_jkey( $id = $uuid->create_str );
1061 0           $self->_call_redis( 'WATCH', $key );
1062             } while ( $self->_call_redis( 'EXISTS', $self->_jkey( $id ) ) );
1063              
1064 0           $job->id( $id );
1065 0           my $expire = $job->expire;
1066              
1067             # transaction start
1068 0           $self->_call_redis( 'MULTI' );
1069              
1070             $self->_call_redis( 'HSET', $key, $_, $job->$_ // q{} )
1071 0   0       for @job_fields;
1072             $self->_call_redis( 'HSET', $key, $_, $job->meta_data( $_ ) // q{} )
1073 0   0       for keys %{ $job->meta_data };
  0            
1074              
1075 0           my $id_in_list = $id;
1076              
1077 0 0         if ( $expire ) {
1078 0           $id_in_list .= ' '.( time + $expire );
1079 0           $self->_call_redis( 'EXPIRE', $key, $expire );
1080             }
1081              
1082 0           $self->_call_redis( 'HSET', $key, $_ID_IN_QUEUE_FIELD, $id_in_list );
1083              
1084 0 0         $self->_call_redis( $args{LPUSH} ? 'LPUSH' : 'RPUSH', $self->_qkey( $job->queue ), $id_in_list );
1085              
1086             # transaction end
1087 0   0       $self->_call_redis( 'EXEC' ) // return;
1088              
1089 0           $job->clear_modified;
1090 0           return $job;
1091             }
1092              
1093             =head3 C
1094              
1095             Data of the job is requested from the Redis server. First argument can be either
1096             a job ID or L object.
1097              
1098             Returns C when the job was not found on Redis server.
1099              
1100             The method returns the jobs data from the Redis server.
1101             See L for the list of standard jobs data fields.
1102              
1103             The method returns a reference to a hash of the standard jobs data fields if only the first
1104             argument is specified.
1105              
1106             If given a key name C<$data_key>, it returns data corresponding to
1107             the key or C when the value is undefined or key is not in the data or metadata.
1108              
1109             The following examples illustrate uses of the C method:
1110              
1111             my $data_href = $jq->get_job_data( $id );
1112             # or
1113             $data_href = $jq->get_job_data( $job );
1114             # or
1115             my $data_key = 'foo';
1116             my $data = $jq->get_job_data( $job->id, $data_key );
1117              
1118             You can specify a list of names of key data or metadata.
1119             In this case it returns the corresponding list of data. For example:
1120              
1121             my ( $status, $foo ) = $jq->get_job_data( $job->id, 'status', 'foo' );
1122              
1123             See L for more informations about
1124             the jobs metadata.
1125              
1126             =cut
1127             sub get_job_data {
1128 0     0 1   my ( $self, $id_source, @data_keys ) = @_;
1129              
1130 0           my $job_id = $self->_get_job_id( $id_source );
1131 0           my $data_fields = scalar @data_keys;
1132 0 0         my @right_keys = map { _STRING( $_ ) || q{} } @data_keys;
  0            
1133 0           my %right_names = map { $_ => 1 } grep { $_ } @right_keys;
  0            
  0            
1134              
1135 0           my @additional = ();
1136 0 0         if ( $data_fields ) {
1137 0 0         if ( exists $right_names{elapsed} ) {
1138 0           foreach my $field ( qw( started completed failed ) ) {
1139 0 0         push @additional, $field if !exists $right_names{ $field };
1140             }
1141             }
1142             } else {
1143 0           @additional = @job_fields;
1144             }
1145 0           my @all_fields = ( @right_keys, @additional );
1146 0           my $total_fields = scalar( @all_fields );
1147              
1148 0           $self->_error( E_NO_ERROR );
1149              
1150 0           my $tm = time;
1151 0           my ( $job_exists, @data ) = $self->_call_redis(
1152             $self->_lua_script_cmd( 'get_job_data' ),
1153             0,
1154             $job_id,
1155             @all_fields,
1156             );
1157              
1158 0 0         return unless $job_exists;
1159              
1160 0           for ( my $i = 0; $i < $total_fields; ++$i ) {
1161 0           my $field = $all_fields[ $i ];
1162 0 0 0       if ( $field ne 'elapsed' && ( $field =~ /^(workload|result)$/ || !$job_fnames{ $field } ) ) {
      0        
1163 0 0         $data[ $i ] = ${ thaw( $data[ $i ] ) }
  0            
1164             if $data[ $i ];
1165             }
1166             }
1167              
1168 0 0         if ( !$data_fields ) {
1169 0           my %result_data;
1170 0           for ( my $i = 0; $i < $total_fields; ++$i ) {
1171 0           $result_data{ $all_fields[ $i ] } = $data[ $i ];
1172             }
1173              
1174 0 0         if ( my $started = $result_data{started} ) {
1175             $result_data{elapsed} = (
1176             $result_data{completed}
1177             || $result_data{failed}
1178             || time
1179 0   0       ) - $started;
1180             } else {
1181 0           $result_data{elapsed} = undef;
1182             }
1183              
1184 0           return \%result_data;
1185             } else {
1186 0           for ( my $i = 0; $i < $data_fields; ++$i ) {
1187 0 0         if ( $right_keys[ $i ] eq 'elapsed' ) {
1188 0 0   0     if ( my $started = $data[ firstidx { $_ eq 'started' } @all_fields ] ) {
  0            
1189             $data[ $i ] = (
1190 0     0     $data[ firstidx { $_ eq 'completed' } @all_fields ]
1191 0   0 0     || $data[ firstidx { $_ eq 'failed' } @all_fields ]
  0            
1192             || $tm
1193             ) - $started;
1194             } else {
1195 0           $data[ $i ] = undef;
1196             }
1197             }
1198              
1199 0 0         return $data[0] unless wantarray;
1200             }
1201              
1202 0           return @data;
1203             }
1204              
1205 0           return;
1206             }
1207              
1208             =head3 C
1209              
1210             The list of names of metadata fields of the job is requested from the Redis server.
1211             First argument can be either a job ID or
1212             L object.
1213              
1214             Returns empty list when the job was not found on Redis server or
1215             the job does not have metadata.
1216              
1217             The following examples illustrate uses of the C method:
1218              
1219             my @fields = $jq->get_job_meta_fields( $id );
1220             # or
1221             @fields = $jq->get_job_meta_fields( $job );
1222             # or
1223             @fields = $jq->get_job_meta_fields( $job->id );
1224              
1225             See L for more informations about
1226             the jobs metadata.
1227              
1228             =cut
1229             sub get_job_meta_fields {
1230 0     0 1   my ( $self, $id_source ) = @_;
1231              
1232 0 0         return grep { !$job_fnames{ $_ } && $_ ne $_ID_IN_QUEUE_FIELD } $self->_call_redis( 'HKEYS', $self->_jkey( $self->_get_job_id( $id_source ) ) );
  0            
1233             }
1234              
1235             =head3 C
1236              
1237             Loads job data from the Redis server. The argument is either job ID or
1238             a L object.
1239              
1240             Method returns the object corresponding to the loaded job.
1241             Returns C if the job is not found on the Redis server.
1242              
1243             The following examples illustrate uses of the C method:
1244              
1245             $job = $jq->load_job( $id );
1246             # or
1247             $job = $jq->load_job( $job );
1248              
1249             =cut
1250             sub load_job {
1251 0     0 1   my ( $self, $id_source ) = @_;
1252              
1253 0           my $job_id = $self->_get_job_id( $id_source );
1254              
1255 0           $self->_error( E_NO_ERROR );
1256              
1257 0           my ( $job_exists, @job_data ) = $self->_call_redis(
1258             $self->_lua_script_cmd( 'load_job' ),
1259             0,
1260             $job_id,
1261             @job_fields,
1262             );
1263              
1264 0 0         return unless $job_exists;
1265              
1266 0           my ( $pre_job, $key, $val );
1267 0           while ( @job_data ) {
1268 0           $key = shift @job_data;
1269 0           $val = shift @job_data;
1270 0 0         if ( $job_fnames{ $key } ) {
1271 0           $pre_job->{ $key } = $val;
1272             } else {
1273 0           $pre_job->{meta_data}->{ $key } = $val;
1274             }
1275             }
1276              
1277 0           foreach my $field ( qw( workload result ) ) {
1278 0           $pre_job->{ $field } = ${ thaw( $pre_job->{ $field } ) }
1279 0 0         if $pre_job->{ $field };
1280             }
1281 0 0         if ( $pre_job->{meta_data} ) {
1282 0           my $meta_data = $pre_job->{meta_data};
1283 0           foreach my $field ( keys %$meta_data ) {
1284 0           $meta_data->{ $field } = ${ thaw( $meta_data->{ $field } ) }
1285 0 0         if $meta_data->{ $field };
1286             }
1287             }
1288              
1289 0           my $new_job = Redis::JobQueue::Job->new( $pre_job );
1290 0           $new_job->clear_modified;
1291              
1292 0           return $new_job;
1293             }
1294              
1295             =head3 C $queue_name, blocking =E 1 )>
1296              
1297             Selects the job identifier which is at the beginning of the queue.
1298              
1299             C takes arguments in key-value pairs.
1300             You can specify a queue name or a reference to an array of queue names.
1301             Queues from the list are processed in random order.
1302              
1303             By default, each queue is processed in a separate request with the result
1304             returned immediately if a job is found (waiting) in that queue. If no waiting
1305             job found, returns undef.
1306             In case optional C argument is true, all queues are processed in
1307             a single request to Redis server and if no job is found waiting in queue(s),
1308             get_next_job execution will be paused for up to C seconds or until
1309             a job becomes available in any of the listed queues.
1310              
1311             Use C = 0 for an unlimited wait time.
1312             Default - C is false (0).
1313              
1314             Method returns the job object corresponding to the received job identifier.
1315             Returns the C if there is no job in the queue.
1316              
1317             These examples illustrate a C call with all the valid arguments:
1318              
1319             $job = $jq->get_next_job(
1320             queue => 'xxx',
1321             blocking => 1,
1322             );
1323             # or
1324             $job = $jq->get_next_job(
1325             queue => [ 'aaa', 'bbb' ],
1326             blocking => 1,
1327             );
1328              
1329             TTL job data for the job resets on the Redis server in accordance with
1330             the L attribute of the job object.
1331              
1332             =cut
1333             sub get_next_job {
1334 0     0 1   my $self = shift;
1335              
1336 0 0         !( scalar( @_ ) % 2 )
1337             || confess $self->_error( E_MISMATCH_ARG );
1338 0           my %args = ( @_ );
1339 0           my $queues = $args{queue};
1340 0           my $blocking = $args{blocking};
1341 0           my $only_id = $args{_only_id};
1342              
1343 0 0 0       $queues = [ $queues // () ]
1344             if !ref( $queues );
1345              
1346 0           foreach my $arg ( ( @{$queues} ) ) {
  0            
1347 0 0         defined _STRING( $arg )
1348             or confess $self->_error( E_MISMATCH_ARG );
1349             }
1350              
1351 0           my @keys = ();
1352 0           push @keys, map { $self->_qkey( $_ ) } @$queues;
  0            
1353              
1354 0           $self->_error( E_NO_ERROR );
1355              
1356 0 0         if ( @keys ) {
1357 0           @keys = shuffle( @keys );
1358              
1359 0           my $full_id;
1360 0 0         if ( $blocking ) {
1361             # 'BLPOP' waiting time of a given $self->timeout parameter
1362 0           my @cmd = ( 'BLPOP', ( @keys ), $self->timeout );
1363 0           while (1) {
1364 0           ( undef, $full_id ) = $self->_call_redis( @cmd );
1365             # if the job is no longer
1366 0 0         last unless $full_id;
1367              
1368 0           my $ret = $self->_get_next_job( $full_id, $only_id );
1369 0 0         return $ret if $ret;
1370             }
1371             } else {
1372             # 'LPOP' takes only one queue name at a time
1373 0           foreach my $key ( @keys ) {
1374 0 0         next unless $self->_call_redis( 'EXISTS', $key );
1375 0           my @cmd = ( 'LPOP', $key );
1376 0           while (1) {
1377 0           $full_id = $self->_call_redis( @cmd );
1378             # if the job is no longer
1379 0 0         last unless $full_id;
1380              
1381 0           my $ret = $self->_get_next_job( $full_id, $only_id );
1382 0 0         return $ret if $ret;
1383             }
1384             }
1385             }
1386             }
1387              
1388 0           return;
1389             }
1390              
1391             =head3 C $queue_name, blocking =E 1 )>
1392              
1393             Like L, but returns job identifier only.
1394              
1395             TTL job data for the job does not reset on the Redis server.
1396              
1397             =cut
1398             sub get_next_job_id {
1399 0     0 1   my $self = shift;
1400              
1401 0           return $self->get_next_job( @_, _only_id => 1 );
1402             }
1403              
1404             sub _get_next_job {
1405 0     0     my ( $self, $full_id, $only_id ) = @_;
1406              
1407 0           my ( $id, $expire_time ) = split ' ', $full_id;
1408 0           my $key = $self->_jkey( $id );
1409 0 0         if ( $self->_call_redis( 'EXISTS', $key ) ) {
1410 0 0         if ( $only_id ) {
1411 0           return $id;
1412             } else {
1413 0           my $job = $self->load_job( $id );
1414 0 0         if ( my $expire = $job->expire ) {
1415 0           $self->_call_redis( 'EXPIRE', $key, $expire );
1416             }
1417 0           return $job;
1418             }
1419             } else {
1420 0 0 0       if ( !$expire_time || time < $expire_time ) {
1421 0           confess format_message( '%s %s', $id, $self->_error( E_JOB_DELETED ) );
1422             }
1423             # If the queue contains a job identifier that has already been removed due
1424             # to expiration, the cycle will ensure the transition
1425             # to the next job ID selection
1426 0           return;
1427             }
1428              
1429 0           return;
1430             }
1431              
1432             =head3 C
1433              
1434             Saves job data changes to the Redis server. Job ID is obtained from
1435             the argument, which can be a L
1436             object.
1437              
1438             Returns the number of attributes that were updated if the job was found on the Redis
1439             server and C if it was not.
1440             When you change a single attribute, returns C<2> because L also changes.
1441              
1442             Changing the L attribute is ignored.
1443              
1444             The following examples illustrate uses of the C method:
1445              
1446             $jq->update_job( $job );
1447              
1448             TTL job data for the job resets on the Redis server in accordance with
1449             the L attribute of the job object.
1450              
1451             =cut
1452             sub update_job {
1453 0     0 1   my ( $self, $job ) = @_;
1454              
1455 0 0         _INSTANCE( $job, 'Redis::JobQueue::Job' )
1456             or confess $self->_error( E_MISMATCH_ARG );
1457              
1458 0           my @modified = $job->modified_attributes;
1459 0 0         return 0 unless @modified;
1460              
1461 0           $self->_error( E_NO_ERROR );
1462              
1463 0           my $id = $job->id;
1464 0           my $key = $self->_jkey( $id );
1465 0           $self->_call_redis( 'WATCH', $key );
1466 0 0         if ( !$self->_call_redis( 'EXISTS', $key ) ) {
1467 0           $self->_call_redis( 'UNWATCH' );
1468 0           return;
1469             }
1470              
1471             # transaction start
1472 0           $self->_call_redis( 'MULTI' );
1473              
1474 0           my $expire = $job->expire;
1475 0 0         if ( $expire ) {
1476 0           $self->_call_redis( 'EXPIRE', $key, $expire );
1477             } else {
1478 0           $self->_call_redis( 'PERSIST', $key );
1479             }
1480              
1481 0           my $updated = 0;
1482 0           foreach my $field ( @modified ) {
1483 0 0 0       if ( !$job_fnames{ $field } ) {
    0          
1484 0   0       $self->_call_redis( 'HSET', $key, $field, $job->meta_data( $field ) // q{} );
1485 0           ++$updated;
1486             } elsif ( $field ne 'expire' && $field ne 'id' ) {
1487 0   0       $self->_call_redis( 'HSET', $key, $field, $job->$field // q{} );
1488 0           ++$updated;
1489             } else {
1490             # Field 'expire' and 'id' shall remain unchanged
1491             }
1492             }
1493              
1494             # transaction end
1495 0   0       $self->_call_redis( 'EXEC' ) // return;
1496 0           $job->clear_modified;
1497              
1498 0           return $updated;
1499             }
1500              
1501             =head3 C
1502              
1503             Deletes job's data from Redis server.
1504             The Job ID is obtained from the argument, which can be either a string or
1505             a L object.
1506              
1507             Returns true if job and its metadata was successfully deleted from Redis server.
1508             False if jobs or its metadata wasn't found.
1509              
1510             The following examples illustrate uses of the C method:
1511              
1512             $jq->delete_job( $job );
1513             # or
1514             $jq->delete_job( $id );
1515              
1516             Use this method soon after receiving the results of the job to free memory on
1517             the Redis server.
1518              
1519             See description of the C data structure used on the Redis server
1520             at the L section.
1521              
1522             Note that job deletion time is proportional to number of jobs currently in the queue.
1523              
1524             =cut
1525             sub delete_job {
1526 0     0 1   my ( $self, $id_source ) = @_;
1527              
1528 0 0 0       defined( _STRING( $id_source ) ) || _INSTANCE( $id_source, 'Redis::JobQueue::Job' )
1529             || confess $self->_error( E_MISMATCH_ARG );
1530              
1531 0           $self->_error( E_NO_ERROR );
1532              
1533 0 0         return $self->_call_redis(
1534             $self->_lua_script_cmd( 'delete_job' ),
1535             0,
1536             ref( $id_source ) ? $id_source->id : $id_source,
1537             );
1538             }
1539              
1540             =head3 C
1541              
1542             Gets list of job IDs on the Redis server.
1543             These IDs are identifiers of job data structures, not only the identifiers which
1544             get derived from the queue by L.
1545              
1546             The following examples illustrate simple uses of the C method
1547             (IDs of all existing jobs):
1548              
1549             @jobs = $jq->get_job_ids;
1550              
1551             These are identifiers of jobs data structures related to the queue
1552             determined by the arguments.
1553              
1554             C takes arguments in key-value pairs.
1555             You can specify a queue name or job status
1556             (or a reference to an array of queue names or job statuses)
1557             to filter the list of identifiers.
1558              
1559             You can also specify an argument C (true or false).
1560              
1561             When filtering by the names of the queues and C is set to true,
1562             only the identifiers of the jobs which have not yet been derived from
1563             the queues using L are returned.
1564              
1565             Filtering by status returns the task IDs whose status is exactly the same
1566             as the specified status.
1567              
1568             The following examples illustrate uses of the C method:
1569              
1570             # filtering the identifiers of jobs data structures
1571             @ids = $jq->get_job_ids( queue => 'some_queue' );
1572             # or
1573             @ids = $jq->get_job_ids( queue => [ 'foo_queue', 'bar_queue' ] );
1574             # or
1575             @ids = $jq->get_job_ids( status => STATUS_COMPLETED );
1576             # or
1577             @ids = $jq->get_job_ids( status => [ STATUS_COMPLETED, STATUS_FAILED ] );
1578              
1579             # filter the IDs are in the queues
1580             @ids = $jq->get_job_ids( queued => 1, queue => 'some_queue' );
1581             # etc.
1582              
1583             =cut
1584             sub get_job_ids {
1585 0     0 1   my $self = shift;
1586              
1587 0 0         confess format_message( '%s (Odd number of elements in hash assignment)', $self->_error( E_MISMATCH_ARG ) )
1588             if ( scalar( @_ ) % 2 );
1589              
1590 0           my %args = @_;
1591              
1592             # Here are the arguments to references to arrays
1593 0           foreach my $field ( qw( queue status ) ) {
1594 0 0 0       $args{ $field } = [ $args{ $field } ] if exists( $args{ $field } ) && ref( $args{ $field } ) ne 'ARRAY';
1595             }
1596              
1597 0           my @queues = grep { _STRING( $_ ) } @{ $args{queue} };
  0            
  0            
1598 0           my @statuses = grep { _STRING( $_ ) } @{ $args{status} };
  0            
  0            
1599              
1600 0           $self->_error( E_NO_ERROR );
1601              
1602             my @ids = $self->_call_redis(
1603             $self->_lua_script_cmd( 'get_job_ids' ),
1604             0,
1605 0 0         $args{queued} ? 1 : 0,
    0          
    0          
1606             scalar( @queues ), # the number of queues to filter
1607             scalar( @queues ) ? @queues : (), # the queues to filter
1608             scalar( @statuses ) ? @statuses : (), # the statuses to filter
1609             );
1610              
1611 0           return @ids;
1612             }
1613              
1614             =head3 C
1615              
1616             Returns the address of the Redis server used by the queue
1617             (in form of 'host:port').
1618              
1619             The following example illustrates use of the C method:
1620              
1621             $redis_address = $jq->server;
1622              
1623             =cut
1624             sub server {
1625 0     0 1   my ( $self ) = @_;
1626              
1627 0           return $self->_server;
1628             }
1629              
1630             =head3 C
1631              
1632             This command is used to test connection to Redis server.
1633              
1634             Returns 1 if a connection is still alive or 0 otherwise.
1635              
1636             The following example illustrates use of the C method:
1637              
1638             $is_alive = $jq->ping;
1639              
1640             External connections to the server object (eg, C <$redis = Redis->new( ... );>),
1641             and the queue object can continue to work after calling ping only if the method returned 1.
1642              
1643             If there is no connection to the Redis server (methods return 0), the connection to the server closes.
1644             In this case, to continue working with the queue,
1645             you must re-create the C object with the L method.
1646             When using an external connection to the server,
1647             to check the connection to the server you can use the C<$redis-Eecho( ... )> call.
1648             This is useful to avoid closing the connection to the Redis server unintentionally.
1649              
1650             =cut
1651             sub ping {
1652 0     0 1   my ( $self ) = @_;
1653              
1654 0           $self->_error( E_NO_ERROR );
1655              
1656 0           my $ret = $self->_redis->ping;
1657              
1658 0 0 0       $ret = ( $ret // '' ) eq 'PONG' ? 1 : 0;
1659              
1660 0           return $ret;
1661             }
1662              
1663             =head3 C
1664              
1665             Closes connection to the Redis server.
1666              
1667             The following example illustrates use of the C method:
1668              
1669             $jq->quit;
1670              
1671             It does not close the connection to the Redis server if it is an external connection provided
1672             to queue constructor as existing L object.
1673             When using an external connection (eg, C<$redis = Redis-E new (...);>),
1674             to close the connection to the Redis server, call C<$redis-Equit> after calling this method.
1675              
1676             =cut
1677             sub quit {
1678 0     0 1   my ( $self ) = @_;
1679              
1680 0 0 0       return if $] >= 5.14 && ${^GLOBAL_PHASE} eq 'DESTRUCT';
1681              
1682 0           $self->_error( E_NO_ERROR );
1683 0 0         $self->_redis->quit unless $self->_use_external_connection;
1684              
1685 0           return;
1686             }
1687              
1688             =head3 C
1689              
1690             Gets queue status from the Redis server.
1691             Queue name is obtained from the argument. The argument can be either a
1692             string representing a queue name or a
1693             L object.
1694              
1695             Returns a reference to a hash describing state of the queue or a reference
1696             to an empty hash when the queue wasn't found.
1697              
1698             The following examples illustrate uses of the C method:
1699              
1700             $qstatus = $jq->queue_status( $queue_name );
1701             # or
1702             $qstatus = $jq->queue_status( $job );
1703              
1704             The returned hash contains the following information related to the queue:
1705              
1706             =over 3
1707              
1708             =item * C
1709              
1710             The number of jobs in the active queue that are waiting to be selected by L and then processed.
1711              
1712             =item * C
1713              
1714             The number of ALL jobs tagged with the queue, i.e. including those that were processed before and other jobs,
1715             not presently waiting in the active queue.
1716              
1717             =item * C
1718              
1719             The age of the oldest job (the lifetime of the queue) in the active queue.
1720              
1721             =item * C
1722              
1723             The age of the youngest job in the active queue.
1724              
1725             =item * C
1726              
1727             Time it currently takes for a job to pass through the queue.
1728              
1729             =back
1730              
1731             Statistics based on the jobs that have not yet been removed.
1732             Some fields may be missing if the status of the job prevents determining
1733             the desired information (eg, there are no jobs in the queue).
1734              
1735             =cut
1736             # Statistics based on the jobs that have not yet been removed
1737             sub queue_status {
1738 0     0 1   my ( $self, $maybe_queue ) = @_;
1739              
1740 0 0 0       defined( _STRING( $maybe_queue ) ) || _INSTANCE( $maybe_queue, 'Redis::JobQueue::Job' )
1741             || confess $self->_error( E_MISMATCH_ARG );
1742              
1743 0 0         $maybe_queue = $maybe_queue->queue
1744             if ref $maybe_queue;
1745              
1746 0           $self->_error( E_NO_ERROR );
1747              
1748 0           my %qstatus = $self->_call_redis(
1749             $self->_lua_script_cmd( 'queue_status' ),
1750             0,
1751             $maybe_queue,
1752             Time::HiRes::time(),
1753             );
1754              
1755 0           return \%qstatus;
1756             }
1757              
1758             =head3 C
1759              
1760             Gets queue length from the Redis server.
1761             Queue name is obtained from the argument. The can be a string representing a queue name
1762             or a L object.
1763              
1764             If queue does not exist, it is interpreted as an empty list and 0 is returned.
1765              
1766             The following examples illustrate uses of the C method:
1767              
1768             $queue_length = $jq->queue_length( $queue_name );
1769             # or
1770             $queue_length = $jq->queue_status( $job );
1771              
1772             =cut
1773             sub queue_length {
1774 0     0 1   my ( $self, $maybe_queue ) = @_;
1775              
1776 0 0 0       defined( _STRING( $maybe_queue ) ) || _INSTANCE( $maybe_queue, 'Redis::JobQueue::Job' )
1777             || confess $self->_error( E_MISMATCH_ARG );
1778              
1779 0 0         $maybe_queue = $maybe_queue->queue
1780             if ref $maybe_queue;
1781              
1782 0           $self->_error( E_NO_ERROR );
1783              
1784 0           my $queue_lenght = $self->_call_redis( 'LLEN', $self->_qkey( $maybe_queue ) );
1785              
1786 0           return $queue_lenght;
1787             }
1788              
1789             #-- private methods ------------------------------------------------------------
1790              
1791             sub _redis_exception {
1792 0     0     my ( $self, $error ) = @_;
1793              
1794 0           my $err_msg = '';
1795              
1796             # Use the error messages from Redis.pm
1797 0 0 0       if (
    0 0        
    0 0        
      0        
      0        
1798             $error =~ /Could not connect to Redis server at / # not in start if not reconnected
1799             || $error =~ /^Can't close socket: /
1800             || $error =~ /^Not connected to any server/
1801             # Maybe for pub/sub only
1802             || $error =~ /^Error while reading from Redis server: /
1803             || $error =~ /^Redis server closed connection/
1804             ) {
1805 0           $self->_error( E_NETWORK );
1806 0 0 0       $err_msg = $self->_do_reconnect( E_NETWORK, $err_msg ) if !$self->_transaction && $self->reconnect_on_error;
1807             } elsif (
1808             $error =~ /^\[[^]]+\]\s+NOSCRIPT No matching script. Please use EVAL./
1809             ) {
1810 0           $self->_clear_sha1;
1811 0           return 1;
1812             } elsif (
1813             $error =~ /[\S+] ERR command not allowed when used memory > 'maxmemory'/
1814             || $error =~ /[\S+] OOM command not allowed when used memory > 'maxmemory'/
1815             ) {
1816 0           $self->_error( E_MAX_MEMORY_LIMIT );
1817             } else {
1818 0           $self->_error( E_REDIS );
1819 0 0 0       $err_msg = $self->_do_reconnect( E_REDIS, $err_msg ) if !$self->_transaction && $self->reconnect_on_error;
1820             }
1821              
1822 0 0         if ( $self->_transaction ) {
1823             try {
1824 0     0     $self->_redis->discard;
1825 0           };
1826 0           $self->_transaction( 0 );
1827             }
1828              
1829 0           die( format_message( '%s %s', $error, $err_msg ) );
1830              
1831 0           return;
1832             }
1833              
1834             sub _redis_constructor {
1835 0     0     my ( $self ) = @_;
1836              
1837 0           $self->_error( E_NO_ERROR );
1838 0           my $redis;
1839              
1840             try {
1841 0     0     $redis = Redis->new(
1842             server => $self->_server,
1843             );
1844             } catch {
1845 0     0     $self->_redis_exception( $_ );
1846 0           };
1847 0           return $redis;
1848             }
1849              
1850             # Keep in mind the default 'redis.conf' values:
1851             # Close the connection after a client is idle for N seconds (0 to disable)
1852             # timeout 300
1853              
1854             # Send a request to Redis
1855             sub _call_redis {
1856 0     0     my $self = shift;
1857 0           my $method = shift;
1858              
1859 0           $self->_error( E_NO_ERROR );
1860              
1861 0 0 0       $self->_wait_used_memory if $self && $method =~ /^EVAL/i;
1862              
1863 0           my @result;
1864             my $error;
1865             # if you use "$method eq 'HSET'" then use $_[0..2] to reduce data copies
1866             # $_[0] - key
1867             # $_[1] - field
1868             # $_[2] - value
1869 0 0 0       if ( $method eq 'HSET' && $_[1] eq $_ID_IN_QUEUE_FIELD ) {
    0 0        
    0 0        
      0        
1870 0           my ( $key, $field, $value ) = @_;
1871             try {
1872 0     0     @result = $self->_redis->$method(
1873             $key,
1874             $field,
1875             $value,
1876             );
1877             } catch {
1878 0     0     $error = $_;
1879 0           };
1880             } elsif ( $method eq 'HSET' && ( $_[1] =~ /^(workload|result)$/ || !$job_fnames{ $_[1] } ) ) {
1881 0           my $data_ref = \nfreeze( \$_[2] );
1882              
1883 0 0         if ( length( $$data_ref ) > $self->max_datasize ) {
1884 0 0         if ( $self->_transaction ) {
1885             try {
1886 0     0     $self->_redis->discard;
1887 0           };
1888 0           $self->_transaction( 0 );
1889             }
1890             # 'die' as maybe too long to analyze the data output from the 'confess'
1891 0           die format_message( '%s: %s', $self->_error( E_DATA_TOO_LARGE ), $_[1] );
1892             }
1893              
1894 0           my ( $key, $field ) = @_;
1895             try {
1896 0     0     @result = $self->_redis->$method(
1897             $key,
1898             $field,
1899             $$data_ref,
1900             );
1901             } catch {
1902 0     0     $error = $_;
1903 0           };
1904             } elsif ( $method eq 'HSET' && utf8::is_utf8( $_[2] ) ) {
1905             # In our case, the user sends data to the job queue and then gets it back.
1906             # Workload and result are fine with Unicode - they're properly serialized by Storable and stored as bytes in Redis;
1907             # Storable takes care about Unicode etc.
1908             # The main string job data fields (id, queue, job, status, message),
1909             # however, is not serialized for performance and convenience reasons, and stored in Redis as-is.
1910             # If there is Unicode in these fields, we have the following options:
1911             # 1. Assume everything is Unicode, turn utf-8 encoding in Redis.pm settings and take a substantial performance hit;
1912             # as we store the biggest parts of job data - workload and result - serialized already,
1913             # encoding and decoding them again is not a good idea.
1914             # 2. Assume that all fields is Unicode, encode and decode it;
1915             # this may lead to subtle errors if user provides it which is binary, not Unicode.
1916             # 3. Detect Unicode data and store "utf-8" flag along with data on redis,
1917             # to decode only utf-8 data when it is requested by user.
1918             # This makes data management more complicated.
1919             # 4. Assume that data is for application internal use,
1920             # and that application must ensure that it does not contain Unicode;
1921             # if Unicode is really needed, it should be either stored in workload or result,
1922             # or the application must take care about encoding and decoding Unicode data before sending to the job queue.
1923             # The job queue will throw an exception if Unicode data is encountered.
1924             #
1925             # We choose (4) as it is consistent, does not degrade performance and does not cause subtle errors with damaged data.
1926              
1927             # For non-serialized fields: UTF8 can not be transferred to the Redis server
1928 0           confess format_message( '%s (utf8 in %s)', $self->_error( E_MISMATCH_ARG ), $_[1] );
1929             } else {
1930 0           my $try_again;
1931 0           my @args = @_;
1932             RUN_METHOD: {
1933 0           try {
1934 0     0     @result = $self->_redis->$method( @args );
1935             } catch {
1936 0     0     $error = $_;
1937 0           $try_again = $self->_redis_exception( $error );
1938 0           };
1939              
1940 0 0 0       if ( $try_again && $method eq 'EVALSHA' ) {
1941 0           $self->_lua_scripts->{ $_running_script_name } = $args[0]; # sha1
1942 0           $args[0] = $_running_script_body;
1943 0           $method = 'EVAL';
1944 0           redo RUN_METHOD;
1945             }
1946             }
1947             }
1948              
1949 0 0         $self->_redis_exception( $error )
1950             if $error;
1951              
1952 0 0         $self->_transaction( 1 )
1953             if $method eq 'MULTI';
1954 0 0         if ( $method eq 'EXEC' ) {
1955 0           $self->_transaction( 0 );
1956 0   0       $result[0] // return; # 'WATCH': the transaction is not entered at all
1957             }
1958              
1959 0 0 0       if ( $method eq 'HGET' and $_[1] =~ /^(workload|result)$/ ) {
1960 0 0         if ( $result[0] ) {
1961 0           $result[0] = ${ thaw( $result[0] ) };
  0            
1962 0 0         $result[0] = ${ $result[0] } if ref( $result[0] ) eq 'SCALAR';
  0            
1963             }
1964             }
1965              
1966 0 0         return wantarray ? @result : $result[0];
1967             }
1968              
1969             sub _wait_used_memory {
1970 0 0   0     return unless $WAIT_USED_MEMORY;
1971              
1972 0           my ( $self ) = @_;
1973              
1974 0           my ( undef, $maxmemory ) = $self->_call_redis( 'CONFIG', 'GET', 'maxmemory' );
1975 0           my $sleepped;
1976 0 0         if ( $maxmemory ) {
1977 0   0       my $max_timeout = $self->operation_timeout || DEFAULT_OPERATION_TIMEOUT;
1978 0           my $timeout = 0.01;
1979 0           $sleepped = 0;
1980             WAIT_USED_MEMORY: {
1981 0           my ( $used_memory ) = $self->_call_redis( 'INFO', 'memory' ) =~ /used_memory:(\d+)/;
  0            
1982 0 0 0       if ( $used_memory < $maxmemory || $sleepped > $max_timeout ) {
1983 0           last WAIT_USED_MEMORY;
1984             }
1985              
1986 0           Time::HiRes::sleep $timeout;
1987 0           $sleepped += $timeout;
1988             # $self->_redis->connect;
1989 0           redo WAIT_USED_MEMORY;
1990             }
1991             }
1992              
1993 0           return $sleepped;
1994             }
1995              
1996             sub _reconnect {
1997 0     0     my ( $self ) = @_;
1998              
1999 0 0 0       if ( !$self->_transaction && $self->reconnect_on_error && !$self->ping ) {
      0        
2000 0           my $err_msg = $self->_do_reconnect;
2001 0 0         $self->_redis_exception( $err_msg )
2002             if $err_msg;
2003             }
2004             }
2005              
2006             sub _do_reconnect {
2007 0     0     my $self = shift;
2008 0   0       my $err = shift // 0;
2009 0           my $msg = shift;
2010              
2011 0           my $err_msg = '';
2012 0 0 0       if (
      0        
      0        
      0        
2013             !$err || (
2014             $err != E_MISMATCH_ARG
2015             && $err != E_DATA_TOO_LARGE
2016             && $err != E_MAX_MEMORY_LIMIT
2017             && $err != E_JOB_DELETED
2018             )
2019             ) {
2020             try {
2021             # NOTE: socket recreated
2022 0     0     $self->_redis->quit;
2023 0           $self->_redis->connect;
2024             } catch {
2025 0     0     my $error = $_;
2026 0           $err_msg = "(Not reconnected: $error)";
2027 0           };
2028             }
2029              
2030 0 0         if ( $err_msg ) {
2031 0 0         $msg = defined( $msg )
    0          
2032             ? ( $msg ? "$msg " : '' )."($err_msg)"
2033             : $err_msg;
2034             }
2035              
2036 0           return $msg;
2037             }
2038              
2039             sub _jkey {
2040 0     0     my $self = shift;
2041              
2042 0           local $" = ':';
2043 0           return "$NAMESPACE:@_";
2044             }
2045              
2046             sub _qkey {
2047 0     0     my $self = shift;
2048              
2049 0           local $" = ':';
2050 0           return "$NAMESPACE:queue:@_";
2051             }
2052              
2053             sub _get_job_id {
2054 0     0     my ( $self, $id_source ) = @_;
2055              
2056 0 0 0       defined( _STRING( $id_source ) ) || _INSTANCE( $id_source, 'Redis::JobQueue::Job' )
2057             || confess $self->_error( E_MISMATCH_ARG );
2058              
2059 0 0         return ref( $id_source ) ? $id_source->id : $id_source;
2060             }
2061              
2062             sub _lua_script_cmd {
2063 0     0     my ( $self, $name ) = @_;
2064              
2065 0           $_running_script_name = $name;
2066 0           $_running_script_body = $lua_script_body{ $name };
2067              
2068 0           my $sha1 = $self->_lua_scripts->{ $_running_script_name };
2069 0 0         unless ( $sha1 ) {
2070 0           $sha1 = $self->_lua_scripts->{ $_running_script_name } = sha1_hex( $_running_script_body );
2071 0 0         unless ( ( $self->_call_redis( 'SCRIPT', 'EXISTS', $sha1 ) )[0] ) {
2072 0           return( 'EVAL', $_running_script_body );
2073             }
2074             }
2075 0           return( 'EVALSHA', $sha1 );
2076             }
2077              
2078             sub _clear_sha1 {
2079 0     0     my ( $self ) = @_;
2080              
2081 0           $self->_lua_scripts( {} );
2082              
2083 0           return;
2084             }
2085              
2086             sub _error {
2087 0     0     my ( $self, $error_code ) = @_;
2088              
2089 0           $self->_set_last_errorcode( $error_code );
2090 0           return $self->last_error;
2091             }
2092              
2093             #-- Closes and cleans up -------------------------------------------------------
2094              
2095 62     62   409 no Mouse::Util::TypeConstraints;
  62         97  
  62         582  
2096 62     62   8001 no Mouse; # keywords are removed from the package
  62         74  
  62         263  
2097             __PACKAGE__->meta->make_immutable();
2098              
2099             __END__