File Coverage

c/watcher.c
Criterion Covered Total %
statement 217 265 81.8
branch 117 234 50.0
condition n/a
subroutine n/a
pod n/a
total 334 499 66.9


line stmt bran cond sub pod time code
1             static int NextID = 0;
2             static pe_ring AllWatchers;
3             static struct pe_watcher_vtbl pe_watcher_base_vtbl;
4              
5 3885           static void pe_watcher_init(pe_watcher *ev, HV *stash, SV *temple) {
6             STRLEN n_a;
7             assert(ev);
8             assert(ev->vtbl);
9 3885 50         if (!ev->vtbl->stash)
10 0           croak("sub-class VTBL must have a stash (doesn't!)");
11 3885 100         if (!ev->vtbl->did_require) {
12             SV *tmp;
13 36 50         char *name = HvNAME(ev->vtbl->stash);
    50          
    50          
    0          
    50          
    50          
14             dTHX;
15 36 50         if (memEQ(name, "Event::", 7))
16 36           name += 7;
17 36           tmp = sv_2mortal(newSVpvf("Event/%s.pm", name));
18 36 50         perl_require_pv(SvPV(tmp, n_a));
19 36 50         if (sv_true(ERRSV))
    50          
20 0 0         croak("Event: could not load perl support code for Event::%s: %s",
21 0 0         name, SvPV(ERRSV,n_a));
    0          
    0          
    0          
22 36           ++ev->vtbl->did_require;
23             }
24             /* if we have a non-default stash then we need to save it! */
25 3885 50         ev->mysv = stash || temple ? wrap_watcher(ev, stash, temple) : 0;
    0          
26 3884           PE_RING_INIT(&ev->all, ev);
27 3884           PE_RING_INIT(&ev->events, 0);
28              
29             /* no exceptions after this point */
30              
31 3884           PE_RING_UNSHIFT(&ev->all, &AllWatchers);
32 3884           WaFLAGS(ev) = 0;
33 3884           WaINVOKE1_on(ev);
34 3884           WaREENTRANT_on(ev);
35 3884           ev->FALLBACK = 0;
36 3884           NextID = (NextID+1) & 0x7fff; /* make it look like the kernel :-, */
37 3884           ev->refcnt = 0;
38 3884           ev->desc = newSVpvn("??",2);
39 3884           ev->running = 0;
40 3884           ev->max_cb_tm = 1; /* make default configurable? */
41 3884           ev->cbtime = 0;
42 3884           ev->prio = PE_QUEUES;
43 3884           ev->callback = 0;
44 3884           ev->ext_data = 0;
45 3884           ev->stats = 0;
46 3884           }
47              
48 3842           static void pe_watcher_cancel_events(pe_watcher *wa) {
49             pe_event *ev;
50 4765 100         while (!PE_RING_EMPTY(&wa->events)) {
51 923           pe_ring *lk = wa->events.prev;
52 923           ev = (pe_event*) lk->self;
53 923           dequeEvent(ev);
54 923           pe_event_release(ev);
55             }
56 3842           }
57              
58 3884           static void pe_watcher_dtor(pe_watcher *wa) {
59             STRLEN n_a;
60             assert(WaCANDESTROY(wa));
61 3884 50         if (WaDESTROYED(wa)) {
62 0           warn("Attempt to destroy watcher 0x%x again (ignored)", wa);
63 0           return;
64             }
65 3884           WaDESTROYED_on(wa);
66             if (WaDEBUGx(wa) >= 3)
67             warn("Watcher '%s' destroyed", SvPV(wa->desc, n_a));
68             assert(PE_RING_EMPTY(&wa->events));
69 3884 100         if (WaPERLCB(wa))
70 3870           SvREFCNT_dec(wa->callback);
71 3884 50         if (wa->FALLBACK)
72 0           SvREFCNT_dec(wa->FALLBACK);
73 3884 50         if (wa->desc)
74 3884           SvREFCNT_dec(wa->desc);
75 3884 50         if (wa->stats)
76 3884           Estat.dtor(wa->stats);
77             /* safefree(wa); do it yourself */
78             }
79              
80             /********************************** *******************************/
81              
82 3878           WKEYMETH(_watcher_callback) {
83 3878 100         if (nval) {
84             AV *av;
85             SV *sv;
86 3875           SV *old=0;
87 3875 100         if (WaPERLCB(ev))
88 4           old = (SV*) ev->callback;
89 3875 50         if (!SvOK(nval)) {
    0          
    0          
90 0           WaPERLCB_off(ev);
91 0           ev->callback = 0;
92 0           ev->ext_data = 0;
93 0           pe_watcher_stop(ev, 0);
94 3875 50         } else if (SvROK(nval) && (SvTYPE(sv=SvRV(nval)) == SVt_PVCV)) {
    100          
95 3870           WaPERLCB_on(ev);
96 3870           ev->callback = SvREFCNT_inc(nval);
97 5 50         } else if (SvROK(nval) &&
    50          
98 5 100         (SvTYPE(av=(AV*)SvRV(nval)) == SVt_PVAV) &&
99 9           av_len(av) == 1) {
100             /* method lookup code adapted from universal.c */
101             STRLEN n_a;
102 4           SV *pkgsv = *av_fetch(av, 0, 0);
103 4           HV *pkg = NULL;
104 4           SV *namesv = *av_fetch(av, 1, 0);
105 4 50         char *name = SvPV(namesv, n_a);
106 4           int ok=0;
107 4 100         if(SvROK(pkgsv)) {
108 2           pkgsv = (SV*)SvRV(pkgsv);
109 2 50         if(SvOBJECT(pkgsv))
110 2           pkg = SvSTASH(pkgsv);
111             }
112             else {
113 2           pkg = gv_stashsv(pkgsv, FALSE);
114             }
115 4 50         if (pkg) {
116 4           GV *gv = gv_fetchmethod_autoload(pkg, name, FALSE);
117 4 100         if (gv && isGV(gv))
    50          
118 4           ok=1;
119             }
120             else {
121 0 0         warn("Event: package '%s' doesn't exist (creating)",
122 0           SvPV(pkgsv, n_a));
123 0           pkg = gv_stashsv(pkgsv, 1);
124             }
125 4 100         if (!ok) {
126 3 50         warn("Event: callback method %s->%s doesn't exist",
    50          
127 2 50         HvNAME(pkg), name);
    0          
    50          
    50          
128             }
129 4           WaPERLCB_on(ev);
130 4           ev->callback = SvREFCNT_inc(nval);
131             } else {
132 1 50         if (SvIV(DebugLevel) >= 2)
    50          
133 0           sv_dump(sv);
134 1           croak("Callback must be a code ref or [$object, $method_name]");
135             }
136 3874 100         if (old)
137 4           SvREFCNT_dec(old);
138             }
139             {
140 7754           SV *ret = (WaPERLCB(ev)?
141 3878 100         (SV*) ev->callback :
142 1           (ev->callback?
143 0           sv_2mortal(newSVpvf("",
144 1 50         ev->callback, ev->ext_data)) :
145             &PL_sv_undef));
146 3877           dSP;
147 3877 50         XPUSHs(ret);
148 3877           PUTBACK;
149             }
150 3877           }
151              
152 2           WKEYMETH(_watcher_cbtime) {
153 2 50         if (!nval) {
154 2           dSP;
155 2 50         XPUSHs(sv_2mortal(newSVnv(ev->cbtime)));
156 2           PUTBACK;
157             } else
158 0           croak("'e_cbtime' is read-only");
159 2           }
160              
161 3898           WKEYMETH(_watcher_desc) {
162 3898 100         if (nval) {
163 3884           sv_setsv(ev->desc, nval);
164             }
165             {
166 3898           dSP;
167 3898 50         XPUSHs(ev->desc);
168 3898           PUTBACK;
169             }
170 3898           }
171              
172 1           WKEYMETH(_watcher_debug) {
173 1 50         if (nval) {
174 0 0         if (sv_true(nval)) WaDEBUG_on(ev); else WaDEBUG_off(ev);
175             }
176             {
177 1           dSP;
178 1 50         XPUSHs(boolSV(WaDEBUG(ev)));
    50          
179 1           PUTBACK;
180             }
181 1           }
182              
183 7695           WKEYMETH(_watcher_priority) {
184 7695 100         if (nval) {
185 7685 50         ev->prio = SvIV(nval);
186             }
187             {
188 7695           dSP;
189 7695 50         XPUSHs(sv_2mortal(newSViv(ev->prio)));
190 7695           PUTBACK;
191             }
192 7695           }
193              
194 2           WKEYMETH(_watcher_reentrant) {
195 2 100         if (nval) {
196 1 50         if (sv_true(nval))
197 0           WaREENTRANT_on(ev);
198             else {
199 1 50         if (ev->running > 1)
200 0           croak("'reentrant' cannot be turned off while nested %d times",
201             ev->running);
202 1           WaREENTRANT_off(ev);
203             }
204             }
205             {
206 2           dSP;
207 2 50         XPUSHs(boolSV(WaREENTRANT(ev)));
    100          
208 2           PUTBACK;
209             }
210 2           }
211              
212 3803           WKEYMETH(_watcher_repeat) {
213 3803 100         if (nval) {
214 3802 100         if (sv_true(nval)) WaREPEAT_on(ev); else WaREPEAT_off(ev);
215             }
216             {
217 3803           dSP;
218 3803 50         XPUSHs(boolSV(WaREPEAT(ev)));
    100          
219 3803           PUTBACK;
220             }
221 3803           }
222              
223 0           WKEYMETH(_watcher_suspend) {
224 0 0         if (nval) {
225 0 0         if (sv_true(nval))
226 0           pe_watcher_suspend(ev);
227             else
228 0           pe_watcher_resume(ev);
229             }
230             {
231 0           dSP;
232 0 0         XPUSHs(boolSV(WaSUSPEND(ev)));
    0          
233 0           PUTBACK;
234             }
235 0           }
236              
237 1           WKEYMETH(_watcher_max_cb_tm) {
238 1 50         if (nval) {
239 0 0         int tm = SvIOK(nval)? SvIV(nval) : 0;
    0          
240 0 0         if (tm < 0) {
241 0           warn("e_max_cb_tm must be non-negative");
242 0           tm=0;
243             }
244 0           ev->max_cb_tm = tm;
245             }
246             {
247 1           dSP;
248 1 50         XPUSHs(sv_2mortal(newSViv(ev->max_cb_tm)));
249 1           PUTBACK;
250             }
251 1           }
252              
253             /********************************** *******************************/
254              
255 0           static void pe_watcher_nomethod(pe_watcher *ev, char *meth) {
256 0           HV *stash = ev->vtbl->stash;
257             assert(stash);
258 0 0         croak("%s::%s is missing", HvNAME(stash), meth);
    0          
    0          
    0          
    0          
    0          
259 0           }
260              
261 0           static char *pe_watcher_nostart(pe_watcher *ev, int repeat)
262 0           { pe_watcher_nomethod(ev,"start"); return 0; }
263 0           static void pe_watcher_nostop(pe_watcher *ev)
264 0           { pe_watcher_nomethod(ev,"stop"); }
265 0           static void pe_watcher_alarm(pe_watcher *ev, pe_timeable *tm)
266 0           { pe_watcher_nomethod(ev,"alarm"); }
267              
268 24           static void boot_pe_watcher() {
269 24           HV *stash = gv_stashpv("Event::Watcher", 1);
270             struct pe_watcher_vtbl *vt;
271 24           PE_RING_INIT(&AllWatchers, 0);
272 24           vt = &pe_watcher_base_vtbl;
273 24           vt->stash = 0;
274 24           vt->did_require = 0;
275 24           vt->dtor = 0;
276 24           vt->start = pe_watcher_nostart;
277 24           vt->stop = pe_watcher_nostop;
278 24           vt->alarm = pe_watcher_alarm;
279 24           newCONSTSUB(stash, "ACTIVE", newSViv(PE_ACTIVE));
280 24           newCONSTSUB(stash, "SUSPEND", newSViv(PE_SUSPEND));
281 24           newCONSTSUB(stash, "R", newSViv(PE_R));
282 24           newCONSTSUB(stash, "W", newSViv(PE_W));
283 24           newCONSTSUB(stash, "E", newSViv(PE_E));
284 24           newCONSTSUB(stash, "T", newSViv(PE_T));
285 24           }
286              
287 192           static void pe_register_vtbl(pe_watcher_vtbl *vt, HV *stash,
288             pe_event_vtbl *evt) {
289 192           vt->stash = stash;
290 192           vt->event_vtbl = evt;
291 192           vt->new_event = evt->new_event;
292 192           }
293              
294 16           static void pe_watcher_now(pe_watcher *wa) {
295             pe_event *ev;
296 16 50         if (WaSUSPEND(wa)) return;
297 16 100         if (!wa->callback) {
298             STRLEN n_a;
299 1 50         croak("Event: attempt to invoke now() method with callback unset on watcher '%s'", SvPV(wa->desc,n_a));
300             }
301              
302 15           WaRUNNOW_on(wa); /* race condition XXX */
303 15           ev = (*wa->vtbl->new_event)(wa);
304 15           ++ev->hits;
305 15           queueEvent(ev);
306             }
307              
308             /*******************************************************************
309             The following methods change the status flags. This is the only
310             code that should be changing these flags!
311             */
312              
313 3886           static void pe_watcher_cancel(pe_watcher *wa) {
314 3886 100         if (WaCANCELLED(wa))
315 2           return;
316 3884           WaSUSPEND_off(wa);
317 3884           pe_watcher_stop(wa, 1); /* peer */
318 3884           WaCANCELLED_on(wa);
319 3884 50         PE_RING_DETACH(&wa->all);
320 3884 50         if (wa->mysv)
321 3884           SvREFCNT_dec(wa->mysv); /* might destroy */
322 0 0         else if (WaCANDESTROY(wa))
    0          
    0          
323 0           (*wa->vtbl->dtor)(wa);
324             }
325              
326 2           static void pe_watcher_suspend(pe_watcher *ev) {
327             STRLEN n_a;
328             assert(ev);
329 2 50         if (WaSUSPEND(ev))
330 0           return;
331             if (WaDEBUGx(ev) >= 4)
332             warn("Event: suspend '%s'\n", SvPV(ev->desc,n_a));
333 2           pe_watcher_off(ev);
334 2           pe_watcher_cancel_events(ev);
335 2           WaSUSPEND_on(ev); /* must happen nowhere else!! */
336             }
337              
338 2           static void pe_watcher_resume(pe_watcher *ev) {
339             STRLEN n_a;
340             assert(ev);
341 2 50         if (!WaSUSPEND(ev))
342 0           return;
343 2           WaSUSPEND_off(ev);
344             if (WaDEBUGx(ev) >= 4)
345             warn("Event: resume '%s'%s\n", SvPV(ev->desc,n_a),
346             WaACTIVE(ev)?" ACTIVE":"");
347 2 50         if (WaACTIVE(ev))
348 2           pe_watcher_on(ev, 0);
349             }
350              
351 3931           static char *pe_watcher_on(pe_watcher *wa, int repeat) {
352             STRLEN n_a;
353             char *excuse;
354 3931 100         if (WaPOLLING(wa) || WaSUSPEND(wa))
    50          
355 3           return 0;
356 3928 100         if (WaCANCELLED(wa))
357 1 50         croak("Event: attempt to start cancelled watcher '%s'",
358 2           SvPV(wa->desc,n_a));
359 3927           excuse = (*wa->vtbl->start)(wa, repeat);
360 3927 100         if (excuse) {
361 8 50         if (SvIV(DebugLevel))
    50          
362 0 0         warn("Event: can't restart '%s' %s", SvPV(wa->desc, n_a), excuse);
363 8           pe_watcher_stop(wa, 1); /* update flags! */
364             } else
365 3919           WaPOLLING_on(wa); /* must happen nowhere else!! */
366 3930           return excuse;
367             }
368              
369 3921           static void pe_watcher_off(pe_watcher *wa) {
370 3921 100         if (!WaPOLLING(wa) || WaSUSPEND(wa)) return;
    50          
371 3919           (*wa->vtbl->stop)(wa);
372 3919           WaPOLLING_off(wa);
373             }
374              
375 3879           static void pe_watcher_start(pe_watcher *ev, int repeat) {
376             char *excuse;
377             STRLEN n_a;
378 3879 100         if (WaACTIVE(ev))
379 1           return;
380             if (WaDEBUGx(ev) >= 4)
381             warn("Event: active ON '%s'\n", SvPV(ev->desc,n_a));
382 3878           excuse = pe_watcher_on(ev, repeat);
383 3877 100         if (excuse)
384 8 50         croak("Event: can't start '%s' %s", SvPV(ev->desc,n_a), excuse);
385 3869           WaACTIVE_on(ev); /* must happen nowhere else!! */
386 3869           ++ActiveWatchers;
387             }
388              
389 3927           static void pe_watcher_stop(pe_watcher *ev, int cancel_events) {
390             STRLEN n_a;
391 3927 100         if (!WaACTIVE(ev))
392 58           return;
393             if (WaDEBUGx(ev) >= 4)
394             warn("Event: active OFF '%s'\n", SvPV(ev->desc,n_a));
395 3869           pe_watcher_off(ev);
396 3869           WaACTIVE_off(ev); /* must happen nowhere else!! */
397 3869 100         if (cancel_events) pe_watcher_cancel_events(ev);
398 3869           --ActiveWatchers;
399             }