File Coverage

blib/lib/Tatsumaki/MessageQueue.pm
Criterion Covered Total %
statement 70 89 78.6
branch 12 20 60.0
condition 5 8 62.5
subroutine 16 20 80.0
pod 3 8 37.5
total 106 145 73.1


line stmt bran cond sub pod time code
1             package Tatsumaki::MessageQueue;
2 3     3   11358 use strict;
  3         6  
  3         141  
3              
4 3     3   3777 use AnyEvent;
  3         14059  
  3         85  
5 3     3   3282 use Any::Moose;
  3         130033  
  3         26  
6 3     3   7013 use Try::Tiny;
  3         5319  
  3         188  
7 3     3   22 use Scalar::Util;
  3         8  
  3         158  
8 3     3   3439 use Time::HiRes;
  3         22898  
  3         21  
9 3     3   590 use constant DEBUG => $ENV{TATSUMAKI_DEBUG};
  3         6  
  3         3326  
10              
11             has channel => (is => 'rw', isa => 'Str');
12             has backlog => (is => 'rw', isa => 'ArrayRef', default => sub { [] });
13             has clients => (is => 'rw', isa => 'HashRef', default => sub { +{} });
14              
15             our $BacklogLength = 30; # TODO configurable
16              
17             my %instances;
18              
19             sub channels {
20 0     0 0 0 values %instances;
21             }
22              
23             sub instance {
24 13     13 0 11810 my($class, $name) = @_;
25 13   66     134 $instances{$name} ||= $class->new(channel => $name);
26             }
27              
28             sub backlog_events {
29 7     7 0 11 my $self = shift;
30 7         11 reverse grep defined, @{$self->backlog};
  7         68  
31             }
32              
33             sub append_backlog {
34 4     4 0 11 my($self, @events) = @_;
35 4         8 my @new_backlog = (reverse(@events), @{$self->backlog});
  4         17  
36 4         31 $self->backlog([ splice @new_backlog, 0, $BacklogLength ]);
37             }
38              
39             sub publish {
40 4     4 1 26 my($self, @events) = @_;
41              
42 4         6 for my $client_id (keys %{$self->clients}) {
  4         81  
43 7         88 my $client = $self->clients->{$client_id};
44 7 100       25 if ($client->{cv}->cb) {
45             # currently listening: flush and send the events right away
46 5         36 $self->flush_events($client_id, @events);
47             } else {
48             # between long poll comet: buffer the events
49             # TODO: limit buffer length
50 2         97 warn "Buffering new events for $client_id" if DEBUG;
51 2         3 push @{$client->{buffer}}, @events;
  2         8  
52             }
53             }
54 4         30 $self->append_backlog(@events);
55             }
56              
57             sub flush_events {
58 9     9 0 24 my($self, $client_id, @events) = @_;
59              
60 9 50       65 my $client = $self->clients->{$client_id} or return;
61             try {
62 9     9   332 my $cb = $client->{cv}->cb;
63 9         94 $client->{cv}->send(@events);
64 9         399 $client->{cv} = AE::cv;
65 9         92 $client->{buffer} = [];
66              
67 9 50       30 if ($client->{persistent}) {
68 0         0 $client->{cv}->cb($cb);
69             } else {
70 9         15 undef $client->{longpoll_timer};
71             $client->{reconnect_timer} = AE::timer 30, 0, sub {
72 0         0 Scalar::Util::weaken $self;
73 0         0 warn "Sweep $client_id (no long-poll reconnect)" if DEBUG;
74 0         0 undef $client;
75 0         0 delete $self->clients->{$client_id};
76 9         142 };
77             }
78             } catch {
79 0 0   0   0 /Tatsumaki::Error::ClientDisconnect/ and do {
80 0         0 warn "Client $client_id disconnected" if DEBUG;
81 0         0 undef $client;
82 0         0 delete $self->clients->{$client_id};
83             };
84 9         99 };
85             }
86              
87             sub poll_once {
88 9     9 1 71 my($self, $client_id, $cb, $timeout) = @_;
89              
90 9         15 my $is_new;
91 9   66     73 my $client = $self->clients->{$client_id} ||= do {
92 7         12 $is_new = 1;
93 7         211 + { cv => AE::cv, persistent => 0, buffer => [] };
94             };
95              
96 9 50       5697 if ( $client->{longpoll_timer} ) {
97             # close last connection from the same client_id
98 0         0 $self->flush_events($client_id);
99 0         0 undef $client->{longpoll_timer};
100             }
101 9         18 undef $client->{reconnect_timer};
102              
103 9     9   84 $client->{cv}->cb(sub { $cb->($_[0]->recv) });
  9         101  
104              
105             # reset garbage collection timeout with the long-poll timeout
106             # $timeout = 0 is a valid timeout for interval-polling
107 9 100       343 $timeout = 55 unless defined $timeout;
108             $client->{longpoll_timer} = AE::timer $timeout || 55, 0, sub {
109 1     1   998163 Scalar::Util::weaken $self;
110 1         3 warn "Timing out $client_id long-poll" if DEBUG;
111 1         9 $self->flush_events($client_id);
112 9   50     87 };
113              
114 9 100       25 if ($is_new) {
  2 50       11  
115             # flush backlog for a new client
116 7         27 my @events = $self->backlog_events;
117 7 100       40 $self->flush_events($client_id, @events) if @events;
118             }elsif ( @{ $client->{buffer} } ) {
119             # flush buffer for a long-poll client
120 2         3 $self->flush_events($client_id, @{ $client->{buffer} });
  2         8  
121             }
122             }
123              
124             sub poll {
125 0     0 1   my($self, $client_id, $cb) = @_;
126              
127             # TODO register client info like names and remote host in $client
128 0           my $cv = AE::cv;
129 0     0     $cv->cb(sub { $cb->($_[0]->recv) });
  0            
130 0           my $s = $self->clients->{$client_id} = {
131             cv => $cv, persistent => 1, buffer => [],
132             };
133              
134 0           my @events = $self->backlog_events;
135 0 0         $self->flush_events($client_id, @events) if @events;
136             }
137              
138             1;
139              
140             __END__