The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
NAME
    AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client

VERSION
    version 0.01

SYNOPSIS
        use strict;
        use warnings;
        use AnyEvent::RabbitMQ::Simple;

        # create main loop
        my $loop = AE::cv;

        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            host       => '127.0.0.1',
            port       => 5672,
            user       => 'username',
            pass       => 'password',
            vhost      => '/',
            timeout    => 1,
            tls        => 0,
            verbose    => 0,
            confirm_publish => 1,
            prefetch_count => 10,

            failure_cb => sub {
                my ($event, $details, $why) = @_;
                if ( ref $why ) {
                    my $method_frame = $why->method_frame;
                    $why = $method_frame->reply_text;
                }
                $loop->croak("[ERROR] $event($details): $why" );
            },

            # routing layout
            # [========== exchanges ===================] [===== queues ==============]
            # [             (type/routing key)         ] [        (routing key) ]
            #  logger ----------> stats -------------->   stats-logs
            #   |(fanout)           (direct)                (mail.stats)
            #   |  |
            #   |  | \----------> errors ------------->   ftp-error-logs
            #   |  |              | (topic:*.error.#)       (ftp.error.#)
            #   |  |              |
            #   |  |              \------------------->   mail-error-logs
            #   |  |                                        (mail.error.#)
            #   |  |
            #   |   \-----------> info --------------->   info-logs
            #   |                   (topic:*.info.#)        (*.info.#)
            #   |
            #    \------------------------------------>   debug-queue


            # declare exchanges
            exchanges => [
                'logger' => {
                    durable => 0,
                    type => 'fanout',
                    internal => 0,
                    auto_delete => 1,
                },
                'stats' => {
                    durable => 0,
                    type => 'direct',
                    internal => 0,
                    auto_delete => 1,
                },
                'errors' => {
                    durable => 0,
                    type => 'topic',
                    internal => 0,
                    auto_delete => 1,
                },
                'info' => {
                    durable => 0,
                    type => 'topic',
                    internal => 0,
                    auto_delete => 1,
                },
            ],

            # declare queues
            queues => [
                'debug-queue' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'stats-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'ftp-error-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'mail-error-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'info-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
            ],

            # exchange to exchange bindings, with optional routing key
            bind_exchanges => [
                { 'stats'   =>   'logger'                 },
                { 'errors'  => [ 'logger', '*.error.#' ]  },
                { 'info'    => [ 'logger', '*.info.#'  ]  },
            ],


            # queue to exchange bindings, with optional routing key
            bind_queues => [
                { 'debug-queue'     =>   'logger'                   },
                { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
                { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
                { 'info-logs'       => [ 'info',   '*.info.#'     ] },
                { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
            ],

        );

        # publisher timer
        my $t;

        # connect and set up channel
        my $conn = $rmq->connect();
        $conn->cb(
            sub {
                print "waiting for channel..\n";
                my $channel = shift->recv or $loop->croak("Could not open channel");

                print "************* consuming\n";
                for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
                    consume($channel, $q);
                }

                print "************* starting publishing\n";
                $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
            }
        );

        # consumes from requested queue
        sub consume {
            my ($channel, $queue) = @_;

            my $consumer_tag;

            $channel->consume(
                queue => $queue,
                no_ack => 0,
                on_success => sub {
                    my $frame = shift;
                    $consumer_tag = $frame->method_frame->consumer_tag;
                    print "************* consuming from $queue with $consumer_tag\n";
                },
                on_consume => sub {
                    my $res = shift;
                    my $body = $res->{body}->payload;
                    print "+++++++++++++ consumed($queue): $body\n";
                    $channel->ack(
                        delivery_tag => $res->{deliver}->method_frame->delivery_tag
                    );
                },
                on_failure => sub {
                    print "************* failed to consume($queue)\n";
                }
            );
        }

        # publishes the message under a randomly generated routing key
        sub publish {
            my ($channel, $msg) = @_;

            unless ( $channel->is_open ) {
                warn "Cannot publish, channel closed";
                return;
            }

            my @system = qw( mail ftp web );
            my @levels = qw( debug info error stats );

            my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ];

            $msg = sprintf("[%s] %s", uc($routing_key), $msg);
            print "\n------- publishing: $msg\n";
            $channel->publish(
                routing_key => $routing_key,
                exchange => 'logger',
                body => $msg,
                on_ack => sub {
                    print "------- published: $msg\n";
                },
                on_return => sub {
                    print "************* failed to publish: $msg\n";
                }
            );
        }

        # wait forever or die on error
        my $done = $loop->recv;

DESCRIPTION
    This module is meant to simplify the process of setting up the RabbitMQ
    channel, so you can start publishing and/or consuming messages without
    chaining "on_success" callbacks.

