#include "perl_zeromq.h"
#include "xshelper.h"

/*
 * XS glue between Perl and libzmq.
 *
 * The first half of this file is plain C: magic (MGVTBL) callbacks that tie
 * the lifetime of the native message / context / socket structures to the
 * Perl SVs wrapping them.  The second half (after the first MODULE line) is
 * the XS interface itself.
 */

/*
 * PerlZMQ_trace(fmt, ...) prints a "[perlzmq] "-prefixed line to Perl's
 * stderr when PERLZMQ_TRACE is positive, and expands to nothing otherwise.
 * The do/while(0) wrapper keeps the macro a single statement so it is safe
 * inside un-braced if/else bodies.
 */
#if (PERLZMQ_TRACE > 0)
#define PerlZMQ_trace(...) \
    do { \
        PerlIO_printf(PerlIO_stderr(), "[perlzmq] "); \
        PerlIO_printf(PerlIO_stderr(), __VA_ARGS__); \
        PerlIO_printf(PerlIO_stderr(), "\n"); \
    } while (0)
#else
#define PerlZMQ_trace(...)
#endif

/* Store the numeric error code `err` into Perl's $! variable. */
STATIC_INLINE void
PerlZMQ_set_bang(pTHX_ int err) {
    SV *errsv = get_sv("!", GV_ADD);
    PerlZMQ_trace("Set ERRSV ($!) to %d", err);
    sv_setiv(errsv, err);
}

/*
 * Magic "dup" hook for messages: gives the duplicated MAGIC its own
 * zmq message, initialised via zmq_msg_copy() from the source message.
 * Always returns 0.
 */
STATIC_INLINE int
PerlZMQ_Raw_Message_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param) {
    PerlZMQ_Raw_Message *const src = (PerlZMQ_Raw_Message *) mg->mg_ptr;
    PerlZMQ_Raw_Message *dest;

    PerlZMQ_trace("Message -> dup");
    PERL_UNUSED_VAR( param );

    /* zmq_msg_close() below in this file resets mg_ptr to NULL; such a
     * message has nothing left to copy. */
    if ( src == NULL ) {
        return 0;
    }

    Newxz( dest, 1, PerlZMQ_Raw_Message );
    zmq_msg_init( dest );
    zmq_msg_copy ( dest, src );
    mg->mg_ptr = (char *) dest;
    return 0;
}

/*
 * Magic "free" hook for messages: closes the zmq message and releases the
 * structure holding it, unless it was already released (mg_ptr == NULL).
 * Always returns 1.
 */
STATIC_INLINE int
PerlZMQ_Raw_Message_mg_free( pTHX_ SV * const sv, MAGIC *const mg ) {
    PerlZMQ_Raw_Message* const msg = (PerlZMQ_Raw_Message *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    PerlZMQ_trace( "START mg_free (Message)" );
    if ( msg != NULL ) {
        PerlZMQ_trace( " + zmq message %p", msg );
        zmq_msg_close( msg );
        Safefree( msg );
        /* do not leave a dangling pointer behind, same as the other hooks */
        mg->mg_ptr = NULL;
    }
    PerlZMQ_trace( "END mg_free (Message)" );
    return 1;
}

/*
 * Walk the magic chain of `sv` and return the entry whose vtable is `vtbl`.
 * Croaks when no such entry exists, so the return value is never NULL.
 */
STATIC_INLINE MAGIC*
PerlZMQ_Raw_Message_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* mg;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    for (mg = SvMAGIC(sv); mg; mg = mg->mg_moremagic) {
        if (mg->mg_virtual == vtbl) {
            assert(mg->mg_type == PERL_MAGIC_ext);
            return mg;
        }
    }

    croak("ZeroMQ::Raw::Message: Invalid ZeroMQ::Raw::Message object was passed to mg_find");
    return NULL; /* not reached */
}

/*
 * Magic "free" hook for contexts.
 *
 * With ithreads the magic pointer is a wrapper recording the interpreter
 * that created the context; only that interpreter terminates the zmq
 * context and frees the wrapper.  Without ithreads the magic pointer is the
 * zmq context itself.  Always returns 1.
 */
