File Coverage

libeio/etp.c
Criterion Covered Total %
statement 179 240 74.5
branch 49 72 68.0
condition n/a
subroutine n/a
pod n/a
total 228 312 73.0


line stmt bran cond sub pod time code
1             /*
2             * libetp implementation
3             *
4             * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann
5             * All rights reserved.
6             *
7             * Redistribution and use in source and binary forms, with or without modifica-
8             * tion, are permitted provided that the following conditions are met:
9             *
10             * 1. Redistributions of source code must retain the above copyright notice,
11             * this list of conditions and the following disclaimer.
12             *
13             * 2. Redistributions in binary form must reproduce the above copyright
14             * notice, this list of conditions and the following disclaimer in the
15             * documentation and/or other materials provided with the distribution.
16             *
17             * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18             * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19             * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20             * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21             * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22             * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23             * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24             * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25             * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26             * OF THE POSSIBILITY OF SUCH DAMAGE.
27             *
28             * Alternatively, the contents of this file may be used under the terms of
29             * the GNU General Public License ("GPL") version 2 or any later version,
30             * in which case the provisions of the GPL are applicable instead of
31             * the above. If you wish to allow the use of your version of this file
32             * only under the terms of the GPL and not to allow others to use your
33             * version of this file under the BSD license, indicate your decision
34             * by deleting the provisions above and replace them with the notice
35             * and other provisions required by the GPL. If you do not delete the
36             * provisions above, a recipient may use your version of this file under
37             * either the BSD or the GPL.
38             */
39              
40             #if HAVE_SYS_PRCTL_H
41             # include
42             #endif
43              
44             #ifdef EIO_STACKSIZE
45             # define X_STACKSIZE EIO_STACKSIZE
46             #endif
47             #include "xthread.h"
48              
49             #ifndef ETP_API_DECL
50             # define ETP_API_DECL static
51             #endif
52              
53             #ifndef ETP_PRI_MIN
54             # define ETP_PRI_MIN 0
55             # define ETP_PRI_MAX 0
56             #endif
57              
58             #ifndef ETP_TYPE_QUIT
59             # define ETP_TYPE_QUIT 0
60             #endif
61              
62             #ifndef ETP_TYPE_GROUP
63             # define ETP_TYPE_GROUP 1
64             #endif
65              
66             #ifndef ETP_WANT_POLL
67             # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
68             #endif
69             #ifndef ETP_DONE_POLL
70             # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
71             #endif
72              
73             #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
74              
75             #define ETP_TICKS ((1000000 + 1023) >> 10)
76              
77             enum {
78             ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
79             ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
80             };
81              
82             /* calculate time difference in ~1/ETP_TICKS of a second */
83             ecb_inline int
84 0           etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
85             {
86 0           return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
87 0           + ((tv2->tv_usec - tv1->tv_usec) >> 10);
88             }
89              
90             struct etp_tmpbuf
91             {
92             void *ptr;
93             int len;
94             };
95              
96             static void *
97 1           etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
98             {
99 1 50         if (buf->len < len)
100             {
101 1           free (buf->ptr);
102 1           buf->ptr = malloc (buf->len = len);
103             }
104              
105 1           return buf->ptr;
106             }
107              
108             /*
109             * a somewhat faster data structure might be nice, but
110             * with 8 priorities this actually needs <20 insns
111             * per shift, the most expensive operation.
112             */
113             typedef struct
114             {
115             ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
116             int size;
117             } etp_reqq;
118              
119             typedef struct etp_pool *etp_pool;
120              
121             typedef struct etp_worker
122             {
123             etp_pool pool;
124              
125             struct etp_tmpbuf tmpbuf;
126              
127             /* locked by pool->wrklock */
128             struct etp_worker *prev, *next;
129              
130             xthread_t tid;
131              
132             #ifdef ETP_WORKER_COMMON
133             ETP_WORKER_COMMON
134             #endif
135             } etp_worker;
136              
137             struct etp_pool
138             {
139             void *userdata;
140              
141             etp_reqq req_queue;
142             etp_reqq res_queue;
143              
144             unsigned int started, idle, wanted;
145              
146             unsigned int max_poll_time; /* pool->reslock */
147             unsigned int max_poll_reqs; /* pool->reslock */
148              
149             unsigned int nreqs; /* pool->reqlock */
150             unsigned int nready; /* pool->reqlock */
151             unsigned int npending; /* pool->reqlock */
152             unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
153             unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
154              
155             void (*want_poll_cb) (void *userdata);
156             void (*done_poll_cb) (void *userdata);
157            
158             xmutex_t wrklock;
159             xmutex_t reslock;
160             xmutex_t reqlock;
161             xcond_t reqwait;
162              
163             etp_worker wrk_first;
164             };
165              
166             #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
167             #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
168              
169             /* worker threads management */
170              
171             static void
172 72           etp_worker_clear (etp_worker *wrk)
173             {
174 72           }
175              
176             static void ecb_cold
177 0           etp_worker_free (etp_worker *wrk)
178             {
179 0           free (wrk->tmpbuf.ptr);
180              
181 0           wrk->next->prev = wrk->prev;
182 0           wrk->prev->next = wrk->next;
183              
184 0           free (wrk);
185 0           }
186              
187             ETP_API_DECL unsigned int
188 604           etp_nreqs (etp_pool pool)
189             {
190             int retval;
191             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
192 302           retval = pool->nreqs;
193             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
194 302           return retval;
195             }
196              
197             ETP_API_DECL unsigned int
198 0           etp_nready (etp_pool pool)
199             {
200             unsigned int retval;
201              
202             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
203 0           retval = pool->nready;
204             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
205              
206 0           return retval;
207             }
208              
209             ETP_API_DECL unsigned int
210 378           etp_npending (etp_pool pool)
211             {
212             unsigned int retval;
213              
214             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
215 189           retval = pool->npending;
216             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
217              
218 189           return retval;
219             }
220              
221             ETP_API_DECL unsigned int
222 760           etp_nthreads (etp_pool pool)
223             {
224             unsigned int retval;
225              
226             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
227 380           retval = pool->started;
228             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
229              
230 380           return retval;
231             }
232              
233             static void ecb_noinline ecb_cold
234 20           reqq_init (etp_reqq *q)
235             {
236             int pri;
237              
238 200 100         for (pri = 0; pri < ETP_NUM_PRI; ++pri)
239 180           q->qs[pri] = q->qe[pri] = 0;
240              
241 20           q->size = 0;
242 20           }
243              
244             static int ecb_noinline
245 153           reqq_push (etp_reqq *q, ETP_REQ *req)
246             {
247 153           int pri = req->pri;
248 153           req->next = 0;
249              
250 153 100         if (q->qe[pri])
251             {
252 35           q->qe[pri]->next = req;
253 35           q->qe[pri] = req;
254             }
255             else
256 118           q->qe[pri] = q->qs[pri] = req;
257              
258 153           return q->size++;
259             }
260              
261             static ETP_REQ * ecb_noinline
262 242           reqq_shift (etp_reqq *q)
263             {
264             int pri;
265              
266 242 100         if (!q->size)
267 108           return 0;
268              
269 134           --q->size;
270              
271 530 50         for (pri = ETP_NUM_PRI; pri--; )
272             {
273 530           ETP_REQ *req = q->qs[pri];
274              
275 530 100         if (req)
276             {
277 134 100         if (!(q->qs[pri] = (ETP_REQ *)req->next))
278 108           q->qe[pri] = 0;
279              
280 134           return req;
281             }
282             }
283              
284 0           abort ();
285             }
286              
287             ETP_API_DECL int ecb_cold
288 10           etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
289             {
290 10           X_MUTEX_CREATE (pool->wrklock);
291 10           X_MUTEX_CREATE (pool->reslock);
292 10           X_MUTEX_CREATE (pool->reqlock);
293 10           X_COND_CREATE (pool->reqwait);
294              
295 10           reqq_init (&pool->req_queue);
296 10           reqq_init (&pool->res_queue);
297              
298 10           pool->wrk_first.next =
299 10           pool->wrk_first.prev = &pool->wrk_first;
300              
301 10           pool->started = 0;
302 10           pool->idle = 0;
303 10           pool->nreqs = 0;
304 10           pool->nready = 0;
305 10           pool->npending = 0;
306 10           pool->wanted = 4;
307              
308 10           pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
309 10           pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
310              
311 10           pool->userdata = userdata;
312 10           pool->want_poll_cb = want_poll;
313 10           pool->done_poll_cb = done_poll;
314              
315 10           return 0;
316             }
317              
318             static void ecb_noinline ecb_cold
319 21           etp_proc_init (void)
320             {
321             #if HAVE_PRCTL_SET_NAME
322             /* provide a more sensible "thread name" */
323             char name[16 + 1];
324 21           const int namelen = sizeof (name) - 1;
325             int len;
326              
327 21           prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
328 21           name [namelen] = 0;
329 21           len = strlen (name);
330 21           strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
331 21           prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
332             #endif
333 21           }
334              
335 21           X_THREAD_PROC (etp_proc)
336             {
337             ETP_REQ *req;
338             struct timespec ts;
339 21           etp_worker *self = (etp_worker *)thr_arg;
340 21           etp_pool pool = self->pool;
341              
342 21           etp_proc_init ();
343              
344             /* try to distribute timeouts somewhat evenly */
345 21           ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
346              
347             for (;;)
348             {
349 93           ts.tv_sec = 0;
350              
351 93           X_LOCK (pool->reqlock);
352              
353             for (;;)
354             {
355 143           req = reqq_shift (&pool->req_queue);
356              
357 143 100         if (ecb_expect_true (req))
358 73           break;
359              
360 70 50         if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
361             {
362 0           X_UNLOCK (pool->reqlock);
363 0           X_LOCK (pool->wrklock);
364 0           --pool->started;
365 0           X_UNLOCK (pool->wrklock);
366 0           goto quit;
367             }
368              
369 70           ++pool->idle;
370              
371 70 50         if (pool->idle <= pool->max_idle)
372             /* we are allowed to pool->idle, so do so without any timeout */
373 70           X_COND_WAIT (pool->reqwait, pool->reqlock);
374             else
375             {
376             /* initialise timeout once */
377 0 0         if (!ts.tv_sec)
378 0           ts.tv_sec = time (0) + pool->idle_timeout;
379              
380 0 0         if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
381 0           ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
382             }
383              
384 50           --pool->idle;
385 50           }
386              
387 73           --pool->nready;
388              
389 73           X_UNLOCK (pool->reqlock);
390            
391 73 50         if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
392 0           goto quit;
393              
394 73           ETP_EXECUTE (self, req);
395              
396 72           X_LOCK (pool->reslock);
397              
398 72           ++pool->npending;
399              
400 72 100         if (!reqq_push (&pool->res_queue, req))
401 45           ETP_WANT_POLL (pool);
402              
403 72           etp_worker_clear (self);
404              
405 72           X_UNLOCK (pool->reslock);
406 72           }
407              
408             quit:
409 0           free (req);
410              
411 0           X_LOCK (pool->wrklock);
412 0           etp_worker_free (self);
413 0           X_UNLOCK (pool->wrklock);
414              
415 0           return 0;
416             }
417              
418             static void ecb_cold
419 21           etp_start_thread (etp_pool pool)
420             {
421 21           etp_worker *wrk = calloc (1, sizeof (etp_worker));
422              
423             /*TODO*/
424             assert (("unable to allocate worker thread data", wrk));
425              
426 21           wrk->pool = pool;
427              
428 21           X_LOCK (pool->wrklock);
429              
430 21 50         if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
431             {
432 21           wrk->prev = &pool->wrk_first;
433 21           wrk->next = pool->wrk_first.next;
434 21           pool->wrk_first.next->prev = wrk;
435 21           pool->wrk_first.next = wrk;
436 21           ++pool->started;
437             }
438             else
439 0           free (wrk);
440              
441 21           X_UNLOCK (pool->wrklock);
442 21           }
443              
444             static void
445 191           etp_maybe_start_thread (etp_pool pool)
446             {
447 191 100         if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
448 2           return;
449            
450             /* todo: maybe use pool->idle here, but might be less exact */
451 189 100         if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
452 168           return;
453              
454 21           etp_start_thread (pool);
455             }
456              
457             static void ecb_cold
458 0           etp_end_thread (etp_pool pool)
459             {
460 0           ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
461              
462 0           req->type = ETP_TYPE_QUIT;
463 0           req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
464              
465 0           X_LOCK (pool->reqlock);
466 0           reqq_push (&pool->req_queue, req);
467 0           X_COND_SIGNAL (pool->reqwait);
468 0           X_UNLOCK (pool->reqlock);
469              
470 0           X_LOCK (pool->wrklock);
471 0           --pool->started;
472 0           X_UNLOCK (pool->wrklock);
473 0           }
474              
475             ETP_API_DECL int
476 38           etp_poll (etp_pool pool)
477             {
478             unsigned int maxreqs;
479             unsigned int maxtime;
480             struct timeval tv_start, tv_now;
481              
482 38           X_LOCK (pool->reslock);
483 38           maxreqs = pool->max_poll_reqs;
484 38           maxtime = pool->max_poll_time;
485 38           X_UNLOCK (pool->reslock);
486              
487 38 50         if (maxtime)
488 0           gettimeofday (&tv_start, 0);
489              
490             for (;;)
491             {
492             ETP_REQ *req;
493              
494 99           etp_maybe_start_thread (pool);
495              
496 99           X_LOCK (pool->reslock);
497 99           req = reqq_shift (&pool->res_queue);
498              
499 99 100         if (ecb_expect_true (req))
500             {
501 61           --pool->npending;
502              
503 61 100         if (!pool->res_queue.size)
504 42           ETP_DONE_POLL (pool);
505             }
506              
507 99           X_UNLOCK (pool->reslock);
508              
509 99 100         if (ecb_expect_false (!req))
510 38           return 0;
511              
512 61           X_LOCK (pool->reqlock);
513 61           --pool->nreqs;
514 61           X_UNLOCK (pool->reqlock);
515              
516 61 100         if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
    100          
517             {
518 5           req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
519 5           continue;
520             }
521             else
522             {
523 56           int res = ETP_FINISH (req);
524 56 50         if (ecb_expect_false (res))
525 0           return res;
526             }
527              
528 56 50         if (ecb_expect_false (maxreqs && !--maxreqs))
    0          
529 0           break;
530              
531 56 50         if (maxtime)
532             {
533 0           gettimeofday (&tv_now, 0);
534              
535 0 0         if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
536 0           break;
537             }
538 61           }
539              
540 0           errno = EAGAIN;
541 38           return -1;
542             }
543              
544             ETP_API_DECL void
545             etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
546              
547             ETP_API_DECL void
548 3           etp_cancel (etp_pool pool, ETP_REQ *req)
549             {
550 3           req->cancelled = 1;
551              
552 3           etp_grp_cancel (pool, req);
553 3           }
554              
555             ETP_API_DECL void
556 3           etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
557             {
558 4 100         for (grp = grp->grp_first; grp; grp = grp->grp_next)
559 1           etp_cancel (pool, grp);
560 3           }
561              
562             ETP_API_DECL void
563 81           etp_submit (etp_pool pool, ETP_REQ *req)
564             {
565 81           req->pri -= ETP_PRI_MIN;
566              
567 81 50         if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
568 81 50         if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
569              
570 81 100         if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
571             {
572             /* I hope this is worth it :/ */
573 6           X_LOCK (pool->reqlock);
574 6           ++pool->nreqs;
575 6           X_UNLOCK (pool->reqlock);
576              
577 6           X_LOCK (pool->reslock);
578              
579 6           ++pool->npending;
580              
581 6 50         if (!reqq_push (&pool->res_queue, req))
582 6           ETP_WANT_POLL (pool);
583              
584 6           X_UNLOCK (pool->reslock);
585             }
586             else
587             {
588 75           X_LOCK (pool->reqlock);
589 75           ++pool->nreqs;
590 75           ++pool->nready;
591 75           reqq_push (&pool->req_queue, req);
592 75           X_COND_SIGNAL (pool->reqwait);
593 75           X_UNLOCK (pool->reqlock);
594              
595 75           etp_maybe_start_thread (pool);
596             }
597 81           }
598              
599             ETP_API_DECL void ecb_cold
600 0           etp_set_max_poll_time (etp_pool pool, double seconds)
601             {
602             if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
603 0           pool->max_poll_time = seconds * ETP_TICKS;
604             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
605 0           }
606              
607             ETP_API_DECL void ecb_cold
608 0           etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
609             {
610             if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
611 0           pool->max_poll_reqs = maxreqs;
612             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
613 0           }
614              
615             ETP_API_DECL void ecb_cold
616 0           etp_set_max_idle (etp_pool pool, unsigned int threads)
617             {
618             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
619 0           pool->max_idle = threads;
620             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
621 0           }
622              
623             ETP_API_DECL void ecb_cold
624 0           etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
625             {
626             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
627 0           pool->idle_timeout = seconds;
628             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
629 0           }
630              
631             ETP_API_DECL void ecb_cold
632 16           etp_set_min_parallel (etp_pool pool, unsigned int threads)
633             {
634 16 100         if (pool->wanted < threads)
635 10           pool->wanted = threads;
636 16           }
637              
638             ETP_API_DECL void ecb_cold
639 1           etp_set_max_parallel (etp_pool pool, unsigned int threads)
640             {
641 1 50         if (pool->wanted > threads)
642 1           pool->wanted = threads;
643              
644 1 50         while (pool->started > pool->wanted)
645 0           etp_end_thread (pool);
646 1           }
647