File Coverage

blib/lib/Net/RabbitMQ/Java.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package Net::RabbitMQ::Java;
2              
3 10     10   12058 use strict;
  10         19  
  10         353  
4 10     10   53 use warnings;
  10         17  
  10         422  
5              
6             our $VERSION = '2.030102';
7              
8 10     10   9957 use Data::UUID;
  10         9853  
  10         733  
9 10     10   10557 use File::ShareDir qw(dist_dir);
  10         105766  
  10         909  
10 10     10   18896 use Inline::Java qw(cast);
  0            
  0            
11              
12             my %callbacks = (); # callback_id => [ CallbackCaller, sub {} ]
13             my %obj_callbacks = (); # object_ref => [ callback_id, ... ]
14             my ($Helper);
15              
16             # callback helpers
17             sub _callback {
18             my ($callback_id, $args) = @_;
19             $callbacks{$callback_id}->[1]->(@$args);
20             }
21             sub _callback_error {
22             print STDERR shift, "\n";
23             exit;
24             }
25             sub processCallbacks { # class method
26             my $class = shift;
27             $_->[0]->process for values %callbacks;
28             }
29              
30             # prototypes for method overloading
31             sub method_with_3_args_or_map {
32             my $orig = shift;
33            
34             # if we were called with more than 3 arguments,
35             # last one can be a hash
36             splice @_, -1, 1, encode_Map($_[-1]) if @_ > 1+3;
37            
38             return $orig->(@_);
39             }
40             sub decoding_accessor {
41             my $decoder = shift;
42             return sub {
43             my $orig = shift;
44             return $decoder->($orig->(@_));
45             };
46             }
47             sub encoding_setter {
48             my $encoder = shift;
49             return sub {
50             my $orig = shift;
51             return $orig->($encoder->(@_));
52             };
53             }
54             sub callback_setter {
55             my ($type, $decode_coderef) = @_;
56             $decode_coderef ||= sub {@_}; # second arg is optional
57             return sub {
58             my ($orig, $obj, $coderef) = @_;
59            
60             # generate the callback unique identifier
61             my $callback_id = "${type}_${obj}_" . Data::UUID->new->create_hex;
62            
63             # create a Java helper listener
64             my $listener_obj = "Net::RabbitMQ::Java::Helper::$type"->new($Helper, $callback_id);
65             $orig->($obj, $listener_obj);
66             my $callbackCaller = $listener_obj->getCallbackCaller;
67            
68             # save the callback in Perl-land
69             $coderef ||= sub {};
70             $callbacks{$callback_id} = [
71             $callbackCaller,
72             sub { $coderef->($decode_coderef->(@_)) }
73             ];
74            
75             # save an index of callbacks added to channels, so that we
76             # can remove them when channel objects get destroyed
77             $obj_callbacks{"$obj"} ||= [];
78             push @{ $obj_callbacks{"$obj"} }, $callback_id;
79            
80             return $callbackCaller;
81             };
82             }
83             sub destroy_callbacks {
84             my $self = shift;
85             if ($obj_callbacks{"$self"}) {
86             delete $callbacks{$_} for @{ $obj_callbacks{"$self"} };
87             delete $obj_callbacks{"$self"};
88             }
89             return Inline::Java::Object::DESTROY($self, @_);
90             }
91              
92             # encoding/decoding subroutines
93             sub encode_ByteArray ($) {
94             my $string = shift;
95             # this is very inefficient; an idea would be to
96             # convert to hexadecimal (base64?) and pass it as a String
97             # to a Java helper object to call .getBytes() on
98             return [ unpack("c*", $string) ];
99             }
100              
101             sub decode_ByteArray ($) {
102             my $byteArray = shift;
103             return pack("c*", @$byteArray);
104             }
105              
106             sub encode_Map ($) {
107             my $hash = shift;
108             return undef unless $hash;
109             return $hash if ref $hash =~ /java::util::HashMap$/;
110             my $map_obj = new java::util::HashMap;
111             foreach my $key (keys %$hash) {
112             $map_obj->put($key, $hash->{$key});
113             }
114             return $map_obj;
115             }
116              
117             sub decode_Map ($) {
118             my $map_obj = shift;
119             my $hash = {};
120             return $hash unless $map_obj;
121             my $it = $map_obj->entrySet->iterator;
122             while ($it->hasNext) {
123             my $entry_obj = cast('java.util.Map$Entry', $it->next);
124             # getValue returns a com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString
125             $hash->{ $entry_obj->getKey } = $entry_obj->getValue->toString;
126             }
127             return $hash;
128             }
129              
130             sub encode_Date ($) {
131             my $dt = shift;
132             return undef unless $dt;
133             return $dt if ref $dt =~ /java::util::Date$/;
134             return new java::util::Date(ref $dt eq 'DateTime' ? $dt->epoch : $dt);
135             }
136              
137             sub encode_BasicProperties ($) {
138             return undef unless $_[0];
139             return $_[0] if ref $_[0] =~ /AMQP::BasicProperties$/;
140             my %args = %{$_[0]};
141            
142             $args{headers} = encode_Map $args{headers};
143             $args{timestamp} = encode_Date $args{timestamp};
144            
145             my $props_obj = Net::RabbitMQ::Java::Client::AMQP::BasicProperties->new(
146             map delete $args{$_},
147             qw(contentType contentEncoding headers deliveryMode priority
148             correlationId replyTo expiration messageId timestamp type
149             userId appId clusterId)
150             );
151             !%args or die "Unknown properties: " . join(', ', keys %args);
152             return $props_obj;
153             }
154              
155             # main code
156             my $inited = 0;
157             sub init {
158             my ($class, %params) = @_;
159             return if $inited;
160             $inited = 1;
161            
162             # load Java code
163             my $share_dir = dist_dir('Net-RabbitMQ-Java');
164             my $helper_code;
165             {
166             # TODO: we should pre-compile Helper.java
167             # borrowing code from Inline-Java/Makefile.PL
168             # (or maybe Inline::Java itself provides helper methods?)
169             local $/;
170             open(my $fh, '<', "$share_dir/java/Helper.java") or die;
171             $helper_code = <$fh>;
172             close $fh;
173             }
174            
175             $params{CLASSPATH} ||= '';
176             $params{CLASSPATH} .= ":$share_dir/java/rabbitmq-client.jar:$share_dir/java/commons-io-1.2.jar";
177             Inline->bind(
178             Java => $helper_code,
179             %params,
180             AUTOSTUDY => 1,
181             STUDY => [qw(
182             Helper
183             java.util.Date
184             java.util.HashMap
185             com.rabbitmq.client.ConnectionFactory
186             com.rabbitmq.client.AMQP$BasicProperties
187             com.rabbitmq.client.impl.AMQConnection
188             com.rabbitmq.client.impl.ChannelN
189             com.rabbitmq.client.GetResponse
190             com.rabbitmq.client.QueueingConsumer
191             com.rabbitmq.client.QueueingConsumer$Delivery
192             )],
193             );
194            
195             # alias our namespaces
196             $Helper = Net::RabbitMQ::Java::Helper->new;
197             *java:: = *Net::RabbitMQ::Java::java::;
198             *Net::RabbitMQ::Java::Client:: = *Net::RabbitMQ::Java::com::rabbitmq::client::;
199            
200             # override methods that need to be more Perl-friendly
201             my %override_subs = (
202             'impl::ChannelN::basicPublish' => sub {
203             my $orig = shift;
204            
205             # last argument is message body
206             splice @_, -1, 1, encode_ByteArray $_[-1];
207            
208             # next-to-last argument is a basic properties hash
209             splice @_, -2, 1, encode_BasicProperties $_[-2];
210            
211             return $orig->(@_);
212             },
213             'impl::ChannelN::basicConsume' => sub {
214             my $orig = shift;
215            
216             # if called with 7 arguments, 6th is a map
217             splice @_, -2, 1, encode_Map $_[-2] if @_ == 1+7;
218            
219             return $orig->(@_);
220             },
221             'impl::ChannelN::exchangeBind' => \&method_with_3_args_or_map,
222             'impl::ChannelN::exchangeDeclare' => \&method_with_3_args_or_map,
223             'impl::ChannelN::exchangeUnbind' => \&method_with_3_args_or_map,
224             'impl::ChannelN::queueBind' => \&method_with_3_args_or_map,
225             'impl::ChannelN::queueDeclare' => \&method_with_3_args_or_map,
226             'impl::ChannelN::queueUnbind' => \&method_with_3_args_or_map,
227            
228             'impl::ChannelN::setReturnListener' => callback_setter('ReturnListener', sub {
229             # last argument is message body
230             splice @_, -1, 1, decode_ByteArray $_[-1];
231             @_;
232             }),
233             'impl::ChannelN::setConfirmListener' => callback_setter('ConfirmListener'),
234             'impl::ChannelN::setFlowListener' => callback_setter('FlowListener'),
235             'impl::ChannelN::addShutdownListener' => callback_setter('ShutdownListener'),
236             'impl::AMQConnection::addShutdownListener' => callback_setter('ShutdownListener'),
237            
238             'ConnectionFactory::getClientProperties' => decoding_accessor(\&decode_Map),
239             'impl::AMQConnection::getClientProperties' => decoding_accessor(\&decode_Map),
240             'impl::AMQConnection::getServerProperties' => decoding_accessor(\&decode_Map),
241             'QueueingConsumer::Delivery::getBody' => decoding_accessor(\&decode_ByteArray),
242             'GetResponse::getBody' => decoding_accessor(\&decode_ByteArray),
243             'AMQP::BasicProperties::getHeaders' => decoding_accessor(\&decode_Map),
244            
245             'ConnectionFactory::setClientProperties' => encoding_setter(\&encode_Map),
246             );
247             my %new_subs = (
248             'impl::ChannelN::DESTROY' => \&destroy_callbacks,
249             'impl::AMQConnection::DESTROY' => \&destroy_callbacks,
250             );
251             {
252             no strict 'refs';
253             no warnings 'redefine';
254             foreach my $sub (keys %override_subs) {
255             my $fullname = "Net::RabbitMQ::Java::Client::$sub";
256             my $orig = *$fullname{CODE} or die "failed to override $fullname";
257            
258             *{ $fullname } = sub {
259             return $override_subs{$sub}->($orig, @_);
260             };
261             }
262             *{ "Net::RabbitMQ::Java::Client::$_" } = $new_subs{$_}
263             for keys %new_subs;
264             }
265             }
266              
267              
268             1;
269             __END__