The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#include "perl_zeromq.h"
#include "xshelper.h"

/*
 * PerlZMQ_trace(fmt, ...): debug tracing helper.
 *
 * When PERLZMQ_TRACE is greater than 0 each call prints "[perlzmq] ", the
 * printf-style formatted message and a trailing newline to Perl's stderr
 * handle.  Otherwise the macro expands to nothing at all.
 *
 * NOTE(review): the enabled form expands to a bare { ... } block rather than
 * do { ... } while (0); keep calls inside braces when they are the body of an
 * if/else.
 */
#if (PERLZMQ_TRACE > 0)
#define PerlZMQ_trace(...) \
    { \
        PerlIO_printf(PerlIO_stderr(), "[perlzmq] "); \
        PerlIO_printf(PerlIO_stderr(), __VA_ARGS__); \
        PerlIO_printf(PerlIO_stderr(), "\n"); \
    }
#else
#define PerlZMQ_trace(...)
#endif

/*
 * Store the numeric error code `err` into Perl's $! variable.
 *
 * $! is a magical scalar: its value is produced from errno by "get" magic
 * and written back to errno by "set" magic.  The plain sv_setiv() call only
 * updates the SV body and skips "set" magic, so the stored number would be
 * overwritten by the current errno the next time Perl code reads $!.
 * sv_setiv_mg() performs the assignment and then triggers "set" magic, which
 * makes the value stick.
 */
STATIC_INLINE void
PerlZMQ_set_bang(pTHX_ int err) {
    SV *errsv = get_sv("!", GV_ADD);
    PerlZMQ_trace("Set ERRSV ($!) to %d", err);
    sv_setiv_mg(errsv, err);
}

/*
 * Duplication hook for the magic attached to a message object.
 *
 * Allocates a fresh PerlZMQ_Raw_Message, initialises it, copies the message
 * currently referenced by mg->mg_ptr into it with zmq_msg_copy(), and then
 * points mg->mg_ptr at the new message.  The clone parameters are not used.
 * Always returns 0; the zmq return codes are not inspected.
 */
STATIC_INLINE int
PerlZMQ_Raw_Message_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param) {
    PerlZMQ_Raw_Message *const original = (PerlZMQ_Raw_Message *) mg->mg_ptr;
    PerlZMQ_Raw_Message *clone;

    PERL_UNUSED_VAR( param );
    PerlZMQ_trace("Message -> dup");

    /* Build an independent message structure for this magic entry. */
    Newxz( clone, 1, PerlZMQ_Raw_Message );
    zmq_msg_init( clone );
    zmq_msg_copy ( clone, original );

    mg->mg_ptr = (char *) clone;
    return 0;
}

/*
 * Free hook for the magic attached to a message object.
 *
 * If the magic still carries a message pointer, the zmq message is closed
 * and the structure holding it is released.  The owning SV is not used.
 * Always returns 1.
 */
STATIC_INLINE int
PerlZMQ_Raw_Message_mg_free( pTHX_ SV * const sv, MAGIC *const mg ) {
    PerlZMQ_Raw_Message* const payload = (PerlZMQ_Raw_Message *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    PerlZMQ_trace( "START mg_free (Message)" );

    if ( payload ) {
        PerlZMQ_trace( " + zmq message %p", payload );
        /* Close first so zmq can drop the content, then free our struct. */
        zmq_msg_close( payload );
        Safefree( payload );
    }

    PerlZMQ_trace( "END mg_free (Message)" );
    return 1;
}

/*
 * Locate the magic entry on `sv` whose vtable is `vtbl`.
 *
 * Walks the SV's magic chain and returns the first entry whose mg_virtual
 * equals `vtbl`.  Croaks when no such entry exists, so a NULL return is
 * never observed by callers.
 */
STATIC_INLINE MAGIC*
PerlZMQ_Raw_Message_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* cur;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    cur = SvMAGIC(sv);
    while (cur) {
        if (cur->mg_virtual == vtbl) {
            assert(cur->mg_type == PERL_MAGIC_ext);
            return cur;
        }
        cur = cur->mg_moremagic;
    }

    croak("ZeroMQ::Raw::Message: Invalid ZeroMQ::Raw::Message object was passed to mg_find");
    return NULL; /* not reached */
}

/*
 * Free hook for the magic attached to a context object.
 *
 * With USE_ITHREADS the magic holds a wrapper recording the interpreter that
 * created it; the zmq context is terminated and the wrapper released only
 * when this hook runs in that same interpreter.  Without USE_ITHREADS the
 * pointer is handed straight to zmq_term().  In both cases mg->mg_ptr is
 * cleared once the context has been terminated.  Always returns 1.
 */
