File Coverage

blib/lib/MCE/Queue.pm
Criterion Covered Total %
statement 460 581 79.1
branch 190 362 52.4
condition 61 162 37.6
subroutine 49 57 85.9
pod 1 1 100.0
total 761 1163 65.4


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Hybrid (normal and priority) queues.
4             ##
5             ###############################################################################
6              
7             package MCE::Queue;
8              
9 37     37   193984 use strict;
  37         83  
  37         977  
10 37     37   143 use warnings;
  37         64  
  37         996  
11              
12 37     37   145 no warnings qw( threads recursion uninitialized );
  37         58  
  37         1950  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
17             ## no critic (TestingAndDebugging::ProhibitNoStrict)
18              
19 37     37   224 use Scalar::Util qw( looks_like_number );
  37         47  
  37         2232  
20 37     37   874 use MCE::Util qw( $LF );
  37         60  
  37         3690  
21 37     37   961 use MCE::Mutex ();
  37         68  
  37         7522  
22              
23             ###############################################################################
24             ## ----------------------------------------------------------------------------
25             ## Import routine.
26             ##
27             ###############################################################################
28              
29             our ($HIGHEST,$LOWEST, $FIFO,$LIFO, $LILO,$FILO) = (1,0, 1,0, 1,0);
30              
31             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
32             my ($_def, $_imported) = ({});
33              
34             sub import {
35 39     39   269 my ($_class, $_pkg) = (shift, caller);
36              
37             ## Process module arguments.
38 39         179 my $_p = $_def->{$_pkg} = {
39             AWAIT => 0, PORDER => $HIGHEST, TYPE => $FIFO,
40             };
41              
42 39         152 while (my $_argument = shift) {
43 0         0 my $_arg = lc $_argument;
44              
45 0 0       0 $_p->{AWAIT } = shift, next if ( $_arg eq 'await' );
46 0 0       0 $_p->{PORDER} = shift, next if ( $_arg eq 'porder' );
47 0 0       0 $_p->{TYPE } = shift, next if ( $_arg eq 'type' );
48              
49 0         0 _croak("Error: ($_argument) invalid module option");
50             }
51              
52 39 100       471 return if $_imported++;
53              
54             ## Define public methods to internal methods.
55 37     37   241 no strict 'refs'; no warnings 'redefine';
  37     37   75  
  37         1105  
  37         180  
  37         56  
  37         11008  
56              
57 37 100 66     346 if ($INC{'MCE.pm'} && MCE->wid == 0) {
58 35         89 _mce_m_init();
59             }
60              
61 37         75 *{ 'MCE::Queue::await' } = \&_mce_m_await;
  37         183  
62 37         74 *{ 'MCE::Queue::clear' } = \&_mce_m_clear;
  37         118  
63 37         62 *{ 'MCE::Queue::end' } = \&_mce_m_end;
  37         205  
64 37         69 *{ 'MCE::Queue::enqueue' } = \&_mce_m_enqueue;
  37         216  
65 37         165 *{ 'MCE::Queue::enqueuep' } = \&_mce_m_enqueuep;
  37         174  
66 37         57 *{ 'MCE::Queue::dequeue' } = \&_mce_m_dequeue;
  37         168  
67 37         74 *{ 'MCE::Queue::dequeue_nb' } = \&_mce_m_dequeue_nb;
  37         111  
68 37         56 *{ 'MCE::Queue::dequeue_timed' } = \&_mce_m_dequeue_timed;
  37         139  
69 37         1105 *{ 'MCE::Queue::pending' } = \&_mce_m_pending;
  37         160  
70 37         56 *{ 'MCE::Queue::insert' } = \&_mce_m_insert;
  37         639  
71 37         107 *{ 'MCE::Queue::insertp' } = \&_mce_m_insertp;
  37         114  
72 37         65 *{ 'MCE::Queue::peek' } = \&_mce_m_peek;
  37         139  
73 37         69 *{ 'MCE::Queue::peekp' } = \&_mce_m_peekp;
  37         109  
74 37         60 *{ 'MCE::Queue::peekh' } = \&_mce_m_peekh;
  37         106  
75 37         51 *{ 'MCE::Queue::heap' } = \&_mce_m_heap;
  37         104  
76              
77 37         3853 return;
78             }
79              
80             ###############################################################################
81             ## ----------------------------------------------------------------------------
82             ## Define constants & variables.
83             ##
84             ###############################################################################
85              
86             use constant {
87 37         200402 OUTPUT_W_QUE => 'W~QUE', # Await from the queue
88             OUTPUT_C_QUE => 'C~QUE', # Clear the queue
89             OUTPUT_E_QUE => 'E~QUE', # End the queue
90              
91             OUTPUT_A_QUE => 'A~QUE', # Enqueue into queue (array)
92             OUTPUT_A_QUP => 'A~QUP', # Enqueue into queue (array (p))
93             OUTPUT_D_QUE => 'D~QUE', # Dequeue from queue (blocking)
94             OUTPUT_D_QUN => 'D~QUN', # Dequeue from queue (non-blocking)
95             OUTPUT_D_QUT => 'D~QUT', # Dequeue from queue (timed)
96              
97             OUTPUT_N_QUE => 'N~QUE', # Return the number of items
98             OUTPUT_I_QUE => 'I~QUE', # Insert into queue
99             OUTPUT_I_QUP => 'I~QUP', # Insert into queue (p)
100              
101             OUTPUT_P_QUE => 'P~QUE', # Peek into queue
102             OUTPUT_P_QUP => 'P~QUP', # Peek into queue (p)
103             OUTPUT_P_QUH => 'P~QUH', # Peek into heap
104             OUTPUT_H_QUE => 'H~QUE' # Return the heap
105 37     37   240 };
  37         69  
106              
107             ## Attributes used internally.
108             ## _qr_sock _qw_sock _datp _datq _dsem _heap _id _init_pid _porder _type
109             ## _ar_sock _aw_sock _asem _tsem
110              
111             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
112              
113             my %_valid_fields_new = map { $_ => 1 } qw(
114             await barrier fast gather porder queue type
115             );
116              
117             my $_all = {};
118             my $_qid = 0;
119              
120             sub CLONE {
121 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
122             }
123              
124             sub DESTROY {
125 23     23   68 my ($_Q) = @_;
126 23 50       84 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
127              
128 23 50       164 delete $_all->{ $_Q->{_id} } if exists $_Q->{_id};
129 23         246 undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};
130              
131 23 100 66     258 if (exists $_Q->{_init_pid} && $_Q->{_init_pid} eq $_pid) {
132 16         50 MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
133             }
134              
135 23         564 return;
136             }
137              
138             ###############################################################################
139             ## ----------------------------------------------------------------------------
140             ## New instance instantiation.
141             ##
142             ###############################################################################
143              
144             sub new {
145 51     51 1 2538 my ($_class, %_argv) = @_;
146 51         136 my $_pkg = caller;
147              
148 51         131 @_ = ();
149              
150 51   33     106 my $_Q = {}; bless($_Q, ref($_class) || $_class);
  51         359  
151              
152 51         209 for my $_p (keys %_argv) {
153             _croak("Queue: ($_p) is not a valid constructor argument")
154 45 50       154 unless (exists $_valid_fields_new{$_p});
155             }
156              
157 51         374 $_Q->{_asem} = 0; # Semaphore count variable for the ->await method
158 51         183 $_Q->{_datp} = {}; # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
159 51         142 $_Q->{_heap} = []; # Priority heap [ pN, p2, p1 ] in heap order
160             # fyi, _datp will always dequeue before _datq
161              
162             $_Q->{_await} = (defined $_argv{await})
163 51 100 50     982 ? $_argv{await} : $_def->{$_pkg}{AWAIT} || 0;
164              
165             $_Q->{_porder} = (defined $_argv{porder})
166 51 100 33     279 ? $_argv{porder} : $_def->{$_pkg}{PORDER} || $HIGHEST;
167              
168             $_Q->{_type} = (defined $_argv{type})
169 51 100 33     276 ? $_argv{type} : $_def->{$_pkg}{TYPE} || $FIFO;
170              
171             ## -------------------------------------------------------------------------
172              
173 51 100       144 if (exists $_argv{queue}) {
174             _croak('Queue: (queue) is not an ARRAY reference')
175 7 50       28 unless (ref $_argv{queue} eq 'ARRAY');
176              
177 7         23 $_Q->{_datq} = $_argv{queue};
178             }
179             else {
180 44         190 $_Q->{_datq} = [];
181             }
182              
183 51 50       171 if (exists $_argv{gather}) {
184             _croak('Queue: (gather) is not a CODE reference')
185 0 0       0 unless (ref $_argv{gather} eq 'CODE');
186              
187 0         0 $_Q->{gather} = $_argv{gather};
188             }
189              
190             ## -------------------------------------------------------------------------
191              
192 51         469 $_Q->{_qr_mutex} = MCE::Mutex->new();
193 51 50       226 $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
194 51         149 $_Q->{_id} = ++$_qid; $_all->{$_qid} = $_Q;
  51         207  
195 51         145 $_Q->{_dsem} = 0;
196              
197 51         319 MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
198 51 100       263 MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};
199              
200 51         193 return $_Q;
201             }
202              
203             ###############################################################################
204             ## ----------------------------------------------------------------------------
205             ## Private methods.
206             ##
207             ###############################################################################
208              
209             sub _croak {
210 0 0   0   0 unless ($INC{'MCE.pm'}) {
211 0         0 $\ = undef; require Carp; goto &Carp::croak;
  0         0  
  0         0  
212             } else {
213 0         0 goto &MCE::_croak;
214             }
215             }
216              
217             ## Add items to the tail of the queue with priority level.
218              
219             sub _enqueuep {
220 83     83   156 my ($_Q, $_p) = (shift, shift);
221              
222             ## Enlist priority into the heap.
223 83 100 100     257 if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {
  36         111  
224              
225 65 100       68 unless (scalar @{ $_Q->{_heap} }) {
  65 100       166  
226 50         53 push @{ $_Q->{_heap} }, $_p;
  50         186  
227             }
228 0         0 elsif ($_Q->{_porder}) {
229 9         49 $_Q->_heap_insert_high($_p);
230             }
231             else {
232 6         30 $_Q->_heap_insert_low($_p);
233             }
234             }
235              
236             ## Append item(s) into the queue.
237 83         111 push @{ $_Q->{_datp}->{$_p} }, @_;
  83         278  
238              
239 83         136 return;
240             }
241              
242             ## Return one item from the queue.
243              
244             sub _dequeue {
245 944     944   1404 my ($_Q) = @_;
246              
247             ## Return item from the non-priority queue.
248 944 100       1174 unless (scalar @{ $_Q->{_heap} }) {
  944         1618  
249             return ($_Q->{_type})
250 850 100       2008 ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
  836         2116  
  14         43  
251             }
252              
253 94         150 my $_p = $_Q->{_heap}->[0];
254              
255             ## Delist priority from the heap when 1 item remains.
256 94 100       89 shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);
  47         83  
  94         195  
