File Coverage

blib/lib/ZMQ/Raw/Loop.pm
Criterion Covered Total %
statement 209 224 93.3
branch 53 78 67.9
condition 7 9 77.7
subroutine 32 33 96.9
pod 6 6 100.0
total 307 350 87.7


line stmt bran cond sub pod time code
1             package ZMQ::Raw::Loop;
2             $ZMQ::Raw::Loop::VERSION = '0.39';
3 14     14   92 use strict;
  14         28  
  14         411  
4 14     14   66 use warnings;
  14         153  
  14         347  
5 14     14   70 use Carp;
  14         22  
  14         1306  
6              
7 0     0   0 sub CLONE_SKIP { 1 }
8              
9             my @attributes;
10              
11             BEGIN
12             {
13 14     14   101 @attributes = qw/
14             context
15             poller
16             timers
17             handles
18             promises
19             events
20             terminated
21              
22             tevent
23             /;
24              
25 14     14   98 no strict 'refs';
  14         34  
  14         1364  
26 14         33 foreach my $accessor (@attributes)
27             {
28 112         854 *{$accessor} = sub
29             {
30 1378 100   1378   8595176 @_ > 1 ? $_[0]->{$accessor} = $_[1] : $_[0]->{$accessor}
31 112         348 };
32             }
33             }
34              
35 14     14   99 use ZMQ::Raw;
  14         25  
  14         420  
36 14     14   6328 use ZMQ::Raw::Loop::Event;
  14         34  
  14         415  
37 14     14   6108 use ZMQ::Raw::Loop::Handle;
  14         36  
  14         424  
38 14     14   6240 use ZMQ::Raw::Loop::Promise;
  14         36  
  14         417  
39 14     14   6062 use ZMQ::Raw::Loop::Timer;
  14         34  
  14         28530  
