File Coverage

blib/lib/Mojo/IOLoop/Stream.pm
Criterion Covered Total %
statement 81 83 97.5
branch 50 62 80.6
condition 8 20 40.0
subroutine 26 26 100.0
pod 15 15 100.0
total 180 206 87.3


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::Stream;
2 64     64   529 use Mojo::Base 'Mojo::EventEmitter';
  64         188  
  64         497  
3              
4 64     64   664 use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);
  64         206  
  64         3624  
5 64     64   456 use Mojo::IOLoop;
  64         162  
  64         379  
6 64     64   425 use Mojo::Util;
  64         207  
  64         2894  
7 64     64   454 use Scalar::Util qw(weaken);
  64         181  
  64         118014  
8              
9             has high_water_mark => 1048576;
10             has reactor => sub { Mojo::IOLoop->singleton->reactor }, weak => 1;
11              
12 347 50   347   6354 sub DESTROY { shift->close unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
13              
14 2 100   2 1 38 sub bytes_read { shift->{read} || 0 }
15              
16 17   100 17 1 106 sub bytes_waiting { length(shift->{buffer} // '') }
17              
18 2 100   2 1 11 sub bytes_written { shift->{written} || 0 }
19              
20 14 100   14 1 90 sub can_write { $_[0]{handle} && $_[0]->bytes_waiting < $_[0]->high_water_mark }
21              
22             sub close {
23 688     688 1 1922 my $self = shift;
24 688 100       2130 return unless my $reactor = $self->reactor;
25 685 100       2144 return unless my $handle = delete $self->timeout(0)->{handle};
26 344         1231 $reactor->remove($handle);
27 344         1645 $self->emit('close');
28             }
29              
30 286 100   286 1 788 sub close_gracefully { $_[0]->is_writing ? $_[0]{graceful}++ : $_[0]->close }
31              
32 1898     1898 1 4546 sub handle { shift->{handle} }
33              
34             sub is_readable {
35 771     771 1 1489 my $self = shift;
36 771         2325 $self->_again;
37 771   33     8349 return $self->{handle} && Mojo::Util::_readable(0, fileno $self->{handle});
38             }
39              
40             sub is_writing {
41 7804     7804 1 12066 my $self = shift;
42 7804 100       17440 return undef unless $self->{handle};
43 7774   100     30788 return !!length($self->{buffer}) || $self->has_subscribers('drain');
44             }
45              
46 388     388 1 10135 sub new { shift->SUPER::new(handle => shift, timeout => 15) }
47              
48             sub start {
49 388     388 1 741 my $self = shift;
50              
51             # Resume
52 388 50       1203 return unless $self->{handle};
53 388         996 my $reactor = $self->reactor;
54 388 100       1176 return $reactor->watch($self->{handle}, 1, $self->is_writing) if delete $self->{paused};
55              
56 387         1309 weaken $self;
57 387 100   12428   1347 my $cb = sub { pop() ? $self->_write : $self->_read };
  12428         38809  
58 387         1244 $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
59             }
60              
61             sub steal_handle {
62 1     1 1 9 my $self = shift;
63 1         8 $self->reactor->remove($self->{handle});
64 1         5 return delete $self->{handle};
65             }
66              
67 1 50 33 1 1 22 sub stop { $_[0]->reactor->watch($_[0]{handle}, 0, $_[0]->is_writing) if $_[0]{handle} && !$_[0]{paused}++ }
68              
69             sub timeout {
70 4018     4018 1 8279 my ($self, $timeout) = @_;
71              
72 4018 100       8827 return $self->{timeout} unless defined $timeout;
73 4009         6962 $self->{timeout} = $timeout;
74              
75 4009         9178 my $reactor = $self->reactor;
76 4009 100       10087 if ($self->{timer}) {
    100          
77 3272 100       7148 if (!$self->{timeout}) { $reactor->remove(delete $self->{timer}) }
  340         1395  
78 2932         9268 else { $reactor->again($self->{timer}, $self->{timeout}) }
79             }
80             elsif ($self->{timeout}) {
81 388         1185 weaken $self;
82             $self->{timer}
83 388 50 33 6   1937 = $reactor->timer($timeout => sub { $self and delete($self->{timer}) and $self->emit('timeout')->close });
  6         182  
84             }
85              
86 4009         62049 return $self;
87             }
88              
89             sub write {
90 7714     7714 1 24493 my ($self, $chunk, $cb) = @_;
91              
92             # IO::Socket::SSL will corrupt data with the wrong internal representation
93 7714         19993 utf8::downgrade $chunk;
94 7714         18654 $self->{buffer} .= $chunk;
95 7714 100       15193 if ($cb) { $self->once(drain => $cb) }
  7457 100       18619  
96 184         751 elsif (!length $self->{buffer}) { return $self }
97 7530 50       25691 $self->reactor->watch($self->{handle}, !$self->{paused}, 1) if $self->{handle};
98              
99 7530         24726 return $self;
100             }
101              
102 12063 100   12063   44847 sub _again { $_[0]->reactor->again($_[0]{timer}) if $_[0]{timer} }
103              
104             sub _read {
105 4913     4913   7927 my $self = shift;
106              
107 4913 50       17998 if (defined(my $read = $self->{handle}->sysread(my $buffer, 131072, 0))) {
108 4913         122808 $self->{read} += $read;
109 4913 100       21763 return $read == 0 ? $self->close : $self->emit(read => $buffer)->_again;
110             }
111              
112             # Retry
113 0 0 0     0 return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
114              
115             # Closed (maybe real error)
116 0 0       0 $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
117             }
118              
119             sub _write {
120 7515     7515   11986 my $self = shift;
121              
122             # Handle errors only when reading (to avoid timing problems)
123 7515         12457 my $handle = $self->{handle};
124 7515 100       17511 if (length $self->{buffer}) {
125 6469 50       22964 return undef unless defined(my $written = $handle->syswrite($self->{buffer}));
126 6469         214663 $self->{written} += $written;
127 6469         40703 $self->emit(write => substr($self->{buffer}, 0, $written, ''))->_again;
128             }
129              
130             # Clear the buffer to free the underlying SV* memory
131 7515 50       58911 undef $self->{buffer}, $self->emit('drain') unless length $self->{buffer};
132 7515 100       17217 return undef if $self->is_writing;
133 2328 100       6507 return $self->close if $self->{graceful};
134 2270 100       9327 $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
135             }
136              
137             1;
138              
139             =encoding utf8
140              
141             =head1 NAME
142              
143             Mojo::IOLoop::Stream - Non-blocking I/O stream
144              
145             =head1 SYNOPSIS
146              
147             use Mojo::IOLoop::Stream;
148              
149             # Create stream
150             my $stream = Mojo::IOLoop::Stream->new($handle);
151             $stream->on(read => sub ($stream, $bytes) {...});
152             $stream->on(close => sub ($stream) {...});
153             $stream->on(error => sub ($stream, $err) {...});
154              
155             # Start and stop watching for new data
156             $stream->start;
157             $stream->stop;
158              
159             # Start reactor if necessary
160             $stream->reactor->start unless $stream->reactor->is_running;
161              
162             =head1 DESCRIPTION
163              
164             L is a container for I/O streams used by L.
165              
166             =head1 EVENTS
167              
168             L inherits all events from L and can emit the following new ones.
169              
170             =head2 close
171              
172             $stream->on(close => sub ($stream) {...});
173              
174             Emitted if the stream gets closed.
175              
176             =head2 drain
177              
178             $stream->on(drain => sub ($stream) {...});
179              
180             Emitted once all data has been written.
181              
182             =head2 error
183              
184             $stream->on(error => sub ($stream, $err) {...});
185              
186             Emitted if an error occurs on the stream, fatal if unhandled.
187              
188             =head2 read
189              
190             $stream->on(read => sub ($stream, $bytes) {...});
191              
192             Emitted if new data arrives on the stream.
193              
194             =head2 timeout
195              
196             $stream->on(timeout => sub ($stream) {...});
197              
198             Emitted if the stream has been inactive for too long and will get closed automatically.
199              
200             =head2 write
201              
202             $stream->on(write => sub ($stream, $bytes) {...});
203              
204             Emitted if new data has been written to the stream.
205              
206             =head1 ATTRIBUTES
207              
208             L implements the following attributes.
209              
210             =head2 high_water_mark
211              
212             my $size = $msg->high_water_mark;
213             $msg = $msg->high_water_mark(1024);
214              
215             Maximum size of L buffer in bytes before L returns false, defaults to C<1048576> (1MiB).
216              
217             =head2 reactor
218              
219             my $reactor = $stream->reactor;
220             $stream = $stream->reactor(Mojo::Reactor::Poll->new);
221              
222             Low-level event reactor, defaults to the C attribute value of the global L singleton. Note that
223             this attribute is weakened.
224              
225             =head1 METHODS
226              
227             L inherits all methods from L and implements the following new ones.
228              
229             =head2 bytes_read
230              
231             my $num = $stream->bytes_read;
232              
233             Number of bytes received.
234              
235             =head2 bytes_waiting
236              
237             my $num = $stream->bytes_waiting;
238              
239             Number of bytes that have been enqueued with L and are waiting to be written.
240              
241             =head2 bytes_written
242              
243             my $num = $stream->bytes_written;
244              
245             Number of bytes written.
246              
247             =head2 can_write
248              
249             my $bool = $stream->can_write;
250              
251             Returns true if calling L is safe.
252              
253             =head2 close
254              
255             $stream->close;
256              
257             Close stream immediately.
258              
259             =head2 close_gracefully
260              
261             $stream->close_gracefully;
262              
263             Close stream gracefully.
264              
265             =head2 handle
266              
267             my $handle = $stream->handle;
268              
269             Get handle for stream, usually an L or L object.
270              
271             =head2 is_readable
272              
273             my $bool = $stream->is_readable;
274              
275             Quick non-blocking check if stream is readable, useful for identifying tainted sockets.
276              
277             =head2 is_writing
278              
279             my $bool = $stream->is_writing;
280              
281             Check if stream is writing.
282              
283             =head2 new
284              
285             my $stream = Mojo::IOLoop::Stream->new($handle);
286              
287             Construct a new L object.
288              
289             =head2 start
290              
291             $stream->start;
292              
293             Start or resume watching for new data on the stream.
294              
295             =head2 steal_handle
296              
297             my $handle = $stream->steal_handle;
298              
299             Steal L and prevent it from getting closed automatically.
300              
301             =head2 stop
302              
303             $stream->stop;
304              
305             Stop watching for new data on the stream.
306              
307             =head2 timeout
308              
309             my $timeout = $stream->timeout;
310             $stream = $stream->timeout(45);
311              
312             Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>.
313             Setting the value to C<0> will allow this stream to be inactive indefinitely.
314              
315             =head2 write
316              
317             $stream = $stream->write($bytes);
318             $stream = $stream->write($bytes => sub {...});
319              
320             Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all
321             data has been written.
322              
323             =head1 SEE ALSO
324              
325             L, L, L.
326              
327             =cut