File Coverage

blib/lib/MCE/Queue.pm
Criterion Covered Total %
statement 460 581 79.1
branch 190 362 52.4
condition 61 162 37.6
subroutine 49 57 85.9
pod 1 1 100.0
total 761 1163 65.4


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Hybrid (normal and priority) queues.
4             ##
5             ###############################################################################
6              
7             package MCE::Queue;
8              
9 37     37   172104 use strict;
  37         88  
  37         1306  
10 37     37   198 use warnings;
  37         59  
  37         1015  
11              
12 37     37   204 no warnings qw( threads recursion uninitialized );
  37         76  
  37         2193  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
17             ## no critic (TestingAndDebugging::ProhibitNoStrict)
18              
19 37     37   231 use Scalar::Util qw( looks_like_number );
  37         87  
  37         2735  
20 37     37   1167 use MCE::Util qw( $LF );
  37         59  
  37         3697  
21 37     37   1104 use MCE::Mutex ();
  37         49  
  37         8312  
22              
23             ###############################################################################
24             ## ----------------------------------------------------------------------------
25             ## Import routine.
26             ##
27             ###############################################################################
28              
29             our ($HIGHEST,$LOWEST, $FIFO,$LIFO, $LILO,$FILO) = (1,0, 1,0, 1,0);
30              
31             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
32             my ($_def, $_imported) = ({});
33              
34             sub import {
35 39     39   269 my ($_class, $_pkg) = (shift, caller);
36              
37             ## Process module arguments.
38 39         167 my $_p = $_def->{$_pkg} = {
39             AWAIT => 0, PORDER => $HIGHEST, TYPE => $FIFO,
40             };
41              
42 39         151 while (my $_argument = shift) {
43 0         0 my $_arg = lc $_argument;
44              
45 0 0       0 $_p->{AWAIT } = shift, next if ( $_arg eq 'await' );
46 0 0       0 $_p->{PORDER} = shift, next if ( $_arg eq 'porder' );
47 0 0       0 $_p->{TYPE } = shift, next if ( $_arg eq 'type' );
48              
49 0         0 _croak("Error: ($_argument) invalid module option");
50             }
51              
52 39 100       581 return if $_imported++;
53              
54             ## Define public methods to internal methods.
55 37     37   277 no strict 'refs'; no warnings 'redefine';
  37     37   73  
  37         1200  
  37         190  
  37         122  
  37         10905  
56              
57 37 100 66     333 if ($INC{'MCE.pm'} && MCE->wid == 0) {
58 35         109 _mce_m_init();
59             }
60              
61 37         70 *{ 'MCE::Queue::await' } = \&_mce_m_await;
  37         221  
62 37         240 *{ 'MCE::Queue::clear' } = \&_mce_m_clear;
  37         228  
63 37         66 *{ 'MCE::Queue::end' } = \&_mce_m_end;
  37         110  
64 37         70 *{ 'MCE::Queue::enqueue' } = \&_mce_m_enqueue;
  37         230  
65 37         83 *{ 'MCE::Queue::enqueuep' } = \&_mce_m_enqueuep;
  37         116  
66 37         65 *{ 'MCE::Queue::dequeue' } = \&_mce_m_dequeue;
  37         121  
67 37         105 *{ 'MCE::Queue::dequeue_nb' } = \&_mce_m_dequeue_nb;
  37         133  
68 37         101 *{ 'MCE::Queue::dequeue_timed' } = \&_mce_m_dequeue_timed;
  37         147  
69 37         924 *{ 'MCE::Queue::pending' } = \&_mce_m_pending;
  37         158  
70 37         78 *{ 'MCE::Queue::insert' } = \&_mce_m_insert;
  37         677  
71 37         115 *{ 'MCE::Queue::insertp' } = \&_mce_m_insertp;
  37         120  
72 37         73 *{ 'MCE::Queue::peek' } = \&_mce_m_peek;
  37         117  
73 37         70 *{ 'MCE::Queue::peekp' } = \&_mce_m_peekp;
  37         103  
74 37         74 *{ 'MCE::Queue::peekh' } = \&_mce_m_peekh;
  37         118  
75 37         77 *{ 'MCE::Queue::heap' } = \&_mce_m_heap;
  37         105  
76              
77 37         4103 return;
78             }
79              
80             ###############################################################################
81             ## ----------------------------------------------------------------------------
82             ## Define constants & variables.
83             ##
84             ###############################################################################
85              
86             use constant {
87 37         231940 OUTPUT_W_QUE => 'W~QUE', # Await from the queue
88             OUTPUT_C_QUE => 'C~QUE', # Clear the queue
89             OUTPUT_E_QUE => 'E~QUE', # End the queue
90              
91             OUTPUT_A_QUE => 'A~QUE', # Enqueue into queue (array)
92             OUTPUT_A_QUP => 'A~QUP', # Enqueue into queue (array (p))
93             OUTPUT_D_QUE => 'D~QUE', # Dequeue from queue (blocking)
94             OUTPUT_D_QUN => 'D~QUN', # Dequeue from queue (non-blocking)
95             OUTPUT_D_QUT => 'D~QUT', # Dequeue from queue (timed)
96              
97             OUTPUT_N_QUE => 'N~QUE', # Return the number of items
98             OUTPUT_I_QUE => 'I~QUE', # Insert into queue
99             OUTPUT_I_QUP => 'I~QUP', # Insert into queue (p)
100              
101             OUTPUT_P_QUE => 'P~QUE', # Peek into queue
102             OUTPUT_P_QUP => 'P~QUP', # Peek into queue (p)
103             OUTPUT_P_QUH => 'P~QUH', # Peek into heap
104             OUTPUT_H_QUE => 'H~QUE' # Return the heap
105 37     37   257 };
  37         70  
