File Coverage

blib/lib/Mojo/WebSocket/PubSub.pm
Criterion Covered Total %
statement 56 81 69.1
branch 3 4 75.0
condition n/a
subroutine 14 23 60.8
pod 1 5 20.0
total 74 113 65.4


line stmt bran cond sub pod time code
1             package Mojo::WebSocket::PubSub;
2             $Mojo::WebSocket::PubSub::VERSION = '0.05';
3 1     1   650144 use Mojo::Base 'Mojo::EventEmitter';
  1         11  
  1         8  
4 1     1   681 use Mojo::WebSocket::PubSub::Syntax;
  1         3  
  1         9  
5 1     1   38 use Mojo::UserAgent;
  1         3  
  1         8  
6 1     1   25 use Mojo::IOLoop;
  1         2  
  1         8  
7              
8             has url => 'http://127.0.0.1:9069/psws';
9             has tx => undef;
10             has ua => sub { state $ua; $ua = Mojo::UserAgent->new };
11             has auto_keepalive => 1;
12              
13             sub new {
14 4     4 1 1007439 my $s = shift->SUPER::new(@_);
15 4         45 $s->{syn} = new Mojo::WebSocket::PubSub::Syntax;
16              
17             # Open WebSocket to pubsub service
18             $s->ua->websocket_p( $s->url )->then(
19             sub {
20 4     4   10424 my $tx = shift;
21 4         24 $s->tx($tx);
22              
23             # Wait for WebSocket to be closed
24 4         48 $s->{syn}->on( all => sub { $s->_rcvd( $_[1], $_[2] ) } );
  5         87  
25             $s->{syn}->on(
26             broadcast_notify => sub {
27 1         17 $s->emit( notify => $_[1]->{msg} );
28             }
29 4         45 );
30             $s->tx->on(
31             finish => sub {
32 0         0 my ( $tx, $code, $reason ) = @_;
33 0         0 say "WebSocket closed with status $code.";
34             }
35 4         44 );
36             $s->tx->on(
37             json => sub {
38 5         4943 my ( $tx, $msg ) = @_;
39 5         21 $s->{syn}->parse($msg);
40             }
41 4         48 );
42 4         242 say "WebSocket connected";
43 4 50       35 $s->_send_keepalive if ( $s->auto_keepalive );
44             }
45             )->catch(
46             sub {
47 0     0   0 my $err = shift;
48              
49             # Handle failed WebSocket handshakes and other exceptions
50 0         0 warn "WebSocket error: $err";
51             }
52 4         49 )->wait;
53 4         3329 return $s;
54             }
55              
56             sub DESTROY {
57 0     0   0 my $s = shift;
58 0         0 $s->tx(undef);
59             }
60              
61             sub listen {
62 3     3 0 80 my $s = shift;
63 3         6 my $ch = shift;
64 3         5 my $ret = 1;
65             new Mojo::Promise(
66             sub {
67 3     3   92 my ( $r, $f ) = @_;
68 3         20 $s->{syn}->on( 'listened' => sub { $r->( $_[1] ) } );
  3         31  
69 3         31 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
70 3         172 $s->_send( $s->{syn}->listen($ch) );
71             }
72 3     0   17 )->catch( sub { $ret = 0 } )->wait;
  0         0  
73 3         2928 return $ret;
74             }
75              
76             sub publish {
77 1     1 0 14 my $s = shift;
78 1         3 my $msg = shift;
79 1         2 my $ret = 1;
80             new Mojo::Promise(
81             sub {
82 1     1   28 my ( $r, $f ) = @_;
83 1         7 $s->{syn}->on( 'notified' => sub { $r->( $_[1] ) } );
  1         12  
84 1         11 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
85 1         60 $s->_send( $s->{syn}->notify($msg) );
86             }
87 1     0   7 )->catch( sub { $ret = 0 } )->wait;
  0         0  
88 1         878 return $ret;
89             }
90              
91             sub keepalive {
92 0     0 0 0 my $s = shift;
93 0     0   0 Mojo::Promise->resolve->then( sub { $s->_send( $s->{syn}->keepalive ) } )
94 0         0 ->wait;
95 0         0 return 1;
96             }
97              
98             sub ping {
99 0     0 0 0 my $s = shift;
100 0         0 my $ret = 1;
101             new Mojo::Promise(
102             sub {
103 0     0   0 my ( $r, $f ) = @_;
104 0         0 $s->{syn}->on( 'pong' => sub { $r->( $_[1] ) } );
  0         0  
105 0         0 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
106 0         0 $s->_send( $s->{syn}->ping() );
107             }
108 0     0   0 )->catch( sub { $ret = 0 } )->wait;
  0         0  
109 0         0 return $ret;
110             }
111              
112             sub _send {
113 8     8   44 shift->tx->send( { json => shift } );
114             }
115              
116             sub _send_keepalive {
117              
118             # send keepalive every inactivity_timeout/2
119 5     5   119 state $tid;
120 5 100       52 Mojo::IOLoop->remove($tid) if ($tid);
121 5         168 my $s = shift;
122 5         18 my $t2 = Mojo::IOLoop->stream( $s->tx->connection )->timeout / 2;
123             $tid = Mojo::IOLoop->recurring(
124             $t2 => sub {
125 4     4   3002260 $s->_send( $s->{syn}->keepalive );
126             }
127 5         180 );
128             }
129              
130             sub _rcvd {
131 5     5   14 my $s = shift;
132             }
133              
134             1;
135              
136             =pod
137              
138             =head1 NAME
139              
140             Mojo::WebSocket::PubSub - A Mojolicious publish/subscribe channels based on websocket.
141              
142             =for html

143            
144             github workflow tests
145            
146             Top language:
147             github last commit
148            

149              
150             =head1 VERSION
151              
152             version 0.05
153              
154             =head1 SYNOPSIS
155              
156             =head1 DESCRIPTION
157              
158             A Mojolicious publish/subscribe channels based on websocket.
159              
160             =encoding UTF-8
161              
162             =head1 BUGS/CONTRIBUTING
163              
164             Please report any bugs through the web interface at L
165             If you want to contribute changes or otherwise involve yourself in development, feel free to fork the Git repository from
166             L.
167              
168             =head1 SUPPORT
169              
170             You can find this documentation with the perldoc command too.
171              
172             perldoc Mojo::WebSocket::PubSub
173              
174             =head1 AUTHOR
175              
176             Emiliano Bruni
177              
178             =head1 COPYRIGHT AND LICENSE
179              
180             This software is copyright (c) 2021 by Emiliano Bruni.
181              
182             This is free software; you can redistribute it and/or modify it under
183             the same terms as the Perl 5 programming language system itself.
184              
185             =cut
186              
187             __END__