/* 100 levels will trigger a mandatory warning from perl */
/* Number of slots in the CBFrame stack below; pe_event_invoke croaks with
   "Deep recursion detected" once the nesting depth reaches this limit. */
#define MAX_CB_NEST 95
/* Indexed by event priority: pe_event_invoke stores the NVtime() at which
   it started invoking an event of that priority. */
static NV QueueTime[PE_QUEUES];
/* Stack of frames, one per callback invocation that has been entered and
   not yet finished by pe_event_postCB. */
static pe_cbframe CBFrame[MAX_CB_NEST];
/* Index of the innermost frame in CBFrame; -1 when no frame is in use. */
static int CurCBFrame = -1;
/* Virtual tables for the three event flavours (plain, io, dataful);
   populated by boot_pe_event. */
pe_event_vtbl event_vtbl, ioevent_vtbl, datafulevent_vtbl;
/*
 * Initialisation shared by every event flavour.
 *
 * Binds the event to its watcher (taking one reference on the watcher),
 * resets the per-event bookkeeping fields, copies the watcher's priority
 * and links the event into the watcher's ring of events.
 */
static void pe_anyevent_init(pe_event *ev, pe_watcher *wa)
{
    assert(wa);

    /* plain field state */
    ev->up = wa;
    wa->refcnt += 1;
    ev->prio = wa->prio;
    ev->hits = 0;
    ev->mysv = 0;
    ev->callback = 0;

    /* make the event reachable from its watcher */
    PE_RING_INIT(&ev->peer, ev);
    PE_RING_UNSHIFT(&ev->peer, &wa->events);
}
/*
 * Teardown shared by every event flavour.
 *
 * Clears the event's fields, drops the reference held on a Perl callback
 * (if any), unlinks the event from both rings it may be on, and releases
 * the reference on the owning watcher.  If that makes the watcher
 * destroyable (WaCANDESTROY), the watcher's own dtor is invoked.
 *
 * The flavour-specific dtors call this and then put the event on their
 * freelist.
 */
static void pe_anyevent_dtor(pe_event *ev) {
    STRLEN n_a;
    pe_watcher *wa = ev->up;

    if (WaDEBUGx(wa) >= 3)
        /* Pointers must be printed with %p: handing them to %x is a varargs
           type mismatch and loses the upper bits where pointers are wider
           than int.  Perl's formatter prints %p as bare hex, so the
           literal "0x" prefix is retained. */
        warn("Event=0x%p '%s' destroyed (SV=0x%p)",
             (void*) ev,
             SvPV(wa->desc, n_a),
             (void*) (ev->mysv? SvRV(ev->mysv) : 0));

    ev->up = 0;
    ev->mysv = 0;
    ev->hits = 0;

    if (EvPERLCB(ev))
        SvREFCNT_dec(ev->callback);
    /* keep the flag in step with the (now absent) callback, exactly as
       pe_anyevent_set_cb does, so a recycled event starts out clean */
    EvPERLCB_off(ev);
    ev->callback = 0;

    PE_RING_DETACH(&ev->peer);
    PE_RING_DETACH(&ev->que);

    --wa->refcnt;
    if (WaCANDESTROY(wa)) /* running */
        (*wa->vtbl->dtor)(wa);
}
/*
 * Install a non-Perl callback on the event.
 *
 * fptr is stored as the callback and ext as its ext_data.  If a Perl
 * callback was installed before, its reference is dropped first and the
 * PERLCB flag is cleared.
 */
static void pe_anyevent_set_cb(pe_event *ev, void *fptr, void *ext)
{
    if (EvPERLCB(ev)) {
        /* the previous callback was a Perl SV we held a reference on */
        SvREFCNT_dec(ev->callback);
    }

    ev->ext_data = ext;
    ev->callback = fptr;
    EvPERLCB_off(ev);
}
/*
 * Install a Perl callback on the event.
 *
 * A reference is taken on sv before the reference on any previously
 * installed Perl callback is released, so re-installing the same SV is
 * safe.  The PERLCB flag is set afterwards.
 */
