File Coverage

blib/lib/MCE/Queue.pm
Criterion Covered Total %
statement 460 581 79.1
branch 190 362 52.4
condition 61 162 37.6
subroutine 49 57 85.9
pod 1 1 100.0
total 761 1163 65.4


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Hybrid (normal and priority) queues.
4             ##
5             ###############################################################################
6              
7             package MCE::Queue;
8              
9 37     37   169217 use strict;
  37         110  
  37         1280  
10 37     37   227 use warnings;
  37         97  
  37         1092  
11              
12 37     37   185 no warnings qw( threads recursion uninitialized );
  37         87  
  37         2474  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
17             ## no critic (TestingAndDebugging::ProhibitNoStrict)
18              
19 37     37   344 use Scalar::Util qw( looks_like_number );
  37         80  
  37         3138  
20 37     37   1173 use MCE::Util qw( $LF );
  37         76  
  37         3848  
21 37     37   1265 use MCE::Mutex ();
  37         83  
  37         9736  
22              
23             ###############################################################################
24             ## ----------------------------------------------------------------------------
25             ## Import routine.
26             ##
27             ###############################################################################
28              
29             our ($HIGHEST,$LOWEST, $FIFO,$LIFO, $LILO,$FILO) = (1,0, 1,0, 1,0);
30              
31             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
32             my ($_def, $_imported) = ({});
33              
34             sub import {
35 39     39   372 my ($_class, $_pkg) = (shift, caller);
36              
37             ## Process module arguments.
38 39         467 my $_p = $_def->{$_pkg} = {
39             AWAIT => 0, PORDER => $HIGHEST, TYPE => $FIFO,
40             };
41              
42 39         172 while (my $_argument = shift) {
43 0         0 my $_arg = lc $_argument;
44              
45 0 0       0 $_p->{AWAIT } = shift, next if ( $_arg eq 'await' );
46 0 0       0 $_p->{PORDER} = shift, next if ( $_arg eq 'porder' );
47 0 0       0 $_p->{TYPE } = shift, next if ( $_arg eq 'type' );
48              
49 0         0 _croak("Error: ($_argument) invalid module option");
50             }
51              
52 39 100       604 return if $_imported++;
53              
54             ## Define public methods to internal methods.
55 37     37   276 no strict 'refs'; no warnings 'redefine';
  37     37   158  
  37         1450  
  37         230  
  37         109  
  37         12225  
56              
57 37 100 66     429 if ($INC{'MCE.pm'} && MCE->wid == 0) {
58 35         132 _mce_m_init();
59             }
60              
61 37         67 *{ 'MCE::Queue::await' } = \&_mce_m_await;
  37         291  
62 37         197 *{ 'MCE::Queue::clear' } = \&_mce_m_clear;
  37         299  
63 37         101 *{ 'MCE::Queue::end' } = \&_mce_m_end;
  37         122  
64 37         66 *{ 'MCE::Queue::enqueue' } = \&_mce_m_enqueue;
  37         234  
65 37         88 *{ 'MCE::Queue::enqueuep' } = \&_mce_m_enqueuep;
  37         217  
66 37         74 *{ 'MCE::Queue::dequeue' } = \&_mce_m_dequeue;
  37         106  
67 37         57 *{ 'MCE::Queue::dequeue_nb' } = \&_mce_m_dequeue_nb;
  37         121  
68 37         106 *{ 'MCE::Queue::dequeue_timed' } = \&_mce_m_dequeue_timed;
  37         132  
69 37         1068 *{ 'MCE::Queue::pending' } = \&_mce_m_pending;
  37         227  
70 37         96 *{ 'MCE::Queue::insert' } = \&_mce_m_insert;
  37         798  
71 37         142 *{ 'MCE::Queue::insertp' } = \&_mce_m_insertp;
  37         194  
72 37         109 *{ 'MCE::Queue::peek' } = \&_mce_m_peek;
  37         104  
73 37         69 *{ 'MCE::Queue::peekp' } = \&_mce_m_peekp;
  37         152  
74 37         54 *{ 'MCE::Queue::peekh' } = \&_mce_m_peekh;
  37         126  
75 37         127 *{ 'MCE::Queue::heap' } = \&_mce_m_heap;
  37         125  
76              
77 37         4920 return;
78             }
79              
80             ###############################################################################
81             ## ----------------------------------------------------------------------------
82             ## Define constants & variables.
83             ##
84             ###############################################################################
85              
86             use constant {
87 37         259972 OUTPUT_W_QUE => 'W~QUE', # Await from the queue
88             OUTPUT_C_QUE => 'C~QUE', # Clear the queue
89             OUTPUT_E_QUE => 'E~QUE', # End the queue
90              
91             OUTPUT_A_QUE => 'A~QUE', # Enqueue into queue (array)
92             OUTPUT_A_QUP => 'A~QUP', # Enqueue into queue (array (p))
93             OUTPUT_D_QUE => 'D~QUE', # Dequeue from queue (blocking)
94             OUTPUT_D_QUN => 'D~QUN', # Dequeue from queue (non-blocking)
95             OUTPUT_D_QUT => 'D~QUT', # Dequeue from queue (timed)
96              
97             OUTPUT_N_QUE => 'N~QUE', # Return the number of items
98             OUTPUT_I_QUE => 'I~QUE', # Insert into queue
99             OUTPUT_I_QUP => 'I~QUP', # Insert into queue (p)
100              
101             OUTPUT_P_QUE => 'P~QUE', # Peek into queue
102             OUTPUT_P_QUP => 'P~QUP', # Peek into queue (p)
103             OUTPUT_P_QUH => 'P~QUH', # Peek into heap
104             OUTPUT_H_QUE => 'H~QUE' # Return the heap
105 37     37   352 };
  37         108  
