File Coverage

Multicore.xs
Criterion Covered Total %
statement 104 127 81.8
branch 33 60 55.0
condition n/a
subroutine n/a
pod n/a
total 137 187 73.2


line stmt bran cond sub pod time code
1             /* most win32 perls are beyond fixing, requiring dTHX */
2             /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3             /* and fail to define numerous symbols, but still overrwide them */
4             /* with non-working versions (e.g. setjmp). */
5             #ifdef _WIN32
6             /*# define PERL_CORE 1 fixes some, breaks others */
7             #else
8             # define PERL_NO_GET_CONTEXT
9             #endif
10              
11             #include "EXTERN.h"
12             #include "perl.h"
13             #include "XSUB.h"
14              
15             #define X_STACKSIZE 1024 * sizeof (void *)
16              
17             #include "CoroAPI.h"
18             #include "perlmulticore.h"
19             #include "schmorp.h"
20             #include "xthread.h"
21              
22             #ifdef _WIN32
23             #ifndef sigset_t
24             #define sigset_t int
25             #endif
26             #endif
27              
28             #ifndef SvREFCNT_dec_NN
29             #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30             #endif
31              
32             #ifndef SvREFCNT_dec_simple_void_NN
33             #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34             #endif
35              
36             #ifndef SvREFCNT_inc_NN
37             #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38             #endif
39              
40             #ifndef RECURSION_CHECK
41             #define RECURSION_CHECK 0
42             #endif
43              
44             static X_TLS_DECLARE(current_key);
45             #if RECURSION_CHECK
46             static X_TLS_DECLARE(check_key);
47             #endif
48              
49             static void
50 0           fatal (const char *msg)
51             {
52 0           write (2, msg, strlen (msg));
53 0           abort ();
54             }
55              
56             static s_epipe ep;
57             static void *perl_thx;
58             static sigset_t cursigset, fullsigset;
59              
60             static int global_enable = 0;
61             static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
62              
63             /* assigned to a thread for each release/acquire */
64             struct tctx
65             {
66             void *coro;
67             int wait_f;
68             xcond_t acquire_c;
69             int jeret;
70             };
71              
72             static struct tctx *tctx_free;
73              
74             static struct tctx *
75 2           tctx_get (void)
76             {
77             struct tctx *ctx;
78              
79 2 50         if (!tctx_free)
80             {
81 2           ctx = malloc (sizeof (*tctx_free));
82 2           X_COND_CREATE (ctx->acquire_c);
83             }
84             else
85             {
86 0           ctx = tctx_free;
87 0           tctx_free = tctx_free->coro;
88             }
89              
90 2           return ctx;
91             }
92              
93             static void
94 2           tctx_put (struct tctx *ctx)
95             {
96 2           ctx->coro = tctx_free;
97 2           tctx_free = ctx;
98 2           }
99              
100             /* a stack of tctxs */
101             struct tctxs
102             {
103             struct tctx **ctxs;
104             int cur, max;
105             };
106              
107             static struct tctx *
108 4           tctxs_get (struct tctxs *ctxs)
109             {
110 4           return ctxs->ctxs[--ctxs->cur];
111             }
112              
113             static void
114 4           tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
115             {
116 4 100         if (ctxs->cur >= ctxs->max)
117             {
118 2 50         ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
119 2           ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
120             }
121              
122 4           ctxs->ctxs[ctxs->cur++] = ctx;
123 4           }
124              
125             static xmutex_t release_m = X_MUTEX_INIT;
126             static xcond_t release_c = X_COND_INIT;
127             static struct tctxs releasers;
128             static int idle;
129             static int min_idle = 1;
130             static int curthreads, max_threads = 1; /* protected by release_m */
131              
132             static xmutex_t acquire_m = X_MUTEX_INIT;
133             static struct tctxs acquirers;
134              
135 2           X_THREAD_PROC(thread_proc)
136             {
137 2           PERL_SET_CONTEXT (perl_thx);
138              
139             {
140             dTHXa (perl_thx);
141             dJMPENV;
142             struct tctx *ctx;
143             int catchret;
144              
145 2           X_LOCK (release_m);
146              
147             for (;;)
148             {
149 4 100         while (!releasers.cur)
150             if (idle <= min_idle || 1)
151 2           X_COND_WAIT (release_c, release_m);
152             else
153             {
154             struct timespec ts = { time (0) + idle - min_idle, 0 };
155              
156             if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157             if (idle > min_idle && !releasers.cur)
158             break;
159             }
160              
161 2           ctx = tctxs_get (&releasers);
162 2           --idle;
163 2           X_UNLOCK (release_m);
164              
165 2 50         if (!ctx) /* timed out? */
166 0           break;
167              
168 2           pthread_sigmask (SIG_SETMASK, &cursigset, 0);
169 2           JMPENV_PUSH (ctx->jeret);
170              
171 2 50         if (!ctx->jeret)
172 4 100         while (ctx->coro)
173 2           CORO_SCHEDULE;
174              
175 2           JMPENV_POP;
176 2           pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177              
178 2           X_LOCK (acquire_m);
179 2           ctx->wait_f = 1;
180 2           X_COND_SIGNAL (ctx->acquire_c);
181 2           X_UNLOCK (acquire_m);
182              
183 2           X_LOCK (release_m);
184 2           ++idle;
185 2           }
186             }
187 0           }
188              
189             static void
190 2           start_thread (void)
191             {
192             xthread_t tid;
193              
194 2 100         if (!curthreads)
195             {
196 1           X_UNLOCK (release_m);
197             {
198             dTHX;
199 1           dSP;
200              
201 1 50         PUSHSTACKi (PERLSI_REQUIRE);
202              
203 1           eval_pv ("Coro::Multicore::init", 1);
204              
205 1 50         POPSTACK;
206             }
207 2           X_LOCK (release_m);
208             }
209              
210             if (curthreads >= max_threads && 0)
211             return;
212              
213 2           ++curthreads;
214 2           ++idle;
215 2           xthread_create (&tid, thread_proc, 0);
216             }
217              
218             static void
219 2           pmapi_release (void)
220             {
221 2 50         if (! ((thread_enable ? thread_enable : global_enable) & 1))
    50          
222             {
223 0           X_TLS_SET (current_key, 0);
224 0           return;
225             }
226              
227             #if RECURSION_CHECK
228             if (X_TLS_GET (check_key))
229             fatal ("FATAL: perlinterp_release () called without valid perl context");
230              
231             X_TLS_SET (check_key, &check_key);
232             #endif
233              
234 2           struct tctx *ctx = tctx_get ();
235 2           ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
236 2           ctx->wait_f = 0;
237              
238 2           X_TLS_SET (current_key, ctx);
239 2           pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
240              
241 2           X_LOCK (release_m);
242              
243 2 50         if (idle <= min_idle)
244 2           start_thread ();
245              
246 2           tctxs_put (&releasers, ctx);
247 2           X_COND_SIGNAL (release_c);
248              
249 2 50         while (!idle && releasers.cur)
    0          
250             {
251 0           X_UNLOCK (release_m);
252 0           X_LOCK (release_m);
253             }
254              
255 2           X_UNLOCK (release_m);
256             }
257              
258             static void
259 2           pmapi_acquire (void)
260             {
261             int jeret;
262 2           struct tctx *ctx = X_TLS_GET (current_key);
263              
264 2 50         if (!ctx)
265 0           return;
266              
267             #if RECURSION_CHECK
268             if (X_TLS_GET (check_key) != &check_key)
269             fatal ("FATAL: perlinterp_acquire () called with valid perl context");
270              
271             X_TLS_SET (check_key, 0);
272             #endif
273              
274 2           X_LOCK (acquire_m);
275              
276 2           tctxs_put (&acquirers, ctx);
277              
278 2           s_epipe_signal (&ep);
279 4 100         while (!ctx->wait_f)
280 2           X_COND_WAIT (ctx->acquire_c, acquire_m);
281 2           X_UNLOCK (acquire_m);
282              
283 2           jeret = ctx->jeret;
284 2           tctx_put (ctx);
285 2           pthread_sigmask (SIG_SETMASK, &cursigset, 0);
286              
287 2 50         if (jeret)
288             {
289             dTHX;
290 0 0         JMPENV_JUMP (jeret);
    0          
291             }
292             }
293              
294             static void
295 0           set_thread_enable (pTHX_ void *arg)
296             {
297 0           thread_enable = PTR2IV (arg);
298 0           }
299              
300             MODULE = Coro::Multicore PACKAGE = Coro::Multicore
301              
302             PROTOTYPES: DISABLE
303              
304             BOOT:
305             {
306             #ifndef _WIN32
307 2           sigfillset (&fullsigset);
308             #endif
309              
310 2 50         X_TLS_INIT (current_key);
311             #if RECURSION_CHECK
312             X_TLS_INIT (check_key);
313             #endif
314              
315 2 50         if (s_epipe_new (&ep))
316 0           croak ("Coro::Multicore: unable to initialise event pipe.\n");
317              
318 2           perl_thx = PERL_GET_CONTEXT;
319              
320 2 50         I_CORO_API ("Coro::Multicore");
    50          
    50          
    50          
321              
322             if (0) { /*D*/
323             X_LOCK (release_m);
324             while (idle < min_idle)
325             start_thread ();
326             X_UNLOCK (release_m);
327             }
328              
329             /* not perfectly efficient to do it this way, but it is simple */
330 2           perl_multicore_init (); /* calls release */
331 2           perl_multicore_api->pmapi_release = pmapi_release;
332 2           perl_multicore_api->pmapi_acquire = pmapi_acquire;
333             }
334              
335             bool
336             enable (bool enable = NO_INIT)
337             CODE:
338 2           RETVAL = global_enable;
339 2 50         if (items)
340 2           global_enable = enable;
341             OUTPUT:
342             RETVAL
343              
344             void
345             scoped_enable ()
346             CODE:
347 0           LEAVE; /* see Guard.xs */
348 0           CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
349 0           ENTER; /* see Guard.xs */
350              
351             void
352             scoped_disable ()
353             CODE:
354 0           LEAVE; /* see Guard.xs */
355 0           CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
356 0           ENTER; /* see Guard.xs */
357              
358             #if 0
359              
360             U32
361             min_idle_threads (U32 min = NO_INIT)
362             CODE:
363             X_LOCK (acquire_m);
364             RETVAL = min_idle;
365             if (items)
366             min_idle = min;
367             X_UNLOCK (acquire_m);
368             OUTPUT:
369             RETVAL
370              
371             #endif
372              
373             int
374             fd ()
375             CODE:
376 1           RETVAL = s_epipe_fd (&ep);
377             OUTPUT:
378             RETVAL
379              
380             void
381             poll (...)
382             CODE:
383 2           s_epipe_drain (&ep);
384 2           X_LOCK (acquire_m);
385 4 100         while (acquirers.cur)
386             {
387 2           struct tctx *ctx = tctxs_get (&acquirers);
388 2           CORO_READY ((SV *)ctx->coro);
389 2           SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
390 2           ctx->coro = 0;
391             }
392 2           X_UNLOCK (acquire_m);
393              
394             void
395             sleep (NV seconds)
396             CODE:
397 2           perlinterp_release ();
398             {
399 2           int nsec = seconds;
400 2 50         if (nsec) sleep (nsec);
401 2           nsec = (seconds - nsec) * 1e9;
402 2 50         if (nsec) usleep (nsec);
403             }
404 2           perlinterp_acquire ();
405