File Coverage

blib/lib/Reflexive/ZmqSocket.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Reflexive::ZmqSocket;
2             {
3             $Reflexive::ZmqSocket::VERSION = '1.130710';
4             }
5              
6             #ABSTRACT: Provides a reflexy way to talk over ZeroMQ sockets
7              
8 1     1   1336 use Moose;
  0            
  0            
9             use Moose::Util::TypeConstraints('enum');
10             use Try::Tiny;
11             use Errno qw(EAGAIN EINTR);
12             use ZeroMQ::Context;
13             use ZeroMQ::Socket;
14             use ZeroMQ::Constants qw/
15             ZMQ_FD
16             ZMQ_NOBLOCK
17             ZMQ_POLLIN
18             ZMQ_POLLOUT
19             ZMQ_EVENTS
20             ZMQ_SNDMORE
21             ZMQ_RCVMORE
22             ZMQ_PUSH
23             ZMQ_PULL
24             ZMQ_PUB
25             ZMQ_SUB
26             ZMQ_REQ
27             ZMQ_REP
28             ZMQ_DEALER
29             ZMQ_ROUTER
30             ZMQ_PAIR
31             /;
32             use Reflexive::ZmqSocket::ZmqError;
33             use Reflexive::ZmqSocket::ZmqMessage;
34             use Reflexive::ZmqSocket::ZmqMultiPartMessage;
35              
36             extends 'Reflex::Base';
37              
38              
39             has socket_type => (
40             is => 'ro',
41             isa => enum([ZMQ_REP, ZMQ_REQ, ZMQ_DEALER, ZMQ_ROUTER, ZMQ_PUB, ZMQ_SUB,
42             ZMQ_PUSH, ZMQ_PULL, ZMQ_PAIR]),
43             lazy => 1,
44             builder => '_build_socket_type',
45             );
46              
47             sub _build_socket_type { die 'This is a virtual method and should never be called' }
48              
49              
50             has endpoints => (
51             is => 'ro',
52             isa => 'ArrayRef[Str]',
53             traits => ['Array'],
54             predicate => 'has_endpoints',
55             handles => {
56             endpoints_count => 'count',
57             all_endpoints => 'elements',
58             }
59             );
60              
61              
62             has endpoint_action => (
63             is => 'ro',
64             isa => enum([qw/bind connect/]),
65             predicate => 'has_endpoint_action',
66             );
67              
68              
69             has socket_options => (
70             is => 'ro',
71             isa => 'HashRef',
72             default => sub { +{} },
73             );
74              
75              
76             has active => ( is => 'rw', isa => 'Bool', default => 1 );
77              
78              
79             has context => (
80             is => 'ro',
81             isa => 'ZeroMQ::Context',
82             lazy => 1,
83             builder => '_build_context',
84             );
85              
86             sub _build_context {
87             my ($self) = @_;
88             return ZeroMQ::Context->new();
89             }
90              
91              
92             has socket => (
93             is => 'ro',
94             isa => 'ZeroMQ::Socket',
95             lazy => 1,
96             builder => '_build_socket',
97             handles => [qw/
98             recv
99             getsockopt
100             setsockopt
101             close
102             connect
103             bind
104             /]
105             );
106              
107             after [qw/bind connect/] => sub {
108             my ($self) = @_;
109             $self->resume_reading() unless $self->active;
110             };
111              
112             before close => sub {
113             my ($self) = @_;
114             if($self->active)
115             {
116             $self->stop_reading;
117             $self->stop_writing;
118             }
119             };
120              
121             sub _build_socket {
122             my ($self) = @_;
123              
124             my $socket = ZeroMQ::Socket->new(
125             $self->context(),
126             $self->socket_type(),
127             );
128            
129             my $opts = $self->socket_options;
130              
131             foreach my $key (keys %$opts)
132             {
133             $socket->setsockopt($key, $opts->{$key});
134             }
135              
136             return $socket;
137             }
138              
139              
140             has filehandle => (
141             is => 'ro',
142             isa => 'FileHandle',
143             lazy => 1,
144             builder => '_build_filehandle',
145             );
146              
147             sub _build_filehandle {
148             my ($self) = @_;
149            
150             my $fd = $self->getsockopt(ZMQ_FD)
151             or die 'Unable retrieve file descriptor';
152              
153             open(my $zmq_fh, "+<&" . $fd)
154             or die "filehandle creation failed: $!";
155              
156             return $zmq_fh;
157             }
158              
159              
160             has buffer => (
161             is => 'ro',
162             isa => 'ArrayRef',
163             traits => ['Array'],
164             default => sub { [] },
165             handles => {
166             buffer_count => 'count',
167             dequeue_item => 'shift',
168             enqueue_item => 'push',
169             putback_item => 'unshift',
170             }
171             );
172              
173             with 'Reflex::Role::Readable' => {
174             att_active => 'active',
175             att_handle => 'filehandle',
176             cb_ready => 'zmq_readable',
177             method_pause => 'pause_reading',
178             method_resume => 'resume_reading',
179             method_stop => 'stop_reading',
180             };
181              
182             with 'Reflex::Role::Writable' => {
183             att_active => 'active',
184             att_handle => 'filehandle',
185             cb_ready => 'zmq_writable',
186             method_pause => 'pause_writing',
187             method_resume => 'resume_writing',
188             method_stop => 'stop_writing',
189             };
190              
191             sub BUILD {
192             my ($self) = @_;
193              
194             if($self->active)
195             {
196             $self->initialize_endpoints();
197             }
198             }
199              
200              
201             sub initialize_endpoints {
202             my ($self) = @_;
203            
204             die 'No endpoint_action defined when attempting to intialize endpoints'
205             unless $self->has_endpoint_action;
206              
207             die 'No endpoints defind when attempting to initialize endpoints'
208             unless $self->has_endpoints && $self->endpoints_count > 0;
209              
210             foreach my $endpoint ($self->all_endpoints)
211             {
212             my $action = $self->endpoint_action;
213            
214             try
215             {
216             $self->$action($endpoint);
217             }
218             catch
219             {
220             $self->emit(
221             -name => 'connect_error',
222             -type => 'Reflexive::ZmqSocket::ZmqError',
223             errnum => -1,
224             errstr => "Failed to $action to endpoint: $endpoint",
225             errfun => $action,
226             );
227             };
228             }
229             }
230              
231              
232             sub send {
233             my ($self, $item) = @_;
234             $self->enqueue_item($item);
235             $self->resume_writing();
236             return $self->buffer_count;
237             }
238              
239              
240              
241             sub zmq_writable {
242             my ($self, $args) = @_;
243              
244             MESSAGE: while ($self->buffer_count) {
245            
246             unless($self->getsockopt(ZMQ_EVENTS) & ZMQ_POLLOUT)
247             {
248             return;
249             }
250            
251             my $item = $self->dequeue_item;
252              
253             if(ref($item) eq 'ARRAY')
254             {
255             my $socket = $self->socket;
256              
257             my $first_part = shift(@$item);
258             my $ret = $self->socket->send($first_part, ZMQ_SNDMORE);
259             if($ret == 0)
260             {
261             for(0..$#$item)
262             {
263             my $part = $item->[$_];
264             if($_ == $#$item)
265             {
266             $socket->send($part);
267             my $rc = $self->do_read();
268              
269             if($rc == -1)
270             {
271             $self->pause_reading();
272              
273             $self->emit(
274             -name => 'socket_error',
275             -type => 'Reflexive::ZmqSocket::ZmqError',
276             errnum => ($! + 0),
277             errstr => "$!",
278             errfun => 'recv',
279             );
280             last MESSAGE;
281             }
282             elsif($rc == 0)
283             {
284             return;
285             }
286             elsif($rc == 1)
287             {
288             next MESSAGE;
289             }
290             }
291             else
292             {
293             $socket->send($part, ZMQ_SNDMORE);
294             }
295             }
296             }
297             elsif($ret == -1)
298             {
299             if($! == EAGAIN)
300             {
301             unshift(@$item, $first_part);
302             $self->putback_item($item);
303             next;
304             }
305             else
306             {
307             last;
308             }
309             }
310             }
311            
312             my $ret = $self->socket->send($item);
313             if($ret == 0)
314             {
315             my $rc = $self->do_read();
316              
317             if($rc == -1)
318             {
319             $self->pause_reading();
320              
321             $self->emit(
322             -name => 'socket_error',
323             -type => 'Reflexive::ZmqSocket::ZmqError',
324             errnum => ($! + 0),
325             errstr => "$!",
326             errfun => 'recv',
327             );
328             }
329             elsif($rc == 1)
330             {
331             next;
332             }
333             }
334             elsif($ret == -1)
335             {
336             if($! == EAGAIN)
337             {
338             $self->putback_item($item);
339             }
340             else
341             {
342             last;
343             }
344             }
345             }
346              
347             $self->pause_writing();
348            
349             if($! != EAGAIN)
350             {
351             $self->emit(
352             -name => 'socket_error',
353             -type => 'Reflexive::ZmqSocket::ZmqError',
354             errnum => ($! + 0),
355             errstr => "$!",
356             errfun => 'send',
357             );
358             }
359             else
360             {
361             $self->emit(-name => 'socket_flushed');
362             }
363             }
364              
365              
366             sub zmq_readable {
367             my ($self, $args) = @_;
368            
369             MESSAGE: while (1) {
370            
371             unless($self->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN)
372             {
373             return;
374             }
375            
376             my $ret = $self->do_read();
377              
378             if($ret == -1)
379             {
380             $self->pause_reading();
381              
382             $self->emit(
383             -name => 'socket_error',
384             -type => 'Reflexive::ZmqSocket::ZmqError',
385             errnum => ($! + 0),
386             errstr => "$!",
387             errfun => 'recv',
388             );
389             }
390             elsif($ret == 0)
391             {
392             next MESSAGE;
393             }
394             elsif($ret == 1)
395             {
396             return;
397             }
398             }
399             }
400              
401              
402             sub do_read {
403             my ($self) = @_;
404              
405             if(my $msg = $self->recv(ZMQ_NOBLOCK)) {
406             if($self->getsockopt(ZMQ_RCVMORE))
407             {
408             my $messages = [$msg];
409            
410             do
411             {
412             push(@$messages, $self->recv());
413             }
414             while ($self->getsockopt(ZMQ_RCVMORE));
415              
416             $self->emit(
417             -name => 'multipart_message',
418             -type => 'Reflexive::ZmqSocket::ZmqMultiPartMessage',
419             message => $messages
420             );
421             return 1;
422             }
423             $self->emit(
424             -name => 'message',
425             -type => 'Reflexive::ZmqSocket::ZmqMessage',
426             message => $msg,
427             );
428             return 1;
429             }
430              
431             if($! == EAGAIN or $! == EINTR)
432             {
433             return 0;
434             }
435              
436             return -1;
437             }
438              
439             __PACKAGE__->meta->make_immutable();
440              
441             1;
442              
443              
444             =pod
445              
446             =head1 NAME
447              
448             Reflexive::ZmqSocket - Provides a reflexy way to talk over ZeroMQ sockets
449              
450             =head1 VERSION
451              
452             version 1.130710
453              
454             =head1 SYNOPSIS
455              
456             package App::Test;
457             use Moose;
458             extends 'Reflex::Base';
459             use Reflex::Trait::Watched qw/ watches /;
460             use Reflexive::ZmqSocket::RequestSocket;
461             use ZeroMQ::Constants(':all');
462              
463             watches request => (
464             isa => 'Reflexive::ZmqSocket::RequestSocket',
465             clearer => 'clear_request',
466             predicate => 'has_request',
467             );
468              
469             sub init {
470             my ($self) = @_;
471              
472             my $req = Reflexive::ZmqSocket::RequestSocket->new(
473             endpoints => [ 'tcp://127.0.0.1:54321' ],
474             endpoint_action => 'bind',
475             socket_options => {
476             +ZMQ_LINGER ,=> 1,
477             },
478             );
479              
480             $self->request($req);
481             }
482              
483             sub BUILD {
484             my ($self) = @_;
485            
486             $self->init();
487             }
488              
489             sub on_request_message {
490             my ($self, $msg) = @_;
491             }
492              
493             sub on_request_multipart_message {
494             my ($self, $msg) = @_;
495             my @parts = map { $_->data } $msg->all_parts;
496             }
497              
498             sub on_request_socket_flushed {
499             my ($self) = @_;
500             }
501            
502             sub on_request_socket_error {
503             my ($self, $msg) = @_;
504             }
505              
506             sub on_request_connect_error {
507             my ($self, $msg) = @_;
508             }
509              
510             sub on_request_bind_error {
511             my ($self, $msg) = @_;
512             }
513              
514             __PACKAGE__->meta->make_immutable();
515              
516             =head1 DESCRIPTION
517              
518             Reflexive::ZmqSocket provides a reflexy way to participate in ZeroMQ driven applications. A number of events are emitted from the instantiated objects of this class and its subclasses. On successful reads, either L</message> or L</multipart_message> is emitted. For errors, L</socket_error> is emitted. See L</EMITTED_EVENTS> for more informations.
519              
520             =head1 PUBLIC_ATTRIBUTES
521              
522             =head2 socket_type
523              
524             is: ro, isa: enum, lazy: 1
525              
526             This attribute holds what type of ZeroMQ socket should be built. It must be one
527             of the constants exported by the ZeroMQ::Constants package. The attribute is
528             populated by default in the various subclasses.
529              
530             =head2 endpoints
531              
532             is: ro, isa: ArrayRef[Str], traits: Array, predicate: has_endpoints
533              
534             This attribute holds an array reference of all of the endpoints to which the
535             socket should either bind or connect.
536              
537             The following methods are delegated to this attribute:
538              
539             endpoints_count
540             all_endpoints
541              
542             =head2 endpoint_action
543              
544             is: ro, isa: enum(bind, connect), predicate: has_endpoint_action
545              
546             This attribute determines the socket action to take against the provided
547             endpoints. While ZeroMQ allows sockets to both connect and bind, this module
548             limits it to either/or. Patches welcome :)
549              
550             =head2
551              
552             is: ro, isa: HashRef
553              
554             This attribute has the options for the socket. Options are applied at BUILD
555             time but before any action is taken on the end points. This allows for things
556             like setting the ZMQ_IDENTITY
557              
558             =head2 context
559              
560             is: ro, isa: ZeroMQ::Context
561              
562             This attribute holds the context that is required for building sockets.
563              
564             =head2 socket
565              
566             is: ro, isa: ZeroMQ::Socket
567              
568             This attribute holds the actual ZeroMQ socket created. The following methods
569             are delegated to this attribute:
570              
571             recv
572             getsockopt
573             setsockopt
574             close
575             connect
576             bind
577              
578             NOTE: close() is advised to stop polling the zmq_fd /before/ the call to the
579             underlying zmq_close. This means that items in the L</buffer> may not be sent
580             or owned by zmq and you are responsible for managing these items.
581              
582             =head1 PROTECTED_ATTRIBUTES
583              
584             =head2 active
585              
586             is: ro, isa: Bool, default: true
587              
588             This attribute controls whether the socket is observed or not for reads/writes according to Reflex
589              
590             =head2
591              
592             is: ro, isa: FileHandle
593              
594             This attribute contains a file handle built from the cloned file descriptor
595             from inside the ZeroMQ::Socket. This is where the magic happens in how we poll
596             for non-blocking IO.
597              
598             =head2 buffer
599              
600             is: ro, isa: ArrayRef, traits: Array
601              
602             Thie attribute is an internal buffer used for non-blocking writes.
603              
604             The following methods are delegated to this attribute:
605              
606             buffer_count
607             dequeue_item
608             enqueue_item
609             putback_item
610              
611             =head1 PUBLIC_METHODS
612              
613             =head2 send
614              
615             This method is for sending messages through the L</socket>. It is non-blocking
616             and will return the current buffer count.
617              
618             =head1 PROTECTED_METHODS
619              
620             =head2 initialize_endpoints
621              
622             This method attempts the defined L</endpoint_action> against the provided
623             L</endpoints>. This method is called at BUILD if L</active> is true. To defer
624             initialization, simply set L</active> to false.
625              
626             If the provided action against a particular endpoint fails, a connect_error
627             event will be emitted
628              
629             =head1 PRIVATE_METHODS
630              
631             =head2 zmq_writable
632              
633             This method is used internally to handle when the ZeroMQ socket is ready for
634             writing. This method can emit socket_error for various issues.
635              
636             =head2 zmq_readable
637              
638             This method is used internally by reflex to actually read from the ZeroMQ socket when it is readable. This method can emit socket_error when problems occur. For successful reads, either message or multipart_message will be emitted.
639              
640             =head2 do_read
641              
642             This private method does the actual reading from the socket.
643              
644             =head1 EMITTED_EVENTS
645              
646             =head2 message
647              
648             message is emitted when a successful read occurs on the socket. When this event is emitted, the payload is a single message (in terms of ZeroMQ this is the result of the other end sending a message wuthout using SNDMORE). See L<Reflexive::ZmqSocket::ZmqMessage> for more information.
649              
650             =head2 multipart_message
651              
652             multipart_message is emitted when multipart message is read from the socket. See L<Reflexive::ZmqSocket::ZmqMultiPartMessage> for more information.
653              
654             =head1 ACKNOWLEDGEMENTS
655              
656             This module was originally developed for Booking.com and through their gracious approval, we've released this module to CPAN.
657              
658             =head1 AUTHORS
659              
660             =over 4
661              
662             =item *
663              
664             Nicholas R. Perez <nperez@cpan.org>
665              
666             =item *
667              
668             Steffen Mueller <smueller@cpan.org>
669              
670             =back
671              
672             =head1 COPYRIGHT AND LICENSE
673              
674             This software is copyright (c) 2012 by Nicholas R. Perez <nperez@cpan.org>.
675              
676             This is free software; you can redistribute it and/or modify it under
677             the same terms as the Perl 5 programming language system itself.
678              
679             =cut
680              
681              
682             __END__
683              
684              
685              
686