package Tibco::Rv::Event; use vars qw/ $VERSION @CARP_NOT /; $VERSION = '1.12'; use Tibco::Rv::Msg; use Tibco::Rv::Cm::Msg; @CARP_NOT = qw/ Tibco::Rv::Msg Tibco::Rv::Cm::Msg /; use Inline with => 'Tibco::Rv::Inline'; use Inline C => 'DATA', NAME => __PACKAGE__, VERSION => $Tibco::Rv::Inline::VERSION; sub new { my ( $proto ) = shift; my ( %params ) = ( queue => $Tibco::Rv::Queue::DEFAULT, callback => sub { } ); my ( %args ) = @_; map { Tibco::Rv::die( Tibco::Rv::INVALID_ARG ) unless ( exists $params{$_} ) } keys %args; %params = ( %params, %args ); my ( $class ) = ref( $proto ) || $proto; my ( $self ) = bless { queue => $params{queue}, id => undef }, $class; $self->{callback} = $params{callback}; $self->{internal_nomsg_callback} = sub { $self->onEvent( ) }; $self->{internal_msg_callback} = sub { $self->onEvent( Tibco::Rv::Msg->_adopt( shift ) ) }; $self->{internal_cmmsg_callback} = sub { $self->onEvent( Tibco::Rv::Cm::Msg->_adopt( shift ) ) }; return $self; } sub queue { return shift->{queue} } sub callback { return shift->{callback} } sub onEvent { my ( $self, @args ) = @_; $self->{callback}->( @args ); } # callback not supported sub DESTROY { my ( $self ) = @_; return unless ( exists $self->{id} ); my ( $status ) = Event_DestroyEx( $self->{id} ); delete $self->{id}; Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); } 1; =pod =head1 NAME Tibco::Rv::Event - Base class for Tibco events =head1 SYNOPSIS use base qw/ Tibco::Rv::Event /; sub new { # ... my ( $self ) = $proto->SUPER::new( queue => $queue, callback => $callback ); # ... } =head1 DESCRIPTION Base class for Tibco Events -- Listeners, Timers, and IO events. Don't use this directly. =head1 CONSTRUCTOR =over 4 =item $self = $proto->SUPER::new( %args ) %args: queue => $queue, callback => sub { ... } Creates a C, or more specifically, one of the Event subclasses -- Listener, Timer, or IO, with queue $queue (defaults to $Tibco::Rv::Queue::DEFAULT if not specified), and the given callback (defaults to sub { } if not specified). =back =head1 METHODS =over 4 =item $queue = $event->queue Returns the queue on which events will be dispatched. =item $callback = $event->callback Returns the callback code reference. =item $event->onEvent =item $event->onEvent( $msg ) Trigger an event directly. Subclasses determine which version will be called. L objects use the version with a C<$msg> parameter, L and L objects use the version with no paramters. =item $event->DESTROY Cancels interest in this event. Called automatically when C<$event> goes out of scope. Calling DESTROY more than once has no effect. =back =head1 SEE ALSO =over 4 =item L =item L =item L =back =head1 AUTHOR Paul Sturm EIE =cut __DATA__ __C__ tibrv_status tibrvEvent_ResetTimerInterval( tibrvEvent event, tibrv_f64 interval ); tibrv_status tibrvcmEvent_SetExplicitConfirm( tibrvcmEvent cmListener ); tibrv_status tibrvcmEvent_ConfirmMsg( tibrvcmEvent cmListener, tibrvMsg message ); static void callback_perl_noargs( SV * callback ) { dSP; PUSHMARK( SP ); perl_call_sv( callback, G_VOID | G_DISCARD ); } static void callback_perl_msg( SV * callback, tibrvMsg message ) { dSP; ENTER; SAVETMPS; PUSHMARK( SP ); tibrvMsg_Detach( message ); XPUSHs( sv_2mortal( newSViv( (IV)message ) ) ); PUTBACK; perl_call_sv( callback, G_VOID | G_DISCARD ); FREETMPS; LEAVE; } /* static void onEventDestroy( tibrvEvent event, void * closure ) { callback_perl_noargs( (SV *)closure ); } */ /* no closure data here -- it gets closure data from constructor * so to support this, we'd have to store both callbacks in the closure * and have a "completionCallback" argument in constructor */ tibrv_status Event_DestroyEx( tibrvEvent event ) { return tibrvEvent_DestroyEx( event, NULL ); } tibrv_status cmEvent_DestroyEx( tibrvcmEvent cmEvent, tibrv_bool cancelAgreements ) { return tibrvcmEvent_DestroyEx( cmEvent, cancelAgreements, NULL ); } static void onEventMsg( tibrvEvent event, tibrvMsg message, void * closure ) { callback_perl_msg( (SV *)closure, message ); } static void onEventCmMsg( tibrvcmEvent event, tibrvMsg message, void * closure ) { callback_perl_msg( (SV *)closure, message ); } static void onEventNoMsg( tibrvEvent event, tibrvMsg message, void * closure ) { callback_perl_noargs( (SV *)closure ); } tibrv_status Event_CreateListener( SV * sv_event, tibrvQueue queue, SV * callback, tibrvTransport transport, const char * subject ) { tibrvEvent event = (tibrvEvent)NULL; tibrv_status status = tibrvEvent_CreateListener( &event, queue, onEventMsg, transport, subject, callback ); sv_setiv( sv_event, (IV)event ); return status; } tibrv_status Event_CreateTimer( SV * sv_event, tibrvQueue queue, SV * callback, tibrv_f64 interval ) { tibrvEvent event = (tibrvEvent)NULL; tibrv_status status = tibrvEvent_CreateTimer( &event, queue, onEventNoMsg, interval, callback ); sv_setiv( sv_event, (IV)event ); return status; } tibrv_status Event_CreateIO( SV * sv_event, tibrvQueue queue, SV * callback, tibrv_i32 socketId, tibrvIOType ioType ) { tibrvEvent event = (tibrvEvent)NULL; tibrv_status status = tibrvEvent_CreateIO( &event, queue, onEventNoMsg, socketId, ioType, callback ); sv_setiv( sv_event, (IV)event ); return status; } /* * BUG -- tibrv seems to pass back the event's closure data, not what * Queue_SetHook passed in (and then segfaults) */ static void onQueueEvent( tibrvQueue queue, void * closure ) { callback_perl_noargs( (SV *)closure ); } tibrv_status Queue_SetHook( tibrvQueue queue, SV * callback ) { tibrvQueueHook eventQueueHook = NULL; if ( SvOK( callback ) ) eventQueueHook = onQueueEvent; return tibrvQueue_SetHook( queue, eventQueueHook, callback ); } static void onQueueDestroy( tibrvQueue queue, void * closure ) { SV * callback = (SV *)closure; callback_perl_noargs( callback ); SvREFCNT_dec( callback ); } tibrv_status Queue_DestroyEx( tibrvQueue queue, SV * callback ) { tibrvQueueOnComplete completionFn = NULL; if ( SvOK( callback ) ) { completionFn = onQueueDestroy; SvREFCNT_inc( callback ); } return tibrvQueue_DestroyEx( queue, completionFn, callback ); } static void * onLedgerSubject( tibrvcmTransport cmTransport, const char * subject, tibrvMsg message, void * closure ) { tibrvMsg copy = NULL; tibrv_status status = tibrvMsg_CreateCopy( message, © ); if ( status == TIBRV_OK ) callback_perl_msg( (SV *)closure, copy ); return NULL; } tibrv_status cmTransport_ReviewLedger( tibrvcmTransport cmTransport, const char * subject, SV * callback ) { return tibrvcmTransport_ReviewLedger( cmTransport, onLedgerSubject, subject, callback ); } tibrv_status cmEvent_CreateListener( SV * sv_event, tibrvQueue queue, SV * callback, tibrvcmTransport transport, const char * subject ) { tibrvEvent event = (tibrvEvent)NULL; tibrv_status status = tibrvcmEvent_CreateListener( &event, queue, onEventCmMsg, transport, subject, callback ); sv_setiv( sv_event, (IV)event ); return status; } void Msg_GetCMValues( tibrvMsg message, SV * sv_CMSender, SV * sv_CMSequence, SV * sv_CMTimeLimit ) { const char * CMSender = NULL; tibrv_u64 CMSequence = 0; tibrv_f64 CMTimeLimit = 0.0; if ( tibrvMsg_GetCMSender( message, &CMSender ) == TIBRV_OK ) sv_setpv( sv_CMSender, CMSender ); if ( tibrvMsg_GetCMSequence( message, &CMSequence ) == TIBRV_OK ) sv_setuv( sv_CMSequence, (UV)CMSequence ); if ( tibrvMsg_GetCMTimeLimit( message, &CMTimeLimit ) == TIBRV_OK ) sv_setnv( sv_CMTimeLimit, CMTimeLimit ); }