File Coverage

blib/lib/POE/Component/Metabase/Relay/Server/Queue.pm
Criterion Covered Total %
statement 46 48 95.8
branch n/a
condition n/a
subroutine 16 16 100.0
pod n/a
total 62 64 96.8


line stmt bran cond sub pod time code
1             package POE::Component::Metabase::Relay::Server::Queue;
2             $POE::Component::Metabase::Relay::Server::Queue::VERSION = '0.34';
3             # ABSTRACT: Submission queue for the metabase relay
4              
5 1     1   4 use strict;
  1         1  
  1         28  
6 1     1   4 use warnings;
  1         1  
  1         22  
7 1     1   4 use POE qw[Component::EasyDBI];
  1         0  
  1         4  
8 1     1   52656 use POE::Component::Client::HTTP;
  1         77824  
  1         25  
9 1     1   6 use POE::Component::Resolver;
  1         2  
  1         39  
10 1     1   494 use POE::Component::Metabase::Client::Submit;
  1         5496  
  1         26  
11 1     1   7 use CPAN::Testers::Report ();
  1         2  
  1         12  
12 1     1   418 use Metabase::User::Profile ();
  1         2023  
  1         19  
13 1     1   456 use Metabase::User::Secret ();
  1         192  
  1         20  
14 1     1   512 use Module::Load::Conditional qw[can_load];
  1         13642  
  1         58  
15 1     1   6 use JSON ();
  1         1  
  1         16  
16 1     1   4 use Params::Util qw[_HASH];
  1         1  
  1         36  
17 1     1   5 use Time::HiRes ();
  1         1  
  1         12  
18 1     1   3 use Data::UUID;
  1         1  
  1         53  
19              
20 1     1   4 use constant DELAY => 150;
  1         1  
  1         68  
21              
22              
23             my $sql = {
24             'create' => 'CREATE TABLE IF NOT EXISTS queue ( id varchar(150), submitted varchar(32), attempts INTEGER, data BLOB )',
25             'insert' => 'INSERT INTO queue values(?,?,?,?)',
26             'delete' => 'DELETE FROM queue where id = ?',
27             'queue' => 'SELECT * FROM queue ORDER BY attempts ASC, submitted ASC limit ', # the limit is appended via "submissions"
28             'update' => 'UPDATE queue SET attempts = ? WHERE id = ?',
29             'addidx' => [
30             'CREATE INDEX IF NOT EXISTS queue_id ON queue ( id )',
31             'CREATE INDEX IF NOT EXISTS queue_att_sub ON queue ( attempts, submitted )',
32             ],
33             };
34              
35 1     1   214 use MooseX::POE;
  0            
  0            