106              
107             ## Attributes used internally.
108             ## _qr_sock _qw_sock _datp _datq _dsem _heap _id _init_pid _porder _type
109             ## _ar_sock _aw_sock _asem _tsem
110              
111             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
112              
113             my %_valid_fields_new = map { $_ => 1 } qw(
114             await barrier fast gather porder queue type
115             );
116              
117             my $_all = {};
118             my $_qid = 0;
119              
120             sub CLONE {
121 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
122             }
123              
124             sub DESTROY {
125 23     23   102 my ($_Q) = @_;
126 23 50       111 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
127              
128 23 50       217 delete $_all->{ $_Q->{_id} } if exists $_Q->{_id};
129 23         538 undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};
130              
131 23 100 66     225 if (exists $_Q->{_init_pid} && $_Q->{_init_pid} eq $_pid) {
132 16         65 MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
133             }
134              
135 23         495 return;
136             }
137              
138             ###############################################################################
139             ## ----------------------------------------------------------------------------
140             ## New instance instantiation.
141             ##
142             ###############################################################################
143              
144             sub new {
145 51     51 1 2224 my ($_class, %_argv) = @_;
146 51         162 my $_pkg = caller;
147              
148 51         153 @_ = ();
149              
150 51   33     120 my $_Q = {}; bless($_Q, ref($_class) || $_class);
  51         388  
151              
152 51         225 for my $_p (keys %_argv) {
153             _croak("Queue: ($_p) is not a valid constructor argument")
154 45 50       346 unless (exists $_valid_fields_new{$_p});
155             }
156              
157 51         365 $_Q->{_asem} = 0; # Semaphore count variable for the ->await method
158 51         154 $_Q->{_datp} = {}; # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
159 51         146 $_Q->{_heap} = []; # Priority heap [ pN, p2, p1 ] in heap order
160             # fyi, _datp will always dequeue before _datq
161              
162             $_Q->{_await} = (defined $_argv{await})
163 51 100 50     465 ? $_argv{await} : $_def->{$_pkg}{AWAIT} || 0;
164              
165             $_Q->{_porder} = (defined $_argv{porder})
166 51 100 33     599 ? $_argv{porder} : $_def->{$_pkg}{PORDER} || $HIGHEST;
167              
168             $_Q->{_type} = (defined $_argv{type})
169 51 100 33     399 ? $_argv{type} : $_def->{$_pkg}{TYPE} || $FIFO;
170              
171             ## -------------------------------------------------------------------------
172              
173 51 100       198 if (exists $_argv{queue}) {
174             _croak('Queue: (queue) is not an ARRAY reference')
175 7 50       26 unless (ref $_argv{queue} eq 'ARRAY');
176              
177 7         22 $_Q->{_datq} = $_argv{queue};
178             }
179             else {
180 44         149 $_Q->{_datq} = [];
181             }
182              
183 51 50       237 if (exists $_argv{gather}) {
184             _croak('Queue: (gather) is not a CODE reference')
185 0 0       0 unless (ref $_argv{gather} eq 'CODE');
186              
187 0         0 $_Q->{gather} = $_argv{gather};
188             }
189              
190             ## -------------------------------------------------------------------------
191              
192 51         535 $_Q->{_qr_mutex} = MCE::Mutex->new();
193 51 50       320 $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
194 51         169 $_Q->{_id} = ++$_qid; $_all->{$_qid} = $_Q;
  51         191  
195 51         127 $_Q->{_dsem} = 0;
196              
197 51         354 MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
198 51 100       235 MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};
199              
200 51         247 return $_Q;
201             }
202              
203             ###############################################################################
204             ## ----------------------------------------------------------------------------
205             ## Private methods.
206             ##
207             ###############################################################################
208              
209             sub _croak {
210 0 0   0   0 unless ($INC{'MCE.pm'}) {
211 0         0 $\ = undef; require Carp; goto &Carp::croak;
  0         0  
  0         0  
212             } else {
213 0         0 goto &MCE::_croak;
214             }
215             }
216              
217             ## Add items to the tail of the queue with priority level.
218              
219             sub _enqueuep {
220 83     83   214 my ($_Q, $_p) = (shift, shift);
221              
222             ## Enlist priority into the heap.
223 83 100 100     244 if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {
  36         119  
224              
225 65 100       81 unless (scalar @{ $_Q->{_heap} }) {
  65 100       175  
226 50         65 push @{ $_Q->{_heap} }, $_p;
  50         166  
227             }
228 0         0 elsif ($_Q->{_porder}) {
229 9         87 $_Q->_heap_insert_high($_p);
230             }
231             else {
232 6         51 $_Q->_heap_insert_low($_p);
233             }
234             }
235              
236             ## Append item(s) into the queue.
237 83         133 push @{ $_Q->{_datp}->{$_p} }, @_;
  83         275  
238              
239 83         153 return;
240             }
241              
242             ## Return one item from the queue.
243              
244             sub _dequeue {
245 922     922   1598 my ($_Q) = @_;
246              
247             ## Return item from the non-priority queue.
248 922 100       989 unless (scalar @{ $_Q->{_heap} }) {
  922         1818  
249             return ($_Q->{_type})
250 828 100       1467 ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
  814         2289  
  14         60  
251             }
252              
253 94         172 my $_p = $_Q->{_heap}->[0];
254              
255             ## Delist priority from the heap when 1 item remains.
256 94 100       113 shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);
  47         83  
  94         200  
257              
258             ## Return item from the priority queue.
259             return ($_Q->{_type})
260 94 100       198 ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
  70         208  
  24         119  
