File Coverage

blib/lib/Plack/App/Message/Passing.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Plack::App::Message::Passing;
2 2     2   102786 use Moose;
  0            
  0            
3             use Scalar::Util qw/ weaken refaddr /;
4             use Message::Passing::Input::ZeroMQ;
5             use Message::Passing::Output::ZeroMQ;
6             use JSON qw/ encode_json decode_json /;
7             use namespace::autoclean;
8              
9             with qw/
10             Message::Passing::Role::Input
11             Message::Passing::Role::Output
12             /;
13              
14             has return_address => (
15             isa => 'Str',
16             is => 'ro',
17             required => 1,
18             );
19              
20             has input => (
21             is => 'ro',
22             lazy => 1,
23             default => sub {
24             my $self = shift;
25             weaken($self);
26             Message::Passing::Input::ZeroMQ->new(
27             socket_bind => $self->return_address,
28             socket_type => 'SUB',
29             output_to => $self,
30             );
31             },
32             );
33              
34             has send_address => (
35             isa => 'Str',
36             is => 'ro',
37             required => 1,
38             );
39              
40             has '+output_to' => (
41             lazy => 1,
42             default => sub {
43             my $self = shift;
44             Message::Passing::Output::ZeroMQ->new(
45             socket_bind => $self->send_address,
46             socket_type => 'PUSH',
47             );
48             },
49             );
50              
51             has in_flight => (
52             isa => 'HashRef',
53             is => 'ro',
54             default => sub { {} },
55             );
56              
57             sub BUILD {
58             my $self = shift;
59             $self->input; # Build attribute.
60             }
61              
62             sub _handle_request {
63             my ($self, $base_env) = @_;
64             weaken($self);
65             my $env = {%$base_env};
66             die("You need to use a non-blocking server, such as Twiggy")
67             unless delete $env->{'psgi.nonblocking'};
68             delete $env->{'psgi.errors'};
69             delete $env->{'psgix.io'};
70             my $input_fh = delete $env->{'psgi.input'};
71             my $input = '';
72             my $len = 0;
73             do {
74             $len = $input_fh->read(my $buf, 4096);
75             $input .= $buf;
76             } while ($len);
77             $env->{'psgi.input'} = $input;
78             delete $env->{'psgi.streaming'};
79             $env->{'psgix.message.passing.clientid'} = refaddr($base_env);
80             $env->{'psgix.message.passing.returnaddress'} = $self->return_address;
81             $self->output_to->consume(encode_json $env);
82             return sub {
83             my $responder = shift;
84             $self->in_flight->{refaddr($base_env)} = [$base_env, $responder];
85             }
86             }
87              
88             sub to_app {
89             my $self = shift;
90             weaken($self);
91             sub {
92             my $env = shift;
93             $self->_handle_request($env);
94             };
95             }
96              
97             sub consume {
98             my ($self, $message) = @_;
99             $message = decode_json $message;
100             my $clientid = $message->{clientid};
101             my ($env, $responder) = @{ delete($self->in_flight->{$clientid}) };
102             if (length $message->{errors}) {
103             $env->{'psgi.errors'}->print($message->{errors});
104             }
105             $responder->($message->{response});
106             }
107              
108             __PACKAGE__->meta->make_immutable;
109             1;
110              
111             =head1 NAME
112              
113             Plack::App::Message::Passing - Send a PSGI environment via Message::Passing
114              
115             =head1 SYNOPSIS
116              
117             # Note that the -e has to all be on one line!
118             plackup -E production -s Twiggy -MPlack::App::Message::Passing -e'Plack::App::Message::Passing->new(return_address => "tcp://127.0.0.1:5555", send_address => "tcp://127.0.0.1:5556")->to_app'
119              
120             =head1 DESCRIPTION
121              
122             A L<PSGI> application which serializes the PSGI request as JSON, sends
123             it via ZeroMQ.
124              
125             Used with L<Plack::Handler::Message::Passing>, which inflates a PSGI
126             request from JSON, runs it against a real application, and returns the
127             results.
128              
129             =head1 SEE ALSO
130              
131             =over
132              
133             =item L<Message::Passing::PSGI>
134              
135             =item L<Plack::Handler::Message::Passing>
136              
137             =back
138              
139             =head1 AUTHOR, COPYRIGHT AND LICENSE
140              
141             See L<Message::Passing::PSGI>
142              
143             =cut
144