The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* 100 levels will trigger a mandatory warning from perl */
#define MAX_CB_NEST 95

/* Per-priority timestamps: pe_event_invoke stores NVtime() into the
   slot selected by the priority of the event it is about to run. */
static NV QueueTime[PE_QUEUES];

/* Stack of callback frames for events currently being invoked.
   CurCBFrame indexes the innermost frame; -1 means no frame is active. */
static pe_cbframe CBFrame[MAX_CB_NEST];
static int CurCBFrame = -1;

/* Method tables for the three event flavours; filled in by
   boot_pe_event(). */
pe_event_vtbl event_vtbl, ioevent_vtbl, datafulevent_vtbl;

/* Setup shared by every event flavour.
 *   ev - event being (re)initialised
 *   wa - watcher that generated it; must be non-null
 * The event records its watcher, takes one reference on it, is linked
 * into the watcher's `events' ring through its `peer' link, inherits the
 * watcher's priority, and has its remaining per-use fields cleared. */
static void pe_anyevent_init(pe_event *ev, pe_watcher *wa) {
    assert(wa);

    /* plain field resets */
    ev->mysv = 0;
    ev->hits = 0;
    ev->callback = 0;
    ev->prio = wa->prio;

    /* bind to the watcher; the event owns one reference on it */
    ev->up = wa;
    wa->refcnt += 1;
    PE_RING_INIT(&ev->peer, ev);
    PE_RING_UNSHIFT(&ev->peer, &wa->events);
}

/* Teardown shared by every event flavour.
 *   ev - event being destroyed
 * Clears the event's fields, drops the reference on a Perl callback (if
 * one is installed), unlinks the event from both of its rings, gives up
 * its reference on the watcher and, when WaCANDESTROY() says so, runs the
 * watcher's own destructor.  The caller is responsible for whatever
 * happens to the event's storage afterwards. */
static void pe_anyevent_dtor(pe_event *ev) {
    STRLEN n_a;
    pe_watcher *wa = ev->up;
    if (WaDEBUGx(wa) >= 3) {
	/* Pointers are formatted with %p: passing a pointer where %x
	   expects an unsigned int is a format/argument mismatch and
	   truncates (or misreads the varargs) where pointers are wider
	   than int. */
	warn("Event=0x%p '%s' destroyed (SV=0x%p)",
	     (void*) ev,
	     SvPV(wa->desc, n_a),
	     (void*) (ev->mysv? SvRV(ev->mysv) : 0));
    }
    ev->up = 0;
    ev->mysv = 0;
    ev->hits = 0;
    if (EvPERLCB(ev))
	SvREFCNT_dec(ev->callback);
    ev->callback = 0;
    PE_RING_DETACH(&ev->peer);
    PE_RING_DETACH(&ev->que);
    --wa->refcnt;
    if (WaCANDESTROY(wa)) /* running */
	(*wa->vtbl->dtor)(wa);
}

/* Install a C-level callback on an event.
 *   ev   - event to modify
 *   fptr - stored in ev->callback
 *   ext  - stored in ev->ext_data
 * If a Perl callback is currently installed its reference is dropped
 * first, and the event is then flagged as not holding a Perl callback. */
static void pe_anyevent_set_cb(pe_event *ev, void *fptr, void *ext) {
    if (EvPERLCB(ev)) {
	SvREFCNT_dec(ev->callback);
    }
    EvPERLCB_off(ev);

    ev->ext_data = ext;
    ev->callback = fptr;
}

/* Install a Perl callback on an event.
 *   ev - event to modify
 *   sv - callback SV; the event takes its own reference on it
 * The new reference is taken before the previous Perl callback (if any)
 * is released, so passing the callback that is already installed is
 * safe. */
static void pe_anyevent_set_perl_cb(pe_event *ev, SV *sv) {
    /* only a Perl callback holds a reference that needs dropping */
    SV *prev = EvPERLCB(ev) ? (SV*) ev->callback : 0;

    ev->callback = SvREFCNT_inc(sv);
    SvREFCNT_dec(prev);
    EvPERLCB_on(ev);
}

/*****************************************************************/

