File Coverage

src/future.c
Criterion Covered Total %
statement 13 863 1.5
branch 5 708 0.7
condition n/a
subroutine n/a
pod n/a
total 18 1571 1.1


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2              
3             #include "EXTERN.h"
4             #include "perl.h"
5             #include "XSUB.h"
6              
7             #include "future.h"
8              
9             #include "perl-backcompat.c.inc"
10              
11             #include "av-utils.c.inc"
12             #include "cv_set_anysv_refcounted.c.inc"
13              
14             #if !HAVE_PERL_VERSION(5, 16, 0)
15             # define false FALSE
16             # define true TRUE
17             #endif
18              
19             #ifdef HAVE_DMD_HELPER
20             # define WANT_DMD_API_044
21             # include "DMD_helper.h"
22             #endif
23              
24             #if !HAVE_PERL_VERSION(5, 16, 0)
25             # define XS_INTERNAL(name) static XS(name)
26             #endif
27              
28             #define mPUSHpvs(s) mPUSHp("" s "", sizeof(s)-1)
29              
30             static bool future_debug;
31             static bool capture_times;
32              
33             /* There's no reason these have to match those in Future.pm but for now we
34             * might as well just copy the same values
35             */
36             enum {
37             CB_DONE = (1<<0),
38             CB_FAIL = (1<<1),
39             CB_CANCEL = (1<<2),
40             CB_ALWAYS = CB_DONE|CB_FAIL|CB_CANCEL,
41              
42             CB_SELF = (1<<3),
43             CB_RESULT = (1<<4),
44              
45             CB_SEQ_READY = (1<<5),
46             CB_SEQ_CANCEL = (1<<6),
47             CB_SEQ_ANY = CB_SEQ_READY|CB_SEQ_CANCEL,
48              
49             CB_SEQ_IMDONE = (1<<7),
50             CB_SEQ_IMFAIL = (1<<8),
51              
52             CB_SEQ_STRICT = (1<<9),
53              
54             CB_IS_FUTURE = (1<<10),
55             };
56              
57             // TODO: Consider using different struct types to save memory? Or maybe it's
58             // so small a difference it doesn't matter
59             struct FutureXSCallback
60             {
61             unsigned int flags;
62             union {
63             SV *code; /* if !(flags & CB_SEQ_ANY) */
64             struct { /* if (flags & CB_SEQ_ANY) */
65             SV *thencode;
66             SV *elsecode;
67             HV *catches;
68             SV *f;
69             } seq;
70             };
71             };
72              
73             struct FutureXSRevocation
74             {
75             SV *precedent_f;
76             SV *toclear_sv_at;
77             };
78              
79             #define CB_NONSEQ_CODE(cb) \
80             ({ if((cb)->flags & CB_SEQ_ANY) croak("ARGH: CB_NONSEQ_CODE on SEQ"); (cb)->code;})
81              
82             struct FutureXS
83             {
84             unsigned int ready : 1;
85             unsigned int cancelled : 1;
86             unsigned int reported : 1;
87             SV *label;
88             AV *result; // implies done
89             AV *failure; // implies fail
90             AV *callbacks; // values are struct FutureXSCallback ptrs directly. TODO: custom ptr/fill/max
91             AV *on_cancel; // values are CVs directly
92             AV *revoke_when_ready; // values are struct FutureXSRevocation ptrs directly.
93             int empty_revocation_slots;
94              
95             HV *udata;
96              
97             struct timeval btime, rtime;
98             SV *constructed_at;
99              
100             /* For convergents
101             * TODO: consider making this an optional extra part of the body, only
102             * allocated when required
103             */
104             AV *subs;
105             Size_t pending_subs;
106              
107             /* For without_cancel, purely to keep a strongref */
108             SV *precedent_f;
109             };
110              
111 0           bool Future_sv_is_future(pTHX_ SV *sv)
112             {
113 0 0         if(!SvROK(sv) || !SvOBJECT(SvRV(sv)))
    0          
114 0           return false;
115              
116 0 0         if(sv_derived_from(sv, "Future") || sv_derived_from(sv, "Future::XS"))
    0          
117 0           return true;
118              
119 0           return false;
120             }
121              
122             #define get_future(sv) S_get_future(aTHX_ sv)
123 0           static struct FutureXS *S_get_future(pTHX_ SV *sv)
124             {
125             assert(sv);
126             assert(SvROK(sv) && SvOBJECT(SvRV(sv)));
127             // TODO: Add some safety checking about class
128 0 0         struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV(SvRV(sv)));
129             assert(self);
130 0           return self;
131             }
132              
133 0           SV *Future_new(pTHX_ const char *cls)
134             {
135 0 0         if(!cls)
136 0           cls = "Future::XS";
137              
138             struct FutureXS *self;
139 0           Newx(self, 1, struct FutureXS);
140              
141 0           self->ready = false;
142 0           self->cancelled = false;
143 0           self->reported = false;
144              
145 0           self->label = NULL;
146              
147 0 0         if(capture_times)
148 0           gettimeofday(&self->btime, NULL);
149             else
150 0           self->btime = (struct timeval){ 0 };
151              
152 0           self->rtime = (struct timeval){ 0 };
153              
154 0 0         if(future_debug)
155 0 0         self->constructed_at = newSVpvf("constructed at %s line %d", CopFILE(PL_curcop), CopLINE(PL_curcop));
156             else
157 0           self->constructed_at = NULL;
158              
159 0           self->result = NULL;
160 0           self->failure = NULL;
161              
162 0           self->callbacks = NULL;
163 0           self->on_cancel = NULL;
164 0           self->revoke_when_ready = NULL;
165 0           self->empty_revocation_slots = 0;
166              
167 0           self->udata = NULL;
168              
169 0           self->subs = NULL;
170              
171 0           self->precedent_f = NULL;
172              
173 0           SV *ret = newSV(0);
174 0           sv_setref_pv(ret, cls, self);
175              
176 0           return ret;
177             }
178              
179             #define future_new_proto(f1) Future_new_proto(aTHX_ f1)
180 0           SV *Future_new_proto(pTHX_ SV *f1)
181             {
182             assert(f1 && SvROK(f1) && SvRV(f1));
183             // TODO Shortcircuit in the common case that f1 is a Future instance
184             // return future_new(HvNAME(SvSTASH(SvRV(f1))));
185              
186 0           dSP;
187 0           ENTER;
188 0           SAVETMPS;
189              
190 0 0         EXTEND(SP, 1);
191 0 0         PUSHMARK(SP);
192 0           PUSHs(sv_mortalcopy(f1));
193 0           PUTBACK;
194              
195 0           call_method("new", G_SCALAR);
196              
197 0           SPAGAIN;
198              
199 0           SV *ret = SvREFCNT_inc(POPs);
200              
201 0           PUTBACK;
202 0 0         FREETMPS;
203 0           LEAVE;
204              
205 0           return ret;
206             }
207              
208             #define destroy_callbacks(self) S_destroy_callbacks(aTHX_ self)
209 0           static void S_destroy_callbacks(pTHX_ struct FutureXS *self)
210             {
211 0           AV *callbacksav = self->callbacks;
212 0 0         while(callbacksav && AvFILLp(callbacksav) > -1) {
    0          
213 0           struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[AvFILLp(callbacksav)--];
214              
215 0           int flags = cb->flags;
216 0 0         if(flags & CB_SEQ_ANY) {
217 0           SvREFCNT_dec(cb->seq.thencode);
218 0           SvREFCNT_dec(cb->seq.elsecode);
219 0           SvREFCNT_dec(cb->seq.catches);
220 0           SvREFCNT_dec(cb->seq.f);
221             }
222             else {
223 0 0         SvREFCNT_dec(CB_NONSEQ_CODE(cb));
224             }
225              
226 0           Safefree(cb);
227             }
228 0           }
229              
230 0           void Future_destroy(pTHX_ SV *f)
231             {
232             #ifdef DEBUGGING
233             // Every pointer in this function ought to have been uniquely held
234             # define UNREF(p) \
235             do { \
236             if(p) assert(SvREFCNT(p) == 1); \
237             SvREFCNT_dec((SV *)p); \
238             (p) = (void *)0xAA55AA55; \
239             } while(0)
240             #else
241             # define UNREF(p) SvREFCNT_dec((SV *)p)
242             #endif
243              
244 0           struct FutureXS *self = get_future(f);
245              
246 0 0         if(future_debug &&
    0          
247 0 0         (!self->ready || (self->failure && !self->reported))) {
    0          
248 0 0         if(!self->ready)
249 0 0         warn("%" SVf " was %" SVf " and was lost near %s line %d before it was ready\n",
250 0           SVfARG(f), SVfARG(self->constructed_at),
251 0           CopFILE(PL_curcop), CopLINE(PL_curcop));
252             else {
253 0           SV *failure = AvARRAY(self->failure)[0];
254 0 0         warn("%" SVf " was %" SVf " and was lost near %s line %d with an unreported failure of: %" SVf "\n",
255 0           SVfARG(f), SVfARG(self->constructed_at),
256 0           CopFILE(PL_curcop), CopLINE(PL_curcop),
257             SVfARG(failure));
258             }
259             }
260              
261 0           UNREF(self->label);
262              
263 0           UNREF(self->result);
264              
265 0           UNREF(self->failure);
266              
267 0           destroy_callbacks(self);
268 0           UNREF(self->callbacks);
269              
270 0           UNREF(self->on_cancel);
271              
272 0           AV *revocationsav = self->revoke_when_ready;
273 0 0         while(revocationsav && AvFILLp(revocationsav) > -1) {
    0          
274 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocationsav)[AvFILLp(revocationsav)--];
275 0           UNREF(rev->precedent_f);
276 0           UNREF(rev->toclear_sv_at);
277 0           Safefree(rev);
278             }
279 0           UNREF(self->revoke_when_ready);
280              
281 0           UNREF(self->udata);
282              
283 0           UNREF(self->constructed_at);
284              
285 0           UNREF(self->subs);
286              
287 0           UNREF(self->precedent_f);
288              
289 0           Safefree(self);
290              
291             #undef UNREF
292 0           }
293              
294 0           bool Future_is_ready(pTHX_ SV *f)
295             {
296 0           struct FutureXS *self = get_future(f);
297 0           return self->ready;
298             }
299              
300 0           bool Future_is_done(pTHX_ SV *f)
301             {
302 0           struct FutureXS *self = get_future(f);
303 0 0         return self->ready && !self->failure && !self->cancelled;
    0          
    0          
304             }
305              
306 0           bool Future_is_failed(pTHX_ SV *f)
307             {
308 0           struct FutureXS *self = get_future(f);
309 0 0         return self->ready && self->failure;
    0          
310             }
311              
312 0           bool Future_is_cancelled(pTHX_ SV *f)
313             {
314 0           struct FutureXS *self = get_future(f);
315 0           return self->cancelled;
316             }
317              
318             #define clear_on_cancel(self) S_clear_on_cancel(aTHX_ self)
319 0           static void S_clear_on_cancel(pTHX_ struct FutureXS *self)
320             {
321 0 0         if(!self->on_cancel)
322 0           return;
323              
324 0           AV *on_cancel = self->on_cancel;
325 0           self->on_cancel = NULL;
326              
327 0           SvREFCNT_dec(on_cancel);
328             }
329              
330             #define push_callback(self, cb) S_push_callback(aTHX_ self, cb)
331 0           static void S_push_callback(pTHX_ struct FutureXS *self, struct FutureXSCallback *cb)
332             {
333             struct FutureXSCallback *new;
334 0           Newx(new, 1, struct FutureXSCallback);
335              
336 0           new->flags = cb->flags;
337 0 0         if(cb->flags & CB_SEQ_ANY) {
338 0           new->seq.thencode = cb->seq.thencode;
339 0           new->seq.elsecode = cb->seq.elsecode;
340 0           new->seq.catches = cb->seq.catches;
341 0           new->seq.f = cb->seq.f;
342             }
343             else {
344 0 0         new->code = CB_NONSEQ_CODE(cb);
345             }
346              
347 0 0         if(!self->callbacks)
348 0           self->callbacks = newAV();
349              
350 0           av_push(self->callbacks, (SV *)new);
351 0           }
352              
353             #define wrap_cb(f, name, cv) S_wrap_cb(aTHX_ f, name, cv)
354 0           static SV *S_wrap_cb(pTHX_ SV *f, const char *name, SV *cv)
355             {
356             // TODO: This is quite the speed bump having to do this, in the common case
357             // that it isn't overridden
358 0           dSP;
359 0           ENTER;
360 0           SAVETMPS;
361              
362 0 0         EXTEND(SP, 3);
363 0 0         PUSHMARK(SP);
364 0           PUSHs(sv_mortalcopy(f));
365 0           mPUSHp(name, strlen(name));
366 0           PUSHs(sv_mortalcopy(cv));
367 0           PUTBACK;
368              
369 0           call_method("wrap_cb", G_SCALAR);
370              
371 0           SPAGAIN;
372 0           SV *ret = newSVsv(POPs);
373              
374 0           PUTBACK;
375 0 0         FREETMPS;
376 0           LEAVE;
377              
378 0           return ret;
379             }
380              
381             #define invoke_seq_callback(self, selfsv, cb) S_invoke_seq_callback(aTHX_ self, selfsv, cb)
382 0           static SV *S_invoke_seq_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
383             {
384 0           int flags = cb->flags;
385              
386 0           bool is_fail = cBOOL(self->failure);
387 0 0         bool is_done = !self->cancelled && !is_fail;
    0          
388              
389 0 0         AV *result = (is_done) ? self->result :
390 0 0         (is_fail) ? self->failure :
391             NULL;
392              
393 0 0         SV *code = (is_done) ? cb->seq.thencode :
394 0 0         (is_fail) ? cb->seq.elsecode :
395             NULL;
396              
397 0 0         if(is_fail && result && av_count(result) > 1 && cb->seq.catches) {
    0          
    0          
    0          
    0          
398 0           SV *category = AvARRAY(result)[1];
399 0 0         if(SvOK(category)) {
    0          
    0          
400 0           HE *he = hv_fetch_ent(cb->seq.catches, category, 0, 0);
401 0 0         if(he && HeVAL(he))
    0          
402 0           code = HeVAL(he);
403             }
404             }
405              
406 0 0         if(!code || !SvOK(code))
    0          
    0          
    0          
407 0           return newSVsv(selfsv);
408              
409 0           dSP;
410              
411 0           ENTER;
412 0           SAVETMPS;
413              
414 0 0         PUSHMARK(SP);
415 0 0         if(flags & CB_SELF)
416 0 0         XPUSHs(selfsv);
417 0 0         if(flags & CB_RESULT)
418 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
419 0           PUTBACK;
420              
421             assert(SvOK(code));
422 0           call_sv(code, G_SCALAR|G_EVAL);
423              
424 0           SPAGAIN;
425              
426 0 0         if(SvROK(ERRSV) || SvTRUE(ERRSV)) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
427 0           POPs;
428              
429 0           SV *fseq = cb->seq.f;
430              
431 0 0         if(!fseq)
432 0           fseq = future_new_proto(selfsv);
433              
434 0 0         future_failv(fseq, &ERRSV, 1);
435              
436 0 0         FREETMPS;
437 0           LEAVE;
438              
439 0           return fseq;
440             }
441              
442 0           SV *f2 = POPs;
443 0           SvREFCNT_inc(f2);
444              
445 0           PUTBACK;
446 0 0         FREETMPS;
447 0           LEAVE;
448              
449 0 0         if(!sv_is_future(f2)) {
450 0           SV *result = f2;
451              
452             // TODO: strictness check
453              
454 0           f2 = future_new_proto(selfsv);
455 0           future_donev(f2, &result, 1);
456             }
457              
458 0           return f2;
459             }
460              
461             #define invoke_callback(self, selfsv, cb) S_invoke_callback(aTHX_ self, selfsv, cb)
462 0           static void S_invoke_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
463             {
464 0           int flags = cb->flags;
465              
466 0           bool is_cancelled = self->cancelled;
467 0           bool is_fail = cBOOL(self->failure);
468 0 0         bool is_done = !is_cancelled && !is_fail;
    0          
469              
470 0 0         AV *result = (is_done) ? self->result :
471 0 0         (is_fail) ? self->failure :
472             NULL;
473              
474 0 0         if(is_done && !(flags & CB_DONE))
    0          
475 0           return;
476 0 0         if(is_fail && !(flags & CB_FAIL))
    0          
477 0           return;
478 0 0         if(is_cancelled && !(flags & CB_CANCEL))
    0          
479 0           return;
480              
481 0 0         if(flags & CB_IS_FUTURE) {
482 0           dSP;
483              
484 0           ENTER;
485 0           SAVETMPS;
486              
487 0 0         PUSHMARK(SP);
488 0 0         XPUSHs(CB_NONSEQ_CODE(cb)); // really a Future RV
    0          
489 0 0         if(result)
490 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
491              
492 0           PUTBACK;
493 0 0         if(is_done)
494 0           call_method("done", G_VOID);
495 0 0         else if(is_fail)
496 0           call_method("fail", G_VOID);
497             else
498 0           call_method("cancel", G_VOID);
499              
500 0 0         FREETMPS;
501 0           LEAVE;
502             }
503 0 0         else if(flags & CB_SEQ_ANY) {
504 0           SV *fseq = cb->seq.f;
505              
506 0 0         if(!SvOK(fseq)) {
    0          
    0          
507 0 0         if(self->constructed_at)
508 0           warn("%" SVf " (%" SVf ") lost a sequence Future",
509 0           SVfARG(selfsv), SVfARG(self->constructed_at));
510             else
511 0           warn("%" SVf " lost a sequence Future",
512             SVfARG(selfsv));
513 0           return;
514             }
515              
516 0           SV *f2 = invoke_seq_callback(self, selfsv, cb);
517 0 0         if(f2 == fseq)
518             /* immediate fail */
519 0           return;
520              
521 0           future_on_cancel(fseq, f2);
522              
523 0 0         if(future_is_ready(f2)) {
524 0 0         if(!future_is_cancelled(f2))
525 0           future_on_ready(f2, fseq);
526 0 0         else if(flags & CB_CANCEL)
527 0           future_cancel(fseq);
528              
529 0           SvREFCNT_dec(f2);
530             }
531             else {
532 0           struct FutureXS *f2self = get_future(f2);
533 0           struct FutureXSCallback cb2 = {
534             .flags = CB_DONE|CB_FAIL|CB_IS_FUTURE,
535 0           .code = sv_rvweaken(newSVsv(fseq)),
536             };
537 0           push_callback(f2self, &cb2);
538             }
539             }
540             else {
541 0 0         SV *code = CB_NONSEQ_CODE(cb);
542              
543 0           dSP;
544              
545 0           ENTER;
546 0           SAVETMPS;
547              
548 0 0         PUSHMARK(SP);
549 0 0         if(flags & CB_SELF)
550 0 0         XPUSHs(selfsv);
551 0 0         if((flags & CB_RESULT) && result)
    0          
552 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
553              
554 0           PUTBACK;
555             assert(SvOK(code));
556 0           call_sv(code, G_VOID);
557              
558 0 0         FREETMPS;
559 0           LEAVE;
560             }
561             }
562              
563             #define revoke_on_cancel(rev) S_revoke_on_cancel(aTHX_ rev)
564 0           static void S_revoke_on_cancel(pTHX_ struct FutureXSRevocation *rev)
565             {
566 0 0         if(rev->toclear_sv_at && SvROK(rev->toclear_sv_at)) {
    0          
567             assert(SvTYPE(rev->toclear_sv_at) <= SVt_PVMG);
568             assert(SvROK(rev->toclear_sv_at));
569 0           sv_set_undef(SvRV(rev->toclear_sv_at));
570 0           SvREFCNT_dec(rev->toclear_sv_at);
571 0           rev->toclear_sv_at = NULL;
572             }
573              
574 0 0         if(!SvOK(rev->precedent_f))
    0          
    0          
575 0           return;
576              
577 0           struct FutureXS *self = get_future(rev->precedent_f);
578              
579 0           self->empty_revocation_slots++;
580              
581 0           AV *on_cancel = self->on_cancel;
582 0 0         if(self->empty_revocation_slots >= 8 && on_cancel &&
    0          
    0          
583 0 0         self->empty_revocation_slots >= AvFILL(on_cancel)/2) {
584              
585             // Squash up the array to contain only defined values
586 0           SV **wrsv = AvARRAY(on_cancel),
587 0           **rdsv = AvARRAY(on_cancel),
588 0 0         **end = AvARRAY(on_cancel) + AvFILL(on_cancel);
589              
590 0 0         while(rdsv <= end) {
591 0 0         if(SvOK(*rdsv))
    0          
    0          
592             // Keep this one
593 0           *(wrsv++) = *rdsv;
594             else
595             // Free this one
596 0           SvREFCNT_dec(*rdsv);
597              
598 0           rdsv++;
599             }
600 0           AvFILLp(on_cancel) = wrsv - AvARRAY(on_cancel) - 1;
601              
602 0           self->empty_revocation_slots = 0;
603             }
604             }
605              
606             #define mark_ready(self, selfsv, state) S_mark_ready(aTHX_ self, selfsv, state)
607 0           static void S_mark_ready(pTHX_ struct FutureXS *self, SV *selfsv, const char *state)
608             {
609 0           self->ready = true;
610             // TODO: self->ready_at
611 0 0         if(capture_times)
612 0           gettimeofday(&self->rtime, NULL);
613              
614             /* Make sure self doesn't disappear during this function */
615 0           SvREFCNT_inc(SvRV(selfsv));
616 0           SAVEFREESV(SvRV(selfsv));
617              
618 0 0         if(self->precedent_f) {
619 0           SvREFCNT_dec(self->precedent_f);
620 0           self->precedent_f = NULL;
621             }
622              
623 0           clear_on_cancel(self);
624 0 0         if(self->revoke_when_ready) {
625 0           AV *revocations = self->revoke_when_ready;
626 0 0         for(size_t i = 0; i < av_count(revocations); i++) {
    0          
627 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocations)[i];
628 0           revoke_on_cancel(rev);
629              
630 0           SvREFCNT_dec(rev->precedent_f);
631 0           Safefree(rev);
632             }
633 0           AvFILLp(revocations) = -1;
634 0           SvREFCNT_dec(revocations);
635              
636 0           self->revoke_when_ready = NULL;
637             }
638              
639 0 0         if(!self->callbacks)
640 0           return;
641              
642 0           AV *callbacks = self->callbacks;
643              
644 0           struct FutureXSCallback **cbs = (struct FutureXSCallback **)AvARRAY(callbacks);
645 0 0         size_t i, n = av_count(callbacks);
646 0 0         for(i = 0; i < n; i++) {
647 0           struct FutureXSCallback *cb = cbs[i];
648 0           invoke_callback(self, selfsv, cb);
649             }
650              
651 0           destroy_callbacks(self);
652             }
653              
654             #define make_sequence(f1, cb) S_make_sequence(aTHX_ f1, cb)
655 0           static SV *S_make_sequence(pTHX_ SV *f1, struct FutureXSCallback *cb)
656             {
657 0           struct FutureXS *self = get_future(f1);
658              
659 0           int flags = cb->flags;
660              
661 0 0         if(self->ready) {
662             // TODO: CB_SEQ_IM*
663              
664 0           SV *f2 = invoke_seq_callback(self, f1, cb);
665 0           return f2;
666             }
667              
668 0           SV *fseq = future_new_proto(f1);
669 0 0         if(cb->flags & CB_SEQ_CANCEL)
670 0           future_on_cancel(fseq, f1);
671              
672 0           cb->flags |= CB_DONE|CB_FAIL;
673 0 0         if(cb->seq.thencode)
674 0           cb->seq.thencode = wrap_cb(f1, "sequence", cb->seq.thencode);
675 0 0         if(cb->seq.elsecode)
676 0           cb->seq.elsecode = wrap_cb(f1, "sequence", cb->seq.elsecode);
677 0           cb->seq.f = sv_rvweaken(newSVsv(fseq));
678              
679 0           push_callback(self, cb);
680              
681 0           return fseq;
682             }
683              
684             // TODO: move to a hax/ file
685             #define CvNAME_FILE_LINE(cv) S_CvNAME_FILE_LINE(aTHX_ cv)
686 0           static SV *S_CvNAME_FILE_LINE(pTHX_ CV *cv)
687             {
688 0 0         if(!CvANON(cv)) {
689 0           SV *ret = newSVpvf("HvNAME::GvNAME");
690 0           return ret;
691             }
692              
693 0           OP *cop = CvSTART(cv);
694 0 0         while(cop && OP_CLASS(cop) != OA_COP)
    0          
    0          
    0          
695 0           cop = cop->op_next;
696              
697 0 0         if(!cop)
698 0           return newSVpvs("__ANON__");
699              
700 0 0         return newSVpvf("__ANON__(%s line %d)", CopFILE((COP *)cop), CopLINE((COP *)cop));
701             }
702              
703 0           void Future_donev(pTHX_ SV *f, SV **svp, size_t n)
704             {
705 0           struct FutureXS *self = get_future(f);
706              
707 0 0         if(self->cancelled)
708 0           return;
709              
710 0 0         if(self->ready)
711 0           croak("%" SVf " is already (STATE) and cannot be ->done",
712             SVfARG(f));
713             // TODO: test subs
714              
715 0           self->result = newAV_svn_dup(svp, n);
716 0           mark_ready(self, f, "done");
717             }
718              
719 0           void Future_failv(pTHX_ SV *f, SV **svp, size_t n)
720             {
721 0           struct FutureXS *self = get_future(f);
722              
723 0 0         if(self->cancelled)
724 0           return;
725              
726 0 0         if(self->ready)
727 0           croak("%" SVf " is already (STATE) and cannot be ->fail'ed",
728             SVfARG(f));
729              
730 0 0         if(n == 1 &&
    0          
731 0           SvROK(svp[0]) && SvOBJECT(SvRV(svp[0])) &&
732 0           sv_derived_from(svp[0], "Future::Exception")) {
733 0           SV *exception = svp[0];
734 0           AV *failure = self->failure = newAV();
735              
736 0           dSP;
737              
738             {
739 0           ENTER;
740 0           SAVETMPS;
741              
742 0 0         EXTEND(SP, 1);
743 0 0         PUSHMARK(SP);
744 0           PUSHs(sv_mortalcopy(exception));
745 0           PUTBACK;
746              
747 0           call_method("message", G_SCALAR);
748              
749 0           SPAGAIN;
750              
751 0           av_push(failure, SvREFCNT_inc(POPs));
752              
753 0           PUTBACK;
754 0 0         FREETMPS;
755 0           LEAVE;
756             }
757              
758             {
759 0           ENTER;
760 0           SAVETMPS;
761              
762 0 0         EXTEND(SP, 1);
763 0 0         PUSHMARK(SP);
764 0           PUSHs(sv_mortalcopy(exception));
765 0           PUTBACK;
766              
767 0           call_method("category", G_SCALAR);
768              
769 0           SPAGAIN;
770              
771 0           av_push(failure, SvREFCNT_inc(POPs));
772              
773 0           PUTBACK;
774 0 0         FREETMPS;
775 0           LEAVE;
776             }
777              
778             {
779 0           ENTER;
780 0           SAVETMPS;
781              
782 0 0         EXTEND(SP, 1);
783 0 0         PUSHMARK(SP);
784 0           PUSHs(sv_mortalcopy(exception));
785 0           PUTBACK;
786              
787 0           SSize_t count = call_method("details", G_LIST);
788              
789 0           SPAGAIN;
790              
791 0           SV **retp = SP - count + 1;
792              
793 0 0         for(SSize_t i = 0; i < count; i++)
794 0           av_push(failure, SvREFCNT_inc(retp[i]));
795 0           SP -= count;
796              
797 0           PUTBACK;
798 0 0         FREETMPS;
799 0           LEAVE;
800             }
801             }
802             else {
803 0           self->failure = newAV_svn_dup(svp, n);
804             }
805              
806 0           mark_ready(self, f, "failed");
807             }
808              
809             #define future_failp(f, s) Future_failp(aTHX_ f, s)
810 0           void Future_failp(pTHX_ SV *f, const char *s)
811             {
812 0           struct FutureXS *self = get_future(f);
813              
814 0 0         if(self->cancelled)
815 0           return;
816              
817 0 0         if(self->ready)
818 0           croak("%" SVf " is already (STATE) and cannot be ->fail'ed",
819             SVfARG(f));
820              
821 0           self->failure = newAV();
822 0           av_push(self->failure, newSVpv(s, strlen(s)));
823 0           mark_ready(self, f, "failed");
824             }
825              
826 0           void Future_on_cancel(pTHX_ SV *f, SV *code)
827             {
828 0           struct FutureXS *self = get_future(f);
829              
830 0 0         if(self->ready)
831 0           return;
832              
833 0           bool is_future = sv_is_future(code);
834             // TODO: is_future or callable(code) or croak
835              
836 0 0         if(!self->on_cancel)
837 0           self->on_cancel = newAV();
838              
839 0           SV *rv = newSVsv((SV *)code);
840 0           av_push(self->on_cancel, rv);
841              
842 0 0         if(is_future) {
843             struct FutureXSRevocation *rev;
844 0           Newx(rev, 1, struct FutureXSRevocation);
845              
846 0           rev->precedent_f = sv_rvweaken(newSVsv(f));
847 0           rev->toclear_sv_at = sv_rvweaken(newRV_inc(rv));
848              
849 0           struct FutureXS *codeself = get_future(code);
850 0 0         if(!codeself->revoke_when_ready)
851 0           codeself->revoke_when_ready = newAV();
852              
853 0           av_push(codeself->revoke_when_ready, (SV *)rev);
854             }
855             }
856              
857 0           void Future_on_ready(pTHX_ SV *f, SV *code)
858             {
859 0           struct FutureXS *self = get_future(f);
860              
861 0           bool is_future = sv_is_future(code);
862             // TODO: is_future or callable(code) or croak
863              
864 0           int flags = CB_ALWAYS|CB_SELF;
865 0 0         if(is_future)
866 0           flags |= CB_IS_FUTURE;
867              
868 0           struct FutureXSCallback cb = {
869             .flags = flags,
870             .code = code,
871             };
872              
873 0 0         if(self->ready)
874 0           invoke_callback(self, f, &cb);
875             else {
876 0           cb.code = wrap_cb(f, "on_ready", cb.code);
877 0           push_callback(self, &cb);
878             }
879 0           }
880              
881 0           void Future_on_done(pTHX_ SV *f, SV *code)
882             {
883 0           struct FutureXS *self = get_future(f);
884              
885 0           bool is_future = sv_is_future(code);
886             // TODO: is_future or callable(code) or croak
887              
888 0           int flags = CB_DONE|CB_RESULT;
889 0 0         if(is_future)
890 0           flags |= CB_IS_FUTURE;
891              
892 0           struct FutureXSCallback cb = {
893             .flags = flags,
894             .code = code,
895             };
896              
897 0 0         if(self->ready)
898 0           invoke_callback(self, f, &cb);
899             else {
900 0           cb.code = wrap_cb(f, "on_done", cb.code);
901 0           push_callback(self, &cb);
902             }
903 0           }
904              
905 0           void Future_on_fail(pTHX_ SV *f, SV *code)
906             {
907 0           struct FutureXS *self = get_future(f);
908              
909 0           bool is_future = sv_is_future(code);
910             // TODO: is_future or callable(code) or croak
911              
912 0           int flags = CB_FAIL|CB_RESULT;
913 0 0         if(is_future)
914 0           flags |= CB_IS_FUTURE;
915              
916 0           struct FutureXSCallback cb = {
917             .flags = flags,
918             .code = code,
919             };
920              
921 0 0         if(self->ready)
922 0           invoke_callback(self, f, &cb);
923             else {
924 0           cb.code = wrap_cb(f, "on_fail", cb.code);
925 0           push_callback(self, &cb);
926             }
927 0           }
928              
929             #define future_await(f) Future_await(aTHX_ f)
930 0           static void Future_await(pTHX_ SV *f)
931             {
932 0           dSP;
933              
934 0           ENTER;
935 0           SAVETMPS;
936              
937 0 0         PUSHMARK(SP);
938 0 0         mXPUSHs(newSVsv(f));
939 0           PUTBACK;
940              
941 0           call_method("await", G_VOID);
942              
943 0 0         FREETMPS;
944 0           LEAVE;
945 0           }
946              
947 0           AV *Future_get_result_av(pTHX_ SV *f, bool await)
948             {
949 0           struct FutureXS *self = get_future(f);
950              
951 0 0         if(await && !self->ready)
    0          
952 0           future_await(f);
953              
954 0 0         if(!self->ready)
955 0           croak("%" SVf " is not yet ready", SVfARG(f));
956              
957 0 0         if(self->failure) {
958 0           self->reported = true;
959              
960 0           SV *exception = AvARRAY(self->failure)[0];
961 0 0         if(av_count(self->failure) > 1) {
    0          
962 0           dSP;
963 0           ENTER;
964 0           SAVETMPS;
965              
966 0 0         PUSHMARK(SP);
967 0 0         EXTEND(SP, 1 + av_count(self->failure));
    0          
    0          
    0          
    0          
968 0           mPUSHpvs("Future::Exception");
969 0 0         for(SSize_t i = 0; i < av_count(self->failure); i++)
    0          
970 0           PUSHs(sv_mortalcopy(AvARRAY(self->failure)[i]));
971 0           PUTBACK;
972              
973 0           call_method("new", G_SCALAR);
974              
975 0           SPAGAIN;
976              
977 0           exception = SvREFCNT_inc(POPs);
978              
979 0           PUTBACK;
980 0 0         FREETMPS;
981 0           LEAVE;
982             }
983              
984 0 0         if(!SvROK(exception) && SvPV_nolen(exception)[SvCUR(exception)-1] == '\n')
    0          
    0          
985 0           die_sv(exception);
986             else
987 0           croak_sv(exception);
988             }
989              
990 0 0         if(self->cancelled)
991 0           croak("%" SVf " was cancelled",
992             SVfARG(f));
993              
994 0 0         if(!self->result)
995 0           self->result = newAV();
996              
997 0           return self->result;
998             }
999              
1000 0           AV *Future_get_failure_av(pTHX_ SV *f)
1001             {
1002 0           struct FutureXS *self = get_future(f);
1003              
1004 0 0         if(!self->ready)
1005 0           future_await(f);
1006              
1007 0 0         if(!self->failure)
1008 0           return NULL;
1009              
1010 0           return self->failure;
1011             }
1012              
1013 0           void Future_cancel(pTHX_ SV *f)
1014             {
1015 0           struct FutureXS *self = get_future(f);
1016              
1017 0 0         if(self->ready)
1018 0           return;
1019              
1020 0           self->cancelled = true;
1021 0           AV *on_cancel = self->on_cancel;
1022              
1023 0 0         if(self->subs) {
1024 0 0         for(Size_t i = 0; i < av_count(self->subs); i++)
    0          
1025 0           future_cancel(AvARRAY(self->subs)[i]);
1026             }
1027              
1028             // TODO: maybe we need to clear these out from self before we do this, in
1029             // case of recursion?
1030              
1031 0 0         for(int i = on_cancel ? AvFILL(on_cancel) : -1; i >= 0; i--) {
    0          
    0          
1032 0           SV *code = AvARRAY(on_cancel)[i];
1033 0 0         if(!SvOK(code))
    0          
    0          
1034 0           continue;
1035              
1036 0 0         if(sv_is_future(code)) {
1037 0           dSP;
1038              
1039 0           ENTER;
1040 0           SAVETMPS;
1041              
1042 0 0         PUSHMARK(SP);
1043 0           PUSHs(code);
1044 0           PUTBACK;
1045              
1046 0           call_method("cancel", G_VOID);
1047              
1048 0 0         FREETMPS;
1049 0           LEAVE;
1050             }
1051             else {
1052 0           dSP;
1053              
1054 0           ENTER;
1055 0           SAVETMPS;
1056              
1057 0 0         PUSHMARK(SP);
1058 0           PUSHs(f);
1059 0           PUTBACK;
1060              
1061             assert(SvOK(code));
1062 0           call_sv(code, G_VOID);
1063              
1064 0 0         FREETMPS;
1065 0           LEAVE;
1066             }
1067             }
1068              
1069 0           mark_ready(self, f, "cancel");
1070             }
1071              
1072 0           SV *Future_without_cancel(pTHX_ SV *f)
1073             {
1074 0           struct FutureXSCallback cb = {
1075             .flags = CB_SEQ_READY|CB_CANCEL, /* without CB_SEQ_CANCEL */
1076             /* no code */
1077             };
1078              
1079 0           SV *ret = make_sequence(f, &cb);
1080 0           struct FutureXS *self = get_future(ret);
1081              
1082 0           self->precedent_f = newSVsv(f);
1083              
1084 0           return ret;
1085             }
1086              
1087 0           SV *Future_then(pTHX_ SV *f, U32 flags, SV *thencode, SV *elsecode)
1088             {
1089 0           struct FutureXSCallback cb = {
1090             .flags = CB_SEQ_ANY|CB_RESULT,
1091             .seq.thencode = thencode,
1092             .seq.elsecode = elsecode,
1093             };
1094 0 0         if(flags & FUTURE_THEN_WITH_F)
1095 0           cb.flags |= CB_SELF;
1096              
1097 0           return make_sequence(f, &cb);
1098             }
1099              
1100 0           SV *Future_followed_by(pTHX_ SV *f, SV *code)
1101             {
1102 0           struct FutureXSCallback cb = {
1103             .flags = CB_SEQ_ANY|CB_SELF,
1104             .seq.thencode = code,
1105             .seq.elsecode = code,
1106             };
1107              
1108 0           return make_sequence(f, &cb);
1109             }
1110              
1111 0           SV *Future_thencatch(pTHX_ SV *f, U32 flags, SV *thencode, HV *catches, SV *elsecode)
1112             {
1113 0           struct FutureXSCallback cb = {
1114             .flags = CB_SEQ_ANY|CB_RESULT,
1115             .seq.thencode = thencode,
1116             .seq.elsecode = elsecode,
1117             .seq.catches = catches,
1118             };
1119 0 0         if(flags & FUTURE_THEN_WITH_F)
1120 0           cb.flags |= CB_SELF;
1121              
1122 0           return make_sequence(f, &cb);
1123             }
1124              
1125             #define future_new_subsv(cls, subs, n) S_future_new_subsv(aTHX_ cls, subs, n)
1126 0           static SV *S_future_new_subsv(pTHX_ const char *cls, SV **subs, size_t n)
1127             {
1128 0           HV *future_stash = get_hv("Future::", 0);
1129             assert(future_stash);
1130              
1131             /* Find the best prototype; pick the first derived instance if there is
1132             * one */
1133 0           SV *proto = NULL;
1134 0 0         for(Size_t i = 0; i < n; i++) {
1135 0 0         if(!SvROK(subs[i]) || !SvOBJECT(SvRV(subs[i])))
    0          
1136 0           croak("Expected a Future, got %" SVf, SVfARG(subs[i]));
1137              
1138 0 0         if(SvSTASH(SvRV(subs[i])) != future_stash) {
1139 0           proto = subs[i];
1140 0           break;
1141             }
1142             }
1143              
1144 0 0         SV *f = proto ? future_new_proto(proto) : future_new(cls);
1145 0           struct FutureXS *self = get_future(f);
1146              
1147 0 0         if(!self->subs)
1148 0           self->subs = newAV();
1149              
1150 0 0         for(Size_t i = 0; i < n; i++)
1151 0           av_push(self->subs, newSVsv(subs[i]));
1152              
1153 0           return f;
1154             }
1155              
1156             #define copy_result(self, src) S_copy_result(aTHX_ self, src)
1157 0           static void S_copy_result(pTHX_ struct FutureXS *self, SV *src)
1158             {
1159             /* TODO: Handle non-Future::XS instances too */
1160 0           struct FutureXS *srcself = get_future(src);
1161              
1162             assert(srcself->ready);
1163             assert(!srcself->cancelled);
1164              
1165 0 0         if(srcself->failure) {
1166 0 0         self->failure = newAV_svn_dup(AvARRAY(srcself->failure), av_count(srcself->failure));
1167             }
1168             else {
1169             assert(srcself->result);
1170 0 0         self->result = newAV_svn_dup(AvARRAY(srcself->result), av_count(srcself->result));
1171             }
1172 0           }
1173              
1174             #define cancel_pending_subs(self) S_cancel_pending_subs(aTHX_ self)
1175 0           static void S_cancel_pending_subs(pTHX_ struct FutureXS *self)
1176             {
1177 0 0         if(!self->subs)
1178 0           return;
1179              
1180 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
    0          
1181 0           SV *sub = AvARRAY(self->subs)[i];
1182 0 0         if(!future_is_ready(sub))
1183 0           future_cancel(sub);
1184             }
1185             }
1186              
1187 0           XS_INTERNAL(sub_on_ready_waitall)
1188             {
1189 0           dXSARGS;
1190              
1191 0           SV *f = XSANY_sv;
1192 0 0         if(!SvOK(f))
    0          
    0          
1193 0           return;
1194              
1195             /* Make sure self doesn't disappear during this function */
1196 0           SvREFCNT_inc(SvRV(f));
1197 0           SAVEFREESV(SvRV(f));
1198              
1199 0           struct FutureXS *self = get_future(f);
1200              
1201 0           self->pending_subs--;
1202              
1203 0 0         if(self->pending_subs)
1204 0           XSRETURN(0);
1205              
1206             /* TODO: This is really just newAVav() */
1207 0 0         self->result = newAV_svn_dup(AvARRAY(self->subs), av_count(self->subs));
1208 0           mark_ready(self, f, "wait_all");
1209             }
1210              
1211 0           SV *Future_new_waitallv(pTHX_ const char *cls, SV **subs, size_t n)
1212             {
1213 0           SV *f = future_new_subsv(cls, subs, n);
1214 0           struct FutureXS *self = get_future(f);
1215              
1216 0           self->pending_subs = 0;
1217 0 0         for(Size_t i = 0; i < n; i++) {
1218             /* TODO: This should probably use some API function to make it transparent */
1219 0 0         if(!future_is_ready(subs[i]))
1220 0           self->pending_subs++;
1221             }
1222              
1223 0 0         if(!self->pending_subs) {
1224 0           self->result = newAV_svn_dup(subs, n);
1225 0           mark_ready(self, f, "wait_all");
1226              
1227 0           return f;
1228             }
1229              
1230 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitall, __FILE__);
1231 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1232 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1233              
1234 0           GV *gv = gv_fetchpvs("Future::XS::(wait_all callback)", GV_ADDMULTI, SVt_PVCV);
1235 0           CvGV_set(sub_on_ready, gv);
1236 0           CvANON_off(sub_on_ready);
1237              
1238 0 0         for(Size_t i = 0; i < n; i++) {
1239 0 0         if(!future_is_ready(subs[i]))
1240 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1241             }
1242              
1243 0           SvREFCNT_dec(sub_on_ready);
1244              
1245 0           return f;
1246             }
1247              
1248 0           XS_INTERNAL(sub_on_ready_waitany)
1249             {
1250 0           dXSARGS;
1251 0           SV *thissub = ST(0);
1252              
1253 0           SV *f = XSANY_sv;
1254 0 0         if(!SvOK(f))
    0          
    0          
1255 0           return;
1256              
1257             /* Make sure self doesn't disappear during this function */
1258 0           SvREFCNT_inc(SvRV(f));
1259 0           SAVEFREESV(SvRV(f));
1260              
1261 0           struct FutureXS *self = get_future(f);
1262              
1263 0 0         if(self->result || self->failure)
    0          
1264 0           return;
1265              
1266 0           self->pending_subs--;
1267              
1268 0           bool this_cancelled = future_is_cancelled(thissub);
1269              
1270 0 0         if(self->pending_subs && this_cancelled)
    0          
1271 0           return;
1272              
1273 0 0         if(this_cancelled) {
1274 0           future_failp(f, "All component futures were cancelled");
1275 0           return;
1276             }
1277             else
1278 0           copy_result(self, thissub);
1279              
1280 0           cancel_pending_subs(self);
1281              
1282 0           mark_ready(self, f, "wait_any");
1283             }
1284              
1285 0           SV *Future_new_waitanyv(pTHX_ const char *cls, SV **subs, size_t n)
1286             {
1287 0           SV *f = future_new_subsv(cls, subs, n);
1288 0           struct FutureXS *self = get_future(f);
1289              
1290 0 0         if(!n) {
1291 0           future_failp(f, "Cannot ->wait_any with no subfutures");
1292 0           return f;
1293             }
1294              
1295 0           SV *immediate_ready = NULL;
1296 0 0         for(Size_t i = 0; i < n; i++) {
1297             /* TODO: This should probably use some API function to make it transparent */
1298 0 0         if(future_is_ready(subs[i]) && !future_is_cancelled(subs[i])) {
    0          
1299 0           immediate_ready = subs[i];
1300 0           break;
1301             }
1302             }
1303              
1304 0 0         if(immediate_ready) {
1305 0           copy_result(self, immediate_ready);
1306              
1307 0           cancel_pending_subs(self);
1308              
1309 0           mark_ready(self, f, "wait_any");
1310              
1311 0           return f;
1312             }
1313              
1314 0           self->pending_subs = 0;
1315              
1316 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitany, __FILE__);
1317 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1318 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1319              
1320 0           GV *gv = gv_fetchpvs("Future::XS::(wait_any callback)", GV_ADDMULTI, SVt_PVCV);
1321 0           CvGV_set(sub_on_ready, gv);
1322 0           CvANON_off(sub_on_ready);
1323              
1324 0 0         for(Size_t i = 0; i < n; i++) {
1325 0 0         if(future_is_cancelled(subs[i]))
1326 0           continue;
1327              
1328 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1329 0           self->pending_subs++;
1330             }
1331              
1332 0           SvREFCNT_dec(sub_on_ready);
1333              
1334 0           return f;
1335             }
1336              
1337             #define compose_needsall_result(self) S_compose_needsall_result(aTHX_ self)
1338 0           static void S_compose_needsall_result(pTHX_ struct FutureXS *self)
1339             {
1340 0           AV *result = self->result = newAV();
1341 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
    0          
1342 0           SV *sub = AvARRAY(self->subs)[i];
1343 0           struct FutureXS *subself = get_future(sub);
1344             assert(subself->result);
1345 0 0         av_push_svn(result, AvARRAY(subself->result), av_count(subself->result));
1346             }
1347 0           }
1348              
1349 0           XS_INTERNAL(sub_on_ready_needsall)
1350             {
1351 0           dXSARGS;
1352 0           SV *thissub = ST(0);
1353              
1354 0           SV *f = XSANY_sv;
1355 0 0         if(!SvOK(f))
    0          
    0          
1356 0           return;
1357              
1358             /* Make sure self doesn't disappear during this function */
1359 0           SvREFCNT_inc(SvRV(f));
1360 0           SAVEFREESV(SvRV(f));
1361              
1362 0           struct FutureXS *self = get_future(f);
1363              
1364 0 0         if(self->result || self->failure)
    0          
1365 0           return;
1366              
1367 0 0         if(future_is_cancelled(thissub)) {
1368 0           future_failp(f, "A component future was cancelled");
1369 0           cancel_pending_subs(self);
1370 0           return;
1371             }
1372 0 0         else if(future_is_failed(thissub)) {
1373 0           copy_result(self, thissub);
1374 0           cancel_pending_subs(self);
1375 0           mark_ready(self, f, "needs_all");
1376             }
1377             else {
1378 0           self->pending_subs--;
1379 0 0         if(self->pending_subs)
1380 0           return;
1381 0           compose_needsall_result(self);
1382 0           mark_ready(self, f, "needs_all");
1383             }
1384             }
1385              
1386 0           SV *Future_new_needsallv(pTHX_ const char *cls, SV **subs, size_t n)
1387             {
1388 0           SV *f = future_new_subsv(cls, subs, n);
1389 0           struct FutureXS *self = get_future(f);
1390              
1391 0 0         if(!n) {
1392 0           future_donev(f, NULL, 0);
1393 0           return f;
1394             }
1395              
1396 0           SV *immediate_fail = NULL;
1397 0 0         for(Size_t i = 0; i < n; i++) {
1398 0 0         if(future_is_cancelled(subs[i])) {
1399 0           future_failp(f, "A component future was cancelled");
1400 0           cancel_pending_subs(self);
1401 0           return f;
1402             }
1403 0 0         if(future_is_failed(subs[i])) {
1404 0           immediate_fail = subs[i];
1405 0           break;
1406             }
1407             }
1408              
1409 0 0         if(immediate_fail) {
1410 0           copy_result(self, immediate_fail);
1411 0           cancel_pending_subs(self);
1412 0           mark_ready(self, f, "needs_all");
1413 0           return f;
1414             }
1415              
1416 0           self->pending_subs = 0;
1417              
1418 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsall, __FILE__);
1419 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1420 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1421              
1422 0           GV *gv = gv_fetchpvs("Future::XS::(needs_all callback)", GV_ADDMULTI, SVt_PVCV);
1423 0           CvGV_set(sub_on_ready, gv);
1424 0           CvANON_off(sub_on_ready);
1425              
1426 0 0         for(Size_t i = 0; i < n; i++) {
1427 0 0         if(future_is_ready(subs[i]))
1428 0           continue;
1429              
1430 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1431 0           self->pending_subs++;
1432             }
1433              
1434 0 0         if(!self->pending_subs) {
1435 0           compose_needsall_result(self);
1436 0           mark_ready(self, f, "needs_all");
1437             }
1438              
1439 0           SvREFCNT_dec(sub_on_ready);
1440              
1441 0           return f;
1442             }
1443              
1444 0           XS_INTERNAL(sub_on_ready_needsany)
1445             {
1446 0           dXSARGS;
1447 0           SV *thissub = ST(0);
1448              
1449 0           SV *f = XSANY_sv;
1450 0 0         if(!SvOK(f))
    0          
    0          
1451 0           return;
1452              
1453             /* Make sure self doesn't disappear during this function */
1454 0           SvREFCNT_inc(SvRV(f));
1455 0           SAVEFREESV(SvRV(f));
1456              
1457 0           struct FutureXS *self = get_future(f);
1458              
1459 0 0         if(self->result || self->failure)
    0          
1460 0           return;
1461              
1462 0           self->pending_subs--;
1463              
1464 0           bool this_cancelled = future_is_cancelled(thissub);
1465              
1466 0 0         if(self->pending_subs && this_cancelled)
    0          
1467 0           return;
1468              
1469 0 0         if(this_cancelled) {
1470 0           future_failp(f, "All component futures were cancelled");
1471             }
1472 0 0         else if(future_is_failed(thissub)) {
1473 0 0         if(self->pending_subs)
1474 0           return;
1475              
1476 0           copy_result(self, thissub);
1477 0           mark_ready(self, f, "needs_any");
1478             }
1479             else {
1480 0           copy_result(self, thissub);
1481 0           cancel_pending_subs(self);
1482 0           mark_ready(self, f, "needs_any");
1483             }
1484             }
1485              
1486 0           SV *Future_new_needsanyv(pTHX_ const char *cls, SV **subs, size_t n)
1487             {
1488 0           SV *f = future_new_subsv(cls, subs, n);
1489 0           struct FutureXS *self = get_future(f);
1490              
1491 0 0         if(!n) {
1492 0           future_failp(f, "Cannot ->needs_any with no subfutures");
1493 0           return f;
1494             }
1495              
1496 0           SV *immediate_done = NULL;
1497 0 0         for(Size_t i = 0; i < n; i++) {
1498 0 0         if(future_is_done(subs[i])) {
1499 0           immediate_done = subs[i];
1500 0           break;
1501             }
1502             }
1503              
1504 0 0         if(immediate_done) {
1505 0           copy_result(self, immediate_done);
1506 0           cancel_pending_subs(self);
1507 0           mark_ready(self, f, "needs_any");
1508 0           return f;
1509             }
1510              
1511 0           self->pending_subs = 0;
1512              
1513 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsany, __FILE__);
1514 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1515 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1516              
1517 0           GV *gv = gv_fetchpvs("Future::XS::(needs_any callback)", GV_ADDMULTI, SVt_PVCV);
1518 0           CvGV_set(sub_on_ready, gv);
1519 0           CvANON_off(sub_on_ready);
1520              
1521 0 0         for(Size_t i = 0; i < n; i++) {
1522 0 0         if(future_is_ready(subs[i]))
1523 0           continue;
1524              
1525 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1526 0           self->pending_subs++;
1527             }
1528              
1529 0 0         if(!self->pending_subs) {
1530 0           copy_result(self, subs[n-1]);
1531 0           mark_ready(self, f, "needs_any");
1532             }
1533              
1534 0           SvREFCNT_dec(sub_on_ready);
1535              
1536 0           return f;
1537             }
1538              
1539 0           Size_t Future_mPUSH_subs(pTHX_ SV *f, enum FutureSubFilter filter)
1540             {
1541 0           dSP;
1542              
1543 0           struct FutureXS *self = get_future(f);
1544              
1545 0           Size_t ret = 0;
1546 0 0         for(Size_t i = 0; self->subs && i < av_count(self->subs); i++) {
    0          
    0          
1547 0           SV *sub = AvARRAY(self->subs)[i];
1548              
1549             bool want;
1550 0           switch(filter) {
1551             case FUTURE_SUBS_PENDING:
1552 0           want = !future_is_ready(sub);
1553 0           break;
1554              
1555             case FUTURE_SUBS_READY:
1556 0           want = future_is_ready(sub);
1557 0           break;
1558              
1559             case FUTURE_SUBS_DONE:
1560 0           want = future_is_done(sub);
1561 0           break;
1562              
1563             case FUTURE_SUBS_FAILED:
1564 0           want = future_is_failed(sub);
1565 0           break;
1566              
1567             case FUTURE_SUBS_CANCELLED:
1568 0           want = future_is_cancelled(sub);
1569 0           break;
1570             }
1571              
1572 0 0         if(want) {
1573 0 0         XPUSHs(sv_mortalcopy(sub));
1574 0           ret++;
1575             }
1576             }
1577              
1578 0           PUTBACK;
1579 0           return ret;
1580             }
1581              
1582 0           struct timeval Future_get_btime(pTHX_ SV *f)
1583             {
1584 0           struct FutureXS *self = get_future(f);
1585 0           return self->btime;
1586             }
1587              
1588 0           struct timeval Future_get_rtime(pTHX_ SV *f)
1589             {
1590 0           struct FutureXS *self = get_future(f);
1591 0           return self->rtime;
1592             }
1593              
1594 0           void Future_set_label(pTHX_ SV *f, SV *label)
1595             {
1596 0           struct FutureXS *self = get_future(f);
1597              
1598 0 0         if(self->label)
1599 0           SvREFCNT_dec(label);
1600              
1601 0           self->label = newSVsv(label);
1602 0           }
1603              
1604 0           SV *Future_get_label(pTHX_ SV *f)
1605             {
1606 0           struct FutureXS *self = get_future(f);
1607              
1608 0           return self->label;
1609             }
1610              
1611 0           void Future_set_udata(pTHX_ SV *f, SV *key, SV *value)
1612             {
1613 0           struct FutureXS *self = get_future(f);
1614              
1615 0 0         if(!self->udata)
1616 0           self->udata = newHV();
1617              
1618 0           hv_store_ent(self->udata, key, newSVsv(value), 0);
1619 0           }
1620              
1621 0           SV *Future_get_udata(pTHX_ SV *f, SV *key)
1622             {
1623 0           struct FutureXS *self = get_future(f);
1624              
1625 0 0         if(!self->udata)
1626 0           return &PL_sv_undef;
1627              
1628 0           HE *he = hv_fetch_ent(self->udata, key, 0, 0);
1629 0 0         return he ? HeVAL(he) : &PL_sv_undef;
1630             }
1631              
1632             /* DMD_HELPER assistants */
1633              
1634             #ifdef HAVE_DMD_HELPER
1635             static int dumpstruct_callback(pTHX_ DMDContext *ctx, struct FutureXSCallback *cb)
1636             {
1637             if(!(cb->flags & CB_SEQ_ANY))
1638             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback", cb, sizeof(struct FutureXSCallback),
1639             /* Some cheating here, to claim the "code" is either a CV or a Future,
1640             * depending on the CB_IS_FUTURE flag */
1641             3, ((const DMDNamedField []){
1642             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1643             {"the code CV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? NULL : cb->code},
1644             {"the Future SV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? cb->code : NULL },
1645             })
1646             );
1647             else
1648             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback(CB_SEQ)", cb, sizeof(struct FutureXSCallback),
1649             4, ((const DMDNamedField []){
1650             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1651             {"the then code CV", DMD_FIELD_PTR, .ptr = cb->seq.thencode},
1652             {"the else code CV", DMD_FIELD_PTR, .ptr = cb->seq.elsecode},
1653             {"the sequence future SV", DMD_FIELD_PTR, .ptr = cb->seq.f},
1654             })
1655             );
1656              
1657             return 0;
1658             }
1659              
1660             static int dumpstruct_revocation(pTHX_ DMDContext *ctx, struct FutureXSRevocation *rev)
1661             {
1662             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSRevocation", rev, sizeof(struct FutureXSRevocation),
1663             2, ((const DMDNamedField []){
1664             {"the precedent future SV", DMD_FIELD_PTR, .ptr = rev->precedent_f},
1665             {"the SV to clear RV", DMD_FIELD_PTR, .ptr = rev->toclear_sv_at},
1666             })
1667             );
1668              
1669             return 0;
1670             }
1671              
1672             static int dumpstruct(pTHX_ DMDContext *ctx, const SV *sv)
1673             {
1674             int ret = 0;
1675              
1676             // TODO: Add some safety checking
1677             struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV((SV *)sv));
1678              
1679             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXS", self, sizeof(struct FutureXS),
1680             12, ((const DMDNamedField []){
1681             {"ready", DMD_FIELD_BOOL, .b = self->ready},
1682             {"cancelled", DMD_FIELD_BOOL, .b = self->cancelled},
1683             {"the label SV", DMD_FIELD_PTR, .ptr = self->label},
1684             {"the result AV", DMD_FIELD_PTR, .ptr = self->result},
1685             {"the failure AV", DMD_FIELD_PTR, .ptr = self->failure},
1686             {"the callbacks AV", DMD_FIELD_PTR, .ptr = self->callbacks},
1687             {"the on_cancel AV", DMD_FIELD_PTR, .ptr = self->on_cancel},
1688             {"the revoke_when_ready AV", DMD_FIELD_PTR, .ptr = self->revoke_when_ready},
1689             {"the udata HV", DMD_FIELD_PTR, .ptr = self->udata},
1690             {"the constructed-at SV", DMD_FIELD_PTR, .ptr = self->constructed_at},
1691             {"the subs AV", DMD_FIELD_PTR, .ptr = self->subs},
1692             {"the pending sub count", DMD_FIELD_UINT, .n = self->pending_subs},
1693             })
1694             );
1695              
1696             for(size_t i = 0; self->callbacks && i < av_count(self->callbacks); i++) {
1697             struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[i];
1698             ret += dumpstruct_callback(aTHX_ ctx, cb);
1699             }
1700              
1701             for(size_t i = 0; self->revoke_when_ready && i < av_count(self->revoke_when_ready); i++) {
1702             struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(self->revoke_when_ready)[i];
1703             ret += dumpstruct_revocation(aTHX_ ctx, rev);
1704             }
1705              
1706             ret += DMD_ANNOTATE_SV(sv, (SV *)self, "the FutureXS structure");
1707              
1708             return ret;
1709             }
1710             #endif
1711              
1712             #define getenv_bool(key) S_getenv_bool(aTHX_ key)
1713 4           static bool S_getenv_bool(pTHX_ const char *key)
1714             {
1715 4           const char *val = getenv(key);
1716 4 50         if(!val || !val[0])
    0          
1717 4           return false;
1718 0 0         if(val[0] == '0' && strlen(val) == 1)
    0          
1719 0           return false;
1720 0           return true;
1721             }
1722              
1723             #ifndef newSVbool
1724             # define newSVbool(b) newSVsv(b ? &PL_sv_yes : &PL_sv_no)
1725             #endif
1726              
1727 2           void Future_reread_environment(pTHX)
1728             {
1729 2           future_debug = getenv_bool("PERL_FUTURE_DEBUG");
1730              
1731 2 50         capture_times = future_debug || getenv_bool("PERL_FUTURE_TIMES");
    50          
1732 2 50         sv_setsv(get_sv("Future::TIMES", GV_ADDMULTI), capture_times ? &PL_sv_yes : &PL_sv_no);
1733 2           }
1734              
1735 2           void Future_boot(pTHX)
1736             {
1737             #ifdef HAVE_DMD_HELPER
1738             DMD_SET_PACKAGE_HELPER("Future::XS", dumpstruct);
1739             #endif
1740              
1741 2           Future_reread_environment(aTHX);
1742              
1743             // We can only do this once
1744 2 50         newCONSTSUB(gv_stashpvn("Future::XS", 10, TRUE), "DEBUG", newSVbool(future_debug));
1745 2           }