File Coverage

c/ev.c
Criterion Covered Total %
statement 186 244 76.2
branch 86 206 41.7
condition n/a
subroutine n/a
pod n/a
total 272 450 60.4


line stmt bran cond sub pod time code
1             /* 100 levels will trigger a manditory warning from perl */
2             #define MAX_CB_NEST 95
3              
4             static NV QueueTime[PE_QUEUES];
5              
6             static pe_cbframe CBFrame[MAX_CB_NEST];
7             static int CurCBFrame = -1;
8              
9             pe_event_vtbl event_vtbl, ioevent_vtbl, datafulevent_vtbl;
10              
11 95902           static void pe_anyevent_init(pe_event *ev, pe_watcher *wa) {
12             assert(wa);
13 95902           ev->up = wa;
14 95902           ++wa->refcnt;
15 95902           ev->mysv = 0;
16 95902           PE_RING_INIT(&ev->peer, ev);
17 95902           PE_RING_UNSHIFT(&ev->peer, &wa->events);
18 95902           ev->hits = 0;
19 95902           ev->prio = wa->prio;
20 95902           ev->callback = 0;
21 95902           }
22              
23 95902           static void pe_anyevent_dtor(pe_event *ev) {
24             STRLEN n_a;
25 95902           pe_watcher *wa = ev->up;
26             if (WaDEBUGx(wa) >= 3)
27             warn("Event=0x%x '%s' destroyed (SV=0x%x)",
28             ev,
29             SvPV(wa->desc, n_a),
30             ev->mysv? SvRV(ev->mysv) : 0);
31 95902           ev->up = 0;
32 95902           ev->mysv = 0;
33 95902           ev->hits = 0;
34 95902 50         if (EvPERLCB(ev))
35 95902           SvREFCNT_dec(ev->callback);
36 95902           ev->callback = 0;
37 95902 100         PE_RING_DETACH(&ev->peer);
38 95902 50         PE_RING_DETACH(&ev->que);
39 95902           --wa->refcnt;
40 95902 100         if (WaCANDESTROY(wa)) /* running */
    50          
    50          
41 625           (*wa->vtbl->dtor)(wa);
42 95902           }
43              
44 0           static void pe_anyevent_set_cb(pe_event *ev, void *fptr, void *ext) {
45 0 0         if (EvPERLCB(ev))
46 0           SvREFCNT_dec(ev->callback);
47 0           EvPERLCB_off(ev);
48 0           ev->callback = fptr;
49 0           ev->ext_data = ext;
50 0           }
51              
52 3           static void pe_anyevent_set_perl_cb(pe_event *ev, SV *sv) {
53 3           SV *old = 0;
54 3 100         if (EvPERLCB(ev))
55 2           old = ev->callback;
56 3           ev->callback = SvREFCNT_inc(sv);
57 3           SvREFCNT_dec(old);
58 3           EvPERLCB_on(ev);
59 3           }
60              
61             /*****************************************************************/
62              
63 85           static pe_event *pe_event_allocate(pe_watcher *wa) {
64             pe_event *ev;
65             assert(wa);
66 85 100         if (PE_RING_EMPTY(&event_vtbl.freelist)) {
67 30           EvNew(0, ev, 1, pe_event);
68 30           ev->vtbl = &event_vtbl;
69 30           PE_RING_INIT(&ev->que, ev);
70             } else {
71 55           pe_ring *lk = event_vtbl.freelist.prev;
72 55 50         PE_RING_DETACH(lk);
73 55           ev = (pe_event*) lk->self;
74             }
75 85           pe_anyevent_init(ev, wa);
76 85           return ev;
77             }
78              
79 85           static void pe_event_dtor(pe_event *ev) {
80 85           pe_anyevent_dtor(ev);
81 85           PE_RING_UNSHIFT(&ev->que, &event_vtbl.freelist);
82 85           }
83              
84 95902           static void pe_event_release(pe_event *ev) {
85 95902 100         if (!ev->mysv)
86 934           (*ev->vtbl->dtor)(ev);
87             else {
88 94968           SvREFCNT_dec(ev->mysv);
89 94968           ev->mysv=0;
90             }
91 95902           }
92              
93 0           EKEYMETH(_event_hits) {
94 0 0         if (!nval) {
95 0           dSP;
96 0 0         XPUSHs(sv_2mortal(newSViv(ev->hits)));
97 0           PUTBACK;
98             } else
99 0           croak("'e_hits' is read-only");
100 0           }
101              
102 0           EKEYMETH(_event_prio) {
103 0 0         if (!nval) {
104 0           dSP;
105 0 0         XPUSHs(sv_2mortal(newSViv(ev->prio)));
106 0           PUTBACK;
107             } else
108 0           croak("'e_prio' is read-only");
109 0           }
110              
111             /*------------------------------------------------------*/
112              
113 95814           static pe_event *pe_ioevent_allocate(pe_watcher *wa) {
114             pe_ioevent *ev;
115             assert(wa);
116 95814 100         if (PE_RING_EMPTY(&ioevent_vtbl.freelist)) {
117 1225           EvNew(1, ev, 1, pe_ioevent);
118 1225           ev->base.vtbl = &ioevent_vtbl;
119 1225           PE_RING_INIT(&ev->base.que, ev);
120             } else {
121 94589           pe_ring *lk = ioevent_vtbl.freelist.prev;
122 94589 50         PE_RING_DETACH(lk);
123 94589           ev = (pe_ioevent*) lk->self;
124             }
125 95814           pe_anyevent_init(&ev->base, wa);
126 95814           ev->got = 0;
127 95814           return &ev->base;
128             }
129              
130 95814           static void pe_ioevent_dtor(pe_event *ev) {
131 95814           pe_anyevent_dtor(ev);
132 95814           PE_RING_UNSHIFT(&ev->que, &ioevent_vtbl.freelist);
133 95814           }
134              
135 0           EKEYMETH(_event_got) {
136 0           pe_ioevent *io = (pe_ioevent *)ev;
137 0 0         if (!nval) {
138 0           dSP;
139 0 0         XPUSHs(sv_2mortal(events_mask_2sv(io->got)));
140 0           PUTBACK;
141             } else
142 0           croak("'e_got' is read-only");
143 0           }
144              
145             /*------------------------------------------------------*/
146              
147 3           static pe_event *pe_datafulevent_allocate(pe_watcher *wa) {
148             pe_datafulevent *ev;
149             assert(wa);
150 3 50         if (PE_RING_EMPTY(&datafulevent_vtbl.freelist)) {
151 3           EvNew(15, ev, 1, pe_datafulevent);
152 3           ev->base.vtbl = &datafulevent_vtbl;
153 3           PE_RING_INIT(&ev->base.que, ev);
154             } else {
155 0           pe_ring *lk = datafulevent_vtbl.freelist.prev;
156 0 0         PE_RING_DETACH(lk);
157 0           ev = (pe_datafulevent*) lk->self;
158             }
159 3           pe_anyevent_init(&ev->base, wa);
160 3           ev->data = &PL_sv_undef;
161 3           return &ev->base;
162             }
163              
164 3           static void pe_datafulevent_dtor(pe_event *ev) {
165 3           pe_datafulevent *de = (pe_datafulevent *)ev;
166 3           SvREFCNT_dec(de->data);
167 3           pe_anyevent_dtor(ev);
168 3           PE_RING_UNSHIFT(&ev->que, &datafulevent_vtbl.freelist);
169 3           }
170              
171 0           EKEYMETH(_event_data) {
172 0           pe_datafulevent *de = (pe_datafulevent *)ev;
173 0 0         if (!nval) {
174 0           dSP;
175 0 0         XPUSHs(de->data);
176 0           PUTBACK;
177             } else
178 0           croak("'e_data' is read-only");
179 0           }
180              
181             /*------------------------------------------------------*/
182              
183 94968           static void pe_event_postCB(pe_cbframe *fp) {
184 94968           pe_event *ev = fp->ev;
185 94968           pe_watcher *wa = ev->up;
186 94968           --CurCBFrame;
187 94968 100         if (WaACTIVE(wa) && WaINVOKE1(wa) && WaREPEAT(wa))
    100          
    100          
188 48           pe_watcher_on(wa, 1);
189 94968 50         if (Estat.on) {
190 0 0         if (fp->stats) {
191 0           Estat.scrub(fp->stats, wa);
192 0           fp->stats = 0;
193             }
194 0 0         if (CurCBFrame >= 0) {
195 0           pe_cbframe *pfp = CBFrame + CurCBFrame;
196 0 0         if (!pfp->stats)
197 0           pfp->stats = Estat.enter(CurCBFrame, pfp->ev->up->max_cb_tm);
198             else
199 0           Estat.resume(pfp->stats);
200             }
201             }
202             /* this must be last because it can destroy the watcher */
203 94968           pe_event_release(ev);
204 94968           }
205              
206 7           static void pe_callback_died(pe_cbframe *fp) {
207 7           dSP;
208             STRLEN n_a;
209 7           pe_watcher *wa = fp->ev->up;
210 7           SV *eval = perl_get_sv("Event::DIED", 1);
211 14 50         SV *err = (sv_true(ERRSV)?
212 7 50         sv_mortalcopy(ERRSV):
    50          
213 0           sv_2mortal(newSVpv("?",0)));
214             if (WaDEBUGx(wa) >= 4)
215             warn("Event: '%s' died with: %s\n", SvPV(wa->desc,n_a),
216             SvPV(ERRSV,n_a));
217 7 50         PUSHMARK(SP);
218 7 50         XPUSHs(event_2sv(fp->ev));
219 7 50         XPUSHs(err);
220 7           PUTBACK;
221 7           perl_call_sv(eval, G_EVAL|G_DISCARD);
222 7 50         if (sv_true(ERRSV)) {
    100          
223 2 50         warn("Event: '%s' died and then $Event::DIED died with: %s\n",
    50          
224 2 50         SvPV(wa->desc,n_a), SvPV(ERRSV,n_a));
    50          
    50          
    0          
225 1 50         sv_setpv(ERRSV, "");
226             }
227 7           }
228              
229 2           static void _resume_watcher(void *vp) {
230 2           pe_watcher *wa = (pe_watcher *)vp;
231 2           pe_watcher_resume(wa);
232 2           }
233              
234 95115           static void pe_check_recovery() {
235             /* NO ASSERTIONS HERE! EVAL CONTEXT IS VERY MESSY */
236             int alert;
237             struct pe_cbframe *fp;
238 95115 100         if (CurCBFrame < 0)
239 94874           return;
240              
241 241           alert=0;
242 257 100         while (CurCBFrame >= 0) {
243 252           fp = CBFrame + CurCBFrame;
244 252 100         if (fp->ev->up->running == fp->run_id)
245 236           break;
246 16 100         if (!alert) {
247 6           alert=1;
248             /* exception detected; alert the militia! */
249 6           pe_callback_died(fp);
250             }
251 16           pe_event_postCB(fp);
252             }
253             }
254              
255 94968           static void pe_event_invoke(pe_event *ev) {
256             STRLEN n_a;
257             int Dbg;
258 94968           pe_watcher *wa = ev->up;
259             struct pe_cbframe *frp;
260              
261 94968           pe_check_recovery();
262              
263             /* SETUP */
264 94968           ENTER;
265 94968           SAVEINT(wa->running);
266 94968 50         PE_RING_DETACH(&ev->peer);
267 94968           frp = &CBFrame[++CurCBFrame];
268 94968           frp->ev = ev;
269 94968           frp->run_id = ++wa->running;
270 94968 50         if (Estat.on)
271 0           frp->stats = Estat.enter(CurCBFrame, wa->max_cb_tm);
272             assert(ev->prio >= 0 && ev->prio < PE_QUEUES);
273 94968           QueueTime[ev->prio] = wa->cbtime = NVtime();
274             /* SETUP */
275              
276 94968 100         if (CurCBFrame+1 >= MAX_CB_NEST) {
277 1           ExitLevel = 0;
278 1           croak("Deep recursion detected; invoking unloop_all()\n");
279             }
280              
281 94967           Dbg = WaDEBUGx(wa);
282 94967 50         if (Dbg) {
283             /*
284             SV *cvb = perl_get_sv("Carp::Verbose", 1);
285             if (!SvIV(cvb)) {
286             SAVEIV(SvIVX(cvb));
287             SvIVX(cvb) = 1;
288             }
289             */
290              
291 0 0         if (Dbg >= 2)
292 0 0         warn("Event: [%d]invoking '%s' (prio %d)\n",
293 0           CurCBFrame, SvPV(wa->desc,n_a),ev->prio);
294             }
295              
296 94967 100         if (!PE_RING_EMPTY(&Callback)) pe_map_check(&Callback);
297              
298 94967 50         if (EvPERLCB(ev)) {
299 94967           SV *cb = SvRV((SV*)ev->callback);
300 94967 100         int pcflags = G_VOID | (SvIVX(Eval)? G_EVAL : 0);
301             int retcnt;
302 94967           SV *evsv = event_2sv(ev);
303 94967           dSP;
304 94967 50         PUSHMARK(SP);
305 94967 100         if (SvTYPE(cb) == SVt_PVCV) {
306 94961 50         XPUSHs(evsv);
307 94961           PUTBACK;
308 94961           retcnt = perl_call_sv((SV*) ev->callback, pcflags);
309             } else {
310 6           AV *av = (AV*)cb;
311             assert(SvTYPE(cb) == SVt_PVAV);
312 6 50         XPUSHs(*av_fetch(av, 0, 0));
313 6 50         XPUSHs(evsv);
314 6           PUTBACK;
315 6 50         retcnt = perl_call_method(SvPV(*av_fetch(av, 1, 0),n_a), pcflags);
316             }
317 94952           SPAGAIN;
318 94952           SP -= retcnt;
319 94952           PUTBACK;
320 94952 50         if (SvTRUE(ERRSV)) {
    50          
    50          
    50          
    0          
    50          
    50          
    0          
    0          
    0          
    0          
    50          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    100          
321 1 50         if (pcflags & G_EVAL)
322 1           pe_callback_died(frp);
323             else
324 94952 0         sv_setsv(ERRSV, &PL_sv_no);
325             }
326             } else {
327             assert(ev->callback);
328 0           (* (void(*)(pe_event*)) ev->callback)(ev);
329             }
330              
331 94952           LEAVE;
332              
333 94952 50         if (Estat.on) {
334 0 0         if (frp->stats) /* maybe in transition */
335 0           Estat.commit(frp->stats, wa);
336 0           frp->stats=0;
337             }
338 94952 50         if (Dbg >= 3)
339 0 0         warn("Event: completed '%s'\n", SvPV(wa->desc, n_a));
340 94952           pe_event_postCB(frp);
341 94952           }
342              
343 25           static void boot_pe_event() {
344             pe_event_vtbl *vt;
345              
346 25           vt = &event_vtbl;
347 25           vt->new_event = pe_event_allocate;
348 25           vt->dtor = pe_event_dtor;
349 25           vt->stash = gv_stashpv("Event::Event", 1);
350 25           PE_RING_INIT(&vt->freelist, 0);
351              
352 25           vt = &ioevent_vtbl;
353 25           memcpy(vt, &event_vtbl, sizeof(pe_event_vtbl));
354 25           vt->stash = gv_stashpv("Event::Event::Io", 1);
355 25           vt->new_event = pe_ioevent_allocate;
356 25           vt->dtor = pe_ioevent_dtor;
357 25           PE_RING_INIT(&vt->freelist, 0);
358              
359 25           vt = &datafulevent_vtbl;
360 25           memcpy(vt, &event_vtbl, sizeof(pe_event_vtbl));
361 25           vt->stash = gv_stashpv("Event::Event::Dataful", 1);
362 25           vt->new_event = pe_datafulevent_allocate;
363 25           vt->dtor = pe_datafulevent_dtor;
364 25           PE_RING_INIT(&vt->freelist, 0);
365              
366 25           memset(QueueTime, 0, sizeof(QueueTime));
367 25           }