STATIC_INLINE int
PerlZMQ_Raw_Context_mg_free( pTHX_ SV * const sv, MAGIC *const mg ) {
    PerlZMQ_Raw_Context* const ctxt = (PerlZMQ_Raw_Context *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    PerlZMQ_trace("START mg_free (Context)");
    if (ctxt != NULL) {
#ifdef USE_ITHREADS
        PerlZMQ_trace( " + thread enabled. thread %p", aTHX );
        PerlZMQ_trace( " + context wrapper %p with zmq context %p", ctxt, ctxt->ctxt );
        if ( ctxt->interp == aTHX ) { /* is where I came from */
            PerlZMQ_trace( " + detected mg_free from creating thread %p, cleaning up", aTHX );
            zmq_term( ctxt->ctxt );
            mg->mg_ptr = NULL;
            Safefree(ctxt);
        }
#else
        PerlZMQ_trace(" + zmq context %p", ctxt);
        zmq_term( ctxt );
        mg->mg_ptr = NULL;
#endif
    }
    PerlZMQ_trace("END mg_free (Context)");
    return 1;
}

/*
 * Walk the magic chain of `sv` and return the entry whose vtable is `vtbl`.
 * Croaks when no such entry exists, so the return value is never NULL.
 */
STATIC_INLINE MAGIC*
PerlZMQ_Raw_Context_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* mg;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    for (mg = SvMAGIC(sv); mg; mg = mg->mg_moremagic) {
        if (mg->mg_virtual == vtbl) {
            assert(mg->mg_type == PERL_MAGIC_ext);
            return mg;
        }
    }

    croak("ZeroMQ::Raw::Context: Invalid ZeroMQ::Raw::Context object was passed to mg_find");
    return NULL; /* not reached */
}

/*
 * Magic "dup" hook for contexts: intentionally a no-op, so the duplicated
 * MAGIC keeps pointing at the same structure as the original.
 */
STATIC_INLINE int
PerlZMQ_Raw_Context_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param){
    PERL_UNUSED_VAR(mg);
    PERL_UNUSED_VAR(param);
    return 0;
}

/*
 * Close the zmq socket held by `sock`, drop the reference taken on the
 * associated context SV and free `sock` itself.  `sock` must not be used
 * after this call.  Returns the result of zmq_close().
 */
STATIC_INLINE int
PerlZMQ_Raw_Socket_invalidate( PerlZMQ_Raw_Socket *sock )
{
    SV *ctxt_sv = sock->assoc_ctxt;
    int rv;

    PerlZMQ_trace("START socket_invalidate");
    PerlZMQ_trace(" + zmq socket %p", sock->socket);
    rv = zmq_close( sock->socket );

    /* assoc_ctxt may legitimately be NULL, so test the pointer rather than
     * calling SvOK() on it.  The reference taken in zmq_socket() is released
     * even if the SV has meanwhile become undef. */
    if ( ctxt_sv != NULL ) {
        PerlZMQ_trace(" + associated context: %p", ctxt_sv);
        SvREFCNT_dec(ctxt_sv);
        sock->assoc_ctxt = NULL;
    }

    Safefree(sock);

    PerlZMQ_trace("END socket_invalidate");
    return rv;
}

/*
 * Magic "free" hook for sockets: invalidates the socket unless that already
 * happened (mg_ptr == NULL).  Always returns 1.
 */
STATIC_INLINE int
PerlZMQ_Raw_Socket_mg_free(pTHX_ SV* const sv, MAGIC* const mg)
{
    PerlZMQ_Raw_Socket* const sock = (PerlZMQ_Raw_Socket *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    PerlZMQ_trace("START mg_free (Socket)");
    if (sock) {
        PerlZMQ_Raw_Socket_invalidate( sock );
        mg->mg_ptr = NULL;
    }
    PerlZMQ_trace("END mg_free (Socket)");
    return 1;
}

/*
 * Magic "dup" hook for sockets: with ithreads the duplicated MAGIC gets a
 * NULL pointer instead of sharing the socket structure.
 */
STATIC_INLINE int
PerlZMQ_Raw_Socket_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param){
#ifdef USE_ITHREADS /* single threaded perl has no "xxx_dup()" APIs */
    mg->mg_ptr = NULL;
    PERL_UNUSED_VAR(param);
#else
    PERL_UNUSED_VAR(mg);
    PERL_UNUSED_VAR(param);
#endif
    return 0;
}

/*
 * Walk the magic chain of `sv` and return the entry whose vtable is `vtbl`.
 * Croaks when no such entry exists, so the return value is never NULL.
 */