106              
107             ## Attributes used internally.
108             ## _qr_sock _qw_sock _datp _datq _dsem _heap _id _init_pid _porder _type
109             ## _ar_sock _aw_sock _asem _tsem
110              
111             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
112              
113             my %_valid_fields_new = map { $_ => 1 } qw(
114             await barrier fast gather porder queue type
115             );
116              
117             my $_all = {};
118             my $_qid = 0;
119              
120             sub CLONE {
121 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
122             }
123              
124             sub DESTROY {
125 23     23   177 my ($_Q) = @_;
126 23 50       264 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
127              
128 23 50       240 delete $_all->{ $_Q->{_id} } if exists $_Q->{_id};
129 23         397 undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};
130              
131 23 100 66     319 if (exists $_Q->{_init_pid} && $_Q->{_init_pid} eq $_pid) {
132 16         108 MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
133             }
134              
135 23         622 return;
136             }
137              
138             ###############################################################################
139             ## ----------------------------------------------------------------------------
140             ## New instance instantiation.
141             ##
142             ###############################################################################
143              
144             sub new {
145 51     51 1 2462 my ($_class, %_argv) = @_;
146 51         355 my $_pkg = caller;
147              
148 51         207 @_ = ();
149              
150 51   33     147 my $_Q = {}; bless($_Q, ref($_class) || $_class);
  51         486  
151              
152 51         417 for my $_p (keys %_argv) {
153             _croak("Queue: ($_p) is not a valid constructor argument")
154 45 50       188 unless (exists $_valid_fields_new{$_p});
155             }
156              
157 51         336 $_Q->{_asem} = 0; # Semaphore count variable for the ->await method
158 51         292 $_Q->{_datp} = {}; # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
159 51         182 $_Q->{_heap} = []; # Priority heap [ pN, p2, p1 ] in heap order
160             # fyi, _datp will always dequeue before _datq
161              
162             $_Q->{_await} = (defined $_argv{await})
163 51 100 50     648 ? $_argv{await} : $_def->{$_pkg}{AWAIT} || 0;
164              
165             $_Q->{_porder} = (defined $_argv{porder})
166 51 100 33     436 ? $_argv{porder} : $_def->{$_pkg}{PORDER} || $HIGHEST;
167              
168             $_Q->{_type} = (defined $_argv{type})
169 51 100 33     343 ? $_argv{type} : $_def->{$_pkg}{TYPE} || $FIFO;
170              
171             ## -------------------------------------------------------------------------
172              
173 51 100       227 if (exists $_argv{queue}) {
174             _croak('Queue: (queue) is not an ARRAY reference')
175 7 50       28 unless (ref $_argv{queue} eq 'ARRAY');
176              
177 7         16 $_Q->{_datq} = $_argv{queue};
178             }
179             else {
180 44         161 $_Q->{_datq} = [];
181             }
182              
183 51 50       271 if (exists $_argv{gather}) {
184             _croak('Queue: (gather) is not a CODE reference')
185 0 0       0 unless (ref $_argv{gather} eq 'CODE');
186              
187 0         0 $_Q->{gather} = $_argv{gather};
188             }
189              
190             ## -------------------------------------------------------------------------
191              
192 51         801 $_Q->{_qr_mutex} = MCE::Mutex->new();
193 51 50       385 $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
194 51         208 $_Q->{_id} = ++$_qid; $_all->{$_qid} = $_Q;
  51         195  
195 51         153 $_Q->{_dsem} = 0;
196              
197 51         335 MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
198 51 100       209 MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};
199              
200 51         274 return $_Q;
201             }
202              
203             ###############################################################################
204             ## ----------------------------------------------------------------------------
205             ## Private methods.
206             ##
207             ###############################################################################
208              
209             sub _croak {
210 0 0   0   0 unless ($INC{'MCE.pm'}) {
211 0         0 $\ = undef; require Carp; goto &Carp::croak;
  0         0  
  0         0  
212             } else {
213 0         0 goto &MCE::_croak;
214             }
215             }
216              
217             ## Add items to the tail of the queue with priority level.
218              
219             sub _enqueuep {
220 83     83   181 my ($_Q, $_p) = (shift, shift);
221              
222             ## Enlist priority into the heap.
223 83 100 100     282 if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {
  36         136  
224              
225 65 100       85 unless (scalar @{ $_Q->{_heap} }) {
  65 100       180  
226 50         84 push @{ $_Q->{_heap} }, $_p;
  50         195  
227             }
228 0         0 elsif ($_Q->{_porder}) {
229 9         100 $_Q->_heap_insert_high($_p);
230             }
231             else {
232 6         43 $_Q->_heap_insert_low($_p);
233             }
234             }
235              
236             ## Append item(s) into the queue.
237 83         146 push @{ $_Q->{_datp}->{$_p} }, @_;
  83         288  
238              
239 83         163 return;
240             }
241              
242             ## Return one item from the queue.
243              
244             sub _dequeue {
245 894     894   1772 my ($_Q) = @_;
246              
247             ## Return item from the non-priority queue.
248 894 100       1304 unless (scalar @{ $_Q->{_heap} }) {
  894         2018  
249             return ($_Q->{_type})
250 800 100       1739 ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
  786         2682  
  14         62  
251             }
252              
253 94         172 my $_p = $_Q->{_heap}->[0];
254              
255             ## Delist priority from the heap when 1 item remains.
256 94 100       119 shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);
  47         83  
  94         265  
257              
258             ## Return item from the priority queue.
259             return ($_Q->{_type})
260 94 100       237 ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
  70         182  
  24         101  