261             }
262              
263             ## Helper method for getting the reference to the underlying array.
264             ## Use with test scripts for comparing data only (not a public API).
265              
266             sub _get_aref {
267 50     50   7179 my ($_Q, $_p) = @_;
268              
269 50 50 66     359 return if ($INC{'MCE.pm'} && !defined $MCE::MCE->{_wid});
270 50 50 66     231 return if (defined $MCE::MCE && $MCE::MCE->{_wid});
271              
272 50 100       123 if (defined $_p) {
273 45 50 33     251 _croak('Queue: (get_aref priority) is not an integer')
274             if (!looks_like_number($_p) || int($_p) != $_p);
275              
276 45 100       163 return undef unless (exists $_Q->{_datp}->{$_p});
277 36         480 return $_Q->{_datp}->{$_p};
278             }
279              
280 5         34 return $_Q->{_datq};
281             }
282              
283             ## Insert priority into the heap. A lower priority level comes first.
284              
285             sub _heap_insert_low {
286 6     6   25 my ($_Q, $_p) = @_;
287              
288             ## Insert priority at the head of the heap.
289 6 50       29 if ($_p < $_Q->{_heap}->[0]) {
    100          
290 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
291             }
292              
293             ## Insert priority at the end of the heap.
294             elsif ($_p > $_Q->{_heap}->[-1]) {
295 4         10 push @{ $_Q->{_heap} }, $_p;
  4         18  
296             }
297              
298             ## Insert priority through binary search.
299             else {
300 2         24 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  2         11  
  2         17  
301              
302 2         14 while ($_lower < $_upper) {
303 4         19 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
304 4 100       11 if ($_p > $_Q->{_heap}->[$_midpoint]) {
305 2         8 $_lower = $_midpoint + 1;
306             } else {
307 2         10 $_upper = $_midpoint;
308             }
309             }
310              
311             ## Insert priority into the heap.
312 2         4 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  2         8  
313             }
314              
315 6         21 return;
316             }
317              
318             ## Insert priority into the heap. A higher priority level comes first.
319              
320             sub _heap_insert_high {
321 9     9   38 my ($_Q, $_p) = @_;
322              
323             ## Insert priority at the head of the heap.
324 9 100       79 if ($_p > $_Q->{_heap}->[0]) {
    50          
325 6         21 unshift @{ $_Q->{_heap} }, $_p;
  6         34  
326             }
327              
328             ## Insert priority at the end of the heap.
329             elsif ($_p < $_Q->{_heap}->[-1]) {
330 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
331             }
332              
333             ## Insert priority through binary search.
334             else {
335 3         14 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  3         12  
  3         10  
336              
337 3         12 while ($_lower < $_upper) {
338 6         13 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
339 6 100       18 if ($_p < $_Q->{_heap}->[$_midpoint]) {
340 3         9 $_lower = $_midpoint + 1;
341             } else {
342 3         19 $_upper = $_midpoint;
343             }
344             }
345              
346             ## Insert priority into the heap.
347 3         12 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  3         28  
348             }
349              
350 9         24 return;
351             }
352              
353             ###############################################################################
354             ## ----------------------------------------------------------------------------
355             ## Output routines for the manager process.
356             ##
357             ###############################################################################
358              
359             {
360             my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_cnt, $_i, $_id);
361             my ($_len, $_p, $_t, $_Q, $_has_data, $_pending);
362              
363             my %_output_function = (
364              
365             OUTPUT_W_QUE.$LF => sub { # Await from the queue
366             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
367              
368             chomp($_id = <$_DAU_R_SOCK>),
369             chomp($_t = <$_DAU_R_SOCK>);
370              
371             $_Q = $_all->{$_id};
372             $_Q->{_tsem} = $_t;
373              
374             if ($_Q->pending() <= $_t) {
375             syswrite($_Q->{_aw_sock}, $LF);
376             } else {
377             $_Q->{_asem} += 1;
378             }
379              
380             print {$_DAU_R_SOCK} $LF;
381              
382             return;
383             },
384              
385             OUTPUT_C_QUE.$LF => sub { # Clear the queue
386             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
387              
388             chomp($_id = <$_DAU_R_SOCK>);
389             _mce_m_clear($_all->{$_id});
390              
391             print {$_DAU_R_SOCK} $LF;
392              
393             return;
394             },
395              
396             OUTPUT_E_QUE.$LF => sub { # End the queue
397             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
398              
399             chomp($_id = <$_DAU_R_SOCK>);
400             _mce_m_end($_all->{$_id});
401              
402             print {$_DAU_R_SOCK} $LF;
403              
404             return;
405             },
406              
407             ## ----------------------------------------------------------------------
408              
409             OUTPUT_A_QUE.$LF => sub { # Enqueue into queue (A)
410             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
411              
412             chomp($_id = <$_DAU_R_SOCK>),
413             chomp($_len = <$_DAU_R_SOCK>);
414              
415             read $_DAU_R_SOCK, my($_buf), $_len;
416              
417             $_Q = $_all->{$_id};
418              
419             if ($_Q->{gather}) {
420             local $_ = $_MCE->{thaw}($_buf);
421             $_Q->{gather}($_Q, @{ $_ });
422             }
423             else {
424             $_Q->_mce_m_enqueue(@{ $_MCE->{thaw}($_buf) });
425             }
426              
427             return;
428             },
429              
430             OUTPUT_A_QUP.$LF => sub { # Enqueue into queue (A,p)
431             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
432              
433             chomp($_id = <$_DAU_R_SOCK>),
434             chomp($_p = <$_DAU_R_SOCK>),
435             chomp($_len = <$_DAU_R_SOCK>);
436              
437             read $_DAU_R_SOCK, my($_buf), $_len;
438              
439             $_Q = $_all->{$_id};
440             $_Q->_mce_m_enqueuep($_p, @{ $_MCE->{thaw}($_buf) });
441              
442             return;
443             },
444              
445             ## ----------------------------------------------------------------------
446              
447             OUTPUT_D_QUE.$LF => sub { # Dequeue from queue (B)
448             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
449              
450             chomp($_id = <$_DAU_R_SOCK>),
451             chomp($_cnt = <$_DAU_R_SOCK>);
452              
453             $_cnt = 0 if ($_cnt == 1);
454             $_Q = $_all->{$_id};
455              
456             my (@_items, $_buf);
457              
458             if ($_cnt) {
459             my $_pending = @{ $_Q->{_datq} };
460              
461             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
462             for my $_h (@{ $_Q->{_heap} }) {
463             $_pending += @{ $_Q->{_datp}->{$_h} };
464             }
465             }
466             $_cnt = $_pending if $_pending < $_cnt;
467              
468             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
469             }
470             else {
471             $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
472             $_buf = $_Q->_dequeue();
473             }
474              
475             if ($_cnt) {
476             $_buf = $_MCE->{freeze}(\@_items);
477             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
478             }
479             elsif ($_has_data) {
480             $_buf = $_MCE->{freeze}([ $_buf ]);
481             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
482             }
483             elsif (exists $_Q->{_ended}) {
484             print {$_DAU_R_SOCK} '-2'.$LF;
485             }
486             else {
487             print {$_DAU_R_SOCK} '-1'.$LF;
488             $_Q->{_dsem} += 1;
489             }
490              
491             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
492             for my $_i (1 .. $_Q->{_asem}) {
493             syswrite($_Q->{_aw_sock}, $LF);
494             }
495             $_Q->{_asem} = 0;
496             }
497              
498             return;
499             },
500              
501             OUTPUT_D_QUN.$LF => sub { # Dequeue from queue (NB)
502             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
503              
504             chomp($_id = <$_DAU_R_SOCK>),
505             chomp($_cnt = <$_DAU_R_SOCK>);
506              
507             $_Q = $_all->{$_id};
508              
509             if ($_cnt == 1) {
510             my $_buf = $_Q->_dequeue();
511              
512             if (defined $_buf) {
513             $_buf = $_MCE->{freeze}([ $_buf ]);
514             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
515             }
516             else {
517             print {$_DAU_R_SOCK} '-1'.$LF;
518             }
519             }
520             else {
521             my @_items;
522             my $_pending = @{ $_Q->{_datq} };
523              
524             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
525             for my $_h (@{ $_Q->{_heap} }) {
526             $_pending += @{ $_Q->{_datp}->{$_h} };
527             }
528             }
529             $_cnt = $_pending if $_pending < $_cnt;
530              
531             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
532              
533             if ($_cnt) {
534             my $_buf = $_MCE->{freeze}(\@_items);
535             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
536             }
537             else {
538             print {$_DAU_R_SOCK} '-1'.$LF;
539             }
540             }
541              
542             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
543             for my $_i (1 .. $_Q->{_asem}) {
544             syswrite($_Q->{_aw_sock}, $LF);
545             }
546             $_Q->{_asem} = 0;
547             }
548              
549             return;
550             },
551              
552             OUTPUT_D_QUT.$LF => sub { # Dequeue from queue (Timed)
553             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
554              
555             chomp($_id = <$_DAU_R_SOCK>);
556              
557             $_Q = $_all->{$_id};
558             $_Q->{_dsem} -= 1 if $_Q->{_dsem};
559              
560             print {$_DAU_R_SOCK} $LF;
561              
562             return;
563             },
564              
565             ## ----------------------------------------------------------------------
566              
567             OUTPUT_N_QUE.$LF => sub { # Return number of items
568             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
569              
570             chomp($_id = <$_DAU_R_SOCK>);
571              
572             print {$_DAU_R_SOCK} $_all->{$_id}->_mce_m_pending().$LF;
573              
574             return;
575             },
576              
577             OUTPUT_I_QUE.$LF => sub { # Insert into queue
578             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
579              
580             chomp($_id = <$_DAU_R_SOCK>),
581             chomp($_i = <$_DAU_R_SOCK>),
582             chomp($_len = <$_DAU_R_SOCK>);
583              
584             read $_DAU_R_SOCK, my($_buf), $_len;
585              
586             $_Q = $_all->{$_id};
587             $_Q->_mce_m_insert($_i, @{ $_MCE->{thaw}($_buf) });
588              
589             return;
590             },
591              
592             OUTPUT_I_QUP.$LF => sub { # Insert into queue (p)
593             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
594              
595             chomp($_id = <$_DAU_R_SOCK>),
596             chomp($_p = <$_DAU_R_SOCK>),
597             chomp($_i = <$_DAU_R_SOCK>),
598             chomp($_len = <$_DAU_R_SOCK>);
599              
600             read $_DAU_R_SOCK, my($_buf), $_len;
601              
602             $_Q = $_all->{$_id};
603             $_Q->_mce_m_insertp($_p, $_i, @{ $_MCE->{thaw}($_buf) });
604              
605             return;
606             },
607              
608             ## ----------------------------------------------------------------------
609              
610             OUTPUT_P_QUE.$LF => sub { # Peek into queue
611             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
612              
613             chomp($_id = <$_DAU_R_SOCK>),
614             chomp($_i = <$_DAU_R_SOCK>);
615              
616             $_Q = $_all->{$_id};
617             $_buf = $_Q->_mce_m_peek($_i);
618              
619             if (defined $_buf) {
620             $_buf = $_MCE->{freeze}([ $_buf ]);
621             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
622             } else {
623             print {$_DAU_R_SOCK} '-1'.$LF;
624             }
625              
626             return;
627             },
628              
629             OUTPUT_P_QUP.$LF => sub { # Peek into queue (p)
630             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
631              
632             chomp($_id = <$_DAU_R_SOCK>),
633             chomp($_p = <$_DAU_R_SOCK>),
634             chomp($_i = <$_DAU_R_SOCK>);
635              
636             $_Q = $_all->{$_id};
637             $_buf = $_Q->_mce_m_peekp($_p, $_i);
638              
639             if (defined $_buf) {
640             $_buf = $_MCE->{freeze}([ $_buf ]);
641             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
642             } else {
643             print {$_DAU_R_SOCK} '-1'.$LF;
644             }
645              
646             return;
647             },
648              
649             OUTPUT_P_QUH.$LF => sub { # Peek into heap
650             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
651              
652             chomp($_id = <$_DAU_R_SOCK>),
653             chomp($_i = <$_DAU_R_SOCK>);
654              
655             $_Q = $_all->{$_id};
656             $_buf = $_Q->_mce_m_peekh($_i);
657              
658             if (defined $_buf) {
659             $_buf = $_MCE->{freeze}([ $_buf ]);
660             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
661             } else {
662             print {$_DAU_R_SOCK} '-1'.$LF;
663             }
664              
665             return;
666             },
667              
668             OUTPUT_H_QUE.$LF => sub { # Return the heap
669             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
670              
671             chomp($_id = <$_DAU_R_SOCK>);
672              
673             $_Q = $_all->{$_id};
674             $_buf = $_MCE->{freeze}([ $_Q->_mce_m_heap() ]);
675              
676             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
677              
678             return;
679             },
680              
681             );
682              
683             ## -------------------------------------------------------------------------
684              
685             sub _mce_m_loop_begin {
686 87     87   365 ($_MCE, $_DAU_R_SOCK_REF) = @_;
687              
688 87         260 return;
689             }
690              
691             sub _mce_m_loop_end {
692 87     87   253 $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = $_cnt = $_i = $_id =
693             $_len = $_p = $_Q = undef;
694              
695 87         178 return;
696             }
697              
698             sub _mce_m_init {
699 35     35   139 MCE::_attach_plugin(
700             \%_output_function, \&_mce_m_loop_begin, \&_mce_m_loop_end,
701             \&_mce_w_init
702             );
703              
704 35         62 return;
705             }
706              
707             }
708              
709             ###############################################################################
710             ## ----------------------------------------------------------------------------
711             ## Methods for the manager process.
712             ##
713             ###############################################################################
714              
715             ## await ( pending_threshold )
716              
717             sub _mce_m_await {
718             # Handled by the manager process when called by MCE workers.
719 0     0   0 return;
720             }
721              
722             ## clear ( )
723              
724             sub _mce_m_clear {
725 28     28   2248 my ($_Q) = @_;
726              
727 28         49 %{ $_Q->{_datp} } = ();
  28         99  
728 28         49 @{ $_Q->{_datq} } = ();
  28         69  
729 28         49 @{ $_Q->{_heap} } = ();
  28         57  
730              
731 28         59 return;
732             }
733              
734             ## end ( )
735              
736             sub _mce_m_end {
737 0     0   0 my ($_Q) = @_;
738              
739 0 0       0 if (!exists $_Q->{_ended}) {
740 0         0 for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
  0         0  
741 0         0 $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
742             }
743              
744 0         0 return;
745             }
746              
747             ## enqueue ( item [, item, ... ] )
748              
749             sub _mce_m_enqueue {
750 569     569   2300 my $_Q = shift;
751              
752 569 50       15149 return unless (scalar @_);
753              
754 569 50       1199 if (exists $_Q->{_ended}) {
755 0         0 warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
756 0         0 return;
757             }
758              
759 569 100       1097 if ($_Q->{_dsem}) {
760 176         513 for my $_i (1 .. scalar @_) {
761 178         5517 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
762 178 100       858 last unless $_Q->{_dsem};
763             }
764             }
765              
766             ## Append item(s) into the queue.
767 569         777 push @{ $_Q->{_datq} }, @_;
  569         1536  
768              
769 569         1383 return;
770             }
771              
772             ## enqueuep ( priority, item [, item, ... ] )
773              
774             sub _mce_m_enqueuep {
775 74     74   251 my ($_Q, $_p) = (shift, shift);
776              
777 74 50 33     573 _croak('Queue: (enqueuep priority) is not an integer')
778             if (!looks_like_number($_p) || int($_p) != $_p);
779              
780 74 50       172 return unless (scalar @_);
781              
782 74 50       202 if (exists $_Q->{_ended}) {
783 0         0 warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
784 0         0 return;
785             }
786              
787 74 50       149 if ($_Q->{_dsem}) {
788 0         0 for my $_i (1 .. scalar @_) {
789 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
790 0 0       0 last unless $_Q->{_dsem};
791             }
792             }
793              
794 74         409 $_Q->_enqueuep($_p, @_);
795              
796 74         118 return;
797             }
798              
799             ## dequeue ( )
800             ## dequeue ( count )
801              
802             sub _mce_m_dequeue {
803 16     16   1547 my ($_Q, $_cnt) = @_;
804 16         25 my (@_items, $_has_data, $_buf);
805              
806 16 100 100     60 if (defined $_cnt && $_cnt ne '1') {
807 6 50 33     76 _croak('Queue: (dequeue count argument) is not valid')
      33        
808             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
809              
810 6         10 my $_pending = @{ $_Q->{_datq} };
  6         17  
811              
812 6 50 100     16 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  4         12  
813 4         6 for my $_h (@{ $_Q->{_heap} }) {
  4         8  
814 10         11 $_pending += @{ $_Q->{_datp}->{$_h} };
  10         19  
815             }
816             }
817 6 50       16 $_cnt = $_pending if $_pending < $_cnt;
818              
819 6         16 for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
  28         57  
820             }
821             else {
822 10 50 66     14 $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
823 10         25 $_buf = $_Q->_dequeue();
824             }
825              
826 16 100       57 return @_items if (scalar @_items);
827 10 50       42 return $_buf if ($_has_data);
828 0 0       0 return () if (exists $_Q->{_ended});
829              
830 0         0 $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
831              
832 0         0 goto \&_mce_m_dequeue;
833             }
834              
835             ## dequeue_nb ( )
836             ## dequeue_nb ( count )
837              
838             sub _mce_m_dequeue_nb {
839 4     4   16 my ($_Q, $_cnt) = @_;
840              
841 4 50 33     16 if (defined $_cnt && $_cnt ne '1') {
842 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
843             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
844              
845 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
846              
847 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
848 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
849 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
850             }
851             }
852              
853 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
854              
855 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
856             }
857              
858 4         11 my $_buf = $_Q->_dequeue();
859              
860 4 50       27 return defined($_buf) ? $_buf : ();
861             }
862              
863             ## dequeue_timed ( timeout )
864             ## dequeue_timed ( timeout, count )
865              
866             sub _mce_m_dequeue_timed {
867 4     4   12 my ($_Q, $_timeout, $_cnt) = @_;
868              
869 4 50       13 if (defined $_timeout) {
870 0 0       0 _croak('Queue: (dequeue_timed timeout argument) is not valid')
871             if (!looks_like_number($_timeout));
872             }
873              
874 4 50 33     21 if (defined $_cnt && $_cnt ne '1') {
875 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
876             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
877              
878 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
879              
880 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
881 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
882 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
883             }
884             }
885              
886 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
887              
888 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
889             }
890              
891 4         19 my $_buf = $_Q->_dequeue();
892              
893 4 50       27 return defined($_buf) ? $_buf : ();
894             }
895              
896             ## pending ( )
897              
898             sub _mce_m_pending {
899 14     14   1108 my ($_Q) = @_;
900 14         24 my $_pending = @{ $_Q->{_datq} };
  14         38  
901              
902 14 100       28 if (scalar @{ $_Q->{_heap} }) {
  14         55  
903 9         17 for my $_h (@{ $_Q->{_heap} }) {
  9         103  
904 9         21 $_pending += @{ $_Q->{_datp}->{$_h} };
  9         33  
905             }
906             }
907              
908             return (exists $_Q->{_ended})
909 14 0       473 ? $_pending ? $_pending : undef
    50          
910             : $_pending;
911             }
912              
913             ## insert ( index, item [, item, ... ] )
914              
915             sub _mce_m_insert {
916 50     50   160 my ($_Q, $_i) = (shift, shift);
917              
918 50 50 33     262 _croak('Queue: (insert index) is not an integer')
919             if (!looks_like_number($_i) || int($_i) != $_i);
920              
921 50 50       115 return unless (scalar @_);
922              
923 50 50       102 if (exists $_Q->{_ended}) {
924 0         0 warn "Queue: (insert) called on queue that has been 'end'ed\n";
925 0         0 return;
926             }
927              
928 50 50       77 if ($_Q->{_dsem}) {
929 0         0 for my $_i (1 .. scalar @_) {
930 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
931 0 0       0 last unless $_Q->{_dsem};
932             }
933             }
934              
935 50 100       66 if (abs($_i) > scalar @{ $_Q->{_datq} }) {
  50         111  
936 10 100       34 if ($_i >= 0) {
937 5 100       16 if ($_Q->{_type}) {
938 3         5 push @{ $_Q->{_datq} }, @_;
  3         14  
939             } else {
940 2         13 unshift @{ $_Q->{_datq} }, @_;
  2         16  
941             }
942             }
943             else {
944 5 100       19 if ($_Q->{_type}) {
945 3         6 unshift @{ $_Q->{_datq} }, @_;
  3         13  
946             } else {
947 2         10 push @{ $_Q->{_datq} }, @_;
  2         7  
948             }
949             }
950             }
951             else {
952 40 100       68 if (!$_Q->{_type}) {
953             $_i = ($_i >= 0)
954 16 100       44 ? scalar(@{ $_Q->{_datq} }) - $_i
  10         29  
955             : abs($_i);
956             }
957 40         56 splice @{ $_Q->{_datq} }, $_i, 0, @_;
  40         146  
958             }
959              
960 50         92 return;
961             }
962              
963             ## insertp ( priority, index, item [, item, ... ] )
964              
965             sub _mce_m_insertp {
966 90     90   260 my ($_Q, $_p, $_i) = (shift, shift, shift);
967              
968 90 50 33     491 _croak('Queue: (insertp priority) is not an integer')
969             if (!looks_like_number($_p) || int($_p) != $_p);
970 90 50 33     382 _croak('Queue: (insertp index) is not an integer')
971             if (!looks_like_number($_i) || int($_i) != $_i);
972              
973 90 50       197 return unless (scalar @_);
974              
975 90 50       179 if (exists $_Q->{_ended}) {
976 0         0 warn "Queue: (insertp) called on queue that has been 'end'ed\n";
977 0         0 return;
978             }
979              
980 90 50       179 if ($_Q->{_dsem}) {
981 0         0 for my $_i (1 .. scalar @_) {
982 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
983 0 0       0 last unless $_Q->{_dsem};
984             }
985             }
986              
987 90 100 50     236 if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
  90         254  
988              
989 81 100       105 if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
  81         169  
990 18 100       59 if ($_i >= 0) {
991 9 100       35 if ($_Q->{_type}) {
992 5         10 push @{ $_Q->{_datp}->{$_p} }, @_;
  5         23  
993             } else {
994 4         38 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  4         55  
995             }
996             }
997             else {
998 9 100       30 if ($_Q->{_type}) {
999 5         9 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  5         20  
1000             } else {
1001 4         17 push @{ $_Q->{_datp}->{$_p} }, @_;
  4         27  
1002             }
1003             }
1004             }
1005             else {
1006 63 100       118 if (!$_Q->{_type}) {
1007             $_i = ($_i >=0)
1008 28 100       97 ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
  16         59  
1009             : abs($_i);
1010             }
1011 63         87 splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
  63         201  
1012             }
1013             }
1014             else {
1015 9         32 $_Q->_enqueuep($_p, @_);
1016             }
1017              
1018 90         158 return;
1019             }
1020              
1021             ## peek ( index )
1022             ## peek ( )
1023              
1024             sub _mce_m_peek {
1025 55     55   160 my ($_Q, $_i) = @_;
1026              
1027 55 100       115 if ($_i) {
1028 40 50 33     310 _croak('Queue: (peek index) is not an integer')
1029             if (!looks_like_number($_i) || int($_i) != $_i);
1030             }
1031 15         27 else { $_i = 0 }
1032              
1033 55 100       95 return undef if (abs($_i) > scalar @{ $_Q->{_datq} });
  55         186  
1034              
1035 40 100       95 if (!$_Q->{_type}) {
1036             $_i = ($_i >= 0)
1037 16 100       50 ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
  10         30  
1038             : abs($_i + 1);
1039             }
1040              
1041 40         149 return $_Q->{_datq}->[$_i];
1042             }
1043              
1044             ## peekp ( priority, index )
1045             ## peekp ( priority )
1046              
1047             sub _mce_m_peekp {
1048 99     99   260 my ($_Q, $_p, $_i) = @_;
1049              
1050 99 100       202 if ($_i) {
1051 72 50 33     618 _croak('Queue: (peekp index) is not an integer')
1052             if (!looks_like_number($_i) || int($_i) != $_i);
1053             }
1054 27         33 else { $_i = 0 }
1055              
1056 99 50 33     537 _croak('Queue: (peekp priority) is not an integer')
1057             if (!looks_like_number($_p) || int($_p) != $_p);
1058              
1059 99 50       267 return undef unless (exists $_Q->{_datp}->{$_p});
1060 99 100       140 return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });
  99         312  
