File Coverage

blib/lib/Mojo/WebSocket/PubSub.pm
Criterion Covered Total %
statement 62 87 71.2
branch 3 4 75.0
condition n/a
subroutine 15 24 62.5
pod 1 6 16.6
total 81 121 66.9


line stmt bran cond sub pod time code
1             package Mojo::WebSocket::PubSub;
2             $Mojo::WebSocket::PubSub::VERSION = '0.06';
3 1     1   595390 use Mojo::Base 'Mojo::EventEmitter';
  1         9  
  1         7  
4 1     1   646 use Mojo::WebSocket::PubSub::Syntax;
  1         2  
  1         9  
5 1     1   36 use Mojo::UserAgent;
  1         2  
  1         6  
6 1     1   25 use Mojo::IOLoop;
  1         2  
  1         9  
7              
8             has url => 'http://127.0.0.1:9069/psws';
9             has tx => undef;
10             has ua => sub { state $ua; $ua = Mojo::UserAgent->new };
11             has auto_keepalive => 1;
12              
13             sub new {
14 5     5 1 1001012 my $s = shift->SUPER::new(@_);
15 5         55 $s->{syn} = new Mojo::WebSocket::PubSub::Syntax;
16 5         48 return $s;
17             }
18              
19             sub connect {
20 5     5 0 11 my $s = shift;
21             # Open WebSocket to pubsub service
22             $s->ua->websocket_p( $s->url )->then(
23             sub {
24 5     5   12668 my $tx = shift;
25 5         27 $s->tx($tx);
26              
27             # Wait for WebSocket to be closed
28 5         60 $s->{syn}->on( all => sub { $s->_rcvd( $_[1], $_[2] ) } );
  7         85  
29 5         53 $s->{syn}->on( all => sub { $s->emit( all => ($_[1], $_[2] ) ) } );
  7         37  
30 5         52 $s->{syn}->on( all => sub { $s->emit( $_[1] => $_[2] ) } );
  7         83  
31             $s->{syn}->on(
32             broadcast_notify => sub {
33 1         12 $s->emit( notify => $_[1]->{msg} );
34             }
35 5         42 );
36             $s->tx->on(
37             finish => sub {
38 0         0 my ( $tx, $code, $reason ) = @_;
39 0         0 $s->emit("finish" => ($code, $reason));
40             }
41 5         34 );
42             $s->tx->on(
43             json => sub {
44 7         7711 my ( $tx, $msg ) = @_;
45 7         35 $s->{syn}->parse($msg);
46             }
47 5         71 );
48 5         118 $s->emit("connected" => $s->url);
49 5 50       78 $s->_send_keepalive if ( $s->auto_keepalive );
50             }
51             )->catch(
52             sub {
53 0     0   0 my $err = shift;
54              
55             # Handle failed WebSocket handshakes and other exceptions
56 0         0 warn "WebSocket error: $err";
57             }
58 5         20 )->wait;
59 5         4268 return $s;
60             }
61              
62             sub DESTROY {
63 0     0   0 my $s = shift;
64 0         0 $s->tx(undef);
65             }
66              
67             sub listen {
68 5     5 0 119 my $s = shift;
69 5         13 my $ch = shift;
70 5         11 my $ret = 1;
71             new Mojo::Promise(
72             sub {
73 5     5   145 my ( $r, $f ) = @_;
74 5         28 $s->{syn}->on( 'listened' => sub { $r->( $_[1] ) } );
  6         75  
75 5         60 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
76 5         310 $s->_send( $s->{syn}->listen($ch) );
77             }
78 5     0   31 )->catch( sub { $ret = 0 } )->wait;
  0         0  
79 5         5153 return $ret;
80             }
81              
82             sub publish {
83 1     1 0 14 my $s = shift;
84 1         3 my $msg = shift;
85 1         3 my $ret = 1;
86             new Mojo::Promise(
87             sub {
88 1     1   29 my ( $r, $f ) = @_;
89 1         8 $s->{syn}->on( 'notified' => sub { $r->( $_[1] ) } );
  1         13  
90 1         12 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
91 1         59 $s->_send( $s->{syn}->notify($msg) );
92             }
93 1     0   9 )->catch( sub { $ret = 0 } )->wait;
  0         0  
94 1         941 return $ret;
95             }
96              
97             sub keepalive {
98 0     0 0 0 my $s = shift;
99 0     0   0 Mojo::Promise->resolve->then( sub { $s->_send( $s->{syn}->keepalive ) } )
100 0         0 ->wait;
101 0         0 return 1;
102             }
103              
104             sub ping {
105 0     0 0 0 my $s = shift;
106 0         0 my $ret = 1;
107             new Mojo::Promise(
108             sub {
109 0     0   0 my ( $r, $f ) = @_;
110 0         0 $s->{syn}->on( 'pong' => sub { $r->( $_[1] ) } );
  0         0  
111 0         0 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
112 0         0 $s->_send( $s->{syn}->ping() );
113             }
114 0     0   0 )->catch( sub { $ret = 0 } )->wait;
  0         0  
115 0         0 return $ret;
116             }
117              
118             sub _send {
119 10     10   53 shift->tx->send( { json => shift } );
120             }
121              
122             sub _send_keepalive {
123              
124             # send keepalive every inactivity_timeout/2
125 6     6   119 state $tid;
126 6 100       41 Mojo::IOLoop->remove($tid) if ($tid);
127 6         182 my $s = shift;
128 6         18 my $t2 = Mojo::IOLoop->stream( $s->tx->connection )->timeout / 2;
129             $tid = Mojo::IOLoop->recurring(
130             $t2 => sub {
131 4     4   2990524 $s->_send( $s->{syn}->keepalive );
132             }
133 6         212 );
134             }
135              
136             sub _rcvd {
137 7     7   19 my $s = shift;
138             }
139              
140             1;
141              
142             =pod
143              
144             =head1 NAME
145              
146             Mojo::WebSocket::PubSub - A Mojolicious publish/subscribe channels based on websocket.
147              
148             =for html

149            
150             github workflow tests
151            
152             Top language:
153             github last commit
154            

155              
156             =head1 VERSION
157              
158             version 0.06
159              
160             =head1 SYNOPSIS
161              
162             =head1 DESCRIPTION
163              
164             A Mojolicious publish/subscribe channels based on websocket.
165              
166             =encoding UTF-8
167              
168             =head1 BUGS/CONTRIBUTING
169              
170             Please report any bugs through the web interface at L
171             If you want to contribute changes or otherwise involve yourself in development, feel free to fork the Git repository from
172             L.
173              
174             =head1 SUPPORT
175              
176             You can find this documentation with the perldoc command too.
177              
178             perldoc Mojo::WebSocket::PubSub
179              
180             =head1 AUTHOR
181              
182             Emiliano Bruni
183              
184             =head1 COPYRIGHT AND LICENSE
185              
186             This software is copyright (c) 2021 by Emiliano Bruni.
187              
188             This is free software; you can redistribute it and/or modify it under
189             the same terms as the Perl 5 programming language system itself.
190              
191             =cut
192              
193             __END__