261             }
262              
263             ## Helper method for getting the reference to the underlying array.
264             ## Use with test scripts for comparing data only (not a public API).
265              
266             sub _get_aref {
267 50     50   6453 my ($_Q, $_p) = @_;
268              
269 50 50 66     414 return if ($INC{'MCE.pm'} && !defined $MCE::MCE->{_wid});
270 50 50 66     197 return if (defined $MCE::MCE && $MCE::MCE->{_wid});
271              
272 50 100       124 if (defined $_p) {
273 45 50 33     250 _croak('Queue: (get_aref priority) is not an integer')
274             if (!looks_like_number($_p) || int($_p) != $_p);
275              
276 45 100       180 return undef unless (exists $_Q->{_datp}->{$_p});
277 36         453 return $_Q->{_datp}->{$_p};
278             }
279              
280 5         35 return $_Q->{_datq};
281             }
282              
283             ## Insert priority into the heap. A lower priority level comes first.
284              
285             sub _heap_insert_low {
286 6     6   24 my ($_Q, $_p) = @_;
287              
288             ## Insert priority at the head of the heap.
289 6 50       33 if ($_p < $_Q->{_heap}->[0]) {
    100          
290 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
291             }
292              
293             ## Insert priority at the end of the heap.
294             elsif ($_p > $_Q->{_heap}->[-1]) {
295 4         12 push @{ $_Q->{_heap} }, $_p;
  4         22  
296             }
297              
298             ## Insert priority through binary search.
299             else {
300 2         4 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  2         9  
  2         6  
301              
302 2         19 while ($_lower < $_upper) {
303 4         13 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
304 4 100       36 if ($_p > $_Q->{_heap}->[$_midpoint]) {
305 2         6 $_lower = $_midpoint + 1;
306             } else {
307 2         9 $_upper = $_midpoint;
308             }
309             }
310              
311             ## Insert priority into the heap.
312 2         7 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  2         17  
313             }
314              
315 6         18 return;
316             }
317              
318             ## Insert priority into the heap. A higher priority level comes first.
319              
320             sub _heap_insert_high {
321 9     9   43 my ($_Q, $_p) = @_;
322              
323             ## Insert priority at the head of the heap.
324 9 100       45 if ($_p > $_Q->{_heap}->[0]) {
    50          
325 6         21 unshift @{ $_Q->{_heap} }, $_p;
  6         42  
326             }
327              
328             ## Insert priority at the end of the heap.
329             elsif ($_p < $_Q->{_heap}->[-1]) {
330 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
331             }
332              
333             ## Insert priority through binary search.
334             else {
335 3         7 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  3         14  
  3         20  
336              
337 3         24 while ($_lower < $_upper) {
338 6         25 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
339 6 100       19 if ($_p < $_Q->{_heap}->[$_midpoint]) {
340 3         22 $_lower = $_midpoint + 1;
341             } else {
342 3         13 $_upper = $_midpoint;
343             }
344             }
345              
346             ## Insert priority into the heap.
347 3         11 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  3         24  
348             }
349              
350 9         31 return;
351             }
352              
353             ###############################################################################
354             ## ----------------------------------------------------------------------------
355             ## Output routines for the manager process.
356             ##
357             ###############################################################################
358              
359             {
360             my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_cnt, $_i, $_id);
361             my ($_len, $_p, $_t, $_Q, $_has_data, $_pending);
362              
363             my %_output_function = (
364              
365             OUTPUT_W_QUE.$LF => sub { # Await from the queue
366             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
367              
368             chomp($_id = <$_DAU_R_SOCK>),
369             chomp($_t = <$_DAU_R_SOCK>);
370              
371             $_Q = $_all->{$_id};
372             $_Q->{_tsem} = $_t;
373              
374             if ($_Q->pending() <= $_t) {
375             syswrite($_Q->{_aw_sock}, $LF);
376             } else {
377             $_Q->{_asem} += 1;
378             }
379              
380             print {$_DAU_R_SOCK} $LF;
381              
382             return;
383             },
384              
385             OUTPUT_C_QUE.$LF => sub { # Clear the queue
386             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
387              
388             chomp($_id = <$_DAU_R_SOCK>);
389             _mce_m_clear($_all->{$_id});
390              
391             print {$_DAU_R_SOCK} $LF;
392              
393             return;
394             },
395              
396             OUTPUT_E_QUE.$LF => sub { # End the queue
397             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
398              
399             chomp($_id = <$_DAU_R_SOCK>);
400             _mce_m_end($_all->{$_id});
401              
402             print {$_DAU_R_SOCK} $LF;
403              
404             return;
405             },
406              
407             ## ----------------------------------------------------------------------
408              
409             OUTPUT_A_QUE.$LF => sub { # Enqueue into queue (A)
410             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
411              
412             chomp($_id = <$_DAU_R_SOCK>),
413             chomp($_len = <$_DAU_R_SOCK>);
414              
415             read $_DAU_R_SOCK, my($_buf), $_len;
416              
417             $_Q = $_all->{$_id};
418              
419             if ($_Q->{gather}) {
420             local $_ = $_MCE->{thaw}($_buf);
421             $_Q->{gather}($_Q, @{ $_ });
422             }
423             else {
424             $_Q->_mce_m_enqueue(@{ $_MCE->{thaw}($_buf) });
425             }
426              
427             return;
428             },
429              
430             OUTPUT_A_QUP.$LF => sub { # Enqueue into queue (A,p)
431             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
432              
433             chomp($_id = <$_DAU_R_SOCK>),
434             chomp($_p = <$_DAU_R_SOCK>),
435             chomp($_len = <$_DAU_R_SOCK>);
436              
437             read $_DAU_R_SOCK, my($_buf), $_len;
438              
439             $_Q = $_all->{$_id};
440             $_Q->_mce_m_enqueuep($_p, @{ $_MCE->{thaw}($_buf) });
441              
442             return;
443             },
444              
445             ## ----------------------------------------------------------------------
446              
447             OUTPUT_D_QUE.$LF => sub { # Dequeue from queue (B)
448             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
449              
450             chomp($_id = <$_DAU_R_SOCK>),
451             chomp($_cnt = <$_DAU_R_SOCK>);
452              
453             $_cnt = 0 if ($_cnt == 1);
454             $_Q = $_all->{$_id};
455              
456             my (@_items, $_buf);
457              
458             if ($_cnt) {
459             my $_pending = @{ $_Q->{_datq} };
460              
461             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
462             for my $_h (@{ $_Q->{_heap} }) {
463             $_pending += @{ $_Q->{_datp}->{$_h} };
464             }
465             }
466             $_cnt = $_pending if $_pending < $_cnt;
467              
468             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
469             }
470             else {
471             $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
472             $_buf = $_Q->_dequeue();
473             }
474              
475             if ($_cnt) {
476             $_buf = $_MCE->{freeze}(\@_items);
477             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
478             }
479             elsif ($_has_data) {
480             $_buf = $_MCE->{freeze}([ $_buf ]);
481             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
482             }
483             elsif (exists $_Q->{_ended}) {
484             print {$_DAU_R_SOCK} '-2'.$LF;
485             }
486             else {
487             print {$_DAU_R_SOCK} '-1'.$LF;
488             $_Q->{_dsem} += 1;
489             }
490              
491             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
492             for my $_i (1 .. $_Q->{_asem}) {
493             syswrite($_Q->{_aw_sock}, $LF);
494             }
495             $_Q->{_asem} = 0;
496             }
497              
498             return;
499             },
500              
501             OUTPUT_D_QUN.$LF => sub { # Dequeue from queue (NB)
502             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
503              
504             chomp($_id = <$_DAU_R_SOCK>),
505             chomp($_cnt = <$_DAU_R_SOCK>);
506              
507             $_Q = $_all->{$_id};
508              
509             if ($_cnt == 1) {
510             my $_buf = $_Q->_dequeue();
511              
512             if (defined $_buf) {
513             $_buf = $_MCE->{freeze}([ $_buf ]);
514             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
515             }
516             else {
517             print {$_DAU_R_SOCK} '-1'.$LF;
518             }
519             }
520             else {
521             my @_items;
522             my $_pending = @{ $_Q->{_datq} };
523              
524             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
525             for my $_h (@{ $_Q->{_heap} }) {
526             $_pending += @{ $_Q->{_datp}->{$_h} };
527             }
528             }
529             $_cnt = $_pending if $_pending < $_cnt;
530              
531             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
532              
533             if ($_cnt) {
534             my $_buf = $_MCE->{freeze}(\@_items);
535             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
536             }
537             else {
538             print {$_DAU_R_SOCK} '-1'.$LF;
539             }
540             }
541              
542             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
543             for my $_i (1 .. $_Q->{_asem}) {
544             syswrite($_Q->{_aw_sock}, $LF);
545             }
546             $_Q->{_asem} = 0;
547             }
548              
549             return;
550             },
551              
552             OUTPUT_D_QUT.$LF => sub { # Dequeue from queue (Timed)
553             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
554              
555             chomp($_id = <$_DAU_R_SOCK>);
556              
557             $_Q = $_all->{$_id};
558             $_Q->{_dsem} -= 1 if $_Q->{_dsem};
559              
560             print {$_DAU_R_SOCK} $LF;
561              
562             return;
563             },
564              
565             ## ----------------------------------------------------------------------
566              
567             OUTPUT_N_QUE.$LF => sub { # Return number of items
568             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
569              
570             chomp($_id = <$_DAU_R_SOCK>);
571              
572             print {$_DAU_R_SOCK} $_all->{$_id}->_mce_m_pending().$LF;
573              
574             return;
575             },
576              
577             OUTPUT_I_QUE.$LF => sub { # Insert into queue
578             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
579              
580             chomp($_id = <$_DAU_R_SOCK>),
581             chomp($_i = <$_DAU_R_SOCK>),
582             chomp($_len = <$_DAU_R_SOCK>);
583              
584             read $_DAU_R_SOCK, my($_buf), $_len;
585              
586             $_Q = $_all->{$_id};
587             $_Q->_mce_m_insert($_i, @{ $_MCE->{thaw}($_buf) });
588              
589             return;
590             },
591              
592             OUTPUT_I_QUP.$LF => sub { # Insert into queue (p)
593             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
594              
595             chomp($_id = <$_DAU_R_SOCK>),
596             chomp($_p = <$_DAU_R_SOCK>),
597             chomp($_i = <$_DAU_R_SOCK>),
598             chomp($_len = <$_DAU_R_SOCK>);
599              
600             read $_DAU_R_SOCK, my($_buf), $_len;
601              
602             $_Q = $_all->{$_id};
603             $_Q->_mce_m_insertp($_p, $_i, @{ $_MCE->{thaw}($_buf) });
604              
605             return;
606             },
607              
608             ## ----------------------------------------------------------------------
609              
610             OUTPUT_P_QUE.$LF => sub { # Peek into queue
611             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
612              
613             chomp($_id = <$_DAU_R_SOCK>),
614             chomp($_i = <$_DAU_R_SOCK>);
615              
616             $_Q = $_all->{$_id};
617             $_buf = $_Q->_mce_m_peek($_i);
618              
619             if (defined $_buf) {
620             $_buf = $_MCE->{freeze}([ $_buf ]);
621             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
622             } else {
623             print {$_DAU_R_SOCK} '-1'.$LF;
624             }
625              
626             return;
627             },
628              
629             OUTPUT_P_QUP.$LF => sub { # Peek into queue (p)
630             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
631              
632             chomp($_id = <$_DAU_R_SOCK>),
633             chomp($_p = <$_DAU_R_SOCK>),
634             chomp($_i = <$_DAU_R_SOCK>);
635              
636             $_Q = $_all->{$_id};
637             $_buf = $_Q->_mce_m_peekp($_p, $_i);
638              
639             if (defined $_buf) {
640             $_buf = $_MCE->{freeze}([ $_buf ]);
641             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
642             } else {
643             print {$_DAU_R_SOCK} '-1'.$LF;
644             }
645              
646             return;
647             },
648              
649             OUTPUT_P_QUH.$LF => sub { # Peek into heap
650             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
651              
652             chomp($_id = <$_DAU_R_SOCK>),
653             chomp($_i = <$_DAU_R_SOCK>);
654              
655             $_Q = $_all->{$_id};
656             $_buf = $_Q->_mce_m_peekh($_i);
657              
658             if (defined $_buf) {
659             $_buf = $_MCE->{freeze}([ $_buf ]);
660             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
661             } else {
662             print {$_DAU_R_SOCK} '-1'.$LF;
663             }
664              
665             return;
666             },
667              
668             OUTPUT_H_QUE.$LF => sub { # Return the heap
669             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
670              
671             chomp($_id = <$_DAU_R_SOCK>);
672              
673             $_Q = $_all->{$_id};
674             $_buf = $_MCE->{freeze}([ $_Q->_mce_m_heap() ]);
675              
676             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
677              
678             return;
679             },
680              
681             );
682              
683             ## -------------------------------------------------------------------------
684              
685             sub _mce_m_loop_begin {
686 87     87   401 ($_MCE, $_DAU_R_SOCK_REF) = @_;
687              
688 87         288 return;
689             }
690              
691             sub _mce_m_loop_end {
692 87     87   354 $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = $_cnt = $_i = $_id =
693             $_len = $_p = $_Q = undef;
694              
695 87         221 return;
696             }
697              
698             sub _mce_m_init {
699 35     35   192 MCE::_attach_plugin(
700             \%_output_function, \&_mce_m_loop_begin, \&_mce_m_loop_end,
701             \&_mce_w_init
702             );
703              
704 35         73 return;
705             }
706              
707             }
708              
709             ###############################################################################
710             ## ----------------------------------------------------------------------------
711             ## Methods for the manager process.
712             ##
713             ###############################################################################
714              
715             ## await ( pending_threshold )
716              
717             sub _mce_m_await {
718             # Handled by the manager process when called by MCE workers.
719 0     0   0 return;
720             }
721              
722             ## clear ( )
723              
724             sub _mce_m_clear {
725 28     28   2569 my ($_Q) = @_;
726              
727 28         48 %{ $_Q->{_datp} } = ();
  28         104  
728 28         52 @{ $_Q->{_datq} } = ();
  28         77  
729 28         52 @{ $_Q->{_heap} } = ();
  28         51  
730              
731 28         69 return;
732             }
733              
734             ## end ( )
735              
736             sub _mce_m_end {
737 0     0   0 my ($_Q) = @_;
738              
739 0 0       0 if (!exists $_Q->{_ended}) {
740 0         0 for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
  0         0  
741 0         0 $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
742             }
743              
744 0         0 return;
745             }
746              
747             ## enqueue ( item [, item, ... ] )
748              
749             sub _mce_m_enqueue {
750 569     569   2528 my $_Q = shift;
751              
752 569 50       1371 return unless (scalar @_);
753              
754 569 50       20103 if (exists $_Q->{_ended}) {
755 0         0 warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
756 0         0 return;
757             }
758              
759 569 100       1458 if ($_Q->{_dsem}) {
760 146         558 for my $_i (1 .. scalar @_) {
761 150         6574 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
762 150 100       853 last unless $_Q->{_dsem};
763             }
764             }
765              
766             ## Append item(s) into the queue.
767 569         829 push @{ $_Q->{_datq} }, @_;
  569         1841  
768              
769 569         1725 return;
770             }
771              
772             ## enqueuep ( priority, item [, item, ... ] )
773              
774             sub _mce_m_enqueuep {
775 74     74   235 my ($_Q, $_p) = (shift, shift);
776              
777 74 50 33     527 _croak('Queue: (enqueuep priority) is not an integer')
778             if (!looks_like_number($_p) || int($_p) != $_p);
779              
780 74 50       207 return unless (scalar @_);
781              
782 74 50       197 if (exists $_Q->{_ended}) {
783 0         0 warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
784 0         0 return;
785             }
786              
787 74 50       165 if ($_Q->{_dsem}) {
788 0         0 for my $_i (1 .. scalar @_) {
789 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
790 0 0       0 last unless $_Q->{_dsem};
791             }
792             }
793              
794 74         374 $_Q->_enqueuep($_p, @_);
795              
796 74         97 return;
797             }
798              
799             ## dequeue ( )
800             ## dequeue ( count )
801              
802             sub _mce_m_dequeue {
803 16     16   1455 my ($_Q, $_cnt) = @_;
804 16         29 my (@_items, $_has_data, $_buf);
805              
806 16 100 100     60 if (defined $_cnt && $_cnt ne '1') {
807 6 50 33     48 _croak('Queue: (dequeue count argument) is not valid')
      33        
808             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
809              
810 6         9 my $_pending = @{ $_Q->{_datq} };
  6         15  
811              
812 6 50 100     16 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  4         13  
813 4         5 for my $_h (@{ $_Q->{_heap} }) {
  4         9  
814 10         12 $_pending += @{ $_Q->{_datp}->{$_h} };
  10         19  
815             }
816             }
817 6 50       14 $_cnt = $_pending if $_pending < $_cnt;
818              
819 6         16 for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
  28         63  
820             }
821             else {
822 10 50 66     13 $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
823 10         23 $_buf = $_Q->_dequeue();
824             }
825              
826 16 100       52 return @_items if (scalar @_items);
827 10 50       41 return $_buf if ($_has_data);
828 0 0       0 return () if (exists $_Q->{_ended});
829              
830 0         0 $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
831              
832 0         0 goto \&_mce_m_dequeue;
833             }
834              
835             ## dequeue_nb ( )
836             ## dequeue_nb ( count )
837              
838             sub _mce_m_dequeue_nb {
839 4     4   12 my ($_Q, $_cnt) = @_;
840              
841 4 50 33     17 if (defined $_cnt && $_cnt ne '1') {
842 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
843             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
844              
845 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
846              
847 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
848 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
849 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
850             }
851             }
852              
853 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
854              
855 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
856             }
857              
858 4         12 my $_buf = $_Q->_dequeue();
859              
860 4 50       23 return defined($_buf) ? $_buf : ();
861             }
862              
863             ## dequeue_timed ( timeout )
864             ## dequeue_timed ( timeout, count )
865              
866             sub _mce_m_dequeue_timed {
867 4     4   9 my ($_Q, $_timeout, $_cnt) = @_;
868              
869 4 50       18 if (defined $_timeout) {
870 0 0       0 _croak('Queue: (dequeue_timed timeout argument) is not valid')
871             if (!looks_like_number($_timeout));
872             }
873              
874 4 50 33     14 if (defined $_cnt && $_cnt ne '1') {
875 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
876             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
877              
878 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
879              
880 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
881 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
882 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
883             }
884             }
885              
886 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
887              
888 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
889             }
890              
891 4         16 my $_buf = $_Q->_dequeue();
892              
893 4 50       24 return defined($_buf) ? $_buf : ();
894             }
895              
896             ## pending ( )
897              
898             sub _mce_m_pending {
899 14     14   1118 my ($_Q) = @_;
900 14         26 my $_pending = @{ $_Q->{_datq} };
  14         51  
901              
902 14 100       24 if (scalar @{ $_Q->{_heap} }) {
  14         62  
903 9         20 for my $_h (@{ $_Q->{_heap} }) {
  9         113  
904 9         27 $_pending += @{ $_Q->{_datp}->{$_h} };
  9         37  
905             }
906             }
907              
908             return (exists $_Q->{_ended})
909 14 0       483 ? $_pending ? $_pending : undef
    50          
910             : $_pending;
911             }
912              
913             ## insert ( index, item [, item, ... ] )
914              
915             sub _mce_m_insert {
916 50     50   147 my ($_Q, $_i) = (shift, shift);
917              
918 50 50 33     263 _croak('Queue: (insert index) is not an integer')
919             if (!looks_like_number($_i) || int($_i) != $_i);
920              
921 50 50       106 return unless (scalar @_);
922              
923 50 50       93 if (exists $_Q->{_ended}) {
924 0         0 warn "Queue: (insert) called on queue that has been 'end'ed\n";
925 0         0 return;
926             }
927              
928 50 50       93 if ($_Q->{_dsem}) {
929 0         0 for my $_i (1 .. scalar @_) {
930 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
931 0 0       0 last unless $_Q->{_dsem};
932             }
933             }
934              
935 50 100       72 if (abs($_i) > scalar @{ $_Q->{_datq} }) {
  50         98  
936 10 100       31 if ($_i >= 0) {
937 5 100       14 if ($_Q->{_type}) {
938 3         6 push @{ $_Q->{_datq} }, @_;
  3         15  
939             } else {
940 2         4 unshift @{ $_Q->{_datq} }, @_;
  2         7  
941             }
942             }
943             else {
944 5 100       21 if ($_Q->{_type}) {
945 3         6 unshift @{ $_Q->{_datq} }, @_;
  3         13  
946             } else {
947 2         16 push @{ $_Q->{_datq} }, @_;
  2         17  
948             }
949             }
950             }
951             else {
952 40 100       77 if (!$_Q->{_type}) {
953             $_i = ($_i >= 0)
954 16 100       42 ? scalar(@{ $_Q->{_datq} }) - $_i
  10         30  
955             : abs($_i);
956             }
957 40         47 splice @{ $_Q->{_datq} }, $_i, 0, @_;
  40         211  
958             }
959              
960 50         92 return;
961             }
962              
963             ## insertp ( priority, index, item [, item, ... ] )
964              
965             sub _mce_m_insertp {
966 90     90   228 my ($_Q, $_p, $_i) = (shift, shift, shift);
967              
968 90 50 33     654 _croak('Queue: (insertp priority) is not an integer')
969             if (!looks_like_number($_p) || int($_p) != $_p);
970 90 50 33     409 _croak('Queue: (insertp index) is not an integer')
971             if (!looks_like_number($_i) || int($_i) != $_i);
972              
973 90 50       182 return unless (scalar @_);
974              
975 90 50       200 if (exists $_Q->{_ended}) {
976 0         0 warn "Queue: (insertp) called on queue that has been 'end'ed\n";
977 0         0 return;
978             }
979              
980 90 50       197 if ($_Q->{_dsem}) {
981 0         0 for my $_i (1 .. scalar @_) {
982 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
983 0 0       0 last unless $_Q->{_dsem};
984             }
985             }
986              
987 90 100 50     260 if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
  90         325  
988              
989 81 100       158 if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
  81         211  
990 18 100       46 if ($_i >= 0) {
991 9 100       29 if ($_Q->{_type}) {
992 5         10 push @{ $_Q->{_datp}->{$_p} }, @_;
  5         23  
993             } else {
994 4         32 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  4         63  
995             }
996             }
997             else {
998 9 100       31 if ($_Q->{_type}) {
999 5         10 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  5         29  
1000             } else {
1001 4         11 push @{ $_Q->{_datp}->{$_p} }, @_;
  4         24  
1002             }
1003             }
1004             }
1005             else {
1006 63 100       150 if (!$_Q->{_type}) {
1007             $_i = ($_i >=0)
1008 28 100       121 ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
  16         108  
1009             : abs($_i);
1010             }
1011 63         104 splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
  63         221  
1012             }
1013             }
1014             else {
1015 9         28 $_Q->_enqueuep($_p, @_);
1016             }
1017              
1018 90         184 return;
1019             }
1020              
1021             ## peek ( index )
1022             ## peek ( )
1023              
1024             sub _mce_m_peek {
1025 55     55   135 my ($_Q, $_i) = @_;
1026              
1027 55 100       114 if ($_i) {
1028 40 50 33     303 _croak('Queue: (peek index) is not an integer')
1029             if (!looks_like_number($_i) || int($_i) != $_i);
1030             }
1031 15         24 else { $_i = 0 }
1032              
1033 55 100       91 return undef if (abs($_i) > scalar @{ $_Q->{_datq} });
  55         178  
1034              
1035 40 100       97 if (!$_Q->{_type}) {
1036             $_i = ($_i >= 0)
1037 16 100       46 ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
  10         31  
1038             : abs($_i + 1);
1039             }
1040              
1041 40         143 return $_Q->{_datq}->[$_i];
1042             }
1043              
1044             ## peekp ( priority, index )
1045             ## peekp ( priority )
1046              
1047             sub _mce_m_peekp {
1048 99     99   248 my ($_Q, $_p, $_i) = @_;
1049              
1050 99 100       233 if ($_i) {
1051 72 50 33     551 _croak('Queue: (peekp index) is not an integer')
1052             if (!looks_like_number($_i) || int($_i) != $_i);
1053             }
1054 27         49 else { $_i = 0 }
1055              
1056 99 50 33     572 _croak('Queue: (peekp priority) is not an integer')
1057             if (!looks_like_number($_p) || int($_p) != $_p);
1058              
1059 99 50       276 return undef unless (exists $_Q->{_datp}->{$_p});
1060 99 100       155 return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });
  99         281  