257              
258             ## Return item from the priority queue.
259             return ($_Q->{_type})
260 94 100       209 ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
  70         163  
  24         84  
261             }
262              
263             ## Helper method for getting the reference to the underlying array.
264             ## Use with test scripts for comparing data only (not a public API).
265              
266             sub _get_aref {
267 50     50   6738 my ($_Q, $_p) = @_;
268              
269 50 50 66     322 return if ($INC{'MCE.pm'} && !defined $MCE::MCE->{_wid});
270 50 50 66     156 return if (defined $MCE::MCE && $MCE::MCE->{_wid});
271              
272 50 100       98 if (defined $_p) {
273 45 50 33     244 _croak('Queue: (get_aref priority) is not an integer')
274             if (!looks_like_number($_p) || int($_p) != $_p);
275              
276 45 100       132 return undef unless (exists $_Q->{_datp}->{$_p});
277 36         339 return $_Q->{_datp}->{$_p};
278             }
279              
280 5         31 return $_Q->{_datq};
281             }
282              
283             ## Insert priority into the heap. A lower priority level comes first.
284              
285             sub _heap_insert_low {
286 6     6   16 my ($_Q, $_p) = @_;
287              
288             ## Insert priority at the head of the heap.
289 6 50       39 if ($_p < $_Q->{_heap}->[0]) {
    100          
290 0         0 unshift @{ $_Q->{_heap} }, $_p;
  0         0  
291             }
292              
293             ## Insert priority at the end of the heap.
294             elsif ($_p > $_Q->{_heap}->[-1]) {
295 4         12 push @{ $_Q->{_heap} }, $_p;
  4         17  
296             }
297              
298             ## Insert priority through binary search.
299             else {
300 2         12 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  2         10  
  2         9  
301              
302 2         10 while ($_lower < $_upper) {
303 4         12 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
304 4 100       20 if ($_p > $_Q->{_heap}->[$_midpoint]) {
305 2         8 $_lower = $_midpoint + 1;
306             } else {
307 2         7 $_upper = $_midpoint;
308             }
309             }
310              
311             ## Insert priority into the heap.
312 2         6 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  2         7  
313             }
314              
315 6         15 return;
316             }
317              
318             ## Insert priority into the heap. A higher priority level comes first.
319              
320             sub _heap_insert_high {
321 9     9   25 my ($_Q, $_p) = @_;
322              
323             ## Insert priority at the head of the heap.
324 9 100       57 if ($_p > $_Q->{_heap}->[0]) {
    50          
325 6         22 unshift @{ $_Q->{_heap} }, $_p;
  6         30  
326             }
327              
328             ## Insert priority at the end of the heap.
329             elsif ($_p < $_Q->{_heap}->[-1]) {
330 0         0 push @{ $_Q->{_heap} }, $_p;
  0         0  
331             }
332              
333             ## Insert priority through binary search.
334             else {
335 3         14 my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
  3         5  
  3         8  
336              
337 3         9 while ($_lower < $_upper) {
338 6         13 my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
339 6 100       28 if ($_p < $_Q->{_heap}->[$_midpoint]) {
340 3         10 $_lower = $_midpoint + 1;
341             } else {
342 3         7 $_upper = $_midpoint;
343             }
344             }
345              
346             ## Insert priority into the heap.
347 3         17 splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
  3         22  
348             }
349              
350 9         33 return;
351             }
352              
353             ###############################################################################
354             ## ----------------------------------------------------------------------------
355             ## Output routines for the manager process.
356             ##
357             ###############################################################################
358              
359             {
360             my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_cnt, $_i, $_id);
361             my ($_len, $_p, $_t, $_Q, $_has_data, $_pending);
362              
363             my %_output_function = (
364              
365             OUTPUT_W_QUE.$LF => sub { # Await from the queue
366             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
367              
368             chomp($_id = <$_DAU_R_SOCK>),
369             chomp($_t = <$_DAU_R_SOCK>);
370              
371             $_Q = $_all->{$_id};
372             $_Q->{_tsem} = $_t;
373              
374             if ($_Q->pending() <= $_t) {
375             syswrite($_Q->{_aw_sock}, $LF);
376             } else {
377             $_Q->{_asem} += 1;
378             }
379              
380             print {$_DAU_R_SOCK} $LF;
381              
382             return;
383             },
384              
385             OUTPUT_C_QUE.$LF => sub { # Clear the queue
386             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
387              
388             chomp($_id = <$_DAU_R_SOCK>);
389             _mce_m_clear($_all->{$_id});
390              
391             print {$_DAU_R_SOCK} $LF;
392              
393             return;
394             },
395              
396             OUTPUT_E_QUE.$LF => sub { # End the queue
397             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
398              
399             chomp($_id = <$_DAU_R_SOCK>);
400             _mce_m_end($_all->{$_id});
401              
402             print {$_DAU_R_SOCK} $LF;
403              
404             return;
405             },
406              
407             ## ----------------------------------------------------------------------
408              
409             OUTPUT_A_QUE.$LF => sub { # Enqueue into queue (A)
410             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
411              
412             chomp($_id = <$_DAU_R_SOCK>),
413             chomp($_len = <$_DAU_R_SOCK>);
414              
415             read $_DAU_R_SOCK, my($_buf), $_len;
416              
417             $_Q = $_all->{$_id};
418              
419             if ($_Q->{gather}) {
420             local $_ = $_MCE->{thaw}($_buf);
421             $_Q->{gather}($_Q, @{ $_ });
422             }
423             else {
424             $_Q->_mce_m_enqueue(@{ $_MCE->{thaw}($_buf) });
425             }
426              
427             return;
428             },
429              
430             OUTPUT_A_QUP.$LF => sub { # Enqueue into queue (A,p)
431             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
432              
433             chomp($_id = <$_DAU_R_SOCK>),
434             chomp($_p = <$_DAU_R_SOCK>),
435             chomp($_len = <$_DAU_R_SOCK>);
436              
437             read $_DAU_R_SOCK, my($_buf), $_len;
438              
439             $_Q = $_all->{$_id};
440             $_Q->_mce_m_enqueuep($_p, @{ $_MCE->{thaw}($_buf) });
441              
442             return;
443             },
444              
445             ## ----------------------------------------------------------------------
446              
447             OUTPUT_D_QUE.$LF => sub { # Dequeue from queue (B)
448             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
449              
450             chomp($_id = <$_DAU_R_SOCK>),
451             chomp($_cnt = <$_DAU_R_SOCK>);
452              
453             $_cnt = 0 if ($_cnt == 1);
454             $_Q = $_all->{$_id};
455              
456             my (@_items, $_buf);
457              
458             if ($_cnt) {
459             my $_pending = @{ $_Q->{_datq} };
460              
461             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
462             for my $_h (@{ $_Q->{_heap} }) {
463             $_pending += @{ $_Q->{_datp}->{$_h} };
464             }
465             }
466             $_cnt = $_pending if $_pending < $_cnt;
467              
468             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
469             }
470             else {
471             $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
472             $_buf = $_Q->_dequeue();
473             }
474              
475             if ($_cnt) {
476             $_buf = $_MCE->{freeze}(\@_items);
477             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
478             }
479             elsif ($_has_data) {
480             $_buf = $_MCE->{freeze}([ $_buf ]);
481             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
482             }
483             elsif (exists $_Q->{_ended}) {
484             print {$_DAU_R_SOCK} '-2'.$LF;
485             }
486             else {
487             print {$_DAU_R_SOCK} '-1'.$LF;
488             $_Q->{_dsem} += 1;
489             }
490              
491             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
492             for my $_i (1 .. $_Q->{_asem}) {
493             syswrite($_Q->{_aw_sock}, $LF);
494             }
495             $_Q->{_asem} = 0;
496             }
497              
498             return;
499             },
500              
501             OUTPUT_D_QUN.$LF => sub { # Dequeue from queue (NB)
502             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
503              
504             chomp($_id = <$_DAU_R_SOCK>),
505             chomp($_cnt = <$_DAU_R_SOCK>);
506              
507             $_Q = $_all->{$_id};
508              
509             if ($_cnt == 1) {
510             my $_buf = $_Q->_dequeue();
511              
512             if (defined $_buf) {
513             $_buf = $_MCE->{freeze}([ $_buf ]);
514             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
515             }
516             else {
517             print {$_DAU_R_SOCK} '-1'.$LF;
518             }
519             }
520             else {
521             my @_items;
522             my $_pending = @{ $_Q->{_datq} };
523              
524             if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
525             for my $_h (@{ $_Q->{_heap} }) {
526             $_pending += @{ $_Q->{_datp}->{$_h} };
527             }
528             }
529             $_cnt = $_pending if $_pending < $_cnt;
530              
531             for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
532              
533             if ($_cnt) {
534             my $_buf = $_MCE->{freeze}(\@_items);
535             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
536             }
537             else {
538             print {$_DAU_R_SOCK} '-1'.$LF;
539             }
540             }
541              
542             if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
543             for my $_i (1 .. $_Q->{_asem}) {
544             syswrite($_Q->{_aw_sock}, $LF);
545             }
546             $_Q->{_asem} = 0;
547             }
548              
549             return;
550             },
551              
552             OUTPUT_D_QUT.$LF => sub { # Dequeue from queue (Timed)
553             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
554              
555             chomp($_id = <$_DAU_R_SOCK>);
556              
557             $_Q = $_all->{$_id};
558             $_Q->{_dsem} -= 1 if $_Q->{_dsem};
559              
560             print {$_DAU_R_SOCK} $LF;
561              
562             return;
563             },
564              
565             ## ----------------------------------------------------------------------
566              
567             OUTPUT_N_QUE.$LF => sub { # Return number of items
568             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
569              
570             chomp($_id = <$_DAU_R_SOCK>);
571              
572             print {$_DAU_R_SOCK} $_all->{$_id}->_mce_m_pending().$LF;
573              
574             return;
575             },
576              
577             OUTPUT_I_QUE.$LF => sub { # Insert into queue
578             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
579              
580             chomp($_id = <$_DAU_R_SOCK>),
581             chomp($_i = <$_DAU_R_SOCK>),
582             chomp($_len = <$_DAU_R_SOCK>);
583              
584             read $_DAU_R_SOCK, my($_buf), $_len;
585              
586             $_Q = $_all->{$_id};
587             $_Q->_mce_m_insert($_i, @{ $_MCE->{thaw}($_buf) });
588              
589             return;
590             },
591              
592             OUTPUT_I_QUP.$LF => sub { # Insert into queue (p)
593             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
594              
595             chomp($_id = <$_DAU_R_SOCK>),
596             chomp($_p = <$_DAU_R_SOCK>),
597             chomp($_i = <$_DAU_R_SOCK>),
598             chomp($_len = <$_DAU_R_SOCK>);
599              
600             read $_DAU_R_SOCK, my($_buf), $_len;
601              
602             $_Q = $_all->{$_id};
603             $_Q->_mce_m_insertp($_p, $_i, @{ $_MCE->{thaw}($_buf) });
604              
605             return;
606             },
607              
608             ## ----------------------------------------------------------------------
609              
610             OUTPUT_P_QUE.$LF => sub { # Peek into queue
611             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
612              
613             chomp($_id = <$_DAU_R_SOCK>),
614             chomp($_i = <$_DAU_R_SOCK>);
615              
616             $_Q = $_all->{$_id};
617             $_buf = $_Q->_mce_m_peek($_i);
618              
619             if (defined $_buf) {
620             $_buf = $_MCE->{freeze}([ $_buf ]);
621             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
622             } else {
623             print {$_DAU_R_SOCK} '-1'.$LF;
624             }
625              
626             return;
627             },
628              
629             OUTPUT_P_QUP.$LF => sub { # Peek into queue (p)
630             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
631              
632             chomp($_id = <$_DAU_R_SOCK>),
633             chomp($_p = <$_DAU_R_SOCK>),
634             chomp($_i = <$_DAU_R_SOCK>);
635              
636             $_Q = $_all->{$_id};
637             $_buf = $_Q->_mce_m_peekp($_p, $_i);
638              
639             if (defined $_buf) {
640             $_buf = $_MCE->{freeze}([ $_buf ]);
641             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
642             } else {
643             print {$_DAU_R_SOCK} '-1'.$LF;
644             }
645              
646             return;
647             },
648              
649             OUTPUT_P_QUH.$LF => sub { # Peek into heap
650             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
651              
652             chomp($_id = <$_DAU_R_SOCK>),
653             chomp($_i = <$_DAU_R_SOCK>);
654              
655             $_Q = $_all->{$_id};
656             $_buf = $_Q->_mce_m_peekh($_i);
657              
658             if (defined $_buf) {
659             $_buf = $_MCE->{freeze}([ $_buf ]);
660             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
661             } else {
662             print {$_DAU_R_SOCK} '-1'.$LF;
663             }
664              
665             return;
666             },
667              
668             OUTPUT_H_QUE.$LF => sub { # Return the heap
669             my $_buf; $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
670              
671             chomp($_id = <$_DAU_R_SOCK>);
672              
673             $_Q = $_all->{$_id};
674             $_buf = $_MCE->{freeze}([ $_Q->_mce_m_heap() ]);
675              
676             print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
677              
678             return;
679             },
680              
681             );
682              
683             ## -------------------------------------------------------------------------
684              
685             sub _mce_m_loop_begin {
686 87     87   299 ($_MCE, $_DAU_R_SOCK_REF) = @_;
687              
688 87         217 return;
689             }
690              
691             sub _mce_m_loop_end {
692 87     87   296 $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = $_cnt = $_i = $_id =
693             $_len = $_p = $_Q = undef;
694              
695 87         184 return;
696             }
697              
698             sub _mce_m_init {
699 35     35   157 MCE::_attach_plugin(
700             \%_output_function, \&_mce_m_loop_begin, \&_mce_m_loop_end,
701             \&_mce_w_init
702             );
703              
704 35         90 return;
705             }
706              
707             }
708              
709             ###############################################################################
710             ## ----------------------------------------------------------------------------
711             ## Methods for the manager process.
712             ##
713             ###############################################################################
714              
715             ## await ( pending_threshold )
716              
717             sub _mce_m_await {
718             # Handled by the manager process when called by MCE workers.
719 0     0   0 return;
720             }
721              
722             ## clear ( )
723              
724             sub _mce_m_clear {
725 28     28   2279 my ($_Q) = @_;
726              
727 28         42 %{ $_Q->{_datp} } = ();
  28         98  
728 28         45 @{ $_Q->{_datq} } = ();
  28         55  
729 28         42 @{ $_Q->{_heap} } = ();
  28         53  
730              
731 28         50 return;
732             }
733              
734             ## end ( )
735              
736             sub _mce_m_end {
737 0     0   0 my ($_Q) = @_;
738              
739 0 0       0 if (!exists $_Q->{_ended}) {
740 0         0 for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
  0         0  
741 0         0 $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
742             }
743              
744 0         0 return;
745             }
746              
747             ## enqueue ( item [, item, ... ] )
748              
749             sub _mce_m_enqueue {
750 569     569   2058 my $_Q = shift;
751              
752 569 50       1027 return unless (scalar @_);
753              
754 569 50       1027 if (exists $_Q->{_ended}) {
755 0         0 warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
756 0         0 return;
757             }
758              
759 569 100       1106 if ($_Q->{_dsem}) {
760 186         489 for my $_i (1 .. scalar @_) {
761 200         38726 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
762 200 100       861 last unless $_Q->{_dsem};
763             }
764             }
765              
766             ## Append item(s) into the queue.
767 569         600 push @{ $_Q->{_datq} }, @_;
  569         1526  
768              
769 569         1272 return;
770             }
771              
772             ## enqueuep ( priority, item [, item, ... ] )
773              
774             sub _mce_m_enqueuep {
775 74     74   200 my ($_Q, $_p) = (shift, shift);
776              
777 74 50 33     440 _croak('Queue: (enqueuep priority) is not an integer')
778             if (!looks_like_number($_p) || int($_p) != $_p);
779              
780 74 50       142 return unless (scalar @_);
781              
782 74 50       138 if (exists $_Q->{_ended}) {
783 0         0 warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
784 0         0 return;
785             }
786              
787 74 50       156 if ($_Q->{_dsem}) {
788 0         0 for my $_i (1 .. scalar @_) {
789 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
790 0 0       0 last unless $_Q->{_dsem};
791             }
792             }
793              
794 74         241 $_Q->_enqueuep($_p, @_);
795              
796 74         86 return;
797             }
798              
799             ## dequeue ( )
800             ## dequeue ( count )
801              
802             sub _mce_m_dequeue {
803 16     16   1528 my ($_Q, $_cnt) = @_;
804 16         25 my (@_items, $_has_data, $_buf);
805              
806 16 100 100     60 if (defined $_cnt && $_cnt ne '1') {
807 6 50 33     54 _croak('Queue: (dequeue count argument) is not valid')
      33        
808             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
809              
810 6         10 my $_pending = @{ $_Q->{_datq} };
  6         13  
811              
812 6 50 100     18 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  4         11  
813 4         6 for my $_h (@{ $_Q->{_heap} }) {
  4         11  
814 10         10 $_pending += @{ $_Q->{_datp}->{$_h} };
  10         19  
815             }
816             }
817 6 50       14 $_cnt = $_pending if $_pending < $_cnt;
818              
819 6         16 for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
  28         49  
820             }
821             else {
822 10 50 66     10 $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
823 10         22 $_buf = $_Q->_dequeue();
824             }
825              
826 16 100       46 return @_items if (scalar @_items);
827 10 50       33 return $_buf if ($_has_data);
828 0 0       0 return () if (exists $_Q->{_ended});
829              
830 0         0 $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
831              
832 0         0 goto \&_mce_m_dequeue;
833             }
834              
835             ## dequeue_nb ( )
836             ## dequeue_nb ( count )
837              
838             sub _mce_m_dequeue_nb {
839 4     4   11 my ($_Q, $_cnt) = @_;
840              
841 4 50 33     16 if (defined $_cnt && $_cnt ne '1') {
842 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
843             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
844              
845 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
846              
847 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
848 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
849 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
850             }
851             }
852              
853 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
854              
855 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
856             }
857              
858 4         48 my $_buf = $_Q->_dequeue();
859              
860 4 50       22 return defined($_buf) ? $_buf : ();
861             }
862              
863             ## dequeue_timed ( timeout )
864             ## dequeue_timed ( timeout, count )
865              
866             sub _mce_m_dequeue_timed {
867 4     4   11 my ($_Q, $_timeout, $_cnt) = @_;
868              
869 4 50       18 if (defined $_timeout) {
870 0 0       0 _croak('Queue: (dequeue_timed timeout argument) is not valid')
871             if (!looks_like_number($_timeout));
872             }
873              
874 4 50 33     13 if (defined $_cnt && $_cnt ne '1') {
875 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
876             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
877              
878 0         0 my $_pending = @{ $_Q->{_datq} };
  0         0  
879              
880 0 0 0     0 if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
  0         0  
881 0         0 for my $_h (@{ $_Q->{_heap} }) {
  0         0  
882 0         0 $_pending += @{ $_Q->{_datp}->{$_h} };
  0         0  
883             }
884             }
885              
886 0 0       0 $_cnt = $_pending if $_pending < $_cnt;
887              
888 0         0 return map { $_Q->_dequeue() } 1 .. $_cnt;
  0         0  
889             }
890              
891 4         17 my $_buf = $_Q->_dequeue();
892              
893 4 50       20 return defined($_buf) ? $_buf : ();
894             }
895              
896             ## pending ( )
897              
898             sub _mce_m_pending {
899 14     14   1128 my ($_Q) = @_;
900 14         20 my $_pending = @{ $_Q->{_datq} };
  14         41  
901              
902 14 100       27 if (scalar @{ $_Q->{_heap} }) {
  14         53  
903 9         14 for my $_h (@{ $_Q->{_heap} }) {
  9         75  
904 9         18 $_pending += @{ $_Q->{_datp}->{$_h} };
  9         29  
905             }
906             }
907              
908             return (exists $_Q->{_ended})
909 14 0       365 ? $_pending ? $_pending : undef
    50          
910             : $_pending;
911             }
912              
913             ## insert ( index, item [, item, ... ] )
914              
915             sub _mce_m_insert {
916 50     50   120 my ($_Q, $_i) = (shift, shift);
917              
918 50 50 33     248 _croak('Queue: (insert index) is not an integer')
919             if (!looks_like_number($_i) || int($_i) != $_i);
920              
921 50 50       86 return unless (scalar @_);
922              
923 50 50       83 if (exists $_Q->{_ended}) {
924 0         0 warn "Queue: (insert) called on queue that has been 'end'ed\n";
925 0         0 return;
926             }
927              
928 50 50       75 if ($_Q->{_dsem}) {
929 0         0 for my $_i (1 .. scalar @_) {
930 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
931 0 0       0 last unless $_Q->{_dsem};
932             }
933             }
934              
935 50 100       71 if (abs($_i) > scalar @{ $_Q->{_datq} }) {
  50         88  
936 10 100       21 if ($_i >= 0) {
937 5 100       23 if ($_Q->{_type}) {
938 3         6 push @{ $_Q->{_datq} }, @_;
  3         10  
939             } else {
940 2         6 unshift @{ $_Q->{_datq} }, @_;
  2         9  
941             }
942             }
943             else {
944 5 100       13 if ($_Q->{_type}) {
945 3         3 unshift @{ $_Q->{_datq} }, @_;
  3         11  
946             } else {
947 2         10 push @{ $_Q->{_datq} }, @_;
  2         11  
948             }
949             }
950             }
951             else {
952 40 100       60 if (!$_Q->{_type}) {
953             $_i = ($_i >= 0)
954 16 100       34 ? scalar(@{ $_Q->{_datq} }) - $_i
  10         20  
955             : abs($_i);
956             }
957 40         44 splice @{ $_Q->{_datq} }, $_i, 0, @_;
  40         92  
958             }
959              
960 50         72 return;
961             }
962              
963             ## insertp ( priority, index, item [, item, ... ] )
964              
965             sub _mce_m_insertp {
966 90     90   183 my ($_Q, $_p, $_i) = (shift, shift, shift);
967              
968 90 50 33     437 _croak('Queue: (insertp priority) is not an integer')
969             if (!looks_like_number($_p) || int($_p) != $_p);
970 90 50 33     333 _croak('Queue: (insertp index) is not an integer')
971             if (!looks_like_number($_i) || int($_i) != $_i);
972              
973 90 50       132 return unless (scalar @_);
974              
975 90 50       164 if (exists $_Q->{_ended}) {
976 0         0 warn "Queue: (insertp) called on queue that has been 'end'ed\n";
977 0         0 return;
978             }
979              
980 90 50       134 if ($_Q->{_dsem}) {
981 0         0 for my $_i (1 .. scalar @_) {
982 0         0 $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
983 0 0       0 last unless $_Q->{_dsem};
984             }
985             }
986              
987 90 100 50     223 if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
  90         213  
988              
989 81 100       92 if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
  81         122  
990 18 100       49 if ($_i >= 0) {
991 9 100       35 if ($_Q->{_type}) {
992 5         10 push @{ $_Q->{_datp}->{$_p} }, @_;
  5         16  
993             } else {
994 4         27 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  4         37  
995             }
996             }
997             else {
998 9 100       54 if ($_Q->{_type}) {
999 5         10 unshift @{ $_Q->{_datp}->{$_p} }, @_;
  5         16  
1000             } else {
1001 4         29 push @{ $_Q->{_datp}->{$_p} }, @_;
  4         32  
1002             }
1003             }
1004             }
1005             else {
1006 63 100       106 if (!$_Q->{_type}) {
1007             $_i = ($_i >=0)
1008 28 100       83 ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
  16         46  
1009             : abs($_i);
1010             }
1011 63         72 splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
  63         150  
1012             }
1013             }
1014             else {
1015 9         35 $_Q->_enqueuep($_p, @_);
1016             }
1017              
1018 90         162 return;
1019             }
1020              
1021             ## peek ( index )
1022             ## peek ( )
1023              
1024             sub _mce_m_peek {
1025 55     55   108 my ($_Q, $_i) = @_;
1026              
1027 55 100       90 if ($_i) {
1028 40 50 33     252 _croak('Queue: (peek index) is not an integer')
1029             if (!looks_like_number($_i) || int($_i) != $_i);
1030             }
1031 15         22 else { $_i = 0 }
1032              
1033 55 100       73 return undef if (abs($_i) > scalar @{ $_Q->{_datq} });
  55         158  
1034              
1035 40 100       84 if (!$_Q->{_type}) {
1036             $_i = ($_i >= 0)
1037 16 100       44 ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
  10         20  
1038             : abs($_i + 1);
1039             }
1040              
1041 40         113 return $_Q->{_datq}->[$_i];
1042             }
1043              
1044             ## peekp ( priority, index )
1045             ## peekp ( priority )
1046              
1047             sub _mce_m_peekp {
1048 99     99   200 my ($_Q, $_p, $_i) = @_;
1049              
1050 99 100       183 if ($_i) {
1051 72 50 33     478 _croak('Queue: (peekp index) is not an integer')
1052             if (!looks_like_number($_i) || int($_i) != $_i);
1053             }
1054 27         32 else { $_i = 0 }
1055              
1056 99 50 33     465 _croak('Queue: (peekp priority) is not an integer')
1057             if (!looks_like_number($_p) || int($_p) != $_p);
1058              
1059 99 50       225 return undef unless (exists $_Q->{_datp}->{$_p});
1060 99 100       148 return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });
  99         239  
