美文网首页
使用 Java client 操作 RabbitMQ

使用 Java client 操作 RabbitMQ

作者: SheHuan | 来源:发表于2021-05-25 21:45 被阅读0次

上一篇我们介绍了 RabbitMQ 的工作流程,以及常用的交换机,接下里我们结合具体的例子来看一下具体的应用。

使用 Java client 操作 RabbitMQ 可以参考以下步骤来实现:

  1. 创建连接工厂(ConnectionFactory),设置 RabbitMQ 服务信息、账号、密码等
  2. 使用连接工厂建立连接(Connection)
  3. 使用连接创建数据通道(Channel)
  4. 创建交换机(Exchange)、队列(Queue),绑定两者
  5. 使用数据通道发送、接收消息
  6. 释放数据通道、连接

本文的例子会结合 Fanout ExchangeDirect ExchangeTopic Exchange这三种常用的交换机来实现 。

一、准备工作

创建一个 Maven 项目,添加 RabbitMQ 依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

考虑到代码的复用,我们先将一些通用的步骤封装一下:

public class RabbitMQConnection {
    ConnectionFactory connectionFactory;

    public RabbitMQConnection() {
        // 创建连接工厂
        connectionFactory = new ConnectionFactory();
        // 设置 RabbitMQ 服务地址
        connectionFactory.setHost("localhost");
        // 设置 RabbitMQ 服务端口
        connectionFactory.setPort(5672);
        // 设置账号
        connectionFactory.setUsername("admin");
        // 设置密码
        connectionFactory.setPassword("123456");
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");
    }


    public void create(String connectionName, RabbitMQTask rabbitMQTask) {
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建连接
            connection = connectionFactory.newConnection(connectionName);
            // 创建数据通道
            channel = connection.createChannel();
            // 执行消息的发送、接收等业务
            rabbitMQTask.execute(channel);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放资源
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }
}
public interface RabbitMQTask {
    void execute(Channel channel) throws IOException;
}

RabbitMQConnection中已经实现了资源的连接以及释放,第4、5步骤,需要在RabbitMQTask接口里,根据具体的业务去实现execute()方法。

简单起见,我们将生产者和消费者定义在同一个项目里。

二、Fanout Exchange

生产者代码如下:

public class Producer {
    public void work() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("生产者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 交换机名称
                String exchangeName = "fanout-example-exchange";
                // 交换机类型
                String exchangeType = "fanout";
                // 创建交换机,true表示持久化交换机,一般都为true
                channel.exchangeDeclare(exchangeName, exchangeType, true);

                // 创建消息队列
                /**
                 * 参数1:队列名称
                 * 参数2:是否需要持久化,非持久化队列在服务重启后,队列中的消息会丢失,一般都为true
                 * 参数3:排它性,是否是一个独占队列
                 * 参数4:队列中的消息被消费完后是否自动删除队列
                 * 参数5:附加参数,Headers Exchange 的参数可以在这里传递
                 */
                String queueName1 = "queue1";
                channel.queueDeclare(queueName1, true, false, false, null);
                String queueName2 = "queue2";
                channel.queueDeclare(queueName2, true, false, false, null);
                String queueName3 = "queue3";
                channel.queueDeclare(queueName3, true, false, false, null);

                // 绑定队列和交换机,不指定 routingKey
                channel.queueBind(queueName1, exchangeName, "");
                channel.queueBind(queueName2, exchangeName, "");
                channel.queueBind(queueName3, exchangeName, "");

                String routingKey = "";

                // 准备消息内容
                String message = "hello rabbitmq";
                // 发送消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                System.out.println("生产者发送的消息是:" + message);
            }
        });
    }

    public static void main(String[] args) {
        new Producer().work();
    }
}

生产者核心的业务都是基于Channel对象实现的,包括创建Fanout类型的交换机,创建队列,将交换机和队列绑定,由于使用了Fanout类型的交换机所以绑定时不用指定routingKey,发送消息时需要携带交换机名称和一个空的routingKey

消费者代码如下:

public class Consumer implements Runnable {
    private String queueName;

