File Coverage

blib/lib/Event/Distributor.pm
Criterion    Covered  Total      %
statement         56     59   94.9
branch             6      8   75.0
condition        n/a
subroutine        17     18   94.4
pod                7      8   87.5
total             86     93   92.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014-2018 -- leonerd@leonerd.org.uk
5              
6             package Event::Distributor;
7              
8 2     2   58848 use strict;
  2         6  
  2         41  
9 2     2   8 use warnings;
  2         2  
  2         56  
10              
11             our $VERSION = '0.05';
12              
13 2     2   7 use Carp;
  2         3  
  2         82  
14 2     2   726 use Syntax::Keyword::Try;
  2         930  
  2         8  
15              
16 2     2   589 use Future;
  2         7338  
  2         39  
17              
18 2     2   602 use Event::Distributor::Signal;
  2         4  
  2         40  
19 2     2   595 use Event::Distributor::Action;
  2         3  
  2         38  
20 2     2   580 use Event::Distributor::Query;
  2         3  
  2         938  
21              
22             =head1 NAME
23              
24             C<Event::Distributor> - a simple in-process pub/sub mechanism
25              
26             =head1 SYNOPSIS
27              
28             use Event::Distributor;
29              
30             my $dist = Event::Distributor->new;
31              
32             $dist->declare_signal( "announce" );
33              
34              
35             $dist->subscribe_sync( announce => sub {
36             my ( $dist, $message ) = @_;
37             say $message;
38             });
39              
40             $dist->subscribe_async( announce => sub {
41             my ( $dist, $message ) = @_;
42             return $async_http->POST( "http://server/message", $message );
43             });
44              
45              
46             $dist->fire_sync( announce => "Hello, world!" );
47              
48             =head1 DESCRIPTION
49              
50             Instances of this class provide a simple publish/subscribe mechanism within a
51             single process, for either synchronous or L<Future>-based asynchronous use.
52              
53             A given instance has a set of named events. Subscribers are C<CODE> references
54             attached to a named event. Publishers can declare the existence of a named
55             event, and then later invoke it by passing in arguments, which are distributed
56             to all of the subscribers of that named event.
57              
58             It is specifically I<not> an error to request to subscribe an event that has
59             not yet been declared, in order to allow multiple modules of code to be loaded
60             and subscribe events the others publish, without introducing loading order
61             dependencies. An event only needs to be declared by the time it is fired.
62              
63             Natively all of the events provided by the distributor are fully-asynchronous
64             in nature. Each subscriber is expected to return a L<Future> instance which
65             will indicate its completion; the results of these are merged into a single
66             future returned by the fire method itself. However, to support synchronous or
67             semi-synchronous programs using it, both the subscribe and fire methods also
68             have a synchronous variant. Note however, that this module does not provide
69             any kind of asynchronous detachment of synchronous functions; using the
70             L</subscribe_sync> method to subscribe a long-running blocking function will
71             cause the C<fire_*> methods to block until that method returns. To achieve a
72             truly-asynchronous experience the attached code will need to use some kind of
73             asynchronous event system.
74              
75             This module is very-much a work-in-progress, and many ideas may still be added
76             or changed about it. It is the start of a concrete implementation of some of
77             the ideas in my "Event-Reflexive Programming" series of blog posts. See the
78             L</TODO> and L</SEE ALSO> sections for more detail.
79              
80             =head1 EVENTS
81              
82             Each of the events known by a distributor has a name. Conceptually each also
83             has a type. Currently there are three types of event, a "signal", an "action",
84             and a "query".
85              
86             =over 2
87              
88             =item *
89              
90             A signal event simply informs subscribers that some event or condition has
91             occurred. Additional arguments can be passed from the invoker to the
92             subscribers, but subscriptions are not expected to return a meaningful value,
93             nor does firing this event return a value. All subscriber functions are
94             invoked sequentially and synchronously by a C<fire_*> method (though, of
95             course, asynchronous subscribers synchronously return a future instance, which
96             allows them to continue working asynchronously).
97              
98             =item *
99              
100             An action event requires a single subscriber, and represents a request from
101             the invoker to the subscriber to perform some activity. This behaves much like
102             a regular (L<Future>-returning) method call, except that the indirection
103             mechanism of the distributor allows a more flexible method of connection
104             between the two sides.
105              
106             =item *
107              
108             A query event invokes subscriber code expecting a successful return, returning
109             the first result that is successful. If a synchronous subscriber returns a
110             result, or if an asynchronous one returns a successful immediate Future, then
111             no further subscribers are invoked, and that result is taken immediately. Any
112             other pending Futures are then cancelled.
113              
114             =back
115              
116             =cut
117              
118             sub new
119             {
120 5     5 0 1639 my $class = shift;
121              
122 5         13 my $self = bless {
123             events => {},
124             pre_registration => {},
125             }, $class;
126              
127 5         9 return $self;
128             }
129              
130             =head1 METHODS
131              
132             =cut
133              
134             sub _add_event
135             {
136 5     5   6 my $self = shift;
137 5         6 my ( $name, $event ) = @_;
138              
139 5 50       13 $self->{events}{$name} and
140             croak "Cannot declare an event '$name' a second time";
141              
142 5         7 $self->{events}{$name} = $event;
143              
144 5 100       13 if( my $subs = delete $self->{pre_registration}{$name} ) {
145 1         3 $event->subscribe( $_ ) for @$subs;
146             }
147             }
148              
149             =head2 declare_signal
150              
151             $distributor->declare_signal( $name )
152              
153             Declares a new "signal" event of the given name.
154              
155             =cut
156              
157             sub declare_signal
158             {
159 4     4 1 13 my $self = shift;
160 4         6 my ( $name ) = @_;
161              
162 4         17 $self->_add_event( $name, Event::Distributor::Signal->new );
163             }
164              
165             =head2 declare_action
166              
167             $distributor->declare_action( $name )
168              
169             I
170              
171             Declares a new "action" event of the given name.
172              
173             =cut
174              
175             sub declare_action
176             {
177 0     0 1 0 my $self = shift;
178 0         0 my ( $name ) = @_;
179              
180 0         0 $self->_add_event( $name, Event::Distributor::Action->new );
181             }
182              
183             =head2 declare_query
184              
185             $distributor->declare_query( $name )
186              
187             I
188              
189             Declares a new "query" event of the given name.
190              
191             =cut
192              
193             sub declare_query
194             {
195 1     1 1 5 my $self = shift;
196 1         2 my ( $name ) = @_;
197              
198 1         9 $self->_add_event( $name, Event::Distributor::Query->new );
199             }
200              
201             =head2 subscribe_async
202              
203             $distributor->subscribe_async( $name, $code )
204              
205             Adds a new C<CODE> reference to the list of subscribers for the named event.
206             This subscriber is expected to return a L<Future> that will eventually yield
207             its result.
208              
209             When invoked the code will be passed the distributor object itself and the
210             list of arguments, and is expected to return a L<Future>.
211              
212             $f = $code->( $distributor, @args )
213              
214             =cut
215              
216             sub subscribe_async
217             {
218 6     6 1 19 my $self = shift;
219 6         9 my ( $name, $code ) = @_;
220              
221 6 100       14 if( my $event = $self->{events}{$name} ) {
222 5         14 $event->subscribe( $code );
223             }
224             else {
225 1         2 push @{ $self->{pre_registration}{$name} }, $code;
  1         4  
226             }
227             }
228              
229             =head2 subscribe_sync
230              
231             $distributor->subscribe_sync( $name, $code )
232              
233             Adds a new C<CODE> reference to the list of subscribers for the named event.
234             This subscriber is expected to perform its work synchronously and return its
235             result immediately.
236              
237             In non-blocking or asynchronous applications, this method should only be used
238             for simple subscribers which can immediately return having completed their
239             work. If the work is likely to take some time by blocking on external factors,
240             consider instead using the L</subscribe_async> method.
241              
242             When invoked the code will be passed the distributor object itself and the
243             list of arguments.
244              
245             $code->( $distributor, @args )
246              
247             =cut
248              
249             sub subscribe_sync
250             {
251 4     4 1 20 my $self = shift;
252 4         7 my ( $name, $code ) = @_;
253              
254             $self->subscribe_async( $name, sub {
255 4     4   16 my @args = @_;
256             try {
257             return Future->done( $code->( @args ) );
258             }
259 4         7 catch {
260             return Future->fail( $@ );
261             }
262 4         13 });
263             }
264              
265             =head2 fire_async
266              
267             $f = $distributor->fire_async( $name, @args )
268              
269             Invokes the named event, passing the arguments to the subscriber functions.
270             This function returns as soon as all the subscriber functions have been
271             invoked, returning a L<Future> that will eventually complete when all the
272             futures returned by the subscriber functions have completed.
273              
274             =cut
275              
276             sub fire_async
277             {
278 5     5 1 10 my $self = shift;
279 5         8 my ( $name, @args ) = @_;
280              
281 5 50       10 my $event = $self->{events}{$name} or
282             croak "Cannot fire an event '$name' when it doesn't exist";
283              
284 5         13 $event->fire( $self, @args );
285             }
286              
287             =head2 fire_sync
288              
289             $distributor->fire_sync( $name, @args )
290              
291             Invokes the named event, passing the arguments to the subscriber functions.
292             This function synchronously waits until all the subscriber futures have
293             completed, and will return once they have all done so.
294              
295             Note that since this method calls the C method on the Future instance
296             returned by L, it is required that this either be an immediate, or
297             be some subclass that can actually perform the await operation. This should be
298             the case if it is provided by an event framework or similar, or custom
299             application logic.
300              
301             =cut
302              
303             sub fire_sync
304             {
305 3     3 1 9 my $self = shift;
306 3         5 $self->fire_async( @_ )->get;
307             }
308              
309             =head1 TODO
310              
311             Some of these ideas appear in the "Event-Reflexive Programming" series of blog
312             posts, and may be suitable for implementation here. All of these ideas are
313             simply for consideration; there is no explicit promise that any of these will
314             actually be implemented.
315              
316             =over 4
317              
318             =item *
319              
320             Unsubscription from events.
321              
322             =item *
323              
324             Define (or document the lack of) ordering between subscriptions of a given
325             event.
326              
327             =item *
328              
329             Refine the failure-handling semantics of signals.
330              
331             =item *
332              
333             Ability to invoke signals after the current one is finished, by deferring the
334             C method. Should this be a new C method, or a property of the
335             signal itself?
336              
337             =item *
338              
339             More control over the semantics of value-returning events - scatter/map/gather
340             pattern.
341              
342             =item *
343              
344             Sub-hierarchies of events.
345              
346             =item *
347              
348             Subclasses for specific event frameworks (L).
349              
350             =item *
351              
352             Subclasses (or other behaviours) for out-of-process event serialisation and
353             subscribers.
354              
355             =item *
356              
357             Event parameter filtering mechanics - allows parametric hierarchies,
358             instrumentation logging, efficient out-of-process subscribers.
359              
360             =back
361              
362             =head1 SEE ALSO
363              
364             =over 4
365              
366             =item L
367              
368             =back
369              
370             =head1 AUTHOR
371              
372             Paul Evans
373              
374             =cut
375              
376             0x55AA;
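
The pre-registration behaviour described in the DESCRIPTION (subscribing to an event before it has been declared) can be sketched in isolation. The following is a minimal, core-Perl-only illustration and not the module's own implementation: the package name `Sketch::Distributor` and its `declare`, `subscribe` and `fire` methods are hypothetical stand-ins, and events are modelled as plain arrays of code references rather than `Event::Distributor::Signal` objects or Futures.

```perl
use strict;
use warnings;

package Sketch::Distributor;   # hypothetical name, for illustration only

use Carp;

sub new
{
   my $class = shift;
   return bless { events => {}, pre_registration => {} }, $class;
}

# Declare an event, then attach any subscribers that arrived early
sub declare
{
   my $self = shift;
   my ( $name ) = @_;

   $self->{events}{$name} and
      croak "Cannot declare an event '$name' a second time";

   my $subs = $self->{events}{$name} = [];

   my $early = delete $self->{pre_registration}{$name};
   push @$subs, @$early if $early;
   return;
}

# Subscribe now if the event exists, otherwise queue until it is declared
sub subscribe
{
   my $self = shift;
   my ( $name, $code ) = @_;

   my $list = $self->{events}{$name}
      // ( $self->{pre_registration}{$name} //= [] );
   push @$list, $code;
   return;
}

# Invoke every subscriber in subscription order, distributor first
sub fire
{
   my $self = shift;
   my ( $name, @args ) = @_;

   my $subs = $self->{events}{$name} or
      croak "Cannot fire an event '$name' when it doesn't exist";

   $_->( $self, @args ) for @$subs;
   return;
}

package main;

my $dist = Sketch::Distributor->new;
my @seen;

# Subscribing before declaration is not an error
$dist->subscribe( announce => sub { my ( $d, $msg ) = @_; push @seen, "early: $msg" } );
$dist->declare( "announce" );
$dist->subscribe( announce => sub { my ( $d, $msg ) = @_; push @seen, "late: $msg" } );

$dist->fire( announce => "Hello, world!" );
print "$_\n" for @seen;
```

In this sketch the early subscriber is held in `pre_registration` until `declare` runs, so both subscribers are invoked when the event fires; firing an event that was never declared still croaks, matching the rule that an event only needs to be declared by the time it is fired.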