/* Obtain a plain event for watcher `wa'.
 * A previously released event is taken from event_vtbl's freelist when
 * one is available; otherwise a new pe_event is allocated and given its
 * vtbl and queue link.  Either way the event is then initialised through
 * pe_anyevent_init() and returned. */
static pe_event *pe_event_allocate(pe_watcher *wa) {
    pe_event *ev;
    assert(wa);
    if (!PE_RING_EMPTY(&event_vtbl.freelist)) {
	/* recycle the entry at the tail of the freelist */
	pe_ring *spare = event_vtbl.freelist.prev;
	PE_RING_DETACH(spare);
	ev = (pe_event*) spare->self;
    } else {
	/* nothing to recycle: build a fresh event */
	EvNew(0, ev, 1, pe_event);
	ev->vtbl = &event_vtbl;
	PE_RING_INIT(&ev->que, ev);
    }
    pe_anyevent_init(ev, wa);
    return ev;
}

/* Destructor for plain events: run the common teardown, then park the
 * event on event_vtbl's freelist (via its `que' link) so that
 * pe_event_allocate() can hand it out again. */
static void pe_event_dtor(pe_event *ev) {
    pe_ring *pool = &event_vtbl.freelist;

    pe_anyevent_dtor(ev);
    PE_RING_UNSHIFT(&ev->que, pool);
}

/* Let go of an event once the loop is done with it.
 * When the event has a Perl-side SV (ev->mysv), only that reference is
 * dropped and the pointer cleared.  Otherwise the event is destroyed
 * immediately through its vtbl destructor. */
static void pe_event_release(pe_event *ev) {
    if (ev->mysv) {
	SvREFCNT_dec(ev->mysv);
	ev->mysv = 0;
	return;
    }
    (*ev->vtbl->dtor)(ev);
}

EKEYMETH(_event_hits) {
    if (!nval) {
	dSP;
	XPUSHs(sv_2mortal(newSViv(ev->hits)));
	PUTBACK;
    } else
	croak("'e_hits' is read-only");
}

EKEYMETH(_event_prio) {
    if (!nval) {
	dSP;
	XPUSHs(sv_2mortal(newSViv(ev->prio)));
	PUTBACK;
    } else
	croak("'e_prio' is read-only");
}

/*------------------------------------------------------*/

/* Obtain an I/O event for watcher `wa'.
 * Reuses an entry from ioevent_vtbl's freelist when there is one,
 * otherwise allocates a new pe_ioevent and gives it its vtbl and queue
 * link.  The common fields are initialised by pe_anyevent_init() and
 * `got' is cleared.  Returns a pointer to the embedded base event. */
static pe_event *pe_ioevent_allocate(pe_watcher *wa) {
    pe_ioevent *ev;
    assert(wa);
    if (!PE_RING_EMPTY(&ioevent_vtbl.freelist)) {
	/* recycle the entry at the tail of the freelist */
	pe_ring *spare = ioevent_vtbl.freelist.prev;
	PE_RING_DETACH(spare);
	ev = (pe_ioevent*) spare->self;
    } else {
	/* nothing to recycle: build a fresh event */
	EvNew(1, ev, 1, pe_ioevent);
	ev->base.vtbl = &ioevent_vtbl;
	PE_RING_INIT(&ev->base.que, ev);
    }
    pe_anyevent_init(&ev->base, wa);
    ev->got = 0;
    return &ev->base;
}

/* Destructor for I/O events: run the common teardown, then park the
 * event on ioevent_vtbl's freelist (via its `que' link) so that
 * pe_ioevent_allocate() can hand it out again. */
static void pe_ioevent_dtor(pe_event *ev) {
    pe_ring *pool = &ioevent_vtbl.freelist;

    pe_anyevent_dtor(ev);
    PE_RING_UNSHIFT(&ev->que, pool);
}

/* Accessor for an I/O event's `got' mask.  `ev' and `nval' are not
 * declared here, so they are presumably supplied by the EKEYMETH macro
 * (defined elsewhere).  Without a new value the mask is converted with
 * events_mask_2sv() and pushed as a mortal; an attempt to assign
 * croaks. */