1061              
1062 72 100       148 if (!$_Q->{_type}) {
1063             $_i = ($_i >= 0)
1064 32 100       65 ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
  20         58  
1065             : abs($_i + 1);
1066             }
1067              
1068 72         207 return $_Q->{_datp}->{$_p}->[$_i];
1069             }
1070              
1071             ## peekh ( index )
1072             ## peekh ( )
1073              
1074             sub _mce_m_peekh {
1075 10     10   1340 my ($_Q, $_i) = @_;
1076              
1077 10 100       26 if ($_i) {
1078 5 50 33     80 _croak('Queue: (peekh index) is not an integer')
1079             if (!looks_like_number($_i) || int($_i) != $_i);
1080             }
1081 5         7 else { $_i = 0 }
1082              
1083 10 50       26 return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
  10         34  
1084 10         35 return $_Q->{_heap}->[$_i];
1085             }
1086              
1087             ## heap ( )
1088              
1089             sub _mce_m_heap {
1090 5     5   13 return @{ shift->{_heap} };
  5         47  
1091             }
1092              
1093             ###############################################################################
1094             ## ----------------------------------------------------------------------------
1095             ## Methods for the worker process.
1096             ##
1097             ###############################################################################
1098              
1099             {
1100             my (
1101             $_MCE, $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_lock_chn,
1102             $_dat_ex, $_dat_un, $_len, $_pending
1103             );
1104              
1105             my $_req1 = sub {
1106             local $\ = undef if (defined $\);
1107              
1108             $_dat_ex->() if $_lock_chn;
1109             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1110             print({$_DAU_W_SOCK} $_[1], $_[2]);
1111              
1112             $_dat_un->() if $_lock_chn;
1113             };
1114              
1115             my $_req2 = sub {
1116             local $\ = undef if (defined $\);
1117             local $/ = $LF if ($/ ne $LF);
1118              
1119             $_dat_ex->() if $_lock_chn;
1120             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1121             print({$_DAU_W_SOCK} $_[1]);
1122             <$_DAU_W_SOCK>;
1123              
1124             $_dat_un->() if $_lock_chn;
1125             };
1126              
1127             my $_req3 = sub {
1128             local $\ = undef if (defined $\);
1129             local $/ = $LF if ($/ ne $LF);
1130              
1131             $_dat_ex->() if $_lock_chn;
1132             print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1133             print({$_DAU_W_SOCK} $_[1]);
1134              
1135             chomp($_len = <$_DAU_W_SOCK>);
1136              
1137             if ($_len < 0) {
1138             $_dat_un->() if $_lock_chn;
1139             return defined($_[3]) ? () : undef;
1140             }
1141              
1142             read $_DAU_W_SOCK, my($_buf), $_len;
1143             $_dat_un->() if $_lock_chn;
1144              
1145             ($_[2] == 1)
1146             ? ($_MCE->{thaw}($_buf))->[0]
1147             : @{ $_MCE->{thaw}($_buf) };
1148             };
1149              
1150             sub _mce_w_init {
1151 28     28   357 ($_MCE) = @_;
1152 28         343 $_chn = $_MCE->{_chn};
1153 28         104 $_DAT_LOCK = $_MCE->{_dat_lock};
1154 28         74 $_DAT_W_SOCK = $_MCE->{_dat_w_sock}->[0];
1155 28         84 $_DAU_W_SOCK = $_MCE->{_dat_w_sock}->[$_chn];
1156 28         65 $_lock_chn = $_MCE->{_lock_chn};
1157              
1158 28 50       167 if ($_lock_chn) {
1159             # inlined for performance
1160             $_dat_ex = sub {
1161 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1162             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
1163 0 0       0 if $_is_MSWin32;
1164             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
1165 0 0       0 unless $_DAT_LOCK->{ $_pid };
1166 0         0 };
1167             $_dat_un = sub {
1168 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1169             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
1170 0 0       0 if $_DAT_LOCK->{ $_pid };
1171 0         0 };
1172             }
1173              
1174 28         2189 $_all = {};
1175              
1176 37     37   296 no strict 'refs'; no warnings 'redefine';
  37     37   78  
  37         1342  
  37         247  
  37         100  
  37         80447  
1177              
1178 28         131 *{ 'MCE::Queue::await' } = \&_mce_w_await;
  28         1377  
1179 28         267 *{ 'MCE::Queue::clear' } = \&_mce_w_clear;
  28         372  
1180 28         200 *{ 'MCE::Queue::end' } = \&_mce_w_end;
  28         549  
1181 28         112 *{ 'MCE::Queue::enqueue' } = \&_mce_w_enqueue;
  28         605  
1182 28         104 *{ 'MCE::Queue::enqueuep' } = \&_mce_w_enqueuep;
  28         391  
1183 28         86 *{ 'MCE::Queue::dequeue' } = \&_mce_w_dequeue;
  28         374  
1184 28         221 *{ 'MCE::Queue::dequeue_nb' } = \&_mce_w_dequeue_nb;
  28         240  
1185 28         121 *{ 'MCE::Queue::dequeue_timed' } = \&_mce_w_dequeue_timed;
  28         232  
1186 28         184 *{ 'MCE::Queue::pending' } = \&_mce_w_pending;
  28         501  
1187 28         79 *{ 'MCE::Queue::insert' } = \&_mce_w_insert;
  28         194  
1188 28         140 *{ 'MCE::Queue::insertp' } = \&_mce_w_insertp;
  28         266  
1189 28         103 *{ 'MCE::Queue::peek' } = \&_mce_w_peek;
  28         263  
1190 28         97 *{ 'MCE::Queue::peekp' } = \&_mce_w_peekp;
  28         369  
1191 28         176 *{ 'MCE::Queue::peekh' } = \&_mce_w_peekh;
  28         254  
1192 28         105 *{ 'MCE::Queue::heap' } = \&_mce_w_heap;
  28         139  
1193              
1194 28         158 return;
1195             }
1196              
1197             ## -------------------------------------------------------------------------
1198              
1199             sub _mce_w_await {
1200 0   0 0   0 my $_Q = shift; my $_t = shift || 0;
  0         0  
1201              
1202 0 0       0 return $_Q->_mce_m_await() if (exists $_all->{ $_Q->{_id} });
1203              
1204             _croak('Queue: (await) is not enabled for this queue')
1205 0 0       0 unless ($_Q->{_await});
1206 0 0 0     0 _croak('Queue: (await threshold) is not an integer')
1207             if (!looks_like_number($_t) || int($_t) != $_t);
1208              
1209 0 0       0 $_t = 0 if ($_t < 0);
1210 0         0 $_req2->(OUTPUT_W_QUE, $_Q->{_id}.$LF . $_t.$LF);
1211              
1212 0 0       0 MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
1213 0         0 MCE::Util::_sysread($_Q->{_ar_sock}, my($_next), 1);
1214              
1215 0         0 return;
1216             }
1217              
1218             sub _mce_w_clear {
1219 8     8   44 my ($_Q) = @_;
1220              
1221 8 50       32 return $_Q->_mce_m_clear() if (exists $_all->{ $_Q->{_id} });
1222              
1223 8         46 $_req2->(OUTPUT_C_QUE, $_Q->{_id}.$LF);
1224              
1225 8         33 return;
1226             }
1227              
1228             sub _mce_w_end {
1229 0     0   0 my ($_Q) = @_;
1230              
1231 0 0       0 return $_Q->_mce_m_end() if (exists $_all->{ $_Q->{_id} });
1232              
1233 0         0 $_req2->(OUTPUT_E_QUE, $_Q->{_id}.$LF);
1234              
1235 0         0 return;
1236             }
1237              
1238             ## -------------------------------------------------------------------------
1239              
1240             sub _mce_w_enqueue {
1241 56     56   790 my $_Q = shift;
1242              
1243 56 50       214 return $_Q->_mce_m_enqueue(@_) if (exists $_all->{ $_Q->{_id} });
1244              
1245 56 50       132 if (scalar @_) {
1246 56         492 my $_tmp = $_MCE->{freeze}([ @_ ]);
1247 56         196 my $_buf = $_Q->{_id}.$LF . length($_tmp).$LF;
1248 56         190 $_req1->(OUTPUT_A_QUE, $_buf, $_tmp);
1249             }
1250              
1251 56         168 return;
1252             }
1253              
1254             sub _mce_w_enqueuep {
1255 20     20   222 my ($_Q, $_p) = (shift, shift);
1256              
1257 20 50       205 return $_Q->_mce_m_enqueuep($_p, @_) if (exists $_all->{ $_Q->{_id} });
1258              
1259 20 50 33     145 _croak('Queue: (enqueuep priority) is not an integer')
1260             if (!looks_like_number($_p) || int($_p) != $_p);
1261              
1262 20 50       163 if (scalar @_) {
1263 20         311 my $_tmp = $_MCE->{freeze}([ @_ ]);
1264 20         87 my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_tmp).$LF;
1265 20         103 $_req1->(OUTPUT_A_QUP, $_buf, $_tmp);
1266             }
1267              
1268 20         53 return;
1269             }
1270              
1271             ## -------------------------------------------------------------------------
1272              
1273             sub _mce_w_dequeue {
1274 244     244   422 my $_buf; my ($_Q, $_cnt) = @_;
  244         436  
1275              
1276 244 50       818 return $_Q->_mce_m_dequeue($_cnt) if (exists $_all->{ $_Q->{_id} });
1277              
1278 244 100 100     927 if (defined $_cnt && $_cnt ne '1') {
1279 6 50 33     80 _croak('Queue: (dequeue count argument) is not valid')
      33        
1280             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1281             } else {
1282 238         413 $_cnt = 1;
1283             }
1284              
1285             {
1286 244 50       293 local $\ = undef if (defined $\);
  244         515  
1287 244 50       651 local $/ = $LF if ($/ ne $LF);
1288              
1289 244 50       546 $_dat_ex->() if $_lock_chn;
1290              
1291 244         3585 print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF),
1292 244         448 print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF);
  244         2524  
