File Coverage

Multicore.xs
Criterion Covered Total %
statement 101 124 81.4
branch 31 56 55.3
condition n/a
subroutine n/a
pod n/a
total 132 180 73.3


line stmt bran cond sub pod time code
1             /* most win32 perls are beyond fixing, requiring dTHX */
2             /* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
3             /* and fail to define numerous symbols, but still overrwide them */
4             /* with non-working versions (e.g. setjmp). */
5             #ifdef _WIN32
6             /*# define PERL_CORE 1 fixes some, breaks others */
7             #else
8             # define PERL_NO_GET_CONTEXT
9             #endif
10              
11             #include "EXTERN.h"
12             #include "perl.h"
13             #include "XSUB.h"
14              
15             #define X_STACKSIZE 1024 * sizeof (void *)
16              
17             #include "CoroAPI.h"
18             #include "perlmulticore.h"
19             #include "schmorp.h"
20             #include "xthread.h"
21              
22             #ifdef _WIN32
23             #ifndef sigset_t
24             #define sigset_t int
25             #endif
26             #endif
27              
28             #ifndef SvREFCNT_dec_NN
29             #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
30             #endif
31              
32             #ifndef SvREFCNT_dec_simple_void_NN
33             #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
34             #endif
35              
36             #ifndef SvREFCNT_inc_NN
37             #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
38             #endif
39              
40             #ifndef RECURSION_CHECK
41             #define RECURSION_CHECK 0
42             #endif
43              
44             static X_TLS_DECLARE(current_key);
45             #if RECURSION_CHECK
46             static X_TLS_DECLARE(check_key);
47             #endif
48              
49             static void
50 0           fatal (const char *msg)
51             {
52 0           write (2, msg, strlen (msg));
53 0           abort ();
54             }
55              
56             static s_epipe ep;
57             static void *perl_thx;
58             static sigset_t cursigset, fullsigset;
59              
60             static int global_enable = 0;
61             static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
62              
63             /* assigned to a thread for each release/acquire */
64             struct tctx
65             {
66             void *coro;
67             int wait_f;
68             xcond_t acquire_c;
69             int jeret;
70             };
71              
72             static struct tctx *tctx_free;
73              
74             static struct tctx *
75 2           tctx_get (void)
76             {
77             struct tctx *ctx;
78              
79 2 50         if (!tctx_free)
80             {
81 2           ctx = malloc (sizeof (*tctx_free));
82 2           X_COND_CREATE (ctx->acquire_c);
83             }
84             else
85             {
86 0           ctx = tctx_free;
87 0           tctx_free = tctx_free->coro;
88             }
89              
90 2           return ctx;
91             }
92              
93             static void
94 2           tctx_put (struct tctx *ctx)
95             {
96 2           ctx->coro = tctx_free;
97 2           tctx_free = ctx;
98 2           }
99              
100             /* a stack of tctxs */
101             struct tctxs
102             {
103             struct tctx **ctxs;
104             int cur, max;
105             };
106              
107             static struct tctx *
108 4           tctxs_get (struct tctxs *ctxs)
109             {
110 4           return ctxs->ctxs[--ctxs->cur];
111             }
112              
113             static void
114 4           tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
115             {
116 4 100         if (ctxs->cur >= ctxs->max)
117             {
118 2 50         ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
119 2           ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
120             }
121              
122 4           ctxs->ctxs[ctxs->cur++] = ctx;
123 4           }
124              
125             static xmutex_t release_m = X_MUTEX_INIT;
126             static xcond_t release_c = X_COND_INIT;
127             static struct tctxs releasers;
128             static int idle;
129             static int min_idle = 1;
130             static int curthreads, max_threads = 1; /* protected by release_m */
131              
132             static xmutex_t acquire_m = X_MUTEX_INIT;
133             static struct tctxs acquirers;
134              
135 2           X_THREAD_PROC(thread_proc)
136             {
137 2           PERL_SET_CONTEXT (perl_thx);
138              
139             {
140             dTHXa (perl_thx);
141             dJMPENV;
142             struct tctx *ctx;
143             int catchret;
144              
145 2           X_LOCK (release_m);
146              
147             for (;;)
148             {
149 4 100         while (!releasers.cur)
150             if (idle <= min_idle || 1)
151 2           X_COND_WAIT (release_c, release_m);
152             else
153             {
154             struct timespec ts = { time (0) + idle - min_idle, 0 };
155              
156             if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
157             if (idle > min_idle && !releasers.cur)
158             break;
159             }
160              
161 2           ctx = tctxs_get (&releasers);
162 2           --idle;
163 2           X_UNLOCK (release_m);
164              
165 2 50         if (!ctx) /* timed out? */
166 0           break;
167              
168 2           pthread_sigmask (SIG_SETMASK, &cursigset, 0);
169 2           JMPENV_PUSH (ctx->jeret);
170              
171 2 50         if (!ctx->jeret)
172 4 100         while (ctx->coro)
173 2           CORO_SCHEDULE;
174              
175 2           JMPENV_POP;
176 2           pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
177              
178 2           X_LOCK (acquire_m);
179 2           ctx->wait_f = 1;
180 2           X_COND_SIGNAL (ctx->acquire_c);
181 2           X_UNLOCK (acquire_m);
182              
183 2           X_LOCK (release_m);
184 2           ++idle;
185 2           }
186             }
187 0           }
188              
189             static void
190 2           start_thread (void)
191             {
192             xthread_t tid;
193              
194 2 100         if (!curthreads)
195             {
196 1           X_UNLOCK (release_m);
197             {
198             dTHX;
199 1           eval_pv ("Coro::Multicore::init", 1);
200             }
201 2           X_LOCK (release_m);
202             }
203              
204             if (curthreads >= max_threads && 0)
205             return;
206              
207 2           ++curthreads;
208 2           ++idle;
209 2           xthread_create (&tid, thread_proc, 0);
210             }
211              
212             static void
213 2           pmapi_release (void)
214             {
215 2 50         if (! ((thread_enable ? thread_enable : global_enable) & 1))
    50          
216             {
217 0           X_TLS_SET (current_key, 0);
218 0           return;
219             }
220              
221             #if RECURSION_CHECK
222             if (X_TLS_GET (check_key))
223             fatal ("FATAL: perlinterp_release () called without valid perl context");
224              
225             X_TLS_SET (check_key, &check_key);
226             #endif
227              
228 2           struct tctx *ctx = tctx_get ();
229 2           ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
230 2           ctx->wait_f = 0;
231              
232 2           X_TLS_SET (current_key, ctx);
233 2           pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
234              
235 2           X_LOCK (release_m);
236              
237 2 50         if (idle <= min_idle)
238 2           start_thread ();
239              
240 2           tctxs_put (&releasers, ctx);
241 2           X_COND_SIGNAL (release_c);
242              
243 2 50         while (!idle && releasers.cur)
    0          
244             {
245 0           X_UNLOCK (release_m);
246 0           X_LOCK (release_m);
247             }
248              
249 2           X_UNLOCK (release_m);
250             }
251              
252             static void
253 2           pmapi_acquire (void)
254             {
255             int jeret;
256 2           struct tctx *ctx = X_TLS_GET (current_key);
257              
258 2 50         if (!ctx)
259 0           return;
260              
261             #if RECURSION_CHECK
262             if (X_TLS_GET (check_key) != &check_key)
263             fatal ("FATAL: perlinterp_acquire () called with valid perl context");
264              
265             X_TLS_SET (check_key, 0);
266             #endif
267              
268 2           X_LOCK (acquire_m);
269              
270 2           tctxs_put (&acquirers, ctx);
271              
272 2           s_epipe_signal (&ep);
273 4 100         while (!ctx->wait_f)
274 2           X_COND_WAIT (ctx->acquire_c, acquire_m);
275 2           X_UNLOCK (acquire_m);
276              
277 2           jeret = ctx->jeret;
278 2           tctx_put (ctx);
279 2           pthread_sigmask (SIG_SETMASK, &cursigset, 0);
280              
281 2 50         if (jeret)
282             {
283             dTHX;
284 0 0         JMPENV_JUMP (jeret);
    0          
285             }
286             }
287              
288             static void
289 0           set_thread_enable (pTHX_ void *arg)
290             {
291 0           thread_enable = PTR2IV (arg);
292 0           }
293              
294             MODULE = Coro::Multicore PACKAGE = Coro::Multicore
295              
296             PROTOTYPES: DISABLE
297              
298             BOOT:
299             {
300             #ifndef _WIN32
301 2           sigfillset (&fullsigset);
302             #endif
303              
304 2 50         X_TLS_INIT (current_key);
305             #if RECURSION_CHECK
306             X_TLS_INIT (check_key);
307             #endif
308              
309 2 50         if (s_epipe_new (&ep))
310 0           croak ("Coro::Multicore: unable to initialise event pipe.\n");
311              
312 2           perl_thx = PERL_GET_CONTEXT;
313              
314 2 50         I_CORO_API ("Coro::Multicore");
    50          
    50          
    50          
315              
316             if (0) { /*D*/
317             X_LOCK (release_m);
318             while (idle < min_idle)
319             start_thread ();
320             X_UNLOCK (release_m);
321             }
322              
323             /* not perfectly efficient to do it this way, but it is simple */
324 2           perl_multicore_init (); /* calls release */
325 2           perl_multicore_api->pmapi_release = pmapi_release;
326 2           perl_multicore_api->pmapi_acquire = pmapi_acquire;
327             }
328              
329             bool
330             enable (bool enable = NO_INIT)
331             CODE:
332 2           RETVAL = global_enable;
333 2 50         if (items)
334 2           global_enable = enable;
335             OUTPUT:
336             RETVAL
337              
338             void
339             scoped_enable ()
340             CODE:
341 0           LEAVE; /* see Guard.xs */
342 0           CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
343 0           ENTER; /* see Guard.xs */
344              
345             void
346             scoped_disable ()
347             CODE:
348 0           LEAVE; /* see Guard.xs */
349 0           CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
350 0           ENTER; /* see Guard.xs */
351              
352             #if 0
353              
354             U32
355             min_idle_threads (U32 min = NO_INIT)
356             CODE:
357             X_LOCK (acquire_m);
358             RETVAL = min_idle;
359             if (items)
360             min_idle = min;
361             X_UNLOCK (acquire_m);
362             OUTPUT:
363             RETVAL
364              
365             #endif
366              
367             int
368             fd ()
369             CODE:
370 1           RETVAL = s_epipe_fd (&ep);
371             OUTPUT:
372             RETVAL
373              
374             void
375             poll (...)
376             CODE:
377 2           s_epipe_drain (&ep);
378 2           X_LOCK (acquire_m);
379 4 100         while (acquirers.cur)
380             {
381 2           struct tctx *ctx = tctxs_get (&acquirers);
382 2           CORO_READY ((SV *)ctx->coro);
383 2           SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
384 2           ctx->coro = 0;
385             }
386 2           X_UNLOCK (acquire_m);
387              
388             void
389             sleep (NV seconds)
390             CODE:
391 2           perlinterp_release ();
392             {
393 2           int nsec = seconds;
394 2 50         if (nsec) sleep (nsec);
395 2           nsec = (seconds - nsec) * 1e9;
396 2 50         if (nsec) usleep (nsec);
397             }
398 2           perlinterp_acquire ();
399