1061              
1062 72 100       178 if (!$_Q->{_type}) {
1063             $_i = ($_i >= 0)
1064 32 100       104 ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
  20         76  
1065             : abs($_i + 1);
1066             }
1067              
1068 72         269 return $_Q->{_datp}->{$_p}->[$_i];
1069             }
1070              
1071             ## peekh ( index )
1072             ## peekh ( )
1073              
1074             sub _mce_m_peekh {
1075 10     10   1261 my ($_Q, $_i) = @_;
1076              
1077 10 100       130 if ($_i) {
1078 5 50 33     94 _croak('Queue: (peekh index) is not an integer')
1079             if (!looks_like_number($_i) || int($_i) != $_i);
1080             }
1081 5         15 else { $_i = 0 }
1082              
1083 10 50       24 return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
  10         33  
1084 10         46 return $_Q->{_heap}->[$_i];
1085             }
1086              
1087             ## heap ( )
1088              
1089             sub _mce_m_heap {
1090 5     5   39 return @{ shift->{_heap} };
  5         82  
1091             }
1092              
1093             ###############################################################################
1094             ## ----------------------------------------------------------------------------
1095             ## Methods for the worker process.
1096             ##
1097             ###############################################################################
1098              
1099             {
1100             my (
1101             $_MCE, $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_lock_chn,
1102             $_dat_ex, $_dat_un, $_len, $_pending
1103             );
1104              
1105             my $_req1 = sub {
1106             local $\ = undef if (defined $\);
1107              
1108             $_dat_ex->() if $_lock_chn;
1109             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1110             print({$_DAU_W_SOCK} $_[1], $_[2]);
1111              
1112             $_dat_un->() if $_lock_chn;
1113             };
1114              
1115             my $_req2 = sub {
1116             local $\ = undef if (defined $\);
1117             local $/ = $LF if ($/ ne $LF);
1118              
1119             $_dat_ex->() if $_lock_chn;
1120             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1121             print({$_DAU_W_SOCK} $_[1]);
1122             <$_DAU_W_SOCK>;
1123              
1124             $_dat_un->() if $_lock_chn;
1125             };
1126              
1127             my $_req3 = sub {
1128             local $\ = undef if (defined $\);
1129             local $/ = $LF if ($/ ne $LF);
1130              
1131             $_dat_ex->() if $_lock_chn;
1132             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1133             print({$_DAU_W_SOCK} $_[1]);
1134              
1135             chomp($_len = <$_DAU_W_SOCK>);
1136              
1137             if ($_len < 0) {
1138             $_dat_un->() if $_lock_chn;
1139             return defined($_[3]) ? () : undef;
1140             }
1141              
1142             read $_DAU_W_SOCK, my($_buf), $_len;
1143             $_dat_un->() if $_lock_chn;
1144              
1145             ($_[2] == 1)
1146             ? ($_MCE->{thaw}($_buf))->[0]
1147             : @{ $_MCE->{thaw}($_buf) };
1148             };
1149              
1150             sub _mce_w_init {
1151 28     28   394 ($_MCE) = @_;
1152 28         160 $_chn = $_MCE->{_chn};
1153 28         127 $_DAT_LOCK = $_MCE->{_dat_lock};
1154 28         104 $_DAT_W_SOCK = $_MCE->{_dat_w_sock}->[0];
1155 28         148 $_DAU_W_SOCK = $_MCE->{_dat_w_sock}->[$_chn];
1156 28         119 $_lock_chn = $_MCE->{_lock_chn};
1157              
1158 28 50       199 if ($_lock_chn) {
1159             # inlined for performance
1160             $_dat_ex = sub {
1161 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1162             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
1163 0 0       0 if $_is_MSWin32;
1164             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
1165 0 0       0 unless $_DAT_LOCK->{ $_pid };
1166 0         0 };
1167             $_dat_un = sub {
1168 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1169             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
1170 0 0       0 if $_DAT_LOCK->{ $_pid };
1171 0         0 };
1172             }
1173              
1174 28         1208 $_all = {};
1175              
1176 37     37   316 no strict 'refs'; no warnings 'redefine';
  37     37   74  
  37         1437  
  37         266  
  37         133  
  37         92360  
1177              
1178 28         169 *{ 'MCE::Queue::await' } = \&_mce_w_await;
  28         1716  
1179 28         353 *{ 'MCE::Queue::clear' } = \&_mce_w_clear;
  28         517  
1180 28         143 *{ 'MCE::Queue::end' } = \&_mce_w_end;
  28         457  
1181 28         162 *{ 'MCE::Queue::enqueue' } = \&_mce_w_enqueue;
  28         574  
1182 28         169 *{ 'MCE::Queue::enqueuep' } = \&_mce_w_enqueuep;
  28         389  
1183 28         95 *{ 'MCE::Queue::dequeue' } = \&_mce_w_dequeue;
  28         399  
1184 28         177 *{ 'MCE::Queue::dequeue_nb' } = \&_mce_w_dequeue_nb;
  28         246  
1185 28         314 *{ 'MCE::Queue::dequeue_timed' } = \&_mce_w_dequeue_timed;
  28         317  
1186 28         131 *{ 'MCE::Queue::pending' } = \&_mce_w_pending;
  28         471  
1187 28         260 *{ 'MCE::Queue::insert' } = \&_mce_w_insert;
  28         296  
1188 28         144 *{ 'MCE::Queue::insertp' } = \&_mce_w_insertp;
  28         171  
1189 28         89 *{ 'MCE::Queue::peek' } = \&_mce_w_peek;
  28         239  
1190 28         88 *{ 'MCE::Queue::peekp' } = \&_mce_w_peekp;
  28         554  
1191 28         162 *{ 'MCE::Queue::peekh' } = \&_mce_w_peekh;
  28         255  
1192 28         177 *{ 'MCE::Queue::heap' } = \&_mce_w_heap;
  28         328  
1193              
1194 28         212 return;
1195             }
1196              
1197             ## -------------------------------------------------------------------------
1198              
1199             sub _mce_w_await {
1200 0   0 0   0 my $_Q = shift; my $_t = shift || 0;
  0         0  
1201              
1202 0 0       0 return $_Q->_mce_m_await() if (exists $_all->{ $_Q->{_id} });
1203              
1204             _croak('Queue: (await) is not enabled for this queue')
1205 0 0       0 unless ($_Q->{_await});
1206 0 0 0     0 _croak('Queue: (await threshold) is not an integer')
1207             if (!looks_like_number($_t) || int($_t) != $_t);
1208              
1209 0 0       0 $_t = 0 if ($_t < 0);
1210 0         0 $_req2->(OUTPUT_W_QUE, $_Q->{_id}.$LF . $_t.$LF);
1211              
1212 0 0       0 MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
1213 0         0 MCE::Util::_sysread($_Q->{_ar_sock}, my($_next), 1);
1214              
1215 0         0 return;
1216             }
1217              
1218             sub _mce_w_clear {
1219 8     8   68 my ($_Q) = @_;
1220              
1221 8 50       38 return $_Q->_mce_m_clear() if (exists $_all->{ $_Q->{_id} });
1222              
1223 8         37 $_req2->(OUTPUT_C_QUE, $_Q->{_id}.$LF);
1224              
1225 8         35 return;
1226             }
1227              
1228             sub _mce_w_end {
1229 0     0   0 my ($_Q) = @_;
1230              
1231 0 0       0 return $_Q->_mce_m_end() if (exists $_all->{ $_Q->{_id} });
1232              
1233 0         0 $_req2->(OUTPUT_E_QUE, $_Q->{_id}.$LF);
1234              
1235 0         0 return;
1236             }
1237              
1238             ## -------------------------------------------------------------------------
1239              
1240             sub _mce_w_enqueue {
1241 56     56   536 my $_Q = shift;
1242              
1243 56 50       256 return $_Q->_mce_m_enqueue(@_) if (exists $_all->{ $_Q->{_id} });
1244              
1245 56 50       128 if (scalar @_) {
1246 56         521 my $_tmp = $_MCE->{freeze}([ @_ ]);
1247 56         191 my $_buf = $_Q->{_id}.$LF . length($_tmp).$LF;
1248 56         183 $_req1->(OUTPUT_A_QUE, $_buf, $_tmp);
1249             }
1250              
1251 56         181 return;
1252             }
1253              
1254             sub _mce_w_enqueuep {
1255 20     20   215 my ($_Q, $_p) = (shift, shift);
1256              
1257 20 50       208 return $_Q->_mce_m_enqueuep($_p, @_) if (exists $_all->{ $_Q->{_id} });
1258              
1259 20 50 33     230 _croak('Queue: (enqueuep priority) is not an integer')
1260             if (!looks_like_number($_p) || int($_p) != $_p);
1261              
1262 20 50       48 if (scalar @_) {
1263 20         335 my $_tmp = $_MCE->{freeze}([ @_ ]);
1264 20         91 my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_tmp).$LF;
1265 20         132 $_req1->(OUTPUT_A_QUP, $_buf, $_tmp);
1266             }
1267              
1268 20         59 return;
1269             }
1270              
1271             ## -------------------------------------------------------------------------
1272              
1273             sub _mce_w_dequeue {
1274 234     234   517 my $_buf; my ($_Q, $_cnt) = @_;
  234         559  
1275              
1276 234 50       819 return $_Q->_mce_m_dequeue($_cnt) if (exists $_all->{ $_Q->{_id} });
1277              
1278 234 100 100     738 if (defined $_cnt && $_cnt ne '1') {
1279 6 50 33     127 _croak('Queue: (dequeue count argument) is not valid')
      33        
1280             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1281             } else {
1282 228         363 $_cnt = 1;
1283             }
1284              
1285             {
1286 234 50       307 local $\ = undef if (defined $\);
  234         569  
1287 234 50       611 local $/ = $LF if ($/ ne $LF);
1288              
1289 234 50       447 $_dat_ex->() if $_lock_chn;
1290              
1291 234         3504 print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF),
1292 234         295 print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF);
  234         2583  
