File Coverage

libeio/etp.c
Criterion Covered Total %
statement 179 240 74.5
branch 49 72 68.0
condition n/a
subroutine n/a
pod n/a
total 228 312 73.0


line stmt bran cond sub pod time code
1             /*
2             * libetp implementation
3             *
4             * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann
5             * All rights reserved.
6             *
7             * Redistribution and use in source and binary forms, with or without modifica-
8             * tion, are permitted provided that the following conditions are met:
9             *
10             * 1. Redistributions of source code must retain the above copyright notice,
11             * this list of conditions and the following disclaimer.
12             *
13             * 2. Redistributions in binary form must reproduce the above copyright
14             * notice, this list of conditions and the following disclaimer in the
15             * documentation and/or other materials provided with the distribution.
16             *
17             * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18             * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19             * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20             * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21             * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22             * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23             * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24             * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25             * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26             * OF THE POSSIBILITY OF SUCH DAMAGE.
27             *
28             * Alternatively, the contents of this file may be used under the terms of
29             * the GNU General Public License ("GPL") version 2 or any later version,
30             * in which case the provisions of the GPL are applicable instead of
31             * the above. If you wish to allow the use of your version of this file
32             * only under the terms of the GPL and not to allow others to use your
33             * version of this file under the BSD license, indicate your decision
34             * by deleting the provisions above and replace them with the notice
35             * and other provisions required by the GPL. If you do not delete the
36             * provisions above, a recipient may use your version of this file under
37             * either the BSD or the GPL.
38             */
39              
40             #if HAVE_SYS_PRCTL_H
41             # include
42             #endif
43              
44             #ifdef EIO_STACKSIZE
45             # define X_STACKSIZE EIO_STACKSIZE
46             #endif
47             #include "xthread.h"
48              
49             #ifndef ETP_API_DECL
50             # define ETP_API_DECL static
51             #endif
52              
53             #ifndef ETP_PRI_MIN
54             # define ETP_PRI_MIN 0
55             # define ETP_PRI_MAX 0
56             #endif
57              
58             #ifndef ETP_TYPE_QUIT
59             # define ETP_TYPE_QUIT 0
60             #endif
61              
62             #ifndef ETP_TYPE_GROUP
63             # define ETP_TYPE_GROUP 1
64             #endif
65              
66             #ifndef ETP_WANT_POLL
67             # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
68             #endif
69             #ifndef ETP_DONE_POLL
70             # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
71             #endif
72              
73             #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
74              
75             #define ETP_TICKS ((1000000 + 1023) >> 10)
76              
77             enum {
78             ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
79             ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
80             };
81              
82             /* calculate time difference in ~1/ETP_TICKS of a second */
83             ecb_inline int
84 0           etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
85             {
86 0           return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
87 0           + ((tv2->tv_usec - tv1->tv_usec) >> 10);
88             }
89              
90             struct etp_tmpbuf
91             {
92             void *ptr;
93             int len;
94             };
95              
96             static void *
97 1           etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
98             {
99 1 50         if (buf->len < len)
100             {
101 1           free (buf->ptr);
102 1           buf->ptr = malloc (buf->len = len);
103             }
104              
105 1           return buf->ptr;
106             }
107              
108             /*
109             * a somewhat faster data structure might be nice, but
110             * with 8 priorities this actually needs <20 insns
111             * per shift, the most expensive operation.
112             */
113             typedef struct
114             {
115             ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
116             int size;
117             } etp_reqq;
118              
119             typedef struct etp_pool *etp_pool;
120              
121             typedef struct etp_worker
122             {
123             etp_pool pool;
124              
125             struct etp_tmpbuf tmpbuf;
126              
127             /* locked by pool->wrklock */
128             struct etp_worker *prev, *next;
129              
130             xthread_t tid;
131              
132             #ifdef ETP_WORKER_COMMON
133             ETP_WORKER_COMMON
134             #endif
135             } etp_worker;
136              
137             struct etp_pool
138             {
139             void *userdata;
140              
141             etp_reqq req_queue;
142             etp_reqq res_queue;
143              
144             unsigned int started, idle, wanted;
145              
146             unsigned int max_poll_time; /* pool->reslock */
147             unsigned int max_poll_reqs; /* pool->reslock */
148              
149             unsigned int nreqs; /* pool->reqlock */
150             unsigned int nready; /* pool->reqlock */
151             unsigned int npending; /* pool->reqlock */
152             unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
153             unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
154              
155             void (*want_poll_cb) (void *userdata);
156             void (*done_poll_cb) (void *userdata);
157            
158             xmutex_t wrklock;
159             xmutex_t reslock;
160             xmutex_t reqlock;
161             xcond_t reqwait;
162              
163             etp_worker wrk_first;
164             };
165              
166             #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
167             #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
168              
169             /* worker threads management */
170              
171             static void
172 73           etp_worker_clear (etp_worker *wrk)
173             {
174 73           }
175              
176             static void ecb_cold
177 0           etp_worker_free (etp_worker *wrk)
178             {
179 0           free (wrk->tmpbuf.ptr);
180              
181 0           wrk->next->prev = wrk->prev;
182 0           wrk->prev->next = wrk->next;
183              
184 0           free (wrk);
185 0           }
186              
187             ETP_API_DECL unsigned int
188 612           etp_nreqs (etp_pool pool)
189             {
190             int retval;
191             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
192 306           retval = pool->nreqs;
193             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
194 306           return retval;
195             }
196              
197             ETP_API_DECL unsigned int
198 0           etp_nready (etp_pool pool)
199             {
200             unsigned int retval;
201              
202             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
203 0           retval = pool->nready;
204             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
205              
206 0           return retval;
207             }
208              
209             ETP_API_DECL unsigned int
210 384           etp_npending (etp_pool pool)
211             {
212             unsigned int retval;
213              
214             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
215 192           retval = pool->npending;
216             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
217              
218 192           return retval;
219             }
220              
221             ETP_API_DECL unsigned int
222 772           etp_nthreads (etp_pool pool)
223             {
224             unsigned int retval;
225              
226             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
227 386           retval = pool->started;
228             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
229              
230 386           return retval;
231             }
232              
233             static void ecb_noinline ecb_cold
234 20           reqq_init (etp_reqq *q)
235             {
236             int pri;
237              
238 200 100         for (pri = 0; pri < ETP_NUM_PRI; ++pri)
239 180           q->qs[pri] = q->qe[pri] = 0;
240              
241 20           q->size = 0;
242 20           }
243              
244             static int ecb_noinline
245 155           reqq_push (etp_reqq *q, ETP_REQ *req)
246             {
247 155           int pri = req->pri;
248 155           req->next = 0;
249              
250 155 100         if (q->qe[pri])
251             {
252 45           q->qe[pri]->next = req;
253 45           q->qe[pri] = req;
254             }
255             else
256 110           q->qe[pri] = q->qs[pri] = req;
257              
258 155           return q->size++;
259             }
260              
261             static ETP_REQ * ecb_noinline
262 245           reqq_shift (etp_reqq *q)
263             {
264             int pri;
265              
266 245 100         if (!q->size)
267 110           return 0;
268              
269 135           --q->size;
270              
271 540 50         for (pri = ETP_NUM_PRI; pri--; )
272             {
273 540           ETP_REQ *req = q->qs[pri];
274              
275 540 100         if (req)
276             {
277 135 100         if (!(q->qs[pri] = (ETP_REQ *)req->next))
278 100           q->qe[pri] = 0;
279              
280 135           return req;
281             }
282             }
283              
284 0           abort ();
285             }
286              
287             ETP_API_DECL int ecb_cold
288 10           etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
289             {
290 10           X_MUTEX_CREATE (pool->wrklock);
291 10           X_MUTEX_CREATE (pool->reslock);
292 10           X_MUTEX_CREATE (pool->reqlock);
293 10           X_COND_CREATE (pool->reqwait);
294              
295 10           reqq_init (&pool->req_queue);
296 10           reqq_init (&pool->res_queue);
297              
298 10           pool->wrk_first.next =
299 10           pool->wrk_first.prev = &pool->wrk_first;
300              
301 10           pool->started = 0;
302 10           pool->idle = 0;
303 10           pool->nreqs = 0;
304 10           pool->nready = 0;
305 10           pool->npending = 0;
306 10           pool->wanted = 4;
307              
308 10           pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
309 10           pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
310              
311 10           pool->userdata = userdata;
312 10           pool->want_poll_cb = want_poll;
313 10           pool->done_poll_cb = done_poll;
314              
315 10           return 0;
316             }
317              
318             static void ecb_noinline ecb_cold
319 22           etp_proc_init (void)
320             {
321             #if HAVE_PRCTL_SET_NAME
322             /* provide a more sensible "thread name" */
323             char name[16 + 1];
324 22           const int namelen = sizeof (name) - 1;
325             int len;
326              
327 22           prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
328 22           name [namelen] = 0;
329 22           len = strlen (name);
330 22           strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
331 22           prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
332             #endif
333 22           }
334              
335 22           X_THREAD_PROC (etp_proc)
336             {
337             ETP_REQ *req;
338             struct timespec ts;
339 22           etp_worker *self = (etp_worker *)thr_arg;
340 22           etp_pool pool = self->pool;
341              
342 22           etp_proc_init ();
343              
344             /* try to distribute timeouts somewhat evenly */
345 22           ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
346              
347             for (;;)
348             {
349 95           ts.tv_sec = 0;
350              
351 95           X_LOCK (pool->reqlock);
352              
353             for (;;)
354             {
355 146           req = reqq_shift (&pool->req_queue);
356              
357 146 100         if (ecb_expect_true (req))
358 74           break;
359              
360 72 50         if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
361             {
362 0           X_UNLOCK (pool->reqlock);
363 0           X_LOCK (pool->wrklock);
364 0           --pool->started;
365 0           X_UNLOCK (pool->wrklock);
366 0           goto quit;
367             }
368              
369 72           ++pool->idle;
370              
371 72 50         if (pool->idle <= pool->max_idle)
372             /* we are allowed to pool->idle, so do so without any timeout */
373 72           X_COND_WAIT (pool->reqwait, pool->reqlock);
374             else
375             {
376             /* initialise timeout once */
377 0 0         if (!ts.tv_sec)
378 0           ts.tv_sec = time (0) + pool->idle_timeout;
379              
380 0 0         if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
381 0           ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
382             }
383              
384 51           --pool->idle;
385 51           }
386              
387 74           --pool->nready;
388              
389 74           X_UNLOCK (pool->reqlock);
390            
391 74 50         if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
392 0           goto quit;
393              
394 74           ETP_EXECUTE (self, req);
395              
396 73           X_LOCK (pool->reslock);
397              
398 73           ++pool->npending;
399              
400 73 100         if (!reqq_push (&pool->res_queue, req))
401 43           ETP_WANT_POLL (pool);
402              
403 73           etp_worker_clear (self);
404              
405 73           X_UNLOCK (pool->reslock);
406 73           }
407              
408             quit:
409 0           free (req);
410              
411 0           X_LOCK (pool->wrklock);
412 0           etp_worker_free (self);
413 0           X_UNLOCK (pool->wrklock);
414              
415 0           return 0;
416             }
417              
418             static void ecb_cold
419 22           etp_start_thread (etp_pool pool)
420             {
421 22           etp_worker *wrk = calloc (1, sizeof (etp_worker));
422              
423             /*TODO*/
424             assert (("unable to allocate worker thread data", wrk));
425              
426 22           wrk->pool = pool;
427              
428 22           X_LOCK (pool->wrklock);
429              
430 22 50         if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
431             {
432 22           wrk->prev = &pool->wrk_first;
433 22           wrk->next = pool->wrk_first.next;
434 22           pool->wrk_first.next->prev = wrk;
435 22           pool->wrk_first.next = wrk;
436 22           ++pool->started;
437             }
438             else
439 0           free (wrk);
440              
441 22           X_UNLOCK (pool->wrklock);
442 22           }
443              
444             static void
445 194           etp_maybe_start_thread (etp_pool pool)
446             {
447 194 100         if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
448 2           return;
449            
450             /* todo: maybe use pool->idle here, but might be less exact */
451 192 100         if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
452 170           return;
453              
454 22           etp_start_thread (pool);
455             }
456              
457             static void ecb_cold
458 0           etp_end_thread (etp_pool pool)
459             {
460 0           ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
461              
462 0           req->type = ETP_TYPE_QUIT;
463 0           req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
464              
465 0           X_LOCK (pool->reqlock);
466 0           reqq_push (&pool->req_queue, req);
467 0           X_COND_SIGNAL (pool->reqwait);
468 0           X_UNLOCK (pool->reqlock);
469              
470 0           X_LOCK (pool->wrklock);
471 0           --pool->started;
472 0           X_UNLOCK (pool->wrklock);
473 0           }
474              
475             ETP_API_DECL int
476 38           etp_poll (etp_pool pool)
477             {
478             unsigned int maxreqs;
479             unsigned int maxtime;
480             struct timeval tv_start, tv_now;
481              
482 38           X_LOCK (pool->reslock);
483 38           maxreqs = pool->max_poll_reqs;
484 38           maxtime = pool->max_poll_time;
485 38           X_UNLOCK (pool->reslock);
486              
487 38 50         if (maxtime)
488 0           gettimeofday (&tv_start, 0);
489              
490             for (;;)
491             {
492             ETP_REQ *req;
493              
494 100           etp_maybe_start_thread (pool);
495              
496 100           X_LOCK (pool->reslock);
497 100           req = reqq_shift (&pool->res_queue);
498              
499 100 100         if (ecb_expect_true (req))
500             {
501 62           --pool->npending;
502              
503 62 100         if (!pool->res_queue.size)
504 40           ETP_DONE_POLL (pool);
505             }
506              
507 100           X_UNLOCK (pool->reslock);
508              
509 100 100         if (ecb_expect_false (!req))
510 38           return 0;
511              
512 62           X_LOCK (pool->reqlock);
513 62           --pool->nreqs;
514 62           X_UNLOCK (pool->reqlock);
515              
516 62 100         if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
    100          
517             {
518 5           req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
519 5           continue;
520             }
521             else
522             {
523 57           int res = ETP_FINISH (req);
524 57 50         if (ecb_expect_false (res))
525 0           return res;
526             }
527              
528 57 50         if (ecb_expect_false (maxreqs && !--maxreqs))
    0          
529 0           break;
530              
531 57 50         if (maxtime)
532             {
533 0           gettimeofday (&tv_now, 0);
534              
535 0 0         if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
536 0           break;
537             }
538 62           }
539              
540 0           errno = EAGAIN;
541 38           return -1;
542             }
543              
544             ETP_API_DECL void
545             etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
546              
547             ETP_API_DECL void
548 3           etp_cancel (etp_pool pool, ETP_REQ *req)
549             {
550 3           req->cancelled = 1;
551              
552 3           etp_grp_cancel (pool, req);
553 3           }
554              
555             ETP_API_DECL void
556 3           etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
557             {
558 4 100         for (grp = grp->grp_first; grp; grp = grp->grp_next)
559 1           etp_cancel (pool, grp);
560 3           }
561              
562             ETP_API_DECL void
563 82           etp_submit (etp_pool pool, ETP_REQ *req)
564             {
565 82           req->pri -= ETP_PRI_MIN;
566              
567 82 50         if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
568 82 50         if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
569              
570 82 100         if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
571             {
572             /* I hope this is worth it :/ */
573 6           X_LOCK (pool->reqlock);
574 6           ++pool->nreqs;
575 6           X_UNLOCK (pool->reqlock);
576              
577 6           X_LOCK (pool->reslock);
578              
579 6           ++pool->npending;
580              
581 6 50         if (!reqq_push (&pool->res_queue, req))
582 6           ETP_WANT_POLL (pool);
583              
584 6           X_UNLOCK (pool->reslock);
585             }
586             else
587             {
588 76           X_LOCK (pool->reqlock);
589 76           ++pool->nreqs;
590 76           ++pool->nready;
591 76           reqq_push (&pool->req_queue, req);
592 76           X_COND_SIGNAL (pool->reqwait);
593 76           X_UNLOCK (pool->reqlock);
594              
595 76           etp_maybe_start_thread (pool);
596             }
597 82           }
598              
599             ETP_API_DECL void ecb_cold
600 0           etp_set_max_poll_time (etp_pool pool, double seconds)
601             {
602             if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
603 0           pool->max_poll_time = seconds * ETP_TICKS;
604             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
605 0           }
606              
607             ETP_API_DECL void ecb_cold
608 0           etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
609             {
610             if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
611 0           pool->max_poll_reqs = maxreqs;
612             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
613 0           }
614              
615             ETP_API_DECL void ecb_cold
616 0           etp_set_max_idle (etp_pool pool, unsigned int threads)
617             {
618             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
619 0           pool->max_idle = threads;
620             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
621 0           }
622              
623             ETP_API_DECL void ecb_cold
624 0           etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
625             {
626             if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
627 0           pool->idle_timeout = seconds;
628             if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
629 0           }
630              
631             ETP_API_DECL void ecb_cold
632 16           etp_set_min_parallel (etp_pool pool, unsigned int threads)
633             {
634 16 100         if (pool->wanted < threads)
635 10           pool->wanted = threads;
636 16           }
637              
638             ETP_API_DECL void ecb_cold
639 1           etp_set_max_parallel (etp_pool pool, unsigned int threads)
640             {
641 1 50         if (pool->wanted > threads)
642 1           pool->wanted = threads;
643              
644 1 50         while (pool->started > pool->wanted)
645 0           etp_end_thread (pool);
646 1           }
647