static void pe_anyevent_set_perl_cb(pe_event *ev, SV *sv)
{
    /* only a previous *Perl* callback carries a reference to give back */
    SV *prev = EvPERLCB(ev) ? ev->callback : 0;

    ev->callback = SvREFCNT_inc(sv);
    SvREFCNT_dec(prev);
    EvPERLCB_on(ev);
}
/*****************************************************************/
/*
 * Obtain a plain event for watcher wa.
 *
 * An event is recycled from the tail of event_vtbl.freelist when one is
 * available; otherwise a new pe_event is allocated with EvNew and wired to
 * event_vtbl.  Either way it is then initialised by pe_anyevent_init.
 */
static pe_event *pe_event_allocate(pe_watcher *wa)
{
    pe_event *ev;

    assert(wa);

    if (!PE_RING_EMPTY(&event_vtbl.freelist)) {
        /* reuse a previously released event */
        pe_ring *spare = event_vtbl.freelist.prev;
        PE_RING_DETACH(spare);
        ev = (pe_event*) spare->self;
    } else {
        /* nothing to reuse: create a fresh one */
        EvNew(0, ev, 1, pe_event);
        ev->vtbl = &event_vtbl;
        PE_RING_INIT(&ev->que, ev);
    }

    pe_anyevent_init(ev, wa);
    return ev;
}
/*
 * Destructor for plain events: run the shared teardown, then park the
 * event on event_vtbl.freelist, from which pe_event_allocate recycles.
 */
static void pe_event_dtor(pe_event *ev)
{
    pe_anyevent_dtor(ev);

    /* the que link was detached by the shared teardown; reuse it here */
    PE_RING_UNSHIFT(&ev->que, &event_vtbl.freelist);
}
/*
 * Let go of an event once it is no longer needed by the caller.
 *
 * When the event has an associated SV (mysv), only the reference on that
 * SV is dropped and the field is cleared; otherwise the event's dtor is
 * called directly through its vtbl.
 */
static void pe_event_release(pe_event *ev)
{
    if (ev->mysv) {
        SvREFCNT_dec(ev->mysv);
        ev->mysv = 0;
        return;
    }

    /* no SV attached: destroy the event right away */
    (*ev->vtbl->dtor)(ev);
}
/*
 * Accessor for the event's hit count.
 * Without a new value, pushes ev->hits onto the Perl stack as a mortal
 * integer; any attempt to assign croaks because the key is read-only.
 */
EKEYMETH(_event_hits) {
    dSP;

    if (nval)
        croak("'e_hits' is read-only");

    XPUSHs(sv_2mortal(newSViv(ev->hits)));
    PUTBACK;
}
/*
 * Accessor for the event's priority.
 * Without a new value, pushes ev->prio onto the Perl stack as a mortal
 * integer; any attempt to assign croaks because the key is read-only.
 */
EKEYMETH(_event_prio) {
    dSP;

    if (nval)
        croak("'e_prio' is read-only");

    XPUSHs(sv_2mortal(newSViv(ev->prio)));
    PUTBACK;
}
/*------------------------------------------------------*/
/*
 * Obtain an io event for watcher wa.
 *
 * Recycles from the tail of ioevent_vtbl.freelist when possible, otherwise
 * allocates a new pe_ioevent wired to ioevent_vtbl.  The common part is
 * initialised by pe_anyevent_init and `got` is reset to 0.  The embedded
 * base event is returned.
 */
static pe_event *pe_ioevent_allocate(pe_watcher *wa)
{
    pe_ioevent *ev;

    assert(wa);

    if (!PE_RING_EMPTY(&ioevent_vtbl.freelist)) {
        /* reuse a previously released io event */
        pe_ring *spare = ioevent_vtbl.freelist.prev;
        PE_RING_DETACH(spare);
        ev = (pe_ioevent*) spare->self;
    } else {
        EvNew(1, ev, 1, pe_ioevent);
        ev->base.vtbl = &ioevent_vtbl;
        PE_RING_INIT(&ev->base.que, ev);
    }

    pe_anyevent_init(&ev->base, wa);
    ev->got = 0;
    return &ev->base;
}
/*
 * Destructor for io events: run the shared teardown, then park the event
 * on ioevent_vtbl.freelist, from which pe_ioevent_allocate recycles.
 */
static void pe_ioevent_dtor(pe_event *ev)
{
    pe_anyevent_dtor(ev);

    /* the que link was detached by the shared teardown; reuse it here */
    PE_RING_UNSHIFT(&ev->que, &ioevent_vtbl.freelist);
}
/*
 * Accessor for the `got` mask of an io event.
 * Without a new value, converts the mask with events_mask_2sv and pushes
 * the mortalised result onto the Perl stack; any attempt to assign croaks
 * because the key is read-only.
 */
