File Coverage

blib/lib/Event/IO/Record.pm
Criterion Covered Total %
statement 15 109 13.7
branch 0 50 0.0
condition 0 24 0.0
subroutine 5 19 26.3
pod 14 14 100.0
total 34 216 15.7


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Event::IO::Record - buffered asynchronous I/O, timeouts
4              
5             =head1 METHODS
6              
7             =cut
8             package Event::IO::Record;
9              
10 1     1   1178 use strict;
  1         3  
  1         61  
11             our $VERSION = '0.01';
12              
13 1     1   5 use Event;
  1         2  
  1         7  
14 1     1   91 use Fcntl;
  1         1  
  1         408  
15 1     1   6 use Errno qw(:POSIX);
  1         2  
  1         537  
16              
17 1     1   6 use constant READ_SIZE => 1024; # bytes per read
  1         2  
  1         1569  
18              
19              
20             =head2 new ( named parameters... )
21              
22             =over 4
23              
24             =item init
25              
26             If true (default), generate an init_event immediately (otherwise you must
27             call init_event later).
28              
29             =item timeout
30              
31             Default timeout; see Timeout method.
32              
33             =item irs, ors
34              
35             Input/output record separators; default irs => "\r?[\0\n]", ors => "\n".
36              
37             =item handle
38              
39             Handle for connection, should be an IO::Socket object (::INET or ::UNIX).
40              
41             =back
42              
43             =cut
44             sub new {
45 0     0 1   my ($class,%param) = @_;
46 0           my ($init,$timeout,$irs,$ors,$handle) =
47             delete @param{qw[init timeout irs ors handle]};
48 0 0         die 'unknown parameter(s): '.(join ', ',keys %param) if keys %param;
49              
50             # defaults
51 0 0         $init = 1 if not defined $init;
52 0   0       $timeout ||= 0;
53 0   0       $irs ||= "\r?[\0\n]";
54 0   0       $ors ||= "\n";
55              
56             # create object
57 0   0       my $self = bless { handle => $handle, in => '', out => '',
58             timeout => $timeout, irs => $irs, ors => $ors }, ref $class || $class;
59 0 0         $self->init_event() if $init;
60              
61 0           return $self
62             }
63              
64              
65             =head2 timeout ( time )
66              
67             Time is the time in seconds; 0 disables; undef reinitializes the current value.
68             We generates a timeout_event when the timer expires.
69              
70             =cut
71             sub timeout {
72 0     0 1   my ($self,$time) = @_;
73 0 0         $time = $self->{timeout} unless defined $time;
74              
75 0 0         if($self->{timer}) {
76 0           $self->{timer}->cancel();
77 0           delete $self->{timer};
78             }
79              
80 0           $self->{timeout} = $time;
81              
82 0 0 0       $self->{timer} =
83             Event->timer(after => $time, cb => [$self,'timeout_event'])
84             if $time and $self->{init};
85             }
86              
87              
88             =head2 init_event
89              
90             Initialization event, called before anything else happens.
91              
92             =cut
93             sub init_event {
94 0     0 1   my $self = shift;
95 0 0         warn "@{[ref $self]} initialized twice!" if $self->{init}++;
  0            
96              
97             # set non-blocking
98 0 0         if(my $flags = $self->{handle}->fcntl(F_GETFL,pack '') >= 0) {
99 0           $self->{handle}->fcntl(F_SETFL,$flags | O_NONBLOCK);
100             }
101              
102             # set up read/write event watchers and inactivity timeout
103 0           $self->{read} =
104             Event->io(fd => $self->{handle}, poll => 'r', cb => [$self,'read_event']);
105 0           $self->{write} =
106             Event->io(fd => $self->{handle}, poll => 'w', cb => [$self,'write_event'],
107             repeat => 0, parked => 1);
108 0           $self->timeout();
109             }
110              
111              
112             =head2 read_event
113              
114             Data is available for reading. We buffer it up and emit lines to derived
115             classes as Cs.
116              
117             =cut
118             sub read_event {
119 0     0 1   my $self = shift;
120 0           $self->timer(0);
121              
122             # buffer up input until we can't read any more
123 0           my ($data,$frag,$count) = ($self->{in},'',0);
124 0           my $close;
125 0           $self->{in} = '';
126              
127 0           do {{
128             # undef means we have an error so log it and close
129 0 0         unless(defined $self->{handle}->recv($frag,READ_SIZE)) {
  0            
130 0 0 0       last if EAGAIN == $! or EWOULDBLOCK == $!; # no data available
131 0 0         next if EINTR == $!; # interrupted by signal
132              
133             # queue up the read error until we've processed what we've read
134 0           warn "@{[ref $self]} socket read error: $!";
  0            
135 0           $close = "read error: $!";
136 0           last;
137             }
138              
139             # assume if we got 0 bytes and no error that it's time to bail
140             # if not, we get an infinite sequence of read_events....
141             # don't bail until we've sent the lines that we have, however
142 0 0         unless(length $frag) {
143 0           $close = 'remote closed socket';
144 0           last;
145             }
146              
147             # otherwise append to the existing block and read until we run out of data
148 0           $data .= $frag;
149 0           $count .= length $frag;
150             }} while length $frag == READ_SIZE;
151              
152             # send each line as an event
153 0           my $irs = $self->{irs};
154 0   0       while(length $data and $data =~ s/^(.*?)$irs//s) {
155 0           $self->line_event($1);
156 0           $irs = $self->{irs}; # refresh in case line_event changes it
157             }
158 0           $self->{in} = $data;
159              
160 0           $self->timer(1);
161              
162             # if the socket was closed, we can now send the close event
163 0 0         $self->close($close) if $close;
164             }
165              
166              
167             =head2 line_event ( line )
168              
169             Override in derived class to process incoming data.
170              
171             =cut
172 0     0 1   sub line_event {
173             }
174              
175              
176             =head2 write( data )
177              
178             Buffered write.
179              
180             =cut
181             sub write {
182 0     0 1   my ($self,$data) = @_;
183 0           $self->{out} .= $data.$self->{ors};
184 0           $self->write_event();
185             }
186              
187              
188             =head2 write_event
189              
190             Write event - handle buffered writes.
191              
192             =cut
193             sub write_event {
194 0     0 1   my $self = shift;
195 0           my $data = $self->{out};
196              
197             # send as much as we can from the buffer
198 0           while(length $data) {
199 0           my $count = $self->{handle}->send($data);
200 0 0         unless(defined $count) {
201 0 0 0       if(EAGAIN == $! or EWOULDBLOCK == $!) { # writing would block
202 0           $self->{write}->start();
203 0           last;
204             }
205 0 0         next if EINTR == $!; # interrupted by signal
206 0           warn "@{[ref $self]} socket write error: $!";
  0            
207 0           $self->{out} = $data;
208 0           return $self->close('write error');
209             }
210 0           $data = substr($data,$count);
211 0 0         $self->timer(1) if $count; # reinitialize the inactivity timer
212             }
213 0           $self->{out} = $data;
214              
215             # send an event if we've written everything in the buffer
216 0 0 0       $self->sent_event() if not length $data and $self->can('sent_event');
217             }
218              
219              
220             =head2 timer ( enable flag )
221              
222             Disable or restart inactivity timer.
223              
224             =cut
225             sub timer {
226 0     0 1   my ($self,$enable) = @_;
227 0 0         $enable ? $self->{timer}->again() : $self->{timer}->stop()
    0          
228             if $self->{timer};
229             }
230              
231              
232             =head2 timeout_event
233              
234             Inactivity timeout event.
235              
236             =cut
237             sub timeout_event {
238 0     0 1   my $self = shift;
239 0           $self->error('closing inactive connection after '.
240 0           "@{[$self->{timeout}]} s");
241 0           $self->close('timed out');
242             }
243              
244              
245             =head2 close
246              
247             Remove event handlers, this will close the connection (as long as no other
248             outstanding references exist).
249              
250             =cut
251             sub close {
252 0     0 1   my $self = shift;
253 0 0         if($self->{read}) {
254 0           for my $ev(qw[read write timer]) {
255 0 0         (delete $self->{$ev})->cancel() if $self->{$ev};
256             }
257             }
258 0 0         (delete $self->{handle})->close() if $self->{handle}; # close the socket
259             }
260              
261              
262             =head2 closed
263              
264             Return true iff socket is closed.
265              
266             =cut
267             sub closed {
268 0     0 1   my $self = shift;
269 0           return not $self->{read}
270             }
271              
272              
273             =head2 error( message )
274              
275             Log error, subclasses may do more.
276              
277             =cut
278             sub error {
279 0     0 1   my ($self,$err) = @_;
280 0           warn "@{[ref $self]} error: $err";
  0            
281             }
282              
283              
284             =head2 IRS( [ input record separator ] )
285              
286             Get/set input record separator.
287              
288             =cut
289             sub IRS {
290 0     0 1   my $self = shift;
291 0 0         $self->{irs} = shift if @_;
292 0           $self->{irs}
293             }
294              
295              
296             =head2 ORS( [ output record separator ] )
297              
298             Get/set output record separator.
299              
300             =cut
301             sub ORS {
302 0     0 1   my $self = shift;
303 0 0         $self->{ors} = shift if @_;
304 0           $self->{ors}
305             }
306              
307              
308             =head1 AUTHOR
309              
310             David B. Robins Edbrobins@davidrobins.netE
311              
312             =cut
313              
314              
315             1;