1061              
1062 72 100       162 if (!$_Q->{_type}) {
1063             $_i = ($_i >= 0)
1064 32 100       95 ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
  20         69  
1065             : abs($_i + 1);
1066             }
1067              
1068 72         238 return $_Q->{_datp}->{$_p}->[$_i];
1069             }
1070              
1071             ## peekh ( index )
1072             ## peekh ( )
1073              
1074             sub _mce_m_peekh {
1075 10     10   1211 my ($_Q, $_i) = @_;
1076              
1077 10 100       27 if ($_i) {
1078 5 50 33     83 _croak('Queue: (peekh index) is not an integer')
1079             if (!looks_like_number($_i) || int($_i) != $_i);
1080             }
1081 5         9 else { $_i = 0 }
1082              
1083 10 50       40 return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
  10         45  
1084 10         40 return $_Q->{_heap}->[$_i];
1085             }
1086              
1087             ## heap ( )
1088              
1089             sub _mce_m_heap {
1090 5     5   20 return @{ shift->{_heap} };
  5         52  
1091             }
1092              
1093             ###############################################################################
1094             ## ----------------------------------------------------------------------------
1095             ## Methods for the worker process.
1096             ##
1097             ###############################################################################
1098              
1099             {
1100             my (
1101             $_MCE, $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_lock_chn,
1102             $_dat_ex, $_dat_un, $_len, $_pending
1103             );
1104              
1105             my $_req1 = sub {
1106             local $\ = undef if (defined $\);
1107              
1108             $_dat_ex->() if $_lock_chn;
1109             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1110             print({$_DAU_W_SOCK} $_[1], $_[2]);
1111              
1112             $_dat_un->() if $_lock_chn;
1113             };
1114              
1115             my $_req2 = sub {
1116             local $\ = undef if (defined $\);
1117             local $/ = $LF if ($/ ne $LF);
1118              
1119             $_dat_ex->() if $_lock_chn;
1120             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1121             print({$_DAU_W_SOCK} $_[1]);
1122             <$_DAU_W_SOCK>;
1123              
1124             $_dat_un->() if $_lock_chn;
1125             };
1126              
1127             my $_req3 = sub {
1128             local $\ = undef if (defined $\);
1129             local $/ = $LF if ($/ ne $LF);
1130              
1131             $_dat_ex->() if $_lock_chn;
1132             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1133             print({$_DAU_W_SOCK} $_[1]);
1134              
1135             chomp($_len = <$_DAU_W_SOCK>);
1136              
1137             if ($_len < 0) {
1138             $_dat_un->() if $_lock_chn;
1139             return defined($_[3]) ? () : undef;
1140             }
1141              
1142             read $_DAU_W_SOCK, my($_buf), $_len;
1143             $_dat_un->() if $_lock_chn;
1144              
1145             ($_[2] == 1)
1146             ? ($_MCE->{thaw}($_buf))->[0]
1147             : @{ $_MCE->{thaw}($_buf) };
1148             };
1149              
1150             sub _mce_w_init {
1151 28     28   466 ($_MCE) = @_;
1152 28         250 $_chn = $_MCE->{_chn};
1153 28         206 $_DAT_LOCK = $_MCE->{_dat_lock};
1154 28         196 $_DAT_W_SOCK = $_MCE->{_dat_w_sock}->[0];
1155 28         146 $_DAU_W_SOCK = $_MCE->{_dat_w_sock}->[$_chn];
1156 28         154 $_lock_chn = $_MCE->{_lock_chn};
1157              
1158 28 50       195 if ($_lock_chn) {
1159             # inlined for performance
1160             $_dat_ex = sub {
1161 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1162             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
1163 0 0       0 if $_is_MSWin32;
1164             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
1165 0 0       0 unless $_DAT_LOCK->{ $_pid };
1166 0         0 };
1167             $_dat_un = sub {
1168 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1169             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
1170 0 0       0 if $_DAT_LOCK->{ $_pid };
1171 0         0 };
1172             }
1173              
1174 28         1567 $_all = {};
1175              
1176 37     37   378 no strict 'refs'; no warnings 'redefine';
  37     37   74  
  37         1458  
  37         281  
  37         106  
  37         104445  
1177              
1178 28         229 *{ 'MCE::Queue::await' } = \&_mce_w_await;
  28         2529  
1179 28         438 *{ 'MCE::Queue::clear' } = \&_mce_w_clear;
  28         610  
1180 28         153 *{ 'MCE::Queue::end' } = \&_mce_w_end;
  28         511  
1181 28         224 *{ 'MCE::Queue::enqueue' } = \&_mce_w_enqueue;
  28         495  
1182 28         181 *{ 'MCE::Queue::enqueuep' } = \&_mce_w_enqueuep;
  28         656  
1183 28         194 *{ 'MCE::Queue::dequeue' } = \&_mce_w_dequeue;
  28         562  
1184 28         161 *{ 'MCE::Queue::dequeue_nb' } = \&_mce_w_dequeue_nb;
  28         336  
1185 28         505 *{ 'MCE::Queue::dequeue_timed' } = \&_mce_w_dequeue_timed;
  28         433  
1186 28         322 *{ 'MCE::Queue::pending' } = \&_mce_w_pending;
  28         421  
1187 28         157 *{ 'MCE::Queue::insert' } = \&_mce_w_insert;
  28         332  
1188 28         140 *{ 'MCE::Queue::insertp' } = \&_mce_w_insertp;
  28         529  
1189 28         216 *{ 'MCE::Queue::peek' } = \&_mce_w_peek;
  28         345  
1190 28         236 *{ 'MCE::Queue::peekp' } = \&_mce_w_peekp;
  28         616  
1191 28         182 *{ 'MCE::Queue::peekh' } = \&_mce_w_peekh;
  28         268  
1192 28         240 *{ 'MCE::Queue::heap' } = \&_mce_w_heap;
  28         372  
1193              
1194 28         249 return;
1195             }
1196              
1197             ## -------------------------------------------------------------------------
1198              
1199             sub _mce_w_await {
1200 0   0 0   0 my $_Q = shift; my $_t = shift || 0;
  0         0  
1201              
1202 0 0       0 return $_Q->_mce_m_await() if (exists $_all->{ $_Q->{_id} });
1203              
1204             _croak('Queue: (await) is not enabled for this queue')
1205 0 0       0 unless ($_Q->{_await});
1206 0 0 0     0 _croak('Queue: (await threshold) is not an integer')
1207             if (!looks_like_number($_t) || int($_t) != $_t);
1208              
1209 0 0       0 $_t = 0 if ($_t < 0);
1210 0         0 $_req2->(OUTPUT_W_QUE, $_Q->{_id}.$LF . $_t.$LF);
1211              
1212 0 0       0 MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
1213 0         0 MCE::Util::_sysread($_Q->{_ar_sock}, my($_next), 1);
1214              
1215 0         0 return;
1216             }
1217              
1218             sub _mce_w_clear {
1219 8     8   67 my ($_Q) = @_;
1220              
1221 8 50       36 return $_Q->_mce_m_clear() if (exists $_all->{ $_Q->{_id} });
1222              
1223 8         41 $_req2->(OUTPUT_C_QUE, $_Q->{_id}.$LF);
1224              
1225 8         35 return;
1226             }
1227              
1228             sub _mce_w_end {
1229 0     0   0 my ($_Q) = @_;
1230              
1231 0 0       0 return $_Q->_mce_m_end() if (exists $_all->{ $_Q->{_id} });
1232              
1233 0         0 $_req2->(OUTPUT_E_QUE, $_Q->{_id}.$LF);
1234              
1235 0         0 return;
1236             }
1237              
1238             ## -------------------------------------------------------------------------
1239              
1240             sub _mce_w_enqueue {
1241 56     56   687 my $_Q = shift;
1242              
1243 56 50       302 return $_Q->_mce_m_enqueue(@_) if (exists $_all->{ $_Q->{_id} });
1244              
1245 56 50       146 if (scalar @_) {
1246 56         662 my $_tmp = $_MCE->{freeze}([ @_ ]);
1247 56         234 my $_buf = $_Q->{_id}.$LF . length($_tmp).$LF;
1248 56         267 $_req1->(OUTPUT_A_QUE, $_buf, $_tmp);
1249             }
1250              
1251 56         245 return;
1252             }
1253              
1254             sub _mce_w_enqueuep {
1255 20     20   242 my ($_Q, $_p) = (shift, shift);
1256              
1257 20 50       189 return $_Q->_mce_m_enqueuep($_p, @_) if (exists $_all->{ $_Q->{_id} });
1258              
1259 20 50 33     177 _croak('Queue: (enqueuep priority) is not an integer')
1260             if (!looks_like_number($_p) || int($_p) != $_p);
1261              
1262 20 50       61 if (scalar @_) {
1263 20         469 my $_tmp = $_MCE->{freeze}([ @_ ]);
1264 20         92 my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_tmp).$LF;
1265 20         124 $_req1->(OUTPUT_A_QUP, $_buf, $_tmp);
1266             }
1267              
1268 20         66 return;
1269             }
1270              
1271             ## -------------------------------------------------------------------------
1272              
1273             sub _mce_w_dequeue {
1274 229     229   604 my $_buf; my ($_Q, $_cnt) = @_;
  229         692  
1275              
1276 229 50       1191 return $_Q->_mce_m_dequeue($_cnt) if (exists $_all->{ $_Q->{_id} });
1277              
1278 229 100 100     1007 if (defined $_cnt && $_cnt ne '1') {
1279 6 50 33     121 _croak('Queue: (dequeue count argument) is not valid')
      33        
1280             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1281             } else {
1282 223         462 $_cnt = 1;
1283             }
1284              
1285             {
1286 229 50       426 local $\ = undef if (defined $\);
  229         781  
1287 229 50       801 local $/ = $LF if ($/ ne $LF);
1288              
1289 229 50       559 $_dat_ex->() if $_lock_chn;
1290              
1291 229         4520 print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF),
1292 229         410 print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF);
  229         3457  
