File Coverage

blib/lib/Mojolicious/Plugin/Pubsub.pm
Criterion Covered Total %
statement 114 125 91.2
branch 22 34 64.7
condition 1 3 33.3
subroutine 17 20 85.0
pod 1 1 100.0
total 155 183 84.7


line stmt bran cond sub pod time code
1 6     6   13544 use strict;
  6         14  
  6         168  
2 6     6   26 use warnings;
  6         10  
  6         250  
3             package Mojolicious::Plugin::Pubsub;
4             #ABSTRACT: Pubsub plugin for Mojolicious
5             $Mojolicious::Plugin::Pubsub::VERSION = '0.006';
6 6     6   28 use Mojo::Base 'Mojolicious::Plugin';
  6         8  
  6         44  
7              
8 6     6   1162 use Mojo::IOLoop;
  6         14  
  6         54  
9 6     6   182 use Mojo::JSON qw( decode_json encode_json );
  6         12  
  6         322  
10 6     6   36 use Mojo::Util qw( b64_decode b64_encode deprecated );
  6         10  
  6         360  
11 6     6   38 use IO::Socket::UNIX;
  6         12  
  6         62  
12              
13             my $client;
14             my $conf;
15              
16             sub register {
17 6     6 1 220 my ($self, $app, $cfg) = @_;
18              
19 6         14 $cfg->{cbs} = [];
20 6 50       22 push @{ $cfg->{subs} }, $cfg->{cb} if exists $cfg->{cb};
  6         20  
21 6 100       36 $cfg->{socket} = $app->home->child($app->moniker . '.pubsub') unless exists $cfg->{socket};
22 6         310 $conf = $cfg;
23              
24 6         44 my $loop = Mojo::IOLoop->singleton;
25              
26 6 50       294 pipe my $in, my $out or die "Could not open pipe pair: $!";
27              
28 6         10217 my $pid = fork();
29 6 50       555 die "Could not fork: $!" if not defined $pid;
30              
31 6 100       356 if ($pid) {
32 3         131 close $out;
33 3         8995 chomp(my $result = readline $in);
34 3         198 close $in;
35              
36 3 50       59 die "Could not establish pubsub socket: $result" if $result ne 'DONE';
37             } else {
38             # now in fork
39 3         140 close $in;
40 3         250 $loop->reset;
41              
42 3         1476 my @streams;
43              
44 3 50 33     214 unless (-e $conf->{socket} and IO::Socket::UNIX->new(Peer => $conf->{socket})) {
45 3         199 my $server = eval { $loop->server(
46             {path => $conf->{socket}} => sub {
47 6     6   72272 my (undef, $stream) = @_;
48 6         27 push @streams, $stream;
49              
50 6         20 my $msg;
51             $stream->on(
52             read => sub {
53 4         2039 my ($stream, $bytes) = @_;
54 4         25 $msg .= $bytes;
55              
56 4         28 while (length $msg) {
57 4 50       112 if ($msg =~ s/^(.+\n)//) {
58 4         38 my $line = $1;
59 4         26 foreach my $str (@streams) { $str->write($line); }
  5         57  
60             } else {
61 0         0 return;
62             }
63             }
64             }
65 6         119 );
66              
67             $stream->on(
68             close => sub {
69 6         1091159 @streams = grep $_ ne $_[0], @streams;
70 6 100       121 $loop->stop unless @streams;
71             }
72 6         129 );
73              
74 6         72 $stream->timeout(0);
75             }
76 3         201 ); };
77              
78 3 50       5939 if (not defined $server) {
79 0         0 print $out $@;
80 0         0 close $out;
81 0         0 exit;
82             }
83             }
84              
85 3         71 print $out "DONE\n";
86 3         236 close $out;
87              
88 3 50       66 $loop->start unless $loop->is_running;
89 3         2844 exit;
90             }
91              
92 3     3   185 $loop->next_tick(sub { _connect() });
  3         55947  
93              
94             $app->helper(
95             'pubsub.publish' => sub {
96 4     4   5637 my $self = shift;
97 4         114 my $msg = b64_encode(encode_json([@_]), "");
98              
99 4         532 _send($msg . "\n");
100              
101 4         1220 return $self;
102             }
103 3         1007 );
104              
105             $app->helper(
106             'pubsub.subscribe' => sub {
107 1     1   102 my $self = shift;
108 1         2 my $cb = shift;
109              
110 1         2 push @{ $conf->{subs} }, $cb;
  1         3  
111              
112 1         2 return $self;
113             }
114 3         476 );
115              
116             $app->helper(
117             'pubsub.unsubscribe' => sub {
118 1     1   182 my $self = shift;
119 1         3 my $cb = shift;
120              
121 1         2 @{ $conf->{subs} } = grep { $_ != $cb } @{ $conf->{subs} };
  1         3  
  1         5  
  1         8  
122              
123 1         4 return $self;
124             }
125 3         131 );
126              
127             $app->helper(
128             publish => sub {
129 0     0   0 deprecated '->publish is deprecated in favour of ->pubsub->publish';
130 0         0 shift->pubsub->publish(@_);
131 3         102 });
132             $app->helper(
133             subscribe => sub {
134 0     0   0 deprecated '->subscribe is deprecated in favour of ->pubsub->subscribe';
135 0         0 shift->pubsub->subscribe(@_);
136 3         104 });
137             $app->helper(
138             unsubscribe => sub {
139 0     0   0 deprecated '->unsubscribe is deprecated in favour of ->pubsub->unsubscribe';
140 0         0 shift->pubsub->unsubscribe(@_);
141 3         106 });
142              
143             }
144              
145             sub _send {
146 4     4   27 my ($msg) = @_;
147              
148 4 100       43 if (not defined $client) {
149 3     3   45 return _connect(sub { $_[0]->write($msg); });
  3         44  
150             }
151              
152 1         5 $client->write($msg);
153             }
154              
155             sub _connect {
156              
157 6     6   22 my $cb = shift;
158              
159             Mojo::IOLoop->singleton->client(
160             { path => $conf->{socket} } => sub {
161 6     6   4891 my ($loop, $err, $stream) = @_;
162 6 50       55 die sprintf "Could not connect to %s: %s", $conf->{socket}, $err if defined $err;
163              
164 6 100       55 if (defined $client) {
165 3         18 $stream->close();
166 3 50       338 $cb->($client) if defined $cb;
167              
168 3         19 return;
169             }
170              
171 3         9 $client = $stream;
172              
173 3         14 my $msg;
174             $stream->on(read => sub {
175 4         4332 my ($stream, $bytes) = @_;
176              
177 4         21 $msg .= $bytes;
178              
179 4         22 while (length $msg) {
180 4 50       37 if ($msg =~ s/^(.+)\n//) {
181 4         16 my $b64 = $1;
182 4         81 my $args = decode_json(b64_decode($b64));
183 4         783 foreach my $subscriber (@{ $conf->{subs} }) {
  4         15  
184 4         9 $subscriber->(@{ $args });
  4         34  
185             }
186             }
187             else {
188             return
189 0         0 }
190              
191             }
192 3         48 });
193              
194 3         68 $stream->timeout(0);
195              
196 3 50       104 $cb->($stream) if defined $cb;
197              
198             }
199 6         80 );
200             }
201              
202             1;
203              
204             __END__