1293 234         114008 chomp($_len = <$_DAU_W_SOCK>);
1294              
1295 234 100       2164 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1296              
1297 234 50       624 $_dat_un->() if $_lock_chn;
1298             }
1299              
1300 234 100 100     3924 return ($_MCE->{thaw}($_buf))->[0] if ($_len > 0 && $_cnt == 1);
1301 57 100       140 return @{ $_MCE->{thaw}($_buf) } if ($_len > 0);
  6         178  
1302 51 50       138 return if ($_len == -2);
1303              
1304 51 50       111 MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
1305 51         366 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
1306              
1307 51         623 goto \&_mce_w_dequeue;
1308             }
1309              
1310             sub _mce_w_dequeue_nb {
1311 4     4   24 my ($_Q, $_cnt) = @_;
1312              
1313 4 50       25 return $_Q->_mce_m_dequeue_nb($_cnt) if (exists $_all->{ $_Q->{_id} });
1314              
1315 4 50 33     32 if (defined $_cnt && $_cnt ne '1') {
1316 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
1317             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1318             } else {
1319 4         12 $_cnt = 1;
1320             }
1321              
1322 4         33 $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1323             }
1324              
1325             sub _mce_w_dequeue_timed {
1326 4     4   24 my ($_Q, $_timeout, $_cnt) = @_;
1327 4         16 my ($_buf, $_start);
1328              
1329             return $_Q->_mce_m_dequeue_timed($_timeout, $_cnt)
1330 4 50       29 if (exists $_all->{ $_Q->{_id} });
1331              
1332 4 50       21 if (defined $_timeout) {
1333 0 0       0 _croak('Queue: (dequeue_timed count argument) is not valid')
1334             if (!looks_like_number($_timeout));
1335 0         0 $_start = MCE::Util::_time();
1336             }
1337              
1338 4 50 33     48 if (defined $_cnt && $_cnt ne '1') {
1339 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
1340             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1341             } else {
1342 4         18 $_cnt = 1;
1343             }
1344              
1345 4 50 33     31 if (! $_timeout || $_timeout < 0.0) {
1346 4         31 return $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1347             }
1348              
1349             {
1350 0 0       0 local $\ = undef if (defined $\);
  0         0  
1351 0 0       0 local $/ = $LF if ($/ ne $LF);
1352              
1353 0 0       0 $_dat_ex->() if $_lock_chn;
1354              
1355 0         0 print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF),
1356 0         0 print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF);
  0         0  