EKEYMETH(_event_got) {
    if (nval)
	croak("'e_got' is read-only");
    else {
	pe_ioevent *io = (pe_ioevent *)ev;
	dSP;
	XPUSHs(sv_2mortal(events_mask_2sv(io->got)));
	PUTBACK;
    }
}

/*------------------------------------------------------*/

/* Obtain a data-carrying event for watcher `wa'.
 * Reuses an entry from datafulevent_vtbl's freelist when there is one,
 * otherwise allocates a new pe_datafulevent and gives it its vtbl and
 * queue link.  The common fields are initialised by pe_anyevent_init()
 * and `data' starts out as &PL_sv_undef.  Returns a pointer to the
 * embedded base event. */
static pe_event *pe_datafulevent_allocate(pe_watcher *wa) {
    pe_datafulevent *ev;
    assert(wa);
    if (!PE_RING_EMPTY(&datafulevent_vtbl.freelist)) {
	/* recycle the entry at the tail of the freelist */
	pe_ring *spare = datafulevent_vtbl.freelist.prev;
	PE_RING_DETACH(spare);
	ev = (pe_datafulevent*) spare->self;
    } else {
	/* nothing to recycle: build a fresh event */
	EvNew(15, ev, 1, pe_datafulevent);
	ev->base.vtbl = &datafulevent_vtbl;
	PE_RING_INIT(&ev->base.que, ev);
    }
    pe_anyevent_init(&ev->base, wa);
    ev->data = &PL_sv_undef;
    return &ev->base;
}

/* Destructor for data-carrying events: drop the reference held in
 * `data', run the common teardown, then park the event on
 * datafulevent_vtbl's freelist (via its `que' link) for reuse. */
static void pe_datafulevent_dtor(pe_event *ev) {
    pe_datafulevent *dful = (pe_datafulevent *)ev;
    pe_ring *pool = &datafulevent_vtbl.freelist;

    SvREFCNT_dec(dful->data);
    pe_anyevent_dtor(ev);
    PE_RING_UNSHIFT(&ev->que, pool);
}

/* Accessor for a data-carrying event's payload.  `ev' and `nval' are not
 * declared here, so they are presumably supplied by the EKEYMETH macro
 * (defined elsewhere).  Without a new value the stored SV itself is
 * pushed on the Perl stack (no copy is made); an attempt to assign
 * croaks. */
EKEYMETH(_event_data) {
    if (nval)
	croak("'e_data' is read-only");
    else {
	pe_datafulevent *dful = (pe_datafulevent *)ev;
	dSP;
	XPUSHs(dful->data);
	PUTBACK;
    }
}

/*------------------------------------------------------*/

/* Bookkeeping performed after a callback frame is finished with.
 *   fp - the frame being retired (the current innermost frame)
 * Pops the frame, restarts the watcher when it is active, one-shot per
 * invocation and repeating, settles the statistics for this frame and
 * hands timing back to the enclosing frame (if any), and finally
 * releases the event. */
static void pe_event_postCB(pe_cbframe *fp) {
    pe_event *ev = fp->ev;
    pe_watcher *wa = ev->up;

    CurCBFrame -= 1;

    if (WaACTIVE(wa) && WaINVOKE1(wa) && WaREPEAT(wa)) {
	pe_watcher_on(wa, 1);
    }

    if (Estat.on) {
	/* discard whatever statistics are still attached to this frame */
	if (fp->stats) {
	    Estat.scrub(fp->stats, wa);
	    fp->stats = 0;
	}
	/* the enclosing frame, if there is one, starts or resumes timing */
	if (CurCBFrame >= 0) {
	    pe_cbframe *outer = &CBFrame[CurCBFrame];
	    if (outer->stats)
		Estat.resume(outer->stats);
	    else
		outer->stats = Estat.enter(CurCBFrame,
					   outer->ev->up->max_cb_tm);
	}
    }

    /* this must be last because it can destroy the watcher */
    pe_event_release(ev);
}

