File Coverage

blib/lib/PubNub/PubSub.pm
Criterion Covered Total %
statement 47 135 34.8
branch 6 64 9.3
condition 8 43 18.6
subroutine 10 17 58.8
pod 5 5 100.0
total 76 264 28.7


line stmt bran cond sub pod time code
1             package PubNub::PubSub;
2              
3 2     2   26257 use strict;
  2         5  
  2         52  
4 2     2   8 use warnings;
  2         3  
  2         45  
5 2     2   20 use v5.10;
  2         6  
6              
7 2     2   9 use Carp;
  2         2  
  2         154  
8 2     2   981 use Mojo::JSON qw/encode_json/;
  2         183642  
  2         138  
9 2     2   1386 use Mojo::UserAgent;
  2         249126  
  2         13  
10 2     2   65 use Mojo::Util qw/url_escape/;
  2         3  
  2         74  
11              
12 2     2   751 use PubNub::PubSub::Message;
  2         4  
  2         1954  
13              
14             our $VERSION = '1.0.1';
15              
16             sub new { ## no critic (RequireArgUnpacking)
17 1     1 1 7 my $class = shift;
18 1 50       7 my %args = @_ % 2 ? %{$_[0]} : @_;
  0         0  
19              
20 1   50     6 $args{host} ||= 'pubsub.pubnub.com';
21 1   50     4 $args{port} ||= 80;
22 1   50     5 $args{timeout} ||= 60; # for ua timeout
23 1   50     4 $args{publish_queue} ||= [];
24              
25 1 50       3 my $proto = ($args{port} == 443) ? 'https://' : 'http://';
26 1   33     6 $args{web_host} ||= $proto . $args{host};
27              
28 1         3 return bless \%args, $class;
29             }
30              
31             sub __ua {
32 0     0   0 my $self = shift;
33              
34 0 0       0 return $self->{ua} if exists $self->{ua};
35              
36 0         0 my $ua = Mojo::UserAgent->new;
37 0         0 $ua->max_redirects(3);
38 0         0 $ua->inactivity_timeout($self->{timeout});
39 0         0 $ua->proxy->detect; # env proxy
40 0         0 $ua->cookie_jar(0);
41 0         0 $ua->max_connections(100);
42 0         0 $self->{ua} = $ua;
43              
44 0         0 return $ua;
45             }
46              
47             sub publish { ## no critic (RequireArgUnpacking)
48 0     0 1 0 my $self = shift;
49              
50 0 0       0 my %params = @_ % 2 ? %{$_[0]} : @_;
  0         0  
51 0   0     0 my $callback = $params{callback} || $self->{publish_callback};
52              
53 0         0 my $ua = $self->__ua;
54              
55             my @steps = map {
56 0         0 my $ref = $_;
  0         0  
57 0         0 my $url = $ref->{url};
58             sub {
59 0     0   0 my $delay = shift;
60 0         0 my $end = $delay->begin;
61             $ua->get(
62             $url => sub {
63 0 0       0 $callback->($_[1]->res, $ref->{message}) if $callback;
64 0         0 $end->();
65 0         0 });
66             }
67 0         0 } $self->__construct_publish_urls(%params);
68              
69 0         0 return Mojo::IOLoop->delay(@steps)->wait;
70             }
71              
72             sub __construct_publish_urls {
73 3     3   6845 my ($self, %params) = @_;
74              
75 3   33     13 my $pub_key = $params{pub_key} || $self->{pub_key};
76 3 50       6 $pub_key or croak "pub_key is required.";
77 3   33     8 my $sub_key = $params{sub_key} || $self->{sub_key};
78 3 50       6 $sub_key or croak "sub_key is required.";
79 3   33     7 my $channel = $params{channel} || $self->{channel};
80 3 50       5 $channel or croak "channel is required.";
81 3 50       6 $params{messages} or croak "messages is required.";
82              
83             return map {
84 6         589 my $json = $_->json;
85 6         245 my $uri = Mojo::URL->new($self->{web_host} . qq~/publish/$pub_key/$sub_key/0/$channel/0/~ . url_escape($json));
86 6         452 $uri->query($_->query_params(\%params));
87             {
88 6         290 url => $uri->to_string,
89             message => $_
90             };
91             } map {
92 6         15 PubNub::PubSub::Message->new($_)
93 3         3 } @{$params{messages}};
  3         19  
94             }
95              
96             sub subscribe { ## no critic (RequireArgUnpacking)
97 0     0 1   my $self = shift;
98 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
99              
100 0   0       my $sub_key = $params{sub_key} || $self->{sub_key};
101 0 0         $sub_key or croak "sub_key is required.";
102 0   0       my $channel = $params{channel} || $self->{channel};
103 0 0         $channel or croak "channel is required.";
104              
105 0 0         my $callback = $params{callback} or croak "callback is required.";
106 0   0       my $timetoken = $params{timetoken} || '0';
107              
108 0           my $ua = $self->__ua;
109              
110 0           my $tx = $ua->get($self->{web_host} . "/subscribe/$sub_key/$channel/0/$timetoken");
111 0 0         unless ($tx->success) {
112             # for example $tx->error->{message} =~ /Inactivity timeout/
113              
114             # This is not a traditional goto. Instead it exits this function
115             # and re-enters with @ as params.
116             #
117             # see goto docs, this is basically a method call which exits the current
118             # function first. So no extra call stack depth.
119 0           sleep 1;
120 0           @_ = ($self, %params, timetoken => $timetoken);
121 0           goto &subscribe;
122             }
123 0           my $json = $tx->res->json;
124 0 0         my @cb_args = $params{raw_msg} ? ($json) : (@{$json->[0]});
  0            
125              
126 0 0         my $rtn = $callback ? $callback->(@cb_args) : 1;
127 0 0         return unless $rtn;
128              
129 0           $timetoken = $json->[1];
130 0           return $self->subscribe(%params, timetoken => $timetoken);
131             }
132              
133             sub subscribe_multi { ## no critic (RequireArgUnpacking)
134 0     0 1   my $self = shift;
135 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
136             croak 'channels must be an arrayref'
137 0 0         unless ref($params{channels}) =~ /ARRAY/;
138             croak 'callback must be a hashref or coderef'
139 0 0         unless ref($params{callback}) =~ /(HASH|CODE)/;
140              
141 0           my $callback;
142 0 0         if (ref($params{callback}) =~ /HASH/) {
143 0           for (keys %{$params{callback}}) {
  0            
144             croak "Non-coderef value found for callback key $_"
145 0 0         unless ref($params{callback}->{$_}) =~ /CODE/;
146             }
147             $callback = sub {
148 0     0     my ($obj) = @_;
149 0           my (undef, undef, $channel) = @$obj;
150 0           my $cb_dispatch = $params{callback};
151 0 0         unless ($channel) { # on connect messages
152             goto $cb_dispatch->{on_connect}
153 0 0         if exists $cb_dispatch->{on_connect};
154 0           return 1;
155             }
156 0 0         if (exists $cb_dispatch->{$channel}) {
    0          
157              
158             # these are verified coderefs, so replacing the current stack
159             # frame with a call to the function. They will *not* jump to
160             # a label or other points. Basically this just lets us pretend
161             # that this was called directly by subscribe above.
162 0           goto $cb_dispatch->{$channel};
163             } elsif (exists $cb_dispatch->{'_default'}) {
164 0           goto $cb_dispatch->{_default};
165             } else {
166 0           warn 'Using callback dispatch table, cannot find channel callback' . ' and _default callback not specified';
167 0           return;
168             }
169 0           };
170             }
171 0 0         $callback = $params{callback} unless ref $callback;
172              
173 0           my $channel_string = join ',', @{$params{channels}};
  0            
174 0           return $self->subscribe(
175             channel => $channel_string,
176             callback => $callback,
177             raw_msg => 1
178             );
179             }
180              
181             sub history { ## no critic (RequireArgUnpacking)
182 0     0 1   my $self = shift;
183              
184 0 0 0       if (scalar(@_) == 1 and ref($_[0]) ne 'HASH' and $_[0] =~ /^\d+$/) {
      0        
185 0           @_ = (count => $_[0]);
186 0           warn "->history(\$num) is deprecated and will be removed in next few releases.\n";
187             }
188              
189 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
190              
191 0   0       my $sub_key = delete $params{sub_key} || $self->{sub_key};
192 0 0         $sub_key or croak "sub_key is required.";
193 0   0       my $channel = delete $params{channel} || $self->{channel};
194 0 0         $channel or croak "channel is required.";
195              
196 0           my $ua = $self->__ua;
197              
198 0           my $tx = $ua->get($self->{web_host} . "/v2/history/sub-key/$sub_key/channel/$channel" => form => \%params);
199 0 0         return [$tx->error->{message}] unless $tx->success;
200 0           return $tx->res->json;
201             }
202              
203             1;
204             __END__