STATIC_INLINE MAGIC*
PerlZMQ_Raw_Socket_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* mg;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    for (mg = SvMAGIC(sv); mg; mg = mg->mg_moremagic) {
        if (mg->mg_virtual == vtbl) {
            assert(mg->mg_type == PERL_MAGIC_ext);
            return mg;
        }
    }

    croak("ZeroMQ::Socket: Invalid ZeroMQ::Socket object was passed to mg_find");
    return NULL; /* not reached */
}

/*
 * Deallocation callback handed to zmq_msg_init_data().  It uses free(), so
 * every buffer given to zmq together with this callback must come from
 * malloc(), not from Perl's Newx family.
 */
STATIC_INLINE void
PerlZMQ_free_string(void *data, void *hint) {
    PERL_UNUSED_ARG(hint);
    free(data);
}

#include "mg-xs.inc"

MODULE = ZeroMQ    PACKAGE = ZeroMQ   PREFIX = PerlZMQ_

PROTOTYPES: DISABLED

BOOT:
    {
        PerlZMQ_trace( "Booting Perl ZeroMQ" );
    }

# Returns the libzmq version: "major.minor.patch" in scalar context,
# (major, minor, patch) in list context, nothing in void context.
void
PerlZMQ_version()
    PREINIT:
        int major, minor, patch;
        I32 gimme;
    PPCODE:
        gimme = GIMME_V;
        if (gimme == G_VOID) {
            /* caller discards the result, so skip the work */
            XSRETURN(0);
        }

        zmq_version(&major, &minor, &patch);
        if (gimme == G_SCALAR) {
            XPUSHs( sv_2mortal( newSVpvf( "%d.%d.%d", major, minor, patch ) ) );
            XSRETURN(1);
        } else {
            mXPUSHi( major );
            mXPUSHi( minor );
            mXPUSHi( patch );
            XSRETURN(3);
        }

MODULE = ZeroMQ    PACKAGE = ZeroMQ::Constants

INCLUDE: const-xs.inc

MODULE = ZeroMQ    PACKAGE = ZeroMQ::Raw  PREFIX = PerlZMQ_Raw_

PROTOTYPES: DISABLED

# Creates a zmq context with `nthreads` I/O threads (5 when omitted).
# On failure $! is set and a NULL context is handed to the output typemap.
# NOTE(review): assumes the Context typemap accepts NULL the way the Message
# one is relied upon to in zmq_msg_init below - confirm against the typemap.
PerlZMQ_Raw_Context *
PerlZMQ_Raw_zmq_init( nthreads = 5 )
        int nthreads;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Context", 20 ));
    CODE:
        PerlZMQ_trace( "START zmq_init" );
#ifdef USE_ITHREADS
        PerlZMQ_trace( " + threads enabled, aTHX %p", aTHX );
        Newxz( RETVAL, 1, PerlZMQ_Raw_Context );
        RETVAL->interp = aTHX;
        RETVAL->ctxt   = zmq_init( nthreads );
        if ( RETVAL->ctxt == NULL ) {
            SET_BANG;
            Safefree( RETVAL );
            RETVAL = NULL;
        } else {
            PerlZMQ_trace( " + created context wrapper %p", RETVAL );
            PerlZMQ_trace( " + zmq context %p", RETVAL->ctxt );
        }
#else
        PerlZMQ_trace( " + non-threaded context");
        RETVAL = zmq_init( nthreads );
        if ( RETVAL == NULL ) {
            SET_BANG;
        }
#endif
        PerlZMQ_trace( "END zmq_init");
    OUTPUT:
        RETVAL

# Terminates the context explicitly and returns the zmq_term() result.
# The object's "_closed" hash entry is set regardless of that result.
int
PerlZMQ_Raw_zmq_term( context )
        PerlZMQ_Raw_Context *context;
    CODE:
#ifdef USE_ITHREADS
        RETVAL = zmq_term( context->ctxt );
#else
        RETVAL = zmq_term( context );