EKEYMETH(_event_got) {
    dSP;
    pe_ioevent *ioev = (pe_ioevent *)ev;

    if (nval)
        croak("'e_got' is read-only");

    XPUSHs(sv_2mortal(events_mask_2sv(ioev->got)));
    PUTBACK;
}
/*------------------------------------------------------*/
/*
 * Obtain a dataful event for watcher wa.
 *
 * Recycles from the tail of datafulevent_vtbl.freelist when possible,
 * otherwise allocates a new pe_datafulevent wired to datafulevent_vtbl.
 * The common part is initialised by pe_anyevent_init and `data` starts out
 * pointing at PL_sv_undef.  The embedded base event is returned.
 */
static pe_event *pe_datafulevent_allocate(pe_watcher *wa)
{
    pe_datafulevent *ev;

    assert(wa);

    if (!PE_RING_EMPTY(&datafulevent_vtbl.freelist)) {
        /* reuse a previously released dataful event */
        pe_ring *spare = datafulevent_vtbl.freelist.prev;
        PE_RING_DETACH(spare);
        ev = (pe_datafulevent*) spare->self;
    } else {
        EvNew(15, ev, 1, pe_datafulevent);
        ev->base.vtbl = &datafulevent_vtbl;
        PE_RING_INIT(&ev->base.que, ev);
    }

    pe_anyevent_init(&ev->base, wa);
    ev->data = &PL_sv_undef;
    return &ev->base;
}
/*
 * Destructor for dataful events: drop the reference on the attached data
 * SV, run the shared teardown, then park the event on
 * datafulevent_vtbl.freelist for pe_datafulevent_allocate to recycle.
 */
static void pe_datafulevent_dtor(pe_event *ev)
{
    pe_datafulevent *self = (pe_datafulevent *)ev;

    /* payload first, while the event is still fully intact */
    SvREFCNT_dec(self->data);

    pe_anyevent_dtor(ev);
    PE_RING_UNSHIFT(&ev->que, &datafulevent_vtbl.freelist);
}
/*
 * Accessor for the data SV of a dataful event.
 * Without a new value, pushes the stored SV itself (not a copy) onto the
 * Perl stack; any attempt to assign croaks because the key is read-only.
 */
EKEYMETH(_event_data) {
    dSP;
    pe_datafulevent *dfev = (pe_datafulevent *)ev;

    if (nval)
        croak("'e_data' is read-only");

    XPUSHs(dfev->data);
    PUTBACK;
}
/*------------------------------------------------------*/
/*
 * Finish off a callback frame.
 *
 * Pops the frame from the CBFrame stack, turns the watcher back on when it
 * is ACTIVE, INVOKE1 and REPEAT, updates the statistics hooks (scrubbing
 * this frame's stats and entering/resuming those of the enclosing frame),
 * and finally releases the event.
 */
static void pe_event_postCB(pe_cbframe *fp)
{
    pe_event *done = fp->ev;
    pe_watcher *owner = done->up;

    --CurCBFrame;

    if (WaACTIVE(owner) && WaINVOKE1(owner) && WaREPEAT(owner))
        pe_watcher_on(owner, 1);

    if (Estat.on) {
        if (fp->stats) {
            Estat.scrub(fp->stats, owner);
            fp->stats = 0;
        }
        if (CurCBFrame >= 0) {
            /* hand the stopwatch back to the enclosing callback */
            pe_cbframe *outer = &CBFrame[CurCBFrame];
            if (outer->stats)
                Estat.resume(outer->stats);
            else
                outer->stats = Estat.enter(CurCBFrame, outer->ev->up->max_cb_tm);
        }
    }

    /* this must be last because it can destroy the watcher */
    pe_event_release(done);
}
/*
 * Report that the callback of the frame's event died.
 *
 * Calls the Perl code held in $Event::DIED (created if missing) with two
 * arguments: the event's SV and a mortal copy of the error ($@, or "?"
 * when $@ is false).  The call is made under G_EVAL; if the handler itself
 * dies, a warning is emitted and $@ is reset to the empty string.
 *
 * At debug level 4 or above the original error is also warned about
 * before the handler runs.
 */