36             use MooseX::Types::URI qw[Uri];
37              
38             {
39             use Moose::Util::TypeConstraints;
40              
41             my $ps = subtype as 'Str', where { $poe_kernel->alias_resolve( $_ ) };
42             coerce $ps, from 'Str', via { $poe_kernel->alias_resolve( $_ )->ID };
43              
44             has 'session' => (
45             is => 'ro',
46             isa => $ps,
47             coerce => 1,
48             );
49              
50             no Moose::Util::TypeConstraints;
51             }
52              
53             has 'event' => (
54             is => 'ro',
55             isa => 'Str',
56             );
57              
58             has 'profile' => (
59             is => 'ro',
60             isa => 'Metabase::User::Profile',
61             required => 1,
62             );
63              
64             has 'secret' => (
65             is => 'ro',
66             isa => 'Metabase::User::Secret',
67             required => 1,
68             );
69              
70             has 'dsn' => (
71             is => 'ro',
72             isa => 'Str',
73             required => 1,
74             );
75              
76             has 'uri' => (
77             is => 'ro',
78             isa => Uri,
79             coerce => 1,
80             required => 1,
81             );
82              
83             has 'username' => (
84             is => 'ro',
85             isa => 'Str',
86             default => '',
87             );
88              
89             has 'password' => (
90             is => 'ro',
91             isa => 'Str',
92             default => '',
93             );
94              
95             has 'debug' => (
96             is => 'rw',
97             isa => 'Bool',
98             default => 0,
99             );
100              
101             has 'multiple' => (
102             is => 'ro',
103             isa => 'Bool',
104             default => 0,
105             writer => '_set_multiple',
106             );
107              
108             has 'db_opts' => (
109             is => 'ro',
110             isa => 'HashRef',
111             default => sub {{}},
112             );
113              
114             has 'no_relay' => (
115             is => 'rw',
116             isa => 'Bool',
117             default => 0,
118             trigger => sub {
119             my( $self, $new, $old ) = @_;
120             return if ! $self->_has_easydbi;
121             $self->yield( '_process_queue' ) if ! $new;
122             },
123             );
124              
125             has 'no_curl' => (
126             is => 'ro',
127             isa => 'Bool',
128             default => 0,
129             );
130              
131             has 'submissions' => (
132             is => 'rw',
133             isa => 'Int',
134             default => 10,
135             );
136              
137             has '_uuid' => (
138             is => 'ro',
139             isa => 'Data::UUID',
140             lazy_build => 1,
141             init_arg => undef,
142             );
143              
144             has '_easydbi' => (
145             is => 'ro',
146             isa => 'POE::Component::EasyDBI',
147             lazy_build => 1,
148             init_arg => undef,
149             );
150              
151             has _http_alias => (
152             is => 'ro',
153             isa => 'Str',
154             init_arg => undef,
155             writer => '_set_http_alias',
156             );
157              
158             has _resolver => (
159             is => 'ro',
160             isa => 'POE::Component::Resolver',
161             init_arg => undef,
162             writer => '_set_resolver',
163             );
164              
165             has '_processing' => (
166             is => 'ro',
167             isa => 'HashRef',
168             default => sub {{}},
169             );
170              
171             sub _build__easydbi {
172             my $self = shift;
173             POE::Component::EasyDBI->new(
174             alias => '',
175             dsn => $self->dsn,
176             username => $self->username,
177             password => $self->password,
178             ( _HASH( $self->db_opts ) ? ( options => $self->db_opts ) : () ),
179             );
180             }
181              
182             sub _build__uuid {
183             Data::UUID->new();
184             }
185              
186             sub spawn {
187             shift->new(@_);
188             }
189              
190             sub START {
191             my ($kernel,$self) = @_[KERNEL,OBJECT];
192             $self->_build_table;
193             $kernel->yield( 'do_vacuum', 'process' );
194             if ( !$self->no_curl && can_load( modules => { 'POE::Component::Curl::Multi' => '0.08' } ) ) {
195             $self->_set_multiple( 0 );
196             $self->_set_http_alias( join '-', __PACKAGE__, $self->get_session_id );
197             POE::Component::Curl::Multi->spawn(
198             Alias => $self->_http_alias,
199             FollowRedirects => 2,
200             );
201             }
202             elsif ( $self->multiple ) {
203             $self->_set_resolver( BINGOS::POE::Component::Resolver->new() );
204             }
205             else {
206             $self->_set_http_alias( join '-', __PACKAGE__, $self->get_session_id );
207             POE::Component::Client::HTTP->spawn(
208             Alias => $self->_http_alias,
209             FollowRedirects => 2,
210             );
211             }
212             $kernel->yield( '_process_queue' ) if ! $self->no_relay;
213             return;
214             }
215              
216              
217             sub _build_table {
218             my $self = shift;
219              
220             $self->_easydbi;
221              
222             if ( $self->dsn =~ /^dbi\:SQLite/i ) {
223             $self->_easydbi->do(
224             sql => 'PRAGMA synchronous = OFF',
225             event => '_generic_db_result',
226             _ts => $self->_time,
227             );
228             }
229              
230             $self->_easydbi->do(
231             sql => $sql->{create},
232             event => '_generic_db_result',
233             _ts => $self->_time,
234             );
235              
236             foreach my $idx ( @{ $sql->{addidx} } ) {
237             $self->_easydbi->do(
238             sql => $idx,
239             event => '_generic_db_result',
240             _ts => $self->_time,
241             );
242             }
243             }
244              
245             event 'do_vacuum' => sub {
246             my ($kernel,$self,$process) = @_[KERNEL,OBJECT,ARG0];
247             $self->_easydbi->do(
248             sql => 'VACUUM',
249             event => '_generic_db_result',
250             _ts => $self->_time,
251             ( $process ? ( _process => 1 ) : () ),
252             );
253              
254             $kernel->delay( 'do_vacuum' => DELAY * 60 );
255             return;
256             };
257              
258             event 'shutdown' => sub {
259             my ($kernel,$self) = @_[KERNEL,OBJECT];
260             $kernel->alarm_remove_all();
261             $kernel->post( $self->_easydbi->ID, 'shutdown' );
262             $kernel->post(
263             $self->_http_alias,
264             'shutdown',
265             );
266             $self->_resolver->_really_shutdown
267             if $self->multiple && $self->_resolver;
268             return;
269             };
270              
271             event '_generic_db_result' => sub {
272             my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
273             if ( $result->{error} ) {
274             warn "DB error (" . ( $self->_time - $result->{_ts} ) . "s): " . JSON->new->pretty(1)->encode( $result ) . "\n" if $self->debug;
275             }
276             $kernel->yield( '_process_queue' ) if $result->{_process};
277             return;
278             };
279              
280             event 'submit' => sub {
281             my ($kernel,$self,$fact) = @_[KERNEL,OBJECT,ARG0];
282             return unless $fact and $fact->isa('Metabase::Fact');
283             my $timestamp = $self->_time;
284             $kernel->yield( '_dispatch_event', 'enqueued', $fact );
285             $self->_easydbi->do(
286             sql => $sql->{insert},
287             event => '_generic_db_result',
288             placeholders => [ $self->_uuid->create_b64(), $timestamp, 0, $self->_encode_fact( $fact ) ],
289             ( $self->no_relay ? () : ( _process => 1 ) ),
290             _ts => $timestamp,
291             );
292             return;
293             };
294              
295             event '_process_queue' => sub {
296             my ($kernel,$self) = @_[KERNEL,OBJECT];
297             return if $self->no_relay;
298             # Processing event
299             $kernel->delay( '_process_queue', DELAY );
300             $self->_easydbi->arrayhash(
301             sql => $sql->{queue} . ( $self->multiple ? $self->submissions : 1 ),
302             event => '_queue_db_result',
303             _ts => $self->_time,
304             );
305             return;
306             };
307              
308             event '_queue_db_result' => sub {
309             my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
310             if ( $result->{error} and $self->debug ) {
311             warn $result->{error}, "\n";
312             return;
313             }
314             foreach my $row ( @{ $result->{result} } ) {
315             # Have we seen this report before?
316             if ( exists $self->_processing->{ $row->{id} } ) {
317             next;
318             } else {
319             $self->_processing->{ $row->{id} }++;
320             }
321              
322             # Submit event ?
323             my $report = $self->_decode_fact( $row->{data} );
324             POE::Component::Metabase::Client::Submit->submit(
325             event => '_submit_status',
326             profile => $self->profile,
327             secret => $self->secret,
328             fact => $report,
329             uri => $self->uri->as_string,
330             context => [ $row->{id}, $row->{attempts}, $self->_time ],
331             ( $self->multiple ? ( resolver => $self->_resolver ) : ( http_alias => $self->_http_alias ) ),
332             );
333              
334             }
335             return;
336             };
337              
338             event '_dispatch_event' => sub {
339             my ($kernel,$self,@args) = @_[KERNEL,OBJECT,ARG0..$#_];
340             return unless $self->session and $self->event;
341             $kernel->post( $self->session, $self->event, @args );
342             return;
343             };
344              
345             event '_clear_processing' => sub {
346             my($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
347             delete $self->_processing->{ $id } if exists
348             $self->_processing->{ $id };
349              
350             return;
351             };
352              
353             event '_submit_status' => sub {
354             my ($kernel,$self,$res) = @_[KERNEL,OBJECT,ARG0];
355             my ($id,$attempts,$starttime) = @{ $res->{context} };
356             my $timestamp = $self->_time;
357             $kernel->delay_set( '_clear_processing' => DELAY, $id );
358             if ( $res->{success} ) {
359             # Success event
360             warn "Submit '$id' (" . ( $timestamp - $starttime ) . "s) success\n" if $self->debug;
361             $self->_easydbi->do(
362             sql => $sql->{delete},
363             event => '_generic_db_result',
364             placeholders => [ $id ],
365             ( $self->no_relay ? () : ( _process => 1 ) ),
366             _ts => $timestamp,
367             );
368             }
369             else {
370             warn "Submit '$id' (" . ( $timestamp - $starttime ) . "s) error: $res->{error}\n" . ( defined $res->{content} ? "$res->{content}\n" : '' ) if $self->debug;
371             if ( defined $res->{content} and $res->{content} =~ /GUID conflicts with an existing object/i ) {
372             # Duplicate event
373             $self->_easydbi->do(
374             sql => $sql->{delete},
375             event => '_generic_db_result',
376             placeholders => [ $id ],
377             _ts => $timestamp,
378             );
379             }
380             else {
381             # Re-enqueue event
382             $attempts++;
383             $self->_easydbi->do(
384             sql => $sql->{update},
385             event => '_generic_db_result',
386             placeholders => [ $attempts, $id ],
387             _ts => $timestamp,
388             );
389             }
390             }
391             return;
392             };
393              
394             sub _time {
395             return Time::HiRes::time;
396             }
397              
398             sub _encode_fact {
399             my $self = shift;
400             return JSON->new->encode( shift->as_struct );
401             }
402              
403             sub _decode_fact {
404             my $self = shift;
405             return CPAN::Testers::Report->from_struct( JSON->new->decode( shift ) );
406             }
407              
408             no MooseX::POE;
409              
410             __PACKAGE__->meta->make_immutable;
411              
412             package
413             BINGOS::POE::Component::Resolver;
414              
415             use base qw[POE::Component::Resolver];
416              
417             #noop
418             sub shutdown {
419             return;
420             }
421              
422             sub _really_shutdown {
423             shift->SUPER::shutdown;
424             }
425              
426             1;
427              
428             __END__
429              
430             =pod
431              
432             =encoding UTF-8
433              
434             =head1 NAME
435              
436             POE::Component::Metabase::Relay::Server::Queue - Submission queue for the metabase relay
437              
438             =head1 VERSION
439              
440             version 0.34
441              
442             =head1 DESCRIPTION
443              
444             POE::Component::Metabase::Relay::Server::Queue is the submission queue for L<POE::Component::Metabase::Relay::Server>.
445              
446             It is based on L<POE::Component::EasyDBI> database and uses L<POE::Component::Metabase::Client::Submit> to send
447             reports to a L<Metabase> server.
448              
449             =for Pod::Coverage DELAY
450              
451             =for Pod::Coverage START
452              
453             =head1 CONSTRUCTOR
454              
455             =over
456              
457             =item C<spawn>
458              
459             Spawns a new component session and creates a SQLite database if it doesn't already exist.
460              
461             Takes a number of mandatory parameters:
462              
463             'dsn', a DBI DSN to use to store the submission queue;
464             'profile', a Metabase::User::Profile object;
465             'secret', a Metabase::User::Secret object;
466             'uri', the uri of metabase server to submit to;
467              
468             and a number of optional parameters:
469              
470             'username', a DSN username if required;
471             'password', a DSN password if required;
472             'db_opts', a hashref of DBD options that is passed to POE::Component::EasyDBI;
473             'debug', enable debugging information;
474             'multiple', set to true to enable the Queue to use multiple PoCo-Client-HTTPs, default 0;
475             'no_relay', set to true to disable report submissions to the Metabase, default 0;
476             'no_curl', set to true to disable automatic usage of POE::Component::Curl::Multi, default 0;
477             'submissions', an int to control the number of parallel http clients ( used only if multiple == 1 ), default 10;
478              
479             =back
480              
481             =head1 INPUT EVENTS
482              
483             =over
484              
485             =item C<submit>
486              
487             Takes one parameter a L<Metabase::Fact> to submit.
488              
489             =item C<shutdown>
490              
491             Terminates the component.
492              
493             =back
494              
495             =head1 SEE ALSO
496              
497             L<Metabase>
498              
499             L<Metabase::User::Profile>
500              
501             L<Metabase::User::Secret>
502              
503             L<POE::Component::Metabase::Client::Submit>
504              
505             L<POE::Component::Metabase::Relay::Server>
506              
507             L<POE::Component::EasyDBI>
508              
509             =head1 AUTHOR
510              
511             Chris Williams <chris@bingosnet.co.uk>
512              
513             =head1 COPYRIGHT AND LICENSE
514              
515             This software is copyright (c) 2014 by Chris Williams.
516              
517             This is free software; you can redistribute it and/or modify it under
518             the same terms as the Perl 5 programming language system itself.
519              
520             =cut