STATIC_INLINE int
PerlZMQ_Raw_Context_mg_free( pTHX_ SV * const sv, MAGIC *const mg ) {
    PerlZMQ_Raw_Context* const held = (PerlZMQ_Raw_Context *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    PerlZMQ_trace("START mg_free (Context)");

#ifdef USE_ITHREADS
    if (held) {
        PerlZMQ_trace( " + thread enabled. thread %p", aTHX );
        PerlZMQ_trace( " + context wrapper %p with zmq context %p", held, held->ctxt );
        /* Only the interpreter that created the context may tear it down. */
        if ( aTHX == held->interp ) {
            PerlZMQ_trace( " + detected mg_free from creating thread %p, cleaning up", aTHX );
            zmq_term( held->ctxt );
            mg->mg_ptr = NULL;
            Safefree(held);
        }
    }
#else
    if (held) {
        PerlZMQ_trace(" + zmq context %p", held);
        zmq_term( held );
        mg->mg_ptr = NULL;
    }
#endif

    PerlZMQ_trace("END mg_free (Context)");
    return 1;
}

/*
 * Locate the magic entry on `sv` whose vtable is `vtbl`.
 *
 * Scans the magic chain of `sv` and hands back the first entry whose
 * mg_virtual matches.  Croaks if the chain contains no such entry.
 */
STATIC_INLINE MAGIC*
PerlZMQ_Raw_Context_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* cur;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    cur = SvMAGIC(sv);
    while (cur) {
        if (cur->mg_virtual == vtbl) {
            assert(cur->mg_type == PERL_MAGIC_ext);
            return cur;
        }
        cur = cur->mg_moremagic;
    }

    croak("ZeroMQ::Raw::Context: Invalid ZeroMQ::Raw::Context object was passed to mg_find");
    return NULL; /* not reached */
}

/*
 * Duplication hook for context magic: deliberately does nothing, so the
 * magic pointer is left exactly as it was.  Always returns 0.
 */
STATIC_INLINE int
PerlZMQ_Raw_Context_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param){
    PERL_UNUSED_VAR(param);
    PERL_UNUSED_VAR(mg);
    return 0;
}

/*
 * Close the zmq socket held by `sock`, drop the reference the wrapper keeps
 * on its associated context SV, and free the wrapper itself.
 *
 * Returns the result of zmq_close().  `sock` must not be used afterwards.
 *
 * The context reference is released whenever one is recorded.  The previous
 * SvOK() test dereferenced assoc_ctxt even when it was NULL, and skipped the
 * decrement (leaking the reference taken when the socket was created) if the
 * SV had become undefined in the meantime.
 */
STATIC_INLINE int
PerlZMQ_Raw_Socket_invalidate( PerlZMQ_Raw_Socket *sock )
{
    SV *ctxt_sv = sock->assoc_ctxt;
    int rv;

    PerlZMQ_trace("START socket_invalidate");
    PerlZMQ_trace(" + zmq socket %p", sock->socket);
    rv = zmq_close( sock->socket );

    if ( ctxt_sv != NULL ) {
        PerlZMQ_trace(" + associated context: %p", ctxt_sv);
        SvREFCNT_dec(ctxt_sv);
        sock->assoc_ctxt = NULL;
    }

    Safefree(sock);

    PerlZMQ_trace("END socket_invalidate");
    return rv;
}

/*
 * Free hook for the magic attached to a socket object.
 *
 * When the magic still carries a socket wrapper it is handed to
 * PerlZMQ_Raw_Socket_invalidate() and the magic pointer is cleared.
 * Always returns 1.
 */
STATIC_INLINE int
PerlZMQ_Raw_Socket_mg_free(pTHX_ SV* const sv, MAGIC* const mg)
{
    PerlZMQ_Raw_Socket* const wrapper = (PerlZMQ_Raw_Socket *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    PerlZMQ_trace("START mg_free (Socket)");

    if (wrapper != NULL) {
        PerlZMQ_Raw_Socket_invalidate( wrapper );
        mg->mg_ptr = NULL;
    }

    PerlZMQ_trace("END mg_free (Socket)");
    return 1;
}

/*
 * Duplication hook for socket magic.
 *
 * With USE_ITHREADS the magic pointer is reset to NULL, so the magic entry
 * being processed no longer refers to the socket wrapper.  Without
 * USE_ITHREADS the hook has nothing to do.  Always returns 0.
 */
STATIC_INLINE int
PerlZMQ_Raw_Socket_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param){
    PERL_UNUSED_VAR(param);
#ifdef USE_ITHREADS /* single threaded perl has no "xxx_dup()" APIs */
    mg->mg_ptr = NULL;
#else
    PERL_UNUSED_VAR(mg);
#endif
    return 0;
}

