File Coverage

blib/lib/Plack/Handler/Message/Passing.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Plack::Handler::Message::Passing;
2 1     1   4088 use Moose;
  0            
  0            
3             use AnyEvent;
4             use Message::Passing::Output::ZeroMQ;
5             use Message::Passing::Input::ZeroMQ;
6             use JSON qw/ encode_json decode_json /;
7             use Try::Tiny qw/ try catch /;
8             use Plack::Middleware::BufferedStreaming;
9             use namespace::autoclean;
10              
11             with 'Message::Passing::Role::Output';
12              
13             has app => (
14             is => 'rw',
15             );
16              
17             has [qw/ host port /] => (
18             is => 'ro',
19             );
20              
21             has output_to => (
22             is => 'ro',
23             default => sub { {} },
24             );
25              
26             sub get_output_to {
27             my ($self, $address) = @_;
28             $self->output_to->{$address} ||= Message::Passing::Output::ZeroMQ->new(
29             connect => $address,
30             socket_type => 'PUB',
31             );
32             }
33              
34             sub consume {
35             my ($self, $msg) = @_;
36             my $env = decode_json($msg);
37             my $errors;
38             open(my $error_fh, '>', \$errors) or die $!;
39             $env->{'psgi.errors'} = $error_fh;
40             my $input = delete($env->{'psgi.input'}) || '';
41             open(my $input_fh, '<', \$input) or die $!;
42             $env->{'psgi.input'} = $input_fh;
43             my $clientid = $env->{'psgix.message.passing.clientid'};
44             my $reply_to = $env->{'psgix.message.passing.returnaddress'};
45             my $res;
46             try { $res = $self->app->($env) }
47             catch {
48             my $exception = "Caught exception: $_ - request aborted\n";
49             $errors .= $exception;
50             my $html = qq{<html><head><title>Internal server error</title></head>
51             <body><h1>Internal Server Error</h1><pre>$exception</pre></body>
52             </html>
53             };
54             $res = [
55             500,
56             [
57             'Content-Type' => 'text/html',
58             'Content-Length' => length($html),
59             ],
60             [ $html ]
61             ];
62             };
63             my $return_data = encode_json({
64             clientid => $clientid,
65             response => $res,
66             errors => $errors,
67             });
68             my $output_to = $self->get_output_to($reply_to);
69             $output_to->consume($return_data);
70             }
71              
72             sub run {
73             my ($self, $app) = @_;
74             my $buffered = Plack::Middleware::BufferedStreaming->wrap($app);
75             $self->app($buffered);
76             if (!$self->host) {
77             die "Please specify --host with the send_address passed to Plack::App::Message::Passing\n";
78             }
79             if (!$self->port) {
80             die "Please specify --port with the send_address port passed to Plack::App::Message::Passing\n";
81             }
82             my $connect_address = sprintf('tcp://%s:%s', $self->host, $self->port);
83             my $input = Message::Passing::Input::ZeroMQ->new(
84             connect => $connect_address,
85             socket_type => 'PULL',
86             output_to => $self,
87             );
88             AnyEvent->condvar->recv;
89             }
90              
91             __PACKAGE__->meta->make_immutable;
92             1;
93              
94             =head1 NAME
95              
96             Plack::Handler::Message::Passing - handles PSGI requests sent via Message::Passing
97              
98             =head1 SYNOPSIS
99              
100             plackup -E production -s Message::Passing testapp.psgi --host 127.0.0.1 --port 5556
101              
102             =head1 DESCRIPTION
103              
104             Connects via ZeroMQ to an instance of L<Plack::App::Message::Passing>, and
105             inflates a PSGI request from JSON, then runs it against a real application.
106              
107             Returns the PSGI response and error stream to the parent handler
108              
109             =head1 SEE ALSO
110              
111             =over
112              
113             =item L<Message::Passing::PSGI>
114              
115             =item L<Plack::App::Message::Passing>
116              
117             =back
118              
119             =head1 AUTHOR, COPYRIGHT AND LICENSE
120              
121             See L<Message::Passing::PSGI>
122              
123             =cut
124