File Coverage

blib/lib/Disbatch.pm
Criterion Covered Total %
statement 41 279 14.7
branch 0 82 0.0
condition 0 64 0.0
subroutine 14 59 23.7
pod 26 27 96.3
total 81 511 15.8


line stmt bran cond sub pod time code
1             package Disbatch;
2             $Disbatch::VERSION = '3.990';
3 2     2   1129585 use 5.12.0;
  2         6  
4 2     2   8 use warnings;
  2         3  
  2         57  
5              
6 2     2   394 use boolean 0.25;
  2         750  
  2         11  
7 2     2   110 use Cpanel::JSON::XS;
  2         3  
  2         87  
8 2     2   6 use Data::Dumper;
  2         2  
  2         60  
9 2     2   468 use Encode;
  2         6627  
  2         120  
10 2     2   467 use File::Slurp;
  2         8314  
  2         103  
11 2     2   1436 use Log::Log4perl;
  2         65266  
  2         12  
12 2     2   539 use MongoDB 1.0.4;
  2         1009789  
  2         54  
13 2     2   17 use POSIX 'setsid';
  2         3  
  2         18  
14 2     2   146 use Safe::Isa;
  2         3  
  2         210  
15 2     2   8 use Sys::Hostname;
  2         3  
  2         74  
16 2     2   878 use Time::Moment;
  2         2362  
  2         55  
17 2     2   413 use Try::Tiny::Retry;
  2         853  
  2         6935  