1293 229         129218 chomp($_len = <$_DAU_W_SOCK>);
1294              
1295 229 100       2710 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1296              
1297 229 50       917 $_dat_un->() if $_lock_chn;
1298             }
1299              
1300 229 100 100     5125 return ($_MCE->{thaw}($_buf))->[0] if ($_len > 0 && $_cnt == 1);
1301 52 100       184 return @{ $_MCE->{thaw}($_buf) } if ($_len > 0);
  6         237  
1302 46 50       167 return if ($_len == -2);
1303              
1304 46 50       217 MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
1305 46         394 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
1306              
1307 46         801 goto \&_mce_w_dequeue;
1308             }
1309              
1310             sub _mce_w_dequeue_nb {
1311 4     4   30 my ($_Q, $_cnt) = @_;
1312              
1313 4 50       30 return $_Q->_mce_m_dequeue_nb($_cnt) if (exists $_all->{ $_Q->{_id} });
1314              
1315 4 50 33     25 if (defined $_cnt && $_cnt ne '1') {
1316 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
1317             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1318             } else {
1319 4         8 $_cnt = 1;
1320             }
1321              
1322 4         28 $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1323             }
1324              
1325             sub _mce_w_dequeue_timed {
1326 4     4   20 my ($_Q, $_timeout, $_cnt) = @_;
1327 4         13 my ($_buf, $_start);
1328              
1329             return $_Q->_mce_m_dequeue_timed($_timeout, $_cnt)
1330 4 50       29 if (exists $_all->{ $_Q->{_id} });
1331              
1332 4 50       31 if (defined $_timeout) {
1333 0 0       0 _croak('Queue: (dequeue_timed count argument) is not valid')
1334             if (!looks_like_number($_timeout));
1335 0         0 $_start = MCE::Util::_time();
1336             }
1337              
1338 4 50 33     29 if (defined $_cnt && $_cnt ne '1') {
1339 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
1340             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1341             } else {
1342 4         14 $_cnt = 1;
1343             }
1344              
1345 4 50 33     21 if (! $_timeout || $_timeout < 0.0) {
1346 4         47 return $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1347             }
1348              
1349             {
1350 0 0       0 local $\ = undef if (defined $\);
  0         0  
1351 0 0       0 local $/ = $LF if ($/ ne $LF);
1352              
1353 0 0       0 $_dat_ex->() if $_lock_chn;
1354              
1355 0         0 print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF),
1356 0         0 print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF);
  0         0  
