File Coverage

blib/lib/Mojo/WebSocket/PubSub.pm
Criterion Covered Total %
statement 57 67 85.0
branch 2 2 100.0
condition n/a
subroutine 14 18 77.7
pod 1 3 33.3
total 74 90 82.2


line stmt bran cond sub pod time code
1             package Mojo::WebSocket::PubSub;
2             $Mojo::WebSocket::PubSub::VERSION = '0.04';
3 1     1   509724 use Mojo::Base 'Mojo::EventEmitter';
  1         10  
  1         5  
4 1     1   573 use Mojo::WebSocket::PubSub::Syntax;
  1         3  
  1         8  
5 1     1   37 use Mojo::UserAgent;
  1         3  
  1         6  
6 1     1   22 use Mojo::IOLoop;
  1         3  
  1         8  
7              
8             has url => 'http://127.0.0.1:9069/psws';
9             has tx => undef;
10             has ua => sub { state $ua; $ua = Mojo::UserAgent->new };
11              
12             sub new {
13 4     4 1 1005802 my $s = shift->SUPER::new(@_);
14 4         52 $s->{syn} = new Mojo::WebSocket::PubSub::Syntax;
15              
16             # Open WebSocket to pubsub service
17             $s->ua->websocket_p( $s->url )->then(
18             sub {
19 4     4   10692 my $tx = shift;
20 4         22 $s->tx($tx);
21              
22             # Wait for WebSocket to be closed
23 4         53 $s->{syn}->on( all => sub { $s->_rcvd( $_[1], $_[2] ) } );
  5         88  
24             $s->{syn}->on( broadcast_notify => sub {
25 1         17 $s->emit(notify => $_[1]->{msg} ) ;
26 4         59 } );
27             $s->tx->on(
28             finish => sub {
29 0         0 my ( $tx, $code, $reason ) = @_;
30 0         0 say "WebSocket closed with status $code.";
31             }
32 4         27 );
33             $s->tx->on(
34             json => sub {
35 5         4202 my ( $tx, $msg ) = @_;
36 5         24 $s->{syn}->parse($msg);
37             }
38 4         59 );
39 4         273 say "WebSocket connected";
40 4         25 $s->_send_keepalive;
41             }
42             )->catch(
43             sub {
44 0     0   0 my $err = shift;
45              
46             # Handle failed WebSocket handshakes and other exceptions
47 0         0 warn "WebSocket error: $err";
48             }
49 4         35 )->wait;
50 4         2894 return $s;
51             }
52              
53             sub DESTROY {
54 0     0   0 my $s = shift;
55 0         0 $s->tx(undef);
56             }
57              
58             sub listen {
59 3     3 0 84 my $s = shift;
60 3         7 my $ch = shift;
61 3         5 my $ret = 1;
62             new Mojo::Promise(
63             sub {
64 3     3   73 my ( $r, $f ) = @_;
65 3         18 $s->{syn}->on( 'listened' => sub { $r->( $_[1] ) } );
  3         34  
66 3         30 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
67 3         160 $s->_send( $s->{syn}->listen($ch) );
68             }
69 3     0   18 )->catch( sub { $ret = 0 } )->wait;
  0         0  
70 3         2818 return $ret;
71             }
72              
73             sub publish {
74 1     1 0 24 my $s = shift;
75 1         3 my $msg = shift;
76 1         2 my $ret = 1;
77             new Mojo::Promise(
78             sub {
79 1     1   28 my ( $r, $f ) = @_;
80 1         8 $s->{syn}->on( 'notified' => sub { $r->( $_[1] ) } );
  1         12  
81 1         10 Mojo::IOLoop->timer( 5 => sub { $f->() } );
  0         0  
82 1         57 $s->_send( $s->{syn}->notify($msg) );
83             }
84 1     0   10 )->catch( sub { $ret = 0 } )->wait;
  0         0  
85 1         886 return $ret;
86             }
87              
88             sub _send {
89 8     8   42 shift->tx->send( { json => shift } );
90             }
91              
92             sub _send_keepalive {
93             # send keepalive every inactivity_timeout/2
94 5     5   81 state $tid;
95 5 100       46 Mojo::IOLoop->remove($tid) if ($tid);
96 5         156 my $s = shift;
97 5         14 my $t2 = Mojo::IOLoop->stream($s->tx->connection)->timeout/2;
98 5         236 say $t2;
99             $tid = Mojo::IOLoop->recurring(
100             $t2 => sub {
101 4     4   3002539 $s->_send( $s->{syn}->keepalive );
102             }
103 5         52 );
104             }
105              
106             sub _rcvd {
107 5     5   24 my $s = shift;
108             #p @_;
109             }
110              
111             1;
112              
113             =pod
114              
115             =head1 NAME
116              
117             Mojo::WebSocket::PubSub - A Mojolicious publish/subscribe channels based on websocket.
118              
119             =for html

120            
121             github workflow tests
122            
123             Top language:
124             github last commit
125            

126              
127             =head1 VERSION
128              
129             version 0.04
130              
131             =head1 SYNOPSIS
132              
133             =head1 DESCRIPTION
134              
135             A Mojolicious publish/subscribe channels based on websocket.
136              
137             =encoding UTF-8
138              
139             =head1 BUGS/CONTRIBUTING
140              
141             Please report any bugs through the web interface at L
142             If you want to contribute changes or otherwise involve yourself in development, feel free to fork the Git repository from
143             L.
144              
145             =head1 SUPPORT
146              
147             You can find this documentation with the perldoc command too.
148              
149             perldoc Mojo::WebSocket::PubSub
150              
151             =head1 AUTHOR
152              
153             Emiliano Bruni
154              
155             =head1 COPYRIGHT AND LICENSE
156              
157             This software is copyright (c) 2021 by Emiliano Bruni.
158              
159             This is free software; you can redistribute it and/or modify it under
160             the same terms as the Perl 5 programming language system itself.
161              
162             =cut
163              
164             __END__