1357 0         0 chomp($_len = <$_DAU_W_SOCK>);
1358              
1359 0 0       0 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1360              
1361 0 0       0 $_dat_un->() if $_lock_chn;
1362             }
1363              
1364 0 0 0     0 return ($_MCE->{thaw}($_buf))->[0] if ($_len > 0 && $_cnt == 1);
1365 0 0       0 return @{ $_MCE->{thaw}($_buf) } if ($_len > 0);
  0         0  
1366 0 0       0 return if ($_len == -2);
1367              
1368 0         0 $_Q->{_qr_mutex}->lock();
1369 0         0 $_timeout = $_timeout - (MCE::Util::_time() - $_start) - 0.045;
1370 0 0       0 $_timeout = 0.0 if $_timeout < 0.045;
1371              
1372 0         0 CORE::vec(my $_r, CORE::fileno($_Q->{_qr_sock}), 1) = 1;
1373 0 0       0 if (CORE::select($_r, undef, undef, $_timeout) > 0) {
1374 0         0 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
1375 0         0 $_Q->{_qr_mutex}->unlock();
1376 0         0 return $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1377             }
1378              
1379 0         0 $_Q->{_qr_mutex}->unlock();
1380 0         0 $_req2->(OUTPUT_D_QUT, $_Q->{_id}.$LF);
1381 0         0 MCE::Util::_sleep(0.045); # yield
1382              
1383 0         0 return ();
1384             }
1385              
1386             ## -------------------------------------------------------------------------
1387              
1388             sub _mce_w_pending {
1389 4     4   47 my ($_Q) = @_;
1390              
1391 4 50       35 return $_Q->_mce_m_pending() if (exists $_all->{ $_Q->{_id} });
1392              
1393 4 50       31 local $\ = undef if (defined $\);
1394 4 50       20 local $/ = $LF if ($/ ne $LF);
1395              
1396 4 50       24 $_dat_ex->() if $_lock_chn;
1397 4         68 print({$_DAT_W_SOCK} OUTPUT_N_QUE.$LF . $_chn.$LF),
1398 4         39 print({$_DAU_W_SOCK} $_Q->{_id}.$LF);
  4         46  
1399              
1400 4         7552 chomp($_pending = <$_DAU_W_SOCK>);
1401 4 50       53 $_dat_un->() if $_lock_chn;
1402              
1403 4 50       108 length($_pending) ? int($_pending) : undef;
1404             }
1405              
1406             sub _mce_w_insert {
1407 20     20   108 my ($_Q, $_i) = (shift, shift);
1408              
1409 20 50       56 return $_Q->_mce_m_insert($_i, @_) if (exists $_all->{ $_Q->{_id} });
1410              
1411 20 50 33     115 _croak('Queue: (insert index) is not an integer')
1412             if (!looks_like_number($_i) || int($_i) != $_i);
1413              
1414 20 50       42 return unless (scalar @_);
1415              
1416 20         140 my $_tmp = $_MCE->{freeze}([ @_ ]);
1417 20         70 my $_buf = $_Q->{_id}.$LF . $_i.$LF . length($_tmp).$LF . $_tmp;
1418              
1419 20         46 $_req1->(OUTPUT_I_QUE, $_buf, '');
1420              
1421 20         59 return;
1422             }
1423              
1424             sub _mce_w_insertp {
1425 20     20   114 my ($_Q, $_p, $_i) = (shift, shift, shift);
1426              
1427 20 50       57 return $_Q->_mce_m_insertp($_p, $_i, @_) if (exists $_all->{ $_Q->{_id} });
1428              
1429 20 50 33     116 _croak('Queue: (insertp priority) is not an integer')
1430             if (!looks_like_number($_p) || int($_p) != $_p);
1431 20 50 33     68 _croak('Queue: (insertp index) is not an integer')
1432             if (!looks_like_number($_i) || int($_i) != $_i);
1433              
1434 20 50       40 return unless (scalar @_);
1435              
1436 20         156 my $_tmp = $_MCE->{freeze}([ @_ ]);
1437 20         86 my $_buf = $_Q->{_id}.$LF . $_p.$LF . $_i.$LF . length($_tmp).$LF . $_tmp;
1438              
1439 20         46 $_req1->(OUTPUT_I_QUP, $_buf, '');
1440              
1441 20         57 return;
1442             }
1443              
1444             ## -------------------------------------------------------------------------
1445              
1446             sub _mce_w_peek {
1447 22   100 22   56 my $_Q = shift; my $_i = shift || 0;
  22         89  
1448              
1449 22 50       65 return $_Q->_mce_m_peek($_i, @_) if (exists $_all->{ $_Q->{_id} });
1450              
1451 22 50 33     160 _croak('Queue: (peek index) is not an integer')
1452             if (!looks_like_number($_i) || int($_i) != $_i);
1453              
1454 22         87 $_req3->(OUTPUT_P_QUE, $_Q->{_id}.$LF . $_i.$LF, 1);
1455             }
1456              
1457             sub _mce_w_peekp {
1458 22   100 22   66 my ($_Q, $_p) = (shift, shift); my $_i = shift || 0;
  22         95  
1459              
1460 22 50       84 return $_Q->_mce_m_peekp($_p, $_i, @_) if (exists $_all->{ $_Q->{_id} });
1461              
1462 22 50 33     149 _croak('Queue: (peekp priority) is not an integer')
1463             if (!looks_like_number($_p) || int($_p) != $_p);
1464 22 50 33     87 _croak('Queue: (peekp index) is not an integer')
1465             if (!looks_like_number($_i) || int($_i) != $_i);
1466              
1467 22         87 $_req3->(OUTPUT_P_QUP, $_Q->{_id}.$LF . $_p.$LF . $_i.$LF, 1);
1468             }
1469              
1470             sub _mce_w_peekh {
1471 4   100 4   40 my $_Q = shift; my $_i = shift || 0;
  4         18  
1472              
1473 4 50       15 return $_Q->_mce_m_peekh($_i, @_) if (exists $_all->{ $_Q->{_id} });
1474              
1475 4 50 33     62 _croak('Queue: (peekh index) is not an integer')
1476             if (!looks_like_number($_i) || int($_i) != $_i);
1477              
1478 4         21 my $_ret = $_req3->(OUTPUT_P_QUH, $_Q->{_id}.$LF . $_i.$LF, 1);
1479              
1480 4 50       62 length($_ret) ? int($_ret) : undef;
1481             }
1482              
1483             sub _mce_w_heap {
1484 2     2   17 my ($_Q) = @_;
1485              
1486 2 50       9 return $_Q->_mce_m_heap() if (exists $_all->{ $_Q->{_id} });
1487              
1488 2         24 $_req3->(OUTPUT_H_QUE, $_Q->{_id}.$LF, 0);
1489             }
1490              
1491             }
1492              
1493             1;
1494              
1495             __END__