File Coverage

blib/lib/PubNub/PubSub.pm
Criterion Covered Total %
statement 45 133 33.8
branch 6 64 9.3
condition 8 43 18.6
subroutine 9 16 56.2
pod 5 5 100.0
total 73 261 27.9


line stmt bran cond sub pod time code
1             package PubNub::PubSub;
2              
3 2     2   35437 use strict;
  2         4  
  2         79  
4 2     2   22 use v5.10;
  2         7  
  2         144  
5             our $VERSION = '1.1.0';
6              
7 2     2   15 use Carp;
  2         9  
  2         181  
8 2     2   1153 use Mojo::JSON qw/encode_json/;
  2         108283  
  2         137  
9 2     2   1504 use Mojo::UserAgent;
  2         397137  
  2         25  
10 2     2   84 use Mojo::Util qw/url_escape/;
  2         3  
  2         102  
11              
12 2     2   928 use PubNub::PubSub::Message;
  2         5  
  2         2287  
13              
14             sub new {
15 1     1 1 18 my $class = shift;
16 1 50       9 my %args = @_ % 2 ? %{$_[0]} : @_;
  0         0  
17              
18 1   50     8 $args{host} ||= 'pubsub.pubnub.com';
19 1   50     7 $args{port} ||= 80;
20 1   50     11 $args{timeout} ||= 60; # for ua timeout
21 1   50     7 $args{publish_queue} ||= [];
22              
23 1 50       2 my $proto = ($args{port} == 443) ? 'https://' : 'http://';
24 1   33     6 $args{web_host} ||= $proto . $args{host};
25              
26 1         4 return bless \%args, $class;
27             }
28              
29             sub __ua {
30 0     0   0 my $self = shift;
31              
32 0 0       0 return $self->{ua} if exists $self->{ua};
33              
34 0         0 my $ua = Mojo::UserAgent->new;
35 0         0 $ua->max_redirects(3);
36 0         0 $ua->inactivity_timeout($self->{timeout});
37 0         0 $ua->proxy->detect; # env proxy
38 0         0 $ua->cookie_jar(0);
39 0         0 $ua->max_connections(100);
40 0         0 $self->{ua} = $ua;
41              
42 0         0 return $ua;
43             }
44              
45             sub publish {
46 0     0 1 0 my $self = shift;
47              
48 0 0       0 my %params = @_ % 2 ? %{$_[0]} : @_;
  0         0  
49 0   0     0 my $callback = $params{callback} || $self->{publish_callback};
50              
51 0         0 my $ua = $self->__ua;
52              
53 0         0 my @steps = map {
54 0         0 my $ref = $_;
55 0         0 my $url = $ref->{url};
56             sub {
57 0     0   0 my $delay = shift;
58 0         0 my $end = $delay->begin;
59             $ua->get($url => sub {
60 0 0       0 $callback->($_[1]->res, $ref->{message}) if $callback;
61 0         0 $end->();
62 0         0 });
63             }
64 0         0 } $self->__construct_publish_urls(%params);
65              
66 0         0 Mojo::IOLoop->delay(@steps)->wait;
67             }
68              
69             sub __construct_publish_urls {
70 3     3   8737 my ($self, %params) = @_;
71              
72 3   33     27 my $pub_key = $params{pub_key} || $self->{pub_key};
73 3 50       8 $pub_key or croak "pub_key is required.";
74 3   33     14 my $sub_key = $params{sub_key} || $self->{sub_key};
75 3 50       8 $sub_key or croak "sub_key is required.";
76 3   33     13 my $channel = $params{channel} || $self->{channel};
77 3 50       7 $channel or croak "channel is required.";
78 3 50       8 $params{messages} or croak "messages is required.";
79              
80 6         1220 return map {
81 6         21 my $json = $_->json;
82 6         373 my $uri = Mojo::URL->new( $self->{web_host} . qq~/publish/$pub_key/$sub_key/0/$channel/0/~ . url_escape($json) );
83 6         1050 $uri->query($_->query_params(\%params));
84 6         238 { url => $uri->to_string, message => $_ };
85 3         4 } map { PubNub::PubSub::Message->new($_) } @{$params{messages}};
  3         8  
86             }
87              
88             sub subscribe {
89 0     0 1   my $self = shift;
90 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
91              
92 0   0       my $sub_key = $params{sub_key} || $self->{sub_key};
93 0 0         $sub_key or croak "sub_key is required.";
94 0   0       my $channel = $params{channel} || $self->{channel};
95 0 0         $channel or croak "channel is required.";
96              
97 0 0         my $callback = $params{callback} or croak "callback is required.";
98 0   0       my $timetoken = $params{timetoken} || '0';
99              
100 0           my $ua = $self->__ua;
101              
102 0           my $tx = $ua->get($self->{web_host} . "/subscribe/$sub_key/$channel/0/$timetoken");
103 0 0         unless ($tx->success) {
104             # for example $tx->error->{message} =~ /Inactivity timeout/
105              
106             # This is not a traditional goto. Instead it exits this function
107             # and re-enters with @ as params.
108             #
109             # see goto docs, this is basically a method call which exits the current
110             # function first. So no extra call stack depth.
111 0           sleep 1;
112 0           @_ = ($self, %params, timetoken => $timetoken);
113 0           goto &subscribe;
114             }
115 0           my $json = $tx->res->json;
116 0 0         my @cb_args = $params{raw_msg}? ($json) : (@{$json->[0]});
  0            
117              
118 0 0         my $rtn = $callback ? $callback->(@cb_args) : 1;
119 0 0         return unless $rtn;
120              
121 0           $timetoken = $json->[1];
122 0           return $self->subscribe(%params, timetoken => $timetoken);
123             }
124              
125             sub subscribe_multi {
126 0     0 1   my $self = shift;
127 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
128 0 0         croak 'channels must be an arrayref'
129             unless ref($params{channels}) =~ /ARRAY/;
130 0 0         croak 'callback must be a hashref or coderef'
131             unless ref($params{callback}) =~ /(HASH|CODE)/;
132              
133 0           my $callback;
134 0 0         if (ref($params{callback}) =~ /HASH/){
135 0           for (keys %{$params{callback}}) {
  0            
136 0 0         croak "Non-coderef value found for callback key $_"
137             unless ref($params{callback}->{$_}) =~ /CODE/;
138             }
139             $callback = sub {
140 0     0     my ($obj) = @_;
141 0           my ($msg, $timetoken, $channel) = @$obj;
142 0           my $cb_dispatch = $params{callback};
143 0 0         unless ($channel) { # on connect messages
144 0 0         goto $cb_dispatch->{on_connect}
145             if exists $cb_dispatch->{on_connect};
146 0           return 1;
147             }
148 0 0         if (exists $cb_dispatch->{$channel}){
    0          
149              
150             # these are verified coderefs, so replacing the current stack
151             # frame with a call to the function. They will *not* jump to
152             # a label or other points. Basically this just lets us pretend
153             # that this was called directly by subscribe above.
154 0           goto $cb_dispatch->{$channel};
155             } elsif (exists $cb_dispatch->{'_default'}){
156 0           goto $cb_dispatch->{_default};
157             } else {
158 0           warn 'Using callback dispatch table, cannot find channel callback'
159             . ' and _default callback not specified';
160 0           return;
161             }
162 0           };
163             }
164 0 0         $callback = $params{callback} unless ref $callback;
165              
166 0           my $channel_string = join ',', @{$params{channels}};
  0            
167 0           return $self->subscribe(channel => $channel_string, callback => $callback,
168             raw_msg => 1);
169             }
170              
171             sub history {
172 0     0 1   my $self = shift;
173              
174 0 0 0       if (scalar(@_) == 1 and ref($_[0]) ne 'HASH' and $_[0] =~ /^\d+$/) {
      0        
175 0           @_ = (count => $_[0]);
176 0           warn "->history(\$num) is deprecated and will be removed in next few releases.\n";
177             }
178              
179 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
180              
181 0   0       my $sub_key = delete $params{sub_key} || $self->{sub_key};
182 0 0         $sub_key or croak "sub_key is required.";
183 0   0       my $channel = delete $params{channel} || $self->{channel};
184 0 0         $channel or croak "channel is required.";
185              
186 0           my $ua = $self->__ua;
187              
188 0           my $tx = $ua->get($self->{web_host} . "/v2/history/sub-key/$sub_key/channel/$channel" => form => \%params);
189 0 0         return [$tx->error->{message}] unless $tx->success;
190 0           return $tx->res->json;
191             }
192              
193             1;
194             __END__