static void pe_callback_died(pe_cbframe *fp) {
    dSP;
    pe_watcher *wa = fp->ev->up;
    SV *eval = perl_get_sv("Event::DIED", 1);
    SV *err = (sv_true(ERRSV)?
               sv_mortalcopy(ERRSV):
               sv_2mortal(newSVpv("?",0)));

    if (WaDEBUGx(wa) >= 4) {
        /* SvPV may assign its length argument; fetch the two strings in
           separate, sequenced statements rather than inside one argument
           list, where two writes to the same variable are unsequenced. */
        STRLEN desc_len, msg_len;
        char *desc = SvPV(wa->desc, desc_len);
        char *msg = SvPV(ERRSV, msg_len);
        warn("Event: '%s' died with: %s\n", desc, msg);
    }

    PUSHMARK(SP);
    XPUSHs(event_2sv(fp->ev));
    XPUSHs(err);
    PUTBACK;
    perl_call_sv(eval, G_EVAL|G_DISCARD);

    if (sv_true(ERRSV)) {
        /* the handler died too: same sequencing care as above */
        STRLEN desc_len, msg_len;
        char *desc = SvPV(wa->desc, desc_len);
        char *msg = SvPV(ERRSV, msg_len);
        warn("Event: '%s' died and then $Event::DIED died with: %s\n",
             desc, msg);
        sv_setpv(ERRSV, "");
    }
}
/*
 * void*-taking adapter around pe_watcher_resume: vp is treated as a
 * pe_watcher pointer and resumed.
 */
static void _resume_watcher(void *vp)
{
    pe_watcher_resume((pe_watcher *)vp);
}
/*
 * Clean up callback frames that were abandoned without going through
 * pe_event_postCB.
 *
 * Walks the CBFrame stack from the top.  A frame whose run_id still equals
 * its watcher's `running` counter is considered live and stops the walk.
 * Every other frame is finished with pe_event_postCB; the first such frame
 * is additionally reported through pe_callback_died.
 */
static void pe_check_recovery() {
    /* NO ASSERTIONS HERE! EVAL CONTEXT IS VERY MESSY */
    int reported = 0;

    while (CurCBFrame >= 0) {
        struct pe_cbframe *top = &CBFrame[CurCBFrame];

        if (top->ev->up->running == top->run_id)
            return;

        if (!reported) {
            reported = 1;
            /* exception detected; alert the militia! */
            pe_callback_died(top);
        }
        /* pops the frame, i.e. decrements CurCBFrame */
        pe_event_postCB(top);
    }
}
/*
 * Run the callback attached to one event.
 *
 * Steps, in order:
 *   1. pe_check_recovery() disposes of frames left over from earlier
 *      invocations that never reached pe_event_postCB.
 *   2. A new frame is pushed on CBFrame and the watcher's `running`
 *      counter is bumped inside an ENTER/LEAVE scope.
 *   3. Croaks if the nesting depth has reached MAX_CB_NEST.
 *   4. The callback is called: a Perl callback through perl_call_sv /
 *      perl_call_method with the event's SV as argument, or a C callback
 *      as void(*)(pe_event*).
 *   5. Statistics are committed and pe_event_postCB pops the frame and
 *      releases the event.
 */