#endif
        if (RETVAL == 0) {
            /* Cancel the SV's mg attr so to not call zmq_term automatically.
             * NOTE(review): with ithreads the wrapper struct is not freed
             * here.  Context_mg_dup leaves the duplicated magic pointing at
             * the same wrapper, so freeing it could leave other
             * interpreters with a dangling pointer. */
            MAGIC *mg =
                 PerlZMQ_Raw_Context_mg_find( aTHX_ SvRV(ST(0)), &PerlZMQ_Raw_Context_vtbl );
            mg->mg_ptr = NULL;
        }
        /* mark the original SV's _closed flag as true */
        {
            SV *svr = SvRV(ST(0));
            if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
    OUTPUT:
        RETVAL

# Creates an empty message.  On failure $! is set and NULL is returned.
PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_msg_init()
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        int rc;
    CODE:
        Newxz( RETVAL, 1, PerlZMQ_Raw_Message );
        rc = zmq_msg_init( RETVAL );
        if ( rc != 0 ) {
            SET_BANG;
            /* the message was never initialised, so there is nothing for
             * zmq_msg_close() to release; only our own allocation */
            Safefree( RETVAL );
            RETVAL = NULL;
        }
    OUTPUT:
        RETVAL

# Creates a message with room for `size` bytes.
# On failure $! is set and NULL is returned.
PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_msg_init_size( size )
        IV size;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        int rc;
    CODE:
        Newxz( RETVAL, 1, PerlZMQ_Raw_Message );
        rc = zmq_msg_init_size(RETVAL, size);
        if ( rc != 0 ) {
            SET_BANG;
            /* never initialised: free our allocation only */
            Safefree( RETVAL );
            RETVAL = NULL;
        }
    OUTPUT:
        RETVAL

# Creates a message holding a private copy of the string value of `data`.
# A non-negative `size` limits the copy to that many bytes; it is clamped to
# the actual string length.  On failure $! is set and NULL is returned.
PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_msg_init_data( data, size = -1)
        SV *data;
        IV size;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        STRLEN x_data_len;
        char *sv_data = SvPV(data, x_data_len);
        char *x_data;
        int rc;
    CODE:
        /* never copy more bytes than the SV actually holds */
        if (size >= 0 && (STRLEN) size < x_data_len) {
            x_data_len = (STRLEN) size;
        }
        Newxz( RETVAL, 1, PerlZMQ_Raw_Message );
        /* malloc() because PerlZMQ_free_string releases with free();
         * ask for at least one byte so an empty payload is not
         * mistaken for an allocation failure */
        x_data = (char *) malloc(x_data_len > 0 ? x_data_len : 1);
        if (x_data == NULL) {
            Safefree( RETVAL );
            croak("Failed to allocate memory for message data");
        }
        if (x_data_len > 0) {
            memcpy(x_data, sv_data, x_data_len);
        }
        rc = zmq_msg_init_data(RETVAL, x_data, x_data_len, PerlZMQ_free_string, NULL);
        if ( rc != 0 ) {
            SET_BANG;
            /* zmq did not take ownership of the buffer */
            free( x_data );
            Safefree( RETVAL );
            RETVAL = NULL;
        }
        else {
            PerlZMQ_trace("zmq_msg_init_data created message %p", RETVAL);
        }
    OUTPUT:
        RETVAL

# Returns a new scalar holding a copy of the message payload.
SV *
PerlZMQ_Raw_zmq_msg_data(message)
        PerlZMQ_Raw_Message *message;
    CODE:
        RETVAL = newSV(0);
        sv_setpvn( RETVAL, (char *) zmq_msg_data(message), (STRLEN) zmq_msg_size(message) );
    OUTPUT:
        RETVAL

# Returns the payload size of the message in bytes.
size_t
PerlZMQ_Raw_zmq_msg_size(message)
        PerlZMQ_Raw_Message *message;
    CODE:
        RETVAL = zmq_msg_size(message);
    OUTPUT:
        RETVAL

# Closes the message explicitly, frees its structure, detaches it from the
# Perl object and sets the object's "_closed" hash entry.
# Returns the zmq_msg_close() result.
int
PerlZMQ_Raw_zmq_msg_close(message)
        PerlZMQ_Raw_Message *message;
    CODE:
        PerlZMQ_trace("START zmq_msg_close");
        RETVAL = zmq_msg_close(message);
        Safefree(message);
        /* clear the magic pointer so Message_mg_free does not free again */
        {
            MAGIC *mg =
                 PerlZMQ_Raw_Message_mg_find( aTHX_ SvRV(ST(0)), &PerlZMQ_Raw_Message_vtbl );
            mg->mg_ptr = NULL;
        }
        /* mark the original SV's _closed flag as true */
        {
            SV *svr = SvRV(ST(0));
            if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
        PerlZMQ_trace("END zmq_msg_close");
    OUTPUT:
        RETVAL

# Thin wrapper: returns zmq_msg_move(dest, src).
int
PerlZMQ_Raw_zmq_msg_move(dest, src)
        PerlZMQ_Raw_Message *dest;
        PerlZMQ_Raw_Message *src;
    CODE:
        RETVAL = zmq_msg_move( dest, src );
    OUTPUT:
        RETVAL

# Thin wrapper: returns zmq_msg_copy(dest, src).
int
PerlZMQ_Raw_zmq_msg_copy (dest, src)
        PerlZMQ_Raw_Message *dest;
        PerlZMQ_Raw_Message *src;
    CODE:
        RETVAL = zmq_msg_copy( dest, src );
    OUTPUT:
        RETVAL

# Creates a socket of the given type on `ctxt`.  The socket holds a reference
# on the context SV it was created from until it is invalidated.
# On failure $! is set and a NULL socket is handed to the output typemap.
# NOTE(review): assumes the Socket typemap accepts NULL the way the Message
# one is relied upon to in zmq_msg_init above - confirm against the typemap.
PerlZMQ_Raw_Socket *
PerlZMQ_Raw_zmq_socket (ctxt, type)
        PerlZMQ_Raw_Context *ctxt;
        IV type;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Socket", 19 ));
        void *zsock;
    CODE:
        PerlZMQ_trace( "START zmq_socket" );
#ifdef USE_ITHREADS
        PerlZMQ_trace( " + context wrapper %p, zmq context %p", ctxt, ctxt->ctxt );
        zsock = zmq_socket( ctxt->ctxt, type );
#else
        PerlZMQ_trace( " + zmq context %p", ctxt );
        zsock = zmq_socket( ctxt, type );
#endif
        if ( zsock == NULL ) {
            SET_BANG;
            RETVAL = NULL;
        } else {
            Newxz( RETVAL, 1, PerlZMQ_Raw_Socket );
            RETVAL->socket     = zsock;
            RETVAL->assoc_ctxt = ST(0);
            SvREFCNT_inc(RETVAL->assoc_ctxt);
            PerlZMQ_trace( " + created socket %p", RETVAL );
        }
        PerlZMQ_trace( "END zmq_socket" );
    OUTPUT:
        RETVAL

# Closes the socket explicitly, detaches it from the Perl object and sets the
# object's "_closed" hash entry.  Returns the zmq_close() result.
int
PerlZMQ_Raw_zmq_close(socket)
        PerlZMQ_Raw_Socket *socket;
    CODE:
        RETVAL = PerlZMQ_Raw_Socket_invalidate( socket );
        /* Cancel the SV's mg attr so to not call socket_invalidate again
           during Socket_mg_free
        */
        {
            MAGIC *mg =
                 PerlZMQ_Raw_Socket_mg_find( aTHX_ SvRV(ST(0)), &PerlZMQ_Raw_Socket_vtbl );
            mg->mg_ptr = NULL;
        }
        /* mark the original SV's _closed flag as true */
        {
            SV *svr = SvRV(ST(0));
            if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
    OUTPUT:
        RETVAL

# Connects the socket to `addr`; croaks with the zmq error text on failure.
int
PerlZMQ_Raw_zmq_connect(socket, addr)
        PerlZMQ_Raw_Socket *socket;
        char *addr;
    CODE:
        PerlZMQ_trace( "START zmq_connect" );
        PerlZMQ_trace( " + socket %p", socket );
        RETVAL = zmq_connect( socket->socket, addr );
        if (RETVAL != 0) {
            croak( "%s", zmq_strerror( zmq_errno() ) );
        }
        PerlZMQ_trace( "END zmq_connect" );
    OUTPUT:
        RETVAL

# Binds the socket to `addr`; croaks with the zmq error text on failure.
int
PerlZMQ_Raw_zmq_bind(socket, addr)
        PerlZMQ_Raw_Socket *socket;
        char *addr;
    CODE:
        PerlZMQ_trace( "zmq_bind: socket %p", socket );
        RETVAL = zmq_bind( socket->socket, addr );
        if (RETVAL != 0) {
            croak( "%s", zmq_strerror( zmq_errno() ) );
        }
    OUTPUT:
        RETVAL

# Receives one message.  Returns a new message object, or sets $! and
# returns NULL when zmq_recv() reports a failure.
PerlZMQ_Raw_Message *
PerlZMQ_Raw_zmq_recv(socket, flags = 0)
        PerlZMQ_Raw_Socket *socket;
        int flags;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZeroMQ::Raw::Message", 20 ));
        int rv;
        zmq_msg_t msg;
    CODE:
        PerlZMQ_trace( "START zmq_recv" );
        RETVAL = NULL;
        /* receive into a stack message first so nothing is heap-allocated
         * when the receive fails */
        zmq_msg_init(&msg);
        rv = zmq_recv(socket->socket, &msg, flags);
        PerlZMQ_trace(" + zmq recv with flags %d", flags);
        PerlZMQ_trace(" + zmq_recv returned with rv '%d'", rv);
        if (rv != 0) {
            SET_BANG;
            zmq_msg_close(&msg);
            PerlZMQ_trace(" + zmq_recv got bad status, closing temporary message");
        } else {
            Newxz(RETVAL, 1, PerlZMQ_Raw_Message);
            zmq_msg_init(RETVAL);
            zmq_msg_copy( RETVAL, &msg );
            zmq_msg_close(&msg);
            PerlZMQ_trace(" + zmq_recv created message %p", RETVAL );
        }
    OUTPUT:
        RETVAL

# Sends `message`, which is either a ZeroMQ::Raw::Message object or any
# defined scalar (its string value is copied into a temporary message).
# Returns the zmq_send() result; croaks on undef or on a closed message.
int
PerlZMQ_Raw_zmq_send(socket, message, flags = 0)
        PerlZMQ_Raw_Socket *socket;
        SV *message;
        int flags;
    PREINIT:
        PerlZMQ_Raw_Message *msg = NULL;
    CODE:
        if (! SvOK(message))
            croak("ZeroMQ::Socket::send() NULL message passed");

        if (sv_isobject(message) && sv_isa(message, "ZeroMQ::Raw::Message")) {
            MAGIC *mg = PerlZMQ_Raw_Message_mg_find(aTHX_ SvRV(message), &PerlZMQ_Raw_Message_vtbl);
            if (mg) {
                msg = (PerlZMQ_Raw_Message *) mg->mg_ptr;
            }

            if (msg == NULL) {
                croak("Got invalid message object");
            }

            RETVAL = zmq_send(socket->socket, msg, flags);
        } else {
            STRLEN data_len;
            char *x_data;
            char *data = SvPV(message, data_len);
            zmq_msg_t tmp;
            int rc;

            /* malloc() because PerlZMQ_free_string releases with free();
             * at least one byte so an empty string is not mistaken for an
             * allocation failure */
            x_data = (char *) malloc(data_len > 0 ? data_len : 1);
            if (x_data == NULL) {
                croak("Failed to allocate memory for message data");
            }
            if (data_len > 0) {
                memcpy(x_data, data, data_len);
            }
            rc = zmq_msg_init_data(&tmp, x_data, data_len, PerlZMQ_free_string, NULL);
            if (rc != 0) {
                /* zmq did not take ownership of the buffer */
                free(x_data);
                RETVAL = rc;
            } else {
                RETVAL = zmq_send(socket->socket, &tmp, flags);
                zmq_msg_close( &tmp );
            }
        }
    OUTPUT:
        RETVAL

# Reads a socket option and returns it as an integer or string scalar,
# depending on the option.  Croaks (after setting $!) when zmq_getsockopt()
# fails, and croaks for options this function does not know about.
SV *
PerlZMQ_Raw_zmq_getsockopt(sock, option)
        PerlZMQ_Raw_Socket *sock;
        int option;
    PREINIT:
        char     buf[256];
        int      i;
        uint64_t u64;
        int64_t  i64;
        uint32_t i32;
        size_t   len;
        int      status = -1;
    CODE:
        switch(option){
            /* options read as int */
            case ZMQ_TYPE:
            case ZMQ_LINGER:
#ifdef ZMQ_RECONNECT_IVL
            case ZMQ_RECONNECT_IVL:
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
            case ZMQ_RECONNECT_IVL_MAX:
#endif
            case ZMQ_BACKLOG:
            case ZMQ_FD:
                len = sizeof(i);
                status = zmq_getsockopt(sock->socket, option, &i, &len);
                if(status == 0)
                    RETVAL = newSViv(i);
                break;

            /* options read as int64_t */
            case ZMQ_RCVMORE:
            case ZMQ_SWAP:
            case ZMQ_RATE:
            case ZMQ_RECOVERY_IVL:
            case ZMQ_MCAST_LOOP:
                len = sizeof(i64);
                status = zmq_getsockopt(sock->socket, option, &i64, &len);
                if(status == 0)
                    RETVAL = newSViv(i64);
                break;

            /* options read as uint64_t */
            case ZMQ_HWM:
            case ZMQ_AFFINITY:
            case ZMQ_SNDBUF:
            case ZMQ_RCVBUF:
                len = sizeof(u64);
                status = zmq_getsockopt(sock->socket, option, &u64, &len);
                if(status == 0)
                    RETVAL = newSVuv(u64);
                break;

            /* read as uint32_t */
            case ZMQ_EVENTS:
                len = sizeof(i32);
                status = zmq_getsockopt(sock->socket, option, &i32, &len);
                if(status == 0)
                    RETVAL = newSViv(i32);
                break;

            /* binary string of at most sizeof(buf) bytes */
            case ZMQ_IDENTITY:
                len = sizeof(buf);
                status = zmq_getsockopt(sock->socket, option, buf, &len);
                if(status == 0)
                    RETVAL = newSVpvn(buf, len);
                break;

            default:
                /* zmq was never called, so errno says nothing useful here */
                croak("Unknown sockopt type %d", option);
        }
        if(status != 0){
            /* capture the code first, then publish it to $! before croaking */
            const int err = _ERRNO;
            SET_BANG;
            switch(err) {
                case EINTR:
                    croak("The operation was interrupted by delivery of a signal");
                case ETERM:
                    croak("The 0MQ context accociated with the specified socket was terminated");
                case EFAULT:
                    croak("The provided socket was not valid");
                case EINVAL:
                    croak("Invalid argument");
                default:
                    croak("Unknown error reading socket option");
            }
        }
    OUTPUT:
        RETVAL

# Sets a socket option, converting `value` to the C type the option expects.
# The integer widths mirror the ones zmq_getsockopt above uses for the same
# options.  Unknown options are passed as strings after a warning.
# Returns the zmq_setsockopt() result.
int
PerlZMQ_Raw_zmq_setsockopt(sock, option, value)
        PerlZMQ_Raw_Socket *sock;
        int option;
        SV *value;
    PREINIT:
        STRLEN len;
        const char *ptr;
        uint64_t u64;
        int64_t  i64;
        int i;
    CODE:
        switch(option){
            /* binary string options */
            case ZMQ_IDENTITY:
            case ZMQ_SUBSCRIBE:
            case ZMQ_UNSUBSCRIBE:
                ptr = SvPV(value, len);
                RETVAL = zmq_setsockopt(sock->socket, option, ptr, len);
                break;

            /* int64_t options */
            case ZMQ_SWAP:
            case ZMQ_RATE:
            case ZMQ_RECOVERY_IVL:
            case ZMQ_MCAST_LOOP:
                i64 = SvIV(value);
                RETVAL = zmq_setsockopt(sock->socket, option, &i64, sizeof(int64_t));
                break;

            /* uint64_t options */
            case ZMQ_HWM:
            case ZMQ_AFFINITY:
            case ZMQ_SNDBUF:
            case ZMQ_RCVBUF:
                u64 = SvUV(value);
                RETVAL = zmq_setsockopt(sock->socket, option, &u64, sizeof(uint64_t));
                break;

            /* int options */
            case ZMQ_LINGER:
            case ZMQ_BACKLOG:
#ifdef ZMQ_RECONNECT_IVL
            case ZMQ_RECONNECT_IVL:
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
            case ZMQ_RECONNECT_IVL_MAX:
#endif
                i = SvIV(value);
                RETVAL = zmq_setsockopt(sock->socket, option, &i, sizeof(i));
                break;

            default:
                warn("Unknown sockopt type %d, assuming string.  Send patch", option);
                ptr = SvPV(value, len);
                RETVAL = zmq_setsockopt(sock->socket, option, ptr, len);
        }
    OUTPUT:
        RETVAL

# Polls the items described by `list`, an array of hash references.  Each
# hash needs either "socket" (a ZeroMQ::Raw::Socket) or "fd" (integer), plus
# "events" (integer mask) and "callback" (code reference).  After zmq_poll()
# returns, the callback of every item whose returned events intersect the
# requested ones is called with no arguments.  Returns the zmq_poll() result;
# an empty list returns nothing.
int
PerlZMQ_Raw_zmq_poll( list, timeout = 0 )
        AV *list;
        long timeout;
    PREINIT:
        I32 list_len;
        zmq_pollitem_t *pollitems;
        CV **callbacks;
        int i;
    CODE:
        list_len = av_len( list ) + 1;
        if (list_len <= 0) {
            XSRETURN(0);
        }

        Newxz( pollitems, list_len, zmq_pollitem_t);
        Newxz( callbacks, list_len, CV *);

        /* list should be a list of hashrefs fd, events, and callbacks.
         * Every validation failure frees both arrays before croaking. */
        for (i = 0; i < list_len; i++) {
            SV **svr = av_fetch( list, i, 0 );
            HV *elm;

            if (svr == NULL || ! SvOK(*svr) || ! SvROK(*svr) || SvTYPE(SvRV(*svr)) != SVt_PVHV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid value on index %d", i);
            }
            elm = (HV *) SvRV(*svr);

            callbacks[i] = NULL;
            pollitems[i].revents = 0;
            pollitems[i].events  = 0;
            pollitems[i].fd      = 0;
            pollitems[i].socket  = NULL;

            svr = hv_fetch( elm, "socket", 6, 0 );
            if (svr != NULL) {
                MAGIC *mg;
                if (! SvOK(*svr) || !sv_isobject( *svr) || ! sv_isa(*svr, "ZeroMQ::Raw::Socket")) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'socket' given for index %d", i);
                }
                mg = PerlZMQ_Raw_Socket_mg_find( aTHX_ SvRV(*svr), &PerlZMQ_Raw_Socket_vtbl );
                /* zmq_close() and Socket_mg_dup reset mg_ptr to NULL */
                if (mg->mg_ptr == NULL) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Closed 'socket' given for index %d", i);
                }
                pollitems[i].socket = ((PerlZMQ_Raw_Socket *) mg->mg_ptr)->socket;
                PerlZMQ_trace( " + pollitem[%d].socket = %p", i, pollitems[i].socket );
            } else {
                svr = hv_fetch( elm, "fd", 2, 0 );
                /* SvIOK rather than a body-type test: a reference can share
                 * the SVt_IV body type, and an integer stored in an upgraded
                 * scalar does not have it */
                if (svr == NULL || ! SvOK(*svr) || ! SvIOK(*svr)) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'fd' given for index %d", i);
                }
                pollitems[i].fd = SvIV( *svr );
            }

            svr = hv_fetch( elm, "events", 6, 0 );
            if (svr == NULL || ! SvOK(*svr) || ! SvIOK(*svr)) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid 'events' given for index %d", i);
            }
            pollitems[i].events = SvIV( *svr );

            svr = hv_fetch( elm, "callback", 8, 0 );
            if (svr == NULL || ! SvOK(*svr) || ! SvROK(*svr) || SvTYPE(SvRV(*svr)) != SVt_PVCV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid 'callback' given for index %d", i);
            }
            callbacks[i] = (CV *) SvRV( *svr );
        }

        /* now call zmq_poll */
        RETVAL = zmq_poll( pollitems, list_len, timeout );

        /* a non-positive result means no item is ready (or an error) */
        if (RETVAL > 0) {
            for ( i = 0; i < list_len; i++ ) {
                if (pollitems[i].revents & pollitems[i].events) {
                    dSP;
                    ENTER;
                    SAVETMPS;
                    PUSHMARK(SP);
                    PUTBACK;

                    /* G_DISCARD: the callback's return value is not used,
                     * so do not leave it on the Perl stack.
                     * NOTE(review): if the callback dies, pollitems and
                     * callbacks are not freed. */
                    call_sv( (SV*)callbacks[i], G_SCALAR | G_DISCARD );

                    FREETMPS;
                    LEAVE;
                }
            }
        }
        Safefree(pollitems);
        Safefree(callbacks);
    OUTPUT:
        RETVAL

# Thin wrapper: runs zmq_device() on the two sockets and returns its result.
int
PerlZMQ_Raw_zmq_device( device, insocket, outsocket )
        int device;
        PerlZMQ_Raw_Socket *insocket;
        PerlZMQ_Raw_Socket *outsocket;
    CODE:
        RETVAL = zmq_device( device, insocket->socket, outsocket->socket );
    OUTPUT:
        RETVAL