File Coverage

Multicore.xs
Criterion Covered Total %
statement 105 131 80.1
branch 33 60 55.0
condition n/a
subroutine n/a
pod n/a
total 138 191 72.2


line stmt bran cond sub pod time code
1             /* most win32 perls are beyond fixing, requiring dTHX */
2             /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3             /* and fail to define numerous symbols, but still overrwide them */
4             /* with non-working versions (e.g. setjmp). */
5             #ifdef _WIN32
6             /*# define PERL_CORE 1 fixes some, breaks others */
7             #else
8             # define PERL_NO_GET_CONTEXT
9             #endif
10              
11             #include "EXTERN.h"
12             #include "perl.h"
13             #include "XSUB.h"
14              
15             #define X_STACKSIZE 1024 * sizeof (void *)
16              
17             #include "CoroAPI.h"
18             #include "perlmulticore.h"
19             #include "schmorp.h"
20             #include "xthread.h"
21              
22             #ifdef _WIN32
23             #ifndef sigset_t
24             #define sigset_t int
25             #endif
26             #endif
27              
28             #ifndef SvREFCNT_dec_NN
29             #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30             #endif
31              
32             #ifndef SvREFCNT_dec_simple_void_NN
33             #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34             #endif
35              
36             #ifndef SvREFCNT_inc_NN
37             #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38             #endif
39              
40             #ifndef RECURSION_CHECK
41             #define RECURSION_CHECK 0
42             #endif
43              
44             static X_TLS_DECLARE(current_key);
45             #if RECURSION_CHECK
46             static X_TLS_DECLARE(check_key);
47             #endif
48              
49             static void
50 0           fatal (const char *msg)
51             {
52 0           write (2, msg, strlen (msg));
53 0           abort ();
54             }
55              
56             static s_epipe ep;
57             static void *perl_thx;
58             static sigset_t cursigset, fullsigset;
59              
60             static int global_enable = 0;
61             static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
62              
63             /* assigned to a thread for each release/acquire */
64             struct tctx
65             {
66             void *coro;
67             int wait_f;
68             xcond_t acquire_c;
69             int jeret;
70             };
71              
72             static struct tctx *tctx_free;
73              
74             static struct tctx *
75 2           tctx_get (void)
76             {
77             struct tctx *ctx;
78              
79 2 50         if (!tctx_free)
80             {
81 2           ctx = malloc (sizeof (*tctx_free));
82 2           X_COND_CREATE (ctx->acquire_c);
83             }
84             else
85             {
86 0           ctx = tctx_free;
87 0           tctx_free = tctx_free->coro;
88             }
89              
90 2           return ctx;
91             }
92              
93             static void
94 2           tctx_put (struct tctx *ctx)
95             {
96 2           ctx->coro = tctx_free;
97 2           tctx_free = ctx;
98 2           }
99              
100             /* a stack of tctxs */
101             struct tctxs
102             {
103             struct tctx **ctxs;
104             int cur, max;
105             };
106              
107             static struct tctx *
108 4           tctxs_get (struct tctxs *ctxs)
109             {
110 4           return ctxs->ctxs[--ctxs->cur];
111             }
112              
113             static void
114 4           tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
115             {
116 4 100         if (ctxs->cur >= ctxs->max)
117             {
118 2 50         ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
119 2           ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
120             }
121              
122 4           ctxs->ctxs[ctxs->cur++] = ctx;
123 4           }
124              
125             static xmutex_t release_m = X_MUTEX_INIT;
126             static xcond_t release_c = X_COND_INIT;
127             static struct tctxs releasers;
128             static int idle;
129             static int min_idle = 1;
130             static int curthreads, max_threads = 1; /* protected by release_m */
131              
132             static xmutex_t acquire_m = X_MUTEX_INIT;
133             static struct tctxs acquirers;
134              
135 2           X_THREAD_PROC(thread_proc)
136             {
137 2           PERL_SET_CONTEXT (perl_thx);
138              
139             {
140             dTHXa (perl_thx);
141             dJMPENV;
142             struct tctx *ctx;
143             int catchret;
144              
145 2           X_LOCK (release_m);
146              
147             for (;;)
148             {
149 4 100         while (!releasers.cur)
150             if (idle <= min_idle || 1)
151 2           X_COND_WAIT (release_c, release_m);
152             else
153             {
154             struct timespec ts = { time (0) + idle - min_idle, 0 };
155              
156             if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157             if (idle > min_idle && !releasers.cur)
158             break;
159             }
160              
161 2           ctx = tctxs_get (&releasers);
162 2           --idle;
163 2           X_UNLOCK (release_m);
164              
165 2 50         if (!ctx) /* timed out? */
166 0           break;
167              
168 2           pthread_sigmask (SIG_SETMASK, &cursigset, 0);
169 2           JMPENV_PUSH (ctx->jeret);
170              
171 2 50         if (!ctx->jeret)
172 4 100         while (ctx->coro)
173 2           CORO_SCHEDULE;
174              
175 2           JMPENV_POP;
176 2           pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177              
178 2           X_LOCK (acquire_m);
179 2           ctx->wait_f = 1;
180 2           X_COND_SIGNAL (ctx->acquire_c);
181 2           X_UNLOCK (acquire_m);
182              
183 2           X_LOCK (release_m);
184 2           ++idle;
185 2           }
186             }
187 0           }
188              
189             static void
190 2           start_thread (void)
191             {
192             xthread_t tid;
193              
194 2 100         if (!curthreads)
195             {
196 1           X_UNLOCK (release_m);
197             {
198             dTHX;
199 1           dSP;
200              
201 1 50         PUSHSTACKi (PERLSI_REQUIRE);
202              
203 1           eval_pv ("Coro::Multicore::init", 1);
204              
205 1 50         POPSTACK;
206             }
207 2           X_LOCK (release_m);
208             }
209              
210             if (curthreads >= max_threads && 0)
211             return;
212              
213 2           ++curthreads;
214 2           ++idle;
215 2           xthread_create (&tid, thread_proc, 0);
216             }
217              
218             static void
219 2           pmapi_release (void)
220             {
221 2 50         if (! ((thread_enable ? thread_enable : global_enable) & 1))
    50          
222             {
223 0           X_TLS_SET (current_key, 0);
224 0           return;
225             }
226              
227             #if RECURSION_CHECK
228             if (X_TLS_GET (check_key))
229             fatal ("FATAL: perlinterp_release () called without valid perl context");
230              
231             X_TLS_SET (check_key, &check_key);
232             #endif
233              
234 2           struct tctx *ctx = tctx_get ();
235 2           ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
236 2           ctx->wait_f = 0;
237              
238 2           X_TLS_SET (current_key, ctx);
239 2           pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
240              
241 2           X_LOCK (release_m);
242              
243 2 50         if (idle <= min_idle)
244 2           start_thread ();
245              
246 2           tctxs_put (&releasers, ctx);
247 2           X_COND_SIGNAL (release_c);
248              
249 2 50         while (!idle && releasers.cur)
    0          
250             {
251 0           X_UNLOCK (release_m);
252 0           X_LOCK (release_m);
253             }
254              
255 2           X_UNLOCK (release_m);
256             }
257              
258             static void
259 2           pmapi_acquire (void)
260             {
261             int jeret;
262 2           struct tctx *ctx = X_TLS_GET (current_key);
263              
264 2 50         if (!ctx)
265 0           return;
266              
267             #if RECURSION_CHECK
268             if (X_TLS_GET (check_key) != &check_key)
269             fatal ("FATAL: perlinterp_acquire () called with valid perl context");
270              
271             X_TLS_SET (check_key, 0);
272             #endif
273              
274 2           X_LOCK (acquire_m);
275              
276 2           tctxs_put (&acquirers, ctx);
277              
278 2           s_epipe_signal (&ep);
279 4 100         while (!ctx->wait_f)
280 2           X_COND_WAIT (ctx->acquire_c, acquire_m);
281 2           X_UNLOCK (acquire_m);
282              
283 2           jeret = ctx->jeret;
284 2           tctx_put (ctx);
285 2           pthread_sigmask (SIG_SETMASK, &cursigset, 0);
286              
287 2 50         if (jeret)
288             {
289             dTHX;
290 0 0         JMPENV_JUMP (jeret);
    0          
291             }
292             }
293              
294             static void
295 0           set_thread_enable (pTHX_ void *arg)
296             {
297 0           thread_enable = PTR2IV (arg);
298 0           }
299              
300             static void
301 0           atfork_child (void)
302             {
303 0           s_epipe_renew (&ep);
304 0           }
305              
306             MODULE = Coro::Multicore PACKAGE = Coro::Multicore
307              
308             PROTOTYPES: DISABLE
309              
310             BOOT:
311             {
312             #ifndef _WIN32
313 2           sigfillset (&fullsigset);
314             #endif
315              
316 2 50         X_TLS_INIT (current_key);
317             #if RECURSION_CHECK
318             X_TLS_INIT (check_key);
319             #endif
320              
321 2 50         if (s_epipe_new (&ep))
322 0           croak ("Coro::Multicore: unable to initialise event pipe.\n");
323              
324 2           pthread_atfork (0, 0, atfork_child);
325              
326 2           perl_thx = PERL_GET_CONTEXT;
327              
328 2 50         I_CORO_API ("Coro::Multicore");
    50          
    50          
    50          
329              
330             if (0) { /*D*/
331             X_LOCK (release_m);
332             while (idle < min_idle)
333             start_thread ();
334             X_UNLOCK (release_m);
335             }
336              
337             /* not perfectly efficient to do it this way, but it is simple */
338 2           perl_multicore_init (); /* calls release */
339 2           perl_multicore_api->pmapi_release = pmapi_release;
340 2           perl_multicore_api->pmapi_acquire = pmapi_acquire;
341             }
342              
343             bool
344             enable (bool enable = NO_INIT)
345             CODE:
346 2           RETVAL = global_enable;
347 2 50         if (items)
348 2           global_enable = enable;
349             OUTPUT:
350             RETVAL
351              
352             void
353             scoped_enable ()
354             CODE:
355 0           LEAVE; /* see Guard.xs */
356 0           CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
357 0           ENTER; /* see Guard.xs */
358              
359             void
360             scoped_disable ()
361             CODE:
362 0           LEAVE; /* see Guard.xs */
363 0           CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
364 0           ENTER; /* see Guard.xs */
365              
366             #if 0
367              
368             U32
369             min_idle_threads (U32 min = NO_INIT)
370             CODE:
371             X_LOCK (acquire_m);
372             RETVAL = min_idle;
373             if (items)
374             min_idle = min;
375             X_UNLOCK (acquire_m);
376             OUTPUT:
377             RETVAL
378              
379             #endif
380              
381             int
382             fd ()
383             CODE:
384 1           RETVAL = s_epipe_fd (&ep);
385             OUTPUT:
386             RETVAL
387              
388             void
389             poll (...)
390             CODE:
391 2           s_epipe_drain (&ep);
392 2           X_LOCK (acquire_m);
393 4 100         while (acquirers.cur)
394             {
395 2           struct tctx *ctx = tctxs_get (&acquirers);
396 2           CORO_READY ((SV *)ctx->coro);
397 2           SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
398 2           ctx->coro = 0;
399             }
400 2           X_UNLOCK (acquire_m);
401              
402             void
403             sleep (NV seconds)
404             CODE:
405 2           perlinterp_release ();
406             {
407 2           int nsec = seconds;
408 2 50         if (nsec) sleep (nsec);
409 2           nsec = (seconds - nsec) * 1e9;
410 2 50         if (nsec) usleep (nsec);
411             }
412 2           perlinterp_acquire ();
413