File Coverage

blib/lib/Mojolicious/Plugin/PubSub/WebSocket.pm
Criterion Covered Total %
statement 62 62 100.0
branch 12 14 85.7
condition n/a
subroutine 9 9 100.0
pod 1 3 33.3
total 84 88 95.4


line stmt bran cond sub pod time code
1             package Mojolicious::Plugin::PubSub::WebSocket;
2             $Mojolicious::Plugin::PubSub::WebSocket::VERSION = '0.06';
3             # ABSTRACT: Plugin to implement PubSub protocol using websocket
4              
5 2     2   54065 use Mojo::Base 'Mojolicious::Plugin';
  2         6  
  2         13  
6 2     2   323 use Mojo::WebSocket::PubSub::Syntax;
  2         6  
  2         19  
7 2     2   991 use DDP;
  2         64714  
  2         20  
8              
9             sub register {
10 2     2 1 94 my ( $s, $app, $conf ) = @_;
11 2         16 $app->log->debug( "Loading " . __PACKAGE__ );
12              
13 2         148 my $r = $app->routes;
14              
15 2     98   22 $app->helper( psws_clients => sub { state $clients = {} } );
  98         1190  
16 2     28   237 $app->helper( psws_channels => sub { state $channels = {} } );
  28         344  
17              
18             $r->websocket('/psws')->to(
19             cb => sub {
20 20     20   170854 my $c = shift;
21 20         134 my $syn = new Mojo::WebSocket::PubSub::Syntax;
22 20         199 $syn->on( 'all' => sub { $s->psws_reply( $c, @_ ) } );
  27         336  
23             $c->on(
24             json => sub {
25 27         77025 $app->log->debug(
26             sprintf( "RCV from %s: %s",
27             $c->tx->connection, np $_[1] )
28             );
29 27         134765 $syn->parse( $_[1] );
30             }
31 20         257 );
32             $c->on(
33             finish => sub {
34 9         20049 my ( $c, $code, $reason ) = @_;
35 9         35 my $id = $c->tx->connection;
36 9         159 $c->app->log->debug(
37             "PSWS: WebSocket $id" . " closed with status $code" );
38 9 50       188 return unless exists $c->psws_clients->{$id};
39 9         26 my $client = $c->psws_clients->{$id};
40 9 100       30 if ( exists $client->{channel} ) {
41             $c->app->log->debug( "PSWS: WebSocket $id removed from "
42 6         18 . "channel " . $client->{channel} );
43 6         91 delete $c->psws_channels->{ $client->{channel} }->{$id};
44             }
45 9         27 delete $c->psws_clients->{$id};
46             },
47 20         4050 );
48              
49 20         1365 $s->connected($c);
50             }
51 2         185 );
52             }
53              
54             sub connected {
55 20     20 0 37 my $s = shift;
56 20         65 my $c = shift;
57 20         59 my $id = $c->tx->connection;
58 20         289 $c->app->log->debug("PSWS: New connection from $id");
59 20         301 $c->psws_clients->{$id} = { tx => $c->tx };
60             }
61              
62             sub psws_reply {
63 27     27 0 78 my ( $s, $c, $syn, $event, $req ) = @_;
64 27         93 my $id = $c->tx->connection;
65 27         443 $req->{id} = $id;
66              
67 27 100       92 if ( $event eq 'listen' ) {
68 18         40 my $ch = $req->{ch};
69 18 50       49 return unless $ch;
70             # leave previous channel if exists
71 18         62 my $pch = $c->psws_clients->{$id}->{channel};
72 18 100       58 delete $c->psws_channels->{$pch}->{$id} if $pch;
73 18         67 $c->psws_channels->{$ch}->{$id} = 1;
74 18         66 $c->psws_clients->{$id}->{channel} = $ch;
75             }
76 27 100       73 if ( my $res_f = $syn->lookup->{ $req->{t} }->{reply} ) {
77 22         170 my $res = $res_f->( $req, $id );
78 22 100       66 if ( $event eq 'notify' ) {
79 3         7 my $msg = $req->{msg};
80 3         13 my $ch = $c->psws_clients->{$id}->{channel};
81 3         8 foreach
82 3         10 my $client ( grep !/$id/, keys %{ $c->psws_channels->{$ch} } )
83             {
84 12         2357 $c->app->log->debug(
85             sprintf( 'SNT to %s: %s', $client, np $res) );
86 12         65981 $c->psws_clients->{$client}->{tx}->send( { json => $res } );
87             }
88              
89             # now reply to sender
90 3         839 $res = $syn->notified($req);
91             }
92 22         70 $c->app->log->debug( sprintf( 'SNT to %s: %s', $id, np $res) );
93 22         115850 $c->tx->send( { json => $res } );
94             }
95             }
96              
97             1;
98              
99             __END__