RabbitMQ Example Code

PHP

The PHP RabbitMQ tutorial recommends using the php-amqplib client to send and receive messages. The best way to include php-amqplib in a project is to use Composer, a dependency management tool for PHP. Use the following steps:

  1. Add a composer.json file to the root of the project containing these lines:

    {
      "require": {
          "videlalvaro/php-amqplib": "2.5.*"
      }
    }

  2. If necessary, install Composer by following the instructions on the Composer website.
  3. Install the php-amqplib dependency:

    php composer.phar install

The following example code shows how php-amqplib can be used to process asynchronous reply messages. When a message is retrieved from the queue, the callback function on lines 13 through 18 is called. Any business logic related to processing the response should be added to this function. After processing, the message must be acknowledged to prevent redelivery (line 17).

<?php
require __DIR__.'/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPSSLConnection;

//define('AMQP_DEBUG', true);

$host = "amqpslvsdev01.us.gspt.net";
$port = 443;
$username = "guest";
$password = "guest";
$queue = "q.test";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    //process message here
    //send ack for the message back to the broker
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

while (true) {
    try {
        $connection = new AMQPSSLConnection($host, $port, $username, $password, "/",
            array('verify_peer' => false),
            array('heartbeat' => 10, 'read_write_timeout' => 20));
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        echo ' [*] Connected, waiting for messages.', "\n";
        $channel->basic_consume($queue, '', false, false, false, false, $callback);
        // Loop as long as the channel has callbacks registered
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    } catch (\PhpAmqpLib\Exception\AMQPExceptionInterface $e) {
        echo "AMQP Exception: ", $e->getMessage(), "\n";
        sleep(1);
    }
}

Additional Notes

Line Numbers    Note
5               If this line is uncommented, php-amqplib logs low-level details related to the connection. This is useful for debugging.
7 - 11          Connection and queue details are provided as part of your onboarding process.
13 - 18         This function is called whenever a message is received. All successfully processed messages must be acknowledged.
22              The actual connection to the broker is created here. There is no connect() or start() function. If a problem occurs while connecting, the AMQPSSLConnection constructor throws an exception, and the enclosing while loop attempts the connection again after a one-second pause. The heartbeat element (given in seconds) controls the frequency of heartbeat messages, which allow the client to detect broken connections. The read_write_timeout element must be greater than the heartbeat interval.
26              This line declares the queue from which messages will be received.
28              The AMQP consume frame registers the client as a listener on the queue. The callback function is also provided here.
30 - 32         The while loop allows the client to wait for messages to show up on the queue, as shown in the RabbitMQ PHP tutorial.

Java

As a first step, add the RabbitMQ amqp-client library to the project's classpath. If you are using Maven, add the following dependency to the project's POM:

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.5.4</version>
</dependency>

Equivalent configuration for Ivy, Gradle, sbt, and other build tools is also available online. Alternatively, the amqp-client jar can be downloaded directly from the RabbitMQ website.

The following example code shows one way of implementing a RabbitMQ subscriber using the amqp-client library. The main method starting on line 73 demonstrates how the RabbitMQSubscriber class can be used. Calling start starts a new thread, which creates a connection to RabbitMQ and sets up a subscriber. Code for processing each message would be added to the handleDelivery method on line 43. If the initial connection fails or an existing connection closes, the connection is reattempted and the subscriber recreated.

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class RabbitMQSubscriber extends Thread {
    private ConnectionFactory connectionFactory;
    private String queue;
    private Connection connection;
    private Channel channel;
    private volatile boolean shutdown; // set by shutdown(), read by the subscriber thread
    private long reconnectInterval;

    public RabbitMQSubscriber(String username, String password, String host, int port,
            String queue, long reconnectInterval) throws Exception {

        connectionFactory = new ConnectionFactory();
        connectionFactory.useSslProtocol();
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setRequestedHeartbeat(60);
        connectionFactory.setConnectionTimeout(10000);

        this.queue = queue;
        this.reconnectInterval = reconnectInterval;
    }

    @Override
    public void run() {
        while (!shutdown) {
            try {
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                channel.queueDeclare(queue, true, false, false, null);
                channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {

                        System.out.println("received message: " + new String(body));

                        //process message here
                        //send an ack for the message back to the broker

                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });

                while (!shutdown && connection.isOpen()) {
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    Thread.sleep(reconnectInterval);
                } catch (InterruptedException e1) {}
            }
        }
    }

    public void shutdown() throws Exception {
        shutdown = true;
        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        String username = "guest";
        String password = "guest";
        String host = "amqpslvsdev01.us.gspt.net";
        int port = 443;
        String queue = "q.test";
        RabbitMQSubscriber subscriber = 
            new RabbitMQSubscriber(username, password, host, port, queue, 5000);
        subscriber.start();
        Thread.sleep(30000);
        subscriber.shutdown();
    }
}

Additional Notes

Line Numbers Note
21 - 28         The SSL-enabled RabbitMQ connection factory is created here. The RequestedHeartbeat property (given in seconds) controls the frequency of heartbeat messages, which allow the client to detect broken connections.
41 - 53         The DefaultConsumer instance passed to basicConsume provides the code that is executed whenever a message is received from the queue. After processing the message successfully, an acknowledgment must be sent back to RabbitMQ to prevent message redelivery (line 51).
55 - 57         This while loop lets the subscriber run while monitoring for shutdown or for a closed connection. If a connection is closed, the connection is recreated at the top of the outer while loop.
73 - 84         Example main method demonstrates one way of starting and stopping the subscriber thread. Connection and queue details are provided as part of your onboarding process.
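
Reconnect Delay Sketch

The subscriber above waits a fixed reconnectInterval between connection attempts. A common variation, which the example itself does not use, is to lengthen the delay after each failed attempt so that a broker outage does not cause a steady stream of reconnects. The following is a minimal sketch of that idea using only the Java standard library. The names ReconnectSketch, backoffMillis, and connectWithRetry are hypothetical and are not part of amqp-client; the Callable is an assumed stand-in for a call such as connectionFactory.newConnection().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of amqp-client: retry with capped exponential backoff.
class ReconnectSketch {

    // Delay to sleep after failed attempt number `attempt` (1-based):
    // baseMillis doubled (attempt - 1) times, never more than maxMillis.
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Calls `connect` until it returns true or maxAttempts is used up.
    // Returns the delays (in milliseconds) slept between attempts.
    static List<Long> connectWithRetry(Callable<Boolean> connect, int maxAttempts,
            long baseMillis, long maxMillis) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (connect.call()) {
                    return delays;
                }
            } catch (Exception e) {
                // A thrown exception counts as a failed attempt, as in run() above.
            }
            if (attempt < maxAttempts) {
                long delay = backoffMillis(attempt, baseMillis, maxMillis);
                delays.add(delay);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw new IllegalStateException("gave up connecting");
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Stand-in for connectionFactory.newConnection(): fails twice, then succeeds.
        List<Long> delays = connectWithRetry(() -> ++calls[0] >= 3, 5, 10, 1000);
        System.out.println("connected after " + calls[0] + " attempts, delays (ms): " + delays);
        // prints: connected after 3 attempts, delays (ms): [10, 20]
    }
}
```

In the subscriber, the same calculation could replace the fixed Thread.sleep(reconnectInterval) in the catch block, with the attempt counter reset once a connection succeeds. Whether a cap of one second or one minute is appropriate depends on how quickly replies must be picked up after an outage.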