File Coverage

blib/lib/Event/Distributor.pm
Criterion Covered Total %
statement 49 55 89.0
branch 6 8 75.0
condition n/a
subroutine 15 17 88.2
pod 7 8 87.5
total 77 88 87.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014-2015 -- leonerd@leonerd.org.uk
5              
6             package Event::Distributor;
7              
8 2     2   35955 use strict;
  2         10  
  2         55  
9 2     2   10 use warnings;
  2         3  
  2         88  
10              
11             our $VERSION = '0.04';
12              
13 2     2   17 use Carp;
  2         4  
  2         131  
14              
15 2     2   1103 use Future;
  2         10926  
  2         54  
16              
17 2     2   1115 use Event::Distributor::Signal;
  2         4  
  2         56  
18 2     2   985 use Event::Distributor::Action;
  2         4  
  2         46  
19 2     2   1040 use Event::Distributor::Query;
  2         5  
  2         1308  
20              
21             =head1 NAME
22              
23             C<Event::Distributor> - a simple in-process pub/sub mechanism
24              
25             =head1 SYNOPSIS
26              
27             use Event::Distributor;
28              
29             my $dist = Event::Distributor->new;
30              
31             $dist->declare_signal( "announce" );
32              
33              
34             $dist->subscribe_sync( announce => sub {
35                 my ( $distributor, $message ) = @_;
36                 say $message;
37             });
38              
39             $dist->subscribe_async( announce => sub {
40                 my ( $distributor, $message ) = @_;
41                 return $async_http->POST( "http://server/message", $message );
42             });
43              
44              
45             $dist->fire_sync( announce => "Hello, world!" );
46              
47             =head1 DESCRIPTION
48              
49             Instances of this class provide a simple publish/subscribe mechanism within a
50             single process, for either synchronous or L<Future>-based asynchronous use.
51              
52             A given instance has a set of named events. Subscribers are C<CODE> references
53             attached to a named event. Publishers can declare the existence of a named
54             event, and then later invoke it by passing in arguments, which are distributed
55             to all of the subscribers of that named event.
56              
57             It is specifically I<not> an error to subscribe to an event that has
58             not yet been declared. This allows multiple modules of code to be loaded
59             and subscribe to events the others publish, without introducing loading-order
60             dependencies. An event only needs to be declared by the time it is fired.
61              
62             Natively, all of the events provided by the distributor are fully asynchronous
63             in nature. Each subscriber is expected to return a L<Future> instance which
64             will indicate its completion; the results of these are merged into a single
65             future returned by the fire method itself. However, to support synchronous or
66             semi-synchronous programs using it, both the subscribe and fire methods also
67             have a synchronous variant. Note, however, that this module does not provide
68             any kind of asynchronous detachment of synchronous functions; using the
69             L</subscribe_sync> method to subscribe a long-running blocking function will
70             cause the C<fire_*> methods to block until that function returns. To achieve a
71             truly asynchronous experience the attached code will need to use some kind of
72             asynchronous event system.
73              
74             This module is very much a work in progress, and many ideas may still be added
75             or changed about it. It is the start of a concrete implementation of some of
76             the ideas in my "Event-Reflexive Programming" series of blog posts. See the
77             L</TODO> and L</SEE ALSO> sections for more detail.
78              
79             =head1 EVENTS
80              
81             Each of the events known by a distributor has a name. Conceptually each also
82             has a type. Currently there are three types of event, a "signal", an "action",
83             and a "query".
84              
85             =over 2
86              
87             =item *
88              
89             A signal event simply informs subscribers that some event or condition has
90             occurred. Additional arguments can be passed from the invoker to the
91             subscribers, but subscriptions are not expected to return a meaningful value,
92             nor does firing this event return a value. All subscriber functions are
93             invoked sequentially and synchronously by a C<fire_*> method (though, of
94             course, asynchronous subscribers synchronously return a future instance, which
95             allows them to continue working asynchronously).
96              
97             =item *
98              
99             An action event requires a single subscriber, and represents a request from
100             the invoker to the subscriber to perform some activity. This behaves much like
101             a regular (L<Future>-returning) method call, except that the indirection
102             mechanism of the distributor allows a more flexible method of connection
103             between the two sides.
104              
105             =item *
106              
107             A query event invokes subscriber code expecting a successful return, returning
108             the first result that is successful. If a synchronous subscriber returns a
109             result, or if an asynchronous one returns a successful immediate Future, then
110             no further subscribers are invoked, and that result is taken immediately. Any
111             other pending Futures are then cancelled.
112              
113             =back
114              
115             =cut
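
The "first successful result wins" behaviour described for query events can be illustrated with a simplified, synchronous-only sketch. This is not the code of Event::Distributor::Query: the helper name `first_success` is hypothetical, and treating a subscriber that dies as "unsuccessful" is an assumption made for illustration (the real module works in terms of Futures).

```perl
use strict;
use warnings;

# Hypothetical helper: call each subscriber in turn and return the first
# result obtained without an exception. Assumption: "failure" means die().
sub first_success {
    my ( $subscribers, @args ) = @_;
    my @errors;
    foreach my $code ( @$subscribers ) {
        my @result;
        if( eval { @result = $code->( @args ); 1 } ) {
            return @result;    # stop here; later subscribers never run
        }
        push @errors, $@;
    }
    die "No subscriber succeeded: @errors";
}

my @called;
my @subs = (
    sub { push @called, "a"; die "a is unavailable\n" },
    sub { push @called, "b"; return "from b" },
    sub { push @called, "c"; return "from c" },
);

my ( $answer ) = first_success( \@subs, "question" );
# $answer is "from b"; @called is ( "a", "b" ), so "c" was never invoked
```

What happens when every subscriber fails is not stated in the text above; raising an error here is only one possible choice.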
116              
117             sub new
118             {
119 4     4 0 1702 my $class = shift;
120              
121 4         14 my $self = bless {
122             events => {},
123             pre_registration => {},
124             }, $class;
125              
126 4         11 return $self;
127             }
128              
129             =head1 METHODS
130              
131             =cut
132              
133             sub _add_event
134             {
135 4     4   5 my $self = shift;
136 4         5 my ( $name, $event ) = @_;
137              
138 4 50       17 $self->{events}{$name} and
139             croak "Cannot declare an event '$name' a second time";
140              
141 4         9 $self->{events}{$name} = $event;
142              
143 4 100       13 if( my $subs = delete $self->{pre_registration}{$name} ) {
144 1         5 $event->subscribe( $_ ) for @$subs;
145             }
146             }
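
The pre-registration logic in `_add_event` is what lets a subscriber attach to an event before it is declared. A minimal, self-contained sketch of that idea follows. `MiniDistributor` and its `declare`, `subscribe` and `fire` methods are hypothetical stand-ins written for illustration; they store plain code references rather than event objects and do not use Futures.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the distributor; not the real Event::Distributor.
package MiniDistributor;
use Carp;

sub new { return bless { events => {}, pending => {} }, shift }

# Declaring an event creates its subscriber list and adopts any
# subscribers that were queued before the declaration.
sub declare {
    my ( $self, $name ) = @_;
    $self->{events}{$name}
        and croak "Cannot declare an event '$name' a second time";
    my $queued = delete $self->{pending}{$name};
    $self->{events}{$name} = [ $queued ? @$queued : () ];
    return;
}

# Subscribing to an undeclared event is not an error; the code is queued.
sub subscribe {
    my ( $self, $name, $code ) = @_;
    my $list = $self->{events}{$name} || ( $self->{pending}{$name} ||= [] );
    push @$list, $code;
    return;
}

# Firing requires the event to have been declared by this point.
sub fire {
    my ( $self, $name, @args ) = @_;
    my $subs = $self->{events}{$name}
        or croak "Cannot fire an event '$name' when it doesn't exist";
    $_->( $self, @args ) for @$subs;
    return;
}

package main;

my @seen;
my $dist = MiniDistributor->new;
$dist->subscribe( announce => sub { push @seen, "early: $_[1]" } );   # before declare
$dist->declare( "announce" );
$dist->subscribe( announce => sub { push @seen, "late: $_[1]" } );
$dist->fire( announce => "hello" );
# @seen is now ( "early: hello", "late: hello" )
```

Queuing by event name keeps load order irrelevant: whichever module declares the event picks up everything subscribed so far.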
147              
148             =head2 declare_signal
149              
150             $distributor->declare_signal( $name )
151              
152             Declares a new "signal" event of the given name.
153              
154             =cut
155              
156             sub declare_signal
157             {
158 4     4 1 14 my $self = shift;
159 4         7 my ( $name ) = @_;
160              
161 4         37 $self->_add_event( $name, Event::Distributor::Signal->new );
162             }
163              
164             =head2 declare_action
165              
166             $distributor->declare_action( $name )
167              
168             I
169              
170             Declares a new "action" event of the given name.
171              
172             =cut
173              
174             sub declare_action
175             {
176 0     0 1 0 my $self = shift;
177 0         0 my ( $name ) = @_;
178              
179 0         0 $self->_add_event( $name, Event::Distributor::Action->new );
180             }
181              
182             =head2 declare_query
183              
184             $distributor->declare_query( $name )
185              
186             I
187              
188             Declares a new "query" event of the given name.
189              
190             =cut
191              
192             sub declare_query
193             {
194 0     0 1 0 my $self = shift;
195 0         0 my ( $name ) = @_;
196              
197 0         0 $self->_add_event( $name, Event::Distributor::Query->new );
198             }
199              
200             =head2 subscribe_async
201              
202             $distributor->subscribe_async( $name, $code )
203              
204             Adds a new C<CODE> reference to the list of subscribers for the named event.
205             This subscriber is expected to return a L<Future> that will eventually yield
206             its result.
207              
208             When invoked the code will be passed the distributor object itself and the
209             list of arguments, and is expected to return a L<Future>.
210              
211             $f = $code->( $distributor, @args )
212              
213             =cut
214              
215             sub subscribe_async
216             {
217 4     4 1 17 my $self = shift;
218 4         8 my ( $name, $code ) = @_;
219              
220 4 100       14 if( my $event = $self->{events}{$name} ) {
221 3         13 $event->subscribe( $code );
222             }
223             else {
224 1         1 push @{ $self->{pre_registration}{$name} }, $code;
  1         6  
225             }
226             }
227              
228             =head2 subscribe_sync
229              
230             $distributor->subscribe_sync( $name, $code )
231              
232             Adds a new C<CODE> reference to the list of subscribers for the named event.
233             This subscriber is expected to perform its work synchronously and return its
234             result immediately.
235              
236             In non-blocking or asynchronous applications, this method should only be used
237             for simple subscribers which can immediately return having completed their
238             work. If the work is likely to take some time by blocking on external factors,
239             consider instead using the L</subscribe_async> method.
240              
241             When invoked the code will be passed the distributor object itself and the
242             list of arguments.
243              
244             $code->( $distributor, @args )
245              
246             =cut
247              
248             sub subscribe_sync
249             {
250 2     2 1 13 my $self = shift;
251 2         4 my ( $name, $code ) = @_;
252              
253             $self->subscribe_async( $name, sub {
254 2     2   15 Future->done( $code->( @_ ) )
255 2         9 });
256             }
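
`subscribe_sync` works by wrapping the synchronous callback so that it presents the asynchronous interface: the callback runs immediately and its return value is placed in an already-completed future. The sketch below shows only that wrapping step. To stay runnable without CPAN modules it uses a hypothetical `TinyFuture` stand-in that mimics just `done` and `get`; the real module uses the Future class, and `wrap_sync` is an illustrative name, not part of its API.

```perl
use strict;
use warnings;

# Hypothetical minimal stand-in for an already-completed future.
# It only mimics the done() constructor and the get() accessor.
package TinyFuture;
sub done { my $class = shift; return bless { result => [ @_ ] }, $class }
sub get  { my $self = shift; return @{ $self->{result} } }

package main;

# Wrap a synchronous callback: call it straight away and hand back its
# result inside a completed future.
sub wrap_sync {
    my ( $code ) = @_;
    return sub { TinyFuture->done( $code->( @_ ) ) };
}

my $async = wrap_sync( sub { my ( $distributor, $n ) = @_; return $n * 2 } );

my $f = $async->( undef, 21 );   # the callback has already run by this point
my ( $result ) = $f->get;
# $result is 42
```

Because the wrapped callback runs to completion before the future is returned, a blocking callback still blocks the caller; the wrapper changes the interface, not the timing.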
257              
258             =head2 fire_async
259              
260             $f = $distributor->fire_async( $name, @args )
261              
262             Invokes the named event, passing the arguments to the subscriber functions.
263             This function returns as soon as all the subscriber functions have been
264             invoked, returning a L<Future> that will eventually complete when all the
265             futures returned by the subscriber functions have completed.
266              
267             =cut
268              
269             sub fire_async
270             {
271 4     4 1 10 my $self = shift;
272 4         10 my ( $name, @args ) = @_;
273              
274 4 50       12 my $event = $self->{events}{$name} or
275             croak "Cannot fire an event '$name' when it doesn't exist";
276              
277 4         14 $event->fire( $self, @args );
278             }
279              
280             =head2 fire_sync
281              
282             $distributor->fire_sync( $name, @args )
283              
284             Invokes the named event, passing the arguments to the subscriber functions.
285             This function synchronously waits until all the subscriber futures have
286             completed, and will return once they have all done so.
287              
288             Note that since this method calls the C<get> method on the Future instance
289             returned by L</fire_async>, it is required that this either be an immediate, or
290             be some subclass that can actually perform the await operation. This should be
291             the case if it is provided by an event framework or similar, or custom
292             application logic.
293              
294             =cut
295              
296             sub fire_sync
297             {
298 2     2 1 8 my $self = shift;
299 2         5 $self->fire_async( @_ )->get;
300             }
301              
302             =head1 TODO
303              
304             Some of these ideas appear in the "Event-Reflexive Programming" series of blog
305             posts, and may be suitable for implementation here. All of these ideas are
306             simply for consideration; there is no explicit promise that any of these will
307             actually be implemented.
308              
309             =over 4
310              
311             =item *
312              
313             Unsubscription from events.
314              
315             =item *
316              
317             Define (or document the lack of) ordering between subscriptions of a given
318             event.
319              
320             =item *
321              
322             Refine the failure-handling semantics of signals.
323              
324             =item *
325              
326             Ability to invoke signals after the current one is finished, by deferring the
327             C method. Should this be a new C method, or a property of the
328             signal itself?
329              
330             =item *
331              
332             More control over the semantics of value-returning events - scatter/map/gather
333             pattern.
334              
335             =item *
336              
337             Sub-hierarchies of events.
338              
339             =item *
340              
341             Subclasses for specific event frameworks (L).
342              
343             =item *
344              
345             Subclasses (or other behaviours) for out-of-process event serialisation and
346             subscribers.
347              
348             =item *
349              
350             Event parameter filtering mechanics - allows parametric hierarchies,
351             instrumentation logging, efficient out-of-process subscribers.
352              
353             =back
354              
355             =head1 SEE ALSO
356              
357             =over 4
358              
359             =item L
360              
361             =back
362              
363             =head1 AUTHOR
364              
365             Paul Evans
366              
367             =cut
368              
369             0x55AA;