File Coverage

blib/lib/Mojo/IOLoop/Stream.pm
Criterion Covered Total %
statement 81 83 97.5
branch 50 62 80.6
condition 8 20 40.0
subroutine 26 26 100.0
pod 15 15 100.0
total 180 206 87.3


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::Stream;
2 63     63   505 use Mojo::Base 'Mojo::EventEmitter';
  63         178  
  63         453  
3              
4 63     63   457 use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);
  63         206  
  63         3251  
5 63     63   387 use Mojo::IOLoop;
  63         212  
  63         384  
6 63     63   378 use Mojo::Util;
  63         167  
  63         2723  
7 63     63   477 use Scalar::Util qw(weaken);
  63         216  
  63         110380  
8              
9             has high_water_mark => 1048576;
10             has reactor => sub { Mojo::IOLoop->singleton->reactor }, weak => 1;
11              
12 347 50   347   5653 sub DESTROY { shift->close unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
13              
14 2 100   2 1 31 sub bytes_read { shift->{read} || 0 }
15              
16 17   100 17 1 137 sub bytes_waiting { length(shift->{buffer} // '') }
17              
18 2 100   2 1 13 sub bytes_written { shift->{written} || 0 }
19              
20 14 100   14 1 100 sub can_write { $_[0]{handle} && $_[0]->bytes_waiting < $_[0]->high_water_mark }
21              
22             sub close {
23 687     687 1 2297 my $self = shift;
24 687 100       2076 return unless my $reactor = $self->reactor;
25 681 100       2174 return unless my $handle = delete $self->timeout(0)->{handle};
26 341         1231 $reactor->remove($handle);
27 341         1659 $self->emit('close');
28             }
29              
30 284 100   284 1 790 sub close_gracefully { $_[0]->is_writing ? $_[0]{graceful}++ : $_[0]->close }
31              
32 1874     1874 1 4583 sub handle { shift->{handle} }
33              
34             sub is_readable {
35 760     760 1 1436 my $self = shift;
36 760         2711 $self->_again;
37 760   33     8503 return $self->{handle} && Mojo::Util::_readable(0, fileno $self->{handle});
38             }
39              
40             sub is_writing {
41 7758     7758 1 12619 my $self = shift;
42 7758 100       17950 return undef unless $self->{handle};
43 7728   100     30665 return !!length($self->{buffer}) || $self->has_subscribers('drain');
44             }
45              
46 388     388 1 10089 sub new { shift->SUPER::new(handle => shift, timeout => 15) }
47              
48             sub start {
49 388     388 1 798 my $self = shift;
50              
51             # Resume
52 388 50       1243 return unless $self->{handle};
53 388         1073 my $reactor = $self->reactor;
54 388 100       1196 return $reactor->watch($self->{handle}, 1, $self->is_writing) if delete $self->{paused};
55              
56 387         1356 weaken $self;
57 387 100   12337   1518 my $cb = sub { pop() ? $self->_write : $self->_read };
  12337         39854  
58 387         1293 $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
59             }
60              
61             sub steal_handle {
62 1     1 1 19 my $self = shift;
63 1         4 $self->reactor->remove($self->{handle});
64 1         5 return delete $self->{handle};
65             }
66              
67 1 50 33 1 1 43 sub stop { $_[0]->reactor->watch($_[0]{handle}, 0, $_[0]->is_writing) if $_[0]{handle} && !$_[0]{paused}++ }
68              
69             sub timeout {
70 3977     3977 1 8262 my ($self, $timeout) = @_;
71              
72 3977 100       9098 return $self->{timeout} unless defined $timeout;
73 3968         6927 $self->{timeout} = $timeout;
74              
75 3968         8878 my $reactor = $self->reactor;
76 3968 100       10241 if ($self->{timer}) {
    100          
77 3232 100       7188 if (!$self->{timeout}) { $reactor->remove(delete $self->{timer}) }
  337         1454  
78 2895         9245 else { $reactor->again($self->{timer}, $self->{timeout}) }
79             }
80             elsif ($self->{timeout}) {
81 388         1208 weaken $self;
82             $self->{timer}
83 388 50 33 6   1924 = $reactor->timer($timeout => sub { $self and delete($self->{timer}) and $self->emit('timeout')->close });
  6         251  
84             }
85              
86 3968         62206 return $self;
87             }
88              
89             sub write {
90 7672     7672 1 24460 my ($self, $chunk, $cb) = @_;
91              
92             # IO::Socket::SSL will corrupt data with the wrong internal representation
93 7672         19891 utf8::downgrade $chunk;
94 7672         17784 $self->{buffer} .= $chunk;
95 7672 100       16351 if ($cb) { $self->once(drain => $cb) }
  7415 100       19003  
96 183         798 elsif (!length $self->{buffer}) { return $self }
97 7489 50       25016 $self->reactor->watch($self->{handle}, !$self->{paused}, 1) if $self->{handle};
98              
99 7489         23909 return $self;
100             }
101              
102 11973 100   11973   45384 sub _again { $_[0]->reactor->again($_[0]{timer}) if $_[0]{timer} }
103              
104             sub _read {
105 4866     4866   7788 my $self = shift;
106              
107 4866 50       18654 if (defined(my $read = $self->{handle}->sysread(my $buffer, 131072, 0))) {
108 4866         120195 $self->{read} += $read;
109 4866 100       20162 return $read == 0 ? $self->close : $self->emit(read => $buffer)->_again;
110             }
111              
112             # Retry
113 0 0 0     0 return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
114              
115             # Closed (maybe real error)
116 0 0       0 $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
117             }
118              
119             sub _write {
120 7471     7471   11069 my $self = shift;
121              
122             # Handle errors only when reading (to avoid timing problems)
123 7471         13109 my $handle = $self->{handle};
124 7471 100       16671 if (length $self->{buffer}) {
125 6438 50       22811 return undef unless defined(my $written = $handle->syswrite($self->{buffer}));
126 6438         216095 $self->{written} += $written;
127 6438         37943 $self->emit(write => substr($self->{buffer}, 0, $written, ''))->_again;
128             }
129              
130             # Clear the buffer to free the underlying SV* memory
131 7471 50       58216 undef $self->{buffer}, $self->emit('drain') unless length $self->{buffer};
132 7471 100       17187 return undef if $self->is_writing;
133 2302 100       6645 return $self->close if $self->{graceful};
134 2245 100       9207 $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
135             }
136              
137             1;
138              
139             =encoding utf8
140              
141             =head1 NAME
142              
143             Mojo::IOLoop::Stream - Non-blocking I/O stream
144              
145             =head1 SYNOPSIS
146              
147             use Mojo::IOLoop::Stream;
148              
149             # Create stream
150             my $stream = Mojo::IOLoop::Stream->new($handle);
151             $stream->on(read => sub ($stream, $bytes) {...});
152             $stream->on(close => sub ($stream) {...});
153             $stream->on(error => sub ($stream, $err) {...});
154              
155             # Start and stop watching for new data
156             $stream->start;
157             $stream->stop;
158              
159             # Start reactor if necessary
160             $stream->reactor->start unless $stream->reactor->is_running;
161              
162             =head1 DESCRIPTION
163              
164             L<Mojo::IOLoop::Stream> is a container for I/O streams used by L<Mojo::IOLoop>.
165              
166             =head1 EVENTS
167              
168             L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
169              
170             =head2 close
171              
172             $stream->on(close => sub ($stream) {...});
173              
174             Emitted if the stream gets closed.
175              
176             =head2 drain
177              
178             $stream->on(drain => sub ($stream) {...});
179              
180             Emitted once all data has been written.
181              
182             =head2 error
183              
184             $stream->on(error => sub ($stream, $err) {...});
185              
186             Emitted if an error occurs on the stream, fatal if unhandled.
187              
188             =head2 read
189              
190             $stream->on(read => sub ($stream, $bytes) {...});
191              
192             Emitted if new data arrives on the stream.
193              
194             =head2 timeout
195              
196             $stream->on(timeout => sub ($stream) {...});
197              
198             Emitted if the stream has been inactive for too long and will get closed automatically.
199              
200             =head2 write
201              
202             $stream->on(write => sub ($stream, $bytes) {...});
203              
204             Emitted if new data has been written to the stream.
205              
206             =head1 ATTRIBUTES
207              
208             L<Mojo::IOLoop::Stream> implements the following attributes.
209              
210             =head2 high_water_mark
211              
212             my $size = $stream->high_water_mark;
213             $stream = $stream->high_water_mark(1024);
214              
215             Maximum size of L</"write"> buffer in bytes before L</"can_write"> returns false, defaults to C<1048576> (1MiB).
216              
217             =head2 reactor
218              
219             my $reactor = $stream->reactor;
220             $stream = $stream->reactor(Mojo::Reactor::Poll->new);
221              
222             Low-level event reactor, defaults to the C<reactor> attribute value of the global L<Mojo::IOLoop> singleton. Note that
223             this attribute is weakened.
224              
225             =head1 METHODS
226              
227             L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
228              
229             =head2 bytes_read
230              
231             my $num = $stream->bytes_read;
232              
233             Number of bytes received.
234              
235             =head2 bytes_waiting
236              
237             my $num = $stream->bytes_waiting;
238              
239             Number of bytes that have been enqueued with L</"write"> and are waiting to be written.
240              
241             =head2 bytes_written
242              
243             my $num = $stream->bytes_written;
244              
245             Number of bytes written.
246              
247             =head2 can_write
248              
249             my $bool = $stream->can_write;
250              
251             Returns true if calling L</"write"> is safe.
252              
253             =head2 close
254              
255             $stream->close;
256              
257             Close stream immediately.
258              
259             =head2 close_gracefully
260              
261             $stream->close_gracefully;
262              
263             Close stream gracefully.
264              
265             =head2 handle
266              
267             my $handle = $stream->handle;
268              
269             Get handle for stream, usually an L<IO::Socket::IP> or L<IO::Socket::SSL> object.
270              
271             =head2 is_readable
272              
273             my $bool = $stream->is_readable;
274              
275             Quick non-blocking check if stream is readable, useful for identifying tainted sockets.
276              
277             =head2 is_writing
278              
279             my $bool = $stream->is_writing;
280              
281             Check if stream is writing.
282              
283             =head2 new
284              
285             my $stream = Mojo::IOLoop::Stream->new($handle);
286              
287             Construct a new L<Mojo::IOLoop::Stream> object.
288              
289             =head2 start
290              
291             $stream->start;
292              
293             Start or resume watching for new data on the stream.
294              
295             =head2 steal_handle
296              
297             my $handle = $stream->steal_handle;
298              
299             Steal L</"handle"> and prevent it from getting closed automatically.
300              
301             =head2 stop
302              
303             $stream->stop;
304              
305             Stop watching for new data on the stream.
306              
307             =head2 timeout
308              
309             my $timeout = $stream->timeout;
310             $stream = $stream->timeout(45);
311              
312             Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>.
313             Setting the value to C<0> will allow this stream to be inactive indefinitely.
314              
315             =head2 write
316              
317             $stream = $stream->write($bytes);
318             $stream = $stream->write($bytes => sub {...});
319              
320             Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all
321             data has been written.
322              
323             =head1 SEE ALSO
324              
325             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
326              
327             =cut