40              
41             =head1 NAME
42              
43             ZMQ::Raw::Loop - Loop class
44              
45             =head1 VERSION
46              
47             version 0.39
48              
49             =head1 DESCRIPTION
50              
51             A L represents an event loop.
52              
53             B: The API of this module is unstable and may change without warning
54             (any change will be appropriately documented in the changelog).
55              
56             =head1 METHODS
57              
58             =head2 new( $context )
59              
60             Create a new event loop
61              
62             =head2 run( )
63              
64             Run the event loop
65              
66             =head2 run_one( )
67              
68             Run until a single event occurs
69              
70             =head2 add( $item )
71              
72             Add C<$item> to the event loop. C<$item> should be a L>,
73             L>, L> or
74             L>.
75              
76             =head2 remove( $item )
77              
78             Remove C<$item> from the event loop.
79              
80             =head2 terminate( )
81              
82             Terminate the event loop
83              
84             =cut
85              
86             sub new
87             {
88 2     2 1 493 my ($this, $context) = @_;
89              
90 2   33     17 my $class = ref ($this) || $this;
91             my $self =
92             {
93             context => $context,
94             poller => ZMQ::Raw::Poller->new,
95             timers => [],
96             handles => [],
97             events => [],
98             promises => [],
99             tevent => ZMQ::Raw::Loop::Event->new ($context,
100             on_set => sub
101             {
102 3     3   12 my ($event, $loop) = @_;
103 3         11 $loop->terminated (1);
104             }
105             )
106 2         118 };
107              
108 2         8 return bless $self, $class;
109             }
110              
111              
112              
113             sub run
114             {
115 15     15 1 667 my ($this) = @_;
116              
117 15         40 $this->terminated (0);
118 15         42 $this->tevent->reset();
119 15         72 $this->add ($this->tevent);
120              
121 15   100     44 while (!$this->terminated && $this->poller->size > 1)
122             {
123 48         188 $this->run_one;
124             }
125              
126 15         56 $this->remove ($this->tevent);
127              
128 15         65 $this->_cancel_timers();
129 15         54 $this->_cancel_events();
130 15         64 $this->_cancel_handles();
131 15         44 $this->_clear_promises();
132             }
133              
134              
135              
136             sub run_one
137             {
138 58     58 1 139 my ($this) = @_;
139              
140 58 50       151 if ($this->poller->size)
141             {
142 58         159 my $count = $this->poller->wait (-1);
143 58 50       1215 if ($count)
144             {
145 58 100 100     655 $this->_dispatch_events() || $this->_dispatch_handles() || $this->_dispatch_timers();
146 58         179 $this->promises ([grep { $_->status == ZMQ::Raw::Loop::Promise->PLANNED } @{$this->promises}]);
  5         56  
  58         203  
147             }
148              
149 58         348 return 1;
150             }
151              
152 0         0 return 0;
153             }
154              
155              
156              
157             sub add
158             {
159 63     63 1 3371 my ($this, $item) = @_;
160              
161 63 100       278 if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
    100          
    100          
    50          
162             {
163 39         133 $this->_add_timer ($item);
164             }
165             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
166             {
167 4         13 $this->_add_handle ($item);
168             }
169             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
170             {
171 19         53 $this->_add_event ($item);
172             }
173             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Promise')
174             {
175 1         6 $this->_add_promise ($item);
176             }
177             else
178             {
179 0         0 croak "don't know how to add $item";
180             }
181             }
182              
183              
184              
185             sub _add_timer
186             {
187 76     76   196 my ($this, $timer) = @_;
188              
189 76         315 $timer->loop ($this);
190 76         189 $this->poller->add ($timer->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
191              
192 76 100       432 if (!$timer->running)
193             {
194 2         8 $timer->reset();
195             }
196              
197 76         156 push @{$this->timers}, $timer;
  76         186  
198             }
199              
200              
201              
202             sub _add_event
203             {
204 19     19   42 my ($this, $event) = @_;
205              
206 19         49 $this->poller->add ($event->read_handle, ZMQ::Raw->ZMQ_POLLIN);
207              
208 19 100       78 if ($event->timeout)
209             {
210 2         11 $event->timer (ZMQ::Raw::Timer->new ($this->context,
211             after => $event->timeout)
212             );
213 2         6 $this->poller->add ($event->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
214             }
215              
216 19         40 push @{$this->events}, $event;
  19         40  
217             }
218              
219              
220              
221             sub _add_promise
222             {
223 1     1   4 my ($this, $promise) = @_;
224              
225 1         2 push @{$this->promises}, $promise;
  1         3  
226             }
227              
228              
229              
230             sub _add_handle
231             {
232 4     4   7 my ($this, $handle) = @_;
233              
234 4         8 my $events = 0;
235 4 50       12 if ($handle->on_readable)
236             {
237 4         20 $events |= ZMQ::Raw->ZMQ_POLLIN;
238             }
239 4 50       12 if ($handle->on_writable)
240             {
241 0         0 $events |= ZMQ::Raw->ZMQ_POLLOUT;
242             }
243 4 50       12 if ($handle->timeout)
244             {
245 4         12 $handle->timer (ZMQ::Raw::Timer->new ($this->context,
246             after => $handle->timeout)
247             );
248 4         13 $this->poller->add ($handle->timer->socket, ZMQ::Raw->ZMQ_POLLIN);
249             }
250              
251 4         20 $handle->loop ($this);
252 4         12 $this->poller->add ($handle->handle, $events);
253              
254 4         10 push @{$this->handles}, $handle;
  4         10  
255             }
256              
257              
258              
259             sub remove
260             {
261 45     45 1 146 my ($this, $item) = @_;
262              
263 45 100       304 if (ref ($item) eq 'ZMQ::Raw::Loop::Timer')
    50          
    50          
264             {
265 30         84 $this->_remove_timer ($item);
266             }
267             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Handle')
268             {
269 0         0 $this->_remove_handle ($item);
270             }
271             elsif (ref ($item) eq 'ZMQ::Raw::Loop::Event')
272             {
273 15         51 $this->_remove_event ($item);
274             }
275             else
276             {
277 0         0 croak "don't know how to remove $item";
278             }
279             }
280              
281              
282              
283             sub _remove_timer
284             {
285 78     78   187 my ($this, $timer) = @_;
286              
287 78         184 my @left;
288 78         152 foreach my $t (@{$this->timers})
  78         226  
289             {
290 120 100       442 if ($timer == $t)
291             {
292 76         238 my $socket = $timer->timer->socket;
293 76         584 $socket->recv (ZMQ::Raw->ZMQ_DONTWAIT);
294 76         339 $this->poller->remove ($socket);
295 76         296 next;
296             }
297              
298 44         183 push @left, $t;
299             }
300              
301 78         249 $this->timers (\@left);
302             }
303              
304              
305              
306             sub _remove_handle
307             {
308 4     4   11 my ($this, $handle) = @_;
309              
310 4         10 my @left;
311 4         8 foreach my $h (@{$this->handles})
  4         10  
312             {
313 4 50       18 if ($h == $handle)
314             {
315 4         11 $this->poller->remove ($handle->handle);
316              
317 4         15 my $timer = $handle->timer;
318 4 50       97 if ($timer)
319             {
320 4         12 $this->poller->remove ($timer->socket);
321 4         27 $timer->cancel();
322             }
323              
324 4         24 next;
325             }
326              
327 0         0 push @left, $h;
328             }
329              
330 4         16 $this->handles (\@left);
331             }
332              
333              
334              
335             sub _remove_event
336             {
337 21     21   63 my ($this, $event) = @_;
338              
339 21         36 my @left;
340 21         45 foreach my $e (@{$this->events})
  21         44  
341             {
342 26 100       74 if ($e == $event)
343             {
344 18         46 $this->poller->remove ($event->read_handle);
345              
346 18         67 my $timer = $event->timer;
347 18 100       86 if ($timer)
348             {
349 1         5 $this->poller->remove ($timer->socket);
350 1         7 $timer->cancel();
351             }
352              
353 18         52 next;
354             }
355              
356 8         23 push @left, $e;
357             }
358              
359 21         62 $this->events (\@left);
360             }
361              
362              
363              
364             sub _dispatch_handles
365             {
366 52     52   142 my ($this) = @_;
367              
368 52         136 foreach my $handle (@{$this->handles})
  52         215  
369             {
370 4         10 my $events = $this->poller->events ($handle->handle);
371 4 100       14 if ($events)
372             {
373 2         8 $this->_remove_handle ($handle);
374              
375 2 50       7 if ($events & ZMQ::Raw->ZMQ_POLLIN)
    0          
376             {
377 2         54 my $readable = $handle->on_readable;
378 2 50       7 &{$readable} ($handle, $this) if $readable;
  2         6  
379             }
380             elsif ($events & ZMQ::Raw->ZMQ_POLLOUT)
381             {
382 0         0 my $writable = $handle->on_writable;
383 0 0       0 &{$writable} ($handle, $this) if $writable;
  0         0  
384             }
385              
386 2         56 return 1;
387             }
388              
389 2 50       12 if ($handle->timer)
390             {
391 2         8 my $events = $this->poller->events ($handle->timer->socket);
392 2 50       10 if ($events)
393             {
394 2         31 $this->_remove_handle ($handle);
395              
396 2         19 my $timeout = $handle->on_timeout;
397 2 50       11 &{$timeout} ($handle, $this) if $timeout;
  2         14  
398              
399 2         19 return 1;
400             }
401             }
402             }
403              
404 48         458 return 0;
405             }
406              
407              
408              
409             sub _dispatch_events
410             {
411 58     58   303 my ($this) = @_;
412              
413 58         118 foreach my $event (@{$this->events})
  58         256  
414             {
415 62         273 my $events = $this->poller->events ($event->read_handle);
416 62 100       314 if ($events)
417             {
418 5         24 $event->reset();
419 5         31 $this->_remove_event ($event);
420              
421 5         20 my $set = $event->on_set;
422 5 50       20 &{$set} ($event, $this) if $set;
  5         49  
423 5         30 return 1;
424             }
425              
426 57 100       237 if ($event->timer)
427             {
428 4         13 my $events = $this->poller->events ($event->timer->socket);
429 4 100       19 if ($events)
430             {
431 1         9 $event->reset();
432 1         9 $this->_remove_event ($event);
433              
434 1         5 my $timeout = $event->on_timeout;
435 1 50       6 &{$timeout} ($event, $this) if $timeout;
  1         8  
436              
437 1         47 return 1;
438             }
439             }
440             }
441              
442 52         441 return 0;
443             }
444              
445              
446              
447             sub _dispatch_timers
448             {
449 48     48   142 my ($this) = @_;
450              
451 48         82 foreach my $timer (@{$this->timers})
  48         179  
452             {
453 87         530 my $socket = $timer->timer->socket;
454 87         249 my $events = $this->poller->events ($socket);
455 87 100       316 if ($events)
456             {
457 48         267 $this->_remove_timer ($timer);
458              
459 48         209 my $timeout = $timer->on_timeout;
460 48 50       185 &{$timeout} ($timer, $this) if ($timeout);
  48         335  
461              
462 48 100       317 if ($timer->timer->running())
463             {
464 37         148 $this->_add_timer ($timer);
465             }
466              
467 48         235 return 1;
468             }
469             }
470              
471 0         0 return 0;
472             }
473              
474              
475              
476             sub _cancel_timers
477             {
478 15     15   33 my ($this) = @_;
479              
480             AGAIN:
481 19         40 foreach my $timer (@{$this->timers})
  19         44  
482             {
483 4         19 $timer->cancel();
484 4         15 goto AGAIN;
485             }
486             }
487              
488              
489              
490             sub _cancel_events
491             {
492 15     15   43 my ($this) = @_;
493              
494 15         28 foreach my $event (@{$this->events})
  15         35  
495             {
496 1         3 my $events = $this->poller->events ($event->read_handle);
497 1         5 $this->poller->remove ($event->read_handle);
498              
499 1 50       4 if ($event->timer)
500             {
501 1         4 $event->timer->cancel();
502 1         9 $this->poller->remove ($event->timer->socket);
503             }
504             }
505              
506 15         44 $this->events ([]);
507             }
508              
509              
510              
511             sub _cancel_handles
512             {
513 15     15   35 my ($this) = @_;
514              
515 15         28 foreach my $handle (@{$this->handles})
  15         35  
516             {
517 0         0 $this->poller->remove ($handle->handle);
518              
519 0 0       0 if ($handle->timer)
520             {
521 0         0 $handle->timer->cancel();
522 0         0 $this->poller->remove ($handle->timer->socket);
523             }
524             }
525              
526 15         39 $this->handles ([]);
527             }
528              
529              
530              
531             sub _clear_promises
532             {
533 15     15   41 my ($this) = @_;
534              
535 15         48 $this->promises ([]);
536             }
537              
538              
539              
540             sub terminate
541             {
542 13     13 1 1479 my ($this) = @_;
543              
544 13         53 $this->tevent->set;
545             }
546              
547             =for Pod::Coverage context handles events poller timers promises terminated tevent
548              
549             =head1 AUTHOR
550              
551             Jacques Germishuys
552              
553             =head1 LICENSE AND COPYRIGHT
554              
555             Copyright 2017 Jacques Germishuys.
556              
557             This program is free software; you can redistribute it and/or modify it
558             under the terms of either: the GNU General Public License as published
559             by the Free Software Foundation; or the Artistic License.
560              
561             See http://dev.perl.org/licenses/ for more information.
562              
563             =cut
564              
565             1; # End of ZMQ::Raw::Loop