1357 0         0 chomp($_len = <$_DAU_W_SOCK>);
1358              
1359 0 0       0 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1360              
1361 0 0       0 $_dat_un->() if $_lock_chn;
1362             }
1363              
1364 0 0 0     0 return ($_MCE->{thaw}($_buf))->[0] if ($_len > 0 && $_cnt == 1);
1365 0 0       0 return @{ $_MCE->{thaw}($_buf) } if ($_len > 0);
  0         0  
1366 0 0       0 return if ($_len == -2);
1367              
1368 0         0 $_Q->{_qr_mutex}->lock();
1369 0         0 $_timeout = $_timeout - (MCE::Util::_time() - $_start) - 0.045;
1370 0 0       0 $_timeout = 0.0 if $_timeout < 0.045;
1371              
1372 0         0 CORE::vec(my $_r, CORE::fileno($_Q->{_qr_sock}), 1) = 1;
1373 0 0       0 if (CORE::select($_r, undef, undef, $_timeout) > 0) {
1374 0         0 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
1375 0         0 $_Q->{_qr_mutex}->unlock();
1376 0         0 return $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1377             }
1378              
1379 0         0 $_Q->{_qr_mutex}->unlock();
1380 0         0 $_req2->(OUTPUT_D_QUT, $_Q->{_id}.$LF);
1381 0         0 MCE::Util::_sleep(0.045); # yield
1382              
1383 0         0 return ();
1384             }
1385              
1386             ## -------------------------------------------------------------------------
1387              
1388             sub _mce_w_pending {
1389 4     4   87 my ($_Q) = @_;
1390              
1391 4 50       75 return $_Q->_mce_m_pending() if (exists $_all->{ $_Q->{_id} });
1392              
1393 4 50       32 local $\ = undef if (defined $\);
1394 4 50       50 local $/ = $LF if ($/ ne $LF);
1395              
1396 4 50       19 $_dat_ex->() if $_lock_chn;
1397 4         54 print({$_DAT_W_SOCK} OUTPUT_N_QUE.$LF . $_chn.$LF),
1398 4         10 print({$_DAU_W_SOCK} $_Q->{_id}.$LF);
  4         57  
1399              
1400 4         7164 chomp($_pending = <$_DAU_W_SOCK>);
1401 4 50       42 $_dat_un->() if $_lock_chn;
1402              
1403 4 50       55 length($_pending) ? int($_pending) : undef;
1404             }
1405              
1406             sub _mce_w_insert {
1407 20     20   165 my ($_Q, $_i) = (shift, shift);
1408              
1409 20 50       53 return $_Q->_mce_m_insert($_i, @_) if (exists $_all->{ $_Q->{_id} });
1410              
1411 20 50 33     121 _croak('Queue: (insert index) is not an integer')
1412             if (!looks_like_number($_i) || int($_i) != $_i);
1413              
1414 20 50       43 return unless (scalar @_);
1415              
1416 20         170 my $_tmp = $_MCE->{freeze}([ @_ ]);
1417 20         71 my $_buf = $_Q->{_id}.$LF . $_i.$LF . length($_tmp).$LF . $_tmp;
1418              
1419 20         48 $_req1->(OUTPUT_I_QUE, $_buf, '');
1420              
1421 20         61 return;
1422             }
1423              
1424             sub _mce_w_insertp {
1425 20     20   130 my ($_Q, $_p, $_i) = (shift, shift, shift);
1426              
1427 20 50       60 return $_Q->_mce_m_insertp($_p, $_i, @_) if (exists $_all->{ $_Q->{_id} });
1428              
1429 20 50 33     146 _croak('Queue: (insertp priority) is not an integer')
1430             if (!looks_like_number($_p) || int($_p) != $_p);
1431 20 50 33     100 _croak('Queue: (insertp index) is not an integer')
1432             if (!looks_like_number($_i) || int($_i) != $_i);
1433              
1434 20 50       41 return unless (scalar @_);
1435              
1436 20         165 my $_tmp = $_MCE->{freeze}([ @_ ]);
1437 20         75 my $_buf = $_Q->{_id}.$LF . $_p.$LF . $_i.$LF . length($_tmp).$LF . $_tmp;
1438              
1439 20         43 $_req1->(OUTPUT_I_QUP, $_buf, '');
1440              
1441 20         54 return;
1442             }
1443              
1444             ## -------------------------------------------------------------------------
1445              
1446             sub _mce_w_peek {
1447 22   100 22   45 my $_Q = shift; my $_i = shift || 0;
  22         126  
1448              
1449 22 50       73 return $_Q->_mce_m_peek($_i, @_) if (exists $_all->{ $_Q->{_id} });
1450              
1451 22 50 33     173 _croak('Queue: (peek index) is not an integer')
1452             if (!looks_like_number($_i) || int($_i) != $_i);
1453              
1454 22         92 $_req3->(OUTPUT_P_QUE, $_Q->{_id}.$LF . $_i.$LF, 1);
1455             }
1456              
1457             sub _mce_w_peekp {
1458 22   100 22   70 my ($_Q, $_p) = (shift, shift); my $_i = shift || 0;
  22         106  
1459              
1460 22 50       71 return $_Q->_mce_m_peekp($_p, $_i, @_) if (exists $_all->{ $_Q->{_id} });
1461              
1462 22 50 33     153 _croak('Queue: (peekp priority) is not an integer')
1463             if (!looks_like_number($_p) || int($_p) != $_p);
1464 22 50 33     88 _croak('Queue: (peekp index) is not an integer')
1465             if (!looks_like_number($_i) || int($_i) != $_i);
1466              
1467 22         111 $_req3->(OUTPUT_P_QUP, $_Q->{_id}.$LF . $_p.$LF . $_i.$LF, 1);
1468             }
1469              
1470             sub _mce_w_peekh {
1471 4   100 4   49 my $_Q = shift; my $_i = shift || 0;
  4         35  
1472              
1473 4 50       26 return $_Q->_mce_m_peekh($_i, @_) if (exists $_all->{ $_Q->{_id} });
1474              
1475 4 50 33     61 _croak('Queue: (peekh index) is not an integer')
1476             if (!looks_like_number($_i) || int($_i) != $_i);
1477              
1478 4         22 my $_ret = $_req3->(OUTPUT_P_QUH, $_Q->{_id}.$LF . $_i.$LF, 1);
1479              
1480 4 50       76 length($_ret) ? int($_ret) : undef;
1481             }
1482              
1483             sub _mce_w_heap {
1484 2     2   18 my ($_Q) = @_;
1485              
1486 2 50       14 return $_Q->_mce_m_heap() if (exists $_all->{ $_Q->{_id} });
1487              
1488 2         28 $_req3->(OUTPUT_H_QUE, $_Q->{_id}.$LF, 0);
1489             }
1490              
1491             }
1492              
1493             1;
1494              
1495             __END__