METHODS
  new
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            ...
        );

    Returns the configured object using the following parameters:

   host
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            host => '127.0.0.1', # default
            ...
        );

    Host IP.

   port
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            port => 5672, # default
            ...
        );

    Port number.

   vhost
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            vhost => '/', # default
            ...
        );

    Virtual host namespace.

   user
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            user => 'guest', # default
            ...
        );

    User name.

   pass
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            pass => 'guest', # default
            ...
        );

    Password.

   tune
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            tune => {
                heartbeat => $connection_heartbeat,
                channel_max => $max_channel_number,
                frame_max => $max_frame_size
            },
            ...
        );

    Connection tuning options.

   timeout
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            timeout => 0, # default
            ...
        );

    Connection timeout.

   tls
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            tls => 0, # default
            ...
        );

    Use TLS.

   verbose
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            verbose => 0, # default
            ...
        );

    Turn on protocol debug.

   confirm_publish
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            confirm_publish => 0, # default
            ...
        );

    Turn on confirm mode on the channel. If set, it enables the "on_ack"
    callback of the channel's "publish" method.

   prefetch_count
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            prefetch_count => 0, # default
            ...
        );

    Specify the number of prefetched messages when consuming from the
    channel.

   exchange
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            exchange => 'name_of_exchange',
            ...
        );

    Optional name of exchange to declare with its default configuration
    options.

    See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for
    details.

   exchanges
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            exchanges => [
                'name_of_exchange' => {
                    durable => 1,
                    type => 'fanout',
                    ... # other exchange configuration parameters
                },
                ...
            ],
            ...
        );

    Optional list of exchanges to declare with their configuration options.

    See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for
    details.

   queue
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            queue => 'name_of_queue',
            ...
        );

    Optional name of queue to declare with its default configuration
    options.

    If no queues were declared, or an empty name has been specified, a
    uniquely generated queue name will be available:

        my $gen_queue = $rmq->gen_queue;

    See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

   queues
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            queues => [
                'name_of_queue' => {
                    durable => 1,
                    no_ack => 0,
                    ... # other queue configuration parameters
                },
                ...
            ],
            ...
        );

    Optional list of queues to declare with their configuration options.

    See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

   gen_queue
        my $gen_queue = $rmq->gen_queue;

    Name of the generated queue if no queues were declared (or a queue with
    an empty name has been specified).

   bind_exchanges
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            bind_exchanges => [
                # without routing key
                { 'destination1' => 'source' },

                # with routing key
                { 'destination2'  => [ 'source', 'routing_key' ]  },
                ...
            ],
            ...
        );

    Optional list of exchange-to-exchange bindings.

    See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.

   bind_queues
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            bind_queues => [
                # without routing key
                { 'queue1' => 'exchange' },

                # with routing key
                { 'queue2'  => [ 'exchange', 'routing_key' ]  },
                ...
            ],
            ...
        );

    Optional list of queue-to-exchange bindings.

    See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.

   failure_cb
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            failure_cb => sub {
                my ($event, $details, $why) = @_;
                if ( ref $why ) {
                    my $method_frame = $why->method_frame;
                    $why = $method_frame->reply_text;
                }
                $loop->croak("[ERROR] $event($details): $why" );
            },
            ...
        );

    Generic error handling callback. The value of $event is one of:

    ConnectOnFailure
    ConnectOnReadFailure
    ConnectOnReturn
    ConnectOnClose
    OpenChannelOnFailure
    OpenChannelOnReturn
    OpenChannelOnClose
    DeclareExchangeOnFailure
        The value of $details has the following format:
        "name:$name_of_exchange".

    BindExchangeOnFailure
        The value of $details has the following format:
        "source:$name_of_source_exchange,
        destination:$name_of_destination_exchange".

    DeclareQueueOnFailure
        The value of $details has the following format:
        "name:$name_of_queue".

    BindQueueOnFailure
        The value of $details has the following format:
        "queue:$name_of_queue, exchange:$name_of_exchange".

    ConfirmChannelOnFailure
    QosChannelOnFailure

  connect
        my $conn = $rmq->connect();
        $conn->cb(
            sub {
                my $channel = shift->recv or $loop->croak("Could not open channel");

                ...
            }
        );

    Returns an AnyEvent condvar that yields an AnyEvent::RabbitMQ::Channel
    object once all the configuration steps have completed successfully.

  disconnect
        $rmq->disconnect();

    Disconnects from the RabbitMQ server.

SEE ALSO
    *   AnyEvent::RabbitMQ

    *   <https://www.rabbitmq.com/>

AUTHOR
    Alex J. G. Burzyński <ajgb@cpan.org>

COPYRIGHT AND LICENSE
    This software is copyright (c) 2016 by Alex J. G. Burzyński
    <ajgb@cpan.org>.

    This is free software; you can redistribute it and/or modify it under
    the same terms as the Perl 5 programming language system itself.