1293 244         142767 chomp($_len = <$_DAU_W_SOCK>);
1294              
1295 244 100       1547 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1296              
1297 244 50       698 $_dat_un->() if $_lock_chn;
1298             }
1299              
1300 244 100 100     3797 return ($_MCE->{thaw}($_buf))->[0] if ($_len > 0 && $_cnt == 1);
1301 67 100       209 return @{ $_MCE->{thaw}($_buf) } if ($_len > 0);
  6         167  
1302 61 50       122 return if ($_len == -2);
1303              
1304 61 50       115 MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
1305 61         330 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
1306              
1307 61         724 goto \&_mce_w_dequeue;
1308             }
1309              
1310             sub _mce_w_dequeue_nb {
1311 4     4   17 my ($_Q, $_cnt) = @_;
1312              
1313 4 50       22 return $_Q->_mce_m_dequeue_nb($_cnt) if (exists $_all->{ $_Q->{_id} });
1314              
1315 4 50 33     24 if (defined $_cnt && $_cnt ne '1') {
1316 0 0 0     0 _croak('Queue: (dequeue_nb count argument) is not valid')
      0        
1317             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1318             } else {
1319 4         9 $_cnt = 1;
1320             }
1321              
1322 4         23 $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1323             }
1324              
1325             sub _mce_w_dequeue_timed {
1326 4     4   15 my ($_Q, $_timeout, $_cnt) = @_;
1327 4         12 my ($_buf, $_start);
1328              
1329             return $_Q->_mce_m_dequeue_timed($_timeout, $_cnt)
1330 4 50       19 if (exists $_all->{ $_Q->{_id} });
1331              
1332 4 50       18 if (defined $_timeout) {
1333 0 0       0 _croak('Queue: (dequeue_timed count argument) is not valid')
1334             if (!looks_like_number($_timeout));
1335 0         0 $_start = MCE::Util::_time();
1336             }
1337              
1338 4 50 33     40 if (defined $_cnt && $_cnt ne '1') {
1339 0 0 0     0 _croak('Queue: (dequeue_timed count argument) is not valid')
      0        
1340             if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
1341             } else {
1342 4         9 $_cnt = 1;
1343             }
1344              
1345 4 50 33     29 if (! $_timeout || $_timeout < 0.0) {
1346 4         41 return $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1347             }
1348              
1349             {
1350 0 0       0 local $\ = undef if (defined $\);
  0         0  
1351 0 0       0 local $/ = $LF if ($/ ne $LF);
1352              
1353 0 0       0 $_dat_ex->() if $_lock_chn;
1354              
1355 0         0 print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF),
1356 0         0 print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF);
  0         0  
