File Coverage

blib/lib/Mojo/WebSocketProxy/Backend/JobAsync.pm
Criterion Covered Total %
statement 66 75 88.0
branch 4 8 50.0
condition 7 20 35.0
subroutine 14 14 100.0
pod 3 3 100.0
total 94 120 78.3


line stmt bran cond sub pod time code
1             package Mojo::WebSocketProxy::Backend::JobAsync;
2              
3 1     1   601084 use strict;
  1         8  
  1         27  
4 1     1   6 use warnings;
  1         2  
  1         26  
5              
6 1     1   4 use parent qw(Mojo::WebSocketProxy::Backend);
  1         2  
  1         5  
7              
8 1     1   38 no indirect;
  1         2  
  1         4  
9              
10 1     1   397 use DataDog::DogStatsd::Helper qw(stats_inc);
  1         2420  
  1         55  
11 1     1   7 use IO::Async::Loop::Mojo;
  1         1  
  1         23  
12 1     1   345 use Job::Async;
  1         122544  
  1         46  
13 1     1   7 use JSON::MaybeUTF8 qw(encode_json_utf8 decode_json_utf8);
  1         3  
  1         60  
14 1     1   6 use Log::Any qw($log);
  1         2  
  1         10  
15 1     1   594 use MojoX::JSON::RPC::Client;
  1         2078  
  1         5  
16              
17             our $VERSION = '0.12'; ## VERSION
18              
19             __PACKAGE__->register_type('job_async');
20              
21             =head1 NAME
22              
23             Mojo::WebSocketProxy::Backend::JobAsync
24              
25             =head1 DESCRIPTION
26              
27             A subclass of L which dispatches RPC requests
28             via L.
29              
30             =cut
31              
32             =head1 CLASS METHODS
33              
34             =head2 new
35              
36             Returns a new instance. Required params:
37              
38             =over 4
39              
40             =item loop => IO::Async::Loop
41              
42             Containing L instance.
43              
44             =item jobman => Job::Async
45              
46             Optional L instance.
47              
48             =item client => Job::Async::Client
49              
50             Optional L instance. Will be constructed from
51             C<< $jobman->client >> if not provided.
52              
53             =back
54              
55             =cut
56              
57             sub new {
58 1     1 1 3 my ($class, %args) = @_;
59             # Avoid holding these - we only want the Job::Async::Client instance, and everything else
60             # should be attached to the loop (which sticks around longer than we expect to).
61 1         3 delete $args{loop};
62 1         2 delete $args{jobman};
63              
64 1         2 my $self = bless \%args, $class;
65              
66 1         5 return $self;
67             }
68              
69             =head1 METHODS
70              
71             =cut
72              
73             =head2 client
74              
75             $client = $backend->client
76              
77             Returns the L instance.
78              
79             =cut
80              
81 4     4 1 25 sub client { return shift->{client} }
82              
83             =head2 call_rpc
84              
85             Implements the L interface.
86              
87             =cut
88              
89             sub call_rpc {
90 2     2 1 6 my ($self, $c, $req_storage) = @_;
91 2         5 my $method = $req_storage->{method};
92 2   33     10 my $msg_type = $req_storage->{msg_type} ||= $req_storage->{method};
93              
94             # We'd like to provide some flexibility for people trying to integrate this into
95             # other systems, so any combination of Job::Async::Client, Job::Async and/or IO::Async::Loop
96             # instance can be provided here.
97 2   33     12 $self->{client} //= do {
98             # We don't hold a ref to this, since that might introduce unfortunate cycles
99 0   0     0 $self->{loop} //= do {
100 0         0 require IO::Async::Loop::Mojo;
101 0         0 local $ENV{IO_ASYNC_LOOP} = 'IO::Async::Loop::Mojo';
102 0         0 IO::Async::Loop->new;
103             };
104 0         0 $self->{loop}->add(my $jobman = Job::Async->new);
105              
106             # Let's not pull it in unless we have it already, but we do want to avoid sharing number
107             # sequences in forked workers.
108 0 0       0 Math::Random::Secure::srand() if Math::Random::Secure->can('srand');
109 0         0 my $client_job = $jobman->client(redis => $self->{redis});
110 0         0 $client_job->start->retain;
111 0         0 $client_job;
112             };
113              
114 2   50     21 $req_storage->{call_params} ||= {};
115 2         15 my $rpc_response_cb = $self->get_rpc_response_cb($c, $req_storage);
116              
117 2   50     7 my $before_get_rpc_response_hook = delete($req_storage->{before_get_rpc_response}) || [];
118 2   50     6 my $after_got_rpc_response_hook = delete($req_storage->{after_got_rpc_response}) || [];
119 2   50     6 my $before_call_hook = delete($req_storage->{before_call}) || [];
120 2         7 my $params = $self->make_call_params($c, $req_storage);
121 2         15 $log->debugf("method %s has params = %s", $method, $params);
122 2         9 $_->($c, $req_storage) for @$before_call_hook;
123             $self->client->submit(
124             name => $req_storage->{name},
125             params => encode_json_utf8($params)
126             )->on_ready(
127             sub {
128 2     2   3508 my ($f) = @_;
129 2         9 $log->debugf('->submit completion: ', $f->state);
130              
131 2         19 $_->($c, $req_storage) for @$before_get_rpc_response_hook;
132              
133             # unconditionally stop any further processing if client is already disconnected
134              
135 2 50 33     11 return Future->done unless $c and $c->tx;
136              
137 2         13 my $api_response;
138              
139 2 100       7 if ($f->is_done) {
140 1         8 my $result = MojoX::JSON::RPC::Client::ReturnObject->new(rpc_response => decode_json_utf8($f->get));
141              
142 1         32 $_->($c, $req_storage, $result) for @$after_got_rpc_response_hook;
143              
144 1         5 $api_response = $rpc_response_cb->($result->result);
145 1         6 stats_inc("rpc_queue.client.jobs.success", {tags => ["rpc:" . $req_storage->{name}, 'clientID:' . $self->client->id]});
146             } else {
147 1         8 my ($failure) = $f->failure;
148 1         12 $log->warnf("method %s failed: %s", $method, $failure);
149             stats_inc("rpc_queue.client.jobs.fail",
150 1         6 {tags => ["rpc:" . $req_storage->{name}, 'clientID:' . $self->client->id, 'error:' . $failure]});
151              
152 1         89 $api_response = $c->wsp_error($msg_type, 'WrongResponse', 'Sorry, an error occurred while processing your request.');
153             }
154              
155 2 50       26219 return unless $api_response;
156              
157 2         69 $c->send({json => $api_response}, $req_storage);
158 2         6 })->retain;
159 2         1391 return;
160             }
161              
162             1;