/*
 * Locate the magic entry on `sv` whose vtable is `vtbl`.
 *
 * Iterates over the SV's magic chain and returns the first entry with a
 * matching mg_virtual.  Croaks when nothing matches.
 */
STATIC_INLINE MAGIC*
PerlZMQ_Raw_Socket_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* cur;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    cur = SvMAGIC(sv);
    while (cur) {
        if (cur->mg_virtual == vtbl) {
            assert(cur->mg_type == PERL_MAGIC_ext);
            return cur;
        }
        cur = cur->mg_moremagic;
    }

    croak("ZeroMQ::Socket: Invalid ZeroMQ::Socket object was passed to mg_find");
    return NULL; /* not reached */
}

/*
 * Deallocation callback passed to zmq_msg_init_data() in this file: releases
 * a buffer that was obtained with malloc().  The hint argument is ignored.
 */
STATIC_INLINE void
PerlZMQ_free_string(void *data, void *hint) {
    free(data);
    PERL_UNUSED_ARG(hint);
}

#include "mg-xs.inc"

MODULE = ZeroMQ    PACKAGE = ZeroMQ   PREFIX = PerlZMQ_

PROTOTYPES: DISABLED

BOOT:
    {
        /* Module bootstrap: only emits a trace line (nothing when tracing is off). */
        PerlZMQ_trace( "Booting Perl ZeroMQ" );
    }

void
PerlZMQ_version()
    PREINIT:
        I32 wantctx;
        int ver_major, ver_minor, ver_patch;
    PPCODE:
        /*
         * Report the zmq library version:
         *   void context   -> nothing is returned
         *   scalar context -> one string "major.minor.patch"
         *   list context   -> three integers (major, minor, patch)
         */
        wantctx = GIMME_V;
        if (wantctx == G_VOID) {
            /* WTF? you don't want a return value?! */
            XSRETURN(0);
        }

        zmq_version(&ver_major, &ver_minor, &ver_patch);
        if (wantctx != G_SCALAR) {
            mXPUSHi( ver_major );
            mXPUSHi( ver_minor );
            mXPUSHi( ver_patch );
            XSRETURN(3);
        }

        XPUSHs( sv_2mortal( newSVpvf( "%d.%d.%d", ver_major, ver_minor, ver_patch ) ) );
        XSRETURN(1);

MODULE = ZeroMQ    PACKAGE = ZeroMQ::Constants 

INCLUDE: const-xs.inc

MODULE = ZeroMQ    PACKAGE = ZeroMQ::Raw  PREFIX = PerlZMQ_Raw_

PROTOTYPES: DISABLED

PerlZMQ_Raw_Context *
PerlZMQ_Raw_zmq_init( nthreads = 5 )
        int nthreads;
    PREINIT:
        /* Class name for the returned object; presumably consumed by the
           output typemap, which is not visible in this file section. */
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Context", 20 ));
    CODE:
        /*
         * Create a zmq context with `nthreads` I/O threads (default 5).
         * With USE_ITHREADS the context is stored in a wrapper that also
         * records the creating interpreter; otherwise the pointer returned
         * by zmq_init() is returned directly.  The result of zmq_init() is
         * not checked here.
         */
        PerlZMQ_trace( "START zmq_init" );
#ifdef USE_ITHREADS
        PerlZMQ_trace( " + threads enabled, aTHX %p", aTHX );
        Newxz( RETVAL, 1, PerlZMQ_Raw_Context );
        RETVAL->interp = aTHX;
        RETVAL->ctxt   = zmq_init( nthreads );
        PerlZMQ_trace( " + created context wrapper %p", RETVAL );
        PerlZMQ_trace( " + zmq context %p", RETVAL->ctxt );
#else
        PerlZMQ_trace( " + non-threaded context");
        RETVAL = zmq_init( nthreads );
#endif
        PerlZMQ_trace( "END zmq_init");
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_term( context )
        PerlZMQ_Raw_Context *context;
    CODE:
        /*
         * Terminate the zmq context and return zmq_term()'s result.  On
         * success the magic pointer is cleared so the free hook will not
         * terminate the context a second time.  The "_closed" key is set on
         * the object regardless of the result.
         */
#ifdef USE_ITHREADS
        RETVAL = zmq_term( context->ctxt );
#else
        RETVAL = zmq_term( context );