/* Report a callback failure to Perl space.
 *   fp - frame of the callback that died
 * Calls the code held in $Event::DIED (inside an eval, discarding any
 * return values) with two arguments: the event's SV and a mortal copy of
 * $@ (or "?" when $@ is false).  If that handler dies as well, a warning
 * is issued and $@ is reset to the empty string. */
static void pe_callback_died(pe_cbframe *fp) {
    dSP;
    STRLEN n_a;
    pe_watcher *wa = fp->ev->up;
    SV *handler = perl_get_sv("Event::DIED", 1);
    SV *err;

    /* capture the error now; the handler call below may overwrite $@ */
    if (sv_true(ERRSV))
	err = sv_mortalcopy(ERRSV);
    else
	err = sv_2mortal(newSVpv("?",0));

    if (WaDEBUGx(wa) >= 4) {
	warn("Event: '%s' died with: %s\n", SvPV(wa->desc,n_a),
	     SvPV(ERRSV,n_a));
    }

    PUSHMARK(SP);
    XPUSHs(event_2sv(fp->ev));
    XPUSHs(err);
    PUTBACK;
    perl_call_sv(handler, G_EVAL|G_DISCARD);

    if (!sv_true(ERRSV))
	return;

    /* the handler itself failed */
    warn("Event: '%s' died and then $Event::DIED died with: %s\n",
	 SvPV(wa->desc,n_a), SvPV(ERRSV,n_a));
    sv_setpv(ERRSV, "");
}

/* Adapter with a generic `void *' signature: treats `vp' as a
 * pe_watcher and forwards it to pe_watcher_resume(). */
static void _resume_watcher(void *vp) {
    pe_watcher_resume((pe_watcher *)vp);
}

/* Unwind callback frames that were left behind.
 * Working from the innermost frame outwards, a frame whose watcher's
 * `running' value no longer equals the frame's run_id is treated as
 * abandoned: pe_callback_died() is called for the first such frame only,
 * and every such frame is retired through pe_event_postCB().  Unwinding
 * stops at the first frame that still matches, or when no frames
 * remain. */
static void pe_check_recovery() {
    /* NO ASSERTIONS HERE!  EVAL CONTEXT IS VERY MESSY */
    int alerted = 0;

    while (CurCBFrame >= 0) {
	struct pe_cbframe *top = &CBFrame[CurCBFrame];

	if (top->ev->up->running == top->run_id)
	    return;

	if (!alerted) {
	    /* exception detected; alert the militia! */
	    alerted = 1;
	    pe_callback_died(top);
	}
	/* pops the frame, so the loop makes progress */
	pe_event_postCB(top);
    }
}

/* Run the callback attached to `ev'.
 *
 * Sequence: unwind any abandoned frames, push a new callback frame for
 * this event, refuse to go deeper once the frame stack is full, call the
 * Perl or C callback, then commit statistics and retire the frame via
 * pe_event_postCB() (which also releases the event).
 *
 * A Perl callback is either a code reference, called with the event's SV
 * as its only argument, or an array reference whose element 0 is the
 * invocant and element 1 the method name, called with the event's SV as
 * the extra argument.
 */