1357 0         0 chomp($_len = <$_DAU_W_SOCK>);
1358              
1359 0 0       0 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1360              
1361 0 0       0 $_dat_un->() if $_lock_chn;
1362             }
1363              
1364 0 0 0     0 return ($_MCE->{thaw}($_buf))->[0] if ($_len > 0 && $_cnt == 1);
1365 0 0       0 return @{ $_MCE->{thaw}($_buf) } if ($_len > 0);
  0         0  
1366 0 0       0 return if ($_len == -2);
1367              
1368 0         0 $_Q->{_qr_mutex}->lock();
1369 0         0 $_timeout = $_timeout - (MCE::Util::_time() - $_start) - 0.045;
1370 0 0       0 $_timeout = 0.0 if $_timeout < 0.045;
1371              
1372 0         0 CORE::vec(my $_r, CORE::fileno($_Q->{_qr_sock}), 1) = 1;
1373 0 0       0 if (CORE::select($_r, undef, undef, $_timeout) > 0) {
1374 0         0 MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
1375 0         0 $_Q->{_qr_mutex}->unlock();
1376 0         0 return $_req3->(OUTPUT_D_QUN, $_Q->{_id}.$LF . $_cnt.$LF, $_cnt, 1);
1377             }
1378              
1379 0         0 $_Q->{_qr_mutex}->unlock();
1380 0         0 $_req2->(OUTPUT_D_QUT, $_Q->{_id}.$LF);
1381 0         0 MCE::Util::_sleep(0.045); # yield
1382              
1383 0         0 return ();
1384             }
1385              
1386             ## -------------------------------------------------------------------------
1387              
1388             sub _mce_w_pending {
1389 4     4   42 my ($_Q) = @_;
1390              
1391 4 50       24 return $_Q->_mce_m_pending() if (exists $_all->{ $_Q->{_id} });
1392              
1393 4 50       25 local $\ = undef if (defined $\);
1394 4 50       18 local $/ = $LF if ($/ ne $LF);
1395              
1396 4 50       12 $_dat_ex->() if $_lock_chn;
1397 4         45 print({$_DAT_W_SOCK} OUTPUT_N_QUE.$LF . $_chn.$LF),
1398 4         13 print({$_DAU_W_SOCK} $_Q->{_id}.$LF);
  4         36  
1399              
1400 4         5918 chomp($_pending = <$_DAU_W_SOCK>);
1401 4 50       33 $_dat_un->() if $_lock_chn;
1402              
1403 4 50       64 length($_pending) ? int($_pending) : undef;
1404             }
1405              
1406             sub _mce_w_insert {
1407 20     20   106 my ($_Q, $_i) = (shift, shift);
1408              
1409 20 50       53 return $_Q->_mce_m_insert($_i, @_) if (exists $_all->{ $_Q->{_id} });
1410              
1411 20 50 33     92 _croak('Queue: (insert index) is not an integer')
1412             if (!looks_like_number($_i) || int($_i) != $_i);
1413              
1414 20 50       40 return unless (scalar @_);
1415              
1416 20         107 my $_tmp = $_MCE->{freeze}([ @_ ]);
1417 20         58 my $_buf = $_Q->{_id}.$LF . $_i.$LF . length($_tmp).$LF . $_tmp;
1418              
1419 20         38 $_req1->(OUTPUT_I_QUE, $_buf, '');
1420              
1421 20         44 return;
1422             }
1423              
1424             sub _mce_w_insertp {
1425 20     20   85 my ($_Q, $_p, $_i) = (shift, shift, shift);
1426              
1427 20 50       49 return $_Q->_mce_m_insertp($_p, $_i, @_) if (exists $_all->{ $_Q->{_id} });
1428              
1429 20 50 33     144 _croak('Queue: (insertp priority) is not an integer')
1430             if (!looks_like_number($_p) || int($_p) != $_p);
1431 20 50 33     73 _croak('Queue: (insertp index) is not an integer')
1432             if (!looks_like_number($_i) || int($_i) != $_i);
1433              
1434 20 50       39 return unless (scalar @_);
1435              
1436 20         120 my $_tmp = $_MCE->{freeze}([ @_ ]);
1437 20         61 my $_buf = $_Q->{_id}.$LF . $_p.$LF . $_i.$LF . length($_tmp).$LF . $_tmp;
1438              
1439 20         42 $_req1->(OUTPUT_I_QUP, $_buf, '');
1440              
1441 20         46 return;
1442             }
1443              
1444             ## -------------------------------------------------------------------------
1445              
1446             sub _mce_w_peek {
1447 22   100 22   39 my $_Q = shift; my $_i = shift || 0;
  22         73  
1448              
1449 22 50       54 return $_Q->_mce_m_peek($_i, @_) if (exists $_all->{ $_Q->{_id} });
1450              
1451 22 50 33     125 _croak('Queue: (peek index) is not an integer')
1452             if (!looks_like_number($_i) || int($_i) != $_i);
1453              
1454 22         69 $_req3->(OUTPUT_P_QUE, $_Q->{_id}.$LF . $_i.$LF, 1);
1455             }
1456              
1457             sub _mce_w_peekp {
1458 22   100 22   53 my ($_Q, $_p) = (shift, shift); my $_i = shift || 0;
  22         70  
1459              
1460 22 50       180 return $_Q->_mce_m_peekp($_p, $_i, @_) if (exists $_all->{ $_Q->{_id} });
1461              
1462 22 50 33     121 _croak('Queue: (peekp priority) is not an integer')
1463             if (!looks_like_number($_p) || int($_p) != $_p);
1464 22 50 33     70 _croak('Queue: (peekp index) is not an integer')
1465             if (!looks_like_number($_i) || int($_i) != $_i);
1466              
1467 22         72 $_req3->(OUTPUT_P_QUP, $_Q->{_id}.$LF . $_p.$LF . $_i.$LF, 1);
1468             }
1469              
1470             sub _mce_w_peekh {
1471 4   100 4   29 my $_Q = shift; my $_i = shift || 0;
  4         29  
1472              
1473 4 50       33 return $_Q->_mce_m_peekh($_i, @_) if (exists $_all->{ $_Q->{_id} });
1474              
1475 4 50 33     66 _croak('Queue: (peekh index) is not an integer')
1476             if (!looks_like_number($_i) || int($_i) != $_i);
1477              
1478 4         28 my $_ret = $_req3->(OUTPUT_P_QUH, $_Q->{_id}.$LF . $_i.$LF, 1);
1479              
1480 4 50       49 length($_ret) ? int($_ret) : undef;
1481             }
1482              
1483             sub _mce_w_heap {
1484 2     2   33 my ($_Q) = @_;
1485              
1486 2 50       8 return $_Q->_mce_m_heap() if (exists $_all->{ $_Q->{_id} });
1487              
1488 2         19 $_req3->(OUTPUT_H_QUE, $_Q->{_id}.$LF, 0);
1489             }
1490              
1491             }
1492              
1493             1;
1494              
1495             __END__
1496              
1497             ###############################################################################
1498             ## ----------------------------------------------------------------------------
1499             ## Module usage.
1500             ##
1501             ###############################################################################
1502              
1503             =head1 NAME
1504              
1505             MCE::Queue - Hybrid (normal and priority) queues
1506              
1507             =head1 VERSION
1508              
1509             This document describes MCE::Queue version 1.887
1510              
1511             =head1 SYNOPSIS
1512              
1513             use MCE;
1514             use MCE::Queue;
1515              
1516             my $q = MCE::Queue->new;
1517              
1518             $q->enqueue( qw/ wherefore art thou romeo / );
1519              
1520             my $item = $q->dequeue;
1521              
1522             if ( $q->pending ) {
1523             ;
1524             }
1525              
1526             =head1 DESCRIPTION
1527              
1528             This module provides a queue interface supporting normal and priority
1529             queues and utilizing the IPC engine behind MCE. Data resides under the
1530             manager process. Three options are available for overriding the default
1531             value for new queues. The porder option applies to priority queues only.
1532              
1533             use MCE::Queue porder => $MCE::Queue::HIGHEST,
1534             type => $MCE::Queue::FIFO;
1535              
1536             use MCE::Queue; # Same as above
1537              
1538             ## Possible values
1539              
1540             porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first
1541             $MCE::Queue::LOWEST # Lowest priority items dequeue first
1542              
1543             type => $MCE::Queue::FIFO # First in, first out
1544             $MCE::Queue::LIFO # Last in, first out
1545             $MCE::Queue::LILO # (Synonym for FIFO)
1546             $MCE::Queue::FILO # (Synonym for LIFO)
1547              
1548             =head1 DEMONSTRATION
1549              
1550             MCE::Queue provides two run modes.
1551              
1552             (A) The C<MCE::Queue> object is constructed before running MCE. The data
1553             resides under the manager process. Workers send and request data via IPC.
1554              
1555             (B) Workers might want to construct a queue for local access. In this mode,
1556             the data resides under the worker process and not available to other workers
1557             including the manager process.
1558              
1559             use MCE;
1560             use MCE::Queue;
1561              
1562             my $F = MCE::Queue->new( fast => 1 );
1563             my $consumers = 8;
1564              
1565             my $mce = MCE->new(
1566              
1567             task_end => sub {
1568             my ($mce, $task_id, $task_name) = @_;
1569             $F->end() if $task_name eq 'dir';
1570             },
1571              
1572             user_tasks => [{
1573             max_workers => 1, task_name => 'dir',
1574              
1575             user_func => sub {
1576             ## Create a "standalone queue" only accessible to this worker.
1577             my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);
1578              
1579             while (defined (my $dir = $D->dequeue_nb)) {
1580             my (@files, @dirs); foreach (glob("$dir/*")) {
1581             if (-d $_) { push @dirs, $_; next; }
1582             push @files, $_;
1583             }
1584             $D->enqueue(@dirs ) if scalar @dirs;
1585             $F->enqueue(@files) if scalar @files;
1586             }
1587             }
1588             },{
1589             max_workers => $consumers, task_name => 'file',
1590              
1591             user_func => sub {
1592             while (defined (my $file = $F->dequeue)) {
1593             MCE->say($file);
1594             }
1595             }
1596             }]
1597              
1598             )->run({ user_args => [ $ARGV[0] || '.' ] });
1599              
1600             __END__
1601              
1602             Results taken from files_mce.pl and files_thr.pl on the web.
1603             https://github.com/marioroy/mce-examples/tree/master/other
1604              
1605             Usage:
1606             time ./files_mce.pl /usr 0 | wc -l
1607             time ./files_mce.pl /usr 1 | wc -l
1608             time ./files_thr.pl /usr | wc -l
1609              
1610             Darwin (OS) /usr: 216,271 files
1611             MCE::Queue, fast => 0 : 4.17s
1612             MCE::Queue, fast => 1 : 2.62s
1613             Thread::Queue : 4.14s
1614              
1615             Linux (VM) /usr: 186,154 files
1616             MCE::Queue, fast => 0 : 12.57s
1617             MCE::Queue, fast => 1 : 3.36s
1618             Thread::Queue : 5.91s
1619              
1620             Solaris (VM) /usr: 603,051 files
1621             MCE::Queue, fast => 0 : 39.04s
1622             MCE::Queue, fast => 1 : 18.08s
1623             Thread::Queue * Perl not built to support threads
1624              
1625             =head1 API DOCUMENTATION
1626              
1627             =head2 MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
1628              
1629             This creates a new queue. Available options are queue, porder, type, await,
1630             and gather. Note: The barrier and fast options are silentently ignored (no-op)
1631             if specified; starting with 1.867.
1632              
1633             use MCE;
1634             use MCE::Queue;
1635              
1636             my $q1 = MCE::Queue->new();
1637             my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );
1638              
1639             my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
1640             my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
1641              
1642             my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
1643             my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );
1644              
1645             my $q7 = MCE::Queue->new( await => 1, barrier => 0 );
1646             my $q8 = MCE::Queue->new( fast => 1 );
1647              
1648             The C<await> option, when enabled, allows workers to block (semaphore-like)
1649             until the number of items pending is equal to or less than a threshold value.
1650             The $q->await method is described below.
1651              
1652             Obsolete: On Unix platforms, C<barrier> mode (enabled by default) prevents
1653             many workers from dequeuing simultaneously to lessen overhead for the OS kernel.
1654             Specify 0 to disable barrier mode and not allocate sockets. The barrier option
1655             has no effect if constructing the queue inside a thread or enabling C<fast>.
1656              
1657             Obsolete: The C<fast> option speeds up dequeues and is not enabled by default.
1658             It is beneficial for queues not calling (->dequeue_nb) and not altering the
1659             count value while running; e.g. ->dequeue($count).
1660              
1661             The C<gather> option is mainly for running with MCE and wanting to pass item(s)
1662             to a callback function for appending to the queue. Multiple queues may point to
1663             the same callback function. The callback receives the queue object as the first
1664             argument and items after it.
1665              
1666             sub _append {
1667             my ($q, @items) = @_;
1668             $q->enqueue(@items);
1669             }
1670              
1671             my $q7 = MCE::Queue->new( gather => \&_append );
1672             my $q8 = MCE::Queue->new( gather => \&_append );
1673              
1674             ## Items are diverted to the callback function, not the queue.
1675             $q7->enqueue( 'apple', 'orange' );
1676              
1677             Specifying the C<gather> option allows one to store items temporarily while
1678             ensuring output order. Although a queue object is not required, this is
1679             simply a demonstration of the gather option in the context of a queue.
1680              
1681             use MCE;
1682             use MCE::Queue;
1683              
1684             sub preserve_order {
1685             my %tmp; my $order_id = 1;
1686              
1687             return sub {
1688             my ($q, $chunk_id, $data) = @_;
1689             $tmp{$chunk_id} = $data;
1690              
1691             while (1) {
1692             last unless exists $tmp{$order_id};
1693             $q->enqueue( delete $tmp{$order_id++} );
1694             }
1695              
1696             return;
1697             };
1698             }
1699              
1700             my @squares; my $q = MCE::Queue->new(
1701             queue => \@squares, gather => preserve_order
1702             );
1703              
1704             my $mce = MCE->new(
1705             chunk_size => 1, input_data => [ 1 .. 100 ],
1706             user_func => sub {
1707             $q->enqueue( MCE->chunk_id, $_ * $_ );
1708             }
1709             );
1710              
1711             $mce->run;
1712              
1713             print "@squares\n";
1714              
1715             =head2 $q->await ( $pending_threshold )
1716              
1717             The await method is beneficial when wanting to throttle worker(s) appending
1718             to the queue. Perhaps, consumers are running a bit behind and wanting to keep
1719             tabs on memory consumption. Below, the number of items pending will never go
1720             above 20.
1721              
1722             use Time::HiRes qw( sleep );
1723              
1724             use MCE::Flow;
1725             use MCE::Queue;
1726              
1727             my $q = MCE::Queue->new( await => 1, fast => 1 );
1728             my ( $producers, $consumers ) = ( 1, 8 );
1729              
1730             mce_flow {
1731             task_name => [ 'producer', 'consumer' ],
1732             max_workers => [ $producers, $consumers ],
1733             },
1734             sub {
1735             ## producer
1736             for my $item ( 1 .. 100 ) {
1737             $q->enqueue($item);
1738              
1739             ## blocks until the # of items pending reaches <= 10
1740             if ($item % 10 == 0) {
1741             MCE->say( 'pending: '.$q->pending() );
1742             $q->await(10);
1743             }
1744             }
1745              
1746             ## notify consumers no more work
1747             $q->end();
1748              
1749             },
1750             sub {
1751             ## consumers
1752             while (defined (my $next = $q->dequeue())) {
1753             MCE->say( MCE->task_wid().': '.$next );
1754             sleep 0.100;
1755             }
1756             };
1757              
1758             =head2 $q->clear ( void )
1759              
1760             Clears the queue of any items. This has the effect of nulling the queue and
1761             the socket used for blocking.
1762              
1763             my @a; my $q = MCE::Queue->new( queue => \@a );
1764              
1765             @a = (); ## bad, the blocking socket may become out of sync
1766             $q->clear; ## ok
1767              
1768             =head2 $q->end ( void )
1769              
1770             Stops the queue from receiving more items. Any worker blocking on C<dequeue>
1771             will be unblocked automatically. Subsequent calls to C<dequeue> will behave
1772             like C<dequeue_nb>. Current API available since MCE 1.818.
1773              
1774             $q->end();
1775              
1776             MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one might
1777             want to enqueue C<undef>'s versus calling C<end>. The number of C<undef>'s
1778             depends on how many items workers dequeue at a time.
1779              
1780             $q->enqueue((undef) x ($N_workers * 1)); # $q->dequeue() 1 item
1781             $q->enqueue((undef) x ($N_workers * 2)); # $q->dequeue(2) 2 items
1782             $q->enqueue((undef) x ($N_workers * N)); # $q->dequeue(N) N items
1783              
1784             =head2 $q->enqueue ( $item [, $item, ... ] )
1785              
1786             Appends a list of items onto the end of the normal queue.
1787              
1788             $q->enqueue( 'foo' );
1789             $q->enqueue( 'bar', 'baz' );
1790              
1791             =head2 $q->enqueuep ( $p, $item [, $item, ... ] )
1792              
1793             Appends a list of items onto the end of the priority queue with priority.
1794              
1795             $q->enqueue( $priority, 'foo' );
1796             $q->enqueue( $priority, 'bar', 'baz' );
1797              
1798             =head2 $q->dequeue ( [ $count ] )
1799              
1800             Returns the requested number of items (default 1) from the queue. Priority
1801             data will always dequeue first before any data from the normal queue.
1802              
1803             $q->dequeue;
1804             $q->dequeue( 2 );
1805              
1806             The method will block if the queue contains zero items. If the queue contains
1807             fewer than the requested number of items, the method will not block, but
1808             return whatever items there are on the queue.
1809              
1810             The $count, used for requesting the number of items, is beneficial when workers
1811             are passing parameters through the queue. For this reason, always remember to
1812             dequeue using the same multiple for the count. This is unlike Thread::Queue
1813             which will block until the requested number of items are available.
1814              
1815             # MCE::Queue 1.820 and prior releases
1816             while ( my @items = $q->dequeue(2) ) {
1817             last unless ( defined $items[0] );
1818             ...
1819             }
1820              
1821             # MCE::Queue 1.821 and later
1822             while ( my @items = $q->dequeue(2) ) {
1823             ...
1824             }
1825              
1826             =head2 $q->dequeue_nb ( [ $count ] )
1827              
1828             Returns the requested number of items (default 1) from the queue. Like with
1829             dequeue, priority data will always dequeue first. This method is non-blocking
1830             and returns C<undef> in the absence of data.
1831              
1832             $q->dequeue_nb;
1833             $q->dequeue_nb( 2 );
1834              
1835             =head2 $q->dequeue_timed ( timeout [, $count ] )
1836              
1837             Returns the requested number of items (default 1) from the queue. Like with
1838             dequeue, priority data will always dequeue first. This method is blocking
1839             until the timeout is reached and returns C<undef> in the absence of data.
1840             Current API available since MCE 1.886.
1841              
1842             $q->dequeue_timed( 300 ); # timeout after 5 minutes
1843             $q->dequeue_timed( 300, 2 );
1844              
1845             The timeout may be specified as fractional seconds. If timeout is missing,
1846             undef, less than or equal to 0, or called by the manager process, then this
1847             call behaves like dequeue_nb.
1848              
1849             =head2 $q->insert ( $index, $item [, $item, ... ] )
1850              
1851             Adds the list of items to the queue at the specified index position (0 is the
1852             head of the list). The head of the queue is that item which would be removed
1853             by a call to dequeue.
1854              
1855             $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
1856             $q->enqueue(1, 2, 3, 4);
1857             $q->insert(1, 'foo', 'bar');
1858             # Queue now contains: 1, foo, bar, 2, 3, 4
1859              
1860             $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
1861             $q->enqueue(1, 2, 3, 4);
1862             $q->insert(1, 'foo', 'bar');
1863             # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
1864              
1865             =head2 $q->insertp ( $p, $index, $item [, $item, ... ] )
1866              
1867             Adds the list of items to the queue at the specified index position with
1868             priority. The behavior is similarly to C<< $q->insert >> otherwise.
1869              
1870             =head2 $q->pending ( void )
1871              
1872             Returns the number of items in the queue. The count includes both normal
1873             and priority data. Returns C<undef> if the queue has been ended, and there
1874             are no more items in the queue.
1875              
1876             $q = MCE::Queue->new();
1877             $q->enqueuep(5, 'foo', 'bar');
1878             $q->enqueue('sunny', 'day');
1879              
1880             print $q->pending(), "\n";
1881             # Output: 4
1882              
1883             =head2 $q->peek ( [ $index ] )
1884              
1885             Returns an item from the normal queue, at the specified index, without
1886             dequeuing anything. It defaults to the head of the queue if index is not
1887             specified. The head of the queue is that item which would be removed by a
1888             call to dequeue. Negative index values are supported, similarly to arrays.
1889              
1890             $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
1891             $q->enqueue(1, 2, 3, 4, 5);
1892              
1893             print $q->peek(1), ' ', $q->peek(-2), "\n";
1894             # Output: 2 4
1895              
1896             $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
1897             $q->enqueue(1, 2, 3, 4, 5);
1898              
1899             print $q->peek(1), ' ', $q->peek(-2), "\n";
1900             # Output: 4 2
1901              
1902             =head2 $q->peekp ( $p [, $index ] )
1903              
1904             Returns an item from the queue with priority, at the specified index, without
1905             dequeuing anything. It defaults to the head of the queue if index is not
1906             specified. The behavior is similarly to C<< $q->peek >> otherwise.
1907              
1908             =head2 $q->peekh ( [ $index ] )
1909              
1910             Returns an item from the head of the heap or at the specified index.
1911              
1912             $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
1913             $q->enqueuep(5, 'foo');
1914             $q->enqueuep(6, 'bar');
1915             $q->enqueuep(4, 'sun');
1916              
1917             print $q->peekh(0), "\n";
1918             # Output: 6
1919              
1920             $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
1921             $q->enqueuep(5, 'foo');
1922             $q->enqueuep(6, 'bar');
1923             $q->enqueuep(4, 'sun');
1924              
1925             print $q->peekh(0), "\n";
1926             # Output: 4
1927              
1928             =head2 $q->heap ( void )
1929              
1930             Returns an array containing the heap data. Heap data consists of priority
1931             numbers, not the data.
1932              
1933             @h = $q->heap; # $MCE::Queue::HIGHEST
1934             # Heap contains: 6, 5, 4
1935              
1936             @h = $q->heap; # $MCE::Queue::LOWEST
1937             # Heap contains: 4, 5, 6
1938              
1939             =head1 ACKNOWLEDGMENTS
1940              
1941             =over 3
1942              
1943             =item * L<List::BinarySearch>
1944              
1945             The bsearch_num_pos method was helpful for accommodating the highest and lowest
1946             order in MCE::Queue.
1947              
1948             =item * L<POE::Queue::Array>
1949              
1950             For extra optimization, two if statements were adopted for checking if the item
1951             belongs at the end or head of the queue.
1952              
1953             =item * L<List::Priority>
1954              
1955             MCE::Queue supports both normal and priority queues.
1956              
1957             =item * L<Thread::Queue>
1958              
1959             Thread::Queue is used as a template for identifying and documenting the methods.
1960              
1961             MCE::Queue is not fully compatible due to supporting normal and priority queues
1962             simultaneously; e.g.
1963              
1964             $q->enqueue( $item [, $item, ... ] ); # normal queue
1965             $q->enqueuep( $p, $item [, $item, ... ] ); # priority queue
1966              
1967             $q->dequeue( [ $count ] ); # priority data dequeues first
1968             $q->dequeue_nb( [ $count ] );
1969              
1970             $q->pending(); # counts both normal/priority queues
1971              
1972             =item * L<Parallel::DataPipe>
1973              
1974             The recursion example, in the synopsis above, was largely adopted from this
1975             module.
1976              
1977             =back
1978              
1979             =head1 INDEX
1980              
1981             L<MCE|MCE>, L<MCE::Core>
1982              
1983             =head1 AUTHOR
1984              
1985             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1986              
1987             =cut
1988