#endif
        {
            SV *const self = SvRV(ST(0));

            if (RETVAL == 0) {
                /* Cancel the SV's mg attr so to not call zmq_term automatically */
                MAGIC *const ctx_mg =
                    PerlZMQ_Raw_Context_mg_find( aTHX_ self, &PerlZMQ_Raw_Context_vtbl );
                ctx_mg->mg_ptr = NULL;
            }

            /* mark the original SV's _closed flag as true */
            if (! hv_stores( (HV *) self, "_closed", &PL_sv_yes )) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
    OUTPUT:
        RETVAL

PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_msg_init()
    PREINIT:
        /* Class name for the returned object; presumably consumed by the
           output typemap, which is not visible in this file section. */
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        int rc;
    CODE:
        /*
         * Allocate and initialise an empty zmq message.  On failure $! is
         * set and NULL is returned.
         */
        Newxz( RETVAL, 1, PerlZMQ_Raw_Message );
        rc = zmq_msg_init( RETVAL );
        if ( rc != 0 ) {
            /* Record the error before any cleanup call can disturb errno. */
            SET_BANG;
            zmq_msg_close( RETVAL );
            /* The struct is never attached to an SV on this path, so it
               must be released here or it would leak. */
            Safefree( RETVAL );
            RETVAL = NULL;
        }
    OUTPUT:
        RETVAL

PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_msg_init_size( size )
        IV size;
    PREINIT:
        /* Class name for the returned object; presumably consumed by the
           output typemap, which is not visible in this file section. */
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        int rc;
    CODE:
        /*
         * Allocate a zmq message with room for `size` bytes.  On failure $!
         * is set and NULL is returned.
         */
        Newxz( RETVAL, 1, PerlZMQ_Raw_Message );
        rc = zmq_msg_init_size(RETVAL, size);
        if ( rc != 0 ) {
            /* Record the error before any cleanup call can disturb errno. */
            SET_BANG;
            zmq_msg_close( RETVAL );
            /* The struct is never attached to an SV on this path, so it
               must be released here or it would leak. */
            Safefree( RETVAL );
            RETVAL = NULL;
        }
    OUTPUT:
        RETVAL

PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_msg_init_data( data, size = -1)
        SV *data;
        IV size;
    PREINIT:
        /* Class name for the returned object; presumably consumed by the
           output typemap, which is not visible in this file section. */
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        STRLEN x_data_len;
        char *sv_data = SvPV(data, x_data_len);
        char *x_data;
        int rc;
    CODE:
        /*
         * Build a zmq message from a private copy of the bytes of `data`.
         * A non-negative `size` limits the number of bytes used; it is
         * clamped to the scalar's real length so the copy can never read
         * past the end of the Perl buffer.  On zmq failure $! is set and
         * NULL is returned.
         */
        if (size >= 0 && (STRLEN) size < x_data_len) {
            x_data_len = (STRLEN) size;
        }

        /* Allocate the copy before the message struct so that an
           out-of-memory croak cannot leak the struct.  At least one byte is
           requested so a NULL result always means failure. */
        x_data = (char *)malloc(x_data_len > 0 ? x_data_len : 1);
        if (x_data == NULL) {
            croak("ZeroMQ::Raw::zmq_msg_init_data: out of memory");
        }
        memcpy(x_data, sv_data, x_data_len);

        Newxz( RETVAL, 1, PerlZMQ_Raw_Message );
        rc = zmq_msg_init_data(RETVAL, x_data, x_data_len, PerlZMQ_free_string, NULL);
        if ( rc != 0 ) {
            /* Record the error before any cleanup call can disturb errno. */
            SET_BANG;
            zmq_msg_close( RETVAL );
            /* zmq did not take ownership of the buffer, and the struct is
               never attached to an SV: release both here. */
            free( x_data );
            Safefree( RETVAL );
            RETVAL = NULL;
        }
        else {
            PerlZMQ_trace("zmq_msg_init_data created message %p", RETVAL);
        }
    OUTPUT:
        RETVAL

SV *
PerlZMQ_Raw_zmq_msg_data(message)
        PerlZMQ_Raw_Message *message;
    PREINIT:
        const char *payload;
        STRLEN      payload_len;
    CODE:
        /* Return a new Perl string holding a copy of the message bytes. */
        payload     = (const char *) zmq_msg_data(message);
        payload_len = (STRLEN) zmq_msg_size(message);
        RETVAL = newSVpvn( payload, payload_len );
    OUTPUT:
        RETVAL

size_t
PerlZMQ_Raw_zmq_msg_size(message)
        PerlZMQ_Raw_Message *message;
    CODE:
        /* Thin wrapper: returns whatever zmq_msg_size() reports for the message. */
        RETVAL = zmq_msg_size(message);
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_msg_close(message)
        PerlZMQ_Raw_Message *message;
    CODE:
        /*
         * Close the zmq message, free its structure, detach it from the
         * object's magic so the free hook will not touch it again, and flag
         * the object as closed.  Returns zmq_msg_close()'s result.
         */
        PerlZMQ_trace("START zmq_msg_close");
        RETVAL = zmq_msg_close(message);
        Safefree(message);
        {
            SV *const self = SvRV(ST(0));
            MAGIC *const msg_mg =
                PerlZMQ_Raw_Message_mg_find( aTHX_ self, &PerlZMQ_Raw_Message_vtbl );

            msg_mg->mg_ptr = NULL;

            /* mark the original SV's _closed flag as true */
            if (! hv_stores( (HV *) self, "_closed", &PL_sv_yes )) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
        PerlZMQ_trace("END zmq_msg_close");
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_msg_move(dest, src)
        PerlZMQ_Raw_Message *dest;
        PerlZMQ_Raw_Message *src;
    CODE:
        /* Thin wrapper: forwards to zmq_msg_move() and returns its result unchanged. */
        RETVAL = zmq_msg_move( dest, src );
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_msg_copy (dest, src);
        PerlZMQ_Raw_Message *dest;
        PerlZMQ_Raw_Message *src;
    CODE:
        /* Thin wrapper: forwards to zmq_msg_copy() and returns its result unchanged. */
        RETVAL = zmq_msg_copy( dest, src );
    OUTPUT:
        RETVAL

PerlZMQ_Raw_Socket *
PerlZMQ_Raw_zmq_socket (ctxt, type)
        PerlZMQ_Raw_Context *ctxt;
        IV type;
    PREINIT:
        /* Class name for the returned object; presumably consumed by the
           output typemap, which is not visible in this file section. */
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Socket", 19 ));
    CODE:
        /*
         * Create a zmq socket of the given type inside `ctxt`.  The wrapper
         * keeps a counted reference to the context argument; it is released
         * again in PerlZMQ_Raw_Socket_invalidate().
         *
         * When zmq_socket() fails, $! is set, the wrapper is released and
         * NULL is returned, following the NULL-on-failure convention that
         * zmq_msg_init and zmq_recv in this file already use, instead of
         * handing back a wrapper around a NULL socket.
         */
        PerlZMQ_trace( "START zmq_socket" );
        Newxz( RETVAL, 1, PerlZMQ_Raw_Socket );
        RETVAL->assoc_ctxt = NULL;
        RETVAL->socket = NULL;
#ifdef USE_ITHREADS
        PerlZMQ_trace( " + context wrapper %p, zmq context %p", ctxt, ctxt->ctxt );
        RETVAL->socket = zmq_socket( ctxt->ctxt, type );
#else
        PerlZMQ_trace( " + zmq context %p", ctxt );
        RETVAL->socket = zmq_socket( ctxt, type );
#endif
        if ( RETVAL->socket == NULL ) {
            /* Record the error before the cleanup can disturb errno. */
            SET_BANG;
            PerlZMQ_trace( " + zmq_socket failed, no socket created" );
            Safefree( RETVAL );
            RETVAL = NULL;
        }
        else {
            RETVAL->assoc_ctxt = ST(0);
            SvREFCNT_inc(RETVAL->assoc_ctxt);
            PerlZMQ_trace( " + created socket %p", RETVAL );
        }
        PerlZMQ_trace( "END zmq_socket" );
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_close(socket)
        PerlZMQ_Raw_Socket *socket;
    CODE:
        /*
         * Close the socket via PerlZMQ_Raw_Socket_invalidate() (which also
         * frees the wrapper), then detach it from the object's magic and
         * flag the object as closed.  Returns the invalidate result.
         */
        RETVAL = PerlZMQ_Raw_Socket_invalidate( socket );
        {
            SV *const self = SvRV(ST(0));
            /* Cancel the SV's mg attr so to not call socket_invalidate again
               during Socket_mg_free
            */
            MAGIC *const sock_mg =
                PerlZMQ_Raw_Socket_mg_find( aTHX_ self, &PerlZMQ_Raw_Socket_vtbl );

            sock_mg->mg_ptr = NULL;

            /* mark the original SV's _closed flag as true */
            if (! hv_stores( (HV *) self, "_closed", &PL_sv_yes )) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_connect(socket, addr)
        PerlZMQ_Raw_Socket *socket;
        char *addr;
    PREINIT:
        int rc;
    CODE:
        /* Connect the socket to `addr`; croaks with the zmq error text on failure. */
        PerlZMQ_trace( "START zmq_connect" );
        PerlZMQ_trace( " + socket %p", socket );
        rc = zmq_connect( socket->socket, addr );
        if (rc) {
            croak( "%s", zmq_strerror( zmq_errno() ) );
        }
        RETVAL = rc;
        PerlZMQ_trace( "END zmq_connect" );
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_bind(socket, addr)
        PerlZMQ_Raw_Socket *socket;
        char *addr;
    PREINIT:
        int rc;
    CODE:
        /* Bind the socket to `addr`; croaks with the zmq error text on failure. */
        PerlZMQ_trace( "zmq_bind: socket %p", socket );
        rc = zmq_bind( socket->socket, addr );
        if (rc) {
            croak( "%s", zmq_strerror( zmq_errno() ) );
        }
        RETVAL = rc;
    OUTPUT:
        RETVAL

PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_recv(socket, flags = 0)
        PerlZMQ_Raw_Socket *socket;
        int flags;
    PREINIT:
        /* Class name for the returned object; presumably consumed by the
           output typemap, which is not visible in this file section. */
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        zmq_msg_t incoming;
        int status;
    CODE:
        /*
         * Receive one message into a temporary on the C stack.  On success
         * it is copied into a newly allocated message that is returned; on
         * failure $! is set and NULL is returned.  The temporary is closed
         * in both cases.
         */
        PerlZMQ_trace( "START zmq_recv" );
        RETVAL = NULL;
        zmq_msg_init(&incoming);
        status = zmq_recv(socket->socket, &incoming, flags);
        PerlZMQ_trace(" + zmq recv with flags %d", flags);
        PerlZMQ_trace(" + zmq_recv returned with rv '%d'", status);
        if (status == 0) {
            Newxz(RETVAL, 1, PerlZMQ_Raw_Message);
            zmq_msg_init(RETVAL);
            zmq_msg_copy( RETVAL, &incoming );
            zmq_msg_close(&incoming);
            PerlZMQ_trace(" + zmq_recv created message %p", RETVAL );
        } else {
            SET_BANG;
            zmq_msg_close(&incoming);
            PerlZMQ_trace(" + zmq_recv got bad status, closing temporary message");
        }
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_send(socket, message, flags = 0)
        PerlZMQ_Raw_Socket *socket;
        SV *message;
        int flags;
    PREINIT:
        PerlZMQ_Raw_Message *msg = NULL;
    CODE:
        /*
         * Send `message` on the socket and return zmq_send()'s result.
         * `message` is either a ZeroMQ::Raw::Message object, which is sent
         * as is, or any other defined scalar, whose string value is copied
         * into a temporary zmq message that is closed after sending.
         */
        if (! SvOK(message))
            croak("ZeroMQ::Socket::send() NULL message passed");

        if (sv_isobject(message) && sv_isa(message, "ZeroMQ::Raw::Message")) {
            /* Use the Message lookup: the Context variant walks the chain
               identically but croaks with a misleading class name. */
            MAGIC *mg = PerlZMQ_Raw_Message_mg_find(aTHX_ SvRV(message), &PerlZMQ_Raw_Message_vtbl);
            if (mg) {
                msg = (PerlZMQ_Raw_Message *) mg->mg_ptr;
            }

            if (msg == NULL) {
                croak("Got invalid message object");
            }

            RETVAL = zmq_send(socket->socket, msg, flags);
        } else {
            STRLEN data_len;
            char *x_data;
            char *data = SvPV(message, data_len);
            zmq_msg_t tmp_msg;

            /* At least one byte is requested so that a NULL result always
               means allocation failure, even for an empty string. */
            x_data = (char *)malloc(data_len > 0 ? data_len : 1);
            if (x_data == NULL) {
                croak("ZeroMQ::Raw::zmq_send: out of memory");
            }
            memcpy(x_data, data, data_len);

            if (zmq_msg_init_data(&tmp_msg, x_data, data_len, PerlZMQ_free_string, NULL) != 0) {
                /* zmq never took ownership of the copy; release it and
                   report failure like a failed send would. */
                free(x_data);
                RETVAL = -1;
            } else {
                RETVAL = zmq_send(socket->socket, &tmp_msg, flags);
                zmq_msg_close( &tmp_msg );
            }
        }
    OUTPUT:
        RETVAL

SV *
PerlZMQ_Raw_zmq_getsockopt(sock, option)
        PerlZMQ_Raw_Socket *sock;
        int option;
    PREINIT:
        char     buf[256];
        int      i;
        uint64_t u64;
        int64_t  i64;
        uint32_t i32;
        size_t   len;
        int      status = -1;
    CODE:
        /*
         * Read a socket option and return it as a new scalar.  The C type
         * used for the read depends on the option: int, int64_t, uint64_t,
         * uint32_t or a byte string of up to sizeof(buf) bytes.  Options not
         * listed below are rejected up front.  Any zmq failure sets $! and
         * croaks.
         */
        switch(option){
            case ZMQ_TYPE:
            case ZMQ_LINGER:
#ifdef ZMQ_RECONNECT_IVL
            case ZMQ_RECONNECT_IVL:
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
            case ZMQ_RECONNECT_IVL_MAX:
#endif
            case ZMQ_BACKLOG:
            case ZMQ_FD:
                len = sizeof(i);
                status = zmq_getsockopt(sock->socket, option, &i, &len);
                if(status == 0)
                    RETVAL = newSViv(i);
                break;

            case ZMQ_RCVMORE:
            case ZMQ_SWAP:
            case ZMQ_RATE:
            case ZMQ_RECOVERY_IVL:
            case ZMQ_MCAST_LOOP:
                len = sizeof(i64);
                status = zmq_getsockopt(sock->socket, option, &i64, &len);
                if(status == 0)
                    RETVAL = newSViv(i64);
                break;

            case ZMQ_HWM:
            case ZMQ_AFFINITY:
            case ZMQ_SNDBUF:
            case ZMQ_RCVBUF:
                len = sizeof(u64);
                status = zmq_getsockopt(sock->socket, option, &u64, &len);
                if(status == 0)
                    RETVAL = newSVuv(u64);
                break;

            case ZMQ_EVENTS:
                len = sizeof(i32);
                status = zmq_getsockopt(sock->socket, option, &i32, &len);
                if(status == 0)
                    RETVAL = newSViv(i32);
                break;

            case ZMQ_IDENTITY:
                len = sizeof(buf);
                status = zmq_getsockopt(sock->socket, option, &buf, &len);
                if(status == 0)
                    RETVAL = newSVpvn(buf, len);
                break;

            default:
                /* No zmq call was made, so the error code would be stale;
                   report the real problem instead of guessing from it. */
                croak("Unknown sockopt type %d", option);
        }
        if(status != 0){
            /* Capture the error code once, then publish it to $! before
               croaking.  SET_BANG used to sit inside the switch ahead of the
               first case label, where it could never execute. */
            const int err = _ERRNO;
            SET_BANG;
            switch(err) {
                case EINTR:
                    croak("The operation was interrupted by delivery of a signal");
                case ETERM:
                    croak("The 0MQ context accociated with the specified socket was terminated");
                case EFAULT:
                    croak("The provided socket was not valid");
                case EINVAL:
                    croak("Invalid argument");
                default:
                    croak("Unknown error reading socket option");
            }
        }
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_setsockopt(sock, option, value)
        PerlZMQ_Raw_Socket *sock;
        int option;
        SV *value;
    PREINIT:
        STRLEN len;
        const char *ptr;
        uint64_t u64;
        int64_t  i64;
        int i;
    CODE:
        /*
         * Set a socket option and return zmq_setsockopt()'s result.  The
         * Perl value is converted to the C type each option expects: byte
         * string, int64_t, uint64_t or int.  The integer widths match the
         * ones zmq_getsockopt in this file uses for the same options
         * (RECONNECT_IVL, RECONNECT_IVL_MAX and BACKLOG are plain int,
         * RECOVERY_IVL is int64_t); passing a value of the wrong size makes
         * zmq reject the call.  Unknown options are passed as strings after
         * a warning.
         */
        switch(option){
            case ZMQ_IDENTITY:
            case ZMQ_SUBSCRIBE:
            case ZMQ_UNSUBSCRIBE:
                ptr = SvPV(value, len);
                RETVAL = zmq_setsockopt(sock->socket, option, ptr, len);
                break;

            case ZMQ_SWAP:
            case ZMQ_RATE:
            case ZMQ_RECOVERY_IVL:
            case ZMQ_MCAST_LOOP:
                i64 = SvIV(value);
                RETVAL = zmq_setsockopt(sock->socket, option, &i64, sizeof(int64_t));
                break;

            case ZMQ_HWM:
            case ZMQ_AFFINITY:
            case ZMQ_SNDBUF:
            case ZMQ_RCVBUF:
                u64 = SvUV(value);
                RETVAL = zmq_setsockopt(sock->socket, option, &u64, sizeof(uint64_t));
                break;

            case ZMQ_LINGER:
#ifdef ZMQ_RECONNECT_IVL
            case ZMQ_RECONNECT_IVL:
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
            case ZMQ_RECONNECT_IVL_MAX:
#endif
            case ZMQ_BACKLOG:
                i = SvIV(value);
                RETVAL = zmq_setsockopt(sock->socket, option, &i, sizeof(i));
                break;

            default:
                warn("Unknown sockopt type %d, assuming string.  Send patch", option);
                ptr = SvPV(value, len);
                RETVAL = zmq_setsockopt(sock->socket, option, ptr, len);
        }
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_poll( list, timeout = 0 )
        AV *list;
        long timeout;
    PREINIT:
        I32 list_len;
        zmq_pollitem_t *pollitems;
        CV **callbacks;
        int i;
    CODE:
        /*
         * Poll a set of sockets / file descriptors and run a callback for
         * every item that reported one of its requested events.
         *
         * `list` is an array of hash references, each holding:
         *   socket   - a ZeroMQ::Raw::Socket object, or
         *   fd       - a file descriptor number (used when 'socket' is absent)
         *   events   - the event mask to wait for
         *   callback - a code reference, called without arguments
         * Returns zmq_poll()'s result, or an empty list when `list` is empty.
         * Croaks on malformed entries after releasing the work arrays.
         *
         * NOTE(review): a callback that dies unwinds past the Safefree()
         * calls at the end, leaking the two work arrays.
         */
        list_len = av_len( list ) + 1;
        if (list_len <= 0) {
            XSRETURN(0);
        }

        Newxz( pollitems, list_len, zmq_pollitem_t);
        Newxz( callbacks, list_len, CV *);

        /* list should be a list of hashrefs fd, events, and callbacks */
        for (i = 0; i < list_len; i++) {
            SV **svr = av_fetch( list, i, 0 );
            HV  *elm;
            if (svr == NULL || ! SvOK(*svr) || ! SvROK(*svr) || SvTYPE(SvRV(*svr)) != SVt_PVHV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid value on index %d", i);
            }
            elm = (HV *) SvRV(*svr);

            callbacks[i] = NULL;
            pollitems[i].revents = 0;
            pollitems[i].events  = 0;
            pollitems[i].fd      = 0;
            pollitems[i].socket  = NULL;

            svr = hv_fetch( elm, "socket", 6, 0 );
            if (svr != NULL) {
                MAGIC *mg;
                if (! SvOK(*svr) || !sv_isobject( *svr) || ! sv_isa(*svr, "ZeroMQ::Raw::Socket")) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'socket' given for index %d", i);
                }
                mg = PerlZMQ_Raw_Socket_mg_find( aTHX_ SvRV(*svr), &PerlZMQ_Raw_Socket_vtbl );
                /* A socket that was already closed has its magic pointer
                   reset to NULL; reject it instead of dereferencing it. */
                if (mg->mg_ptr == NULL) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'socket' given for index %d", i);
                }
                pollitems[i].socket = ((PerlZMQ_Raw_Socket *) mg->mg_ptr)->socket;
                PerlZMQ_trace( " + pollitem[%d].socket = %p", i, pollitems[i].socket );
            } else {
                svr = hv_fetch( elm, "fd", 2, 0 );
                if (svr == NULL || ! SvOK(*svr) || SvTYPE(*svr) != SVt_IV) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'fd' given for index %d", i);
                }
                pollitems[i].fd = SvIV( *svr );
            }

            svr = hv_fetch( elm, "events", 6, 0 );
            if (svr == NULL || ! SvOK(*svr) || SvTYPE(*svr) != SVt_IV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid 'events' given for index %d", i);
            }
            pollitems[i].events = SvIV( *svr );

            svr = hv_fetch( elm, "callback", 8, 0 );
            if (svr == NULL || ! SvOK(*svr) || ! SvROK(*svr) || SvTYPE(SvRV(*svr)) != SVt_PVCV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid 'callback' given for index %d", i);
            }
            callbacks[i] = (CV *) SvRV( *svr );
        }

        /* now call zmq_poll */
        RETVAL = zmq_poll( pollitems, list_len, timeout );
        for ( i = 0; i < list_len; i++ ) {
            if (pollitems[i].revents & pollitems[i].events) {
                int count;
                dSP;
                ENTER;
                SAVETMPS;
                PUSHMARK(SP);
                PUTBACK;

                count = call_sv( (SV*)callbacks[i], G_SCALAR );
                SPAGAIN;
                /* The callback's return value is not used: pop it so the
                   Perl stack does not grow with every callback invoked. */
                SP -= count;

                PUTBACK;
                FREETMPS;
                LEAVE;
            }
        }
        Safefree(pollitems);
        Safefree(callbacks);
    OUTPUT:
        RETVAL

int
PerlZMQ_Raw_zmq_device( device, insocket, outsocket )
        int device;
        PerlZMQ_Raw_Socket *insocket;
        PerlZMQ_Raw_Socket *outsocket;
    CODE:
        /* Thin wrapper: passes the device type and the two underlying zmq
           sockets to zmq_device() and returns its result unchanged. */
        RETVAL = zmq_device( device, insocket->socket, outsocket->socket );
    OUTPUT:
        RETVAL