File Coverage

blib/lib/POE/Component/Metabase/Relay/Server/Queue.pm
Criterion Covered Total %
statement 108 154 70.1
branch 13 50 26.0
condition 3 18 16.6
subroutine 31 40 77.5
pod 1 2 50.0
total 156 264 59.0


line stmt bran cond sub pod time code
1             package POE::Component::Metabase::Relay::Server::Queue;
2             $POE::Component::Metabase::Relay::Server::Queue::VERSION = '0.38';
3             # ABSTRACT: Submission queue for the metabase relay
4              
5 1     1   7 use strict;
  1         3  
  1         29  
6 1     1   7 use warnings;
  1         3  
  1         26  
7 1     1   6 use POE qw[Component::EasyDBI];
  1         2  
  1         6  
8 1     1   65705 use POE::Component::Client::HTTP;
  1         96263  
  1         52  
9 1     1   8 use POE::Component::Resolver;
  1         2  
  1         40  
10 1     1   546 use POE::Component::Metabase::Client::Submit;
  1         16224  
  1         27  
11 1     1   7 use CPAN::Testers::Report ();
  1         3  
  1         21  
12 1     1   431 use Metabase::User::Profile ();
  1         2441  
  1         24  
13 1     1   440 use Metabase::User::Secret ();
  1         257  
  1         27  
14 1     1   522 use Module::Load::Conditional qw[can_load];
  1         17667  
  1         66  
15 1     1   9 use JSON ();
  1         2  
  1         21  
16 1     1   5 use Params::Util qw[_HASH];
  1         3  
  1         57  
17 1     1   7 use Time::HiRes ();
  1         2  
  1         16  
18 1     1   5 use Data::UUID;
  1         3  
  1         59  
19              
20 1     1   6 use constant DELAY => 150;
  1         3  
  1         77  
21              
22              
23             my $sql = {
24             'create' => 'CREATE TABLE IF NOT EXISTS queue ( id varchar(150), submitted varchar(32), attempts INTEGER, data BLOB )',
25             'insert' => 'INSERT INTO queue values(?,?,?,?)',
26             'delete' => 'DELETE FROM queue where id = ?',
27             'queue' => 'SELECT * FROM queue ORDER BY attempts ASC, submitted ASC limit ', # the limit is appended via "submissions"
28             'update' => 'UPDATE queue SET attempts = ? WHERE id = ?',
29             'addidx' => [
30             'CREATE INDEX IF NOT EXISTS queue_id ON queue ( id )',
31             'CREATE INDEX IF NOT EXISTS queue_att_sub ON queue ( attempts, submitted )',
32             ],
33             };
34              
35 1     1   473 use MooseX::POE;
  1         465806  
  1         8  
36 1     1   235860 use MooseX::Types::URI qw[Uri];
  1         203473  
  1         11  
