File Coverage

blib/lib/Disbatch/Web.pm
Criterion Covered Total %
statement 50 116 43.1
branch 0 18 0.0
condition 0 12 0.0
subroutine 17 36 47.2
pod 6 11 54.5
total 73 193 37.8


line stmt bran cond sub pod time code
1             package Disbatch::Web;
2             $Disbatch::Web::VERSION = '3.990';
3 2     2   993 use 5.12.0;
  2         4  
4 2     2   6 use strict;
  2         2  
  2         33  
5 2     2   7 use warnings;
  2         7  
  2         40  
6              
7 2     2   719 use Cpanel::JSON::XS;
  2         902  
  2         96  
8 2     2   552 use Data::Dumper;
  2         4555  
  2         86  
9 2     2   362 use Disbatch;
  2         3  
  2         44  
10 2     2   8 use File::Slurp;
  2         3  
  2         131  
11 2     2   820 use Limper::SendFile;
  2         10766  
  2         78  
12 2     2   731 use Limper::SendJSON;
  2         3550  
  2         68  
13 2     2   10 use Limper;
  2         2  
  2         131  
14 2     2   7 use Log::Log4perl;
  2         16  
  2         15  
15 2     2   74 use MongoDB::OID 1.0.4;
  2         33  
  2         32  
16 2     2   6 use Safe::Isa;
  2         2  
  2         175  
17 2     2   6 use Time::Moment;
  2         2  
  2         32  
18 2     2   5 use Try::Tiny::Retry;
  2         2  
  2         91  
19 2     2   729 use URL::Encode qw/url_params_mixed/;
  2         5630  
  2         786  
