File Coverage

blib/lib/Mojo/WebSocketProxy/Backend/JobAsync.pm
Criterion Covered Total %
statement 55 61 90.1
branch 4 8 50.0
condition 7 20 35.0
subroutine 11 11 100.0
pod 3 3 100.0
total 80 103 77.6


line stmt bran cond sub pod time code
1             package Mojo::WebSocketProxy::Backend::JobAsync;
2              
3 1     1   519283 use strict;
  1         10  
  1         22  
4 1     1   5 use warnings;
  1         2  
  1         22  
5              
6 1     1   5 use parent qw(Mojo::WebSocketProxy::Backend);
  1         2  
  1         3  
7              
8 1     1   38 no indirect;
  1         1  
  1         3  
9              
10 1     1   37 use IO::Async::Loop::Mojo;
  1         2  
  1         18  
11 1     1   330 use Job::Async;
  1         112361  
  1         40  
12              
13 1     1   9 use Log::Any qw($log);
  1         2  
  1         9  
14              
15             our $VERSION = '0.11'; ## VERSION
16              
17             __PACKAGE__->register_type('job_async');
18              
19             =head1 NAME
20              
21             Mojo::WebSocketProxy::Backend::JobAsync
22              
23             =head1 DESCRIPTION
24              
25             A subclass of L which dispatches RPC requests
26             via L.
27              
28             =cut
29              
30             =head1 CLASS METHODS
31              
32             =head2 new
33              
34             Returns a new instance. Required params:
35              
36             =over 4
37              
38             =item loop => IO::Async::Loop
39              
40             Containing L instance.
41              
42             =item jobman => Job::Async
43              
44             Optional L instance.
45              
46             =item client => Job::Async::Client
47              
48             Optional L instance. Will be constructed from
49             C<< $jobman->client >> if not provided.
50              
51             =back
52              
53             =cut
54              
55             sub new {
56 1     1 1 3 my ($class, %args) = @_;
57             # Avoid holding these - we only want the Job::Async::Client instance, and everything else
58             # should be attached to the loop (which sticks around longer than we expect to).
59 1         2 my $loop = delete $args{loop};
60 1         2 my $jobman = delete $args{jobman};
61              
62 1         2 my $self = bless \%args, $class;
63              
64             # We'd like to provide some flexibility for people trying to integrate this into
65             # other systems, so any combination of Job::Async::Client, Job::Async and/or IO::Async::Loop
66             # instance can be provided here.
67 1   33     8 $self->{client} //= do {
68 0 0       0 unless ($jobman) {
69             # We don't hold a ref to this, since that might introduce unfortunate cycles
70 0   0     0 $loop //= do {
71 0         0 require IO::Async::Loop::Mojo;
72 0         0 IO::Async::Loop::Mojo->new;
73             };
74 0         0 $loop->add($jobman = Job::Async->new);
75             }
76              
77 0         0 $jobman->client;
78             };
79 1         4 return $self;
80             }
81              
82             =head1 METHODS
83              
84             =cut
85              
86             =head2 client
87              
88             $client = $backend->client
89              
90             Returns the L instance.
91              
92             =cut
93              
94 2     2 1 7 sub client { return shift->{client} }
95              
96             =head2 call_rpc
97              
98             Implements the L interface.
99              
100             =cut
101              
102             sub call_rpc {
103 2     2 1 6 my ($self, $c, $req_storage) = @_;
104 2         3 my $method = $req_storage->{method};
105 2   33     9 my $msg_type = $req_storage->{msg_type} ||= $req_storage->{method};
106              
107 2   50     18 $req_storage->{call_params} ||= {};
108              
109 2         12 my $rpc_response_cb = $self->get_rpc_response_cb($c, $req_storage);
110              
111 2   50     6 my $before_get_rpc_response_hook = delete($req_storage->{before_get_rpc_response}) || [];
112 2   50     4 my $after_got_rpc_response_hook = delete($req_storage->{after_got_rpc_response}) || [];
113 2   50     5 my $before_call_hook = delete($req_storage->{before_call}) || [];
114 2         19 my $params = $self->make_call_params($c, $req_storage);
115 2         12 $log->debugf("method %s has params = %s", $method, $params);
116              
117 2         40 $_->($c, $req_storage) for @$before_call_hook;
118              
119             $self->client->submit(data => $req_storage)->on_ready(
120             sub {
121 2     2   3409 my ($f) = @_;
122 2         7 $log->debugf('->submit completion: ', $f->state);
123              
124 2         17 $_->($c, $req_storage) for @$before_get_rpc_response_hook;
125              
126             # unconditionally stop any further processing if client is already disconnected
127 2 50 33     9 return Future->done unless $c and $c->tx;
128              
129 2         11 my $api_response;
130 2 100       5 if ($f->is_done) {
131 1         7 my ($result) = $f->get;
132              
133 1         9 $_->($c, $req_storage, $result) for @$after_got_rpc_response_hook;
134              
135 1         2 $api_response = $rpc_response_cb->($result);
136             } else {
137 1         6 my ($failure) = $f->failure;
138 1         12 $log->warnf("method %s failed: %s", $method, $failure);
139              
140 1         10 $api_response = $c->wsp_error($msg_type, 'WrongResponse', 'Sorry, an error occurred while processing your request.');
141             }
142 2 50       5 return unless $api_response;
143              
144 2         40 $c->send({json => $api_response}, $req_storage);
145 2         6 })->retain;
146 2         1223 return;
147             }
148              
149             1;