18              
19             my $json = Cpanel::JSON::XS->new->utf8;
20              
21             my $default_log4perl = {
22             level => 'DEBUG', # 'TRACE'
23             appenders => {
24             filelog => {
25             type => 'Log::Log4perl::Appender::File',
26             layout => '[%p] %d %F{1} %L %C %c> %m %n',
27             args => { filename => '/var/log/disbatchd.log' }, # 'disbatchd.log'
28             },
29             screenlog => {
30             type => 'Log::Log4perl::Appender::ScreenColoredLevels',
31             layout => '[%p] %d %F{1} %L %C %c> %m %n',
32             args => { },
33             }
34             }
35             };
36              
37             sub new {
38 0     0 1   my $class = shift;
39 0           my $self = { @_ };
40 0           $self->{node} = hostname;
41 0   0       $self->{class} //= 'Disbatch';
42 0           $self->{class} = lc $self->{class};
43 0           bless $self, $class;
44             }
45              
46             sub logger {
47 0     0 1   my ($self, $type) = @_;
48 0 0         my $logger = defined $type ? "$self->{class}.$type" : $self->{class};
49 0   0       $self->{loggers}{$logger} //= Log::Log4perl->get_logger($logger);
50 0 0         if (!keys %{Log::Log4perl->appenders}){
  0            
51 0           $self->{loggers}{$logger}->level($self->{config}{log4perl}{level});
52 0           my $default_layout = "[%p] %d %F{1} %L %C %c> %m %n";
53 0           for my $name (keys %{$self->{config}{log4perl}{appenders}}) {
  0            
54 0           my $ap = Log::Log4perl::Appender->new($self->{config}{log4perl}{appenders}{$name}{type}, name => $name, %{$self->{config}{log4perl}{appenders}{$name}{args}});
  0            
55 0   0       $ap->layout(Log::Log4perl::Layout::PatternLayout->new($self->{config}{log4perl}{appenders}{$name}{layout} // $default_layout));
56 0           $self->{loggers}{$logger}->add_appender($ap);
57             }
58             }
59 0           $self->{loggers}{$logger};
60             }
61              
62             sub mongo {
63 0     0 1   my ($self) = @_;
64 0 0         return $self->{mongo} if defined $self->{mongo};
65 0           my %attributes = %{$self->{config}{attributes}};
  0            
66 0 0         if (keys %{$self->{config}{auth}}) {
  0            
67 0           my $username = 'plugin';
68 0 0         $username = 'disbatchd' if $self->{class} eq 'disbatch';
69 0 0         $username = 'disbatch_web' if $self->{class} eq 'disbatch::web';
70 0 0         $username = 'task_runner' if $self->{class} eq 'task_runner';
71 0           $attributes{username} = $username;
72 0           $attributes{password} = $self->{config}{auth}{$username};
73 0           $attributes{db_name} = $self->{config}{database};
74             }
75 0           warn "Connecting ", scalar localtime;
76 0           $self->{mongo} = MongoDB->connect($self->{config}{mongohost}, \%attributes)->get_database($self->{config}{database}) ;
77             }
78 0     0 1   sub nodes { $_[0]->mongo->coll('nodes') }
79 0     0 1   sub queues { $_[0]->mongo->coll('queues') }
80 0     0 1   sub tasks { $_[0]->mongo->coll('tasks') }
81              
82             # loads the config file at startup.
83             # anything in the config file at startup is static and cannot be changed without restarting disbatchd
84             sub load_config {
85 0     0 1   my ($self) = @_;
86 0 0         if (!defined $self->{config}) {
87             $self->{config} = try {
88 0     0     $json->relaxed->decode(scalar read_file($self->{config_file}));
89             } catch {
90 0     0     $self->{config}{log4perl} = $default_log4perl;
91 0           $self->logger->logdie("Could not parse $self->{config_file}: $_");
92 0           };
93             # Ensure defaults:
94 0   0       $self->{config}{attributes} //= {};
95 0   0       $self->{config}{auth} //= {};
96 0   0       $self->{config}{gfs} //= 'auto';
97 0   0       $self->{config}{quiet} //= false;
98 0   0       $self->{config}{task_runner} //= '/usr/bin/task_runner';
99 0   0       $self->{config}{testing} //= false;
100 0   0       $self->{config}{log4perl} //= $default_log4perl;
101 0   0       $self->{config}{activequeues} //= [];
102 0   0       $self->{config}{ignorequeues} //= [];
103 0   0       $self->{config}{plugins} //= [];
104             # FIXME: validate config values
105              
106 0 0 0       if (!defined $self->{config}{mongohost} or !defined $self->{config}{database}) {
107 0           my $error = "Both 'mongohost' and 'database' must be defined in file $self->{config_file}";
108 0           $self->logger->logdie($error);
109             }
110             }
111             }
112              
113             # from Synacor::Disbatch::Backend
114             sub ensure_indexes {
115 0     0 1   my ($self) = @_;
116 0           my @task_indexes = (
117             { keys => [node => 1, status => 1, queue => 1] },
118             { keys => [node => 1, status => 1, queue => 1, _id => 1] },
119             { keys => [node => 1, status => 1, queue => 1, _id => -1] },
120             { keys => [queue => 1, status => 1] },
121             );
122 0     0     try { $self->tasks->indexes->create_many(@task_indexes) } catch { $self->logger->logdie("Could not ensure_indexes: $_") };
  0            
  0            
123             try {
124 0     0     $self->queues->indexes->create_one([ name => 1 ], { unique => true });
125 0           $self->nodes->indexes->create_one([ node => 1 ], { unique => true });
126 0           $self->mongo->coll('tasks.chunks')->indexes->create_one([ files_id => 1, n => 1 ], { unique => true });
127 0           $self->mongo->coll('tasks.files')->indexes->create_one([ filename => 1, 'metadata.task_id' => 1 ]);
128             } catch {
129 0     0     $self->logger->logdie("Could not ensure_indexes: $_")
130 0           };
131             }
132              
133             # validates plugins for defined queues
134             sub validate_plugins {
135 0     0 1   my ($self) = @_;
136 0     0     my @queues = try { $self->queues->find->all } catch { $self->logger->error("Could not find queues: $_"); () };
  0            
  0            
  0            
137 0           for my $plugin (map { $_->{plugin} } @queues) {
  0            
138 0 0         next if exists $self->{plugins}{$plugin};
139 0 0         if ($plugin !~ /^[\w:]+$/) {
    0          
    0          
140 0           $self->logger->error("Illegal plugin value: $plugin");
141             } elsif (eval "require $plugin; $plugin->new->can('run');") {
142 0           $self->{plugins}{$plugin} = $plugin;
143 0 0         next if exists $self->{old_plugins}{$plugin};
144 0           $self->logger->info("$plugin is valid for queues");
145             } elsif (eval "require ${plugin}::Task; ${plugin}::Task->new->can('run');") {
146 0           $self->{plugins}{$plugin} = $plugin . '::Task';
147 0 0         next if exists $self->{old_plugins}{$plugin};
148 0           $self->logger->info("${plugin}::Task is valid for queues");
149 0           $self->logger->warn("Having a plugin format with a subpackage *::Task is deprecated");
150             } else {
151 0           $self->{plugins}{$plugin} = undef;
152 0           $self->logger->warn("Could not load $plugin, ignoring queues using it");
153             }
154             }
155             }
156              
157             # clears plugin validation and re-runs
158             sub revalidate_plugins {
159 0     0 1   my ($self) = @_;
160 0           $self->{old_plugins} = $self->{plugins};
161 0           $self->{plugins} = {};
162 0           $self->validate_plugins;
163             }
164              
165             sub scheduler_report {
166 0     0 1   my ($self) = @_;
167 0           my @result;
168 0           my @queues = $self->queues->find->all;
169 0           for my $queue (@queues) {
170             push @result, {
171             id => $queue->{_id}{value},
172             plugin => $queue->{plugin},
173             name => $queue->{name},
174             threads => $queue->{threads},
175             queued => $self->count_queued($queue->{_id}),
176             running => $self->count_running($queue->{_id}),
177 0           completed => $self->count_completed($queue->{_id}),
178             };
179             }
180 0           \@result;
181             }
182              
183             sub scheduler_report_old_api {
184 0     0 0   my ($self) = @_;
185 0           my @result;
186 0     0     my @queues = try { $self->queues->find->all } catch { $self->logger->error("Could not find queues: $_"); () };
  0            
  0            
  0            
187 0           for my $queue (@queues) {
188             push @result, {
189             id => $queue->{_id}{value},
190             plugin => $queue->{plugin},
191             name => $queue->{name},
192             threads => $queue->{threads},
193             queued => $self->count_queued($queue->{_id}),
194             running => $self->count_running($queue->{_id}),
195 0           completed => $self->count_completed($queue->{_id}),
196             };
197             }
198 0           \@result;
199             }
200              
201             # updates the nodes collection
202             sub update_node_status {
203 0     0 1   my ($self) = @_;
204              
205 0   0 0     my $status = try { $self->nodes->find_one({node => $self->{node}}) // {} } catch { $self->logger->error("Could not find node: $_"); undef };
  0            
  0            
  0            
206 0 0         return unless defined $status;
207 0           $status->{node} = $self->{node};
208 0           $status->{timestamp} = Time::Moment->now_utc;
209 0     0     try { $self->nodes->update_one({node => $self->{node}}, {'$set' => $status}, {upsert => 1}) } catch { $self->logger->error("Could not update node: $_") };
  0            
  0            
210             }
211              
212             ### Synacor::Disbatch::Queue like stuff ###
213              
214             # will claim and return a task for given queue, or return undef
215             sub claim_task {
216 0     0 1   my ($self, $queue) = @_;
217              
218 0   0       $self->{sort} //= 'default';
219              
220 0           my $query = { '$or' => [{node => undef}, {node => -1}], status => -2, queue => $queue->{_id} };
221 0           my $update = { '$set' => {node => $self->{node}, status => -1, mtime => Time::Moment->now_utc} };
222              
223 0           my $options;
224 0 0         if ($self->{sort} eq 'fifo') {
    0          
    0          
225 0           $options->{sort} = { _id => 1 };
226             } elsif ($self->{sort} eq 'lifo') {
227 0           $options->{sort} = { _id => -1 };
228             } elsif ($self->{sort} ne 'default') {
229 0           $self->logger->warn("$queue->{name}: unknown sort order '$self->{sort}' -- using default");
230             }
231 0     0     $self->{claimed_task} = try { $self->tasks->find_one_and_update($query, $update, $options) } catch { $self->logger->error("Could not claim task: $_"); undef };
  0            
  0            
  0            
232             }
233              
234             # will unclaim and return the task document for the given OID, or return undef
235             sub unclaim_task {
236 0     0 1   my ($self, $task_id) = @_;
237 0           my $query = { _id => $task_id, node => $self->{node}, status => -1 };
238 0           my $update = { '$set' => {node => undef, status => -2, mtime => Time::Moment->now_utc} };
239 0           $self->logger->warn("Unclaliming task $task_id");
240 0     0     retry { $self->tasks->find_one_and_update($query, $update) } catch { $self->logger->error("Could not unclaim task $task_id: $_"); undef };
  0            
  0            
  0            
241             }
242              
243             sub orphaned_tasks {
244 0     0 1   my ($self) = @_;
245             try { $self->tasks->update_many(
246             {
247             node => $self->{node},
248 0     0     status => -1,
249             '$or' => [
250             { '$and' => [{mtime => {'$type' => 9}}, {mtime => {'$lt' => Time::Moment->now_utc->minus_minutes(5)}}] },
251             { '$and' => [{mtime => {'$not' => {'$type' => 9}}}, {mtime => {'$lt' => time - 300}}] },
252             ],
253             },
254             {'$set' => {status => -6, mtime => Time::Moment->now_utc}}
255             ) }
256 0     0     catch { $self->logger->error("Could not find orphaned_tasks: $_") };
  0            
257             }
258              
259             # will fork & exec to start a given task
260             # NOTE: $self->{config_file} is required
261             sub start_task {
262 0     0 1   my ($self, $queue, $task) = @_;
263 0           my $command = $self->{config}{task_runner};
264             my @args = (
265             '--config' => $self->{config_file},
266             '--task' => $task->{_id},
267 0           );
268 0 0         push @args, '--gfs', $self->{config}{gfs} if $self->{config}{gfs};
269 0 0         push @args, '--quiet' if $self->{config}{quiet};
270 0 0         push @args, '--testing' if $self->{config}{testing};
271 0           $self->logger->info(join ' ', $command, @args);
272 0 0         unless (fork) {
273 0 0         setsid != -1 or die "Can't start a new session: $!";
274 0 0         unless (exec $command, @args) {
275 0           $self->mongo->reconnect;
276 0           $self->logger->error("Could not exec '$command', unclaiming task $task->{_id} and setting threads to 0 for $queue->{name}");
277 0     0     retry { $self->queues->update_one({_id => $queue->{_id}}, {'$set' => {threads => 0}}) } catch { "Could not set queues to 0 for $queue->{name}: $_" };
  0            
  0            
278 0           $self->unclaim_task($task->{_id});
279 0           exit;
280             }
281             }
282 0           $self->{claimed_task} = undef;
283             }
284              
285             sub count_tasks {
286 0     0 1   my ($self, $queue_id, $status, $node) = @_;
287              
288 0           my $query = {};
289 0 0         $query->{queue} = $queue_id if defined $queue_id;
290 0 0         $query->{status} = $status if defined $status;
291 0 0         $query->{node} = $node if defined $node;
292              
293 0     0     try { $self->tasks->count($query) } catch { $self->logger->error("Could not count tasks: $_"); undef };
  0            
  0            
  0            
294             }
295              
296             sub count_queued {
297 0     0 1   my ($self, $queue_id) = @_;
298 0           $self->count_tasks($queue_id, {'$lte' => -2});
299             }
300              
301             sub count_running {
302 0     0 1   my ($self, $queue_id) = @_;
303 0           $self->count_tasks($queue_id, {'$in' => [-1,0]});
304             }
305              
306             sub count_node_running {
307 0     0 1   my ($self, $queue_id) = @_;
308 0           $self->count_tasks($queue_id, {'$in' => [-1,0]}, $self->{node});
309             }
310              
311             sub count_completed {
312 0     0 1   my ($self, $queue_id) = @_;
313 0           $self->count_tasks($queue_id, {'$gte' => 1});
314             }
315              
316             sub count_total {
317 0     0 1   my ($self, $queue_id) = @_;
318 0           $self->count_tasks($queue_id);
319             }
320              
321             # checks if this node will process (activequeues) or ignore (ignorequeues) a queue
322             sub is_active_queue {
323 0     0 1   my ($self, $queue_id) = @_;
324 0 0         if (@{$self->{config}{activequeues}}) {
  0 0          
325 0 0         grep($queue_id->{value} eq $_, @{$self->{config}{activequeues}}) ? 1 : 0;
  0            
326 0           } elsif (@{$self->{config}{ignorequeues}}) {
327 0 0         grep($queue_id->{value} eq $_, @{$self->{config}{ignorequeues}}) ? 0 : 1;
  0            
328             } else {
329 0           1;
330             }
331             }
332              
333             # will run as many tasks for each queue as allowed
334             sub process_queues {
335 0     0 1   my ($self) = @_;
336 0           my $revalidate_plugins = 0;
337 0     0     my $node = try { $self->nodes->find_one({node => $self->{node}}, {maxthreads => 1}) } catch { $self->logger->error("Could not find node: $_"); { maxthreads => 0 } };
  0            
  0            
  0            
338 0   0       my $node_running = $self->count_node_running({'$exists' => 1}) // 0;
339 0 0 0       return if defined $node and defined $node->{maxthreads} and $node_running >= $node->{maxthreads};
      0        
340 0     0     my @queues = try { $self->queues->find->all } catch { $self->logger->error("Could not find queues: $_"); () };
  0            
  0            
  0            
341 0           for my $queue (@queues) {
342 0 0 0       if ($self->{plugins}{$queue->{plugin}} and $self->is_active_queue($queue->{_id})) {
343 0           my $queue_running = $self->count_running($queue->{_id});
344 0   0       while (defined $queue_running and ($queue->{threads} // 0) > $queue_running and (!defined $node->{maxthreads} or $node->{maxthreads} > $node_running)) {
      0        
      0        
      0        
345 0           my $task = $self->claim_task($queue);
346 0 0         last unless defined $task;
347 0           $self->start_task($queue, $task);
348 0           $queue_running = $self->count_running($queue->{_id});
349 0   0       $node_running = $self->count_node_running({'$exists' => 1}) // 0;
350             }
351             } else {
352 0           $revalidate_plugins = 1;
353             }
354             }
355 0 0         $self->revalidate_plugins if $revalidate_plugins;
356             }
357              
358             ### END Synacor::Disbatch::Queue like stuff ###
359              
360             # returns the file document
361             # throws any error
362             sub put_gfs {
363 0     0 1   my ($self, $content, $filename, $metadata) = @_;
364 0   0       $filename ||= 'unknown';
365 0           my $chunk_size = 255 * 1024;
366 0           my $file_doc = {
367             uploadDate => DateTime->now,
368             filename => $filename,
369             chunkSize => $chunk_size,
370             length => length encode_utf8($content),
371             md5 => Digest::MD5->new->add(encode_utf8($content))->hexdigest,
372             };
373 0 0         if (defined $metadata) {
374 0 0         die 'metadata must be a HASH' unless ref $metadata eq 'HASH';
375 0           $file_doc->{metadata} = $metadata;
376             }
377 0           my $files_id = $self->mongo->coll('tasks.files')->insert($file_doc);
378 0           my $n = 0;
379 0           for (my $n = 0; length $content; $n++) {
380 0           my $data = substr $content, 0, $chunk_size, '';
381 0           $self->mongo->coll('tasks.chunks')->insert({ n => $n, data => bless(\$data, 'MongoDB::BSON::String'), files_id => $files_id });
382             }
383 0           $files_id;
384             }
385              
386             # returns the content as a string
387             # throws an error if the file document does not exist, and any other error
388             sub get_gfs {
389 0     0 1   my ($self, $filename_or_id, $metadata) = @_;
390 0           my $file_id;
391 0 0         if ($filename_or_id->$_isa('MongoDB::OID')) {
392 0           $file_id = $filename_or_id;
393             } else {
394 0           my $query = {};
395 0 0         $query->{filename} = $filename_or_id if defined $filename_or_id;
396 0 0         $query->{metadata} = $metadata if defined $metadata;
397 0           $file_id = $self->mongo->coll('tasks.files')->find($query)->next->{_id};
398             }
399             # this does no error-checking:
400 0           my $result = $self->mongo->coll('tasks.chunks')->find({files_id => $file_id})->sort({n => 1})->result;
401 0           my $data;
402 0           while (my $chunk = $result->next) {
403 0           $data .= $chunk->{data};
404             }
405 0           $data;
406             }
407              
408             DESTROY {
409 0     0     my ($self) = @_;
410             # this happens after the END block in the calling script, or if the object ever goes out of scope
411             }
412              
413             1;
414              
415             __END__
416              
417             =encoding utf8
418              
419             =head1 NAME
420              
421             Disbatch - a scalable distributed batch processing framework using MongoDB.
422              
423             =head1 VERSION
424              
425             version 3.990
426              
427             =head1 SUBROUTINES
428              
429             =over 2
430              
431             =item new(class => $class, ...)
432              
433             "class" defaults to "Disbatch", and the value is then lowercased.
434              
435             "node" is the hostname.
436              
437             Anything else is put into $self.
438              
439             =item logger($type)
440              
441             Parameters: type (string, optional)
442              
443             Returns a L<Log::Log4perl::Logger> object, as obtained from C<< Log::Log4perl->get_logger >>.
444              
445             =item mongo
446              
447             Parameters: none
448              
449             Returns a L<MongoDB::Database> object.
450              
451             =item nodes
452              
453             Parameters: none
454              
455             Returns a L<MongoDB::Collection> object for collection "nodes".
456              
457             =item queues
458              
459             Parameters: none
460              
461             Returns a L<MongoDB::Collection> object for collection "queues".
462              
463             =item tasks
464              
465             Parameters: none
466              
467             Returns a L<MongoDB::Collection> object for collection "tasks".
468              
469             =item load_config
470              
471             Parameters: none
472              
473             Loads C<< $self->{config_file} >> only if C<< $self->{config} >> is undefined.
474              
475             Anything in the config file at startup is static and cannot be changed without restarting disbatchd.
476              
477             Returns nothing.
478              
479             =item ensure_indexes
480              
481             Parameters: none
482              
483             Ensures the proper MongoDB indexes are created for the C<tasks>, C<queues>, C<nodes>, C<tasks.files>, and C<tasks.chunks> collections.
484              
485             Returns nothing.
486              
487             =item validate_plugins
488              
489             Parameters: none
490              
491             Validates plugins for defined queues.
492              
493             Returns nothing.
494              
495             =item revalidate_plugins
496              
497             Parameters: none
498              
499             Clears plugin validation and re-runs C<validate_plugins()>.
500              
501             Returns nothing.
502              
503             =item scheduler_report
504              
505             Parameters: none
506              
507             Used by the Disbatch Command Interface to get queue information.
508              
509             Returns an C<ARRAY> containing C<HASH>es of queue information.
510              
511             Throws errors.
512              
513             =item update_node_status
514              
515             Parameters: none
516              
517             Updates this node's document in the C<nodes> collection, creating it if it does not exist, with the node's hostname and the current UTC timestamp.
518              
519             Returns nothing.
520              
521             =item claim_task($queue)
522              
523             Parameters: queue document
524              
525             Claims a task (sets status to -1 and sets node to hostname) for the given queue.
526              
527             Returns a task document, or undef if no queued task found.
528              
529             =item unclaim_task($task_id)
530              
531             Parameters: L<MongoDB::OID> object for a task
532              
533             If the task has status -1 and is claimed by this node's hostname, sets the task's node to null and status to -2, and updates mtime.
534              
535             Returns a task document, or undef if a matching task is not found.
536              
537             =item orphaned_tasks
538              
539             Parameters: none
540              
541             Sets status to -6 for all tasks for this node with status -1 and an mtime of more than 300 seconds ago.
542              
543             Returns nothing.
544              
545             =item start_task($queue, $task)
546              
547             Parameters: queue document, task document
548              
549             Will fork and exec C<< $self->{config}{task_runner} >> to start the given task.
550             If the exec fails, it will set threads to 0 for the given queue and call C<unclaim_task()>.
551              
552             Returns nothing.
553              
554             =item count_tasks($queue_id, $status, $node)
555              
556             Parameters: L<MongoDB::OID> object for a queue or a query operator value or C<undef>, a status or a query operator value or C<undef>, a node or C<undef>.
557              
558             Counts all tasks for the given C<$queue_id> with given C<$status> and C<$node>.
559              
560             Used by the below C<count_*> subroutines. If any of the parameters are C<undef>, they will not be added to the query.
561              
562             Returns: a non-negative integer, or undef if an error.
563              
564             =item count_queued($queue_id)
565              
566             =item count_running($queue_id)
567              
568             =item count_node_running($queue_id)
569              
570             =item count_completed($queue_id)
571              
572             =item count_total($queue_id)
573              
574             Parameters: L<MongoDB::OID> object for a queue or a query operator value or C<undef>
575              
576             Counts queued (status <= -2), running (status of 0 or -1), running on this node, completed (status >= 1), or all tasks (any status) for the given queue.
577              
578             Returns: a non-negative integer, or undef if an error.
579              
580             =item is_active_queue($queue_id)
581              
582             Parameters: L<MongoDB::OID> object for a queue
583              
584             If C<config.activequeues> has entries, returns 1 if the given queue is listed in it or 0 if not.
585             Otherwise, if C<config.ignorequeues> has entries, returns 0 if the given queue is listed in it or 1 if not. If neither has entries, returns 1.
586              
587             Returns 1 or 0.
588              
589             =item process_queues
590              
591             Parameters: none
592              
593             Will claim and start as many tasks for each queue as allowed by the current node's C<maxthreads> and each queue's C<threads>.
594              
595             Returns nothing.
596              
597             =item put_gfs($content, $filename, $metadata)
598              
599             Parameters: UTF-8 content to store, optional filename to store it as, optional metadata C<HASH>
600              
601             Stores UTF-8 content in a custom GridFS format that stores data as strings instead of as BinData.
602              
603             Returns a C<MongoDB::OID> object for the ID inserted in the C<tasks.files> collection.
604              
605             =item get_gfs($filename_or_id, $metadata)
606              
607             Parameters: filename or C<MongoDB::OID> object, optional metadata C<HASH>
608              
609             Gets UTF-8 content from the custom GridFS format. Metadata is only used if given a filename instead of a C<MongoDB::OID> object.
610              
611             Returns: content string.
612              
613             =back
614              
615             =head1 SEE ALSO
616              
617             L<Disbatch::Web>
618              
619             L<Disbatch::Roles>
620              
621             L<Disbatch::Plugin::Demo>
622              
623             L<disbatchd>
624              
625             L<disbatch.pl>
626              
627             L<task_runner>
628              
629             L<disbatch-create-users>
630              
631             =head1 AUTHORS
632              
633             Ashley Willis <awillis@synacor.com>
634              
635             Matt Busigin
636              
637             =head1 COPYRIGHT AND LICENSE
638              
639             This software is Copyright (c) 2016 by Ashley Willis.
640              
641             This is free software, licensed under:
642              
643             The Apache License, Version 2.0, January 2004