20              
21             my $json = Cpanel::JSON::XS->new->utf8;
22             my $disbatch;
23              
24             sub init {
25 0     0 1   my $args = { @_ };
26 0   0       $disbatch = Disbatch->new(class => 'Disbatch::Web', config_file => ($args->{config_file} // '/etc/disbatch/config.json'));
27 0           $disbatch->load_config;
28 0   0       public ($disbatch->{config}{web_root} // '/etc/disbatch/htdocs/');
29             }
30              
31             sub parse_params {
32 0 0 0 0 1   if ((request->{headers}{'content-type'} // '') eq 'application/x-www-form-urlencoded') {
    0 0        
    0          
33 0           url_params_mixed(request->{body}, 1);
34             } elsif ((request->{headers}{'content-type'} // '') eq 'application/json') {
35 0     0     try { $json->decode(request->{body}) } catch { $_ };
  0            
  0            
36             } elsif (request->{query}) {
37 0           url_params_mixed(request->{query}, 1);
38             }
39             }
40              
41             ################
42             #### NEW API ###
43             ################
44              
45             sub datetime_to_millisecond_epoch {
46 0     0 0   int($_[0]->hires_epoch * 1000);
47             }
48              
49             # will throw errors
50             sub get_nodes {
51 0     0 1   my ($filter) = @_;
52 0   0       $filter //= {};
53 0           my @nodes = $disbatch->nodes->find($filter)->sort({node => 1})->all;
54 0           for my $node (@nodes) {
55 0           $node->{id} = "$node->{_id}";
56 0 0         $node->{timestamp} = datetime_to_millisecond_epoch($node->{timestamp}) if ref $node->{timestamp} eq 'DateTime';
57             }
58 0           \@nodes;
59             }
60              
61             =item GET /nodes
62              
63             Parameters: none.
64              
65             Returns an Array of node Objects defined (with C<id> the stringified C<_id>) on success, C<< { "error": "Could not get current nodes: $_" } >> on error.
66              
67             Sets HTTP status to C<400> on error.
68              
69             Note: new in Disbatch 4
70              
71             =cut
72              
73             get '/nodes' => sub {
74             undef $disbatch->{mongo};
75             my $nodes = try { get_nodes } catch { status 400; "Could not get current nodes: $_" };
76             if ((status() // 200) == 400) {
77             Limper::warning $nodes;
78             return send_json { error => $nodes };
79             }
80             send_json $nodes, convert_blessed => 1;
81             };
82              
83             =item GET /nodes/:node
84              
85             URL: C<:node> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<node> name if it does not.
86              
87             Parameters: none.
88              
89             Returns node Object (with C<id> the stringified C<_id>) on success, C<< { "error": "Could not get node $node: $_" } >> on error.
90              
91             Sets HTTP status to C<400> on error.
92              
93             Note: new in Disbatch 4
94              
95             =cut
96              
97             get qr'^/nodes/(?<node>.+)' => sub {
98             undef $disbatch->{mongo};
99 2     2   949 my $filter = try { {_id => MongoDB::OID->new(value => $+{node})} } catch { {node => $+{node}} };
  2         689  
  2         9455  
100             my $node = try { get_nodes($filter) } catch { status 400; "Could not get node $+{node}: $_" };
101             if ((status() // 200) == 400) {
102             Limper::warning $node;
103             return send_json { error => $node };
104             }
105             send_json $node->[0], convert_blessed => 1;
106             };
107              
108             =item POST /nodes/:node
109              
110             URL: C<:node> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<node> name if it does not.
111              
112             Parameters: C<< { "maxthreads": maxthreads } >>
113              
114             "maxthreads" is a non-negative integer or null
115              
116             Returns C<< { ref $res: Object } >> or C<< { ref $res: Object, "error": error_string_or_reponse_object } >>
117              
118             Sets HTTP status to C<400> on error.
119              
120             Note: new in Disbatch 4
121              
122             =cut
123              
124             # postJSON('/nodes/' + row.rowId , { maxthreads: newValue}, loadQueues);
125             post qr'^/nodes/(?<node>.+)' => sub {
126             undef $disbatch->{mongo};
127             my $params = parse_params;
128              
129             unless (keys %$params) {
130             status 400;
131             return send_json {error => 'No params'};
132             }
133             my @valid_params = qw/maxthreads/;
134             for my $param (keys %$params) {
135             unless (grep $_ eq $param, @valid_params) {
136             status 400;
137             return send_json { error => 'Invalid param', param => $param};
138             }
139             }
140             my $node = $+{node}; # regex on next line clears $+
141             if (exists $params->{maxthreads} and defined $params->{maxthreads} and $params->{maxthreads} !~ /^\d+$/) {
142             status 400;
143             return send_json {error => 'maxthreads must be a non-negative integer or null'};
144             }
145             my $filter = try { {_id => MongoDB::OID->new(value => $node)} } catch { {node => $node} };
146             my $res = try {
147             $disbatch->nodes->update_one($filter, {'$set' => $params});
148             } catch {
149             Limper::warning "Could not update node $node: $_";
150             $_;
151             };
152             my $reponse = {
153             ref $res => {%$res},
154             };
155             unless ($res->{matched_count} == 1) {
156             status 400;
157             if ($res->$_isa('MongoDB::UpdateResult')) {
158             $reponse->{error} = $reponse->{'MongoDB::UpdateResult'};
159             } else {
160             $reponse->{error} = "$res";
161             }
162             }
163             send_json $reponse;
164             };
165              
166             =item GET /plugins
167              
168             Parameters: none.
169              
170             Returns an Array of allowed plugin names.
171              
172             Should never fail.
173              
174             Note: replaces /queue-prototypes-json
175              
176             =cut
177              
178             # This is needed at least to create queues in the web interface.
179             get '/plugins' => sub {
180             send_json $disbatch->{config}{plugins};
181             };
182              
183             =item GET /queues
184              
185             Parameters: none.
186              
187             Returns an Array of queue Objects on success, C<< { "error": "Could not get current queues: $_" } >> on error.
188              
189             Each item has the following keys: id, plugin, name, threads, queued, running, completed
190              
191             Sets HTTP status to C<400> on error.
192              
193             Note: replaces /scheduler-json
194              
195             =cut
196              
197             get '/queues' => sub {
198             undef $disbatch->{mongo};
199             my $queues = try { $disbatch->scheduler_report } catch { status 400; "Could not get current queues: $_" };
200             if ((status() // 200) == 400) {
201             Limper::warning $queues;
202             return send_json { error => $queues };
203             }
204             send_json $queues;
205             };
206              
207             sub map_plugins {
208 0     0 0   my %plugins = map { $_ => 1 } @{$disbatch->{config}{plugins}};
  0            
  0            
209 0           \%plugins;
210             }
211              
212             =item POST /queues
213              
214             Create a new queue.
215              
216             Parameters: C<< { "name": name, "plugin": plugin } >>
217              
218             C<name> is the desired name for the queue (must be unique), C<plugin> is the plugin name for the queue.
219              
220             Returns: C<< { ref $res: Object, "id": $inserted_id } >> on success; C<< { "error": "name and plugin required" } >>,
221             C<< { "error": "Invalid param", "param": $param } >>, or C<< { "error": "Unknown plugin", "plugin": $plugin } >> on input error; or
222             C<< { ref $res: Object, "id": null, "error": "$res" } >> on MongoDB error.
223              
224             Sets HTTP status to C<400> on error.
225              
226             Note: replaces /start-queue-json
227              
228             =cut
229              
230             post '/queues' => sub {
231             undef $disbatch->{mongo};
232             my $params = parse_params;
233             unless (($params->{name} // '') and ($params->{plugin} // '')) {
234             status 400;
235             return send_json { error => 'name and plugin required' };
236             }
237             my @valid_params = qw/name plugin/;
238             for my $param (keys %$params) {
239             unless (grep $_ eq $param, @valid_params) {
240             status 400;
241             return send_json { error => 'Invalid param', param => $param};
242             }
243             }
244             unless (map_plugins->{$params->{plugin}}) {
245             status 400;
246             return send_json { error => 'Unknown plugin', plugin => $params->{plugin} };
247             }
248              
249             my $res = try { $disbatch->queues->insert_one($params) } catch { Limper::warning "Could not create queue $params->{name}: $_"; $_ };
250             my $reponse = {
251             ref $res => {%$res},
252             id => $res->{inserted_id},
253             };
254             unless (defined $res->{inserted_id}) {
255             status 400;
256             $reponse->{error} = "$res";
257             $reponse->{ref $res}{result} = { ref $reponse->{ref $res}{result} => {%{$reponse->{ref $res}{result}}} } if ref $reponse->{ref $res}{result};
258             }
259             send_json $reponse, convert_blessed => 1;
260             };
261              
262             =item POST /queues/:queue
263              
264             URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
265              
266             Parameters: C<< { "name": name, "plugin": plugin, "threads": threads } >>
267              
268             C<name> is the new name for the queue (must be unique), C<plugin> is the new plugin name for the queue (must be defined in the config file),
269             C<threads> must be a non-negative integer. Only one of C<name>, C<plugin>, and C<threads> is required, but any combination is allowed.
270              
271             Returns C<< { ref $res: Object } >> or C<< { "error": error } >>
272              
273             Sets HTTP status to C<400> on error.
274              
275             Note: replaces /set-queue-attr-json
276              
277             =cut
278              
279             post qr'^/queues/(?<queue>.+)$' => sub {
280             my $queue = $+{queue};
281             undef $disbatch->{mongo};
282             my $params = parse_params;
283             my @valid_params = qw/threads name plugin/;
284              
285             unless (keys %$params) {
286             status 400;
287             return send_json {error => 'no params'};
288             }
289             for my $param (keys %$params) {
290             unless (grep $_ eq $param, @valid_params) {
291             status 400;
292             return send_json { error => 'unknown param', param => $param};
293             }
294             }
295             if (exists $params->{plugin} and !map_plugins()->{$params->{plugin}}) {
296             status 400;
297             return send_json { error => 'unknown plugin', plugin => $params->{plugin} };
298             }
299             if (exists $params->{threads} and $params->{threads} !~ /^\d+$/) {
300             status 400;
301             return send_json {error => 'threads must be a non-negative integer'};
302             }
303             if (exists $params->{name} and (ref $params->{name} or !($params->{name} // ''))){
304             status 400;
305             return send_json {error => 'name must be a string'};
306             }
307              
308             my $filter = try { {_id => MongoDB::OID->new(value => $queue)} } catch { {name => $queue} };
309             my $res = try {
310             $disbatch->queues->update_one($filter, {'$set' => $params});
311             } catch {
312             Limper::warning "Could not update queue $queue: $_";
313             $_;
314             };
315             my $reponse = {
316             ref $res => {%$res},
317             };
318             unless ($res->{matched_count} == 1) {
319             status 400;
320             $reponse->{error} = "$res";
321             }
322             send_json $reponse;
323             };
324              
325             =item DELETE /queues/:queue
326              
327             Deletes the specified queue.
328              
329             URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
330              
331             Parameters: none
332              
333             Returns: C<< { ref $res: Object } >> on success, or C<< { ref $res: Object, "error": "$res" } >> on error.
334              
335             Sets HTTP status to C<400> on error.
336              
337             Note: replaces /delete-queue-json
338              
339             =cut
340              
341             del qr'^/queues/(?<queue>.+)$' => sub {
342             undef $disbatch->{mongo};
343              
344             my $filter = try { {_id => MongoDB::OID->new(value => $+{queue})} } catch { {name => $+{queue}} };
345             my $res = try { $disbatch->queues->delete_one($filter) } catch { Limper::warning "Could not delete queue '$+{queue}': $_"; $_ };
346             my $reponse = {
347             ref $res => {%$res},
348             };
349             unless ($res->{deleted_count}) {
350             status 400;
351             $reponse->{error} = "$res";
352             }
353             send_json $reponse;
354             };
355              
356             # returns an MongoDB::OID object of either a simple string representation of the OID or a queue name, or undef if queue not found/valid
357             sub get_queue_oid {
358 0     0 1   my ($queue) = @_;
359             my $queue_id = try {
360 0     0     $disbatch->queues->find_one({_id => MongoDB::OID->new(value => $queue)});
361             } catch {
362 0     0     try { $disbatch->queues->find_one({name => $queue}) } catch { Limper::warning "Could not find queue $queue: $_"; undef };
  0            
  0            
  0            
363 0           };
364 0 0         defined $queue_id ? $queue_id->{_id} : undef;
365             }
366              
367             # creates a task for given queue _id and params, returning task _id
368             sub create_tasks {
369 0     0 1   my ($queue_id, $tasks) = @_;
370              
371 0           my @tasks = map {
372             queue => $queue_id,
373             status => -2,
374             stdout => undef,
375             stderr => undef,
376             node => undef,
377             params => $_,
378             ctime => Time::Moment->now_utc,
379             mtime => Time::Moment->now_utc,
380             }, @$tasks;
381              
382 0     0     my $res = try { $disbatch->tasks->insert_many(\@tasks) } catch { Limper::warning "Could not create tasks: $_"; $_ };
  0            
  0            
  0            
383 0           $res;
384             }
385              
386             =item POST /tasks/search
387              
388             Parameters: C<< { "filter": filter, "options": options, "count": count, "terse": terse } >>
389              
390             All parameters are optional.
391              
392             C<filter> is a filter expression (query) object.
393              
394             C<options> is an object of desired options to L<MongoDB::Collection#find>.
395              
396             If not set, C<options.limit> will be C<100>. This will fail if you try to set it above C<100>.
397              
398             C<count> is a boolean. Instead of an array of task documents, the count of task documents matching the query will be returned.
399              
400             C<terse> is a boolean. If C<true>, the the GridFS id or C<"[terse mode]"> will be returned for C<stdout> and C<stderr> of each document.
401             If C<false>, the full content of C<stdout> and C<stderr> will be returned. Default is C<true>.
402              
403             Returns: Array of task Objects or C<< { "count": $count } >> on success; C<< { "error": "filter and options must be name/value objects" } >>,
404             C<< { "error": "limit cannot exceed 100" } >>, or C<< { "error": "Bad OID passed: $error" } >> on input error;
405             or C<< { "error": "$error" } >> on count or search error.
406              
407             Sets HTTP status to C<400> on error.
408              
409             Note: replaces /search-tasks-json
410              
411             =cut
412              
413             # FIXME: I don't like this URL.
414             # see https://metacpan.org/pod/MongoDB::Collection#find
415             post '/tasks/search' => sub {
416             undef $disbatch->{mongo};
417             my $params = parse_params;
418              
419             my $LIMIT = 100;
420              
421             $params->{filter} //= {};
422             $params->{options} //= {};
423             $params->{count} //= 0;
424             $params->{terse} //= 1;
425             $params->{pretty} //= 0;
426             unless (ref $params->{filter} eq 'HASH' and ref $params->{options} eq 'HASH') {
427             status 400;
428             return send_json { error => 'filter and options must be name/value objects' };
429             }
430             $params->{options}{limit} //= $LIMIT;
431             if ($params->{options}{limit} > $LIMIT) {
432             status 400;
433             return send_json { error => "limit cannot exceed $LIMIT" };
434             }
435              
436             $params->{filter}{queue} = { '$oid' => $params->{filter}{queue} } if defined $params->{filter}{queue} and !ref $params->{filter}{queue};
437              
438             my $oid_error = try { $params->{filter} = deserialize_oid($params->{filter}); undef } catch { "Bad OID passed: $_" };
439             if (defined $oid_error) {
440             Limper::warning $oid_error;
441             status 400;
442             return send_json { error => $oid_error };
443             }
444              
445             # Turn value into a Time::Moment object if it looks like it includes milliseconds. Will break in the year 2286.
446             for my $type (qw/ctime mtime/) {
447             $params->{filter}{$type} = Time::Moment->from_epoch($params->{filter}{$type} / 1000) if ($params->{filter}{$type} // 0) > 9999999999;
448             }
449              
450             if ($params->{count}) {
451             my $count = try { $disbatch->tasks->count($params->{filter}) } catch { Limper::warning $_; $_; };
452             if (ref $count) {
453             status 400;
454             return send_json { error => "$count" };
455             }
456             return send_json { count => $count };
457             }
458             my ($error, @tasks) = try { undef, $disbatch->tasks->find($params->{filter}, $params->{options})->all } catch { Limper::warning "Could not find tasks: $_"; $_ };
459             if (defined $error) {
460             Limper::warning $error;
461             status 400;
462             return send_json { error => $error };
463             }
464              
465             for my $task (@tasks) {
466             for my $type (qw/stdout stderr/) {
467             if ($params->{terse}) {
468             $task->{$type} = '[terse mode]' if defined $task->{$type} and !$task->{$type}->$_isa('MongoDB::OID') and $task->{$type};
469             } elsif ($task->{$type}->$_isa('MongoDB::OID')) {
470             $task->{$type} = try { $disbatch->get_gfs($task->{$type}) } catch { Limper::warning "Could not get task $task->{_id} $type: $_"; $task->{$type} };
471             }
472             }
473             for my $type (qw/ctime mtime/) {
474             $task->{$type} = $task->{$type}->hires_epoch if ref $task->{$type} eq 'DateTime';
475             }
476             }
477              
478             send_json \@tasks, convert_blessed => 1, pretty => $params->{pretty};
479             };
480              
481             =item POST /tasks/:queue
482              
483             URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
484              
485             Parameters: an array of task params objects
486              
487             Returns: C<< { ref $res: Object } >> on success; C<< { "error": "params must be a JSON array of task params" } >>
488             or C<< { "error": "queue not found" } >> on input error; or C<< { ref $res: Object, "error": "Unknown error" } >> on MongoDB error.
489              
490             Sets HTTP status to C<400> on error.
491              
492             Note: replaces /queue-create-tasks-json
493              
494             =cut
495              
496             post qr'^/tasks/(?<queue>[^/]+)$' => sub {
497             undef $disbatch->{mongo};
498             my $params = parse_params;
499             unless (defined $params and ref $params eq 'ARRAY' and @$params and ! grep { ref $_ ne 'HASH' } @$params) {
500             status 400;
501             return send_json { error => 'params must be a JSON array of task params objects' };
502             }
503             if (grep { keys $_ == 0 } @$params) {
504             status 400;
505             return send_json { error => 'params must be a JSON array of task params objects with key/value pairs' };
506             }
507              
508             my $queue_id = get_queue_oid($+{queue});
509             unless (defined $queue_id) {
510             status 400;
511             return send_json { error => 'queue not found' };
512             }
513              
514             my $res = create_tasks($queue_id, $params);
515              
516             my $reponse = {
517             ref $res => {%$res},
518             };
519             unless (@{$res->{inserted}}) {
520             status 400;
521             $reponse->{error} = 'Unknown error';
522             }
523             send_json $reponse, convert_blessed => 1;
524             };
525              
526             =item POST /tasks/:queue/:collection
527              
528             URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not. C<:collection> is a MongoDB collection name.
529              
530             Parameters: C<< { "filter": filter, "params": params } >>
531              
532             C<filter> is a filter expression (query) object for the C<:collection> collection.
533              
534             C<params> is an object of task params. To insert a document value from a query into the params, prefix the desired key name with C<document.> as a value.
535              
536             Returns: C<< { ref $res: Object } >> on success; C<< { "error": "filter and params required and must be name/value objects" } >>
537             or C<< { "error": "queue not found" } >> on input error; C<< { "error": "Could not iterate on collection $collection: $error" } >> on query error,
538             or C<< { ref $res: Object, "error": "Unknown error" } >> on MongoDB error.
539              
540             Sets HTTP status to C<400> on error.
541              
542             Note: replaces /queue-create-tasks-from-query-json
543              
544             =cut
545              
546             post qr'^/tasks/(?<queue>.+?)/(?<collection>.+)$' => sub {
547             undef $disbatch->{mongo};
548             my $params = parse_params;
549             # {"migration":"foo"}
550             # {"migration":"document.migration","user1":"document.username"}
551             unless (defined $params->{filter} and ref $params->{filter} eq 'HASH' and defined $params->{params} and ref $params->{params} eq 'HASH') {
552             status 400;
553             return send_json { error => 'filter and params required and must be name/value objects' };
554             }
555              
556             my $collection = $+{collection};
557             my $queue_id = get_queue_oid($+{queue});
558             unless (defined $queue_id) {
559             status 400;
560             return send_json { error => 'queue not found' };
561             }
562              
563             my @fields = grep /^document\./, values %{$params->{params}};
564             my %fields = map { s/^document\.//; $_ => 1 } @fields;
565              
566             my $cursor = $disbatch->mongo->coll($collection)->find($params->{filter})->fields(\%fields);
567             my @tasks;
568             my $error;
569             try {
570             while (my $doc = $cursor->next) {
571             my $task = { %{$params->{params}} }; # copy it
572             for my $key (keys %$task) {
573             if ($task->{$key} =~ /^document\./) {
574             for my $field (@fields) {
575             my $f = quotemeta $field;
576             if ($task->{$key} =~ /^document\.$f$/) {
577             $task->{$key} = $doc->{$field};
578             }
579             }
580             }
581             }
582             push @tasks, $task;
583             }
584             } catch {
585             Limper::warning "Could not iterate on collection $collection: $_";
586             $error = "$_";
587             };
588              
589             if (defined $error) {
590             status 400;
591             return send_json { error => $error };
592             }
593              
594             my $res = create_tasks($queue_id, \@tasks); # doing 100k at once only take 12 seconds on my 13" rMBP
595              
596             my $reponse = {
597             ref $res => {%$res},
598             };
599             unless (@{$res->{inserted}}) {
600             status 400;
601             $reponse->{error} = 'Unknown error';
602             }
603             send_json $reponse, convert_blessed => 1;
604             };
605              
606             sub deserialize_oid {
607 0     0 0   my ($object) = @_;
608 0 0         if (ref $object eq 'HASH') {
    0          
609 0 0         return MongoDB::OID->new(value => $object->{'$oid'}) if exists $object->{'$oid'};
610 0           $object->{$_} = deserialize_oid($object->{$_}) for keys %$object;
611             } elsif (ref $object eq 'ARRAY') {
612 0           $_ = deserialize_oid($_) for @$object;
613             }
614 0           $object;
615             }
616              
617             ################
618             #### OLD API ###
619             ################
620              
621             get '/scheduler-json' => sub {
622             undef $disbatch->{mongo};
623             send_json $disbatch->scheduler_report_old_api;
624             };
625              
626             post '/set-queue-attr-json' => sub {
627             undef $disbatch->{mongo};
628             my $params = parse_params;
629             my @valid_attributes = qw/threads/;
630             unless (grep $_ eq $params->{attr}, @valid_attributes) {
631             status 400;
632             return send_json { success => 0, error => 'Invalid attr'};
633             }
634             unless (defined $params->{value}) {
635             status 400;
636             return send_json {success => 0, error => 'You must supply a value'};
637             }
638             unless (defined $params->{queueid}) {
639             status 400;
640             return send_json {success => 0, error => 'You must supply a queueid'};
641             }
642             my $res = try {
643             $disbatch->queues->update_one({_id => MongoDB::OID->new(value => $params->{queueid})}, {'$set' => { $params->{attr} => $params->{value} }});
644             } catch {
645             Limper::warning "Could not update queue $params->{queueid}: $_";
646             $_;
647             };
648             my $reponse = {
649             success => $res->{matched_count} == 1 ? 1 : 0,
650             ref $res => {%$res},
651             };
652             unless ($reponse->{success}) {
653             status 400;
654             $reponse->{error} = "$res";
655             }
656             send_json $reponse;
657             };
658              
659             sub get_plugins {
660 0     0 1   my @plugins = try { $disbatch->queues->distinct('plugin')->all } catch { Limper::warning "Could not get current plugins: $_"; () };
  0     0      
  0            
  0            
661 0   0       my $plugins = $disbatch->{config}{plugins} // [];
662 0           my %plugins = map { $_ => $_ } @plugins, @$plugins;
  0            
663 0           \%plugins;
664             }
665              
666             post '/start-queue-json' => sub {
667             undef $disbatch->{mongo};
668             my $params = parse_params;
669             unless (defined $params->{type} and defined $params->{name}) {
670             status 400;
671             return send_json [ 0, 'type and name required'];
672             }
673              
674             unless (get_plugins->{$params->{type}}) {
675             status 400;
676             return send_json [ 0, 'unknown type'];
677             }
678              
679             my $queue = { plugin => $params->{type}, name => $params->{name} };
680             my $res = try { $disbatch->queues->insert_one($queue) } catch { Limper::warning "Could not create queue $params->{name}: $_"; $_ };
681             my $reponse = {
682             success => defined $res->{inserted_id} ? 1 : 0,
683             ref $res => {%$res},
684             };
685             unless ($reponse->{success}) {
686             status 400;
687             $reponse->{error} = "$res";
688             }
689             send_json [ $reponse->{success}, $reponse->{ref $res}{inserted_id}, $reponse ], convert_blessed => 1;
690             };
691              
692             post '/delete-queue-json' => sub {
693             undef $disbatch->{mongo};
694             my $params = parse_params;
695             unless (defined $params->{id}) {
696             status 400;
697             return send_json [ 0, 'id required'];
698             }
699              
700             my $res = try { $disbatch->queues->delete_one({_id => MongoDB::OID->new(value => $params->{id})}) } catch { Limper::warning "Could not delete queue $params->{id}: $_"; $_ };
701             my $reponse = {
702             success => $res->{deleted_count} ? 1 : 0,
703             ref $res => {%$res},
704             };
705             unless ($reponse->{success}) {
706             status 400;
707             $reponse->{error} = "$res";
708             }
709             send_json [ $reponse->{success}, $reponse ];
710             };
711              
712             # This is needed at least to create queues in the web interface (just the keys).
713             get '/queue-prototypes-json' => sub {
714             undef $disbatch->{mongo};
715             send_json get_plugins;
716             };
717              
718             sub get_queue_oid_old {
719 0     0 0   my ($queue) = @_;
720             my $queue_id = try {
721 0     0     MongoDB::OID->new(value => $queue);
722             } catch {
723 0     0     my $q = try { $disbatch->queues->find_one({name => $queue}) } catch { Limper::warning "Could not find queue $queue: $_"; undef };
  0            
  0            
  0            
724 0 0         defined $q ? $q->{_id} : undef;
725 0           };
726             }
727              
728             # creates a task for given queue _id and params, returning task _id
729             sub create_tasks_old {
730 0     0 0   my ($queue_id, $tasks) = @_;
731              
732 0           my @tasks = map {
733             queue => $queue_id,
734             status => -2,
735             stdout => undef,
736             stderr => undef,
737             node => undef,
738             params => $_,
739             ctime => Time::Moment->now_utc,
740             mtime => Time::Moment->now_utc,
741             }, @$tasks;
742              
743 0     0     my $res = try { $disbatch->tasks->insert_many(\@tasks) } catch { Limper::warning "Could not create tasks: $_"; $_ };
  0            
  0            
  0            
744 0           $res;
745             }
746              
747             post '/queue-create-tasks-json' => sub {
748             undef $disbatch->{mongo};
749             my $params = parse_params;
750             unless (defined $params->{queueid} and defined $params->{object}) {
751             status 400;
752             return send_json [ 0, 'queueid and object required'];
753             }
754              
755             my $tasks = try { ref $params->{object} ? $params->{object} : $json->decode($params->{object}) } catch { $_ };
756             return send_json [ 0, $tasks ] unless ref $tasks;
757             return send_json [ 0, 'object param must be a JSON array' ] unless ref $tasks eq 'ARRAY';
758              
759             my $queue_id = get_queue_oid_old($params->{queueid});
760             return send_json [ 0, 'Queue not found' ] unless defined $queue_id;
761              
762             my $res = create_tasks_old($queue_id, $tasks);
763              
764             my $reponse = {
765             success => @{$res->{inserted}} ? 1 : 0,
766             ref $res => {%$res},
767             };
768             unless ($reponse->{success}) {
769             status 400;
770             $reponse->{error} = 'Unknown error';
771             }
772             send_json [ $reponse->{success}, scalar @{$res->{inserted}}, @{$res->{inserted}}, $reponse ], convert_blessed => 1;
773             };
774              
775             post '/queue-create-tasks-from-query-json' => sub {
776             undef $disbatch->{mongo};
777             my $params = parse_params;
778             unless (defined $params->{queueid} and defined $params->{collection} and defined $params->{jsonfilter} and defined $params->{params}) {
779             status 400;
780             return send_json [ 0, 'queueid, collection, jsonfilter, and params required'];
781             }
782              
783             my $filter = try { ref $params->{jsonfilter} ? $params->{jsonfilter} : $json->decode($params->{jsonfilter}) } catch { $_ }; # {"migration":"foo"}
784             return send_json [ 0, $filter ] unless ref $filter;
785              
786             my $task_params = try { ref $params->{params} ? $params->{params} : $json->decode($params->{params}) } catch { $_ }; # {"migration":"document.migration","user1":"document.username"}
787             return send_json [ 0, $task_params ] unless ref $task_params;
788              
789             my $queue_id = get_queue_oid_old($params->{queueid});
790             return send_json [ 0, 'Queue not found' ] unless defined $queue_id;
791              
792             my @fields = grep /^document\./, values %$task_params;
793             my %fields = map { s/^document\.//; $_ => 1 } @fields;
794              
795             my $cursor = $disbatch->mongo->coll($params->{collection})->find($filter)->fields(\%fields);
796             my @tasks;
797             my $error;
798             try {
799             while (my $object = $cursor->next) {
800             my $task = { %$task_params };
801             for my $key (keys %$task) {
802             if ($task->{$key} =~ /^document\./) {
803             for my $field (@fields) {
804             my $f = quotemeta $field;
805             if ($task->{$key} =~ /^document\.$f$/) {
806             $task->{$key} = $object->{$field};
807             }
808             }
809             }
810             }
811             push @tasks, $task;
812             }
813             } catch {
814             Limper::warning "Could not iterate on collection $params->{collection}: $_";
815             $error = "$_";
816             };
817              
818             return send_json [ 0, $error ] if defined $error;
819              
820             my $res = create_tasks_old($queue_id, \@tasks); # doing 100k at once only take 12 seconds on my 13" rMBP
821              
822             my $reponse = {
823             success => @{$res->{inserted}} ? 1 : 0,
824             ref $res => {%$res},
825             };
826             unless ($reponse->{success}) {
827             status 400;
828             $reponse->{error} = 'Unknown error';
829             }
830             send_json [ $reponse->{success}, scalar @{$res->{inserted}} ];
831             # send_json [ $reponse->{success}, scalar @{$res->{inserted}}, @{$res->{inserted}}, $reponse ], convert_blessed => 1;
832             };
833              
834             post '/search-tasks-json' => sub {
835             undef $disbatch->{mongo};
836             my $params = parse_params;
837             #unless (defined $params->{queue} and defined $params->{filter}) {
838             # status 400;
839             # return send_json [ 0, 'queue and filter required'];
840             #}
841              
842             $params->{filter} //= {};
843             my $filter = try { ref $params->{filter} ? $params->{filter} : $json->decode($params->{filter}) } catch { $_ };
844             return send_json [ 0, $params->{json} ? $filter : 'JSON object required for filter' ] unless ref $filter eq 'HASH';
845              
846             my $attrs = {};
847             $attrs->{limit} = $params->{limit} if $params->{limit};
848             $attrs->{skip} = $params->{skip} if $params->{skip};
849              
850             my $error;
851             try {
852             $filter->{queue} = MongoDB::OID->new(value => $params->{queue}) if $params->{queue};
853             $filter->{_id} = MongoDB::OID->new(value => delete $filter->{id}) if $filter->{id};
854             } catch {
855             $error = "$_";
856             Limper::warning "Bad OID passed: $error";
857             };
858             return send_json [ 0, $error ] if defined $error;
859             $filter->{status} = int $filter->{status} if defined $filter->{status};
860              
861             if ($params->{count}) {
862             my $count = try { $disbatch->tasks->count($filter) } catch { Limper::warning $_; $_; };
863             return send_json [ 0, "$count" ] if ref $count;
864             return send_json [ 1, $count ];
865             }
866             my @tasks = try { $disbatch->tasks->find($filter, $attrs)->all } catch { Limper::warning "Could not find tasks: $_"; () };
867              
868             for my $task (@tasks) {
869             if ($params->{terse}) {
870             $task->{stdout} = '[terse mode]' unless $task->{stdout}->$_isa('MongoDB::OID');
871             $task->{stderr} = '[terse mode]' unless $task->{stderr}->$_isa('MongoDB::OID');
872             } else {
873             $task->{stdout} = try { $disbatch->get_gfs($task->{stdout}) } catch { Limper::warning "Could not get task $task->{_id} stdout: $_"; $task->{stdout} } if $task->{stdout}->$_isa('MongoDB::OID');
874             $task->{stderr} = try { $disbatch->get_gfs($task->{stderr}) } catch { Limper::warning "Could not get task $task->{_id} stderr: $_"; $task->{stderr} } if $task->{stderr}->$_isa('MongoDB::OID');
875             }
876              
877             for my $type (qw/ctime mtime/) {
878             if ($task->{$type}) {
879             if (ref $task->{$type}) {
880             if (ref $task->{$type} eq 'Time::Moment' or ref $task->{$type} eq 'DateTime') {
881             $task->{"${type}_str"} = "$task->{$type}";
882             $task->{$type} = $task->{$type}->epoch;
883             } else {
884             # Unknown ref, force to string
885             $task->{"${type}_str"} = "$task->{$type}";
886             $task->{$type} = undef;
887             }
888             } else {
889             try {
890             my $dt = DateTime->from_epoch(epoch => $task->{$type});
891             $task->{"${type}_str"} = "$dt";
892             } catch {
893             $task->{"${type}_str"} = "$task->{$type}";
894             $task->{$type} = undef;
895             };
896             }
897             }
898             }
899             }
900              
901             send_json \@tasks, convert_blessed => 1;
902             };
903              
904             # MUST BE AT END
905              
906             get '/' => sub {
907             send_file '/index.html';
908             };
909              
910             get qr{^/} => sub {
911             send_file request->{path}; # sends request->{uri} by default
912             };
913              
914             1;
915              
916             __END__
917              
918             =encoding utf8
919              
920             =head1 NAME
921              
922             Disbatch::Web - Disbatch Command Interface (JSON REST API and web browser interface to Disbatch).
923              
924             =head1 VERSION
925              
926             version 3.990
927              
928             =head1 SUBROUTINES
929              
930             =over 2
931              
932             =item init(config_file => $config_file)
933              
934             Parameters: path to the Disbatch config file. Default is C</etc/disbatch/config.json>.
935              
936             Initializes the settings for the web server.
937              
938             Returns nothing.
939              
940             =item parse_params
941              
942             Parameters: none
943              
944             Parses request parameters in the following order:
945              
946             * from the request body if the Content-Type is C<application/x-www-form-urlencoded>
947              
948             * from the request body if the Content-Type is C<application/json>
949              
950             * from the request query otherwise
951              
952             Returns a C<HASH> of the parsed request parameters.
953              
954             =item get_nodes
955              
956             Parameters: none
957              
958             Returns an array of node objects defined, with C<timestamp> stringified and C<id> the stringified C<_id>.
959              
960             =item get_plugins
961              
962             Parameters: none
963              
964             Returns a C<HASH> of defined queues plugins and any defined C<config.plugins>, where values match the keys.
965              
966             =item get_queue_oid($queue)
967              
968             Parameters: Queue ID as a string, or queue name.
969              
970             Returns a C<MongoDB::OID> object representing this queue's _id.
971              
972             =item create_tasks($queue_id, $tasks)
973              
974             Parameters: C<MongoDB::OID> object of the queue _id, C<ARRAY> of task params.
975              
976             Creates one queued task document for the given queue _id per C<$tasks> entry. Each C<$task> entry becomes the value of the C<params> field of the document.
977              
978             Returns: the repsonse object from a C<MongoDB::Collection#insert_many> request.
979              
980             =back
981              
982             =head1 JSON ROUTES
983              
984             =over 2
985              
986             =back
987              
988             =head1 BROWSER ROUTES
989              
990             =over 2
991              
992             =item GET /
993              
994             Returns the contents of "/index.html" – the queue browser page.
995              
996             =item GET qr{^/}
997              
998             Returns the contents of the request path.
999              
1000             =back
1001              
1002             =head1 SEE ALSO
1003              
1004             L<Disbatch>
1005              
1006             L<Disbatch::Roles>
1007              
1008             L<Disbatch::Plugin::Demo>
1009              
1010             L<disbatchd>
1011              
1012             L<disbatch.pl>
1013              
1014             L<task_runner>
1015              
1016             L<disbatch-create-users>
1017              
1018             =head1 AUTHORS
1019              
1020             Ashley Willis <awillis@synacor.com>
1021              
1022             Matt Busigin
1023              
1024             =head1 COPYRIGHT AND LICENSE
1025              
1026             This software is Copyright (c) 2016 by Ashley Willis.
1027              
1028             This is free software, licensed under:
1029              
1030             The Apache License, Version 2.0, January 2004