File Coverage

blib/lib/Mojo/IOLoop/Stream.pm
Criterion Covered Total %
statement 81 83 97.5
branch 50 62 80.6
condition 8 20 40.0
subroutine 26 26 100.0
pod 15 15 100.0
total 180 206 87.3


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::Stream;
2 63     63   482 use Mojo::Base 'Mojo::EventEmitter';
  63         156  
  63         419  
3              
4 63     63   442 use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);
  63         152  
  63         3265  
5 63     63   412 use Mojo::IOLoop;
  63         182  
  63         350  
6 63     63   402 use Mojo::Util;
  63         187  
  63         2593  
7 63     63   396 use Scalar::Util qw(weaken);
  63         162  
  63         105779  
8              
9             has high_water_mark => 1048576;
10             has reactor => sub { Mojo::IOLoop->singleton->reactor }, weak => 1;
11              
12 347 50   347   4472 sub DESTROY { shift->close unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
13              
14 2 100   2 1 26 sub bytes_read { shift->{read} || 0 }
15              
16 17   100 17 1 109 sub bytes_waiting { length(shift->{buffer} // '') }
17              
18 2 100   2 1 11 sub bytes_written { shift->{written} || 0 }
19              
20 14 100   14 1 84 sub can_write { $_[0]{handle} && $_[0]->bytes_waiting < $_[0]->high_water_mark }
21              
22             sub close {
23 687     687 1 2258 my $self = shift;
24 687 100       1999 return unless my $reactor = $self->reactor;
25 679 100       1960 return unless my $handle = delete $self->timeout(0)->{handle};
26 339         1118 $reactor->remove($handle);
27 339         1380 $self->emit('close');
28             }
29              
30 283 100   283 1 781 sub close_gracefully { $_[0]->is_writing ? $_[0]{graceful}++ : $_[0]->close }
31              
32 1874     1874 1 4801 sub handle { shift->{handle} }
33              
34             sub is_readable {
35 760     760 1 1485 my $self = shift;
36 760         2431 $self->_again;
37 760   33     7819 return $self->{handle} && Mojo::Util::_readable(0, fileno $self->{handle});
38             }
39              
40             sub is_writing {
41 7878     7878 1 12443 my $self = shift;
42 7878 100       16963 return undef unless $self->{handle};
43 7848   100     29678 return !!length($self->{buffer}) || $self->has_subscribers('drain');
44             }
45              
46 388     388 1 7891 sub new { shift->SUPER::new(handle => shift, timeout => 15) }
47              
48             sub start {
49 388     388 1 702 my $self = shift;
50              
51             # Resume
52 388 50       1175 return unless $self->{handle};
53 388         984 my $reactor = $self->reactor;
54 388 100       1076 return $reactor->watch($self->{handle}, 1, $self->is_writing) if delete $self->{paused};
55              
56 387         1351 weaken $self;
57 387 100   12568   1399 my $cb = sub { pop() ? $self->_write : $self->_read };
  12568         37058  
58 387         1295 $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
59             }
60              
61             sub steal_handle {
62 1     1 1 9 my $self = shift;
63 1         3 $self->reactor->remove($self->{handle});
64 1         10 return delete $self->{handle};
65             }
66              
67 1 50 33 1 1 31 sub stop { $_[0]->reactor->watch($_[0]{handle}, 0, $_[0]->is_writing) if $_[0]{handle} && !$_[0]{paused}++ }
68              
69             sub timeout {
70 3975     3975 1 8018 my ($self, $timeout) = @_;
71              
72 3975 100       8698 return $self->{timeout} unless defined $timeout;
73 3966         7035 $self->{timeout} = $timeout;
74              
75 3966         9331 my $reactor = $self->reactor;
76 3966 100       9751 if ($self->{timer}) {
    100          
77 3230 100       6815 if (!$self->{timeout}) { $reactor->remove(delete $self->{timer}) }
  335         1246  
78 2895         8745 else { $reactor->again($self->{timer}, $self->{timeout}) }
79             }
80             elsif ($self->{timeout}) {
81 388         1158 weaken $self;
82             $self->{timer}
83 388 50 33 6   1785 = $reactor->timer($timeout => sub { $self and delete($self->{timer}) and $self->emit('timeout')->close });
  6         179  
84             }
85              
86 3966         56502 return $self;
87             }
88              
89             sub write {
90 7791     7791 1 23039 my ($self, $chunk, $cb) = @_;
91              
92             # IO::Socket::SSL will corrupt data with the wrong internal representation
93 7791         19941 utf8::downgrade $chunk;
94 7791         17367 $self->{buffer} .= $chunk;
95 7791 100       16051 if ($cb) { $self->once(drain => $cb) }
  7535 100       18674  
96 183         703 elsif (!length $self->{buffer}) { return $self }
97 7608 50       24956 $self->reactor->watch($self->{handle}, !$self->{paused}, 1) if $self->{handle};
98              
99 7608         23876 return $self;
100             }
101              
102 12204 100   12204   44352 sub _again { $_[0]->reactor->again($_[0]{timer}) if $_[0]{timer} }
103              
104             sub _read {
105 4976     4976   8116 my $self = shift;
106              
107 4976 50       17098 if (defined(my $read = $self->{handle}->sysread(my $buffer, 131072, 0))) {
108 4976         119705 $self->{read} += $read;
109 4976 100       19724 return $read == 0 ? $self->close : $self->emit(read => $buffer)->_again;
110             }
111              
112             # Retry
113 0 0 0     0 return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
114              
115             # Closed (maybe real error)
116 0 0       0 $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
117             }
118              
119             sub _write {
120 7592     7592   10997 my $self = shift;
121              
122             # Handle errors only when reading (to avoid timing problems)
123 7592         12575 my $handle = $self->{handle};
124 7592 100       16924 if (length $self->{buffer}) {
125 6559 50       20857 return undef unless defined(my $written = $handle->syswrite($self->{buffer}));
126 6559         211866 $self->{written} += $written;
127 6559         36571 $self->emit(write => substr($self->{buffer}, 0, $written, ''))->_again;
128             }
129              
130             # Clear the buffer to free the underlying SV* memory
131 7592 50       57577 undef $self->{buffer}, $self->emit('drain') unless length $self->{buffer};
132 7592 100       16683 return undef if $self->is_writing;
133 2303 100       6463 return $self->close if $self->{graceful};
134 2246 100       9061 $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
135             }
136              
137             1;
138              
139             =encoding utf8
140              
141             =head1 NAME
142              
143             Mojo::IOLoop::Stream - Non-blocking I/O stream
144              
145             =head1 SYNOPSIS
146              
147             use Mojo::IOLoop::Stream;
148              
149             # Create stream
150             my $stream = Mojo::IOLoop::Stream->new($handle);
151             $stream->on(read => sub ($stream, $bytes) {...});
152             $stream->on(close => sub ($stream) {...});
153             $stream->on(error => sub ($stream, $err) {...});
154              
155             # Start and stop watching for new data
156             $stream->start;
157             $stream->stop;
158              
159             # Start reactor if necessary
160             $stream->reactor->start unless $stream->reactor->is_running;
161              
162             =head1 DESCRIPTION
163              
164             L<Mojo::IOLoop::Stream> is a container for I/O streams used by L<Mojo::IOLoop>.
165              
166             =head1 EVENTS
167              
168             L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
169              
170             =head2 close
171              
172             $stream->on(close => sub ($stream) {...});
173              
174             Emitted if the stream gets closed.
175              
176             =head2 drain
177              
178             $stream->on(drain => sub ($stream) {...});
179              
180             Emitted once all data has been written.
181              
182             =head2 error
183              
184             $stream->on(error => sub ($stream, $err) {...});
185              
186             Emitted if an error occurs on the stream, fatal if unhandled.
187              
188             =head2 read
189              
190             $stream->on(read => sub ($stream, $bytes) {...});
191              
192             Emitted if new data arrives on the stream.
193              
194             =head2 timeout
195              
196             $stream->on(timeout => sub ($stream) {...});
197              
198             Emitted if the stream has been inactive for too long and will get closed automatically.
199              
200             =head2 write
201              
202             $stream->on(write => sub ($stream, $bytes) {...});
203              
204             Emitted if new data has been written to the stream.
205              
206             =head1 ATTRIBUTES
207              
208             L<Mojo::IOLoop::Stream> implements the following attributes.
209              
210             =head2 high_water_mark
211              
212             my $size = $stream->high_water_mark;
213             $stream = $stream->high_water_mark(1024);
214              
215             Maximum size of L</"write"> buffer in bytes before L</"can_write"> returns false, defaults to C<1048576> (1MiB).
216              
217             =head2 reactor
218              
219             my $reactor = $stream->reactor;
220             $stream = $stream->reactor(Mojo::Reactor::Poll->new);
221              
222             Low-level event reactor, defaults to the C<reactor> attribute value of the global L<Mojo::IOLoop> singleton. Note that
223             this attribute is weakened.
224              
225             =head1 METHODS
226              
227             L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
228              
229             =head2 bytes_read
230              
231             my $num = $stream->bytes_read;
232              
233             Number of bytes received.
234              
235             =head2 bytes_waiting
236              
237             my $num = $stream->bytes_waiting;
238              
239             Number of bytes that have been enqueued with L</"write"> and are waiting to be written.
240              
241             =head2 bytes_written
242              
243             my $num = $stream->bytes_written;
244              
245             Number of bytes written.
246              
247             =head2 can_write
248              
249             my $bool = $stream->can_write;
250              
251             Returns true if calling L</"write"> is safe.
252              
253             =head2 close
254              
255             $stream->close;
256              
257             Close stream immediately.
258              
259             =head2 close_gracefully
260              
261             $stream->close_gracefully;
262              
263             Close stream gracefully.
264              
265             =head2 handle
266              
267             my $handle = $stream->handle;
268              
269             Get handle for stream, usually an L<IO::Socket::IP> or L<IO::Socket::SSL> object.
270              
271             =head2 is_readable
272              
273             my $bool = $stream->is_readable;
274              
275             Quick non-blocking check if stream is readable, useful for identifying tainted sockets.
276              
277             =head2 is_writing
278              
279             my $bool = $stream->is_writing;
280              
281             Check if stream is writing.
282              
283             =head2 new
284              
285             my $stream = Mojo::IOLoop::Stream->new($handle);
286              
287             Construct a new L<Mojo::IOLoop::Stream> object.
288              
289             =head2 start
290              
291             $stream->start;
292              
293             Start or resume watching for new data on the stream.
294              
295             =head2 steal_handle
296              
297             my $handle = $stream->steal_handle;
298              
299             Steal L</"handle"> and prevent it from getting closed automatically.
300              
301             =head2 stop
302              
303             $stream->stop;
304              
305             Stop watching for new data on the stream.
306              
307             =head2 timeout
308              
309             my $timeout = $stream->timeout;
310             $stream = $stream->timeout(45);
311              
312             Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>.
313             Setting the value to C<0> will allow this stream to be inactive indefinitely.
314              
315             =head2 write
316              
317             $stream = $stream->write($bytes);
318             $stream = $stream->write($bytes => sub {...});
319              
320             Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all
321             data has been written.
322              
323             =head1 SEE ALSO
324              
325             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
326              
327             =cut