static void pe_event_invoke(pe_event *ev) {
STRLEN n_a;
int Dbg;
pe_watcher *wa = ev->up;
struct pe_cbframe *frp;
pe_check_recovery();
/* SETUP */
ENTER;
/* wa->running is restored when this scope is left; pe_check_recovery
   compares it with the frame's run_id to spot abandoned frames */
SAVEINT(wa->running);
PE_RING_DETACH(&ev->peer);
/* push a frame for this invocation */
/* NOTE(review): the frame is written before the depth check below; this
   stays in bounds only while CurCBFrame is at most MAX_CB_NEST-2 on
   entry -- confirm that always holds */
frp = &CBFrame[++CurCBFrame];
frp->ev = ev;
frp->run_id = ++wa->running;
/* NOTE(review): frp->stats is only assigned when Estat.on is set here;
   otherwise the slot keeps whatever value it held before -- confirm that
   is always 0 */
if (Estat.on)
frp->stats = Estat.enter(CurCBFrame, wa->max_cb_tm);
assert(ev->prio >= 0 && ev->prio < PE_QUEUES);
/* remember when this callback started, both per priority and per watcher */
QueueTime[ev->prio] = wa->cbtime = NVtime();
/* SETUP */
/* refuse to nest any deeper; the frame pushed above stays on the stack
   when croak unwinds */
if (CurCBFrame+1 >= MAX_CB_NEST) {
ExitLevel = 0;
croak("Deep recursion detected; invoking unloop_all()\n");
}
Dbg = WaDEBUGx(wa);
if (Dbg) {
/*
SV *cvb = perl_get_sv("Carp::Verbose", 1);
if (!SvIV(cvb)) {
SAVEIV(SvIVX(cvb));
SvIVX(cvb) = 1;
}
*/
if (Dbg >= 2)
warn("Event: [%d]invoking '%s' (prio %d)\n",
CurCBFrame, SvPV(wa->desc,n_a),ev->prio);
}
if (!PE_RING_EMPTY(&Callback)) pe_map_check(&Callback);
if (EvPERLCB(ev)) {
/* Perl callback: ev->callback is a reference to either a code value or
   an array */
SV *cb = SvRV((SV*)ev->callback);
/* void context; trap exceptions only when Eval holds a non-zero IV */
int pcflags = G_VOID | (SvIVX(Eval)? G_EVAL : 0);
int retcnt;
SV *evsv = event_2sv(ev);
dSP;
PUSHMARK(SP);
if (SvTYPE(cb) == SVt_PVCV) {
/* code reference: call it with the event as the only argument */
XPUSHs(evsv);
PUTBACK;
retcnt = perl_call_sv((SV*) ev->callback, pcflags);
} else {
/* array reference: element 0 is pushed as the invocant and element 1
   supplies the method name; the event follows as argument */
/* NOTE(review): av_fetch results are dereferenced unchecked --
   presumably the array shape is validated where the callback is set */
AV *av = (AV*)cb;
assert(SvTYPE(cb) == SVt_PVAV);
XPUSHs(*av_fetch(av, 0, 0));
XPUSHs(evsv);
PUTBACK;
retcnt = perl_call_method(SvPV(*av_fetch(av, 1, 0),n_a), pcflags);
}
/* throw away whatever the callback returned */
SPAGAIN;
SP -= retcnt;
PUTBACK;
if (SvTRUE(ERRSV)) {
/* $@ is set after the call: report it when the call was made under
   G_EVAL, otherwise just reset $@ */
if (pcflags & G_EVAL)
pe_callback_died(frp);
else
sv_setsv(ERRSV, &PL_sv_no);
}
} else {
/* C callback: called directly with the event */
assert(ev->callback);
(* (void(*)(pe_event*)) ev->callback)(ev);
}
LEAVE;
if (Estat.on) {
if (frp->stats) /* maybe in transition */
Estat.commit(frp->stats, wa);
frp->stats=0;
}
if (Dbg >= 3)
warn("Event: completed '%s'\n", SvPV(wa->desc, n_a));
/* pops the frame and releases the event */
pe_event_postCB(frp);
}
/*
 * One-time setup of the three event vtbls and of QueueTime.
 *
 * event_vtbl is filled in first; the io and dataful tables start as copies
 * of it and then override the stash, allocator and destructor.  Each table
 * gets its own, empty freelist.  QueueTime is zero-filled at the end.
 */
static void boot_pe_event()
{
    /* plain events -- also the template for the two tables below */
    event_vtbl.new_event = pe_event_allocate;
    event_vtbl.dtor = pe_event_dtor;
    event_vtbl.stash = gv_stashpv("Event::Event", 1);
    PE_RING_INIT(&event_vtbl.freelist, 0);

    /* io events */
    ioevent_vtbl = event_vtbl;
    ioevent_vtbl.stash = gv_stashpv("Event::Event::Io", 1);
    ioevent_vtbl.new_event = pe_ioevent_allocate;
    ioevent_vtbl.dtor = pe_ioevent_dtor;
    PE_RING_INIT(&ioevent_vtbl.freelist, 0);

    /* dataful events */
    datafulevent_vtbl = event_vtbl;
    datafulevent_vtbl.stash = gv_stashpv("Event::Event::Dataful", 1);
    datafulevent_vtbl.new_event = pe_datafulevent_allocate;
    datafulevent_vtbl.dtor = pe_datafulevent_dtor;
    PE_RING_INIT(&datafulevent_vtbl.freelist, 0);

    memset(QueueTime, 0, sizeof(QueueTime));
}