package POEx::ZMQ3::Requestor; use Carp; use Moo; use POE; use namespace::clean; with 'POEx::ZMQ3::Role::Emitter'; has targets => ( is => 'ro', default => sub { [] }, ); sub build_defined_states { my ($self) = @_; [ $self => [ qw/ emitter_started zmqsock_recv / ], ] } sub start { my ($self, @endpoints) = @_; push @{ $self->targets }, @endpoints; $self->zmq->start; $self->zmq->create( $self->alias, 'REQ' ); $self->_start_emitter; } sub emitter_started { my ($kernel, $self) = @_[KERNEL, OBJECT]; $kernel->call( $self->zmq => subscribe => qw/ recv / ); while (my $endpoint = shift @{ $self->targets }) { $self->add_connect( $self->alias, $endpoint ) } } after add_connect => sub { my ($self, $alias, $endpoint) = @_; $self->emit( 'connected_to', $endpoint ) }; sub stop { my ($self) = @_; $self->zmq->stop; $self->_shutdown_emitter; } sub request { my ($self, $data) = @_; $self->zmq->write( $self->alias, $data ) } sub zmqsock_recv { my ($kernel, $self) = @_[KERNEL, OBJECT]; my ($alias, $data) = @_[ARG0, ARG1]; $self->emit( 'got_reply', $data ); } 1; =pod =head1 NAME POEx::ZMQ3::Requestor - A REQ-type ZeroMQ socket =head1 SYNOPSIS use POE; my $zreq = POEx::ZMQ3::Requestor->new(); POE::Session->create( inline_states => { _start => sub { ## Connect to a ZeroMQ REP replier: $zreq->start( 'tcp://127.0.0.1:5665' ); ## Subscribe to all emitted events: $_[KERNEL]->post( $zreq->session_id, 'subscribe', 'all' ); }, zeromq_connected_to => sub { ## Fire off a REQ to get started. $zreq->request('ping!') }, zeromq_got_reply => sub { ## Got a reply from server. my $data = $_[ARG0]; if ($data eq 'pong!') { $zreq->request('ping!') } else { warn "Don't know what to do with $data"; $zreq->stop; } }, }, ); $poe_kernel->run; =head1 DESCRIPTION A ZeroMQ REQ-type socket using L. ZeroMQ REQ and REP (Requestors and Repliers) work synchronously; a REQ is expected to start the conversation and one request should generate one reply. =head2 Methods =head3 start $zreq->start( $rep_server ); Start the Requestor and connect to a specified REP endpoint. =head3 stop $zreq->stop; Stop the Requestor, closing out the socket and stopping the event emitter. =head3 request $zreq->request( $data ); Send a request to the remote end. =head2 Events =head3 zeromq_connected_to Emitted when we are initialized; $_[ARG0] is the target REP server's address. =head3 zeromq_got_reply Emitted when we receive a reply to a request; $_[ARG0] is the raw data. =head1 SEE ALSO L L L L =head1 AUTHOR Jon Portnoy =cut