37              
38             {
39 1     1   2030 use Moose::Util::TypeConstraints;
  1         17  
  1         8  
40              
41             my $ps = subtype as 'Str', where { $poe_kernel->alias_resolve( $_ ) };
42             coerce $ps, from 'Str', via { $poe_kernel->alias_resolve( $_ )->ID };
43              
44             has 'session' => (
45             is => 'ro',
46             isa => $ps,
47             coerce => 1,
48             );
49              
50 1     1   2381 no Moose::Util::TypeConstraints;
  1         2  
  1         6  
51             }
52              
53             has 'event' => (
54             is => 'ro',
55             isa => 'Str',
56             );
57              
58             has 'profile' => (
59             is => 'ro',
60             isa => 'Metabase::User::Profile',
61             required => 1,
62             );
63              
64             has 'secret' => (
65             is => 'ro',
66             isa => 'Metabase::User::Secret',
67             required => 1,
68             );
69              
70             has 'dsn' => (
71             is => 'ro',
72             isa => 'Str',
73             required => 1,
74             );
75              
76             has 'uri' => (
77             is => 'ro',
78             isa => Uri,
79             coerce => 1,
80             required => 1,
81             );
82              
83             has 'username' => (
84             is => 'ro',
85             isa => 'Str',
86             default => '',
87             );
88              
89             has 'password' => (
90             is => 'ro',
91             isa => 'Str',
92             default => '',
93             );
94              
95             has 'debug' => (
96             is => 'rw',
97             isa => 'Bool',
98             default => 0,
99             );
100              
101             has 'multiple' => (
102             is => 'ro',
103             isa => 'Bool',
104             default => 0,
105             writer => '_set_multiple',
106             );
107              
108             has 'db_opts' => (
109             is => 'ro',
110             isa => 'HashRef',
111             default => sub {{}},
112             );
113              
114             has 'no_relay' => (
115             is => 'rw',
116             isa => 'Bool',
117             default => 0,
118             trigger => sub {
119             my( $self, $new, $old ) = @_;
120             return if ! $self->_has_easydbi;
121             $self->yield( '_process_queue' ) if ! $new;
122             },
123             );
124              
125             has 'no_curl' => (
126             is => 'ro',
127             isa => 'Bool',
128             default => 0,
129             );
130              
131             has 'submissions' => (
132             is => 'rw',
133             isa => 'Int',
134             default => 10,
135             );
136              
137             has '_uuid' => (
138             is => 'ro',
139             isa => 'Data::UUID',
140             lazy_build => 1,
141             init_arg => undef,
142             );
143              
144             has '_easydbi' => (
145             is => 'ro',
146             isa => 'POE::Component::EasyDBI',
147             lazy_build => 1,
148             init_arg => undef,
149             );
150              
151             has _http_alias => (
152             is => 'ro',
153             isa => 'Str',
154             init_arg => undef,
155             writer => '_set_http_alias',
156             );
157              
158             has _resolver => (
159             is => 'ro',
160             isa => 'POE::Component::Resolver',
161             init_arg => undef,
162             writer => '_set_resolver',
163             );
164              
165             has '_processing' => (
166             is => 'ro',
167             isa => 'HashRef',
168             default => sub {{}},
169             );
170              
171             sub _build__easydbi {
172 1     1   3 my $self = shift;
173 1 50       51 POE::Component::EasyDBI->new(
174             alias => '',
175             dsn => $self->dsn,
176             username => $self->username,
177             password => $self->password,
178             ( _HASH( $self->db_opts ) ? ( options => $self->db_opts ) : () ),
179             );
180             }
181              
182             sub _build__uuid {
183 0     0   0 Data::UUID->new();
184             }
185              
186             sub spawn {
187 1     1 1 43 shift->new(@_);
188             }
189              
190             sub START {
191 1     1 0 428 my ($kernel,$self) = @_[KERNEL,OBJECT];
192 1         5 $self->_build_table;
193 1         171 $kernel->yield( 'do_vacuum', 'process' );
194 1 50 33     243 if ( !$self->no_curl && can_load( modules => { 'POE::Component::Curl::Multi' => '0.08' } ) ) {
    50          
195 0         0 $self->_set_multiple( 0 );
196 0         0 $self->_set_http_alias( join '-', __PACKAGE__, $self->get_session_id );
197 0         0 POE::Component::Curl::Multi->spawn(
198             Alias => $self->_http_alias,
199             FollowRedirects => 2,
200             );
201             }
202             elsif ( $self->multiple ) {
203 0         0 $self->_set_resolver( BINGOS::POE::Component::Resolver->new() );
204             }
205             else {
206 1         92 $self->_set_http_alias( join '-', __PACKAGE__, $self->get_session_id );
207 1         61 POE::Component::Client::HTTP->spawn(
208             Alias => $self->_http_alias,
209             FollowRedirects => 2,
210             );
211             }
212 1 50       3217 $kernel->yield( '_process_queue' ) if ! $self->no_relay;
213 1         113 return;
214             }
215              
216              
217             sub _build_table {
218 1     1   3 my $self = shift;
219              
220 1         46 $self->_easydbi;
221              
222 1 50       86 if ( $self->dsn =~ /^dbi\:SQLite/i ) {
223 1         97 $self->_easydbi->do(
224             sql => 'PRAGMA synchronous = OFF',
225             event => '_generic_db_result',
226             _ts => $self->_time,
227             );
228             }
229              
230             $self->_easydbi->do(
231             sql => $sql->{create},
232 1         349 event => '_generic_db_result',
233             _ts => $self->_time,
234             );
235              
236 1         172 foreach my $idx ( @{ $sql->{addidx} } ) {
  1         29  
237 2         232 $self->_easydbi->do(
238             sql => $idx,
239             event => '_generic_db_result',
240             _ts => $self->_time,
241             );
242             }
243             }
244              
245             event 'do_vacuum' => sub {
246 1     1   2443 my ($kernel,$self,$process) = @_[KERNEL,OBJECT,ARG0];
247 1 50       51 $self->_easydbi->do(
248             sql => 'VACUUM',
249             event => '_generic_db_result',
250             _ts => $self->_time,
251             ( $process ? ( _process => 1 ) : () ),
252             );
253              
254 1         190 $kernel->delay( 'do_vacuum' => DELAY * 60 );
255 1         229 return;
256             };
257              
258             event 'shutdown' => sub {
259 1     1   764 my ($kernel,$self) = @_[KERNEL,OBJECT];
260 1         7 $kernel->alarm_remove_all();
261 1         179 $kernel->post( $self->_easydbi->ID, 'shutdown' );
262 1         213 $kernel->post(
263             $self->_http_alias,
264             'shutdown',
265             );
266 1 50 33     172 $self->_resolver->_really_shutdown
267             if $self->multiple && $self->_resolver;
268 1         5 return;
269             };
270              
271             event '_generic_db_result' => sub {
272 5     5   8954465 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
273 5         266 $result->{dsn} = $self->dsn;
274 5 50       28 if ( $result->{error} ) {
275 0 0       0 warn "DB error (" . ( $self->_time - $result->{_ts} ) . "s): " . JSON->new->pretty(1)->encode( $result ) . "\n" if $self->debug;
276             }
277 5 100       19 $kernel->yield( '_process_queue' ) if $result->{_process};
278 5         112 return;
279             };
280              
281             event 'submit' => sub {
282 0     0   0 my ($kernel,$self,$fact) = @_[KERNEL,OBJECT,ARG0];
283 0 0 0     0 return unless $fact and $fact->isa('Metabase::Fact');
284 0         0 my $timestamp = $self->_time;
285 0         0 $kernel->yield( '_dispatch_event', 'enqueued', $fact );
286             $self->_easydbi->do(
287             sql => $sql->{insert},
288 0 0       0 event => '_generic_db_result',
289             placeholders => [ $self->_uuid->create_b64(), $timestamp, 0, $self->_encode_fact( $fact ) ],
290             ( $self->no_relay ? () : ( _process => 1 ) ),
291             _ts => $timestamp,
292             );
293 0         0 return;
294             };
295              
296             event '_process_queue' => sub {
297 2     2   624 my ($kernel,$self) = @_[KERNEL,OBJECT];
298 2 50       116 return if $self->no_relay;
299             # Processing event
300 2         21 $kernel->delay( '_process_queue', DELAY );
301             $self->_easydbi->arrayhash(
302 2 50       504 sql => $sql->{queue} . ( $self->multiple ? $self->submissions : 1 ),
303             event => '_queue_db_result',
304             _ts => $self->_time,
305             );
306 2         333 return;
307             };
308              
309             event '_queue_db_result' => sub {
310 2     2   3049 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
311 2 50 33     21 if ( $result->{error} and $self->debug ) {
312 0         0 warn $result->{error}, "\n";
313 0         0 return;
314             }
315 2         6 foreach my $row ( @{ $result->{result} } ) {
  2         9  
316             # Have we seen this report before?
317 0 0       0 if ( exists $self->_processing->{ $row->{id} } ) {
318 0         0 next;
319             } else {
320 0         0 $self->_processing->{ $row->{id} }++;
321             }
322              
323             # Submit event ?
324 0         0 my $report = $self->_decode_fact( $row->{data} );
325             POE::Component::Metabase::Client::Submit->submit(
326             event => '_submit_status',
327             profile => $self->profile,
328             secret => $self->secret,
329             fact => $report,
330             uri => $self->uri->as_string,
331 0 0       0 context => [ $row->{id}, $row->{attempts}, $self->_time ],
332             ( $self->multiple ? ( resolver => $self->_resolver ) : ( http_alias => $self->_http_alias ) ),
333             );
334              
335             }
336 2         16 return;
337             };
338              
339             event '_dispatch_event' => sub {
340 0     0   0 my ($kernel,$self,@args) = @_[KERNEL,OBJECT,ARG0..$#_];
341 0 0 0     0 return unless $self->session and $self->event;
342 0         0 $kernel->post( $self->session, $self->event, @args );
343 0         0 return;
344             };
345              
346             event '_clear_processing' => sub {
347 0     0   0 my($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
348             delete $self->_processing->{ $id } if exists
349 0 0       0 $self->_processing->{ $id };
350              
351 0         0 return;
352             };
353              
354             event '_submit_status' => sub {
355 0     0   0 my ($kernel,$self,$res) = @_[KERNEL,OBJECT,ARG0];
356 0         0 my ($id,$attempts,$starttime) = @{ $res->{context} };
  0         0  
357 0         0 my $timestamp = $self->_time;
358 0         0 $kernel->delay_set( '_clear_processing' => DELAY, $id );
359 0 0       0 if ( $res->{success} ) {
360             # Success event
361 0 0       0 warn "Submit '$id' (" . ( $timestamp - $starttime ) . "s) success\n" if $self->debug;
362             $self->_easydbi->do(
363             sql => $sql->{delete},
364 0 0       0 event => '_generic_db_result',
365             placeholders => [ $id ],
366             ( $self->no_relay ? () : ( _process => 1 ) ),
367             _ts => $timestamp,
368             );
369             }
370             else {
371 0 0       0 warn "Submit '$id' (" . ( $timestamp - $starttime ) . "s) error: $res->{error}\n" . ( defined $res->{content} ? "$res->{content}\n" : '' ) if $self->debug;
    0          
372 0 0 0     0 if ( defined $res->{content} and $res->{content} =~ /GUID conflicts with an existing object/i ) {
373             # Duplicate event
374             $self->_easydbi->do(
375             sql => $sql->{delete},
376 0         0 event => '_generic_db_result',
377             placeholders => [ $id ],
378             _ts => $timestamp,
379             );
380             }
381             else {
382             # Re-enqueue event
383 0         0 $attempts++;
384             $self->_easydbi->do(
385             sql => $sql->{update},
386 0         0 event => '_generic_db_result',
387             placeholders => [ $attempts, $id ],
388             _ts => $timestamp,
389             );
390             }
391             }
392 0         0 return;
393             };
394              
395             sub _time {
396 7     7   150 return Time::HiRes::time;
397             }
398              
399             sub _encode_fact {
400 0     0     my $self = shift;
401 0           return JSON->new->encode( shift->as_struct );
402             }
403              
404             sub _decode_fact {
405 0     0     my $self = shift;
406 0           return CPAN::Testers::Report->from_struct( JSON->new->decode( shift ) );
407             }
408              
409 1     1   2453 no MooseX::POE;
  1         2  
  1         9  
410              
411             __PACKAGE__->meta->make_immutable;
412              
413             package
414             BINGOS::POE::Component::Resolver;
415              
416 1     1   307 use base qw[POE::Component::Resolver];
  1         3  
  1         273  
417              
418             #noop
419             sub shutdown {
420 0     0     return;
421             }
422              
423             sub _really_shutdown {
424 0     0     shift->SUPER::shutdown;
425             }
426              
427             1;
428              
429             __END__
430              
431             =pod
432              
433             =encoding UTF-8
434              
435             =head1 NAME
436              
437             POE::Component::Metabase::Relay::Server::Queue - Submission queue for the metabase relay
438              
439             =head1 VERSION
440              
441             version 0.38
442              
443             =head1 DESCRIPTION
444              
445             POE::Component::Metabase::Relay::Server::Queue is the submission queue for L<POE::Component::Metabase::Relay::Server>.
446              
447             It is based on L<POE::Component::EasyDBI> database and uses L<POE::Component::Metabase::Client::Submit> to send
448             reports to a L<Metabase> server.
449              
450             =for Pod::Coverage DELAY
451              
452             =for Pod::Coverage START
453              
454             =head1 CONSTRUCTOR
455              
456             =over
457              
458             =item C<spawn>
459              
460             Spawns a new component session and creates a SQLite database if it doesn't already exist.
461              
462             Takes a number of mandatory parameters:
463              
464             'dsn', a DBI DSN to use to store the submission queue;
465             'profile', a Metabase::User::Profile object;
466             'secret', a Metabase::User::Secret object;
467             'uri', the uri of metabase server to submit to;
468              
469             and a number of optional parameters:
470              
471             'username', a DSN username if required;
472             'password', a DSN password if required;
473             'db_opts', a hashref of DBD options that is passed to POE::Component::EasyDBI;
474             'debug', enable debugging information;
475             'multiple', set to true to enable the Queue to use multiple PoCo-Client-HTTPs, default 0;
476             'no_relay', set to true to disable report submissions to the Metabase, default 0;
477             'no_curl', set to true to disable automatic usage of POE::Component::Curl::Multi, default 0;
478             'submissions', an int to control the number of parallel http clients ( used only if multiple == 1 ), default 10;
479              
480             =back
481              
482             =head1 INPUT EVENTS
483              
484             =over
485              
486             =item C<submit>
487              
488             Takes one parameter a L<Metabase::Fact> to submit.
489              
490             =item C<shutdown>
491              
492             Terminates the component.
493              
494             =back
495              
496             =head1 SEE ALSO
497              
498             L<Metabase>
499              
500             L<Metabase::User::Profile>
501              
502             L<Metabase::User::Secret>
503              
504             L<POE::Component::Metabase::Client::Submit>
505              
506             L<POE::Component::Metabase::Relay::Server>
507              
508             L<POE::Component::EasyDBI>
509              
510             =head1 AUTHOR
511              
512             Chris Williams <chris@bingosnet.co.uk>
513              
514             =head1 COPYRIGHT AND LICENSE
515              
516             This software is copyright (c) 2019 by Chris Williams.
517              
518             This is free software; you can redistribute it and/or modify it under
519             the same terms as the Perl 5 programming language system itself.
520              
521             =cut