line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
=head1 NAME |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
Event::IO::Record - buffered asynchronous I/O, timeouts |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
=head1 METHODS |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=cut |
8
|
|
|
|
|
|
|
package Event::IO::Record; |
9
|
|
|
|
|
|
|
|
10
|
1
|
|
|
1
|
|
1178
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
61
|
|
11
|
|
|
|
|
|
|
our $VERSION = '0.01'; |
12
|
|
|
|
|
|
|
|
13
|
1
|
|
|
1
|
|
5
|
use Event; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
7
|
|
14
|
1
|
|
|
1
|
|
91
|
use Fcntl; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
408
|
|
15
|
1
|
|
|
1
|
|
6
|
use Errno qw(:POSIX); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
537
|
|
16
|
|
|
|
|
|
|
|
17
|
1
|
|
|
1
|
|
6
|
use constant READ_SIZE => 1024; # bytes per read |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
1569
|
|
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
=head2 new ( named parameters... ) |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
=over 4 |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
=item init |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
If true (default), generate an init_event immediately (otherwise you must |
27
|
|
|
|
|
|
|
call init_event later). |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
=item timeout |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
Default timeout; see Timeout method. |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
=item irs, ors |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
Input/output record separators; default irs => "\r?[\0\n]", ors => "\n". |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
=item handle |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
Handle for connection, should be an IO::Socket object (::INET or ::UNIX). |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
=back |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
=cut |
44
|
|
|
|
|
|
|
sub new { |
45
|
0
|
|
|
0
|
1
|
|
my ($class,%param) = @_; |
46
|
0
|
|
|
|
|
|
my ($init,$timeout,$irs,$ors,$handle) = |
47
|
|
|
|
|
|
|
delete @param{qw[init timeout irs ors handle]}; |
48
|
0
|
0
|
|
|
|
|
die 'unknown parameter(s): '.(join ', ',keys %param) if keys %param; |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
# defaults |
51
|
0
|
0
|
|
|
|
|
$init = 1 if not defined $init; |
52
|
0
|
|
0
|
|
|
|
$timeout ||= 0; |
53
|
0
|
|
0
|
|
|
|
$irs ||= "\r?[\0\n]"; |
54
|
0
|
|
0
|
|
|
|
$ors ||= "\n"; |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
# create object |
57
|
0
|
|
0
|
|
|
|
my $self = bless { handle => $handle, in => '', out => '', |
58
|
|
|
|
|
|
|
timeout => $timeout, irs => $irs, ors => $ors }, ref $class || $class; |
59
|
0
|
0
|
|
|
|
|
$self->init_event() if $init; |
60
|
|
|
|
|
|
|
|
61
|
0
|
|
|
|
|
|
return $self |
62
|
|
|
|
|
|
|
} |
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
=head2 timeout ( time ) |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
Time is the time in seconds; 0 disables; undef reinitializes the current value. |
68
|
|
|
|
|
|
|
We generates a timeout_event when the timer expires. |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
=cut |
71
|
|
|
|
|
|
|
sub timeout { |
72
|
0
|
|
|
0
|
1
|
|
my ($self,$time) = @_; |
73
|
0
|
0
|
|
|
|
|
$time = $self->{timeout} unless defined $time; |
74
|
|
|
|
|
|
|
|
75
|
0
|
0
|
|
|
|
|
if($self->{timer}) { |
76
|
0
|
|
|
|
|
|
$self->{timer}->cancel(); |
77
|
0
|
|
|
|
|
|
delete $self->{timer}; |
78
|
|
|
|
|
|
|
} |
79
|
|
|
|
|
|
|
|
80
|
0
|
|
|
|
|
|
$self->{timeout} = $time; |
81
|
|
|
|
|
|
|
|
82
|
0
|
0
|
0
|
|
|
|
$self->{timer} = |
83
|
|
|
|
|
|
|
Event->timer(after => $time, cb => [$self,'timeout_event']) |
84
|
|
|
|
|
|
|
if $time and $self->{init}; |
85
|
|
|
|
|
|
|
} |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=head2 init_event |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
Initialization event, called before anything else happens. |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
=cut |
93
|
|
|
|
|
|
|
sub init_event { |
94
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
95
|
0
|
0
|
|
|
|
|
warn "@{[ref $self]} initialized twice!" if $self->{init}++; |
|
0
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
# set non-blocking |
98
|
0
|
0
|
|
|
|
|
if(my $flags = $self->{handle}->fcntl(F_GETFL,pack '') >= 0) { |
99
|
0
|
|
|
|
|
|
$self->{handle}->fcntl(F_SETFL,$flags | O_NONBLOCK); |
100
|
|
|
|
|
|
|
} |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
# set up read/write event watchers and inactivity timeout |
103
|
0
|
|
|
|
|
|
$self->{read} = |
104
|
|
|
|
|
|
|
Event->io(fd => $self->{handle}, poll => 'r', cb => [$self,'read_event']); |
105
|
0
|
|
|
|
|
|
$self->{write} = |
106
|
|
|
|
|
|
|
Event->io(fd => $self->{handle}, poll => 'w', cb => [$self,'write_event'], |
107
|
|
|
|
|
|
|
repeat => 0, parked => 1); |
108
|
0
|
|
|
|
|
|
$self->timeout(); |
109
|
|
|
|
|
|
|
} |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=head2 read_event |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
Data is available for reading. We buffer it up and emit lines to derived |
115
|
|
|
|
|
|
|
classes as Cs. |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
=cut |
118
|
|
|
|
|
|
|
sub read_event { |
119
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
120
|
0
|
|
|
|
|
|
$self->timer(0); |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
# buffer up input until we can't read any more |
123
|
0
|
|
|
|
|
|
my ($data,$frag,$count) = ($self->{in},'',0); |
124
|
0
|
|
|
|
|
|
my $close; |
125
|
0
|
|
|
|
|
|
$self->{in} = ''; |
126
|
|
|
|
|
|
|
|
127
|
0
|
|
|
|
|
|
do {{ |
128
|
|
|
|
|
|
|
# undef means we have an error so log it and close |
129
|
0
|
0
|
|
|
|
|
unless(defined $self->{handle}->recv($frag,READ_SIZE)) { |
|
0
|
|
|
|
|
|
|
130
|
0
|
0
|
0
|
|
|
|
last if EAGAIN == $! or EWOULDBLOCK == $!; # no data available |
131
|
0
|
0
|
|
|
|
|
next if EINTR == $!; # interrupted by signal |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
# queue up the read error until we've processed what we've read |
134
|
0
|
|
|
|
|
|
warn "@{[ref $self]} socket read error: $!"; |
|
0
|
|
|
|
|
|
|
135
|
0
|
|
|
|
|
|
$close = "read error: $!"; |
136
|
0
|
|
|
|
|
|
last; |
137
|
|
|
|
|
|
|
} |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
# assume if we got 0 bytes and no error that it's time to bail |
140
|
|
|
|
|
|
|
# if not, we get an infinite sequence of read_events.... |
141
|
|
|
|
|
|
|
# don't bail until we've sent the lines that we have, however |
142
|
0
|
0
|
|
|
|
|
unless(length $frag) { |
143
|
0
|
|
|
|
|
|
$close = 'remote closed socket'; |
144
|
0
|
|
|
|
|
|
last; |
145
|
|
|
|
|
|
|
} |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
# otherwise append to the existing block and read until we run out of data |
148
|
0
|
|
|
|
|
|
$data .= $frag; |
149
|
0
|
|
|
|
|
|
$count .= length $frag; |
150
|
|
|
|
|
|
|
}} while length $frag == READ_SIZE; |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
# send each line as an event |
153
|
0
|
|
|
|
|
|
my $irs = $self->{irs}; |
154
|
0
|
|
0
|
|
|
|
while(length $data and $data =~ s/^(.*?)$irs//s) { |
155
|
0
|
|
|
|
|
|
$self->line_event($1); |
156
|
0
|
|
|
|
|
|
$irs = $self->{irs}; # refresh in case line_event changes it |
157
|
|
|
|
|
|
|
} |
158
|
0
|
|
|
|
|
|
$self->{in} = $data; |
159
|
|
|
|
|
|
|
|
160
|
0
|
|
|
|
|
|
$self->timer(1); |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
# if the socket was closed, we can now send the close event |
163
|
0
|
0
|
|
|
|
|
$self->close($close) if $close; |
164
|
|
|
|
|
|
|
} |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
=head2 line_event ( line ) |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
Override in derived class to process incoming data. |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=cut |
172
|
0
|
|
|
0
|
1
|
|
sub line_event { |
173
|
|
|
|
|
|
|
} |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
=head2 write( data ) |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
Buffered write. |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
=cut |
181
|
|
|
|
|
|
|
sub write { |
182
|
0
|
|
|
0
|
1
|
|
my ($self,$data) = @_; |
183
|
0
|
|
|
|
|
|
$self->{out} .= $data.$self->{ors}; |
184
|
0
|
|
|
|
|
|
$self->write_event(); |
185
|
|
|
|
|
|
|
} |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
=head2 write_event |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
Write event - handle buffered writes. |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
=cut |
193
|
|
|
|
|
|
|
sub write_event { |
194
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
195
|
0
|
|
|
|
|
|
my $data = $self->{out}; |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
# send as much as we can from the buffer |
198
|
0
|
|
|
|
|
|
while(length $data) { |
199
|
0
|
|
|
|
|
|
my $count = $self->{handle}->send($data); |
200
|
0
|
0
|
|
|
|
|
unless(defined $count) { |
201
|
0
|
0
|
0
|
|
|
|
if(EAGAIN == $! or EWOULDBLOCK == $!) { # writing would block |
202
|
0
|
|
|
|
|
|
$self->{write}->start(); |
203
|
0
|
|
|
|
|
|
last; |
204
|
|
|
|
|
|
|
} |
205
|
0
|
0
|
|
|
|
|
next if EINTR == $!; # interrupted by signal |
206
|
0
|
|
|
|
|
|
warn "@{[ref $self]} socket write error: $!"; |
|
0
|
|
|
|
|
|
|
207
|
0
|
|
|
|
|
|
$self->{out} = $data; |
208
|
0
|
|
|
|
|
|
return $self->close('write error'); |
209
|
|
|
|
|
|
|
} |
210
|
0
|
|
|
|
|
|
$data = substr($data,$count); |
211
|
0
|
0
|
|
|
|
|
$self->timer(1) if $count; # reinitialize the inactivity timer |
212
|
|
|
|
|
|
|
} |
213
|
0
|
|
|
|
|
|
$self->{out} = $data; |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
# send an event if we've written everything in the buffer |
216
|
0
|
0
|
0
|
|
|
|
$self->sent_event() if not length $data and $self->can('sent_event'); |
217
|
|
|
|
|
|
|
} |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
=head2 timer ( enable flag ) |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
Disable or restart inactivity timer. |
223
|
|
|
|
|
|
|
|
224
|
|
|
|
|
|
|
=cut |
225
|
|
|
|
|
|
|
sub timer { |
226
|
0
|
|
|
0
|
1
|
|
my ($self,$enable) = @_; |
227
|
0
|
0
|
|
|
|
|
$enable ? $self->{timer}->again() : $self->{timer}->stop() |
|
|
0
|
|
|
|
|
|
228
|
|
|
|
|
|
|
if $self->{timer}; |
229
|
|
|
|
|
|
|
} |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
=head2 timeout_event |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
Inactivity timeout event. |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
=cut |
237
|
|
|
|
|
|
|
sub timeout_event { |
238
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
239
|
0
|
|
|
|
|
|
$self->error('closing inactive connection after '. |
240
|
0
|
|
|
|
|
|
"@{[$self->{timeout}]} s"); |
241
|
0
|
|
|
|
|
|
$self->close('timed out'); |
242
|
|
|
|
|
|
|
} |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
=head2 close |
246
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
Remove event handlers, this will close the connection (as long as no other |
248
|
|
|
|
|
|
|
outstanding references exist). |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
=cut |
251
|
|
|
|
|
|
|
sub close { |
252
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
253
|
0
|
0
|
|
|
|
|
if($self->{read}) { |
254
|
0
|
|
|
|
|
|
for my $ev(qw[read write timer]) { |
255
|
0
|
0
|
|
|
|
|
(delete $self->{$ev})->cancel() if $self->{$ev}; |
256
|
|
|
|
|
|
|
} |
257
|
|
|
|
|
|
|
} |
258
|
0
|
0
|
|
|
|
|
(delete $self->{handle})->close() if $self->{handle}; # close the socket |
259
|
|
|
|
|
|
|
} |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
=head2 closed |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
Return true iff socket is closed. |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
=cut |
267
|
|
|
|
|
|
|
sub closed { |
268
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
269
|
0
|
|
|
|
|
|
return not $self->{read} |
270
|
|
|
|
|
|
|
} |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
=head2 error( message ) |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
Log error, subclasses may do more. |
276
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
=cut |
278
|
|
|
|
|
|
|
sub error { |
279
|
0
|
|
|
0
|
1
|
|
my ($self,$err) = @_; |
280
|
0
|
|
|
|
|
|
warn "@{[ref $self]} error: $err"; |
|
0
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
} |
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
=head2 IRS( [ input record separator ] ) |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
Get/set input record separator. |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
=cut |
289
|
|
|
|
|
|
|
sub IRS { |
290
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
291
|
0
|
0
|
|
|
|
|
$self->{irs} = shift if @_; |
292
|
0
|
|
|
|
|
|
$self->{irs} |
293
|
|
|
|
|
|
|
} |
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
=head2 ORS( [ output record separator ] ) |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
Get/set output record separator. |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
=cut |
301
|
|
|
|
|
|
|
sub ORS { |
302
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
303
|
0
|
0
|
|
|
|
|
$self->{ors} = shift if @_; |
304
|
0
|
|
|
|
|
|
$self->{ors} |
305
|
|
|
|
|
|
|
} |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
=head1 AUTHOR |
309
|
|
|
|
|
|
|
|
310
|
|
|
|
|
|
|
David B. Robins Edbrobins@davidrobins.netE |
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
=cut |
313
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
1; |