File Coverage

XS.xs
Criterion Covered Total %
statement 402 587 68.4
branch 233 496 46.9
condition n/a
subroutine n/a
pod n/a
total 635 1083 58.6


line stmt bran cond sub pod time code
1             #include "easyxs/easyxs.h"
2              
3             #include
4             #include
5              
6             #define MY_CXT_KEY "Promise::XS::_guts" XS_VERSION
7              
8             #define BASE_CLASS "Promise::XS"
9              
10             #define PROMISE_CLASS "Promise::XS::Promise"
11             #define PROMISE_CLASS_TYPE Promise__XS__Promise
12              
13             #define DEFERRED_CLASS "Promise::XS::Deferred"
14             #define DEFERRED_CLASS_TYPE Promise__XS__Deferred
15              
16             #define CONVERTER_CR_NAME "_convert_to_our_promise"
17              
18             #ifdef PL_phase
19             #define PXS_IS_GLOBAL_DESTRUCTION PL_phase == PERL_PHASE_DESTRUCT
20             #else
21             #define PXS_IS_GLOBAL_DESTRUCTION PL_dirty
22             #endif
23              
24             #define RESULT_IS_RESOLVED(result) (result->state == XSPR_RESULT_RESOLVED)
25             #define RESULT_IS_REJECTED(result) (result->state == XSPR_RESULT_REJECTED)
26              
27             #define UNUSED(x) (void)(x)
28              
29             #define DEBUG_AWAITABLE 0
30             #if DEBUG_AWAITABLE
31             # define _DO_DEBUG_AWAITABLE() fprintf(stderr, "# %s\n", __func__)
32             #else
33             # define _DO_DEBUG_AWAITABLE()
34             #endif
35              
36             #define _MAX_RECURSION 254
37              
38             /* We could look here at the full stack depth
39             (PL_stack_sp - PL_stack_base), but we only really care about
40             our *own* recursion, not the overall Perl stack.
41             */
42              
43             #define _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION \
44             dMY_CXT; \
45             if (MY_CXT.callback_depth > _MAX_RECURSION) { \
46             croak("Exceeded %u callbacks; infinite recursion detected!", _MAX_RECURSION); \
47             }
48              
49             typedef enum {
50             _DEFER_NONE = 0,
51             _DEFER_ANYEVENT,
52             _DEFER_IOASYNC,
53             _DEFER_MOJO,
54             } event_system_t;
55              
56             typedef struct xspr_callback_s xspr_callback_t;
57             typedef struct xspr_promise_s xspr_promise_t;
58             typedef struct xspr_result_s xspr_result_t;
59             typedef struct xspr_callback_queue_s xspr_callback_queue_t;
60              
61             typedef enum {
62             XSPR_STATE_NONE,
63             XSPR_STATE_PENDING,
64             XSPR_STATE_FINISHED,
65             } xspr_promise_state_t;
66              
67             typedef enum {
68             XSPR_RESULT_NONE,
69             XSPR_RESULT_RESOLVED,
70             XSPR_RESULT_REJECTED,
71             XSPR_RESULT_BOTH
72             } xspr_result_state_t;
73              
74             typedef enum {
75             // from then() or catch()
76             XSPR_CALLBACK_PERL,
77              
78             // from finally()
79             XSPR_CALLBACK_FINALLY,
80              
81              
82             // from a promise returned from a then() or catch() callback
83             XSPR_CALLBACK_CHAIN,
84              
85             // from a promise returned from a finally() callback
86             XSPR_CALLBACK_FINALLY_CHAIN
87             } xspr_callback_type_t;
88              
89             struct xspr_callback_s {
90             xspr_callback_type_t type;
91              
92             union {
93             struct {
94             SV* on_resolve;
95             SV* on_reject;
96             xspr_promise_t* next;
97             } perl;
98              
99             struct {
100             SV* on_finally;
101             xspr_promise_t* next;
102             } finally;
103              
104             xspr_promise_t* chain;
105              
106             struct {
107             xspr_result_t* original_result;
108             xspr_promise_t* chain_promise;
109             } finally_chain;
110             };
111             };
112              
113             struct xspr_result_s {
114             xspr_result_state_t state;
115             SV** results;
116             int count;
117             int refs;
118             bool rejection_should_warn;
119             };
120              
121             struct xspr_promise_s {
122             xspr_promise_state_t state;
123             pid_t detect_leak_pid;
124             int refs;
125             union {
126             struct {
127             xspr_callback_t** callbacks;
128             int callbacks_count;
129             } pending;
130             struct {
131             xspr_result_t *result;
132             } finished;
133             };
134              
135             /* For async/await: */
136             SV* on_ready_immediate;
137             SV* self_sv_ref;
138             };
139              
140             struct xspr_callback_queue_s {
141             xspr_promise_t* origin;
142             xspr_callback_t* callback;
143             xspr_callback_queue_t* next;
144             };
145              
146             xspr_callback_t* xspr_callback_new_perl(pTHX_ SV* on_resolve, SV* on_reject, xspr_promise_t* next);
147             xspr_callback_t* xspr_callback_new_chain(pTHX_ xspr_promise_t* chain);
148             xspr_callback_t* xspr_callback_new_finally_chain(pTHX_ xspr_result_t* original_result, xspr_promise_t* next_promise);
149             void xspr_callback_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin);
150             void xspr_callback_free(pTHX_ xspr_callback_t* callback);
151              
152             xspr_promise_t* xspr_promise_new(pTHX);
153             void xspr_promise_then(pTHX_ xspr_promise_t* promise, xspr_callback_t* callback);
154             void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t *result);
155             void xspr_promise_incref(pTHX_ xspr_promise_t* promise);
156             void xspr_promise_decref(pTHX_ xspr_promise_t* promise);
157              
158             xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, unsigned count);
159             xspr_result_t* pxs_result_clone(pTHX_ xspr_result_t* old);
160             xspr_result_t* xspr_result_from_error(pTHX_ const char *error);
161             void xspr_result_incref(pTHX_ xspr_result_t* result);
162             void xspr_result_decref(pTHX_ xspr_result_t* result);
163              
164             xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** inputs, unsigned input_count);
165             xspr_promise_t* xspr_promise_from_sv(pTHX_ SV* input);
166              
167              
168             typedef struct {
169             xspr_callback_queue_t* queue_head;
170             xspr_callback_queue_t* queue_tail;
171             int in_flush;
172             int backend_scheduled;
173             unsigned char callback_depth;
174             #ifdef USE_ITHREADS
175             tTHX owner;
176             #endif
177             SV* pxs_flush_cr;
178             HV* pxs_base_stash;
179             HV* pxs_promise_stash;
180             HV* pxs_deferred_stash;
181             SV* deferral_cr;
182             SV* deferral_arg;
183             event_system_t event_system;
184             SV* stop_cr;
185             } my_cxt_t;
186              
187             typedef struct {
188             xspr_promise_t* promise;
189             } DEFERRED_CLASS_TYPE;
190              
191             typedef struct {
192             xspr_promise_t* promise;
193             } PROMISE_CLASS_TYPE;
194              
195             //----------------------------------------------------------------------
196              
197             START_MY_CXT
198              
199             /* Process a single callback */
200 1608           void xspr_callback_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
201             {
202 1608 50         ASSUME(origin->state == XSPR_STATE_FINISHED);
203              
204 1608 100         if (callback->type == XSPR_CALLBACK_CHAIN) {
205 770           xspr_promise_finish(aTHX_ callback->chain, origin->finished.result);
206              
207 838 100         } else if (callback->type == XSPR_CALLBACK_FINALLY_CHAIN) {
208 2 100         xspr_promise_finish(aTHX_
209             callback->finally_chain.chain_promise,
210 2           RESULT_IS_REJECTED(origin->finished.result) ? origin->finished.result : callback->finally_chain.original_result
211             );
212              
213 1672 100         } else if (callback->type == XSPR_CALLBACK_PERL || callback->type == XSPR_CALLBACK_FINALLY) {
    50          
214             SV* callback_fn;
215             xspr_promise_t* next_promise;
216              
217 836 100         if (callback->type == XSPR_CALLBACK_FINALLY) {
218 11           callback_fn = callback->finally.on_finally;
219 11           next_promise = callback->finally.next;
220              
221             /* A finally() “catches” its parent promise, even as it
222             rethrows any failure from it. */
223 11 50         if (callback_fn && SvOK(callback_fn)) {
    50          
    0          
    0          
224 11           origin->finished.result->rejection_should_warn = false;
225             }
226             } else {
227 825           next_promise = callback->perl.next;
228              
229 825 100         if (RESULT_IS_RESOLVED(origin->finished.result)) {
230 804           callback_fn = callback->perl.on_resolve;
231 21 50         } else if (RESULT_IS_REJECTED(origin->finished.result)) {
232 21           callback_fn = callback->perl.on_reject;
233              
234 21 50         if (callback_fn && SvOK(callback_fn)) {
    50          
    0          
    0          
235 21           origin->finished.result->rejection_should_warn = false;
236             }
237              
238             } else {
239 0           callback_fn = NULL; /* Be quiet, bad compiler! */
240 0           ASSUME(0);
241             }
242             }
243              
244 836 50         if (callback_fn != NULL) {
245             xspr_result_t* callback_result;
246              
247 836 100         if (callback->type == XSPR_CALLBACK_FINALLY) {
248 11           callback_result = xspr_invoke_perl(aTHX_ callback_fn, NULL, 0);
249             }
250             else {
251 825           callback_result = xspr_invoke_perl(aTHX_
252             callback_fn,
253 825           origin->finished.result->results,
254 825           origin->finished.result->count
255             );
256             }
257              
258 836 100         if (next_promise == NULL) {
259 33 100         if (callback->type == XSPR_CALLBACK_FINALLY && RESULT_IS_RESOLVED(callback_result) && RESULT_IS_REJECTED(origin->finished.result)) {
    50          
    50          
260              
261             /* This handles the case where finally() is called in
262             void context and the parent promise rejects. In this
263             case we need an unhandled-rejection warning right
264             away since, given the absence of a next_promise,
265             by definition we have an unhandled rejection.
266             */
267 1           xspr_result_decref(aTHX_ callback_result);
268 33           callback_result = pxs_result_clone( aTHX_ origin->finished.result );
269             }
270             }
271             else {
272 803           bool finish_promise = true;
273              
274 803 100         if (callback_result->count > 0 && callback_result->state == XSPR_RESULT_RESOLVED) {
    100          
275 790           xspr_promise_t* promise = xspr_promise_from_sv(aTHX_ callback_result->results[0]);
276              
277 790 100         if (promise != NULL) {
278              
279 774 100         if (callback_result->count > 1) {
280 2           warn( BASE_CLASS ": %d extra response(s) returned after promise! Treating promise like normal return.", callback_result->count - 1 );
281             }
282 772 50         else if (promise == next_promise) {
283 0           finish_promise = false;
284              
285             /* This is an extreme corner case the A+ spec made us implement: we need to reject
286             * cases where the promise created from then() is passed back to its own callback */
287 0           xspr_result_t* chain_error = xspr_result_from_error(aTHX_ "TypeError");
288 0           xspr_promise_finish(aTHX_ next_promise, chain_error);
289              
290 0           xspr_result_decref(aTHX_ chain_error);
291             }
292             else {
293 772           finish_promise = false;
294              
295             /* Fairly normal case: we returned a promise from the callback */
296             xspr_callback_t* chainback;
297              
298 772 100         if (callback->type == XSPR_CALLBACK_FINALLY) {
299 2           chainback = xspr_callback_new_finally_chain(aTHX_ origin->finished.result, next_promise);
300             }
301             else {
302 770           chainback = xspr_callback_new_chain(aTHX_ next_promise);
303             }
304              
305 772           xspr_promise_then(aTHX_ promise, chainback);
306             }
307              
308 774           xspr_promise_decref(aTHX_ promise);
309             }
310             }
311              
312 803 100         if (finish_promise) {
313             xspr_result_t* final_result;
314 31           bool final_result_needs_decref = false;;
315              
316 31 100         if ((callback->type == XSPR_CALLBACK_FINALLY) && RESULT_IS_RESOLVED(callback_result)) {
    100          
317 7           final_result = origin->finished.result;
318              
319 11 100         if (RESULT_IS_REJECTED(final_result)) {
320              
321             // If finally()’s callback succeeds, it takes
322             // on the resolution status of the “parent”
323             // promise. If that promise rejected, then,
324             // the finally’s promise also rejects. Notably,
325             // the finally’s promise should STILL trigger
326             // an unhandled-rejection warning, even if the
327             // parent’s rejection is eventually handled.
328 4           final_result = pxs_result_clone(aTHX_ final_result);
329 4           final_result_needs_decref = true;
330             }
331             }
332             else {
333 24           final_result = callback_result;
334             }
335              
336 31           xspr_promise_finish(aTHX_ next_promise, final_result);
337              
338 31 100         if (final_result_needs_decref) {
339 4           xspr_result_decref(aTHX_ final_result);
340             }
341             }
342             }
343              
344 836           xspr_result_decref(aTHX_ callback_result);
345              
346 0 0         } else if (next_promise) {
347             /* No callback, so we're just passing the result along. */
348 0           xspr_result_t* result = origin->finished.result;
349 0           xspr_promise_finish(aTHX_ next_promise, result);
350             }
351              
352             } else {
353 0           ASSUME(0);
354             }
355 1608           }
356              
357             /* Frees the xspr_callback_t structure */
358 1614           void xspr_callback_free(pTHX_ xspr_callback_t *callback)
359             {
360 1614 100         if (callback->type == XSPR_CALLBACK_CHAIN) {
361 770           xspr_promise_decref(aTHX_ callback->chain);
362              
363 844 100         } else if (callback->type == XSPR_CALLBACK_PERL) {
364 828           SvREFCNT_dec(callback->perl.on_resolve);
365 828           SvREFCNT_dec(callback->perl.on_reject);
366 828 100         if (callback->perl.next != NULL)
367 828           xspr_promise_decref(aTHX_ callback->perl.next);
368              
369 16 100         } else if (callback->type == XSPR_CALLBACK_FINALLY) {
370 14           SvREFCNT_dec(callback->finally.on_finally);
371 14 100         if (callback->finally.next != NULL)
372 14           xspr_promise_decref(aTHX_ callback->finally.next);
373              
374 2 50         } else if (callback->type == XSPR_CALLBACK_FINALLY_CHAIN) {
375 2           xspr_promise_decref(aTHX_ callback->finally_chain.chain_promise);
376 2           xspr_result_decref(aTHX_ callback->finally_chain.original_result);
377              
378             } else {
379 0           ASSUME(0);
380             }
381              
382 1614           Safefree(callback);
383 1614           }
384              
385             /* Process the queue until it's empty */
386 0           void xspr_queue_flush(pTHX)
387             {
388             dMY_CXT;
389              
390 0 0         if (MY_CXT.in_flush) {
391             /* XXX: is there a reasonable way to trigger this? */
392 0           warn("Rejecting request to flush promises queue: already processing");
393 0           return;
394             }
395 0           MY_CXT.in_flush = 1;
396              
397 0 0         while (MY_CXT.queue_head != NULL) {
398             /* Save some typing... */
399 0           xspr_callback_queue_t *cur = MY_CXT.queue_head;
400              
401             /* Process the callback. This could trigger some Perl code, meaning we
402             * could end up with additional queue entries after this */
403 0           xspr_callback_process(aTHX_ cur->callback, cur->origin);
404              
405             /* Free-ing the callback structure could theoretically trigger DESTROY subs,
406             * enqueueing new callbacks, so we can't assume the loop ends here! */
407 0           MY_CXT.queue_head = cur->next;
408 0 0         if (cur->next == NULL) {
409 0           MY_CXT.queue_tail = NULL;
410             }
411              
412             /* Destroy the structure */
413 0           xspr_callback_free(aTHX_ cur->callback);
414 0           xspr_promise_decref(aTHX_ cur->origin);
415 0           Safefree(cur);
416             }
417              
418 0           MY_CXT.in_flush = 0;
419 0           MY_CXT.backend_scheduled = 0;
420             }
421              
422             /* Add a callback invocation into the queue for the given origin promise.
423             * Takes ownership of the callback structure */
424 0           void xspr_queue_add(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
425             {
426             dMY_CXT;
427              
428             xspr_callback_queue_t* entry;
429 0           Newxz(entry, 1, xspr_callback_queue_t);
430 0           entry->origin = origin;
431 0           xspr_promise_incref(aTHX_ entry->origin);
432 0           entry->callback = callback;
433              
434 0 0         if (MY_CXT.queue_head == NULL) {
435 0 0         ASSUME(MY_CXT.queue_tail == NULL);
436             /* Empty queue, so now it's just us */
437 0           MY_CXT.queue_head = entry;
438 0           MY_CXT.queue_tail = entry;
439              
440             } else {
441 0 0         ASSUME(MY_CXT.queue_tail != NULL);
442             /* Existing queue, add to the tail */
443 0           MY_CXT.queue_tail->next = entry;
444 0           MY_CXT.queue_tail = entry;
445             }
446 0           }
447              
448 0           void _call_with_1_or_2_args( pTHX_ SV* cb, SV* maybe_arg0, SV* arg1 ) {
449             // --- Almost all copy-paste from “perlcall” … blegh!
450 0           dSP;
451              
452 0           ENTER;
453 0           SAVETMPS;
454              
455 0 0         PUSHMARK(SP);
456              
457 0 0         if (maybe_arg0) {
458 0 0         EXTEND(SP, 2);
459 0           PUSHs(maybe_arg0);
460             }
461             else {
462 0 0         EXTEND(SP, 1);
463             }
464              
465 0           PUSHs( arg1 );
466 0           PUTBACK;
467              
468 0           call_sv(cb, G_VOID);
469              
470 0 0         FREETMPS;
471 0           LEAVE;
472              
473 0           return;
474             }
475              
476 10           void _call_pv_with_args( pTHX_ const char* subname, SV** args, unsigned argscount )
477             {
478             // --- Almost all copy-paste from “perlcall” … blegh!
479 10           dSP;
480              
481 10           ENTER;
482 10           SAVETMPS;
483              
484 10 50         PUSHMARK(SP);
485 10 50         EXTEND(SP, argscount);
486              
487             unsigned i;
488 20 100         for (i=0; i
489 10           PUSHs(args[i]);
490             }
491              
492 10           PUTBACK;
493              
494 10           call_pv(subname, G_VOID);
495              
496 10 50         FREETMPS;
497 10           LEAVE;
498              
499 10           return;
500             }
501              
502 0           void xspr_queue_maybe_schedule(pTHX)
503             {
504             dMY_CXT;
505 0 0         if (MY_CXT.queue_head == NULL || MY_CXT.backend_scheduled || MY_CXT.in_flush) {
    0          
    0          
506 0           return;
507             }
508              
509 0           MY_CXT.backend_scheduled = 1;
510             /* We trust our backends to be sane, so little guarding against errors here */
511              
512 0 0         if (!MY_CXT.pxs_flush_cr) {
513 0           HV *stash = gv_stashpv(DEFERRED_CLASS, 0);
514 0           GV* method_gv = gv_fetchmethod_autoload(stash, "___flush", FALSE);
515 0 0         if (method_gv != NULL && isGV(method_gv) && GvCV(method_gv) != NULL) {
    0          
    0          
516 0           MY_CXT.pxs_flush_cr = newRV_inc( (SV*)GvCV(method_gv) );
517             }
518             else {
519 0           ASSUME(0);
520             }
521             }
522              
523 0           _call_with_1_or_2_args(aTHX_ MY_CXT.deferral_cr, MY_CXT.deferral_arg, MY_CXT.pxs_flush_cr);
524             }
525              
526             /* Invoke the user's perl code. We need to be really sure this doesn't return early via croak/next/etc. */
527 837           xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** inputs, unsigned input_count)
528             {
529 837           dSP;
530             unsigned count, i;
531             xspr_result_t* result;
532              
533 837 50         if (!SvROK(perl_fn)) {
534 0           return xspr_result_from_error(aTHX_ "promise callbacks need to be a CODE reference");
535             }
536              
537 837           ENTER;
538 837           SAVETMPS;
539              
540 837 50         PUSHMARK(SP);
541 837 50         EXTEND(SP, input_count);
542 1677 100         for (i = 0; i < input_count; i++) {
543 840           PUSHs(inputs[i]);
544             }
545 837           PUTBACK;
546              
547             /* Clear $_ so that callbacks don't end up talking to each other by accident */
548 837           SAVE_DEFSV;
549 837           DEFSV_set(sv_newmortal());
550              
551 837           count = call_sv(perl_fn, G_EVAL | G_ARRAY);
552              
553 837           SPAGAIN;
554              
555 837 50         if (SvTRUE(ERRSV)) {
    50          
    50          
    50          
    0          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    0          
    0          
    100          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
556 6           result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
557 6 50         result->results[0] = newSVsv(ERRSV);
558             } else {
559 831           result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, count);
560 1651 100         for (i = 0; i < count; i++) {
561 820           result->results[count-i-1] = SvREFCNT_inc(POPs);
562             }
563             }
564 837           PUTBACK;
565              
566 837 50         FREETMPS;
567 837           LEAVE;
568              
569 837           return result;
570             }
571              
572             /* Increments the ref count for xspr_result_t */
573 1620           void xspr_result_incref(pTHX_ xspr_result_t* result)
574             {
575 1620           result->refs++;
576 1620           }
577              
578             /* Decrements the ref count for the xspr_result_t, freeing the structure if needed */
579 3281           void xspr_result_decref(pTHX_ xspr_result_t* result)
580             {
581 3281 100         if (--(result->refs) == 0) {
582 1661 100         if (RESULT_IS_REJECTED(result) && result->rejection_should_warn) {
    100          
583 10           SV* warn_args[result->count];
584              
585             // Dupe the results to warn about:
586 10 50         Copy(result->results, warn_args, result->count, SV*);
587              
588 10           _call_pv_with_args(aTHX_ "Promise::XS::Promise::_warn_unhandled", warn_args, result->count);
589             }
590              
591             unsigned i;
592 3318 100         for (i = 0; i < result->count; i++) {
593 1657           SvREFCNT_dec(result->results[i]);
594             }
595 1661           Safefree(result->results);
596 1661           Safefree(result);
597             }
598 3281           }
599              
600 1608           void xspr_immediate_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* promise)
601             {
602             dMY_CXT;
603              
604 1608           MY_CXT.callback_depth++;
605              
606 1608           xspr_callback_process(aTHX_ callback, promise);
607              
608 1608           MY_CXT.callback_depth--;
609              
610             /* Destroy the structure */
611 1608           xspr_callback_free(aTHX_ callback);
612 1608           }
613              
614             #define _XSPR_FREE_ON_READY_IMMEDIATE(promise) \
615             SvREFCNT_dec(SvRV(promise->on_ready_immediate)); \
616             SvREFCNT_dec(promise->on_ready_immediate);
617              
618             /* Transitions a promise from pending to finished, using the given result */
619 1620           void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t* result)
620             {
621             dMY_CXT;
622              
623 1620 50         ASSUME(promise->state == XSPR_STATE_PENDING);
624 1620           xspr_callback_t** pending_callbacks = promise->pending.callbacks;
625 1620           int count = promise->pending.callbacks_count;
626              
627 1620           promise->state = XSPR_STATE_FINISHED;
628 1620           promise->finished.result = result;
629 1620           xspr_result_incref(aTHX_ promise->finished.result);
630              
631             /* fprintf(stderr, "finishing p=%p (%d callbacks)\n", promise, count); */
632              
633             /* For async/await: */
634 1620 50         if (promise->on_ready_immediate != NULL) {
635 0           xspr_invoke_perl(aTHX_ promise->on_ready_immediate, NULL, 0);
636              
637 0           _XSPR_FREE_ON_READY_IMMEDIATE(promise);
638 0           promise->on_ready_immediate = NULL;
639             }
640              
641             unsigned i;
642 1632 100         for (i = 0; i < count; i++) {
643              
644             // If any of this promise’s callbacks has an on_reject, then
645             // the promise’s result is rejection-handled.
646 12 100         if (pending_callbacks[i]->type == XSPR_CALLBACK_PERL && RESULT_IS_REJECTED(result) && result->rejection_should_warn) {
    100          
    100          
647 3           SV* on_reject = pending_callbacks[i]->perl.on_reject;
648 3 50         if (on_reject && SvOK(on_reject)) {
    50          
    0          
    0          
649 3           result->rejection_should_warn = false;
650             }
651             }
652              
653 12 50         if (MY_CXT.deferral_cr) {
654 0           xspr_queue_add(aTHX_ pending_callbacks[i], promise);
655             }
656             else {
657 12           xspr_immediate_process(aTHX_ pending_callbacks[i], promise);
658             }
659             }
660              
661 1620 50         if (promise->self_sv_ref != NULL) {
662              
663             // After we set self_sv_ref, Future::AsyncAwait manipulates
664             // things a bit such that WEAKREF is set on the reference and
665             // the referent’s refcount is decremented. Thus, we can forgo
666             // the reference-count decrement here. We still check for the
667             // WEAKREF flag, though, just in case something changed.
668             //
669 0 0         if (!SvWEAKREF(promise->self_sv_ref)) {
670 0           SvREFCNT_dec(SvRV(promise->self_sv_ref));
671             }
672              
673 0           SvREFCNT_dec(promise->self_sv_ref);
674 0           promise->self_sv_ref = NULL;
675             }
676              
677 1620 50         if (MY_CXT.deferral_cr) {
678 0           xspr_queue_maybe_schedule(aTHX);
679             }
680              
681 1620           Safefree(pending_callbacks);
682 1620           }
683              
684             /* Create a new xspr_result_t object with the given number of item slots */
685 1661           xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, unsigned count)
686             {
687             xspr_result_t* result;
688 1661           Newxz(result, 1, xspr_result_t);
689 1661 50         Newxz(result->results, count, SV*);
690 1661           result->rejection_should_warn = true;
691 1661           result->state = state;
692 1661           result->refs = 1;
693 1661           result->count = count;
694 1661           return result;
695             }
696              
697 7           xspr_result_t* pxs_result_clone(pTHX_ xspr_result_t* old)
698             {
699 7           xspr_result_t* new = xspr_result_new(aTHX_ old->state, old->count);
700              
701             unsigned i;
702 16 100         for (i=0; icount; i++) {
703 9           new->results[i] = SvREFCNT_inc( old->results[i] );
704             }
705              
706 7           return new;
707             }
708              
709 0           xspr_result_t* xspr_result_from_error(pTHX_ const char *error)
710             {
711 0           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
712 0           result->results[0] = newSVpv(error, 0);
713 0           return result;
714             }
715              
716             /* Increments the ref count for xspr_promise_t */
717 2389           void xspr_promise_incref(pTHX_ xspr_promise_t* promise)
718             {
719 2389           (promise->refs)++;
720 2389           }
721              
722             /* Decrements the ref count for the xspr_promise_t, freeing the structure if needed */
723 4024           void xspr_promise_decref(pTHX_ xspr_promise_t *promise)
724             {
725 4024 100         if (--(promise->refs) == 0) {
726 1635 100         if (promise->state == XSPR_STATE_PENDING) {
727             /* XXX: is this a bad thing we should warn for? */
728 15           int count = promise->pending.callbacks_count;
729 15           xspr_callback_t **callbacks = promise->pending.callbacks;
730             int i;
731 21 100         for (i = 0; i < count; i++) {
732 6           xspr_callback_free(aTHX_ callbacks[i]);
733             }
734 15           Safefree(callbacks);
735              
736 1620 50         } else if (promise->state == XSPR_STATE_FINISHED) {
737 1620           xspr_result_decref(aTHX_ promise->finished.result);
738              
739             } else {
740 0           ASSUME(0);
741             }
742              
743 1635 50         if (promise->on_ready_immediate != NULL) {
744 0           _XSPR_FREE_ON_READY_IMMEDIATE(promise);
745             }
746              
747 1635           Safefree(promise);
748             }
749 4024           }
750              
751             /* Creates a new promise. It's that simple. */
752 1635           xspr_promise_t* xspr_promise_new(pTHX)
753             {
754             xspr_promise_t* promise;
755 1635           Newxz(promise, 1, xspr_promise_t);
756              
757 1635           *promise = (xspr_promise_t) {
758             .refs = 1,
759             .state = XSPR_STATE_PENDING,
760             };
761              
762 1635           return promise;
763             }
764              
765 828           xspr_callback_t* xspr_callback_new_perl(pTHX_ SV* on_resolve, SV* on_reject, xspr_promise_t* next)
766             {
767             xspr_callback_t* callback;
768 828           Newxz(callback, 1, xspr_callback_t);
769 828           callback->type = XSPR_CALLBACK_PERL;
770 828 100         if (SvOK(on_resolve))
    50          
    50          
771 807           callback->perl.on_resolve = newSVsv(on_resolve);
772 828 100         if (SvOK(on_reject))
    50          
    50          
773 32           callback->perl.on_reject = newSVsv(on_reject);
774 828           callback->perl.next = next;
775 828 100         if (next)
776 796           xspr_promise_incref(aTHX_ callback->perl.next);
777 828           return callback;
778             }
779              
780 14           xspr_callback_t* xspr_callback_new_finally(pTHX_ SV* on_finally, xspr_promise_t* next)
781             {
782             xspr_callback_t* callback;
783 14           Newxz(callback, 1, xspr_callback_t);
784 14           callback->type = XSPR_CALLBACK_FINALLY;
785 14 50         if (SvOK(on_finally))
    0          
    0          
786 14           callback->finally.on_finally = newSVsv(on_finally);
787 14           callback->finally.next = next;
788 14 100         if (next)
789 13           xspr_promise_incref(aTHX_ callback->finally.next);
790 14           return callback;
791             }
792              
793 770           xspr_callback_t* xspr_callback_new_chain(pTHX_ xspr_promise_t* chain)
794             {
795             xspr_callback_t* callback;
796 770           Newxz(callback, 1, xspr_callback_t);
797 770           callback->type = XSPR_CALLBACK_CHAIN;
798 770           callback->chain = chain;
799 770           xspr_promise_incref(aTHX_ chain);
800 770           return callback;
801             }
802              
803 2           xspr_callback_t* xspr_callback_new_finally_chain(pTHX_ xspr_result_t* original_result, xspr_promise_t* next_promise)
804             {
805             xspr_callback_t* callback;
806 2           Newxz(callback, 1, xspr_callback_t);
807 2           callback->type = XSPR_CALLBACK_FINALLY_CHAIN;
808              
809             /*
810             callback->finally_chain.original_result = original_result;
811             xspr_result_incref(aTHX_ original_result);
812             */
813 2           callback->finally_chain.original_result = pxs_result_clone(aTHX_ original_result);
814              
815 2           callback->finally_chain.chain_promise = next_promise;
816 2           xspr_promise_incref(aTHX_ next_promise);
817              
818 2           return callback;
819             }
820              
821             /* Adds a then to the promise. Takes ownership of the callback */
822 1614           void xspr_promise_then(pTHX_ xspr_promise_t* promise, xspr_callback_t* callback)
823             {
824             dMY_CXT;
825              
826 1614 100         if (promise->state == XSPR_STATE_PENDING) {
827 18           promise->pending.callbacks_count++;
828 18 50         Renew(promise->pending.callbacks, promise->pending.callbacks_count, xspr_callback_t*);
829 18           promise->pending.callbacks[promise->pending.callbacks_count-1] = callback;
830              
831 1596 50         } else if (promise->state == XSPR_STATE_FINISHED) {
832              
833 1596 50         if (MY_CXT.deferral_cr) {
834 0           xspr_queue_add(aTHX_ callback, promise);
835 0           xspr_queue_maybe_schedule(aTHX);
836             }
837             else {
838 1596           xspr_immediate_process(aTHX_ callback, promise);
839             }
840             } else {
841 0           ASSUME(0);
842             }
843 1614           }
844              
845             /* Returns a promise if the given SV is a thenable. Ownership handed to the caller! */
846 790           xspr_promise_t* xspr_promise_from_sv(pTHX_ SV* input)
847             {
848 790 50         if (input == NULL || !sv_isobject(input)) {
    100          
849 16           return NULL;
850             }
851              
852             /* If we got one of our own promises: great, not much to do here! */
853 774 100         if (sv_derived_from(input, PROMISE_CLASS)) {
854 773 50         IV tmp = SvIV((SV*)SvRV(input));
855 773           PROMISE_CLASS_TYPE* promise = INT2PTR(PROMISE_CLASS_TYPE*, tmp);
856 773           xspr_promise_incref(aTHX_ promise->promise);
857 773           return promise->promise;
858             }
859              
860             /* Maybe we got another type of promise. Let's convert it */
861 1           GV* method_gv = gv_fetchmethod_autoload(SvSTASH(SvRV(input)), "then", FALSE);
862 1 50         if (method_gv != NULL && isGV(method_gv) && GvCV(method_gv) != NULL) {
    50          
    50          
863              
864 1           CV* converter_cv = get_cv(BASE_CLASS "::" CONVERTER_CR_NAME, 0);
865 1 50         if (!converter_cv) croak("Need " CONVERTER_CR_NAME "!");
866              
867 1           SV* converter_svcv = newRV_inc((SV*) converter_cv);
868 1           sv_2mortal(converter_svcv);
869              
870 1           xspr_result_t* new_result = xspr_invoke_perl(aTHX_ converter_svcv, &input, 1);
871 1 50         if (new_result->state == XSPR_RESULT_RESOLVED &&
    50          
872 1 50         new_result->results != NULL &&
873 1 50         new_result->count == 1 &&
874 1 50         SvROK(new_result->results[0]) &&
875 1           sv_derived_from(new_result->results[0], PROMISE_CLASS)) {
876             /* This is expected: our conversion function returned us one of our own promises */
877 1 50         IV tmp = SvIV((SV*)SvRV(new_result->results[0]));
878 1           PROMISE_CLASS_TYPE* new_promise = INT2PTR(PROMISE_CLASS_TYPE*, tmp);
879              
880 1           xspr_promise_t* promise = new_promise->promise;
881 1           xspr_promise_incref(aTHX_ promise);
882              
883 1           xspr_result_decref(aTHX_ new_result);
884 1           return promise;
885              
886             } else {
887 0           xspr_promise_t* promise = xspr_promise_new(aTHX);
888 0           xspr_promise_finish(aTHX_ promise, new_result);
889 0           xspr_result_decref(aTHX_ new_result);
890 0           return promise;
891             }
892             }
893              
894             /* We didn't get a promise. */
895 0           return NULL;
896             }
897              
898 109           DEFERRED_CLASS_TYPE* _get_deferred_from_sv(pTHX_ SV *self_sv) {
899 109           SV *referent = SvRV(self_sv);
900 109 50         return INT2PTR(DEFERRED_CLASS_TYPE*, SvUV(referent));
901             }
902              
903 2471           PROMISE_CLASS_TYPE* _get_promise_from_sv(pTHX_ SV *self_sv) {
904 2471           SV *referent = SvRV(self_sv);
905 2471 50         return INT2PTR(PROMISE_CLASS_TYPE*, SvUV(referent));
906             }
907              
908 860           SV* _ptr_to_svrv(pTHX_ void* ptr, HV* stash) {
909 860           SV* referent = newSVuv( PTR2UV(ptr) );
910 860           SV* retval = newRV_noinc(referent);
911 860           sv_bless(retval, stash);
912              
913 860           return retval;
914             }
915              
916 1635           static inline xspr_promise_t* create_promise(pTHX) {
917             dMY_CXT;
918              
919 1635           xspr_promise_t* promise = xspr_promise_new(aTHX);
920              
921 1635           SV *detect_leak_perl = NULL;
922              
923 1635           SV** dml_svgv = hv_fetchs( MY_CXT.pxs_base_stash, "DETECT_MEMORY_LEAKS", 0 );
924              
925 1635 50         if (dml_svgv) {
926 1635           detect_leak_perl = GvSV(*dml_svgv);
927             }
928              
929 1635 50         promise->detect_leak_pid = detect_leak_perl && SvTRUE(detect_leak_perl) ? getpid() : 0;
    50          
    50          
    0          
    100          
    50          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    0          
    0          
    50          
    0          
930              
931 1635           return promise;
932             }
933              
934             /* Many promises are just thrown away after the final callback, no need to allocate a next promise for those */
935 842           static inline xspr_promise_t* create_next_promise_if_needed(pTHX_ SV* original, SV** stack_ptr) {
936 842 100         if (GIMME_V != G_VOID) {
    100          
937             PROMISE_CLASS_TYPE* next_promise;
938 809           Newxz(next_promise, 1, PROMISE_CLASS_TYPE);
939              
940 809           xspr_promise_t* next = create_promise(aTHX);
941 809           next_promise->promise = next;
942              
943 809           *stack_ptr = sv_newmortal();
944              
945             // This would be simpler, but let’s facilitate subclassing.
946             // sv_setref_pv(*stack_ptr, PROMISE_CLASS, (void*)next_promise);
947              
948 809           sv_setref_pv(*stack_ptr, NULL, (void*)next_promise);
949 809           sv_bless(*stack_ptr, SvSTASH(SvRV(original)));
950              
951 809           return next;
952             }
953              
954 33           return NULL;
955             }
956              
957 1669           static inline void _warn_on_destroy_if_needed(pTHX_ xspr_promise_t* promise, SV* self_sv) {
958 1669 100         if (promise->detect_leak_pid && PXS_IS_GLOBAL_DESTRUCTION && promise->detect_leak_pid == getpid()) {
    100          
    100          
959 2 50         warn( "======================================================================\nXXXXXX - %s survived until global destruction; memory leak likely!\n======================================================================\n", SvPV_nolen(self_sv) );
960             }
961 1669           }
962              
963 6           static inline void _warn_weird_reject_if_needed( pTHX_ SV* self_sv, const char* funcname, I32 my_items ) {
964              
965 6           char *pkgname = NULL;
966              
967 6 100         HV *stash = (self_sv == NULL) ? NULL : SvSTASH( SvRV(self_sv) );
968              
969 6 100         if (stash != NULL) {
970 3 50         pkgname = HvNAME(stash);
    50          
    50          
    0          
    50          
    50          
971             }
972              
973 6 100         if (pkgname == NULL) pkgname = DEFERRED_CLASS;
974              
975 6 100         if (my_items == 0) {
976 2           warn( "%s: Empty call to %s()", pkgname, funcname );
977             }
978             else {
979 4           warn( "%s: %s() called with only uninitialized values (%" IVdf ")", pkgname, funcname, (IV) my_items);
980             }
981 6           }
982              
983 795           static inline void _resolve_promise(pTHX_ xspr_promise_t* promise_p, SV** args, I32 argslen) {
984 795           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, argslen);
985              
986             unsigned i;
987 1594 100         for (i = 0; i < argslen; i++) {
988 799           result->results[i] = newSVsv(args[i]);
989             }
990              
991 795           xspr_promise_finish(aTHX_ promise_p, result);
992 795           xspr_result_decref(aTHX_ result);
993 795           }
994              
995 22           static inline void _reject_promise(pTHX_ SV* self_sv, xspr_promise_t* promise_p, SV** args, I32 argslen) {
996 22           xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, argslen);
997              
998 22           bool has_defined = false;
999              
1000             unsigned i;
1001 45 100         for (i = 0; i < argslen; i++) {
1002 23           result->results[i] = newSVsv(args[i]);
1003              
1004 23 100         if (!has_defined && SvOK(result->results[i])) {
    100          
    50          
    50          
1005 16           has_defined = true;
1006             }
1007             }
1008              
1009 22 100         if (!has_defined) {
1010 6 100         const char* funcname = (self_sv == NULL) ? "rejected" : "reject";
1011              
1012 6           _warn_weird_reject_if_needed( aTHX_ self_sv, funcname, argslen );
1013             }
1014              
1015 22           xspr_promise_finish(aTHX_ promise_p, result);
1016 22           xspr_result_decref(aTHX_ result);
1017 22           }
1018              
1019 820           SV* _promise_to_sv(pTHX_ xspr_promise_t* promise_p) {
1020             dMY_CXT;
1021              
1022             PROMISE_CLASS_TYPE* promise_ptr;
1023 820           Newxz(promise_ptr, 1, PROMISE_CLASS_TYPE);
1024 820           promise_ptr->promise = promise_p;
1025              
1026 820           return _ptr_to_svrv(aTHX_ promise_ptr, MY_CXT.pxs_promise_stash);
1027             }
1028              
1029             /* When Future::AsyncAwait creates a promise/future it does NOT
1030             hold a strong reference to that object. Consequently, we have to
1031             ensure that the object lasts until we’re done with it. So introduce
1032             a (temporary!) circular reference. */
1033             #define _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p) \
1034             do { \
1035             /* fprintf(stderr, "making immortal: sv=%p p=%p\n", promise_sv, promise_p); */ \
1036             promise_p->self_sv_ref = promise_sv; \
1037             SvREFCNT_inc(promise_sv); \
1038             SvREFCNT_inc(SvRV(promise_sv)); \
1039             } while (0)
1040              
1041 774           static inline SV* _create_preresolved_promise(pTHX_ SV** args, I32 argslen, bool immortalize) {
1042 774           xspr_promise_t* promise_p = create_promise(aTHX);
1043              
1044 774           _resolve_promise(aTHX_ promise_p, args, argslen);
1045              
1046 774           SV* promise_sv = _promise_to_sv(aTHX_ promise_p);
1047              
1048 774 50         if (immortalize) _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p);
1049              
1050 774           return promise_sv;
1051             }
1052              
1053 10           static inline SV* _create_prerejected_promise(pTHX_ SV** args, I32 argslen, bool immortalize) {
1054 10           xspr_promise_t* promise_p = create_promise(aTHX);
1055              
1056 10           _reject_promise(aTHX_ NULL, promise_p, args, argslen);
1057              
1058 10           SV* promise_sv = _promise_to_sv(aTHX_ promise_p);
1059              
1060 10 50         if (immortalize) _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p);
1061              
1062 10           return promise_sv;
1063             }
1064              
1065             //----------------------------------------------------------------------
1066 0           static SV* _get_nothing_cr_arg (pTHX) {
1067 0           return SvREFCNT_inc( get_sv("Promise::XS::Deferred::_NOTHING_CR", 0) );
1068             }
1069              
1070 0           static void _anyevent_wait_promise (pTHX_ SV* promise_sv) {
1071 0           SV* condvar = exs_call_method_scalar(
1072             sv_2mortal( newSVpvs("AnyEvent") ),
1073             "condvar",
1074             NULL
1075             );
1076              
1077 0           SV* catch_args[] = {
1078 0           _get_nothing_cr_arg(aTHX),
1079             NULL,
1080             };
1081              
1082 0           SV* caught = exs_call_method_scalar(
1083             promise_sv,
1084             "catch",
1085             catch_args
1086             );
1087              
1088 0           SV* finally_args[] = {
1089 0           SvREFCNT_inc(condvar),
1090             NULL,
1091             };
1092              
1093 0 0         exs_call_method_void(
1094             caught,
1095             "finally",
1096             finally_args
1097             );
1098              
1099 0           sv_2mortal(caught);
1100              
1101 0 0         exs_call_method_void(
1102             condvar,
1103             "recv",
1104             NULL
1105             );
1106              
1107 0           sv_2mortal(condvar);
1108 0           }
1109              
1110 0           static void _ioasync_wait_promise (pTHX_ SV* promise_sv, SV* loop_sv, SV* stop_cr) {
1111 0           SV* catch_args[] = {
1112 0           _get_nothing_cr_arg(aTHX),
1113             NULL,
1114             };
1115              
1116 0           SV* caught = exs_call_method_scalar(
1117             promise_sv,
1118             "catch",
1119             catch_args
1120             );
1121              
1122 0           SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };
1123              
1124 0 0         exs_call_method_void(
1125             caught,
1126             "finally",
1127             finally_args
1128             );
1129              
1130 0           sv_2mortal(caught);
1131              
1132 0 0         exs_call_method_void(
1133             loop_sv,
1134             "run",
1135             NULL
1136             );
1137 0           }
1138              
1139 0           static void _mojo_wait_promise(pTHX_ SV* promise_sv, SV* stop_cr) {
1140 0           SV* catch_args[] = {
1141 0           _get_nothing_cr_arg(aTHX),
1142             NULL,
1143             };
1144              
1145 0           SV* caught = exs_call_method_scalar(
1146             promise_sv,
1147             "catch",
1148             catch_args
1149             );
1150              
1151 0           SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };
1152              
1153 0 0         exs_call_method_void(
1154             caught,
1155             "finally",
1156             finally_args
1157             );
1158              
1159 0           sv_2mortal(caught);
1160              
1161 0 0         exs_call_method_void(
1162             sv_2mortal( newSVpvs("Mojo::IOLoop") ),
1163             "start",
1164             NULL
1165             );
1166 0           }
1167              
1168             //----------------------------------------------------------------------
1169              
1170             MODULE = Promise::XS PACKAGE = Promise::XS
1171              
1172             BOOT:
1173             {
1174             MY_CXT_INIT;
1175             #ifdef USE_ITHREADS
1176             MY_CXT.owner = aTHX;
1177             #endif
1178 28           MY_CXT.queue_head = NULL;
1179 28           MY_CXT.queue_tail = NULL;
1180 28           MY_CXT.in_flush = 0;
1181 28           MY_CXT.backend_scheduled = 0;
1182 28           MY_CXT.callback_depth = 0;
1183              
1184 28           MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
1185 28           MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
1186 28           MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
1187              
1188 28           MY_CXT.deferral_cr = NULL;
1189 28           MY_CXT.deferral_arg = NULL;
1190 28           MY_CXT.event_system = _DEFER_NONE;
1191 28           MY_CXT.stop_cr = NULL;
1192 28           MY_CXT.pxs_flush_cr = NULL;
1193             }
1194              
1195             # In some old thread-multi perls sv_dup_inc() wasn’t defined.
1196              
1197             #if defined(USE_ITHREADS) && defined(sv_dup_inc)
1198              
1199             # ithreads would seem to be a very bad idea in Promise-based code,
1200             # but anyway ..
1201              
1202             void
1203             CLONE(...)
1204             PPCODE:
1205              
1206             SV* pxs_flush_cr = NULL;
1207             SV* deferral_cr = NULL;
1208             event_system_t event_system;
1209             SV* deferral_arg = NULL;
1210             SV* stop_cr = NULL;
1211              
1212             {
1213             dMY_CXT;
1214              
1215             CLONE_PARAMS params = {NULL, 0, MY_CXT.owner};
1216              
1217             if ( MY_CXT.pxs_flush_cr ) {
1218             pxs_flush_cr = sv_dup_inc( MY_CXT.pxs_flush_cr, ¶ms );
1219             }
1220              
1221             if ( MY_CXT.deferral_cr ) {
1222             deferral_cr = sv_dup_inc( MY_CXT.deferral_cr, ¶ms );
1223             }
1224              
1225             if ( MY_CXT.deferral_arg ) {
1226             deferral_arg = sv_dup_inc( MY_CXT.deferral_arg, ¶ms );
1227             }
1228              
1229             event_system = MY_CXT.event_system;
1230              
1231             if ( MY_CXT.stop_cr ) {
1232             stop_cr = sv_dup_inc( MY_CXT.stop_cr, ¶ms );
1233             }
1234             }
1235              
1236             {
1237             MY_CXT_CLONE;
1238             MY_CXT.owner = aTHX;
1239              
1240             // Clone SVs
1241             MY_CXT.pxs_flush_cr = pxs_flush_cr;
1242             MY_CXT.deferral_cr = deferral_cr;
1243             MY_CXT.deferral_arg = deferral_arg;
1244             MY_CXT.event_system = event_system;
1245             MY_CXT.stop_cr = stop_cr;
1246              
1247             // Clone HVs
1248             MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
1249             MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
1250             MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
1251             }
1252              
1253             XSRETURN_UNDEF;
1254              
1255             #endif /* USE_ITHREADS && defined(sv_dup_inc) */
1256              
1257             SV *
1258             resolved(...)
1259             CODE:
1260 774           RETVAL = _create_preresolved_promise(aTHX_ &(ST(0)), items, false);
1261             OUTPUT:
1262             RETVAL
1263              
1264             SV *
1265             rejected(...)
1266             CODE:
1267 10           RETVAL = _create_prerejected_promise(aTHX_ &(ST(0)), items, false);
1268             OUTPUT:
1269             RETVAL
1270              
1271             #----------------------------------------------------------------------
1272              
1273             MODULE = Promise::XS PACKAGE = Promise::XS::Deferred
1274              
1275             PROTOTYPES: DISABLE
1276              
1277             BOOT:
1278 28           newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_ANYEVENT", newSVuv(_DEFER_ANYEVENT));
1279 28           newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_IOASYNC", newSVuv(_DEFER_IOASYNC));
1280 28           newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_MOJO", newSVuv(_DEFER_MOJO));
1281              
1282             SV *
1283             create()
1284             CODE:
1285             dMY_CXT;
1286              
1287             DEFERRED_CLASS_TYPE* deferred_ptr;
1288 40           Newxz(deferred_ptr, 1, DEFERRED_CLASS_TYPE);
1289              
1290 40           xspr_promise_t* promise = create_promise(aTHX);
1291              
1292 40           deferred_ptr->promise = promise;
1293              
1294 40           RETVAL = _ptr_to_svrv(aTHX_ deferred_ptr, MY_CXT.pxs_deferred_stash);
1295             OUTPUT:
1296             RETVAL
1297              
1298             void
1299             ___set_deferral_generic(SV* deferral_cr, SV* deferral_arg, UV event_system, SV* stop_cr=NULL)
1300             CODE:
1301             dMY_CXT;
1302              
1303             // deferral_cr = SvRV(deferral_cr);
1304              
1305 0 0         if (MY_CXT.deferral_cr) {
1306 0           SvREFCNT_dec(MY_CXT.deferral_cr);
1307             }
1308              
1309 0 0         if (MY_CXT.deferral_arg) {
1310 0           SvREFCNT_dec(MY_CXT.deferral_arg);
1311             }
1312              
1313 0 0         if (MY_CXT.stop_cr) {
1314 0           SvREFCNT_dec(MY_CXT.stop_cr);
1315             }
1316              
1317 0           MY_CXT.deferral_cr = SvREFCNT_inc(deferral_cr);
1318              
1319 0 0         MY_CXT.deferral_arg = SvOK(deferral_arg) ? SvREFCNT_inc(deferral_arg) : NULL;
    0          
    0          
