line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
########################################################################### |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
=head1 NAME |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
Danga::Socket - Event loop and event-driven async socket base class |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=head1 SYNOPSIS |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
package My::Socket |
10
|
|
|
|
|
|
|
use Danga::Socket; |
11
|
|
|
|
|
|
|
use base ('Danga::Socket'); |
12
|
|
|
|
|
|
|
use fields ('my_attribute'); |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
sub new { |
15
|
|
|
|
|
|
|
my My::Socket $self = shift; |
16
|
|
|
|
|
|
|
$self = fields::new($self) unless ref $self; |
17
|
|
|
|
|
|
|
$self->SUPER::new( @_ ); |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
$self->{my_attribute} = 1234; |
20
|
|
|
|
|
|
|
return $self; |
21
|
|
|
|
|
|
|
} |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
sub event_err { ... } |
24
|
|
|
|
|
|
|
sub event_hup { ... } |
25
|
|
|
|
|
|
|
sub event_write { ... } |
26
|
|
|
|
|
|
|
sub event_read { ... } |
27
|
|
|
|
|
|
|
sub close { ... } |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
$my_sock->tcp_cork($bool); |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
# write returns 1 if all writes have gone through, or 0 if there |
32
|
|
|
|
|
|
|
# are writes in queue |
33
|
|
|
|
|
|
|
$my_sock->write($scalar); |
34
|
|
|
|
|
|
|
$my_sock->write($scalarref); |
35
|
|
|
|
|
|
|
$my_sock->write(sub { ... }); # run when previous data written |
36
|
|
|
|
|
|
|
$my_sock->write(undef); # kick-starts |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
# read max $bytecount bytes, or undef on connection closed |
39
|
|
|
|
|
|
|
$scalar_ref = $my_sock->read($bytecount); |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
# watch for writability. not needed with ->write(). write() |
42
|
|
|
|
|
|
|
# will automatically turn on watch_write when you wrote too much |
43
|
|
|
|
|
|
|
# and turn it off when done |
44
|
|
|
|
|
|
|
$my_sock->watch_write($bool); |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
# watch for readability |
47
|
|
|
|
|
|
|
$my_sock->watch_read($bool); |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
# if you read too much and want to push some back on |
50
|
|
|
|
|
|
|
# readable queue. (not incredibly well-tested) |
51
|
|
|
|
|
|
|
$my_sock->push_back_read($buf); # scalar or scalar ref |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
Danga::Socket->AddOtherFds(..); |
54
|
|
|
|
|
|
|
Danga::Socket->SetLoopTimeout($millisecs); |
55
|
|
|
|
|
|
|
Danga::Socket->DescriptorMap(); |
56
|
|
|
|
|
|
|
Danga::Socket->WatchedSockets(); # count of DescriptorMap keys |
57
|
|
|
|
|
|
|
Danga::Socket->SetPostLoopCallback($code); |
58
|
|
|
|
|
|
|
Danga::Socket->EventLoop(); |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
=head1 DESCRIPTION |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
This is an abstract base class for objects backed by a socket which |
63
|
|
|
|
|
|
|
provides the basic framework for event-driven asynchronous IO, |
64
|
|
|
|
|
|
|
designed to be fast. Danga::Socket is both a base class for objects, |
65
|
|
|
|
|
|
|
and an event loop. |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
Callers subclass Danga::Socket. Danga::Socket's constructor registers |
68
|
|
|
|
|
|
|
itself with the Danga::Socket event loop, and invokes callbacks on the |
69
|
|
|
|
|
|
|
object for readability, writability, errors, and other conditions. |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
Because Danga::Socket uses the "fields" module, your subclasses must |
72
|
|
|
|
|
|
|
too. |
73
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
=head1 MORE INFO |
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
For now, see servers using Danga::Socket for guidance. For example: |
77
|
|
|
|
|
|
|
perlbal, mogilefsd, or ddlockd. |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
=head1 API |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
Note where "C" is used below, normally you would call these methods as: |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
Danga::Socket->method(...); |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
However using a subclass works too. |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
The CLASS methods are all methods for the event loop part of Danga::Socket, |
88
|
|
|
|
|
|
|
whereas the object methods are all used on your subclasses. |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=cut |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
########################################################################### |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
package Danga::Socket; |
95
|
4
|
|
|
4
|
|
146862
|
use strict; |
|
4
|
|
|
|
|
30
|
|
|
4
|
|
|
|
|
85
|
|
96
|
4
|
|
|
4
|
|
1794
|
use bytes; |
|
4
|
|
|
|
|
44
|
|
|
4
|
|
|
|
|
16
|
|
97
|
4
|
|
|
4
|
|
1408
|
use POSIX (); |
|
4
|
|
|
|
|
18582
|
|
|
4
|
|
|
|
|
75
|
|
98
|
4
|
|
|
4
|
|
1541
|
use Time::HiRes (); |
|
4
|
|
|
|
|
3953
|
|
|
4
|
|
|
|
|
110
|
|
99
|
|
|
|
|
|
|
|
100
|
4
|
|
|
4
|
|
528
|
my $opt_bsd_resource = eval "use BSD::Resource; 1;"; |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
101
|
|
|
|
|
|
|
|
102
|
4
|
|
|
4
|
|
19
|
use vars qw{$VERSION}; |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
177
|
|
103
|
|
|
|
|
|
|
$VERSION = "1.62-TRIAL"; |
104
|
|
|
|
|
|
|
|
105
|
4
|
|
|
4
|
|
18
|
use warnings; |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
80
|
|
106
|
4
|
|
|
4
|
|
14
|
no warnings qw(deprecated); |
|
4
|
|
|
|
|
4
|
|
|
4
|
|
|
|
|
117
|
|
107
|
|
|
|
|
|
|
|
108
|
4
|
|
|
4
|
|
1385
|
use Sys::Syscall qw(:epoll); |
|
4
|
|
|
|
|
10013
|
|
|
4
|
|
|
|
|
575
|
|
109
|
|
|
|
|
|
|
|
110
|
4
|
|
|
|
|
13
|
use fields ('sock', # underlying socket |
111
|
|
|
|
|
|
|
'fd', # numeric file descriptor |
112
|
|
|
|
|
|
|
'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write |
113
|
|
|
|
|
|
|
'write_buf_offset', # offset into first array of write_buf to start writing at |
114
|
|
|
|
|
|
|
'write_buf_size', # total length of data in all write_buf items |
115
|
|
|
|
|
|
|
'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass |
116
|
|
|
|
|
|
|
'read_push_back', # arrayref of "pushed-back" read data the application didn't want |
117
|
|
|
|
|
|
|
'closed', # bool: socket is closed |
118
|
|
|
|
|
|
|
'corked', # bool: socket is corked |
119
|
|
|
|
|
|
|
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) |
120
|
|
|
|
|
|
|
'peer_v6', # bool: cached; if peer is an IPv6 address |
121
|
|
|
|
|
|
|
'peer_ip', # cached stringified IP address of $sock |
122
|
|
|
|
|
|
|
'peer_port', # cached port number of $sock |
123
|
|
|
|
|
|
|
'local_ip', # cached stringified IP address of local end of $sock |
124
|
|
|
|
|
|
|
'local_port', # cached port number of local end of $sock |
125
|
|
|
|
|
|
|
'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors |
126
|
4
|
|
|
4
|
|
1377
|
); |
|
4
|
|
|
|
|
4448
|
|
127
|
|
|
|
|
|
|
|
128
|
4
|
|
|
|
|
355
|
use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK |
129
|
4
|
|
|
4
|
|
1453
|
EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT); |
|
4
|
|
|
|
|
2870
|
|
130
|
4
|
|
|
4
|
|
1634
|
use Socket qw(IPPROTO_TCP); |
|
4
|
|
|
|
|
10404
|
|
|
4
|
|
|
|
|
448
|
|
131
|
4
|
|
|
4
|
|
22
|
use Carp qw(croak confess); |
|
4
|
|
|
|
|
6
|
|
|
4
|
|
|
|
|
162
|
|
132
|
|
|
|
|
|
|
|
133
|
4
|
50
|
|
4
|
|
17
|
use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # FIXME: not hard-coded (Linux-specific too) |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
178
|
|
134
|
4
|
|
|
4
|
|
15
|
use constant DebugLevel => 0; |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
120
|
|
135
|
|
|
|
|
|
|
|
136
|
4
|
|
|
4
|
|
15
|
use constant POLLIN => 1; |
|
4
|
|
|
|
|
7
|
|
|
4
|
|
|
|
|
109
|
|
137
|
4
|
|
|
4
|
|
14
|
use constant POLLOUT => 4; |
|
4
|
|
|
|
|
13
|
|
|
4
|
|
|
|
|
128
|
|
138
|
4
|
|
|
4
|
|
16
|
use constant POLLERR => 8; |
|
4
|
|
|
|
|
4
|
|
|
4
|
|
|
|
|
111
|
|
139
|
4
|
|
|
4
|
|
14
|
use constant POLLHUP => 16; |
|
4
|
|
|
|
|
7
|
|
|
4
|
|
|
|
|
132
|
|
140
|
4
|
|
|
4
|
|
15
|
use constant POLLNVAL => 32; |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
11929
|
|
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
our $HAVE_KQUEUE = eval { require IO::KQueue; 1 }; |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
our ( |
145
|
|
|
|
|
|
|
$HaveEpoll, # Flag -- is epoll available? initially undefined. |
146
|
|
|
|
|
|
|
$HaveKQueue, |
147
|
|
|
|
|
|
|
%DescriptorMap, # fd (num) -> Danga::Socket object |
148
|
|
|
|
|
|
|
%PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data) |
149
|
|
|
|
|
|
|
$Epoll, # Global epoll fd (for epoll mode only) |
150
|
|
|
|
|
|
|
$KQueue, # Global kqueue fd (for kqueue mode only) |
151
|
|
|
|
|
|
|
@ToClose, # sockets to close when event loop is done |
152
|
|
|
|
|
|
|
%OtherFds, # A hash of "other" (non-Danga::Socket) file |
153
|
|
|
|
|
|
|
# descriptors for the event loop to track. |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
$PostLoopCallback, # subref to call at the end of each loop, if defined (global) |
156
|
|
|
|
|
|
|
%PLCMap, # fd (num) -> PostLoopCallback (per-object) |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
$LoopTimeout, # timeout of event loop in milliseconds |
159
|
|
|
|
|
|
|
$DoProfile, # if on, enable profiling |
160
|
|
|
|
|
|
|
%Profiling, # what => [ utime, stime, calls ] |
161
|
|
|
|
|
|
|
$DoneInit, # if we've done the one-time module init yet |
162
|
|
|
|
|
|
|
@Timers, # timers |
163
|
|
|
|
|
|
|
); |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
Reset(); |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
##################################################################### |
168
|
|
|
|
|
|
|
### C L A S S M E T H O D S |
169
|
|
|
|
|
|
|
##################################################################### |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=head2 C<< CLASS->Reset() >> |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
Reset all state |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
=cut |
176
|
|
|
|
|
|
|
sub Reset { |
177
|
6
|
|
|
6
|
1
|
1468
|
%DescriptorMap = (); |
178
|
6
|
|
|
|
|
49
|
%PushBackSet = (); |
179
|
6
|
|
|
|
|
10
|
@ToClose = (); |
180
|
6
|
|
|
|
|
8
|
%OtherFds = (); |
181
|
6
|
|
|
|
|
11
|
$LoopTimeout = -1; # no timeout by default |
182
|
6
|
|
|
|
|
7
|
$DoProfile = 0; |
183
|
6
|
|
|
|
|
9
|
%Profiling = (); |
184
|
6
|
|
|
|
|
8
|
@Timers = (); |
185
|
|
|
|
|
|
|
|
186
|
6
|
|
|
|
|
17
|
$PostLoopCallback = undef; |
187
|
6
|
|
|
|
|
7
|
%PLCMap = (); |
188
|
6
|
|
|
|
|
9
|
$DoneInit = 0; |
189
|
|
|
|
|
|
|
|
190
|
6
|
100
|
66
|
|
|
52
|
POSIX::close($Epoll) if defined $Epoll && $Epoll >= 0; |
191
|
6
|
50
|
33
|
|
|
19
|
POSIX::close($KQueue) if defined $KQueue && $KQueue >= 0; |
192
|
|
|
|
|
|
|
|
193
|
6
|
|
|
|
|
25
|
*EventLoop = *FirstTimeEventLoop; |
194
|
|
|
|
|
|
|
} |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
=head2 C<< CLASS->HaveEpoll() >> |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
Returns a true value if this class will use IO::Epoll for async IO. |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=cut |
201
|
|
|
|
|
|
|
sub HaveEpoll { |
202
|
1
|
|
|
1
|
1
|
79
|
_InitPoller(); |
203
|
1
|
|
|
|
|
4
|
return $HaveEpoll; |
204
|
|
|
|
|
|
|
} |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
=head2 C<< CLASS->WatchedSockets() >> |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
Returns the number of file descriptors which are registered with the global |
209
|
|
|
|
|
|
|
poll object. |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
=cut |
212
|
|
|
|
|
|
|
sub WatchedSockets { |
213
|
4
|
|
|
4
|
1
|
1590
|
return scalar keys %DescriptorMap; |
214
|
|
|
|
|
|
|
} |
215
|
|
|
|
|
|
|
*watched_sockets = *WatchedSockets; |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
=head2 C<< CLASS->EnableProfiling() >> |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
Turns profiling on, clearing current profiling data. |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
=cut |
222
|
|
|
|
|
|
|
sub EnableProfiling { |
223
|
0
|
0
|
|
0
|
1
|
0
|
if ($opt_bsd_resource) { |
224
|
0
|
|
|
|
|
0
|
%Profiling = (); |
225
|
0
|
|
|
|
|
0
|
$DoProfile = 1; |
226
|
0
|
|
|
|
|
0
|
return 1; |
227
|
|
|
|
|
|
|
} |
228
|
0
|
|
|
|
|
0
|
return 0; |
229
|
|
|
|
|
|
|
} |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
=head2 C<< CLASS->DisableProfiling() >> |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
Turns off profiling, but retains data up to this point |
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
=cut |
236
|
|
|
|
|
|
|
sub DisableProfiling { |
237
|
0
|
|
|
0
|
1
|
0
|
$DoProfile = 0; |
238
|
|
|
|
|
|
|
} |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=head2 C<< CLASS->ProfilingData() >> |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
Returns reference to a hash of data in format: |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
ITEM => [ utime, stime, #calls ] |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
=cut |
247
|
|
|
|
|
|
|
sub ProfilingData { |
248
|
0
|
|
|
0
|
1
|
0
|
return \%Profiling; |
249
|
|
|
|
|
|
|
} |
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
=head2 C<< CLASS->ToClose() >> |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
Return the list of sockets that are awaiting close() at the end of the |
254
|
|
|
|
|
|
|
current event loop. |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
=cut |
257
|
0
|
|
|
0
|
1
|
0
|
sub ToClose { return @ToClose; } |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
=head2 C<< CLASS->OtherFds( [%fdmap] ) >> |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
Get/set the hash of file descriptors that need processing in parallel with |
262
|
|
|
|
|
|
|
the registered Danga::Socket objects. |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=cut |
265
|
|
|
|
|
|
|
sub OtherFds { |
266
|
0
|
|
|
0
|
1
|
0
|
my $class = shift; |
267
|
0
|
0
|
|
|
|
0
|
if ( @_ ) { %OtherFds = @_ } |
|
0
|
|
|
|
|
0
|
|
268
|
0
|
0
|
|
|
|
0
|
return wantarray ? %OtherFds : \%OtherFds; |
269
|
|
|
|
|
|
|
} |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=head2 C<< CLASS->AddOtherFds( [%fdmap] ) >> |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
Add fds to the OtherFds hash for processing. |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
=cut |
276
|
|
|
|
|
|
|
sub AddOtherFds { |
277
|
0
|
|
|
0
|
1
|
0
|
my $class = shift; |
278
|
0
|
|
|
|
|
0
|
%OtherFds = ( %OtherFds, @_ ); # FIXME investigate what happens on dupe fds |
279
|
0
|
0
|
|
|
|
0
|
return wantarray ? %OtherFds : \%OtherFds; |
280
|
|
|
|
|
|
|
} |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
=head2 C<< CLASS->SetLoopTimeout( $timeout ) >> |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
Set the loop timeout for the event loop to some value in milliseconds. |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return |
287
|
|
|
|
|
|
|
immediately. |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
=cut |
290
|
|
|
|
|
|
|
sub SetLoopTimeout { |
291
|
3
|
|
|
3
|
1
|
72
|
return $LoopTimeout = $_[1] + 0; |
292
|
|
|
|
|
|
|
} |
293
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
=head2 C<< CLASS->DebugMsg( $format, @args ) >> |
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
Print the debugging message specified by the C-style I and |
297
|
|
|
|
|
|
|
I |
298
|
|
|
|
|
|
|
|
299
|
|
|
|
|
|
|
=cut |
300
|
|
|
|
|
|
|
sub DebugMsg { |
301
|
0
|
|
|
0
|
1
|
0
|
my ( $class, $fmt, @args ) = @_; |
302
|
0
|
|
|
|
|
0
|
chomp $fmt; |
303
|
0
|
|
|
|
|
0
|
printf STDERR ">>> $fmt\n", @args; |
304
|
|
|
|
|
|
|
} |
305
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> |
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
Add a timer to occur $seconds from now. $seconds may be fractional, but timers |
309
|
|
|
|
|
|
|
are not guaranteed to fire at the exact time you ask for. |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
Returns a timer object which you can call C<< $timer->cancel >> on if you need to. |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
=cut |
314
|
|
|
|
|
|
|
sub AddTimer { |
315
|
6
|
|
|
6
|
1
|
1705
|
my $class = shift; |
316
|
6
|
|
|
|
|
14
|
my ($secs, $coderef) = @_; |
317
|
|
|
|
|
|
|
|
318
|
6
|
|
|
|
|
26
|
my $fire_time = Time::HiRes::time() + $secs; |
319
|
|
|
|
|
|
|
|
320
|
6
|
|
|
|
|
24
|
my $timer = bless [$fire_time, $coderef], "Danga::Socket::Timer"; |
321
|
|
|
|
|
|
|
|
322
|
6
|
100
|
100
|
|
|
44
|
if (!@Timers || $fire_time >= $Timers[-1][0]) { |
323
|
3
|
|
|
|
|
9
|
push @Timers, $timer; |
324
|
3
|
|
|
|
|
14
|
return $timer; |
325
|
|
|
|
|
|
|
} |
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
# Now, where do we insert? (NOTE: this appears slow, algorithm-wise, |
328
|
|
|
|
|
|
|
# but it was compared against calendar queues, heaps, naive push/sort, |
329
|
|
|
|
|
|
|
# and a bunch of other versions, and found to be fastest with a large |
330
|
|
|
|
|
|
|
# variety of datasets.) |
331
|
3
|
|
|
|
|
14
|
for (my $i = 0; $i < @Timers; $i++) { |
332
|
7
|
100
|
|
|
|
25
|
if ($Timers[$i][0] > $fire_time) { |
333
|
3
|
|
|
|
|
11
|
splice(@Timers, $i, 0, $timer); |
334
|
3
|
|
|
|
|
10
|
return $timer; |
335
|
|
|
|
|
|
|
} |
336
|
|
|
|
|
|
|
} |
337
|
|
|
|
|
|
|
|
338
|
0
|
|
|
|
|
0
|
die "Shouldn't get here."; |
339
|
|
|
|
|
|
|
} |
340
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
=head2 C<< CLASS->DescriptorMap() >> |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
Get the hash of Danga::Socket objects keyed by the file descriptor (fileno) they |
344
|
|
|
|
|
|
|
are wrapping. |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
Returns a hash in list context or a hashref in scalar context. |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
=cut |
349
|
|
|
|
|
|
|
sub DescriptorMap { |
350
|
2
|
50
|
|
2
|
1
|
422
|
return wantarray ? %DescriptorMap : \%DescriptorMap; |
351
|
|
|
|
|
|
|
} |
352
|
|
|
|
|
|
|
*descriptor_map = *DescriptorMap; |
353
|
|
|
|
|
|
|
*get_sock_ref = *DescriptorMap; |
354
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
sub _InitPoller |
356
|
|
|
|
|
|
|
{ |
357
|
12
|
100
|
|
12
|
|
38
|
return if $DoneInit; |
358
|
4
|
|
|
|
|
7
|
$DoneInit = 1; |
359
|
|
|
|
|
|
|
|
360
|
4
|
50
|
|
|
|
34
|
if ($HAVE_KQUEUE) { |
|
|
50
|
|
|
|
|
|
361
|
0
|
|
|
|
|
0
|
$KQueue = IO::KQueue->new(); |
362
|
0
|
|
|
|
|
0
|
$HaveKQueue = $KQueue >= 0; |
363
|
0
|
0
|
|
|
|
0
|
if ($HaveKQueue) { |
364
|
0
|
|
|
|
|
0
|
*EventLoop = *KQueueEventLoop; |
365
|
|
|
|
|
|
|
} |
366
|
|
|
|
|
|
|
} |
367
|
|
|
|
|
|
|
elsif (Sys::Syscall::epoll_defined()) { |
368
|
4
|
|
|
|
|
23
|
$Epoll = eval { epoll_create(1024); }; |
|
4
|
|
|
|
|
15
|
|
369
|
4
|
|
33
|
|
|
124
|
$HaveEpoll = defined $Epoll && $Epoll >= 0; |
370
|
4
|
50
|
|
|
|
13
|
if ($HaveEpoll) { |
371
|
4
|
|
|
|
|
30
|
*EventLoop = *EpollEventLoop; |
372
|
|
|
|
|
|
|
} |
373
|
|
|
|
|
|
|
} |
374
|
|
|
|
|
|
|
|
375
|
4
|
50
|
33
|
|
|
19
|
if (!$HaveEpoll && !$HaveKQueue) { |
376
|
0
|
|
|
|
|
0
|
require IO::Poll; |
377
|
0
|
|
|
|
|
0
|
*EventLoop = *PollEventLoop; |
378
|
|
|
|
|
|
|
} |
379
|
|
|
|
|
|
|
} |
380
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
=head2 C<< CLASS->EventLoop() >> |
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
Start processing IO events. In most daemon programs this never exits. See |
384
|
|
|
|
|
|
|
C below for how to exit the loop. |
385
|
|
|
|
|
|
|
|
386
|
|
|
|
|
|
|
=cut |
387
|
|
|
|
|
|
|
sub FirstTimeEventLoop { |
388
|
1
|
|
|
1
|
0
|
6
|
my $class = shift; |
389
|
|
|
|
|
|
|
|
390
|
1
|
|
|
|
|
3
|
_InitPoller(); |
391
|
|
|
|
|
|
|
|
392
|
1
|
50
|
|
|
|
3
|
if ($HaveEpoll) { |
|
|
0
|
|
|
|
|
|
393
|
1
|
|
|
|
|
3
|
EpollEventLoop($class); |
394
|
|
|
|
|
|
|
} elsif ($HaveKQueue) { |
395
|
0
|
|
|
|
|
0
|
KQueueEventLoop($class); |
396
|
|
|
|
|
|
|
} else { |
397
|
0
|
|
|
|
|
0
|
PollEventLoop($class); |
398
|
|
|
|
|
|
|
} |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
## profiling-related data/functions |
402
|
|
|
|
|
|
|
our ($Prof_utime0, $Prof_stime0); |
403
|
|
|
|
|
|
|
sub _pre_profile { |
404
|
0
|
|
|
0
|
|
0
|
($Prof_utime0, $Prof_stime0) = getrusage(); |
405
|
|
|
|
|
|
|
} |
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
sub _post_profile { |
408
|
|
|
|
|
|
|
# get post information |
409
|
0
|
|
|
0
|
|
0
|
my ($autime, $astime) = getrusage(); |
410
|
|
|
|
|
|
|
|
411
|
|
|
|
|
|
|
# calculate differences |
412
|
0
|
|
|
|
|
0
|
my $utime = $autime - $Prof_utime0; |
413
|
0
|
|
|
|
|
0
|
my $stime = $astime - $Prof_stime0; |
414
|
|
|
|
|
|
|
|
415
|
0
|
|
|
|
|
0
|
foreach my $k (@_) { |
416
|
0
|
|
0
|
|
|
0
|
$Profiling{$k} ||= [ 0.0, 0.0, 0 ]; |
417
|
0
|
|
|
|
|
0
|
$Profiling{$k}->[0] += $utime; |
418
|
0
|
|
|
|
|
0
|
$Profiling{$k}->[1] += $stime; |
419
|
0
|
|
|
|
|
0
|
$Profiling{$k}->[2]++; |
420
|
|
|
|
|
|
|
} |
421
|
|
|
|
|
|
|
} |
422
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
# runs timers and returns milliseconds for next one, or next event loop |
424
|
|
|
|
|
|
|
sub RunTimers { |
425
|
49
|
100
|
|
49
|
0
|
152
|
return $LoopTimeout unless @Timers; |
426
|
|
|
|
|
|
|
|
427
|
21
|
|
|
|
|
126
|
my $now = Time::HiRes::time(); |
428
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
# Run expired timers |
430
|
21
|
|
100
|
|
|
211
|
while (@Timers && $Timers[0][0] <= $now) { |
431
|
6
|
|
|
|
|
17
|
my $to_run = shift(@Timers); |
432
|
6
|
50
|
|
|
|
36
|
$to_run->[1]->($now) if $to_run->[1]; |
433
|
|
|
|
|
|
|
} |
434
|
|
|
|
|
|
|
|
435
|
21
|
100
|
|
|
|
4464
|
return $LoopTimeout unless @Timers; |
436
|
|
|
|
|
|
|
|
437
|
|
|
|
|
|
|
# convert time to an even number of milliseconds, adding 1 |
438
|
|
|
|
|
|
|
# extra, otherwise floating point fun can occur and we'll |
439
|
|
|
|
|
|
|
# call RunTimers like 20-30 times, each returning a timeout |
440
|
|
|
|
|
|
|
# of 0.0000212 seconds |
441
|
20
|
|
|
|
|
93
|
my $timeout = int(($Timers[0][0] - $now) * 1000) + 1; |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
# -1 is an infinite timeout, so prefer a real timeout |
444
|
20
|
50
|
|
|
|
64
|
return $timeout if $LoopTimeout == -1; |
445
|
|
|
|
|
|
|
|
446
|
|
|
|
|
|
|
# otherwise pick the lower of our regular timeout and time until |
447
|
|
|
|
|
|
|
# the next timer |
448
|
20
|
100
|
|
|
|
87
|
return $LoopTimeout if $LoopTimeout < $timeout; |
449
|
5
|
|
|
|
|
14
|
return $timeout; |
450
|
|
|
|
|
|
|
} |
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads |
453
|
|
|
|
|
|
|
### okay. |
454
|
|
|
|
|
|
|
sub EpollEventLoop { |
455
|
5
|
|
|
5
|
0
|
34
|
my $class = shift; |
456
|
|
|
|
|
|
|
|
457
|
5
|
|
|
|
|
18
|
foreach my $fd ( keys %OtherFds ) { |
458
|
0
|
0
|
|
|
|
0
|
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, EPOLLIN) == -1) { |
459
|
0
|
|
|
|
|
0
|
warn "epoll_ctl(): failure adding fd=$fd; $! (", $!+0, ")\n"; |
460
|
|
|
|
|
|
|
} |
461
|
|
|
|
|
|
|
} |
462
|
|
|
|
|
|
|
|
463
|
5
|
|
|
|
|
8
|
while (1) { |
464
|
43
|
|
|
|
|
99
|
my @events; |
465
|
|
|
|
|
|
|
my $i; |
466
|
43
|
|
|
|
|
131
|
my $timeout = RunTimers(); |
467
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
# get up to 1000 events |
469
|
43
|
|
|
|
|
180
|
my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events); |
470
|
|
|
|
|
|
|
EVENT: |
471
|
43
|
|
|
|
|
6947736
|
for ($i=0; $i<$evcount; $i++) { |
472
|
16
|
|
|
|
|
48
|
my $ev = $events[$i]; |
473
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
# it's possible epoll_wait returned many events, including some at the end |
475
|
|
|
|
|
|
|
# that ones in the front triggered unregister-interest actions. if we |
476
|
|
|
|
|
|
|
# can't find the %sock entry, it's because we're no longer interested |
477
|
|
|
|
|
|
|
# in that event. |
478
|
16
|
|
|
|
|
50
|
my Danga::Socket $pob = $DescriptorMap{$ev->[0]}; |
479
|
16
|
|
|
|
|
19
|
my $code; |
480
|
16
|
|
|
|
|
22
|
my $state = $ev->[1]; |
481
|
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
# if we didn't find a Perlbal::Socket subclass for that fd, try other |
483
|
|
|
|
|
|
|
# pseudo-registered (above) fds. |
484
|
16
|
50
|
|
|
|
32
|
if (! $pob) { |
485
|
0
|
0
|
|
|
|
0
|
if (my $code = $OtherFds{$ev->[0]}) { |
486
|
0
|
|
|
|
|
0
|
$code->($state); |
487
|
|
|
|
|
|
|
} else { |
488
|
0
|
|
|
|
|
0
|
my $fd = $ev->[0]; |
489
|
0
|
|
|
|
|
0
|
warn "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n"; |
490
|
0
|
|
|
|
|
0
|
POSIX::close($fd); |
491
|
0
|
|
|
|
|
0
|
epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0); |
492
|
|
|
|
|
|
|
} |
493
|
0
|
|
|
|
|
0
|
next; |
494
|
|
|
|
|
|
|
} |
495
|
|
|
|
|
|
|
|
496
|
16
|
|
|
|
|
18
|
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", |
497
|
|
|
|
|
|
|
$ev->[0], ref($pob), $ev->[1], time); |
498
|
|
|
|
|
|
|
|
499
|
16
|
50
|
|
|
|
22
|
if ($DoProfile) { |
500
|
0
|
|
|
|
|
0
|
my $class = ref $pob; |
501
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
# call profiling action on things that need to be done |
503
|
0
|
0
|
0
|
|
|
0
|
if ($state & EPOLLIN && ! $pob->{closed}) { |
504
|
0
|
|
|
|
|
0
|
_pre_profile(); |
505
|
0
|
|
|
|
|
0
|
$pob->event_read; |
506
|
0
|
|
|
|
|
0
|
_post_profile("$class-read"); |
507
|
|
|
|
|
|
|
} |
508
|
|
|
|
|
|
|
|
509
|
0
|
0
|
0
|
|
|
0
|
if ($state & EPOLLOUT && ! $pob->{closed}) { |
510
|
0
|
|
|
|
|
0
|
_pre_profile(); |
511
|
0
|
|
|
|
|
0
|
$pob->event_write; |
512
|
0
|
|
|
|
|
0
|
_post_profile("$class-write"); |
513
|
|
|
|
|
|
|
} |
514
|
|
|
|
|
|
|
|
515
|
0
|
0
|
|
|
|
0
|
if ($state & (EPOLLERR|EPOLLHUP)) { |
516
|
0
|
0
|
0
|
|
|
0
|
if ($state & EPOLLERR && ! $pob->{closed}) { |
517
|
0
|
|
|
|
|
0
|
_pre_profile(); |
518
|
0
|
|
|
|
|
0
|
$pob->event_err; |
519
|
0
|
|
|
|
|
0
|
_post_profile("$class-err"); |
520
|
|
|
|
|
|
|
} |
521
|
0
|
0
|
0
|
|
|
0
|
if ($state & EPOLLHUP && ! $pob->{closed}) { |
522
|
0
|
|
|
|
|
0
|
_pre_profile(); |
523
|
0
|
|
|
|
|
0
|
$pob->event_hup; |
524
|
0
|
|
|
|
|
0
|
_post_profile("$class-hup"); |
525
|
|
|
|
|
|
|
} |
526
|
|
|
|
|
|
|
} |
527
|
|
|
|
|
|
|
|
528
|
0
|
|
|
|
|
0
|
next; |
529
|
|
|
|
|
|
|
} |
530
|
|
|
|
|
|
|
|
531
|
|
|
|
|
|
|
# standard non-profiling codepat |
532
|
16
|
100
|
66
|
|
|
44
|
$pob->event_read if $state & EPOLLIN && ! $pob->{closed}; |
533
|
16
|
100
|
100
|
|
|
727
|
$pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; |
534
|
15
|
50
|
|
|
|
38
|
if ($state & (EPOLLERR|EPOLLHUP)) { |
535
|
0
|
0
|
0
|
|
|
0
|
$pob->event_err if $state & EPOLLERR && ! $pob->{closed}; |
536
|
0
|
0
|
0
|
|
|
0
|
$pob->event_hup if $state & EPOLLHUP && ! $pob->{closed}; |
537
|
|
|
|
|
|
|
} |
538
|
|
|
|
|
|
|
} |
539
|
42
|
100
|
|
|
|
189
|
return unless PostEventLoop(); |
540
|
|
|
|
|
|
|
} |
541
|
0
|
|
|
|
|
0
|
exit 0; |
542
|
|
|
|
|
|
|
} |
543
|
|
|
|
|
|
|
|
544
|
|
|
|
|
|
|
### The fallback IO::Poll-based event loop. Gets installed as EventLoop if |
545
|
|
|
|
|
|
|
### IO::Epoll fails to load. |
546
|
|
|
|
|
|
|
sub PollEventLoop { |
547
|
1
|
|
|
1
|
0
|
697
|
my $class = shift; |
548
|
|
|
|
|
|
|
|
549
|
1
|
|
|
|
|
2
|
my Danga::Socket $pob; |
550
|
|
|
|
|
|
|
|
551
|
1
|
|
|
|
|
1
|
while (1) { |
552
|
6
|
|
|
|
|
16
|
my $timeout = RunTimers(); |
553
|
|
|
|
|
|
|
|
554
|
|
|
|
|
|
|
# the following sets up @poll as a series of ($poll,$event_mask) |
555
|
|
|
|
|
|
|
# items, then uses IO::Poll::_poll, implemented in XS, which |
556
|
|
|
|
|
|
|
# modifies the array in place with the even elements being |
557
|
|
|
|
|
|
|
# replaced with the event masks that occured. |
558
|
6
|
|
|
|
|
9
|
my @poll; |
559
|
6
|
|
|
|
|
18
|
foreach my $fd ( keys %OtherFds ) { |
560
|
0
|
|
|
|
|
0
|
push @poll, $fd, POLLIN; |
561
|
|
|
|
|
|
|
} |
562
|
6
|
|
|
|
|
27
|
while ( my ($fd, $sock) = each %DescriptorMap ) { |
563
|
10
|
|
|
|
|
33
|
push @poll, $fd, $sock->{event_watch}; |
564
|
|
|
|
|
|
|
} |
565
|
|
|
|
|
|
|
|
566
|
|
|
|
|
|
|
# if nothing to poll, either end immediately (if no timeout) |
567
|
|
|
|
|
|
|
# or just keep calling the callback |
568
|
6
|
100
|
|
|
|
16
|
unless (@poll) { |
569
|
1
|
|
|
|
|
150232
|
select undef, undef, undef, ($timeout / 1000); |
570
|
1
|
50
|
|
|
|
14
|
return unless PostEventLoop(); |
571
|
1
|
|
|
|
|
5
|
next; |
572
|
|
|
|
|
|
|
} |
573
|
|
|
|
|
|
|
|
574
|
5
|
|
|
|
|
300498
|
my $count = IO::Poll::_poll($timeout, @poll); |
575
|
5
|
100
|
|
|
|
32
|
unless ($count) { |
576
|
2
|
50
|
|
|
|
11
|
return unless PostEventLoop(); |
577
|
2
|
|
|
|
|
8
|
next; |
578
|
|
|
|
|
|
|
} |
579
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
# Fetch handles with read events |
581
|
3
|
|
|
|
|
8
|
while (@poll) { |
582
|
8
|
|
|
|
|
14
|
my ($fd, $state) = splice(@poll, 0, 2); |
583
|
8
|
100
|
|
|
|
17
|
next unless $state; |
584
|
|
|
|
|
|
|
|
585
|
4
|
|
|
|
|
7
|
$pob = $DescriptorMap{$fd}; |
586
|
|
|
|
|
|
|
|
587
|
4
|
50
|
|
|
|
7
|
if (!$pob) { |
588
|
0
|
0
|
|
|
|
0
|
if (my $code = $OtherFds{$fd}) { |
589
|
0
|
|
|
|
|
0
|
$code->($state); |
590
|
|
|
|
|
|
|
} |
591
|
0
|
|
|
|
|
0
|
next; |
592
|
|
|
|
|
|
|
} |
593
|
|
|
|
|
|
|
|
594
|
4
|
100
|
66
|
|
|
22
|
$pob->event_read if $state & POLLIN && ! $pob->{closed}; |
595
|
4
|
100
|
66
|
|
|
543
|
$pob->event_write if $state & POLLOUT && ! $pob->{closed}; |
596
|
4
|
50
|
33
|
|
|
13
|
$pob->event_err if $state & POLLERR && ! $pob->{closed}; |
597
|
4
|
50
|
33
|
|
|
11
|
$pob->event_hup if $state & POLLHUP && ! $pob->{closed}; |
598
|
|
|
|
|
|
|
} |
599
|
|
|
|
|
|
|
|
600
|
3
|
100
|
|
|
|
6
|
return unless PostEventLoop(); |
601
|
|
|
|
|
|
|
} |
602
|
|
|
|
|
|
|
|
603
|
0
|
|
|
|
|
0
|
exit 0; |
604
|
|
|
|
|
|
|
} |
605
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works |
607
|
|
|
|
|
|
|
### okay. |
608
|
|
|
|
|
|
|
sub KQueueEventLoop { |
609
|
0
|
|
|
0
|
0
|
0
|
my $class = shift; |
610
|
|
|
|
|
|
|
|
611
|
0
|
|
|
|
|
0
|
foreach my $fd (keys %OtherFds) { |
612
|
0
|
|
|
|
|
0
|
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), IO::KQueue::EV_ADD()); |
613
|
|
|
|
|
|
|
} |
614
|
|
|
|
|
|
|
|
615
|
0
|
|
|
|
|
0
|
while (1) { |
616
|
0
|
|
|
|
|
0
|
my $timeout = RunTimers(); |
617
|
0
|
|
|
|
|
0
|
my @ret = $KQueue->kevent($timeout); |
618
|
|
|
|
|
|
|
|
619
|
0
|
|
|
|
|
0
|
foreach my $kev (@ret) { |
620
|
0
|
|
|
|
|
0
|
my ($fd, $filter, $flags, $fflags) = @$kev; |
621
|
0
|
|
|
|
|
0
|
my Danga::Socket $pob = $DescriptorMap{$fd}; |
622
|
0
|
0
|
|
|
|
0
|
if (!$pob) { |
623
|
0
|
0
|
|
|
|
0
|
if (my $code = $OtherFds{$fd}) { |
624
|
0
|
|
|
|
|
0
|
$code->($filter); |
625
|
|
|
|
|
|
|
} else { |
626
|
0
|
|
|
|
|
0
|
warn "kevent() returned fd $fd for which we have no mapping. removing.\n"; |
627
|
0
|
|
|
|
|
0
|
POSIX::close($fd); # close deletes the kevent entry |
628
|
|
|
|
|
|
|
} |
629
|
0
|
|
|
|
|
0
|
next; |
630
|
|
|
|
|
|
|
} |
631
|
|
|
|
|
|
|
|
632
|
0
|
|
|
|
|
0
|
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", |
633
|
|
|
|
|
|
|
$fd, ref($pob), $flags, time); |
634
|
|
|
|
|
|
|
|
635
|
0
|
0
|
0
|
|
|
0
|
$pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; |
636
|
0
|
0
|
0
|
|
|
0
|
$pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; |
637
|
0
|
0
|
0
|
|
|
0
|
if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) { |
638
|
0
|
0
|
|
|
|
0
|
if ($fflags) { |
639
|
0
|
|
|
|
|
0
|
$pob->event_err; |
640
|
|
|
|
|
|
|
} else { |
641
|
0
|
|
|
|
|
0
|
$pob->event_hup; |
642
|
|
|
|
|
|
|
} |
643
|
|
|
|
|
|
|
} |
644
|
|
|
|
|
|
|
} |
645
|
0
|
0
|
|
|
|
0
|
return unless PostEventLoop(); |
646
|
|
|
|
|
|
|
} |
647
|
|
|
|
|
|
|
|
648
|
0
|
|
|
|
|
0
|
exit(0); |
649
|
|
|
|
|
|
|
} |
650
|
|
|
|
|
|
|
|
651
|
|
|
|
|
|
|
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> |
652
|
|
|
|
|
|
|
|
653
|
|
|
|
|
|
|
Sets post loop callback function. Pass a subref and it will be |
654
|
|
|
|
|
|
|
called every time the event loop finishes. |
655
|
|
|
|
|
|
|
|
656
|
|
|
|
|
|
|
Return 1 (or any true value) from the sub to make the loop continue, 0 or false |
657
|
|
|
|
|
|
|
and it will exit. |
658
|
|
|
|
|
|
|
|
659
|
|
|
|
|
|
|
The callback function will be passed two parameters: \%DescriptorMap, \%OtherFds. |
660
|
|
|
|
|
|
|
|
661
|
|
|
|
|
|
|
=cut |
662
|
|
|
|
|
|
|
sub SetPostLoopCallback { |
663
|
6
|
|
|
6
|
1
|
79
|
my ($class, $ref) = @_; |
664
|
|
|
|
|
|
|
|
665
|
6
|
100
|
|
|
|
17
|
if (ref $class) { |
666
|
|
|
|
|
|
|
# per-object callback |
667
|
2
|
|
|
|
|
3
|
my Danga::Socket $self = $class; |
668
|
2
|
50
|
33
|
|
|
9
|
if (defined $ref && ref $ref eq 'CODE') { |
669
|
2
|
|
|
|
|
7
|
$PLCMap{$self->{fd}} = $ref; |
670
|
|
|
|
|
|
|
} else { |
671
|
0
|
|
|
|
|
0
|
delete $PLCMap{$self->{fd}}; |
672
|
|
|
|
|
|
|
} |
673
|
|
|
|
|
|
|
} else { |
674
|
|
|
|
|
|
|
# global callback |
675
|
4
|
50
|
33
|
|
|
33
|
$PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; |
676
|
|
|
|
|
|
|
} |
677
|
|
|
|
|
|
|
} |
678
|
|
|
|
|
|
|
|
679
|
|
|
|
|
|
|
# Internal function: run the post-event callback, send read events |
680
|
|
|
|
|
|
|
# for pushed-back data, and close pending connections. returns 1 |
681
|
|
|
|
|
|
|
# if event loop should continue, or 0 to shut it all down. |
682
|
|
|
|
|
|
|
sub PostEventLoop { |
683
|
|
|
|
|
|
|
# fire read events for objects with pushed-back read data |
684
|
48
|
|
|
48
|
0
|
100
|
my $loop = 1; |
685
|
48
|
|
|
|
|
133
|
while ($loop) { |
686
|
54
|
|
|
|
|
634
|
$loop = 0; |
687
|
54
|
|
|
|
|
195
|
foreach my $fd (keys %PushBackSet) { |
688
|
6
|
|
|
|
|
8
|
my Danga::Socket $pob = $PushBackSet{$fd}; |
689
|
|
|
|
|
|
|
|
690
|
|
|
|
|
|
|
# a previous event_read invocation could've closed a |
691
|
|
|
|
|
|
|
# connection that we already evaluated in "keys |
692
|
|
|
|
|
|
|
# %PushBackSet", so skip ones that seem to have |
693
|
|
|
|
|
|
|
# disappeared. this is expected. |
694
|
6
|
50
|
|
|
|
14
|
next unless $pob; |
695
|
|
|
|
|
|
|
|
696
|
6
|
50
|
|
|
|
6
|
die "ASSERT: the $pob socket has no read_push_back" unless @{$pob->{read_push_back}}; |
|
6
|
|
|
|
|
12
|
|
697
|
|
|
|
|
|
|
next unless (! $pob->{closed} && |
698
|
6
|
50
|
33
|
|
|
22
|
$pob->{event_watch} & POLLIN); |
699
|
6
|
|
|
|
|
7
|
$loop = 1; |
700
|
6
|
|
|
|
|
15
|
$pob->event_read; |
701
|
|
|
|
|
|
|
} |
702
|
|
|
|
|
|
|
} |
703
|
|
|
|
|
|
|
|
704
|
|
|
|
|
|
|
# now we can close sockets that wanted to close during our event processing. |
705
|
|
|
|
|
|
|
# (we didn't want to close them during the loop, as we didn't want fd numbers |
706
|
|
|
|
|
|
|
# being reused and confused during the event loop) |
707
|
48
|
|
|
|
|
185
|
while (my $sock = shift @ToClose) { |
708
|
1
|
|
|
|
|
2
|
my $fd = fileno($sock); |
709
|
|
|
|
|
|
|
|
710
|
|
|
|
|
|
|
# close the socket. (not a Danga::Socket close) |
711
|
1
|
|
|
|
|
8
|
$sock->close; |
712
|
|
|
|
|
|
|
|
713
|
|
|
|
|
|
|
# and now we can finally remove the fd from the map. see |
714
|
|
|
|
|
|
|
# comment above in _cleanup. |
715
|
1
|
|
|
|
|
63
|
delete $DescriptorMap{$fd}; |
716
|
|
|
|
|
|
|
} |
717
|
|
|
|
|
|
|
|
718
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
# by default we keep running, unless a postloop callback (either per-object |
720
|
|
|
|
|
|
|
# or global) cancels it |
721
|
48
|
|
|
|
|
76
|
my $keep_running = 1; |
722
|
|
|
|
|
|
|
|
723
|
|
|
|
|
|
|
# per-object post-loop-callbacks |
724
|
48
|
|
|
|
|
110
|
for my $plc (values %PLCMap) { |
725
|
2
|
|
66
|
|
|
523
|
$keep_running &&= $plc->(\%DescriptorMap, \%OtherFds); |
726
|
|
|
|
|
|
|
} |
727
|
|
|
|
|
|
|
|
728
|
|
|
|
|
|
|
# now we're at the very end, call callback if defined |
729
|
48
|
100
|
|
|
|
336
|
if (defined $PostLoopCallback) { |
730
|
38
|
|
100
|
|
|
354
|
$keep_running &&= $PostLoopCallback->(\%DescriptorMap, \%OtherFds); |
731
|
|
|
|
|
|
|
} |
732
|
|
|
|
|
|
|
|
733
|
48
|
|
|
|
|
2458
|
return $keep_running; |
734
|
|
|
|
|
|
|
} |
735
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
##################################################################### |
737
|
|
|
|
|
|
|
### Danga::Socket-the-object code |
738
|
|
|
|
|
|
|
##################################################################### |
739
|
|
|
|
|
|
|
|
740
|
|
|
|
|
|
|
=head2 OBJECT METHODS |
741
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
=head2 C<< CLASS->new( $socket ) >> |
743
|
|
|
|
|
|
|
|
744
|
|
|
|
|
|
|
Create a new Danga::Socket subclass object for the given I which will |
745
|
|
|
|
|
|
|
react to events on it during the C. |
746
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
This is normally (always?) called from your subclass via: |
748
|
|
|
|
|
|
|
|
749
|
|
|
|
|
|
|
$class->SUPER::new($socket); |
750
|
|
|
|
|
|
|
|
751
|
|
|
|
|
|
|
=cut |
752
|
|
|
|
|
|
|
sub new { |
753
|
10
|
|
|
10
|
1
|
8707
|
my Danga::Socket $self = shift; |
754
|
10
|
100
|
|
|
|
50
|
$self = fields::new($self) unless ref $self; |
755
|
|
|
|
|
|
|
|
756
|
10
|
|
|
|
|
6755
|
my $sock = shift; |
757
|
|
|
|
|
|
|
|
758
|
10
|
|
|
|
|
23
|
$self->{sock} = $sock; |
759
|
10
|
|
|
|
|
26
|
my $fd = fileno($sock); |
760
|
|
|
|
|
|
|
|
761
|
10
|
50
|
0
|
|
|
52
|
Carp::cluck("undef sock and/or fd in Danga::Socket->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) |
|
|
|
0
|
|
|
|
|
|
|
|
33
|
|
|
|
|
762
|
|
|
|
|
|
|
unless $sock && $fd; |
763
|
|
|
|
|
|
|
|
764
|
10
|
|
|
|
|
23
|
$self->{fd} = $fd; |
765
|
10
|
|
|
|
|
25
|
$self->{write_buf} = []; |
766
|
10
|
|
|
|
|
17
|
$self->{write_buf_offset} = 0; |
767
|
10
|
|
|
|
|
31
|
$self->{write_buf_size} = 0; |
768
|
10
|
|
|
|
|
16
|
$self->{closed} = 0; |
769
|
10
|
|
|
|
|
17
|
$self->{corked} = 0; |
770
|
10
|
|
|
|
|
19
|
$self->{read_push_back} = []; |
771
|
|
|
|
|
|
|
|
772
|
10
|
|
|
|
|
18
|
$self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; |
773
|
|
|
|
|
|
|
|
774
|
10
|
|
|
|
|
36
|
_InitPoller(); |
775
|
|
|
|
|
|
|
|
776
|
10
|
50
|
|
|
|
25
|
if ($HaveEpoll) { |
|
|
0
|
|
|
|
|
|
777
|
|
|
|
|
|
|
epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch}) |
778
|
10
|
50
|
|
|
|
41
|
and die "couldn't add epoll watch for $fd\n"; |
779
|
|
|
|
|
|
|
} |
780
|
|
|
|
|
|
|
elsif ($HaveKQueue) { |
781
|
|
|
|
|
|
|
# Add them to the queue but disabled for now |
782
|
0
|
|
|
|
|
0
|
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), |
783
|
|
|
|
|
|
|
IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); |
784
|
0
|
|
|
|
|
0
|
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), |
785
|
|
|
|
|
|
|
IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); |
786
|
|
|
|
|
|
|
} |
787
|
|
|
|
|
|
|
|
788
|
|
|
|
|
|
|
Carp::cluck("Danga::Socket::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") |
789
|
10
|
50
|
|
|
|
258
|
if $DescriptorMap{$fd}; |
790
|
|
|
|
|
|
|
|
791
|
10
|
|
|
|
|
27
|
$DescriptorMap{$fd} = $self; |
792
|
10
|
|
|
|
|
24
|
return $self; |
793
|
|
|
|
|
|
|
} |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
|
796
|
|
|
|
|
|
|
##################################################################### |
797
|
|
|
|
|
|
|
### I N S T A N C E M E T H O D S |
798
|
|
|
|
|
|
|
##################################################################### |
799
|
|
|
|
|
|
|
|
800
|
|
|
|
|
|
|
=head2 C<< $obj->tcp_cork( $boolean ) >> |
801
|
|
|
|
|
|
|
|
802
|
|
|
|
|
|
|
Turn TCP_CORK on or off depending on the value of I. |
803
|
|
|
|
|
|
|
|
804
|
|
|
|
|
|
|
=cut |
805
|
|
|
|
|
|
|
sub tcp_cork { |
806
|
1
|
|
|
1
|
1
|
1
|
my Danga::Socket $self = $_[0]; |
807
|
1
|
|
|
|
|
2
|
my $val = $_[1]; |
808
|
|
|
|
|
|
|
|
809
|
|
|
|
|
|
|
# make sure we have a socket |
810
|
1
|
50
|
|
|
|
3
|
return unless $self->{sock}; |
811
|
1
|
50
|
|
|
|
8
|
return if $val == $self->{corked}; |
812
|
|
|
|
|
|
|
|
813
|
0
|
|
|
|
|
0
|
my $rv; |
814
|
0
|
|
|
|
|
0
|
if (TCP_CORK) { |
815
|
0
|
0
|
|
|
|
0
|
$rv = setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK, |
816
|
|
|
|
|
|
|
pack("l", $val ? 1 : 0)); |
817
|
|
|
|
|
|
|
} else { |
818
|
|
|
|
|
|
|
# FIXME: implement freebsd *PUSH sockopts |
819
|
|
|
|
|
|
|
$rv = 1; |
820
|
|
|
|
|
|
|
} |
821
|
|
|
|
|
|
|
|
822
|
|
|
|
|
|
|
# if we failed, close (if we're not already) and warn about the error |
823
|
0
|
0
|
|
|
|
0
|
if ($rv) { |
824
|
0
|
|
|
|
|
0
|
$self->{corked} = $val; |
825
|
|
|
|
|
|
|
} else { |
826
|
0
|
0
|
0
|
|
|
0
|
if ($! == EBADF || $! == ENOTSOCK) { |
|
|
0
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
827
|
|
|
|
|
|
|
# internal state is probably corrupted; warn and then close if |
828
|
|
|
|
|
|
|
# we're not closed already |
829
|
0
|
|
|
|
|
0
|
warn "setsockopt: $!"; |
830
|
0
|
|
|
|
|
0
|
$self->close('tcp_cork_failed'); |
831
|
|
|
|
|
|
|
} elsif ($! == ENOPROTOOPT || $!{ENOTSOCK} || $!{EOPNOTSUPP}) { |
832
|
|
|
|
|
|
|
# TCP implementation doesn't support corking, so just ignore it |
833
|
|
|
|
|
|
|
# or we're trying to tcp-cork a non-socket (like a socketpair pipe |
834
|
|
|
|
|
|
|
# which is acting like a socket, which Perlbal does for child |
835
|
|
|
|
|
|
|
# processes acting like inetd-like web servers) |
836
|
|
|
|
|
|
|
} else { |
837
|
|
|
|
|
|
|
# some other error; we should never hit here, but if we do, die |
838
|
0
|
|
|
|
|
0
|
die "setsockopt: $!"; |
839
|
|
|
|
|
|
|
} |
840
|
|
|
|
|
|
|
} |
841
|
|
|
|
|
|
|
} |
842
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
=head2 C<< $obj->steal_socket() >> |
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
Basically returns our socket and makes it so that we don't try to close it, |
846
|
|
|
|
|
|
|
but we do remove it from epoll handlers. THIS CLOSES $self. It is the same |
847
|
|
|
|
|
|
|
thing as calling close, except it gives you the socket to use. |
848
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
=cut |
850
|
|
|
|
|
|
|
sub steal_socket { |
851
|
0
|
|
|
0
|
1
|
0
|
my Danga::Socket $self = $_[0]; |
852
|
0
|
0
|
|
|
|
0
|
return if $self->{closed}; |
853
|
|
|
|
|
|
|
|
854
|
|
|
|
|
|
|
# cleanup does most of the work of closing this socket |
855
|
0
|
|
|
|
|
0
|
$self->_cleanup(); |
856
|
|
|
|
|
|
|
|
857
|
|
|
|
|
|
|
# now undef our internal sock and fd structures so we don't use them |
858
|
0
|
|
|
|
|
0
|
my $sock = $self->{sock}; |
859
|
0
|
|
|
|
|
0
|
$self->{sock} = undef; |
860
|
0
|
|
|
|
|
0
|
return $sock; |
861
|
|
|
|
|
|
|
} |
862
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
=head2 C<< $obj->close( [$reason] ) >> |
864
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
Close the socket. The I argument will be used in debugging messages. |
866
|
|
|
|
|
|
|
|
867
|
|
|
|
|
|
|
=cut |
868
|
|
|
|
|
|
|
sub close { |
869
|
10
|
|
|
10
|
1
|
101
|
my Danga::Socket $self = $_[0]; |
870
|
10
|
100
|
|
|
|
17
|
return if $self->{closed}; |
871
|
|
|
|
|
|
|
|
872
|
|
|
|
|
|
|
# print out debugging info for this close |
873
|
1
|
|
|
|
|
1
|
if (DebugLevel) { |
874
|
|
|
|
|
|
|
my ($pkg, $filename, $line) = caller; |
875
|
|
|
|
|
|
|
my $reason = $_[1] || ""; |
876
|
|
|
|
|
|
|
warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n"; |
877
|
|
|
|
|
|
|
} |
878
|
|
|
|
|
|
|
|
879
|
|
|
|
|
|
|
# this does most of the work of closing us |
880
|
1
|
|
|
|
|
5
|
$self->_cleanup(); |
881
|
|
|
|
|
|
|
|
882
|
|
|
|
|
|
|
# defer closing the actual socket until the event loop is done |
883
|
|
|
|
|
|
|
# processing this round of events. (otherwise we might reuse fds) |
884
|
1
|
50
|
|
|
|
9
|
if ($self->{sock}) { |
885
|
1
|
|
|
|
|
4
|
push @ToClose, $self->{sock}; |
886
|
1
|
|
|
|
|
2
|
$self->{sock} = undef; |
887
|
|
|
|
|
|
|
} |
888
|
|
|
|
|
|
|
|
889
|
1
|
|
|
|
|
2
|
return 0; |
890
|
|
|
|
|
|
|
} |
891
|
|
|
|
|
|
|
|
892
|
|
|
|
|
|
|
### METHOD: _cleanup() |
893
|
|
|
|
|
|
|
### Called by our closers so we can clean internal data structures. |
894
|
|
|
|
|
|
|
sub _cleanup { |
895
|
1
|
|
|
1
|
|
2
|
my Danga::Socket $self = $_[0]; |
896
|
|
|
|
|
|
|
|
897
|
|
|
|
|
|
|
# we're effectively closed; we have no fd and sock when we leave here |
898
|
1
|
|
|
|
|
2
|
$self->{closed} = 1; |
899
|
|
|
|
|
|
|
|
900
|
|
|
|
|
|
|
# we need to flush our write buffer, as there may |
901
|
|
|
|
|
|
|
# be self-referential closures (sub { $client->close }) |
902
|
|
|
|
|
|
|
# preventing the object from being destroyed |
903
|
1
|
|
|
|
|
2
|
$self->{write_buf} = []; |
904
|
|
|
|
|
|
|
|
905
|
|
|
|
|
|
|
# uncork so any final data gets sent. only matters if the person closing |
906
|
|
|
|
|
|
|
# us forgot to do it, but we do it to be safe. |
907
|
1
|
|
|
|
|
5
|
$self->tcp_cork(0); |
908
|
|
|
|
|
|
|
|
909
|
|
|
|
|
|
|
# if we're using epoll, we have to remove this from our epoll fd so we stop getting |
910
|
|
|
|
|
|
|
# notifications about it |
911
|
1
|
50
|
33
|
|
|
6
|
if ($HaveEpoll && $self->{fd}) { |
912
|
1
|
50
|
|
|
|
4
|
if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $self->{fd}, $self->{event_watch}) != 0) { |
913
|
|
|
|
|
|
|
# dump_error prints a backtrace so we can try to figure out why this happened |
914
|
0
|
|
|
|
|
0
|
$self->dump_error("epoll_ctl(): failure deleting fd=$self->{fd} during _cleanup(); $! (" . ($!+0) . ")"); |
915
|
|
|
|
|
|
|
} |
916
|
|
|
|
|
|
|
} |
917
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
# now delete from mappings. this fd no longer belongs to us, so we don't want |
919
|
|
|
|
|
|
|
# to get alerts for it if it becomes writable/readable/etc. |
920
|
1
|
|
|
|
|
42
|
delete $PushBackSet{$self->{fd}}; |
921
|
1
|
|
|
|
|
3
|
delete $PLCMap{$self->{fd}}; |
922
|
|
|
|
|
|
|
|
923
|
|
|
|
|
|
|
# we explicitly don't delete from DescriptorMap here until we |
924
|
|
|
|
|
|
|
# actually close the socket, as we might be in the middle of |
925
|
|
|
|
|
|
|
# processing an epoll_wait/etc that returned hundreds of fds, one |
926
|
|
|
|
|
|
|
# of which is not yet processed and is what we're closing. if we |
927
|
|
|
|
|
|
|
# keep it in DescriptorMap, then the event harnesses can just |
928
|
|
|
|
|
|
|
# looked at $pob->{closed} and ignore it. but if it's an |
929
|
|
|
|
|
|
|
# un-accounted for fd, then it (understandably) freak out a bit |
930
|
|
|
|
|
|
|
# and emit warnings, thinking their state got off. |
931
|
|
|
|
|
|
|
|
932
|
|
|
|
|
|
|
# and finally get rid of our fd so we can't use it anywhere else |
933
|
1
|
|
|
|
|
3
|
$self->{fd} = undef; |
934
|
|
|
|
|
|
|
} |
935
|
|
|
|
|
|
|
|
936
|
|
|
|
|
|
|
=head2 C<< $obj->sock() >> |
937
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
Returns the underlying IO::Handle for the object. |
939
|
|
|
|
|
|
|
|
940
|
|
|
|
|
|
|
=cut |
941
|
|
|
|
|
|
|
sub sock { |
942
|
0
|
|
|
0
|
1
|
0
|
my Danga::Socket $self = shift; |
943
|
0
|
|
|
|
|
0
|
return $self->{sock}; |
944
|
|
|
|
|
|
|
} |
945
|
|
|
|
|
|
|
|
946
|
|
|
|
|
|
|
=head2 C<< $obj->set_writer_func( CODEREF ) >> |
947
|
|
|
|
|
|
|
|
948
|
|
|
|
|
|
|
Sets a function to use instead of C when writing data to the socket. |
949
|
|
|
|
|
|
|
|
950
|
|
|
|
|
|
|
=cut |
951
|
|
|
|
|
|
|
sub set_writer_func { |
952
|
0
|
|
|
0
|
1
|
0
|
my Danga::Socket $self = shift; |
953
|
0
|
|
|
|
|
0
|
my $wtr = shift; |
954
|
0
|
0
|
0
|
|
|
0
|
Carp::croak("Not a subref") unless !defined $wtr || UNIVERSAL::isa($wtr, "CODE"); |
955
|
0
|
|
|
|
|
0
|
$self->{writer_func} = $wtr; |
956
|
|
|
|
|
|
|
} |
957
|
|
|
|
|
|
|
|
958
|
|
|
|
|
|
|
=head2 C<< $obj->write( $data ) >> |
959
|
|
|
|
|
|
|
|
960
|
|
|
|
|
|
|
Write the specified data to the underlying handle. I may be scalar, |
961
|
|
|
|
|
|
|
scalar ref, code ref (to run when there), or undef just to kick-start. |
962
|
|
|
|
|
|
|
Returns 1 if writes all went through, or 0 if there are writes in queue. If |
963
|
|
|
|
|
|
|
it returns 1, caller should stop waiting for 'writable' events) |
964
|
|
|
|
|
|
|
|
965
|
|
|
|
|
|
|
=cut |
966
|
|
|
|
|
|
|
sub write { |
967
|
2
|
|
|
2
|
1
|
602
|
my Danga::Socket $self; |
968
|
|
|
|
|
|
|
my $data; |
969
|
2
|
|
|
|
|
6
|
($self, $data) = @_; |
970
|
|
|
|
|
|
|
|
971
|
|
|
|
|
|
|
# nobody should be writing to closed sockets, but caller code can |
972
|
|
|
|
|
|
|
# do two writes within an event, have the first fail and |
973
|
|
|
|
|
|
|
# disconnect the other side (whose destructor then closes the |
974
|
|
|
|
|
|
|
# calling object, but it's still in a method), and then the |
975
|
|
|
|
|
|
|
# now-dead object does its second write. that is this case. we |
976
|
|
|
|
|
|
|
# just lie and say it worked. it'll be dead soon and won't be |
977
|
|
|
|
|
|
|
# hurt by this lie. |
978
|
2
|
50
|
|
|
|
10
|
return 1 if $self->{closed}; |
979
|
|
|
|
|
|
|
|
980
|
2
|
|
|
|
|
5
|
my $bref; |
981
|
|
|
|
|
|
|
|
982
|
|
|
|
|
|
|
# just queue data if there's already a wait |
983
|
|
|
|
|
|
|
my $need_queue; |
984
|
|
|
|
|
|
|
|
985
|
2
|
50
|
|
|
|
4
|
if (defined $data) { |
986
|
2
|
50
|
|
|
|
8
|
$bref = ref $data ? $data : \$data; |
987
|
2
|
50
|
|
|
|
5
|
if ($self->{write_buf_size}) { |
988
|
0
|
|
|
|
|
0
|
push @{$self->{write_buf}}, $bref; |
|
0
|
|
|
|
|
0
|
|
989
|
0
|
0
|
|
|
|
0
|
$self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1; |
990
|
0
|
|
|
|
|
0
|
return 0; |
991
|
|
|
|
|
|
|
} |
992
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
# this flag says we're bypassing the queue system, knowing we're the |
994
|
|
|
|
|
|
|
# only outstanding write, and hoping we don't ever need to use it. |
995
|
|
|
|
|
|
|
# if so later, though, we'll need to queue |
996
|
2
|
|
|
|
|
4
|
$need_queue = 1; |
997
|
|
|
|
|
|
|
} |
998
|
|
|
|
|
|
|
|
999
|
|
|
|
|
|
|
WRITE: |
1000
|
2
|
|
|
|
|
8
|
while (1) { |
1001
|
2
|
50
|
33
|
|
|
10
|
return 1 unless $bref ||= $self->{write_buf}[0]; |
1002
|
|
|
|
|
|
|
|
1003
|
2
|
|
|
|
|
3
|
my $len; |
1004
|
2
|
|
|
|
|
4
|
eval { |
1005
|
2
|
|
|
|
|
4
|
$len = length($$bref); # this will die if $bref is a code ref, caught below |
1006
|
|
|
|
|
|
|
}; |
1007
|
2
|
50
|
|
|
|
6
|
if ($@) { |
1008
|
0
|
0
|
|
|
|
0
|
if (UNIVERSAL::isa($bref, "CODE")) { |
1009
|
0
|
0
|
|
|
|
0
|
unless ($need_queue) { |
1010
|
0
|
|
|
|
|
0
|
$self->{write_buf_size}--; # code refs are worth 1 |
1011
|
0
|
|
|
|
|
0
|
shift @{$self->{write_buf}}; |
|
0
|
|
|
|
|
0
|
|
1012
|
|
|
|
|
|
|
} |
1013
|
0
|
|
|
|
|
0
|
$bref->(); |
1014
|
|
|
|
|
|
|
|
1015
|
|
|
|
|
|
|
# code refs are just run and never get reenqueued |
1016
|
|
|
|
|
|
|
# (they're one-shot), so turn off the flag indicating the |
1017
|
|
|
|
|
|
|
# outstanding data needs queueing. |
1018
|
0
|
|
|
|
|
0
|
$need_queue = 0; |
1019
|
|
|
|
|
|
|
|
1020
|
0
|
|
|
|
|
0
|
undef $bref; |
1021
|
0
|
|
|
|
|
0
|
next WRITE; |
1022
|
|
|
|
|
|
|
} |
1023
|
0
|
|
|
|
|
0
|
die "Write error: $@ <$bref>"; |
1024
|
|
|
|
|
|
|
} |
1025
|
|
|
|
|
|
|
|
1026
|
2
|
|
|
|
|
5
|
my $to_write = $len - $self->{write_buf_offset}; |
1027
|
2
|
|
|
|
|
3
|
my $written; |
1028
|
2
|
50
|
|
|
|
5
|
if (my $wtr = $self->{writer_func}) { |
1029
|
0
|
|
|
|
|
0
|
$written = $wtr->($bref, $to_write, $self->{write_buf_offset}); |
1030
|
|
|
|
|
|
|
} else { |
1031
|
2
|
|
|
|
|
84
|
$written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset}); |
1032
|
|
|
|
|
|
|
} |
1033
|
|
|
|
|
|
|
|
1034
|
2
|
50
|
|
|
|
15
|
if (! defined $written) { |
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
1035
|
0
|
0
|
|
|
|
0
|
if ($! == EPIPE) { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
1036
|
0
|
|
|
|
|
0
|
return $self->close("EPIPE"); |
1037
|
|
|
|
|
|
|
} elsif ($! == EAGAIN) { |
1038
|
|
|
|
|
|
|
# since connection has stuff to write, it should now be |
1039
|
|
|
|
|
|
|
# interested in pending writes: |
1040
|
0
|
0
|
|
|
|
0
|
if ($need_queue) { |
1041
|
0
|
|
|
|
|
0
|
push @{$self->{write_buf}}, $bref; |
|
0
|
|
|
|
|
0
|
|
1042
|
0
|
|
|
|
|
0
|
$self->{write_buf_size} += $len; |
1043
|
|
|
|
|
|
|
} |
1044
|
0
|
0
|
|
|
|
0
|
$self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT; |
1045
|
0
|
|
|
|
|
0
|
$self->watch_write(1); |
1046
|
0
|
|
|
|
|
0
|
return 0; |
1047
|
|
|
|
|
|
|
} elsif ($! == ECONNRESET) { |
1048
|
0
|
|
|
|
|
0
|
return $self->close("ECONNRESET"); |
1049
|
|
|
|
|
|
|
} |
1050
|
|
|
|
|
|
|
|
1051
|
0
|
|
|
|
|
0
|
DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n"); |
1052
|
|
|
|
|
|
|
|
1053
|
0
|
|
|
|
|
0
|
return $self->close("write_error"); |
1054
|
|
|
|
|
|
|
} elsif ($written != $to_write) { |
1055
|
|
|
|
|
|
|
DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", |
1056
|
0
|
|
|
|
|
0
|
$written, $self->{fd}); |
1057
|
0
|
0
|
|
|
|
0
|
if ($need_queue) { |
1058
|
0
|
|
|
|
|
0
|
push @{$self->{write_buf}}, $bref; |
|
0
|
|
|
|
|
0
|
|
1059
|
0
|
|
|
|
|
0
|
$self->{write_buf_size} += $len; |
1060
|
|
|
|
|
|
|
} |
1061
|
|
|
|
|
|
|
# since connection has stuff to write, it should now be |
1062
|
|
|
|
|
|
|
# interested in pending writes: |
1063
|
0
|
|
|
|
|
0
|
$self->{write_buf_offset} += $written; |
1064
|
0
|
|
|
|
|
0
|
$self->{write_buf_size} -= $written; |
1065
|
0
|
|
|
|
|
0
|
$self->on_incomplete_write; |
1066
|
0
|
|
|
|
|
0
|
return 0; |
1067
|
|
|
|
|
|
|
} elsif ($written == $to_write) { |
1068
|
|
|
|
|
|
|
DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", |
1069
|
2
|
|
|
|
|
4
|
$written, $self->{fd}, $need_queue); |
1070
|
2
|
|
|
|
|
5
|
$self->{write_buf_offset} = 0; |
1071
|
|
|
|
|
|
|
|
1072
|
2
|
50
|
|
|
|
5
|
if ($self->{write_set_watch}) { |
1073
|
0
|
|
|
|
|
0
|
$self->watch_write(0); |
1074
|
0
|
|
|
|
|
0
|
$self->{write_set_watch} = 0; |
1075
|
|
|
|
|
|
|
} |
1076
|
|
|
|
|
|
|
|
1077
|
|
|
|
|
|
|
# this was our only write, so we can return immediately |
1078
|
|
|
|
|
|
|
# since we avoided incrementing the buffer size or |
1079
|
|
|
|
|
|
|
# putting it in the buffer. we also know there |
1080
|
|
|
|
|
|
|
# can't be anything else to write. |
1081
|
2
|
50
|
|
|
|
9
|
return 1 if $need_queue; |
1082
|
|
|
|
|
|
|
|
1083
|
0
|
|
|
|
|
0
|
$self->{write_buf_size} -= $written; |
1084
|
0
|
|
|
|
|
0
|
shift @{$self->{write_buf}}; |
|
0
|
|
|
|
|
0
|
|
1085
|
0
|
|
|
|
|
0
|
undef $bref; |
1086
|
0
|
|
|
|
|
0
|
next WRITE; |
1087
|
|
|
|
|
|
|
} |
1088
|
|
|
|
|
|
|
} |
1089
|
|
|
|
|
|
|
} |
1090
|
|
|
|
|
|
|
|
1091
|
|
|
|
|
|
|
sub on_incomplete_write { |
1092
|
0
|
|
|
0
|
0
|
0
|
my Danga::Socket $self = shift; |
1093
|
0
|
0
|
|
|
|
0
|
$self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT; |
1094
|
0
|
|
|
|
|
0
|
$self->watch_write(1); |
1095
|
|
|
|
|
|
|
} |
1096
|
|
|
|
|
|
|
|
1097
|
|
|
|
|
|
|
=head2 C<< $obj->push_back_read( $buf ) >> |
1098
|
|
|
|
|
|
|
|
1099
|
|
|
|
|
|
|
Push back I (a scalar or scalarref) into the read stream. Useful if you read |
1100
|
|
|
|
|
|
|
more than you need to and want to return this data on the next "read". |
1101
|
|
|
|
|
|
|
|
1102
|
|
|
|
|
|
|
=cut |
1103
|
|
|
|
|
|
|
sub push_back_read { |
1104
|
4
|
|
|
4
|
1
|
868
|
my Danga::Socket $self = shift; |
1105
|
4
|
|
|
|
|
6
|
my $buf = shift; |
1106
|
4
|
50
|
|
|
|
6
|
push @{$self->{read_push_back}}, ref $buf ? $buf : \$buf; |
|
4
|
|
|
|
|
11
|
|
1107
|
4
|
|
|
|
|
13
|
$PushBackSet{$self->{fd}} = $self; |
1108
|
|
|
|
|
|
|
} |
1109
|
|
|
|
|
|
|
|
1110
|
|
|
|
|
|
|
=head2 C<< $obj->read( $bytecount ) >> |
1111
|
|
|
|
|
|
|
|
1112
|
|
|
|
|
|
|
Read at most I bytes from the underlying handle; returns scalar ref |
1113
|
|
|
|
|
|
|
on read, or undef on connection closed. If you call read more than once and no |
1114
|
|
|
|
|
|
|
more data available after the first call, a scalar ref to an empty string is |
1115
|
|
|
|
|
|
|
returned. |
1116
|
|
|
|
|
|
|
|
1117
|
|
|
|
|
|
|
=cut |
1118
|
|
|
|
|
|
|
sub read { |
1119
|
10
|
|
|
10
|
1
|
100
|
my Danga::Socket $self = shift; |
1120
|
10
|
50
|
|
|
|
18
|
return if $self->{closed}; |
1121
|
10
|
|
|
|
|
12
|
my $bytes = shift; |
1122
|
10
|
|
|
|
|
8
|
my $buf; |
1123
|
10
|
|
|
|
|
14
|
my $sock = $self->{sock}; |
1124
|
|
|
|
|
|
|
|
1125
|
10
|
100
|
|
|
|
9
|
if (@{$self->{read_push_back}}) { |
|
10
|
|
|
|
|
22
|
|
1126
|
6
|
|
|
|
|
6
|
$buf = shift @{$self->{read_push_back}}; |
|
6
|
|
|
|
|
8
|
|
1127
|
6
|
|
|
|
|
8
|
my $len = length($$buf); |
1128
|
|
|
|
|
|
|
|
1129
|
6
|
100
|
|
|
|
13
|
if ($len <= $bytes) { |
1130
|
4
|
50
|
|
|
|
5
|
delete $PushBackSet{$self->{fd}} unless @{$self->{read_push_back}}; |
|
4
|
|
|
|
|
12
|
|
1131
|
4
|
|
|
|
|
7
|
return $buf; |
1132
|
|
|
|
|
|
|
} else { |
1133
|
|
|
|
|
|
|
# if the pushed back read is too big, we have to split it |
1134
|
2
|
|
|
|
|
4
|
my $overflow = substr($$buf, $bytes); |
1135
|
2
|
|
|
|
|
4
|
$buf = substr($$buf, 0, $bytes); |
1136
|
2
|
|
|
|
|
4
|
unshift @{$self->{read_push_back}}, \$overflow; |
|
2
|
|
|
|
|
5
|
|
1137
|
2
|
|
|
|
|
5
|
return \$buf; |
1138
|
|
|
|
|
|
|
} |
1139
|
|
|
|
|
|
|
} |
1140
|
|
|
|
|
|
|
|
1141
|
|
|
|
|
|
|
# if this is too high, perl quits(!!). reports on mailing lists |
1142
|
|
|
|
|
|
|
# don't seem to point to a universal answer. 5MB worked for some, |
1143
|
|
|
|
|
|
|
# crashed for others. 1MB works for more people. let's go with 1MB |
1144
|
|
|
|
|
|
|
# for now. :/ |
1145
|
4
|
50
|
|
|
|
8
|
my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes; |
1146
|
|
|
|
|
|
|
|
1147
|
4
|
|
|
|
|
42
|
my $res = sysread($sock, $buf, $req_bytes, 0); |
1148
|
4
|
|
|
|
|
7
|
DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!); |
1149
|
|
|
|
|
|
|
|
1150
|
4
|
50
|
33
|
|
|
9
|
if (! $res && $! != EWOULDBLOCK) { |
1151
|
|
|
|
|
|
|
# catches 0=conn closed or undef=error |
1152
|
0
|
|
|
|
|
0
|
DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd}); |
1153
|
0
|
|
|
|
|
0
|
return undef; |
1154
|
|
|
|
|
|
|
} |
1155
|
|
|
|
|
|
|
|
1156
|
4
|
|
|
|
|
11
|
return \$buf; |
1157
|
|
|
|
|
|
|
} |
1158
|
|
|
|
|
|
|
|
1159
|
|
|
|
|
|
|
=head2 (VIRTUAL) C<< $obj->event_read() >> |
1160
|
|
|
|
|
|
|
|
1161
|
|
|
|
|
|
|
Readable event handler. Concrete deriviatives of Danga::Socket should |
1162
|
|
|
|
|
|
|
provide an implementation of this. The default implementation will die if |
1163
|
|
|
|
|
|
|
called. |
1164
|
|
|
|
|
|
|
|
1165
|
|
|
|
|
|
|
=cut |
1166
|
0
|
|
|
0
|
1
|
0
|
sub event_read { die "Base class event_read called for $_[0]\n"; } |
1167
|
|
|
|
|
|
|
|
1168
|
|
|
|
|
|
|
=head2 (VIRTUAL) C<< $obj->event_err() >> |
1169
|
|
|
|
|
|
|
|
1170
|
|
|
|
|
|
|
Error event handler. Concrete deriviatives of Danga::Socket should |
1171
|
|
|
|
|
|
|
provide an implementation of this. The default implementation will die if |
1172
|
|
|
|
|
|
|
called. |
1173
|
|
|
|
|
|
|
|
1174
|
|
|
|
|
|
|
=cut |
1175
|
0
|
|
|
0
|
1
|
0
|
sub event_err { die "Base class event_err called for $_[0]\n"; } |
1176
|
|
|
|
|
|
|
|
1177
|
|
|
|
|
|
|
=head2 (VIRTUAL) C<< $obj->event_hup() >> |
1178
|
|
|
|
|
|
|
|
1179
|
|
|
|
|
|
|
'Hangup' event handler. Concrete deriviatives of Danga::Socket should |
1180
|
|
|
|
|
|
|
provide an implementation of this. The default implementation will die if |
1181
|
|
|
|
|
|
|
called. |
1182
|
|
|
|
|
|
|
|
1183
|
|
|
|
|
|
|
=cut |
1184
|
0
|
|
|
0
|
1
|
0
|
sub event_hup { die "Base class event_hup called for $_[0]\n"; } |
1185
|
|
|
|
|
|
|
|
1186
|
|
|
|
|
|
|
=head2 C<< $obj->event_write() >> |
1187
|
|
|
|
|
|
|
|
1188
|
|
|
|
|
|
|
Writable event handler. Concrete deriviatives of Danga::Socket may wish to |
1189
|
|
|
|
|
|
|
provide an implementation of this. The default implementation calls |
1190
|
|
|
|
|
|
|
C with an C. |
1191
|
|
|
|
|
|
|
|
1192
|
|
|
|
|
|
|
=cut |
1193
|
|
|
|
|
|
|
sub event_write { |
1194
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
1195
|
0
|
|
|
|
|
0
|
$self->write(undef); |
1196
|
|
|
|
|
|
|
} |
1197
|
|
|
|
|
|
|
|
1198
|
|
|
|
|
|
|
=head2 C<< $obj->watch_read( $boolean ) >> |
1199
|
|
|
|
|
|
|
|
1200
|
|
|
|
|
|
|
Turn 'readable' event notification on or off. |
1201
|
|
|
|
|
|
|
|
1202
|
|
|
|
|
|
|
=cut |
1203
|
|
|
|
|
|
|
sub watch_read { |
1204
|
8
|
|
|
8
|
1
|
464
|
my Danga::Socket $self = shift; |
1205
|
8
|
50
|
33
|
|
|
45
|
return if $self->{closed} || !$self->{sock}; |
1206
|
|
|
|
|
|
|
|
1207
|
8
|
|
|
|
|
13
|
my $val = shift; |
1208
|
8
|
|
|
|
|
16
|
my $event = $self->{event_watch}; |
1209
|
|
|
|
|
|
|
|
1210
|
8
|
100
|
|
|
|
18
|
$event &= ~POLLIN if ! $val; |
1211
|
8
|
100
|
|
|
|
21
|
$event |= POLLIN if $val; |
1212
|
|
|
|
|
|
|
|
1213
|
|
|
|
|
|
|
# If it changed, set it |
1214
|
8
|
50
|
|
|
|
21
|
if ($event != $self->{event_watch}) { |
1215
|
8
|
50
|
|
|
|
24
|
if ($HaveKQueue) { |
|
|
50
|
|
|
|
|
|
1216
|
0
|
0
|
|
|
|
0
|
$KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(), |
1217
|
|
|
|
|
|
|
$val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); |
1218
|
|
|
|
|
|
|
} |
1219
|
|
|
|
|
|
|
elsif ($HaveEpoll) { |
1220
|
8
|
50
|
|
|
|
21
|
epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) |
1221
|
|
|
|
|
|
|
and $self->dump_error("couldn't modify epoll settings for $self->{fd} " . |
1222
|
|
|
|
|
|
|
"from $self->{event_watch} -> $event: $! (" . ($!+0) . ")"); |
1223
|
|
|
|
|
|
|
} |
1224
|
8
|
|
|
|
|
109
|
$self->{event_watch} = $event; |
1225
|
|
|
|
|
|
|
} |
1226
|
|
|
|
|
|
|
} |
1227
|
|
|
|
|
|
|
|
1228
|
|
|
|
|
|
|
=head2 C<< $obj->watch_write( $boolean ) >> |
1229
|
|
|
|
|
|
|
|
1230
|
|
|
|
|
|
|
Turn 'writable' event notification on or off. |
1231
|
|
|
|
|
|
|
|
1232
|
|
|
|
|
|
|
=cut |
1233
|
|
|
|
|
|
|
sub watch_write { |
1234
|
6
|
|
|
6
|
1
|
41
|
my Danga::Socket $self = shift; |
1235
|
6
|
50
|
33
|
|
|
32
|
return if $self->{closed} || !$self->{sock}; |
1236
|
|
|
|
|
|
|
|
1237
|
6
|
|
|
|
|
8
|
my $val = shift; |
1238
|
6
|
|
|
|
|
11
|
my $event = $self->{event_watch}; |
1239
|
|
|
|
|
|
|
|
1240
|
6
|
100
|
|
|
|
14
|
$event &= ~POLLOUT if ! $val; |
1241
|
6
|
100
|
|
|
|
19
|
$event |= POLLOUT if $val; |
1242
|
|
|
|
|
|
|
|
1243
|
6
|
100
|
66
|
|
|
33
|
if ($val && caller ne __PACKAGE__) { |
1244
|
|
|
|
|
|
|
# A subclass registered interest, it's now responsible for this. |
1245
|
4
|
|
|
|
|
10
|
$self->{write_set_watch} = 0; |
1246
|
|
|
|
|
|
|
} |
1247
|
|
|
|
|
|
|
|
1248
|
|
|
|
|
|
|
# If it changed, set it |
1249
|
6
|
50
|
|
|
|
26
|
if ($event != $self->{event_watch}) { |
1250
|
6
|
50
|
|
|
|
21
|
if ($HaveKQueue) { |
|
|
50
|
|
|
|
|
|
1251
|
0
|
0
|
|
|
|
0
|
$KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(), |
1252
|
|
|
|
|
|
|
$val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); |
1253
|
|
|
|
|
|
|
} |
1254
|
|
|
|
|
|
|
elsif ($HaveEpoll) { |
1255
|
6
|
50
|
|
|
|
19
|
epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) |
1256
|
|
|
|
|
|
|
and $self->dump_error("couldn't modify epoll settings for $self->{fd} " . |
1257
|
|
|
|
|
|
|
"from $self->{event_watch} -> $event: $! (" . ($!+0) . ")"); |
1258
|
|
|
|
|
|
|
} |
1259
|
6
|
|
|
|
|
90
|
$self->{event_watch} = $event; |
1260
|
|
|
|
|
|
|
} |
1261
|
|
|
|
|
|
|
} |
1262
|
|
|
|
|
|
|
|
1263
|
|
|
|
|
|
|
=head2 C<< $obj->dump_error( $message ) >> |
1264
|
|
|
|
|
|
|
|
1265
|
|
|
|
|
|
|
Prints to STDERR a backtrace with information about this socket and what lead |
1266
|
|
|
|
|
|
|
up to the dump_error call. |
1267
|
|
|
|
|
|
|
|
1268
|
|
|
|
|
|
|
=cut |
1269
|
|
|
|
|
|
|
sub dump_error { |
1270
|
0
|
|
|
0
|
1
|
0
|
my $i = 0; |
1271
|
0
|
|
|
|
|
0
|
my @list; |
1272
|
0
|
|
|
|
|
0
|
while (my ($file, $line, $sub) = (caller($i++))[1..3]) { |
1273
|
0
|
|
|
|
|
0
|
push @list, "\t$file:$line called $sub\n"; |
1274
|
|
|
|
|
|
|
} |
1275
|
|
|
|
|
|
|
|
1276
|
0
|
|
|
|
|
0
|
warn "ERROR: $_[1]\n" . |
1277
|
|
|
|
|
|
|
"\t$_[0] = " . $_[0]->as_string . "\n" . |
1278
|
|
|
|
|
|
|
join('', @list); |
1279
|
|
|
|
|
|
|
} |
1280
|
|
|
|
|
|
|
|
1281
|
|
|
|
|
|
|
=head2 C<< $obj->debugmsg( $format, @args ) >> |
1282
|
|
|
|
|
|
|
|
1283
|
|
|
|
|
|
|
Print the debugging message specified by the C-style I and |
1284
|
|
|
|
|
|
|
I. |
1285
|
|
|
|
|
|
|
|
1286
|
|
|
|
|
|
|
=cut |
1287
|
|
|
|
|
|
|
sub debugmsg { |
1288
|
0
|
|
|
0
|
1
|
0
|
my ( $self, $fmt, @args ) = @_; |
1289
|
0
|
0
|
|
|
|
0
|
confess "Not an object" unless ref $self; |
1290
|
|
|
|
|
|
|
|
1291
|
0
|
|
|
|
|
0
|
chomp $fmt; |
1292
|
0
|
|
|
|
|
0
|
printf STDERR ">>> $fmt\n", @args; |
1293
|
|
|
|
|
|
|
} |
1294
|
|
|
|
|
|
|
|
1295
|
|
|
|
|
|
|
|
1296
|
|
|
|
|
|
|
=head2 C<< $obj->peer_ip_string() >> |
1297
|
|
|
|
|
|
|
|
1298
|
|
|
|
|
|
|
Returns the string describing the peer's IP |
1299
|
|
|
|
|
|
|
|
1300
|
|
|
|
|
|
|
=cut |
1301
|
|
|
|
|
|
|
sub peer_ip_string { |
1302
|
2
|
|
|
2
|
1
|
4
|
my Danga::Socket $self = shift; |
1303
|
2
|
50
|
|
|
|
6
|
return _undef("peer_ip_string undef: no sock") unless $self->{sock}; |
1304
|
2
|
50
|
|
|
|
6
|
return $self->{peer_ip} if defined $self->{peer_ip}; |
1305
|
|
|
|
|
|
|
|
1306
|
2
|
|
|
|
|
20
|
my $pn = getpeername($self->{sock}); |
1307
|
2
|
50
|
|
|
|
7
|
return _undef("peer_ip_string undef: getpeername") unless $pn; |
1308
|
|
|
|
|
|
|
|
1309
|
2
|
|
|
|
|
4
|
my ($port, $iaddr) = eval { |
1310
|
2
|
50
|
|
|
|
4
|
if (length($pn) >= 28) { |
1311
|
0
|
|
|
|
|
0
|
return Socket6::unpack_sockaddr_in6($pn); |
1312
|
|
|
|
|
|
|
} else { |
1313
|
2
|
|
|
|
|
5
|
return Socket::sockaddr_in($pn); |
1314
|
|
|
|
|
|
|
} |
1315
|
|
|
|
|
|
|
}; |
1316
|
|
|
|
|
|
|
|
1317
|
2
|
50
|
|
|
|
20
|
if ($@) { |
1318
|
0
|
|
|
|
|
0
|
$self->{peer_port} = "[Unknown peerport '$@']"; |
1319
|
0
|
|
|
|
|
0
|
return "[Unknown peername '$@']"; |
1320
|
|
|
|
|
|
|
} |
1321
|
|
|
|
|
|
|
|
1322
|
2
|
|
|
|
|
11
|
$self->{peer_port} = $port; |
1323
|
|
|
|
|
|
|
|
1324
|
2
|
50
|
|
|
|
8
|
if (length($iaddr) == 4) { |
1325
|
2
|
|
|
|
|
22
|
return $self->{peer_ip} = Socket::inet_ntoa($iaddr); |
1326
|
|
|
|
|
|
|
} else { |
1327
|
0
|
|
|
|
|
0
|
$self->{peer_v6} = 1; |
1328
|
0
|
|
|
|
|
0
|
return $self->{peer_ip} = Socket6::inet_ntop(Socket6::AF_INET6(), |
1329
|
|
|
|
|
|
|
$iaddr); |
1330
|
|
|
|
|
|
|
} |
1331
|
|
|
|
|
|
|
} |
1332
|
|
|
|
|
|
|
|
1333
|
|
|
|
|
|
|
=head2 C<< $obj->peer_addr_string() >> |
1334
|
|
|
|
|
|
|
|
1335
|
|
|
|
|
|
|
Returns the string describing the peer for the socket which underlies this |
1336
|
|
|
|
|
|
|
object in form "ip:port" |
1337
|
|
|
|
|
|
|
|
1338
|
|
|
|
|
|
|
=cut |
1339
|
|
|
|
|
|
|
sub peer_addr_string { |
1340
|
2
|
|
|
2
|
1
|
15
|
my Danga::Socket $self = shift; |
1341
|
2
|
50
|
|
|
|
9
|
my $ip = $self->peer_ip_string |
1342
|
|
|
|
|
|
|
or return undef; |
1343
|
|
|
|
|
|
|
return $self->{peer_v6} ? |
1344
|
2
|
50
|
|
|
|
13
|
"[$ip]:$self->{peer_port}" : |
1345
|
|
|
|
|
|
|
"$ip:$self->{peer_port}"; |
1346
|
|
|
|
|
|
|
} |
1347
|
|
|
|
|
|
|
|
1348
|
|
|
|
|
|
|
=head2 C<< $obj->local_ip_string() >> |
1349
|
|
|
|
|
|
|
|
1350
|
|
|
|
|
|
|
Returns the string describing the local IP |
1351
|
|
|
|
|
|
|
|
1352
|
|
|
|
|
|
|
=cut |
1353
|
|
|
|
|
|
|
sub local_ip_string { |
1354
|
2
|
|
|
2
|
1
|
4
|
my Danga::Socket $self = shift; |
1355
|
2
|
50
|
|
|
|
5
|
return _undef("local_ip_string undef: no sock") unless $self->{sock}; |
1356
|
2
|
50
|
|
|
|
5
|
return $self->{local_ip} if defined $self->{local_ip}; |
1357
|
|
|
|
|
|
|
|
1358
|
2
|
|
|
|
|
17
|
my $pn = getsockname($self->{sock}); |
1359
|
2
|
50
|
|
|
|
6
|
return _undef("local_ip_string undef: getsockname") unless $pn; |
1360
|
|
|
|
|
|
|
|
1361
|
2
|
|
|
|
|
4
|
my ($port, $iaddr) = Socket::sockaddr_in($pn); |
1362
|
2
|
|
|
|
|
15
|
$self->{local_port} = $port; |
1363
|
|
|
|
|
|
|
|
1364
|
2
|
|
|
|
|
8
|
return $self->{local_ip} = Socket::inet_ntoa($iaddr); |
1365
|
|
|
|
|
|
|
} |
1366
|
|
|
|
|
|
|
|
1367
|
|
|
|
|
|
|
=head2 C<< $obj->local_addr_string() >> |
1368
|
|
|
|
|
|
|
|
1369
|
|
|
|
|
|
|
Returns the string describing the local end of the socket which underlies this |
1370
|
|
|
|
|
|
|
object in form "ip:port" |
1371
|
|
|
|
|
|
|
|
1372
|
|
|
|
|
|
|
=cut |
1373
|
|
|
|
|
|
|
sub local_addr_string { |
1374
|
2
|
|
|
2
|
1
|
13
|
my Danga::Socket $self = shift; |
1375
|
2
|
|
|
|
|
8
|
my $ip = $self->local_ip_string; |
1376
|
2
|
50
|
|
|
|
10
|
return $ip ? "$ip:$self->{local_port}" : undef; |
1377
|
|
|
|
|
|
|
} |
1378
|
|
|
|
|
|
|
|
1379
|
|
|
|
|
|
|
|
1380
|
|
|
|
|
|
|
=head2 C<< $obj->as_string() >> |
1381
|
|
|
|
|
|
|
|
1382
|
|
|
|
|
|
|
Returns a string describing this socket. |
1383
|
|
|
|
|
|
|
|
1384
|
|
|
|
|
|
|
=cut |
1385
|
|
|
|
|
|
|
sub as_string { |
1386
|
0
|
|
|
0
|
1
|
|
my Danga::Socket $self = shift; |
1387
|
|
|
|
|
|
|
my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') . |
1388
|
0
|
0
|
|
|
|
|
($self->{event_watch} & POLLOUT ? 'W' : '') . ")"; |
|
|
0
|
|
|
|
|
|
1389
|
0
|
0
|
|
|
|
|
my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open"); |
1390
|
0
|
|
|
|
|
|
my $peer = $self->peer_addr_string; |
1391
|
0
|
0
|
|
|
|
|
if ($peer) { |
1392
|
0
|
|
|
|
|
|
$ret .= " to " . $self->peer_addr_string; |
1393
|
|
|
|
|
|
|
} |
1394
|
0
|
|
|
|
|
|
return $ret; |
1395
|
|
|
|
|
|
|
} |
1396
|
|
|
|
|
|
|
|
1397
|
|
|
|
|
|
|
sub _undef { |
1398
|
0
|
0
|
|
0
|
|
|
return undef unless $ENV{DS_DEBUG}; |
1399
|
0
|
|
0
|
|
|
|
my $msg = shift || ""; |
1400
|
0
|
|
|
|
|
|
warn "Danga::Socket: $msg\n"; |
1401
|
0
|
|
|
|
|
|
return undef; |
1402
|
|
|
|
|
|
|
} |
1403
|
|
|
|
|
|
|
|
1404
|
|
|
|
|
|
|
package Danga::Socket::Timer; |
1405
|
|
|
|
|
|
|
# [$abs_float_firetime, $coderef]; |
1406
|
|
|
|
|
|
|
sub cancel { |
1407
|
0
|
|
|
0
|
|
|
$_[0][1] = undef; |
1408
|
|
|
|
|
|
|
} |
1409
|
|
|
|
|
|
|
|
1410
|
|
|
|
|
|
|
=head1 AUTHORS |
1411
|
|
|
|
|
|
|
|
1412
|
|
|
|
|
|
|
Brad Fitzpatrick - author |
1413
|
|
|
|
|
|
|
|
1414
|
|
|
|
|
|
|
Michael Granger - docs, testing |
1415
|
|
|
|
|
|
|
|
1416
|
|
|
|
|
|
|
Mark Smith - contributor, heavy user, testing |
1417
|
|
|
|
|
|
|
|
1418
|
|
|
|
|
|
|
Matt Sergeant - kqueue support, docs, timers, other bits |
1419
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
=head1 BUGS |
1421
|
|
|
|
|
|
|
|
1422
|
|
|
|
|
|
|
Not documented enough (but isn't that true of every project?). |
1423
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
tcp_cork only works on Linux for now. No BSD push/nopush support. |
1425
|
|
|
|
|
|
|
|
1426
|
|
|
|
|
|
|
=head1 LICENSE |
1427
|
|
|
|
|
|
|
|
1428
|
|
|
|
|
|
|
License is granted to use and distribute this module under the same |
1429
|
|
|
|
|
|
|
terms as Perl itself. |
1430
|
|
|
|
|
|
|
|
1431
|
|
|
|
|
|
|
=cut |
1432
|
|
|
|
|
|
|
|
1433
|
|
|
|
|
|
|
1; |
1434
|
|
|
|
|
|
|
|
1435
|
|
|
|
|
|
|
# Local Variables: |
1436
|
|
|
|
|
|
|
# mode: perl |
1437
|
|
|
|
|
|
|
# c-basic-indent: 4 |
1438
|
|
|
|
|
|
|
# indent-tabs-mode: nil |
1439
|
|
|
|
|
|
|
# End: |