File Coverage

blib/lib/Message/SmartMerge.pm
Criterion Covered Total %
statement 125 127 98.4
branch 43 60 71.6
condition 19 34 55.8
subroutine 15 15 100.0
pod 7 7 100.0
total 209 243 86.0


line stmt bran cond sub pod time code
1             package Message::SmartMerge;
2             {
3             $Message::SmartMerge::VERSION = '1.132270';
4             }
5              
6 8     8   387798 use 5.006;
  8         32  
  8         336  
7 8     8   44 use strict;
  8         17  
  8         301  
8 8     8   44 use warnings FATAL => 'all';
  8         26  
  8         359  
9 8     8   834 use Message::Match qw(mmatch);
  8         661  
  8         532  
10 8     8   676 use Message::Transform qw(mtransform);
  8         377  
  8         14896  
11              
12             =head1 NAME
13              
14             Message::SmartMerge - Enforce downstream transformations on message streams
15              
16             =cut
17              
18             =head1 SYNOPSIS
19              
20             use Message::SmartMerge;
21              
22             my $merge = Message::SmartMerge->new();
23             $merge->config({
24             merge_instance => 'instance',
25             });
26              
27             $merge->message({
28             instance => 'i1',
29             x => 'y',
30             this => 'whatever',
31             });
32             #no merges, so pass through:
33             #emit sends { instance => 'i1', x => 'y', this => 'whatever' }
34            
35             $merge->add_merge({
36             merge_id => 'm1',
37             match => {x => 'y'},
38             transform => {this => 'that'},
39             });
40             #so we've already passed through a message instance i1, and this new
41             #merge matches that, so the module will send a transformed message:
42             { instance => 'i1',
43             x => 'y',
44             this => 'that',
45             }
46              
47             #Now send another message through:
48             $merge->message({
49             instance => 'i1',
50             x => 'y',
51             this => 'not that',
52             something => 'else',
53             });
54             #merge matches x => 'y', so transforms this => 'that':
55             #emit sends:
56             { instance => 'i1',
57             x => 'y',
58             this => 'that',
59             something => 'else'
60             }
61              
62             $merge->remove_merge('m1');
63             #even though we didn't send a message in, removing a merge will trigger
64             #an emit to reflect that a change has occurred, specifically that the
65             #previously activated transform is no longer in force. It sends the
66             #last message received, without the transform
67             #emit sends:
68             { instance => 'i1',
69             x => 'y',
70             this => 'not that',
71             something => 'else'
72             }
73              
74             #Here's a way the message stream can clear a merge:
75             $merge->add_merge({
76             merge_id => 'm2',
77             match => {
78             x => 'y',
79             },
80             transform => {
81             foo => 'bar',
82             },
83             toggle_fields => ['something'],
84             });
85              
86             #Since m2 also matches x => 'y', we emit:
87             { instance => 'i1',
88             x => 'y',
89             this => 'not that',
90             something => 'else',
91             foo => 'bar',
92             }
93              
94             $merge->message({
95             instance => 'i1',
96             x => 'y',
97             foo => 'not bar',
98             something => 'else',
99             another => 'thing',
100             });
101             #the value of the single defined toggle field ('something') did not
102             #change from its value in the previous message ('else'). So m2 stands:
103             { instance => 'i1',
104             x => 'y',
105             something => 'else',
106             foo => 'bar',
107             another => 'thing',
108             }
109             #even though we passed 'not bar' in with foo, it was transformed to 'bar'
110              
111             #Now let's hit the toggle:
112             $merge->message({
113             instance => 'i1',
114             x => 'y',
115             foo => 'not bar',
116             something => 'other',
117             another => 'thing',
118             });
119             #this will 'permanently' remove the merge m2 for i1; the message passes
120             #through untransformed:
121             { instance => 'i1',
122             x => 'y',
123             foo => 'not bar',
124             something => 'other',
125             another => 'thing',
126             }
127              
128             #Here's another way the message stream can clear a merge:
129             $merge->add_merge({
130             merge_id => 'm3',
131             match => {
132             i => 'j',
133             },
134             transform => {
135             a => 'b',
136             },
137             remove_match => {
138             remove => 'match',
139             },
140             });
141             #This causes nothing to emit, because there are no instances that match
142             #i => 'j'
143              
144             $merge->message({
145             instance => 'i2',
146             x => 'y',
147             i => 'j',
148             foo => 'not bar',
149             a => 'not b',
150             something => 'here',
151             });
152             #this is fun because it matches both m2 and m3. it would have matched
153             #m1 had we not removed it
154             #i2 has never been seen before, and m2 is a toggle. The toggle
155             #deallocates itself for an instance if the toggle field changes
156             #from the previous to the current message. Since there was no
157             #previous message for i2, the toggle merge deallocates itself for i2
158             #before it can take any action.
159             { instance => 'i2',
160             x => 'y',
161             i => 'j',
162             foo => 'not bar',
163             something => 'here',
164             a => 'b', #rather than 'not b'
165             }
166              
167             #and now to deallocate m3:
168             $merge->message({
169             instance => 'i2',
170             x => 'y',
171             i => 'j',
172             a => 'not b',
173             remove => 'match',
174             });
175             #which emits:
176             { instance => 'i2',
177             x => 'y',
178             i => 'j',
179             a => 'not b', #no longer transformed
180             remove => 'match',
181             }
182              
183             =head1 DESCRIPTION
184              
185             In message based programming, we think in terms of streams of messages
186             flowing from one way-point to another. Each way-point does only one thing
187             to messages flowing through it, independent from various other way-points.
188             The contract between them is the set of required fields in the messages.
189              
190             This module is designed to modify the state of a stream of messages in a
191             powerful and configurable way.
192              
193             Conceptually, it will enforce certain transformations on certain message
194             streams. If the nature of the transformation changes, (for instance, if
195             it expires, or is deallocated some other way), the module will send a
196             'corrective' message.
197              
198             We call these configurations 'merges'. Part of a merge is a transformation.
199              
200             For example, when a new merge is configured, all of the matching message
201             instances will be re-sent with the new transform in force. And when
202             the merge is removed or expires, all of the matching message instances
203             are re-sent with their last received values. This effectively causes the
204             downstream receiver to be aware of stateful changes, but in a fully
205             message-oriented fashion.
206              
207             Merges can be added and removed explicitly with add_merge and remove_merge.
208             They can also expire, with expire and expire_at.
209              
210             More interestingly, merges can be deallocated for a given message stream
211             using one or two configurations: remove_match and toggle_fields.
212              
213             remove_match is simplest: if a message instance is under the influence of a
214             given merge that contains a remove_match config, and that message matches
215             the remove_match, then the merge is, for that instance, deallocated. The
216             message passes through that merge unchanged.
217              
218             toggle_fields is trickier: it is an array of fields in the message to
219             consider. A merge configured with toggle_fields will continue to be in force
220             as long as the values of all of the fields in toggle_fields are unchanged
221             from the previous message. As soon as any of those values changes (or is
222             undefined), the merge is, for that instance, deallocated.
223              
224             This is pretty abstract stuff; more concrete examples will be forthcoming
225             in subsequent releases.
226              
227             =head1 SUBROUTINES/METHODS
228              
229             =head2 new
230              
231             my $merge = Message::SmartMerge->new(state => $previous_state);
232              
233             =over 4
234              
235             =item * state (optional)
236              
237             The hashref returned by a previous invocation of C<get_state>
238              
239             =back
240              
241             =cut
242              
243             sub new {
244 8     8 1 167 my $class = shift;
245 8         24 my $self = {};
246 8 50       45 die "Message::SmartMerge::new: even number of argument required\n"
247             if scalar @_ % 2;
248 8         27 my %args = @_;
249 8         28 bless ($self, $class);
250              
251 8   100     117 $self->{merges} = $args{state}->{merges} || {};
252 8   100     81 $self->{instances} = $args{state}->{instances} || {};
253 8 100 66     79 $self->config($args{state}->{config})
254             if $args{state} and
255             $args{state}->{config};
256 8         73 return $self;
257             }
258              
259              
260             =head2 get_state
261              
262             my $state_to_save = $merge->get_state();
263              
264             This method takes no arguments; it returns a hashref to all of the data
265             necessary to re-create the current behaviour of this library.
266              
267             Simply put, before your process exits, gather the return value of
268             get_state, and save it somewhere. When your process comes up, take that
269             information and pass it into the state key in the constructor. The library
270             will continue functioning as before.
271              
272             =cut
273             sub get_state {
274 1     1 1 3 my $self = shift;
275             return {
276 1         18 merges => $self->{merges},
277             instances => $self->{instances},
278             config => $self->{config},
279             };
280             }
281              
282             =head2 emit
283              
284             $merge->emit(%args)
285              
286             This method is designed to be overridden; the default implementation simply
287             adds the passed message to the package global
288             @Message::SmartMerge::return_messages and returns a HASHref of all of the arguments.
289              
290             =over 4
291              
292             =item * message
293              
294             The message being sent out, which is a HASHref.
295              
296             =item * matching_merge_ids
297              
298             A HASHref whose keys are the merge IDs that were applied, and values are 1.
299              
300             =item * other: things (TODO)
301              
302             =back
303              
304             =cut
305             our @return_messages = ();
306             sub emit {
307 62     62 1 96 my $self = shift;
308 62         212 my %args = @_;
309 62         228 push @return_messages, $args{message};
310 62         431 return \%args;
311             }
312              
313             =head2 config
314              
315             $merge->config({
316             merge_instance => 'instance_key',
317             });
318              
319             =over 4
320              
321             =item * config_def (positional, required)
322              
323             HASHref of configuration
324              
325             =over 4
326              
327             =item * merge_instance (required)
328              
329             This is a scalar that must exist as a key in every incoming message. The value
330             of this key must also be a scalar, and represents the 'instance' of a message
331             stream. That is, all messages of the same instance are considered a unified
332             stream.
333              
334             =back
335              
336             =back
337              
338             =cut
339             sub config {
340 8     8 1 23 my $self = shift;
341 8 50       38 my $new_config = shift or die "Message::SmartMerge::config: at least one argument is required\n";
342 8 50 33     139 die "Message::SmartMerge::config: required config attribute 'merge_instance' must be a scalar\n"
343             if not $new_config->{merge_instance} or
344             ref $new_config->{merge_instance};
345 8         74 $self->{config} = $new_config;
346 8         37 return $new_config;
347             }
348              
349             sub _expire_merges {
350 62     62   98 my $self = shift;
351 62         146 my $ts = time;
352 62         97 foreach my $merge_id (keys %{$self->{merges}}) {
  62         12703  
353 65         143 my $merge = $self->{merges}->{$merge_id};
354 65 100       267 if($merge->{expire} < $ts) {
355 3         18 $self->remove_merge($merge_id);
356             }
357             }
358             }
359              
360             =head2 add_merge
361              
362             $merge->add_merge({
363             merge_id => 'm1',
364             match => {x => 'y'},
365             transform => {this => 'to that'},
366             expire => 120, #expire in two minutes
367             expire_at => 1465173300, #expire in June 2016 (TODO)
368             });
369              
370             =over 4
371              
372             =item * merge_def (first positional, required)
373              
374             =over 4
375              
376             =item * merge_id (required)
377              
378             Unique scalar identifying this merge
379              
380             =item * match (required)
381              
382             Message::Match object (HASHref); defines messages this merge applies to
383              
384             =item * transform (required)
385              
386             Message::Transform object (HASHref); what changes to make
387             NOTE: considering not making transform required
388              
389             =item * expire (optional)
390              
391             How many seconds (integer) before this merge expires
392              
393             =item * expire_at (optional) (TODO)
394              
395             Epoch time (integer) this merge will expire
396              
397             =back
398              
399             =back
400              
401             =head3 exceptions
402              
403             =over 4
404              
405             =item * must have at least one argument, a HASH reference
406              
407             =item * passed merge must have a scalar merge_id
408              
409             =item * passed merge_id '$merge_id' is already defined
410              
411             =back
412              
413             =cut
414             sub add_merge {
415 12 50   12 1 3728 my $self = shift or die "Message::SmartMerge::add_merge: must be called as a method\n";
416 12         24 my $merge = shift;
417 12 50 33     5732 die "Message::SmartMerge::add_merge: must have at least one argument, a HASH reference\n"
      33        
418             if not $merge or
419             not ref $merge or
420             ref $merge ne 'HASH';
421 12 50       74 die "Message::SmartMerge::add_merge: even number of argument required\n"
422             if scalar @_ % 2;
423 12         55 my %args = @_;
424              
425 12         33 my $merge_id = $merge->{merge_id};
426 12 50 33     89 die "Message::SmartMerge::add_merge: passed merge must have a scalar merge_id\n"
427             if not $merge_id or
428             ref $merge_id;
429 12 50       58 die "Message::SmartMerge::add_merge: passed merge_id '$merge_id' is already defined\n"
430             if $self->{merges}->{$merge_id};
431 12 100       51 if($merge->{expire}) {
432 3         10 $merge->{expire} = time + $merge->{expire};
433             } else {
434 9         30 $merge->{expire} = 2147483647; #2^31 - 1 job security!
435             }
436 12         36 $self->{merges}->{$merge_id} = $merge;
437             #1. iterate through all of the message instances
438             #2. for each one that matches the new merge:
439             # a. n/a
440             # b. make a note of the message instance
441             #3. for each of the noted message instances,
442             # a. run the transforms
443             # b. emit the message
444 12         26 my @matched_instances = ();
445 12         26 foreach my $instance_name (sort keys %{$self->{instances}}) {
  12         79  
446 15         35 my $instance = $self->{instances}->{$instance_name};
447 15 100       80 next unless mmatch $instance->{message}, $merge->{match};
448              
449             #this instance matches the new merge
450 12         500 push @matched_instances, $instance_name;
451             }
452              
453             #for section 3 above, can I not simply call $self->message() on all of
454             #the matched messages?
455 12         127 $self->message($self->{instances}->{$_}->{message}) for @matched_instances;
456 12         72 return $merge;
457             }
458              
459             =head2 message
460              
461             $merge->message({
462             instance_key => 'instance1',
463             x => 'y',
464             });
465              
466             Coupled with the above defined merge, this message method will call the emit
467             method thusly: (Assuming it's still before the merge expired)
468              
469             ( message => {
470             instance_key => 'instance1',
471             x => 'y',
472             this => 'to that',
473             },
474             matching_merge_ids => {
475             m1 => 1,
476             },
477             )
478              
479             =over 4
480              
481             =item * message (first positional, required)
482              
483             =back
484              
485             =head3 exceptions
486              
487             =over 4
488              
489             =item * must have at least one argument, a HASH reference
490              
491             =item * passed message did not have instance field
492              
493             =back
494              
495             =cut
496             sub message {
497 62 50   62 1 22156 my $self = shift or die "Message::SmartMerge::message: must be called as a method\n";
498 62         106 my $message = shift;
499 62 50 33     693 die "Message::SmartMerge::message: must have at least one argument, a HASH reference\n"
      33        
500             if not $message or
501             not ref $message or
502             ref $message ne 'HASH';
503 62 50       270 die "Message::SmartMerge::message: even number of argument required\n"
504             if scalar @_ % 2;
505 62         127 my %args = @_;
506              
507 62         204 $self->_expire_merges(); #hideously inefficient; have this run at most
508             #once per second TODO
509              
510             #1. find message instance
511             #2. gather all of the merges that match
512             # a. identify any merges that should be cleared for this instance
513             # aa. check toggle_fields
514             # bb. check remove_match
515             #3. eliminate all of the merges that have toggled off or have cleared
516             #4. run the transforms
517             #5. emit the message
518 62         293 my $config = $self->{config};
519 62 50       235 my $instance_name = $message->{$config->{merge_instance}}
520             or die "Message::SmartMerge::message: passed message did not have instance field '$config->{merge_instance}'\n";
521              
522 62         144 my $instances = $self->{instances};
523 62         87 my $previous_message;
524 62 100       183 if(not $instances->{$instance_name}) {
525 15         97 $instances->{$instance_name} = {
526             cleared_merges => {},
527             initial_ts => time,
528             message => $message,
529             };
530             } else {
531 47         105 $previous_message = $instances->{$instance_name}->{message};
532 47         107 $instances->{$instance_name}->{message} = $message;
533             }
534 62         207 my $instance = $instances->{$instance_name};
535 62         108 my $matching_merge_ids = {};
536 62         94 foreach my $merge_id (keys %{$self->{merges}}) {
  62         176  
537 62 100       181 next if $instance->{cleared_merges}->{$merge_id};
538 53         99 my $merge = $self->{merges}->{$merge_id};
539 53 100       218 if(mmatch $message, $merge->{match}) {
540 47         1569 my $include = 1;
541             #so we have a matching merge that hasn't been previously
542             #eliminated
543             #figure out if it needs to be eliminated
544 47 100       119 if($merge->{toggle_fields}) { #section 2.a.aa
545 9         13 foreach my $toggle_field (@{$merge->{toggle_fields}}) {
  9         24  
546 9         21 my $toggle_field_value = $message->{$toggle_field};
547 9         16 my $previous_toggle_field_value = $previous_message->{$toggle_field};
548 9 100 100     94 if( not defined $toggle_field_value or
      100        
549             not defined $previous_toggle_field_value or
550             $toggle_field_value ne $previous_toggle_field_value) {
551             #toggles are different; remove this merge
552 4         6 $include = 0;
553 4         17 $instance->{cleared_merges}->{$merge_id} = 1;
554             }
555             }
556             }
557              
558 47 100       132 if($merge->{remove_match}) { #section 2.a.bb
559 3 100       11 if(mmatch $message, $merge->{remove_match}) {
560 1         28 $include = 0;
561 1         3 $instance->{cleared_merges}->{$merge_id} = 1;
562             }
563             }
564              
565 47 100       875 $matching_merge_ids->{$merge_id} = 1 if $include;
566             }
567             }
568             #at this point, $matching_merge_ids contains all of the merges we need
569              
570 62         305 my $emit_message = _fast_clone($message);
571             #4b: transform any remaining merges
572 62         110 foreach my $merge_id (keys %{$matching_merge_ids}) {
  62         205  
573 42         208 my $merge = $self->{merges}->{$merge_id};
574 42         189 mtransform $emit_message, $merge->{transform};
575             }
576              
577 62         903 $self->emit(message => $emit_message, matching_merge_ids => $matching_merge_ids);
578             }
579              
580             sub _fast_clone {
581 8     8   17546 use Storable;
  8         51473  
  8         4151  
582 62     62   88 my $thing = shift;
583 62         3311 return Storable::dclone $thing;
584             }
585              
586              
587              
588             =head2 remove_merge
589              
590             $merge->remove_merge('m1');
591              
592             =over 4
593              
594             =item * merge_id (first positional, required)
595              
596             The merge_id to be removed.
597              
598             =back
599              
600             =head3 exceptions
601              
602             =over 4
603              
604             =item * passed merge_id does not reference an existing merge
605              
606             =item * must have at least one argument, a scalar
607              
608             =back
609              
610             =cut
611             sub remove_merge {
612 7 50   7 1 1392 my $self = shift or die "Message::SmartMerge::remove_merge: must be called as a method\n";
613 7         20 my $merge_id = shift;
614 7 50 33     66 die "Message::SmartMerge::remove_merge: must have at least one argument, a scalar\n"
615             if not $merge_id or
616             ref $merge_id;
617 7 50       40 die "Message::SmartMerge::remove_merge: even number of argument required\n"
618             if scalar @_ % 2;
619 7         18 my %args = @_;
620              
621 7 50       38 die "Message::SmartMerge::remove_merge: passed merge_id does not reference an existing merge\n"
622             unless $self->{merges}->{$merge_id};
623             #should be about the same as add_merge, but 'in reverse'
624             #1. iterate through all of the message instances
625             #2. for each one that matches the to be deleted merge:
626             # a. skip and remove cleared_merges if cleared_merges matches this merge
627             # b. make a note of the message instance
628             #3. for each of the marked message instances,
629             # a. run the transforms
630             # b. emit the message
631 7         24 my $merge = $self->{merges}->{$merge_id};
632 7         16 my @matched_instances = ();
633 7         16 foreach my $instance_name (sort keys %{$self->{instances}}) {
  7         62  
634 17         83 my $instance = $self->{instances}->{$instance_name};
635 17 50       71 if($instance->{cleared_merges}->{$merge_id}) {
636 0         0 delete $instance->{cleared_merges}->{$merge_id};
637 0         0 next;
638             }
639 17 100       90 next unless mmatch $instance->{message}, $merge->{match};
640              
641             #this instance matches the new merge
642 9         398 push @matched_instances, $instance_name;
643             }
644 7         138 delete $self->{merges}->{$merge_id};
645 7         64 $self->message($self->{instances}->{$_}->{message}) for @matched_instances;
646 7         56 return $merge;
647             }
648              
649             =head1 AUTHOR
650              
651             Dana M. Diederich, <diederich@gmail.com>
652              
653             =head1 BUGS
654              
655             Please report any bugs or feature requests to C<bug-message-smartmerge at rt.cpan.org>, or through
656             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Message-SmartMerge>. I will be notified, and then you'll
657             automatically be notified of progress on your bug as I make changes.
658              
659              
660             =head1 SEE ALSO
661              
662             http://c2.com/cgi/wiki?AlanKayOnMessaging
663             http://spin.atomicobject.com/2012/11/15/message-oriented-programming/
664              
665              
666              
667             =head1 SUPPORT
668              
669             You can find documentation for this module with the perldoc command.
670              
671             perldoc Message::SmartMerge
672              
673              
674             You can also look for information at:
675              
676             =over 4
677              
678             =item * Report bugs and feature requests here
679              
680             L<https://github.com/dana/perl-Message-SmartMerge/issues>
681              
682             =item * AnnoCPAN: Annotated CPAN documentation
683              
684             L<http://annocpan.org/dist/Message-SmartMerge>
685              
686             =item * CPAN Ratings
687              
688             L<http://cpanratings.perl.org/d/Message-SmartMerge>
689              
690             =item * Search CPAN
691              
692             L<https://metacpan.org/module/Message::SmartMerge>
693              
694             =back
695              
696              
697             =head1 ACKNOWLEDGEMENTS
698              
699              
700             =head1 LICENSE AND COPYRIGHT
701              
702             Copyright 2013 Dana M. Diederich.
703              
704             This program is free software; you can redistribute it and/or modify it
705             under the terms of the Artistic License (2.0). You may obtain a
706             copy of the full license at:
707              
708             L<http://www.perlfoundation.org/artistic_license_2_0>
709              
710             Any use, modification, and distribution of the Standard or Modified
711             Versions is governed by this Artistic License. By using, modifying or
712             distributing the Package, you accept this license. Do not use, modify,
713             or distribute the Package, if you do not accept this license.
714              
715             If your Modified Version has been derived from a Modified Version made
716             by someone other than you, you are nevertheless required to ensure that
717             your Modified Version complies with the requirements of this license.
718              
719             This license does not grant you the right to use any trademark, service
720             mark, tradename, or logo of the Copyright Holder.
721              
722             This license includes the non-exclusive, worldwide, free-of-charge
723             patent license to make, have made, use, offer to sell, sell, import and
724             otherwise transfer the Package with respect to any patent claims
725             licensable by the Copyright Holder that are necessarily infringed by the
726             Package. If you institute patent litigation (including a cross-claim or
727             counterclaim) against any party alleging that the Package constitutes
728             direct or contributory patent infringement, then this Artistic License
729             to you shall terminate on the date that such litigation is filed.
730              
731             Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER
732             AND CONTRIBUTORS "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES.
733             THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
734             PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY
735             YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR
736             CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR
737             CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE,
738             EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
739              
740              
741             =cut
742              
743             1; # End of Message::SmartMerge
744              
745             __END__
746              
747             Notes:
748              
749             Algo: keep a list of all of the instances.
750             When a message arrives,
751              
752              
753              
754              
755              
756              
757              
758             ... other notes...
759              
760             we need to be able to send a message when there's any expiration.
761