1320              
1321 0           MY_CXT.event_system = event_system;
1322              
1323 0 0         MY_CXT.stop_cr = stop_cr ? SvREFCNT_inc(stop_cr) : NULL;
1324              
1325             # We don’t care if there are args or not.
1326             void
1327             ___flush(...)
1328             CODE:
1329             UNUSED(items);
1330 0           xspr_queue_flush(aTHX);
1331              
1332             SV*
1333             promise(SV* self_sv)
1334             CODE:
1335 34           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1336              
1337 34           xspr_promise_incref(aTHX_ self->promise);
1338              
1339 34           RETVAL = _promise_to_sv(aTHX_ self->promise);
1340             OUTPUT:
1341             RETVAL
1342              
1343             SV*
1344             resolve(SV *self_sv, ...)
1345             CODE:
1346 21           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1347              
1348 21 50         if (self->promise->state != XSPR_STATE_PENDING) {
1349 0           croak("Cannot resolve deferred: not pending");
1350             }
1351              
1352 21           _resolve_promise(aTHX_ self->promise, &(ST(1)), items - 1);
1353              
1354 21 50         if (GIMME_V == G_VOID) {
    100          
1355 19           RETVAL = NULL;
1356             }
1357             else {
1358 2           SvREFCNT_inc(self_sv);
1359 2           RETVAL = self_sv;
1360             }
1361             OUTPUT:
1362             RETVAL
1363              
1364             SV*
1365             reject(SV *self_sv, ...)
1366             CODE:
1367 12           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1368              
1369 12 50         if (self->promise->state != XSPR_STATE_PENDING) {
1370 0           croak("Cannot reject deferred: not pending");
1371             }
1372              
1373 12           _reject_promise(aTHX_ self_sv, self->promise, &(ST(1)), items - 1);
1374              
1375 12 100         if (GIMME_V == G_VOID) {
    50          
1376 12           RETVAL = NULL;
1377             }
1378             else {
1379 0           SvREFCNT_inc(self_sv);
1380 0           RETVAL = self_sv;
1381             }
1382             OUTPUT:
1383             RETVAL
1384              
1385             bool
1386             is_pending(SV *self_sv)
1387             CODE:
1388 0           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1389              
1390 0           RETVAL = (self->promise->state == XSPR_STATE_PENDING);
1391             OUTPUT:
1392             RETVAL
1393              
1394             SV*
1395             clear_unhandled_rejection(SV *self_sv)
1396             CODE:
1397 2           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1398              
1399 2 50         if (self->promise->state == XSPR_STATE_FINISHED) {
1400 2           self->promise->finished.result->rejection_should_warn = false;
1401             }
1402              
1403 2 50         if (GIMME_V == G_VOID) {
    50          
1404 2           RETVAL = NULL;
1405             }
1406             else {
1407 0           SvREFCNT_inc(self_sv);
1408 0           RETVAL = self_sv;
1409             }
1410             OUTPUT:
1411             RETVAL
1412              
1413             void
1414             DESTROY(SV *self_sv)
1415             CODE:
1416 40           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1417              
1418 40           _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);
1419              
1420 40           xspr_promise_decref(aTHX_ self->promise);
1421 40           Safefree(self);
1422              
1423             # ----------------------------------------------------------------------
1424              
1425             MODULE = Promise::XS PACKAGE = Promise::XS::Promise
1426              
1427             PROTOTYPES: DISABLE
1428              
1429             void
1430             then(SV* self_sv, SV* on_resolve = NULL, SV* on_reject = NULL)
1431             PPCODE:
1432 810 100         _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
1433              
1434 807           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1435              
1436             xspr_promise_t* next;
1437              
1438 807 50         if (on_resolve == NULL) on_resolve = &PL_sv_undef;
1439 807 100         if (on_reject == NULL) on_reject = &PL_sv_undef;
1440              
1441 807           next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
1442              
1443 807           xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ on_resolve, on_reject, next);
1444 807           xspr_promise_then(aTHX_ self->promise, callback);
1445              
1446 807           XSRETURN(next ? 1 : 0);
1447              
1448             void
1449             catch(SV* self_sv, SV* on_reject)
1450             PPCODE:
1451 21 50         _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
1452              
1453 21           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1454              
1455 21           xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
1456              
1457 21           xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ &PL_sv_undef, on_reject, next);
1458 21           xspr_promise_then(aTHX_ self->promise, callback);
1459              
1460 21           XSRETURN(next ? 1 : 0);
1461              
1462             void
1463             finally(SV* self_sv, SV* on_finally)
1464             PPCODE:
1465 14 50         _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
1466              
1467 14           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1468              
1469 14           xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
1470              
1471 14           xspr_callback_t* callback = xspr_callback_new_finally(aTHX_ on_finally, next);
1472 14           xspr_promise_then(aTHX_ self->promise, callback);
1473              
1474 14           XSRETURN(next ? 1 : 0);
1475              
1476             void
1477             DESTROY(SV* self_sv)
1478             CODE:
1479 1629           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1480             /* fprintf(stderr, "DESTROYing sv=%p, p=%p\n", self_sv, self->promise); */
1481              
1482 1629           _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);
1483              
1484 1629           xspr_promise_decref(aTHX_ self->promise);
1485 1629           Safefree(self);
1486              
1487             # ----------------------------------------------------------------------
1488             # Future::AsyncAwait interface:
1489             # ----------------------------------------------------------------------
1490              
1491             SV*
1492             AWAIT_NEW_DONE(...)
1493             CODE:
1494             _DO_DEBUG_AWAITABLE();
1495             UNUSED(items);
1496 0           RETVAL = _create_preresolved_promise(aTHX_ &(ST(1)), items - 1, true);
1497             OUTPUT:
1498             RETVAL
1499              
1500             SV*
1501             AWAIT_NEW_FAIL(...)
1502             CODE:
1503             _DO_DEBUG_AWAITABLE();
1504             UNUSED(items);
1505 0           RETVAL = _create_prerejected_promise(aTHX_ &(ST(1)), items - 1, true);
1506             OUTPUT:
1507             RETVAL
1508              
1509             SV*
1510             AWAIT_CLONE(...)
1511             CODE:
1512             _DO_DEBUG_AWAITABLE();
1513             UNUSED(items);
1514              
1515 2           xspr_promise_t* promise_p = create_promise(aTHX);
1516              
1517 2           RETVAL = _promise_to_sv(aTHX_ promise_p);
1518              
1519 2           _IMMORTALIZE_PROMISE_SV(RETVAL, promise_p);
1520              
1521             if (DEBUG_AWAITABLE) {
1522             fprintf(stderr, "# SvREFCNT(RETVAL)=%d\n", SvREFCNT(RETVAL));
1523             fprintf(stderr, "# SvREFCNT(SvRV(RETVAL))=%d\n", SvREFCNT(SvRV(RETVAL)));
1524             sv_dump(RETVAL);
1525             }
1526             OUTPUT:
1527             RETVAL
1528              
1529             void
1530             AWAIT_DONE(SV* self_sv, ...)
1531             CODE:
1532             _DO_DEBUG_AWAITABLE();
1533 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1534 0           _resolve_promise(aTHX_ self->promise, &ST(1), items - 1);
1535              
1536             void
1537             AWAIT_FAIL(SV* self_sv, ...)
1538             CODE:
1539             _DO_DEBUG_AWAITABLE();
1540 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1541 0           _reject_promise(aTHX_ NULL, self->promise, &ST(1), items - 1);
1542              
1543             bool
1544             AWAIT_IS_READY(SV *self_sv)
1545             CODE:
1546             _DO_DEBUG_AWAITABLE();
1547 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1548              
1549 0           RETVAL = (self->promise->state != XSPR_STATE_PENDING);
1550             OUTPUT:
1551             RETVAL
1552              
1553             void
1554             AWAIT_GET(SV *self_sv)
1555             PPCODE:
1556             _DO_DEBUG_AWAITABLE();
1557 0           DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);
1558              
1559 0 0         ASSUME(self->promise->state == XSPR_STATE_FINISHED);
1560              
1561 0           SV** results = self->promise->finished.result->results;
1562 0           int result_count = self->promise->finished.result->count;
1563              
1564 0 0         if (RESULT_IS_RESOLVED(self->promise->finished.result)) {
1565             int i;
1566              
1567 0 0         if (!result_count) XSRETURN_EMPTY;
1568              
1569 0 0         switch (GIMME_V) {
1570              
1571             case G_ARRAY:
1572 0 0         EXTEND(SP, result_count);
    0          
1573              
1574 0 0         for (i=0; i
1575 0           PUSHs( sv_2mortal( newSVsv(results[i]) ) );
1576             }
1577              
1578 0           XSRETURN(result_count);
1579              
1580             case G_SCALAR:
1581 0 0         EXTEND(SP, 1);
1582 0           PUSHs( sv_2mortal( newSVsv(results[0]) ) );
1583 0           XSRETURN(1);
1584              
1585             case G_VOID:
1586 0           XSRETURN_EMPTY;
1587              
1588             default:
1589 0           ASSUME(0);
1590             }
1591             }
1592             else {
1593             SV* err;
1594 0 0         if (result_count) {
1595 0           err = sv_2mortal( newSVsv( results[0] ) );
1596             }
1597             else {
1598 0           err = &PL_sv_undef;
1599             }
1600              
1601 0           croak_sv(err);
1602             }
1603              
1604             void
1605             AWAIT_CHAIN_CANCEL(...)
1606             CODE:
1607             _DO_DEBUG_AWAITABLE();
1608             UNUSED(items);
1609              
1610             void
1611             AWAIT_ON_CANCEL(...)
1612             CODE:
1613             _DO_DEBUG_AWAITABLE();
1614             UNUSED(items);
1615              
1616             UV
1617             AWAIT_IS_CANCELLED(...)
1618             CODE:
1619             _DO_DEBUG_AWAITABLE();
1620             UNUSED(items);
1621 0           RETVAL = 0;
1622             OUTPUT:
1623             RETVAL
1624              
1625             void
1626             AWAIT_ON_READY(SV *self_sv, SV* coderef)
1627             CODE:
1628             _DO_DEBUG_AWAITABLE();
1629 0           PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
1630              
1631 0           self->promise->on_ready_immediate = coderef;
1632 0           SvREFCNT_inc(coderef);
1633 0           SvREFCNT_inc(SvRV(coderef));
1634              
1635             void
1636             AWAIT_WAIT(SV* self_sv)
1637             PPCODE:
1638             _DO_DEBUG_AWAITABLE();
1639             dMY_CXT;
1640              
1641 0           switch (MY_CXT.event_system) {
1642             case _DEFER_ANYEVENT:
1643 0           _anyevent_wait_promise(aTHX_ self_sv);
1644 0           break;
1645              
1646             case _DEFER_IOASYNC:
1647 0           _ioasync_wait_promise(aTHX_ self_sv, MY_CXT.deferral_arg, MY_CXT.stop_cr);
1648 0           break;
1649              
1650             case _DEFER_MOJO:
1651 0           _mojo_wait_promise(aTHX_ self_sv, MY_CXT.stop_cr);
1652 0           break;
1653              
1654             default:
1655 0           croak(BASE_CLASS ": No event loop set up! Did you forget to call use_event()?");
1656             }
1657              
1658 0 0         PUSHMARK(SP);
1659              
1660 0 0         int count = call_method("AWAIT_GET", GIMME_V);
1661 0           XSRETURN(count);