File Coverage

c/queue.c
Criterion Covered Total %
statement 129 150 86.0
branch 70 104 67.3
condition n/a
subroutine n/a
pod n/a
total 199 254 78.3


line stmt bran cond sub pod time code
1             static pe_ring NQueue;
2             static int StarvePrio = PE_QUEUES - 2;
3              
4 25           static void boot_queue() {
5 25           HV *stash = gv_stashpv("Event", 1);
6 25           PE_RING_INIT(&NQueue, 0);
7 25           newCONSTSUB(stash, "QUEUES", newSViv(PE_QUEUES));
8 25           newCONSTSUB(stash, "PRIO_NORMAL", newSViv(PE_PRIO_NORMAL));
9 25           newCONSTSUB(stash, "PRIO_HIGH", newSViv(PE_PRIO_HIGH));
10 25           }
11              
12 95886           /*inline*/ static void dequeEvent(pe_event *ev) {
13             assert(ev);
14 95886 50         PE_RING_DETACH(&ev->que);
15 95886           --ActiveWatchers;
16 95886           }
17              
18 0           static void db_show_queue() {
19             pe_event *ev;
20 0           ev = (pe_event*) NQueue.next->self;
21 0 0         while (ev) {
22 0           warn("0x%x : %d\n", ev, ev->prio);
23 0           ev = (pe_event*) ev->que.next->self;
24             }
25 0           }
26              
27 95902           static int prepare_event(pe_event *ev, char *forwhat) {
28             /* AVOID DIEING IN HERE!! */
29             STRLEN n_a;
30 95902           pe_watcher *wa = ev->up;
31 95902 100         if (!ev->callback) {
32 95899 50         if (WaPERLCB(wa)) {
33 95899           ev->callback = SvREFCNT_inc(wa->callback);
34 95899           EvPERLCB_on(ev);
35             } else {
36 0           ev->callback = wa->callback;
37 0           ev->ext_data = wa->ext_data;
38 0           EvPERLCB_off(ev);
39             }
40             assert(ev->callback);
41             }
42             assert(!WaSUSPEND(wa));
43             assert(WaREENTRANT(wa) || !wa->running);
44 95902 100         if (!WaACTIVE(wa)) {
45 9 50         if (!WaRUNNOW(wa))
46 9 0         warn("Event: event for !ACTIVE watcher '%s'", SvPV(wa->desc,n_a));
47             }
48             else {
49 95893 100         if (!WaREPEAT(wa))
50 29           pe_watcher_stop(wa, 0);
51 95864 100         else if (WaINVOKE1(wa))
52 48           pe_watcher_off(wa);
53             }
54 95902           WaRUNNOW_off(wa); /* race condition? XXX */
55             if (WaDEBUGx(wa) >= 3)
56             warn("Event: %s '%s' prio=%d\n", forwhat, SvPV(wa->desc,n_a), ev->prio);
57 95902           return 1;
58             }
59              
60 95887           static void queueEvent(pe_event *ev) { /**INVOKE**/
61             assert(ev->hits);
62 95887 50         if (!PE_RING_EMPTY(&ev->que)) return; /* clump'd event already queued */
63 95887 50         if (!prepare_event(ev, "queue")) return;
64              
65 95887 100         if (ev->prio < 0) { /* invoke the event immediately! */
66 1           ev->prio = 0;
67 1           pe_event_invoke(ev);
68 1           return;
69             }
70 95886 50         if (ev->prio >= PE_QUEUES)
71 0           ev->prio = PE_QUEUES-1;
72              
73             {
74             /* queue in reverse direction? XXX */
75             /* warn("-- adding 0x%x/%d\n", ev, prio); db_show_queue();/**/
76             pe_ring *rg;
77 95886           rg = NQueue.next;
78 867886 100         while (rg->self && ((pe_event*)rg->self)->prio <= ev->prio)
    100          
79 772000           rg = rg->next;
80 95886           PE_RING_ADD_BEFORE(&ev->que, rg);
81             /* warn("=\n"); db_show_queue();/**/
82 95886           ++ActiveWatchers;
83             }
84             }
85              
86 165223           static int pe_empty_queue(int maxprio) { /**INVOKE**/
87             pe_event *ev;
88 165223           ev = (pe_event*) NQueue.next->self;
89 165223 100         if (ev && ev->prio < maxprio) {
    100          
90 94952           dequeEvent(ev);
91 94952           pe_event_invoke(ev);
92 94937           return 1;
93             }
94 70271           return 0;
95             }
96              
97 59064           /*inline*/ static void pe_multiplex(NV tm) {
98 59064 50         if (SvIVX(DebugLevel) >= 2) {
99 0 0         warn("Event: multiplex %.4fs %s%s\n", tm,
    0          
100 0           PE_RING_EMPTY(&NQueue)?"":"QUEUE",
101 0           PE_RING_EMPTY(&Idle)?"":"IDLE");
102             }
103 59064 50         if (!Estat.on)
104 59064           pe_sys_multiplex(tm);
105             else {
106 0           void *st = Estat.enter(-1, 0);
107 0           pe_sys_multiplex(tm);
108 0           Estat.commit(st, 0);
109             }
110 59064           }
111              
112 1           static NV pe_map_prepare(NV tm) {
113 1           pe_qcallback *qcb = (pe_qcallback*) Prepare.prev->self;
114 2 100         while (qcb) {
115 1 50         if (qcb->is_perl) {
116             SV *got;
117             NV when;
118 1           dSP;
119 1 50         PUSHMARK(SP);
120 1           PUTBACK;
121 1           perl_call_sv((SV*)qcb->callback, G_SCALAR);
122 1           SPAGAIN;
123 1           got = POPs;
124 1           PUTBACK;
125 1 50         when = SvNOK(got) ? SvNVX(got) : SvNV(got);
    50          
126 1 50         if (when < tm) tm = when;
127             }
128             else { /* !is_perl */
129 0           NV got = (* (NV(*)(void*)) qcb->callback)(qcb->ext_data);
130 0 0         if (got < tm) tm = got;
131             }
132 1           qcb = (pe_qcallback*) qcb->ring.prev->self;
133             }
134 1           return tm;
135             }
136              
137 3           static void pe_queue_pending() {
138 3           NV tm = 0;
139 3 100         if (!PE_RING_EMPTY(&Prepare)) tm = pe_map_prepare(tm);
140              
141 3           pe_multiplex(0);
142              
143 3           pe_timeables_check();
144 3 100         if (!PE_RING_EMPTY(&Check)) pe_map_check(&Check);
145              
146 3           pe_signal_asynccheck();
147 3 100         if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck);
148 3           }
149              
150 106156           static int one_event(NV tm) { /**INVOKE**/
151             /*if (SvIVX(DebugLevel) >= 4)
152             warn("Event: ActiveWatchers=%d\n", ActiveWatchers); /**/
153              
154 106156           pe_signal_asynccheck();
155 106156 50         if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck);
156              
157 106156 100         if (pe_empty_queue(StarvePrio)) return 1;
158              
159 59061 100         if (!PE_RING_EMPTY(&NQueue) || !PE_RING_EMPTY(&Idle)) {
    100          
160 586           tm = 0;
161             }
162             else {
163 58475           NV t1 = timeTillTimer();
164 58475 100         if (t1 < tm) tm = t1;
165             }
166 59061 50         if (!PE_RING_EMPTY(&Prepare)) tm = pe_map_prepare(tm);
167              
168 59061           pe_multiplex(tm);
169              
170 59061           pe_timeables_check();
171 59061 50         if (!PE_RING_EMPTY(&Check)) pe_map_check(&Check);
172              
173 59061 100         if (tm) {
174 58475           pe_signal_asynccheck();
175 58475 50         if (!PE_RING_EMPTY(&AsyncCheck)) pe_map_check(&AsyncCheck);
176             }
177              
178 59061 100         if (pe_empty_queue(PE_QUEUES)) return 1;
179              
180             while (1) {
181             pe_watcher *wa;
182             pe_event *ev;
183             pe_ring *lk;
184              
185 11207 100         if (PE_RING_EMPTY(&Idle)) return 0;
186              
187 15           lk = Idle.prev;
188 15 50         PE_RING_DETACH(lk);
189 15           wa = (pe_watcher*) lk->self;
190              
191             /* idle is not an event so CLUMP is never an option but we still need
192             to create an event to pass info to the callback */
193 15           ev = pe_event_allocate(wa);
194 15 50         if (!prepare_event(ev, "idle")) continue;
195             /* can't queueEvent because we are already missed that */
196 15           pe_event_invoke(ev);
197 14           return 1;
198 0           }
199             }
200              
201 147           static void pe_reentry() {
202             pe_watcher *wa;
203             struct pe_cbframe *frp;
204              
205 147           ENTER; /* for SAVE*() macro (see below) */
206              
207 147 100         if (CurCBFrame < 0)
208 29           return;
209              
210 118           frp = CBFrame + CurCBFrame;
211 118           wa = frp->ev->up;
212             assert(wa->running == frp->run_id);
213 118 50         if (Estat.on)
214 0           Estat.suspend(frp->stats); /* reversed by pe_event_postCB? */
215 118 50         if (WaREPEAT(wa)) {
216 118 100         if (WaREENTRANT(wa)) {
217 116 50         if (WaACTIVE(wa) && WaINVOKE1(wa))
    100          
218 116           pe_watcher_on(wa, 1);
219             } else {
220 2 50         if (!WaSUSPEND(wa)) {
221             /* temporarily suspend non-reentrant watcher until
222             callback is finished! */
223 2           pe_watcher_suspend(wa);
224 2           SAVEDESTRUCTOR(_resume_watcher, wa);
225             }
226             }
227             }
228             }
229              
230 21           static int safe_one_event(NV maxtm) {
231             int got;
232 21           pe_check_recovery();
233 21           pe_reentry();
234 21           got = one_event(maxtm);
235 11           LEAVE; /* reentry */
236 11           return got;
237             }
238              
239 13           static void pe_unloop(SV *why) {
240 13           SV *rsv = perl_get_sv("Event::Result", 0);
241             assert(rsv);
242 13           sv_setsv(rsv, why);
243 13 50         if (--ExitLevel < 0) {
244 0           warn("Event::unloop() to %d", ExitLevel);
245             }
246 13           }
247              
248 1           static void pe_unloop_all(SV *why) {
249 1           SV *rsv = perl_get_sv("Event::TopResult", 0);
250             assert(rsv);
251 1           sv_setsv(rsv, why);
252 1           ExitLevel = 0;
253 1           }