static void pe_event_invoke(pe_event *ev) {
    STRLEN n_a;
    int Dbg;
    pe_watcher *wa = ev->up;
    struct pe_cbframe *frp;

    /* clean up frames left over from callbacks that never returned here */
    pe_check_recovery();

    /* SETUP */
    ENTER;
    /* wa->running is saved here and bumped below; pe_check_recovery
       compares it against the frame's run_id to spot abandoned frames */
    SAVEINT(wa->running);
    PE_RING_DETACH(&ev->peer);
    frp = &CBFrame[++CurCBFrame];
    frp->ev = ev;
    frp->run_id = ++wa->running;
    if (Estat.on)
	frp->stats = Estat.enter(CurCBFrame, wa->max_cb_tm);
    assert(ev->prio >= 0 && ev->prio < PE_QUEUES);
    QueueTime[ev->prio] = wa->cbtime = NVtime();
    /* SETUP */

    /* the frame just pushed occupies the last slot of CBFrame: bail out
       instead of running the callback (the frame stays pushed) */
    if (CurCBFrame+1 >= MAX_CB_NEST) {
	ExitLevel = 0;
	croak("Deep recursion detected; invoking unloop_all()\n");
    }

    Dbg = WaDEBUGx(wa);
    if (Dbg) {
	/*
	SV *cvb = perl_get_sv("Carp::Verbose", 1);
	if (!SvIV(cvb)) {
	    SAVEIV(SvIVX(cvb));
	    SvIVX(cvb) = 1;
	}
	*/

	if (Dbg >= 2)
	    warn("Event: [%d]invoking '%s' (prio %d)\n",
		 CurCBFrame, SvPV(wa->desc,n_a),ev->prio);
    }

    if (!PE_RING_EMPTY(&Callback)) pe_map_check(&Callback);

    if (EvPERLCB(ev)) {
	/* Perl callback: ev->callback is a reference to a CV or an AV */
	SV *cb = SvRV((SV*)ev->callback);
	/* trap exceptions only when Eval holds a non-zero integer */
	int pcflags = G_VOID | (SvIVX(Eval)? G_EVAL : 0);
	int retcnt;
	SV *evsv = event_2sv(ev);
	dSP;
	PUSHMARK(SP);
	if (SvTYPE(cb) == SVt_PVCV) {
	    /* code reference: callback(event) */
	    XPUSHs(evsv);
	    PUTBACK;
	    retcnt = perl_call_sv((SV*) ev->callback, pcflags);
	} else {
	    /* array reference: [invocant, method name] */
	    AV *av = (AV*)cb;
	    assert(SvTYPE(cb) == SVt_PVAV);
	    XPUSHs(*av_fetch(av, 0, 0));
	    XPUSHs(evsv);
	    PUTBACK;
	    retcnt = perl_call_method(SvPV(*av_fetch(av, 1, 0),n_a), pcflags);
	}
	/* throw away whatever the callback returned */
	SPAGAIN;
	SP -= retcnt;
	PUTBACK;
	if (SvTRUE(ERRSV)) {
	    /* with G_EVAL a true $@ is reported through
	       pe_callback_died(); without it $@ is simply reset */
	    if (pcflags & G_EVAL)
		pe_callback_died(frp);
	    else
		sv_setsv(ERRSV, &PL_sv_no);
	}
    } else {
	/* C callback: ev->callback is called as void (*)(pe_event*) */
	assert(ev->callback);
	(* (void(*)(pe_event*)) ev->callback)(ev);
    }

    LEAVE;

    if (Estat.on) {
	if (frp->stats)  /* maybe in transition */
	    Estat.commit(frp->stats, wa);
	frp->stats=0;
    }
    if (Dbg >= 3)
	warn("Event: completed '%s'\n", SvPV(wa->desc, n_a));
    /* pops the frame and releases the event */
    pe_event_postCB(frp);
}

/* One-time initialisation of the event method tables.
 * event_vtbl is filled in first; ioevent_vtbl and datafulevent_vtbl start
 * as copies of it and then get their own stash, allocator, destructor
 * and an empty freelist.  QueueTime is zeroed at the end. */
static void boot_pe_event() {
    pe_event_vtbl *vt;

    /* plain events: Event::Event */
    vt = &event_vtbl;
    vt->stash = gv_stashpv("Event::Event", 1);
    vt->new_event = pe_event_allocate;
    vt->dtor = pe_event_dtor;
    PE_RING_INIT(&vt->freelist, 0);

    /* I/O events: Event::Event::Io */
    vt = &ioevent_vtbl;
    *vt = event_vtbl;
    vt->stash = gv_stashpv("Event::Event::Io", 1);
    vt->new_event = pe_ioevent_allocate;
    vt->dtor = pe_ioevent_dtor;
    PE_RING_INIT(&vt->freelist, 0);

    /* data-carrying events: Event::Event::Dataful */
    vt = &datafulevent_vtbl;
    *vt = event_vtbl;
    vt->stash = gv_stashpv("Event::Event::Dataful", 1);
    vt->new_event = pe_datafulevent_allocate;
    vt->dtor = pe_datafulevent_dtor;
    PE_RING_INIT(&vt->freelist, 0);

    memset(QueueTime, 0, sizeof QueueTime);
}