File Coverage

blib/lib/Event/Distributor.pm
Criterion   Covered  Total      %
statement        55     58   94.8
branch            6      8   75.0
condition       n/a
subroutine       17     18   94.4
pod               7      8   87.5
total            85     92   92.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014-2021 -- leonerd@leonerd.org.uk
5              
6             package Event::Distributor 0.06;
7              
8 2     2   85440 use v5.14;
  2         18  
9 2     2   10 use warnings;
  2         5  
  2         51  
10              
11 2     2   9 use Carp;
  2         4  
  2         114  
12 2     2   1033 use Feature::Compat::Try;
  2         658  
  2         8  
13              
14 2     2   3820 use Future;
  2         12473  
  2         65  
15              
16 2     2   852 use Event::Distributor::Signal;
  2         5  
  2         58  
17 2     2   866 use Event::Distributor::Action;
  2         6  
  2         57  
18 2     2   845 use Event::Distributor::Query;
  2         4  
  2         1366  
19              
20             =head1 NAME
21              
22             C<Event::Distributor> - a simple in-process pub/sub mechanism
23              
24             =head1 SYNOPSIS
25              
26             use Event::Distributor;
27              
28             my $dist = Event::Distributor->new;
29              
30             $dist->declare_signal( "announce" );
31              
32              
33             $dist->subscribe_sync( announce => sub {
34                 my ( $dist, $message ) = @_;
35                 say $message;
36             });
37              
38             $dist->subscribe_async( announce => sub {
39                 my ( $dist, $message ) = @_;
40                 return $async_http->POST( "http://server/message", $message );
41             });
42              
43              
44             $dist->fire_sync( announce => "Hello, world!" );
45              
46             =head1 DESCRIPTION
47              
48             Instances of this class provide a simple publish/subscribe mechanism within a
49             single process, for either synchronous or L<Future>-based asynchronous use.
50              
51             A given instance has a set of named events. Subscribers are C<CODE> references
52             attached to a named event. Publishers can declare the existence of a named
53             event, and then later invoke it by passing in arguments, which are distributed
54             to all of the subscribers of that named event.
55              
56             It is specifically I<not> an error to subscribe to an event that has not yet
57             been declared. This allows multiple modules of code to be loaded and to
58             subscribe to events the others publish, without introducing loading order
59             dependencies. An event only needs to be declared by the time it is fired.
60              
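The late-declaration rule above can be illustrated with a short sketch. It uses only methods that appear in this file (new, subscribe_sync, declare_signal, fire_sync); the event name "greet" and the @seen array are illustrative assumptions.

```perl
use v5.14;
use warnings;
use Event::Distributor;

my $dist = Event::Distributor->new;
my @seen;   # hypothetical log of what the subscriber received

# Subscribe first: "greet" has not been declared yet, so the callback
# is held in the distributor's pre-registration list.
$dist->subscribe_sync( greet => sub {
   my ( $d, $name ) = @_;
   push @seen, $name;
});

# Declaring the event attaches any pre-registered subscribers to it.
$dist->declare_signal( "greet" );

# The event only has to exist by the time it is fired.
$dist->fire_sync( greet => "world" );

say "subscriber saw: @seen";
```

Reversing the first two steps (declare, then subscribe) should behave the same way, which is the point of the rule: load order between publisher and subscriber modules does not matter.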
61             Natively all of the events provided by the distributor are fully asynchronous
62             in nature. Each subscriber is expected to return a L<Future> instance which
63             will indicate its completion; the results of these are merged into a single
64             future returned by the fire method itself. However, to support synchronous or
65             semi-synchronous programs, both the subscribe and fire methods also have a
66             synchronous variant. Note, however, that this module does not provide
67             any kind of asynchronous detachment of synchronous functions; using the
68             L</subscribe_sync> method to subscribe a long-running blocking function will
69             cause the C<fire_*> methods to block until that function returns. To achieve a
70             truly asynchronous experience the attached code will need to use some kind of
71             asynchronous event system.
72              
73             This module is very much a work in progress, and many ideas may still be added
74             or changed about it. It is the start of a concrete implementation of some of
75             the ideas in my "Event-Reflexive Programming" series of blog posts. See the
76             L</TODO> and L</SEE ALSO> sections for more detail.
77              
78             =head1 EVENTS
79              
80             Each of the events known by a distributor has a name. Conceptually each also
81             has a type. Currently there are three types of event, a "signal", an "action",
82             and a "query".
83              
84             =over 2
85              
86             =item *
87              
88             A signal event simply informs subscribers that some event or condition has
89             occurred. Additional arguments can be passed from the invoker to the
90             subscribers, but subscriptions are not expected to return a meaningful value,
91             nor does firing this event return a value. All subscriber functions are
92             invoked sequentially and synchronously by a C<fire_*> method (though, of
93             course, asynchronous subscribers synchronously return a future instance, which
94             allows them to continue working asynchronously).
95              
96             =item *
97              
98             An action event requires a single subscriber, and represents a request from
99             the invoker to the subscriber to perform some activity. This behaves much like
100             a regular (L<Future>-returning) method call, except that the indirection
101             mechanism of the distributor allows a more flexible method of connection
102             between the two sides.
103              
104             =item *
105              
106             A query event invokes subscriber code expecting a successful return, returning
107             the first result that is successful. If a synchronous subscriber returns a
108             result, or if an asynchronous one returns a successful immediate Future, then
109             no further subscribers are invoked, and that result is taken immediately. Any
110             other pending Futures are then cancelled.
111              
112             =back
113              
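The first-success behaviour described for query events can be sketched in plain Perl. This is not the implementation in Event::Distributor::Query, which is Future-based and not shown in this file; run_query is a hypothetical helper, and treating a subscriber that dies as "unsuccessful" is an assumption made for the sketch.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a synchronous query: try each subscriber in
# order and return the first result obtained without an exception.
sub run_query {
   my ( $subscribers, @args ) = @_;
   my $last_error = "no subscribers\n";
   for my $code ( @$subscribers ) {
      my @result;
      if( eval { @result = $code->( @args ); 1 } ) {
         return @result;   # first success wins; later subscribers are skipped
      }
      $last_error = $@;
   }
   die $last_error;   # assumption: the query fails if nobody succeeded
}

my @called;
my @subscribers = (
   sub { push @called, "a"; die "cache miss\n" },
   sub { push @called, "b"; return "value for $_[0]" },
   sub { push @called, "c"; return "never reached" },
);

my ( $answer ) = run_query( \@subscribers, "key" );
print "$answer (called: @called)\n";
```

The third subscriber is never invoked because the second one already produced a result.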
114             =cut
115              
116             sub new
117             {
118 5     5 0 2351 my $class = shift;
119              
120 5         20 my $self = bless {
121             events => {},
122             pre_registration => {},
123             }, $class;
124              
125 5         14 return $self;
126             }
127              
128             =head1 METHODS
129              
130             =cut
131              
132             sub _add_event
133             {
134 5     5   10 my $self = shift;
135 5         9 my ( $name, $event ) = @_;
136              
137 5 50       16 $self->{events}{$name} and
138             croak "Cannot declare an event '$name' a second time";
139              
140 5         11 $self->{events}{$name} = $event;
141              
142 5 100       17 if( my $subs = delete $self->{pre_registration}{$name} ) {
143 1         5 $event->subscribe( $_ ) for @$subs;
144             }
145             }
146              
147             =head2 declare_signal
148              
149             $distributor->declare_signal( $name )
150              
151             Declares a new "signal" event of the given name.
152              
153             =cut
154              
155             sub declare_signal
156             {
157 4     4 1 14 my $self = shift;
158 4         10 my ( $name ) = @_;
159              
160 4         21 $self->_add_event( $name, Event::Distributor::Signal->new );
161             }
162              
163             =head2 declare_action
164              
165             $distributor->declare_action( $name )
166              
167             I
168              
169             Declares a new "action" event of the given name.
170              
171             =cut
172              
173             sub declare_action
174             {
175 0     0 1 0 my $self = shift;
176 0         0 my ( $name ) = @_;
177              
178 0         0 $self->_add_event( $name, Event::Distributor::Action->new );
179             }
180              
181             =head2 declare_query
182              
183             $distributor->declare_query( $name )
184              
185             I
186              
187             Declares a new "query" event of the given name.
188              
189             =cut
190              
191             sub declare_query
192             {
193 1     1 1 7 my $self = shift;
194 1         3 my ( $name ) = @_;
195              
196 1         12 $self->_add_event( $name, Event::Distributor::Query->new );
197             }
198              
199             =head2 subscribe_async
200              
201             $distributor->subscribe_async( $name, $code )
202              
203             Adds a new C<CODE> reference to the list of subscribers for the named event.
204             This subscriber is expected to return a L<Future> that will eventually yield
205             its result.
206              
207             When invoked the code will be passed the distributor object itself and the
208             list of arguments, and is expected to return a L<Future>.
209              
210             $f = $code->( $distributor, @args )
211              
212             =cut
213              
214             sub subscribe_async
215             {
216 6     6 1 24 my $self = shift;
217 6         11 my ( $name, $code ) = @_;
218              
219 6 100       22 if( my $event = $self->{events}{$name} ) {
220 5         17 $event->subscribe( $code );
221             }
222             else {
223 1         2 push @{ $self->{pre_registration}{$name} }, $code;
  1         5  
224             }
225             }
226              
227             =head2 subscribe_sync
228              
229             $distributor->subscribe_sync( $name, $code )
230              
231             Adds a new C<CODE> reference to the list of subscribers for the named event.
232             This subscriber is expected to perform its work synchronously and return its
233             result immediately.
234              
235             In non-blocking or asynchronous applications, this method should only be used
236             for simple subscribers which can immediately return having completed their
237             work. If the work is likely to take some time by blocking on external factors,
238             consider instead using the L</subscribe_async> method.
239              
240             When invoked the code will be passed the distributor object itself and the
241             list of arguments.
242              
243             $code->( $distributor, @args )
244              
245             =cut
246              
247             sub subscribe_sync
248             {
249 4     4 1 29 my $self = shift;
250 4         9 my ( $name, $code ) = @_;
251              
252             $self->subscribe_async( $name, sub {
253 4     4   21 my @args = @_;
254             try {
255             return Future->done( $code->( @args ) );
256             }
257 4         11 catch ( $e ) {
258             return Future->fail( $e );
259             }
260 4         16 });
261             }
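The wrapper above is what lets a synchronous subscriber take part in Future-based firing: a normal return becomes a completed Future and an exception becomes a failed one. A minimal standalone sketch of the same pattern follows; wrap_sync is a hypothetical name used only for illustration.

```perl
use v5.14;
use warnings;
use Feature::Compat::Try;
use Future;

# Hypothetical helper mirroring the closure built in subscribe_sync.
sub wrap_sync {
   my ( $code ) = @_;
   return sub {
      my @args = @_;
      try {
         # Normal return: wrap the result in an already-completed Future.
         return Future->done( $code->( @args ) );
      }
      catch ( $e ) {
         # Exception: report it through a failed Future instead of dying.
         return Future->fail( $e );
      }
   };
}

my $ok  = wrap_sync( sub { return $_[0] * 2 } )->( 21 );
my $bad = wrap_sync( sub { die "boom\n" } )->();

say "ok is done:    ", ( $ok->is_done    ? "yes" : "no" );
say "bad is failed: ", ( $bad->is_failed ? "yes" : "no" );
```

Because the exception is captured rather than propagated, a failing synchronous subscriber does not unwind the caller's stack at the point of invocation.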
262              
263             =head2 fire_async
264              
265             $f = $distributor->fire_async( $name, @args )
266              
267             Invokes the named event, passing the arguments to the subscriber functions.
268             This function returns as soon as all the subscriber functions have been
269             invoked, returning a L<Future> that will eventually complete when all the
270             futures returned by the subscriber functions have completed.
271              
272             =cut
273              
274             sub fire_async
275             {
276 5     5 1 12 my $self = shift;
277 5         10 my ( $name, @args ) = @_;
278              
279 5 50       15 my $event = $self->{events}{$name} or
280             croak "Cannot fire an event '$name' when it doesn't exist";
281              
282 5         15 $event->fire( $self, @args );
283             }
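Both croak calls seen so far can be observed from calling code. The sketch below assumes an illustrative event name "tick" and traps each error with eval; the messages it prints are the ones in the source above.

```perl
use v5.14;
use warnings;
use Event::Distributor;

my $dist = Event::Distributor->new;

# Firing an event that was never declared is an error.
my $fire_err = eval { $dist->fire_async( "tick" ); 1 } ? "" : $@;

# Declaring the same name twice is also an error.
$dist->declare_signal( "tick" );
my $declare_err = eval { $dist->declare_signal( "tick" ); 1 } ? "" : $@;

print "fire:    $fire_err";
print "declare: $declare_err";
```

Subscribing to an undeclared name, by contrast, is not an error; only firing and re-declaring are checked.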
284              
285             =head2 fire_sync
286              
287             $distributor->fire_sync( $name, @args )
288              
289             Invokes the named event, passing the arguments to the subscriber functions.
290             This function synchronously waits until all the subscriber futures have
291             completed, and will return once they have all done so.
292              
293             Note that since this method calls the C<get> method on the Future instance
294             returned by L</fire_async>, it is required that this either be an immediate, or
295             be some subclass that can actually perform the await operation. This should be
296             the case if it is provided by an event framework or similar, or custom
297             application logic.
298              
299             =cut
300              
301             sub fire_sync
302             {
303 3     3 1 11 my $self = shift;
304 3         7 $self->fire_async( @_ )->get;
305             }
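The relationship between the two firing methods can be sketched as follows: fire_sync is fire_async followed by get on the returned Future. The event name and subscriber are illustrative; the subscriber here completes immediately, so get has nothing to wait for.

```perl
use v5.14;
use warnings;
use Event::Distributor;

my $dist = Event::Distributor->new;
$dist->declare_signal( "announce" );

my @log;   # hypothetical record of delivered messages
$dist->subscribe_sync( announce => sub {
   my ( $d, $message ) = @_;
   push @log, $message;
});

# Asynchronous style: keep the Future and wait on it when convenient.
my $f = $dist->fire_async( announce => "first" );
$f->get;

# Synchronous style: the same two steps in a single call.
$dist->fire_sync( announce => "second" );

say join ", ", @log;
```

With subscribers that return pending Futures, the get call only works if those Futures come from something that can actually wait, such as an event framework.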
306              
307             =head1 TODO
308              
309             Some of these ideas appear in the "Event-Reflexive Programming" series of blog
310             posts, and may be suitable for implementation here. All of these ideas are
311             simply for consideration; there is no explicit promise that any of these will
312             actually be implemented.
313              
314             =over 4
315              
316             =item *
317              
318             Unsubscription from events.
319              
320             =item *
321              
322             Define (or document the lack of) ordering between subscriptions of a given
323             event.
324              
325             =item *
326              
327             Refine the failure-handling semantics of signals.
328              
329             =item *
330              
331             Ability to invoke signals after the current one is finished, by deferring the
332             fire call. Should this be a new kind of fire method, or a property of the
333             signal itself?
334              
335             =item *
336              
337             More control over the semantics of value-returning events - scatter/map/gather
338             pattern.
339              
340             =item *
341              
342             Sub-hierarchies of events.
343              
344             =item *
345              
346             Subclasses for specific event frameworks.
347              
348             =item *
349              
350             Subclasses (or other behaviours) for out-of-process event serialisation and
351             subscribers.
352              
353             =item *
354              
355             Event parameter filtering mechanics - allows parametric hierarchies,
356             instrumentation logging, efficient out-of-process subscribers.
357              
358             =back
359              
360             =head1 SEE ALSO
361              
362             =over 4
363              
364             =item L
365              
366             =back
367              
368             =head1 AUTHOR
369              
370             Paul Evans
371              
372             =cut
373              
374             0x55AA;