File Coverage

blib/lib/Net/Stomp/Producer/Transactional.pm
Criterion Covered Total %
statement 44 44 100.0
branch 7 10 70.0
condition n/a
subroutine 12 12 100.0
pod 4 4 100.0
total 67 70 95.7


line stmt bran cond sub pod time code
1             package Net::Stomp::Producer::Transactional;
2             $Net::Stomp::Producer::Transactional::VERSION = '2.005';
3             {
4             $Net::Stomp::Producer::Transactional::DIST = 'Net-Stomp-Producer';
5             }
6 1     1   177977 use Moose;
  1         371512  
  1         7  
7             extends 'Net::Stomp::Producer';
8 1     1   6710 use Net::Stomp::Producer::Exceptions;
  1         4  
  1         44  
9 1     1   432 use MooseX::Types::Common::Numeric 'PositiveOrZeroInt';
  1         94496  
  1         6  
10 1     1   2183 use Try::Tiny;
  1         2  
  1         514  
11              
12             # ABSTRACT: subclass of Net::Stomp::Producer with transaction-like behaviour
13              
14              
15             has _transactions => (
16             is => 'ro',
17             isa => 'ArrayRef[ArrayRef]',
18             default => sub { [] },
19             traits => [ 'Array' ],
20             handles => {
21             in_transaction => 'count',
22             _start_transaction => 'push',
23             _drop_transaction => 'pop',
24             _all_transactions => 'elements',
25             _clear_transactions => 'clear',
26             },
27             );
28              
29             sub _current_transaction {
30 15     15   24 my ($self) = @_;
31              
32 15 50       516 Net::Stomp::Producer::Exceptions::Transactional->throw()
33             unless $self->in_transaction;
34              
35 15         422 return $self->_transactions->[-1];
36             }
37              
38             sub _add_frames_to_transaction {
39 10     10   20 my ($self,@frames) = @_;
40              
41 10 50       317 Net::Stomp::Producer::Exceptions::Transactional->throw()
42             unless $self->in_transaction;
43              
44 10         18 push @{$self->_current_transaction},@frames;
  10         25  
45              
46 10         18 return;
47             }
48              
49              
50             sub txn_begin {
51 8     8 1 28371 my ($self) = @_;
52              
53 8         346 $self->_start_transaction([]);
54              
55 8         11 return;
56             }
57              
58              
59             sub txn_commit {
60 7     7 1 12608 my ($self) = @_;
61              
62 7 100       274 Net::Stomp::Producer::Exceptions::Transactional->throw()
63             unless $self->in_transaction;
64              
65 5         12 my $messages = $self->_current_transaction;
66 5         165 $self->_drop_transaction;
67 5 100       157 if ($self->in_transaction) {
68             # commit to the outer transaction
69 1         4 $self->_add_frames_to_transaction(@$messages);
70             }
71             else {
72 4         12 for my $f (@$messages) {
73 6         59 $self->_really_send($f);
74             }
75             }
76              
77 5         107 return;
78             }
79              
80              
81             sub txn_rollback {
82 3     3 1 12 my ($self) = @_;
83              
84 3 50       98 Net::Stomp::Producer::Exceptions::Transactional->throw()
85             unless $self->in_transaction;
86              
87 3         101 $self->_drop_transaction;
88              
89 3         33 return;
90             }
91              
92              
93             override send => sub {
94             my ($self,$destination,$headers,$body) = @_;
95              
96             my $actual_headers = $self->_prepare_message($destination,$headers,$body);
97              
98             if ($self->in_transaction) {
99             $self->_add_frames_to_transaction($actual_headers);
100             }
101             else {
102             $self->_really_send($actual_headers);
103             }
104              
105             return;
106             };
107              
108              
109             sub txn_do {
110 4     4 1 12869 my ($self,$code) = @_;
111              
112 4         36 $self->txn_begin;
113             try {
114 4     4   146 $code->();
115             }
116             catch {
117 2     2   37 $self->txn_rollback;
118 2         14 die $_;
119 4         26 };
120 2         1379 $self->txn_commit;
121 2         5 return;
122             }
123              
124             __PACKAGE__->meta->make_immutable;
125              
126             1;
127              
128             __END__
129              
130             =pod
131              
132             =encoding UTF-8
133              
134             =head1 NAME
135              
136             Net::Stomp::Producer::Transactional - subclass of Net::Stomp::Producer with transaction-like behaviour
137              
138             =head1 VERSION
139              
140             version 2.005
141              
142             =head1 SYNOPSIS
143              
144             my $p = Net::Stomp::Producer::Transactional->new({
145             servers => [ { hostname => 'localhost', port => 61613, } ],
146             });
147              
148             $p->txn_begin();
149              
150             $p->send('/queue/somewhere',
151             { type => 'my_message' },
152             'body contents');
153             # nothing sent yet
154              
155             # some time later
156             $p->txn_commit();
157             # all messages are sent now
158              
159             Also:
160              
161             $p->txn_do(sub{
162             # do something...
163              
164             $p->send(@msg1);
165              
166             # do something else...
167              
168             $p->send(@msg2);
169             });
170             # all messages are sent now, unless an exception was thrown
171              
172             =head1 DESCRIPTION
173              
174             A subclass of L<Net::Stomp::Producer>, this class adds some
175             transaction-like behaviour.
176              
177             If you call L</txn_begin>, the messages sent through this object will
178             be kept in memory instead of being actually sent to the STOMP
179             connection. They will be sent when you call L</txn_commit>.
180              
181             There is also a L</txn_do> method, which takes a coderef and executes
182             it between a L</txn_begin> and a L</txn_commit>. If the coderef throws
183             an exception, the messages are forgotten.
184              
185             Please remember that this has nothing to do with STOMP transactions,
186             nor with the L<Net::Stomp::Producer/transactional_sending>
187             attribute. We could, in future, re-implement this to delegate
188             transactional behaviour to the broker via STOMP's C<BEGIN> and
189             C<COMMIT> frames. At the moment we do everything client-side.
190              
191             =head1 METHODS
192              
193             =head2 C<in_transaction>
194              
195             If true, we are inside a "transaction". You can change this with
196             L</txn_begin>, L</txn_commit> and L</txn_rollback>.
197              
198             =head2 C<txn_begin>
199              
200             Start a transaction, so that subsequent calls to C<send> or
201             C<transform_and_send> won't really send messages to the connection,
202             but keep them in memory.
203              
204             You can call this method multiple times; the transaction will end (and
205             messages will be sent) when you call L</txn_commit> as many times as
206             you called C<txn_begin>.
207              
208             Calling L</txn_rollback> will destroy the messages sent since the most
209             recent C<txn_begin>. In other words, transactions are properly
210             re-entrant.
211              
212             =head2 C<txn_commit>
213              
214             Commit the current transaction. If this was the outer-most
215             transaction, send all buffered messages.
216              
217             If you call this method outside of a transaction, you'll get a
218             L<Net::Stomp::Producer::Exceptions::Transactional> exception.
219              
220             =head2 C<txn_rollback>
221              
222             Roll back the current transaction, destroying all messages "sent"
223             inside it.
224              
225             If you call this method outside of a transaction, you'll get a
226             L<Net::Stomp::Producer::Exceptions::Transactional> exception.
227              
228             =head2 C<send>
229              
230             If not L</in_transaction>, send the message normally; otherwise, add
231             it to the in-memory buffer. See L<the base
232             method|Net::Stomp::Producer/send> for more details.
233              
234             =head2 C<txn_do>
235              
236             $p->txn_do(sub {
237             $p->send(@something);
238             });
239              
240             This method executes the given coderef between a L</txn_begin> and a
241             L</txn_commit>.
242              
243             If the coderef throws an exception, L</txn_rollback> will be called,
244             and the exception re-thrown.
245              
246             This method is re-entrant:
247              
248             $p->txn_do(sub {
249             $p->send(@msg1);
250             eval {
251             $p->txn_do(sub {
252             $p->send(@msg2);
253             die "boom\n";
254             });
255             };
256             $p->send(@msg3);
257             });
258              
259             The first and third messages will be sent, the second one will not.
260              
261             =head1 AUTHOR
262              
263             Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>
264              
265             =head1 COPYRIGHT AND LICENSE
266              
267             This software is copyright (c) 2012 by Net-a-porter.com.
268              
269             This is free software; you can redistribute it and/or modify it under
270             the same terms as the Perl 5 programming language system itself.
271              
272             =cut