File Coverage

blib/lib/Mojolicious/Broker.pm
Criterion Covered Total %
statement 46 52 88.4
branch 1 2 50.0
condition n/a
subroutine 10 13 76.9
pod 4 6 66.6
total 61 73 83.5


line stmt bran cond sub pod time code
1             package Mojolicious::Broker;
2             # ABSTRACT: A message broker for WebSockets
3             $Mojolicious::Broker::VERSION = '0.001';
4              
5 1     1   1116 use Mojo::Base 'Mojolicious';
  1         2  
  1         8  
6 1     1   124849 use Scalar::Util qw( refaddr );
  1         4  
  1         63  
7 1     1   5 use File::Basename qw( dirname );
  1         2  
  1         52  
8 1     1   6 use File::Spec::Functions qw( catdir );
  1         1  
  1         706  
9              
10             my %topics;
11              
12              
13             sub add_topic_subscriber {
14 2     2 1 80 my ( $self, $topic ) = @_;
15 2         10 $topics{ $topic }{ refaddr $self } = $self;
16 2         5 return;
17             }
18              
19              
20             sub remove_topic_subscriber {
21 0     0 1 0 my ( $self, $topic ) = @_;
22 0         0 delete $topics{ $topic }{ refaddr $self };
23 0         0 return;
24             }
25              
26              
27             sub publish_topic_message {
28 1     1 1 38 my ( $self, $topic, $message ) = @_;
29 1         6 my @parts = split m{/}, $topic;
30 1         4 my @topics = map { join '/', @parts[0..$_] } 0..$#parts;
  1         5  
31 1         3 for my $topic ( @topics ) {
32 1         1 $_->send( $message ) for values %{ $topics{ $topic } };
  1         7  
33             }
34 1         296 return;
35             }
36              
37              
38             sub route_websocket_sub {
39 2     2 0 18539 my ( $c ) = @_;
40 2         10 Mojo::IOLoop->stream($c->tx->connection)->timeout(1200);
41              
42 2         179 my $topic = $c->stash( 'topic' );
43 2         48 $c->add_topic_subscriber( $topic );
44              
45             $c->on( finish => sub {
46 0     0   0 my ( $c ) = @_;
47 0         0 $c->remove_topic_subscriber( $topic );
48 2         21 } );
49             };
50              
51              
52             sub route_websocket_pub {
53 1     1 0 11289 my ( $c ) = @_;
54 1         5 Mojo::IOLoop->stream($c->tx->connection)->timeout(1200);
55              
56 1         102 my $topic = $c->stash( 'topic' );
57             $c->on( message => sub {
58 1     1   4295 my ( $c, $message ) = @_;
59 1         16 $c->publish_topic_message( $topic, $message );
60 1         18 } );
61             }
62              
63             sub startup {
64 1     1 1 14547 my ( $app ) = @_;
65 1         6 $app->helper( add_topic_subscriber => \&add_topic_subscriber );
66 1         24 $app->helper( remove_topic_subscriber => \&remove_topic_subscriber );
67 1         27 $app->helper( publish_topic_message => \&publish_topic_message );
68 1         22 my $r = $app->routes;
69 1         18 $r->websocket( '/sub/*topic' )->to( cb => \&route_websocket_sub )->name( 'sub' );
70 1         458 $r->websocket( '/pub/*topic' )->to( cb => \&route_websocket_pub )->name( 'pub' );
71              
72 1 50       232 if ( $app->mode eq 'development' ) {
73             # Enable the example app
74 1         11 $app->home->parse( catdir( dirname( __FILE__ ), 'Broker' ) );
75 1         93 $app->static->paths->[0] = $app->home->rel_dir('public');
76 1         27 $app->renderer->paths->[0] = $app->home->rel_dir('templates');
77 1     0   21 $r->get( '/' )->to( cb => sub { shift->render( 'index' ) } );
  0            
78             }
79             }
80              
81             1;
82              
83             __END__