    public Consumer(String queueName) {
        this.queueName = queueName;
    }

    @Override
    public void run() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("消费者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 接收消息
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println(queueName + "收到的消息是:" + new String(message.getBody(), "utf-8"));
                    }
                }, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("接收消息失败");
                    }
                });

                System.out.println(queueName + "开始接收消息");
                System.in.read();
            }
        });
    }

    public static void main(String[] args) {
        new Thread(new Consumer("queue1")).start();
        new Thread(new Consumer("queue2")).start();
        new Thread(new Consumer("queue3")).start();
    }
}

消费者的实现比较简单,用多线程模拟三个消费者,分别接收三个队列中的消息。

分别启动生产者和消费者,结果符合Fanout类型交换机的效果,消息分别进入到三个队列中,最终被消费者掉:

生产者 消费者

三、Direct Exchange

掌握 Fanout Exchange 的用法,学习 Direct Exchange 就很简单了。

修改一下生产者的代码:

public void work() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("生产者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 交换机名称
                String exchangeName = "direct-example-exchange";
                // 交换机类型
                String exchangeType = "direct";
                // 创建交换机,true表示持久化交换机,一般都为true
                channel.exchangeDeclare(exchangeName, exchangeType, true);

                // 创建消息队列
                String queueName1 = "queue1";
                channel.queueDeclare(queueName1, true, false, false, null);
                String queueName2 = "queue2";
                channel.queueDeclare(queueName2, true, false, false, null);
                String queueName3 = "queue3";
                channel.queueDeclare(queueName3, true, false, false, null);

                // 绑定队列和交换机,指定routingKey
                channel.queueBind(queueName1, exchangeName, "red");
                channel.queueBind(queueName2, exchangeName, "green");
                channel.queueBind(queueName3, exchangeName, "blue");

                String routingKey = "red";
                String routingKey2 = "blue";

                // 准备消息内容
                String message = "hello rabbitmq";
                String message2 = "hello amqp";
                // 发送消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
                System.out.println("生产者发送的消息是:" + message);
                System.out.println("生产者发送的消息是:" + message2);
            }
        });
    }

使用 Direct 类型的交换机时,绑定交换机和队列时需要指定routingKey,发送消息时也要携带上routingKey去匹配消息队列。消费者代码不需要修改。按照预期生产者的消息最终会分别进入queue1queue3,最终被消费掉。

运行程序,结果符合预期:

生产者 消费者

四、Topic Exchange

上一篇我们已经了解到,Topic Exchange 和 Direct Exchange 的差别就是 Topic Exchange 的routingKey支持通配符模糊匹配,更像一种精细化的 Direct Exchange。

只需要修改生产者的代码:

public void work() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("生产者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 交换机名称
                String exchangeName = "topic-example-exchange";
                // 交换机类型
                String exchangeType = "topic";
                // 创建交换机,true表示持久化交换机,一般都为true
                channel.exchangeDeclare(exchangeName, exchangeType, true);

                // 创建消息队列
                String queueName1 = "queue1";
                channel.queueDeclare(queueName1, true, false, false, null);
                String queueName2 = "queue2";
                channel.queueDeclare(queueName2, true, false, false, null);
                String queueName3 = "queue3";
                channel.queueDeclare(queueName3, true, false, false, null);

                // 绑定队列和交换机,指定routingKey
                channel.queueBind(queueName1, exchangeName, "*.red.#");
                channel.queueBind(queueName2, exchangeName, "green.*");
                channel.queueBind(queueName3, exchangeName, "#.blue.#");

                String routingKey = "green.red";

                // 准备消息内容
                String message = "hello rabbitmq";
                // 发送消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                System.out.println("生产者发送的消息是:" + message);
            }
        });
    }

按照模糊匹配规则,消息会进入queue1queue2,最终被消费掉:

生产者 消费者

https://www.rabbitmq.com/getstarted.html

相关文章

网友评论

      本文标题:使用 Java client 操作 RabbitMQ

      本文链接:https://www.haomeiwen.com/subject/mqzzjltx.html