File Coverage

File:lib/ZMQx/Class/Socket.pm
Coverage:97.6%

linestmtbrancondsubtimecode
1package ZMQx::Class::Socket;
2
4
4
4
7
4
63
use strict;
3
4
4
4
8
2
47
use warnings;
4
4
4
4
39
7
74
use 5.010;
5
6# ABSTRACT: A ZMQ Socket
7
8
4
4
4
834
1932958
59
use Moose;
9
4
4
4
38946
8
135
use Carp qw(croak);
10
4
4
4
780
14447
49
use namespace::autoclean;
11
4
4
4
318
4
52
use Package::Stash;
12
4
4
4
765
10221
337
use ZMQ::LibZMQ3;
13
14
4
4
4
17
3
57
use ZMQ::Constants ':all';
15
16has 'socket' => (
17    is=>'ro',
18    isa=>'ZMQ::LibZMQ3::Socket',
19    required=>1,
20);
21
22has 'type' => (
23    is=>'ro',
24    required=>1,
25);
26
27sub bind {
28
6
11
    my ($self, $address) = @_;
29
6
98
    zmq_bind($self->socket,$address);
30}
31
32sub connect {
33
6
10
    my ($self, $address) = @_;
34
6
96
    zmq_connect($self->socket,$address);
35}
36
37sub setsockopt {
38
1
80
    my $self = shift;
39
1
18
    zmq_setsockopt($self->socket, @_);
40}
41
42sub getsockopt {
43
1
318
    my $self = shift;
44
1
17
    zmq_getsockopt($self->socket, @_);
45}
46
47#sub send {
48# my ($self, $msg) = @_;
49# zmq_msg_send($msg, $self->socket);
50#}
51#
52#sub send_multipart { # remove, make send smarter
53# my ($self, @parts) = @_;
54# my $socket = $self->socket;
55# my $last = pop(@parts);
56# foreach (@parts) {
57# zmq_msg_send( $_, $socket, ZMQ_SNDMORE );
58# }
59# zmq_msg_send($last, $socket );
60#}
61
62sub send {
63
6
1005846
    my ($self, $parts, $flags) = @_;
64
6
17
    $flags //= 0;
65
66
6
6
7
10
    my $max_idx = $#{$parts};
67
6
13
    if ($max_idx == 0) { # single part message
68
2
38
        return zmq_msg_send($parts->[0], $self->socket, $flags);
69    }
70
71    # multipart
72
4
108
    my $socket = $self->socket;
73
4
11
    my $mflags = $flags ? $flags | ZMQ_SNDMORE : ZMQ_SNDMORE;
74
4
14
    foreach (0 .. $max_idx - 1) {
75
4
77
        zmq_msg_send( $parts->[$_], $socket, $mflags);
76    }
77
4
257
    zmq_msg_send( $parts->[$max_idx], $socket, $flags);
78}
79
80sub receive_multipart {
81
4
112
    my ($self, $blocking) = @_;
82
4
98
    my $socket = $self->socket;
83
4
7
    my @parts;
84
4
263
    while ( my $rmsg = zmq_recvmsg( $socket, $blocking ? 0 : ZMQ_DONTWAIT)) {
85
6
188
        push (@parts,zmq_msg_data( $rmsg ));
86
6
169
        if (!zmq_getsockopt($socket, ZMQ_RCVMORE)) {
87
4
294
            return \@parts;
88        }
89    }
90}
91
92=method receive_all_multipart_messages
93
94    my $w;$w = AnyEvent->io (
95        fh => $fh,
96        poll => "r",
97        cb => sub {
98            my $msgs = receive_multipart_messages($pull);
99            foreach (@$msgs) {
100                say "got $_";
101            }
102        },
103    );
104
105=cut
106
107sub receive_all_multipart_messages {
108
2
155
    my ($self, $blocking) = @_;
109
2
47
    my $socket = $self->socket;
110
2
2
    my @parts;
111
2
1
    my @msgs;
112
2
27
    while (my $rmsg = zmq_recvmsg( $socket, $blocking ? 0 : ZMQ_DONTWAIT)) {
113
6
150
        push (@parts,zmq_msg_data( $rmsg ));
114
6
79
        if (! zmq_getsockopt($socket, ZMQ_RCVMORE)) {
115
3
101
            push(@msgs,[ @parts ]);
116
3
34
            undef @parts;
117        }
118    }
119
2
12
    return \@msgs;
120}
121
122sub wait_for_message {
123
1
1000272
    my $socket = shift;
124
1
3
    my $msg;
125
1
58
    my $got_message = AnyEvent->condvar;
126
1
69
    my $fh = $socket->get_fh;
127    my $watcher = AnyEvent->io (
128        fh => $fh,
129        poll => "r",
130        cb => sub {
131
1
83
            $msg = $socket->receive_multipart;
132
1
33
            $got_message->send;
133        },
134
1
143
    );
135
1
70
    $got_message->recv;
136
1
116
    return $msg;
137}
138
139sub subscribe {
140
2
57
    my ($self, $subscribe) = @_;
141
2
77
    croak('$socket->subscribe only works on SUB sockets') unless $self->type =~/^X?SUB$/;
142
2
14
    croak('required paramater $subscription missing') unless defined $subscribe;
143
2
56
    zmq_setsockopt($self->socket,ZMQ_SUBSCRIBE,$subscribe);
144}
145
146sub get_fh {
147
3
7
    my $self = shift;
148
3
101
    return zmq_getsockopt($self->socket, ZMQ_FD);
149}
150
151{
152
4
4
4
2538
4
538
    no strict 'refs';
153    my @sockopt_constants=qw(ZMQ_SNDHWM ZMQ_RCVHWM ZMQ_AFFINITY ZMQ_SUBSCRIBE ZMQ_UNSUBSCRIBE ZMQ_IDENTITY ZMQ_RATE ZMQ_RECOVERY_IVL ZMQ_SNDBUF ZMQ_RCVBUF ZMQ_LINGER ZMQ_RECONNECT_IVL ZMQ_RECONNECT_IVL_MAX ZMQ_BACKLOG ZMQ_MAXMSGSIZE ZMQ_MULTICAST_HOPS ZMQ_RCVTIMEO ZMQ_SNDTIMEO ZMQ_IPV4ONLY);
154    my $stash = Package::Stash->new(__PACKAGE__);
155    foreach my $const (@sockopt_constants) {
156        my $get = my $set = lc($const);
157        $set =~s/^zmq_/set_/;
158        $get =~s/^zmq_/get_/;
159
160        if ($stash->has_symbol('&'.$const)) {
161            my $constval = &$const;
162            $stash->add_symbol('&'.$set => sub {
163
1
573
                my $self = shift;
164
1
18
                zmq_setsockopt($self->socket,$constval,@_);
165
1
47
                return $self;
166            });
167            $stash->add_symbol('&'.$get => sub {
168
1
237
                my $self = shift;
169
1
18
                return zmq_getsockopt($self->socket,$constval);
170            });
171        }
172    }
173}
1741;