File Coverage

blib/lib/ZMQ/FFI/SocketBase.pm
Criterion Covered Total %
statement 27 158 17.0
branch 0 16 0.0
condition 0 4 0.0
subroutine 9 37 24.3
pod 0 22 0.0
total 36 237 15.1


line stmt bran cond sub pod time code
1             package ZMQ::FFI::SocketBase;
2             $ZMQ::FFI::SocketBase::VERSION = '0.16';
3 1     1   1656 use Moo;
  1         2  
  1         5  
4 1     1   260 use namespace::autoclean;
  1         2  
  1         8  
5              
6 1     1   995 no if $] >= 5.018, warnings => "experimental";
  1         8  
  1         4  
7 1     1   70 use feature 'switch';
  1         2  
  1         92  
8              
9 1     1   4 use Carp;
  1         2  
  1         64  
10 1     1   3 use FFI::Raw;
  1         2  
  1         16  
11 1     1   5 use Try::Tiny;
  1         4  
  1         66  
12              
13 1         9 use Math::Int64 qw(
14             int64_to_native native_to_int64
15             uint64_to_native native_to_uint64
16 1     1   5 );
  1         1  
17              
18 1     1   836 use ZMQ::FFI::Constants qw(:all);
  1         3  
  1         3594  
19              
20             with qw(
21             ZMQ::FFI::SocketRole
22             ZMQ::FFI::ErrorHandler
23             ZMQ::FFI::Versioner
24             );
25              
26             has _ffi => (
27             is => 'ro',
28             init_arg => undef,
29             lazy => 1,
30             builder => '_init_ffi',
31             );
32              
33             # real underlying zmq ctx pointer
34             has _ctx => (
35             is => 'ro',
36             lazy => 1,
37             default => sub { shift->ctx->_ctx },
38             );
39              
40             # real underlying zmq socket pointer
41             has _socket => (
42             is => 'rw',
43             default => -1,
44             );
45              
46             sub BUILD {
47 0     0 0   my $self = shift;
48              
49 0           $self->_socket( $self->_ffi->{zmq_socket}->($self->_ctx, $self->type) );
50              
51             try {
52 0     0     $self->check_null('zmq_socket', $self->_socket);
53             }
54             catch {
55 0     0     $self->_socket(-1);
56 0           die $_;
57 0           };
58              
59             # ensure clean edge state
60 0           while ( $self->has_pollin ) {
61 0           $self->recv();
62             }
63             }
64              
65             sub connect {
66 0     0 0   my ($self, $endpoint) = @_;
67              
68 0 0         unless ($endpoint) {
69 0           croak 'usage: $socket->connect($endpoint)'
70             }
71              
72             $self->check_error(
73 0           'zmq_connect',
74             $self->_ffi->{zmq_connect}->($self->_socket, $endpoint)
75             );
76             }
77              
78             sub disconnect {
79 0     0 0   croak 'unimplemented in base class';
80             }
81              
82             sub bind {
83 0     0 0   my ($self, $endpoint) = @_;
84              
85 0 0         unless ($endpoint) {
86 0           croak 'usage: $socket->bind($endpoint)'
87             }
88              
89             $self->check_error(
90 0           'zmq_bind',
91             $self->_ffi->{zmq_bind}->($self->_socket, $endpoint)
92             );
93             }
94              
95             sub unbind {
96 0     0 0   croak 'unimplemented in base class';
97             }
98              
99             sub send {
100 0     0 0   croak 'unimplemented in base class';
101             }
102              
103             sub send_multipart {
104 0     0 0   my ($self, $partsref, $flags) = @_;
105              
106 0   0       $flags //= 0;
107              
108 0   0       my @parts = @{$partsref // []};
  0            
109 0 0         unless (@parts) {
110 0           croak 'usage: send_multipart($parts, $flags)';
111             }
112              
113 0           for my $i (0..$#parts-1) {
114 0           $self->send($parts[$i], $flags | ZMQ_SNDMORE);
115             }
116              
117 0           $self->send($parts[$#parts], $flags);
118             }
119              
120             sub recv {
121 0     0 0   croak 'unimplemented in base class';
122             }
123              
124             sub recv_multipart {
125 0     0 0   my ($self, $flags) = @_;
126              
127 0           my @parts = ( $self->recv($flags) );
128              
129 0           my ($major) = $self->version;
130 0 0         my $type = $major == 2 ? 'int64_t' : 'int';
131              
132 0           while ( $self->get(ZMQ_RCVMORE, $type) ){
133 0           push @parts, $self->recv($flags);
134             }
135              
136 0           return @parts;
137             }
138              
139             sub get_fd {
140 0     0 0   my $self = shift;
141              
142 0           return $self->get(ZMQ_FD, 'int');
143             }
144              
145             sub set_linger {
146 0     0 0   my ($self, $linger) = @_;
147              
148 0           $self->set(ZMQ_LINGER, 'int', $linger);
149             }
150              
151             sub get_linger {
152 0     0 0   return shift->get(ZMQ_LINGER, 'int');
153             }
154              
155             sub set_identity {
156 0     0 0   my ($self, $id) = @_;
157              
158 0           $self->set(ZMQ_IDENTITY, 'binary', $id);
159             }
160              
161             sub get_identity {
162 0     0 0   return shift->get(ZMQ_IDENTITY, 'binary');
163             }
164              
165             sub subscribe {
166 0     0 0   my ($self, $topic) = @_;
167              
168 0           $self->set(ZMQ_SUBSCRIBE, 'binary', $topic);
169             }
170              
171             sub unsubscribe {
172 0     0 0   my ($self, $topic) = @_;
173              
174 0           $self->set(ZMQ_UNSUBSCRIBE, 'binary', $topic);
175             }
176              
177             sub has_pollin {
178 0     0 0   my $self = shift;
179              
180 0           my $zmq_events = $self->get(ZMQ_EVENTS, 'int');
181 0           return $zmq_events & ZMQ_POLLIN;
182             }
183              
184             sub has_pollout {
185 0     0 0   my $self = shift;
186              
187 0           my $zmq_events = $self->get(ZMQ_EVENTS, 'int');
188 0           return $zmq_events & ZMQ_POLLOUT;
189             }
190              
191             sub get {
192 0     0 0   my ($self, $opt, $opt_type) = @_;
193              
194 0           my $optval;
195             my $optval_ptr;
196 0           my $optval_len;
197              
198 0           for ($opt_type) {
199 0           when (/^(binary|string)$/) {
200             # ZMQ_IDENTITY uses binary type and
201             # can be at most 255 bytes long
202              
203             # ZMQ_LAST_ENDPOINT uses string type and
204             # expects a buffer large enough to hold an endpoint string
205              
206             # so for both cases 256 should be sufficient (including \0)
207 0           my $buflen = 256;
208              
209 0           $optval_ptr = FFI::Raw::memptr($buflen);
210 0           $optval_len = pack 'L!', $buflen;
211             }
212              
213 0           default {
214             # zeroed memory region
215 0           $optval = $self->_pack($opt_type, 0);
216              
217 0           $optval_ptr = unpack('L!', pack('P', $optval));
218 0           $optval_len = pack 'L!', length($optval);
219             }
220             }
221              
222 0           my $optval_len_ptr = unpack('L!', pack('P', $optval_len));
223              
224 0           $self->check_error(
225             'zmq_getsockopt',
226             $self->_ffi->{zmq_getsockopt}->(
227             $self->_socket,
228             $opt,
229             $optval_ptr,
230             $optval_len_ptr
231             )
232             );
233              
234 0           $optval = $self->_unpack($opt_type, $optval, $optval_ptr, $optval_len);
235 0           return $optval;
236             }
237              
238             sub set {
239 0     0 0   my ($self, $opt, $opt_type, $opt_val) = @_;
240              
241 0           my $ffi = $self->_ffi;
242              
243 0 0         if ($opt_type =~ m/^(binary|string$)/) {
244 0           $self->check_error(
245             'zmq_setsockopt',
246             $ffi->{str_zmq_setsockopt}->(
247             $self->_socket,
248             $opt,
249             $opt_val,
250             length($opt_val)
251             )
252             );
253             }
254             else {
255 0           my $packed = $self->_pack($opt_type, $opt_val);
256              
257 0           my $opt_ptr = unpack('L!', pack('P', $packed));
258 0           my $opt_len = length($packed);
259              
260 0           $self->check_error(
261             'zmq_setsockopt',
262             $ffi->{int_zmq_setsockopt}->(
263             $self->_socket,
264             $opt,
265             $opt_ptr,
266             $opt_len
267             )
268             );
269             }
270             }
271              
272             sub _pack {
273 0     0     my ($self, $opt_type, $val) = @_;
274              
275 0           my $packed;
276 0           for ($opt_type) {
277 0           when (/^int64_t$/) { $packed = int64_to_native($val) }
  0            
278 0           when (/^uint64_t$/) { $packed = uint64_to_native($val) }
  0            
279              
280 0           default {
281 0           $packed = pack $self->_pack_type($opt_type), $val;
282             }
283             }
284              
285 0           return $packed;
286             }
287              
288             sub _unpack {
289 0     0     my ($self, $opt_type, $optval, $optval_ptr, $optval_len) = @_;
290              
291 0           for ($opt_type) {
292 0           when (/^binary$/) {
293 0           $optval_len = unpack 'L!', $optval_len;
294              
295 0 0         if ($optval_len == 0) {
296 0           return;
297             }
298              
299 0           $optval = $optval_ptr->tostr($optval_len);
300             }
301              
302 0           when (/^string$/) {
303 0           $optval = $optval_ptr->tostr();
304             }
305              
306 0           when (/^int64_t$/) { $optval = native_to_int64($optval) }
  0            
307 0           when (/^uint64_t$/) { $optval = native_to_uint64($optval) }
  0            
308              
309 0           default {
310 0           $optval = unpack $self->_pack_type($opt_type), $optval;
311             }
312             }
313              
314 0           return $optval;
315             }
316              
317             sub _pack_type {
318 0     0     my ($self, $zmqtype) = @_;
319              
320             # opts we use native perl packing for (currently just int)
321 0 0         if ( $zmqtype eq 'int' ) {
322 0           return 'i!';
323             }
324              
325 0           confess "unsupported type '$zmqtype'";
326             }
327              
328             sub close {
329 0     0 0   my $self = shift;
330              
331 0           $self->check_error(
332             'zmq_close',
333             $self->_ffi->{zmq_close}->($self->_socket)
334             );
335              
336 0           $self->_socket(-1);
337             }
338              
339             sub _init_ffi {
340 0     0     my $self = shift;
341              
342 0           my $soname = $self->soname;
343              
344 0           my $ffi = {};
345 0           $ffi->{zmq_socket} = FFI::Raw->new(
346             $soname => 'zmq_socket',
347             FFI::Raw::ptr, # returns socket ptr
348             FFI::Raw::ptr, # takes ctx ptr
349             FFI::Raw::int # socket type
350             );
351              
352 0           $ffi->{zmq_getsockopt} = FFI::Raw->new(
353             $soname => 'zmq_getsockopt',
354             FFI::Raw::int, # retval
355             FFI::Raw::ptr, # socket ptr,
356             FFI::Raw::int, # option constant
357             FFI::Raw::ptr, # buf for option value
358             FFI::Raw::ptr # buf for size of option value
359             );
360              
361 0           $ffi->{int_zmq_setsockopt} = FFI::Raw->new(
362             $soname => 'zmq_setsockopt',
363             FFI::Raw::int, # retval
364             FFI::Raw::ptr, # socket ptr,
365             FFI::Raw::int, # option constant
366             FFI::Raw::ptr, # ptr to value int
367             FFI::Raw::int # size of option value
368             );
369              
370 0           $ffi->{str_zmq_setsockopt} = FFI::Raw->new(
371             $soname => 'zmq_setsockopt',
372             FFI::Raw::int, # retval
373             FFI::Raw::ptr, # socket ptr,
374             FFI::Raw::int, # option constant
375             FFI::Raw::str, # ptr to value string
376             FFI::Raw::int # size of option value
377             );
378              
379 0           $ffi->{zmq_connect} = FFI::Raw->new(
380             $soname => 'zmq_connect',
381             FFI::Raw::int,
382             FFI::Raw::ptr,
383             FFI::Raw::str
384             );
385              
386 0           $ffi->{zmq_bind} = FFI::Raw->new(
387             $soname => 'zmq_bind',
388             FFI::Raw::int,
389             FFI::Raw::ptr,
390             FFI::Raw::str
391             );
392              
393 0           $ffi->{zmq_msg_init} = FFI::Raw->new(
394             $soname => 'zmq_msg_init',
395             FFI::Raw::int, # retval
396             FFI::Raw::ptr, # zmq_msg_t ptr
397             );
398              
399 0           $ffi->{zmq_msg_init_size} = FFI::Raw->new(
400             $soname => 'zmq_msg_init_size',
401             FFI::Raw::int,
402             FFI::Raw::ptr,
403             FFI::Raw::int
404             );
405              
406 0           $ffi->{zmq_msg_size} = FFI::Raw->new(
407             $soname => 'zmq_msg_size',
408             FFI::Raw::int, # returns msg size in bytes
409             FFI::Raw::ptr # msg ptr
410             );
411              
412 0           $ffi->{zmq_msg_data} = FFI::Raw->new(
413             $soname => 'zmq_msg_data',
414             FFI::Raw::ptr, # msg data ptr
415             FFI::Raw::ptr # msg ptr
416             );
417              
418 0           $ffi->{zmq_msg_close} = FFI::Raw->new(
419             $soname => 'zmq_msg_data',
420             FFI::Raw::int, # retval
421             FFI::Raw::ptr # msg ptr
422             );
423              
424 0           $ffi->{zmq_close} = FFI::Raw->new(
425             $soname => 'zmq_close',
426             FFI::Raw::int,
427             FFI::Raw::ptr,
428             );
429              
430 0           $ffi->{memcpy} = FFI::Raw->new(
431             undef, 'memcpy',
432             FFI::Raw::ptr, # dest filled
433             FFI::Raw::ptr, # dest buf
434             FFI::Raw::ptr, # src
435             FFI::Raw::int # buf size
436             );
437              
438 0           return $ffi;
439             }
440              
441             sub DEMOLISH {
442 0     0 0   my $self = shift;
443              
444 0 0         unless ($self->_socket == -1) {
445 0           $self->close();
446             }
447             }
448              
449             __PACKAGE__->meta->make_immutable();
450              
451             __END__