File Coverage

blib/lib/NetSDS/App/QueueProcessor.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             #===============================================================================
2             #
3             # FILE: QueueProcessor.pm
4             #
5             # DESCRIPTION: NetSDS queue processing application
6             #
7             # NOTES: ---
8             # AUTHOR: Michael Bochkaryov (Rattler), <misha@rattler.kiev.ua>
9             # COMPANY: Net.Style
10             # CREATED: 10.08.2009 20:57:57 EEST
11             #===============================================================================
12              
13             =head1 NAME
14              
15             NetSDS::App::QueueProcessor - queue processing server framework
16              
17             =head1 SYNOPSIS
18              
19             -----------------------------
20             # Configuration file
21              
22             # Queue server IP and port
23             queue_server = "127.0.0.1:22201"
24              
25             # Pulling queue name
26             queue_name = "myq"
27              
28             # Processing bandwidth (messages per second)
29             bandwidth = 2
30              
31             # Timeout on idle loops
32             idle_timeout = 3
33             -----------------------------
34              
35             QProc->run(conf_file => './qproc.conf');
36              
37             1;
38              
39             package QProc;
40              
41             use Data::Dumper;
42             use base 'NetSDS::App::QueueProcessor';
43              
44             # Message processing logic
45             sub process {
46              
47             my ( $self, $msg ) = @_;
48              
49             # Just dump message structure
50             print Dumper($msg);
51              
52             }
53              
54             1;
55              
56             =head1 DESCRIPTION
57              
58             C<NetSDS::App::QueueProcessor> module implements framework for applications
59             processing messages arriving from MemcacheQ queue server.
60              
61             =cut
62              
63             package NetSDS::App::QueueProcessor;
64              
65 2     2   15584 use 5.8.0;
  2         9  
  2         138  
66 2     2   11 use strict;
  2         4  
  2         77  
67 2     2   10 use warnings;
  2         4  
  2         79  
68              
69 2     2   1974 use Time::HiRes qw(sleep time); # high resolution timer
  2         4958  
  2         11  
70 2     2   608 use NetSDS::Queue; # MemcacheQ API
  0            
  0            
71             use base 'NetSDS::App';
72              
73             use version; our $VERSION = '0.032';
74              
75             #===============================================================================
76             #
77              
78             =head1 CLASS API
79              
80             =over
81              
82             =item B<new([...])> - class constructor
83              
84             =cut
85              
86             #-----------------------------------------------------------------------
87             sub new {
88              
89             my ( $class, %params ) = @_;
90              
91             my $self = $class->SUPER::new(%params);
92              
93             return $self;
94              
95             }
96              
97              
98             #***********************************************************************
99              
100             =item B<initialize()> - application initialization
101              
102             Internal method implementing common startup actions.
103              
104             =cut
105              
106             #-----------------------------------------------------------------------
107              
108             sub initialize {
109              
110             my ( $self, @params ) = @_;
111              
112             $self->SUPER::initialize(@params);
113              
114             # Initialize queue server
115             $self->{server} = NetSDS::Queue->new(
116             server => $self->conf->{queue_server},
117             );
118             $self->{queue_name} = $self->conf->{queue_name};
119              
120             # Set time for each iteration
121             if ( $self->conf->{'bandwidth'} ) {
122             my $bandwidth = $self->conf->{'bandwidth'} + 0;
123             if ($bandwidth) {
124             # Bandwidth is given in message/sec
125             $self->{'sleep_period'} = ( 1 / $bandwidth );
126             } else {
127             $self->{'sleep_period'} = 1; # Default bandwidth = 1 message/sec
128             }
129             } else {
130             $self->{'sleep_period'} = 1; # Default bandwidth = 1 message/sec
131             }
132              
133             $self->{'idle_timeout'} = ( $self->conf->{'idle_timeout'} + 0 ) ? $self->conf->{'idle_timeout'} + 0 : 5;
134              
135             return $self;
136              
137             } ## end sub initialize
138              
139              
140             #***********************************************************************
141              
142             =item B<main_loop()> - main processing loop
143              
144             Internal method for application logic.
145              
146             =cut
147              
148             #-----------------------------------------------------------------------
149              
150             sub main_loop {
151              
152             my ($self) = @_;
153              
154             $self->start();
155              
156             # Main processing loop itself
157             while ( !$self->{to_finalize} ) {
158              
159             # Call production code
160             while ( my $res = $self->{server}->pull( $self->{queue_name} ) ) {
161              
162             # Set iteration start timestamp in microseconds
163             my $start_time = time;
164              
165             $self->process($res);
166             last if ( $self->{to_finalize} );
167              
168             # If iteration finished fast - sleep
169             if ( time < ( $start_time + $self->{'sleep_period'} ) ) {
170             sleep( $self->{'sleep_period'} + $start_time - time );
171             }
172              
173             }
174             # Sleep if no messages in queue
175             sleep $self->{'idle_timeout'};
176              
177             # Process infinite loop
178             unless ( $self->{infinite} ) {
179             $self->{to_finalize} = 1;
180             }
181              
182             } ## end while ( !$self->{to_finalize...
183              
184             $self->stop();
185              
186             } ## end sub main_loop
187              
188             #***********************************************************************
189              
190             =item B<process()> - main JSON-RPC iteration
191              
192             This is internal method that implements JSON-RPC call processing.
193              
194             =cut
195              
196             #-----------------------------------------------------------------------
197              
198             sub process {
199              
200             my ( $self, $msg ) = @_;
201              
202             }
203              
204             1;
205              
206             __END__
207              
208             =back
209              
210             =head1 EXAMPLES
211              
212             See C<samples/app_qproc.pl> appliction.
213              
214             =head1 SEE ALSO
215              
216             L<NetSDS::Queue>
217              
218             L<NetSDS::App>
219              
220             =head1 TODO
221              
222             None
223              
224             =head1 AUTHOR
225              
226             Michael Bochkaryov <misha@rattler.kiev.ua>
227              
228             =head1 LICENSE
229              
230             Copyright (C) 2008-2009 Michael Bochkaryov
231              
232             This program is free software; you can redistribute it and/or modify
233             it under the terms of the GNU General Public License as published by
234             the Free Software Foundation; either version 2 of the License, or
235             (at your option) any later version.
236              
237             This program is distributed in the hope that it will be useful,
238             but WITHOUT ANY WARRANTY; without even the implied warranty of
239             MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
240             GNU General Public License for more details.
241              
242             You should have received a copy of the GNU General Public License
243             along with this program; if not, write to the Free Software
244             Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
245              
246             =cut
247              
248