File Coverage

blib/lib/Dancer/Plugin/StreamData.pm
Criterion Covered Total %
statement 15 44 34.0
branch 0 10 0.0
condition n/a
subroutine 5 9 55.5
pod 0 2 0.0
total 20 65 30.7


line stmt bran cond sub pod time code
1             package Dancer::Plugin::StreamData;
2              
3 1     1   24170 use strict;
  1         3  
  1         48  
4 1     1   7 use warnings;
  1         1  
  1         39  
5 1     1   6 use Carp;
  1         7  
  1         112  
6              
7 1     1   1785 use Dancer ':syntax';
  1         251917  
  1         7  
8 1     1   798 use Dancer::Plugin;
  1         1344  
  1         438  
9              
10             our $VERSION = '0.9';
11              
12             =head1 NAME
13              
14             Dancer::Plugin::StreamData - stream long responses instead of sending them in one piece
15              
16             =head1 SYNOPSIS
17              
18             package MyWebApp;
19            
20             use Dancer;
21             use Dancer::Plugin::StreamData;
22            
23             get '/some_route' => sub {
24              
25             # ...
26            
27             return stream_data($data_obj, \&stream_my_data);
28             };
29            
30             sub stream_my_data {
31            
32             my ($data_obj, $writer) = @_;
33            
34             while ( $output = $data_obj->get_some_data() )
35             {
36             $writer->write($output);
37             }
38            
39             $writer->close();
40             }
41              
42             =head1 DESCRIPTION
43              
44             This plugin is useful for situations in which a L application wants to
45             return a large set of data such as the results from a database query. This is
46             especially important where the result set might total tens or hundreds of
47             megabytes, which would be awkward to marshall within the memory of a single
48             server process and could lead to a long delay before the start of data
49             delivery.
50              
51             The C function allows the application to stream a response one
52             chunk at a time. For example, the data could be fetched row by row from a
53             database server, with each row processed and then dispatched to the client via
54             the write() method.
55              
56             The reason for this plugin is that the interface defined by PSGI for data
57             streaming is annoyingly complex and difficult to work with. By hiding the
58             complexity, this plugin makes it simple to set up an application which streams
59             long responses instead of marshalling them into a single response message.
60              
61             This plugin can be used with any L compatible web server, and includes a
62             method by which you can check whether the server supports streaming.
63              
64             =head1 USAGE
65              
66             =cut
67              
68             # Between the PSGI interface standard and the way Dancer does things,
69             # streaming a response involves a callback that returns a callback that is
70             # passed a callback, none of which are called with the necessary parameters.
71             # So the easiest way to get the necessary information to the routines that
72             # need it is to store this information in private variables. Not the most
73             # elegant solution, but it works. In fact, Dancer itself stores a lot of
74             # things in private variables.
75              
76             my $stream_object;
77             my $stream_call;
78             my $stream_status;
79             my @stream_headers;
80              
81              
82             =head2 stream_data
83              
84             This function takes two parameters: a data object, and a stream callback. The
85             data object need not contain the data itself; it may be a database handle or
86             other reference by means of which the data will be obtained. The callback
87             can be specified either as a code reference, or as a string. In the latter
88             case, it will be invoked as a method call on the data object.
89              
90             Before calling C, the HTTP status and response headers may be set
91             by the usual mechanisms of Dancer. A call to C will terminate
92             route processing, analagous to C. Any further code in the route
93             handler will be ignored. If an 'after' hook is defined in this app, it will
94             be called as usual after route processing and may modify the response status
95             and/or headers.
96              
97             The callback is invoked after the response headers have been sent. Its job is
98             to stream the body of the response. The callback is passed two parameters:
99             the data object, and a 'writer' object.
100              
101             =cut
102              
103             # This is the main symbol that we export using the 'register' mechanism of
104             # Dancer::Plugin.pm. It takes two parameters: an arbitrary Perl reference
105             # (the "data"), and a routine to be called in order to stream it. The latter
106             # can be specified either as a string value, in which case it is taken to be a
107             # method name and invoked on the data reference, or it can be a code
108             # reference. The data reference might contain, e.g. a database handle from
109             # which data is to be read and the results streamed to the client.
110              
111             register 'stream_data' => sub {
112            
113 0     0     my ($data, $call) = @_;
114            
115             # First make sure that the server supports streaming
116            
117 0           my $env = Dancer::SharedData->request->env;
118 0 0         unless ( $env->{'psgi.streaming'} ) {
119 0           croak 'Sorry, this server does not support PSGI streaming.';
120             }
121            
122             # Store the parameters for later use by stream_callback()
123            
124 0           $stream_object = $data;
125 0           $stream_call = $call;
126            
127             # Clear the global variables that we used to preserve the status code
128             # and content type.
129            
130 0           $stream_status = undef;
131 0           @stream_headers = ();
132            
133             # Indicate to Dancer that the response will be streamed, and specify a
134             # callback to set up the streaming.
135            
136 0           my $resp = Dancer::SharedData::response;
137 0           $resp->streamed(\&prepare_stream);
138            
139 0           my $c = Dancer::Continuation::Route::FileSent->new(return_value => $resp);
140 0           $c->throw;
141             };
142              
143              
144             # This routine will be called by Dancer, and will be passed the status code
145             # and headers that have been determined for the response being assembled. Its
146             # job is to return a callback that will in turn be called at the proper time
147             # to begin streaming the data. Unfortunately, it will be called *twice*, the
148             # second time with an improper status code and headers. Consequently, we must
149             # ignore the second invocation.
150              
151             sub prepare_stream {
152              
153 0     0 0   my ($status, $headers) = @_;
154            
155             # Store the status and headers we were given, because the callback that
156             # does the actual streaming will have to present them directly to the PSGI
157             # interface. We have no way of actually getting that information to it
158             # other than a private variable (declared above).
159            
160             # The variable $stream_status is made undefined by the stream_data()
161             # function (see above) and so we only set it if it has not been set
162             # since. This gets around the problem of this routine (prepare_stream())
163             # being called twice.
164            
165 0 0         if ( !defined $stream_status )
166             {
167 0           $stream_status = $status;
168 0           @stream_headers = ();
169            
170             # We filter the headers to remove content-length, since we don't
171             # necessarily know what the content length is going to be (that's one
172             # of the advantages of using this module).
173            
174 0           for ( my $i = 0; $i < @$headers; $i = $i + 2 )
175             {
176 0 0         if ( $headers->[$i] !~ /content-length/i )
177             {
178 0           push @stream_headers, $headers->[$i];
179 0           push @stream_headers, $headers->[$i+1];
180             }
181             }
182             }
183            
184             # Tell Dancer that it should call the function stream_callback() when
185             # ready for streaming to begin.
186            
187 0           return \&stream_callback;
188             }
189              
190             =pod
191              
192             The writer object, as specified by L, implements two methods:
193              
194             =head3 write
195              
196             Sends its argument immediately to the client as the next piece of the response.
197             You can call this method as many times as necessary to send all of the data.
198              
199             =head3 close
200              
201             Closes the connection to the client, terminating the response. It is
202             important to call C at the end of processing, otherwise the client will
203             erroneously report that the connection was closed prematurely before all of
204             the data was sent.
205              
206             =cut
207              
208             # This subroutine is called at the proper time for data streaming to begin.
209             # It is passed a callback according to the PSGI standard that can be called to
210             # procure a writer object to which we can actually write the data a chunk at a
211             # time. As each chunk is written, it is sent off to the client as part of the
212             # response body.
213              
214             sub stream_callback {
215            
216             # Grab the callback, which is the first parameter.
217            
218 0     0 0   my $psgi_callback = shift;
219            
220             # Use the callback we were given to procure a writer object, and in the
221             # process pass the status and headers stored by prepare_stream() above.
222             # This will cause the HTTP response to be emitted, with a keep-alive
223             # header so that the client will know to wait for more data to come.
224            
225 0           my $writer = $psgi_callback->( [ $stream_status, \@stream_headers ] );
226            
227             # Now we call the routine specified in the original call to stream_data.
228             # If it was given as a code reference, we call it and pass in the "data"
229             # object as the first parameter. Otherwise, we use it as a method name
230             # and invoke it on the "data" object. In either case, we pass the writer
231             # object as a parameter.
232            
233 0 0         if ( ref $stream_call eq 'CODE' )
234             {
235 0           $stream_call->($stream_object, $writer);
236             }
237            
238             else
239             {
240 0           $stream_object->$stream_call($writer);
241             }
242             }
243              
244              
245             =head2 server_supports_streaming
246              
247             This function returns true if the server you are working with supports
248             PSGI-style streaming, false otherwise.
249              
250             Here is an example of how you might use it:
251              
252             if ( server_supports_streaming ) {
253             stream_data($query, 'streamResult');
254             } else {
255             return $query->generateResult();
256             }
257              
258             =cut
259              
260             register 'server_supports_streaming' => sub {
261            
262 0     0     my $env = Dancer::SharedData->request->env;
263 0 0         return 1 if $env->{'psgi.streaming'};
264 0           return undef; # otherwise
265             };
266              
267              
268             register_plugin;
269             1;
270              
271             __END__