File Coverage

blib/lib/POE/Component/MessageQueue/Statistics.pm
Criterion Covered Total %
statement 7 67 10.4
branch 0 14 0.0
condition n/a
subroutine 3 15 20.0
pod n/a
total 10 96 10.4


line stmt bran cond sub pod time code
1             # $Id$
2             #
3             # Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp>
4             # All rights reserved.
5             #
6             # This program is free software: you can redistribute it and/or modify
7             # it under the terms of the GNU General Public License as published by
8             # the Free Software Foundation, either version 2 of the License, or
9             # (at your option) any later version.
10             #
11             # This program is distributed in the hope that it will be useful,
12             # but WITHOUT ANY WARRANTY; without even the implied warranty of
13             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14             # GNU General Public License for more details.
15             #
16             # You should have received a copy of the GNU General Public License
17             # along with this program. If not, see <http://www.gnu.org/licenses/>.
18              
19             package POE::Component::MessageQueue::Statistics;
20 1     1   1484 use strict;
  1         3  
  1         39  
21 1     1   5 use warnings;
  1         2  
  1         1082  
22              
23             sub new
24             {
25 0     0     my $class = shift;
26 0           my $self = bless {
27             statistics => {
28             ID => sprintf(
29             "POE::Component::MessageQueue version %s (PID: $$)",
30             # hide from PAUSE
31             eval join('::', '$POE', 'Component', 'MessageQueue', 'VERSION')
32             ),
33             total_stored => 0,
34             total_sent => 0,
35             subscriptions => 0,
36             queues => {},
37             },
38             store_info => {},
39             publishers => [],
40             }, $class;
41              
42 0           $self;
43             }
44              
45             sub register
46             {
47 0     0     my ($self, $mq) = @_;
48             $mq->register_event( $_, $self )
49 0           for qw(store dispatch remove recv subscribe unsubscribe);
50             }
51              
52             sub add_publisher {
53 0     0     my ($self, $pub) = @_;
54 0           push(@{$self->{publishers}}, $pub);
  0            
55             }
56              
57             sub get_queue
58             {
59 0     0     my ($self, $name) = @_;
60              
61 0           my $queue = $self->{statistics}{queues}{$name};
62              
63 0 0         unless ($queue)
64             {
65 0           $queue = $self->{statistics}{queues}{$name} = {};
66 0           $queue->{$_} = 0 foreach qw(
67             sent stored
68             total_stored total_sent
69             total_recvd avg_secs_stored avg_size_recvd
70             );
71             }
72              
73 0           return $queue;
74             }
75              
76             sub get_topic
77             {
78 0     0     my ($self, $name) = @_;
79 0           my $topic = $self->{statistics}{topics}{$name};
80              
81 0 0         unless ($topic)
82             {
83 0           $topic = $self->{statistics}{topics}{$name} = {};
84 0           $topic->{$_} = 0 foreach qw(sent total_sent total_recvd avg_size_recvd);
85             }
86              
87 0           return $topic;
88             }
89              
90             sub shutdown
91             {
92 0     0     my $self = shift;
93 0           foreach my $pub (@{$self->{publishers}})
  0            
94             {
95 0           $pub->shutdown();
96             }
97             }
98              
99             sub notify
100             {
101 0     0     my ($self, $event, $data) = @_;
102 0 0         if(my $method = $self->can("notify_$event"))
103             {
104 0           $method->($self, $data);
105             }
106             else
107             {
108 0           die "Tried to notify $event, which has no handler.";
109             }
110             }
111              
112             sub notify_store
113             {
114 0     0     my ($self, $message) = @_;
115 0           my $qname = $message->destination;
116              
117 0 0         return unless $qname =~ m(/queue/(.*));
118 0           $qname = $1;
119              
120 0           $self->{store_info}->{$message->id} = {
121             qname => $qname,
122             timestamp => time(),
123             };
124              
125 0           my $global = $self->{statistics};
126 0           $global->{total_stored}++;
127              
128 0           my $stats = $self->get_queue($qname);
129 0           $stats->{stored}++;
130 0           $stats->{total_stored}++;
131 0           $stats->{last_stored} = scalar localtime();
132             }
133              
134             sub reaverage {
135 0     0     my ($total, $average, $size) = @_;
136 0 0         return 0 if ($total <= 0);
137 0           return ($average * ($total - 1) + $size) / $total;
138             }
139              
140             sub get_destination
141             {
142 0     0     my ($self, $data) = @_;
143 0           my $d = $data->{destination};
144 0 0         if ($d->name =~ m{/.*/(.*)})
145             {
146 0 0         return $d->isa('POE::Component::MessageQueue::Queue') ?
147             $self->get_queue($1) :
148             $self->get_topic($1) ;
149             }
150 0           return;
151             }
152              
153             sub notify_recv
154             {
155 0     0     my ($self, $data) = @_;
156 0           my $stats = $self->get_destination($data);
157 0           $stats->{total_recvd}++;
158              
159             # recalc the average
160 0           $stats->{avg_size_recvd} = reaverage(
161             $stats->{total_recvd},
162             $stats->{avg_size_recvd},
163             $data->{message}->size,
164             );
165             }
166              
167             sub notify_dispatch
168             {
169 0     0     my ($self, $data) = @_;
170              
171 0           my $global = $self->{statistics};
172 0           $global->{total_sent}++;
173              
174 0           my $stats = $self->get_destination($data);
175 0           $stats->{total_sent}++;
176 0           $stats->{sent}++;
177 0           $stats->{last_sent} = scalar localtime();
178             }
179              
180             sub notify_remove {
181             my ($self, $id) = @_;
182 1     1   507 use YAML;
  0            
  0            
183              
184             if (my $store_info = delete $self->{store_info}->{$id})
185             {
186             my $stats = $self->get_queue($store_info->{qname});
187             $stats->{stored}--;
188             $stats->{avg_secs_stored} = reaverage(
189             $stats->{total_stored},
190             $stats->{avg_secs_stored},
191             (time() - $store_info->{timestamp})
192             );
193             }
194             }
195              
196             sub notify_subscribe
197             {
198             my ($self, $data) = @_;
199              
200             # Global
201             my $h = $self->{statistics};
202             $h->{subscriptions}++;
203              
204             # Per-queue
205             my $stats = $self->get_destination($data);
206             $stats->{subscriptions}++;
207             }
208              
209             sub notify_unsubscribe
210             {
211             my ($self, $data) = @_;
212              
213             # Global
214             my $h = $self->{statistics};
215             $h->{subscriptions}--;
216              
217             # Per-queue
218             my $stats = $self->get_destination($data);
219             $stats->{subscriptions}--;
220             }
221              
222             sub notify_pump {}
223              
224             1;
225              
226             __END__
227              
228             =head1 NAME
229              
230             POE::Component::MessageQueue::Statistics - Gather MQ Usage Statistics
231              
232             =head1 SYNOPSIS
233              
234             my $statistics = POE::Component::MessageQueue::Statistics->new();
235             $mq->register( $statistics );
236              
237             =head1 DESCRIPTION
238              
239             POE::Component::MessageQueue::Statistics is a simple observer that receives
240             events from the main POE::Component::MessageQueue object to collect usage
241             statistics.
242              
243             By itself it will only *gather* statistics, and will not output anything.
244              
245             To enable outputs, you need to create a separate Publish object:
246              
247             POE::Component::MessageQueue::Statistics::Publish::YAML->new(
248             output => \*STDERR,
249             statistics => $statistics
250             );
251              
252             Please refer to L<POE::Component::MessageQueue::Statistics::Publish> for details
253             on how to enable output
254              
255             =head1 SEE ALSO
256              
257             L<POE::Component::MessageQueue::Statistics::Publish>,
258             L<POE::Component::MessageQueue::Statistics::Publish::YAML>
259              
260             =head1 AUTHOR
261              
262             Daisuke Maki E<lt>daisuke@endeworks.jpE<gt>
263              
264             =cut