1. 介绍与安装

什么是MQ

MQ(Message Queue)消息队列,通过典型的 生产者消费者 模型,生产者向消息队列中生产信息,消费者不断地从队列中获取信息,它们是异步的。

主流的MQ有哪些

有RabbitMQ、ActiveMQ、kafka、阿里巴巴开发的RocketMQ等。

其中,RabbitMQ是基于AMQP协议,erlang语言开发,部署最广泛的开源消息中间件。

安装

官方文档:https://www.rabbitmq.com/download.html

我建议直接使用docker安装

1
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

然后访问 15672端口,出现登录界面,默认账户密码都是guest

2. RabbitMQ配置

2.1 RabbitMQ管理命令行

1
2
3
4
5
6
7
8
# 1.服务启动相关
systemctl start|restart|stop|status rabbitmq-server

# 2.管理命令行,查看命令帮助
rabbitmqctl help

# 3.插件管理命令行
rabbitmq-plugins enable|list|disable

2.2 可视化管理界面

通过http访问服务器的15672端口即可进入以下界面

p1-可视化界面概览

3. 消息模型

3.1 概览与准备

p2-消息模型概览

七种消息模型:https://www.rabbitmq.com/getstarted.html

1. 引入依赖

1
2
3
4
5
6
<!--rabbitmq依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>

2. 在管理界面创建虚拟主机

p3-创建虚拟主机

3. 创建用户并绑定虚拟主机

p4-创建用户

4. 点击用户名,设置Permissions中的Virtual Host即可

p5-绑定虚拟机

3.2 第一种模型(直连)

p6-直连模型
  • P:生产者,也就是要发送消息的程序

  • C:消费者,消息的接受者,会一直等待消息到来

  • queue:消息队列,图中红色部分,可以缓存消息,生产者向其投递消息,消费者从其取出消息

  • 生产者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public class Provider {

    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

    //1. 创建连接mq的连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();

    //2. 设置连接rabbitmq主机
    connectionFactory.setHost("39.97.107.13");

    //3. 设置端口号
    connectionFactory.setPort(5672);

    //4. 设置连接哪个虚拟主机
    connectionFactory.setVirtualHost("/test");

    //5. 设置用户名和密码
    connectionFactory.setUsername("test");
    connectionFactory.setPassword("123");

    //6. 获取连接对象
    Connection connection = connectionFactory.newConnection();

    //7. 获取连接中通道
    Channel channel = connection.createChannel();

    //8. 通道绑定对应消息队列
    //参数1:队列名,不存在时会自动创建
    //参数2:定义队列是否要持久化
    //参数3:exclusive 是否独占队列
    //参数4:autoDelete 消费完成后是否自动删除队列
    //参数5:附加参数
    channel.queueDeclare("hello",true,false,false,null);

    //9. 发布消息
    //参数1:交换机名称,没有则不填
    //参数2:队列名称
    //参数3:额外参数,可以设置消息持久化
    //参数4:消息内容,注意转换为字节
    channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"你好,rabbitmq!".getBytes());

    //10. 关闭资源
    channel.close();
    connection.close();
    }
    }
  • 消费者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("39.97.107.13");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/test");
    connectionFactory.setUsername("test");
    connectionFactory.setPassword("123");

    // 创建连接对象
    Connection connection = connectionFactory.newConnection();

    // 创建通道
    Channel channel = connection.createChannel();

    // 通道绑定对象,注意参数和生产者中一致
    channel.queueDeclare("hello",true,false,false,null);

    // 消费消息
    // 参数1:队列名
    // 参数2:是否开启消息自动确认
    // 参数3:消费时的回调接口
    channel.basicConsume("hello",true,new DefaultConsumer(channel){
    @Override
    // 最后一个参数为消息队列取出的消息
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]body){
    System.out.println("收到新消息:" + new String(body));
    }
    });

    // 不建议close通道和连接,因为需要监听消息
    //channel.close();
    //connection.close();
    }
    }

3.3 工具类封装

可以看到,上面的生产者与消费者有大量重复的代码,可以写一个工具类来减少冗余

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class RabbitMQUtils {

private static ConnectionFactory connectionFactory;

static {
// 重量级资源,类加载执行一次
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.97.107.13");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test");
connectionFactory.setUsername("test");
connectionFactory.setPassword("123");
}

// 提供连接对象方法
public static Connection getConnection(){
try{
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}

// 关闭通道和连接方法
public static void closeConnectionAndChannel(Channel channel, Connection conn){
try {
if(channel!=null) channel.close();
if(conn!=null) conn.close();
}catch (Exception e){
e.printStackTrace();
}
}
}

3.4 第二种模型(work queue)——平均分配

p7-工作模型
  • P:生产者,任务的发布者

  • C1:消费者1,领取任务并完成任务,假设完成速度较慢

  • C2:消费者2,领取任务并完成任务,假设完成速度较快

  • 开发生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class Provider {

    public static void main(String[] args) throws IOException {

    // 获取连接与通道
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    // 通过通道声明队列
    channel.queueDeclare("work",true,false,false,null);

    // 生产多条消息
    for(int i=0; i<20; i++){
    channel.basicPublish("","work", null,(i + "你好,work queue!").getBytes());
    }

    // 关闭资源
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);
    }
    }
  • 开发消费者1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class Consumer1 {
    public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare("work",true,false,false,null);

    channel.basicConsume("work",true,new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]body){
    System.out.println("1号收到新消息:" + new String(body));
    }
    });
    }
    }
  • 开发消费者2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class Consumer2 {
    public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare("work",true,false,false,null);

    channel.basicConsume("work",true,new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]body){
    System.out.println("2号收到新消息:" + new String(body));
    }
    });
    }
    }
  • 运行结果

    p8-work queue运行结果

    总结:默认情况下RabbitMQ按顺序将每个消息发送给下一个使用者,每个消费者平均会收到相同数量消息(平均分配),这种分发消息的机制称为循环

3.5 第二种模型(work queue)——消息确认机制与多劳多得

消息确认机制

如果开启了消息自动确认机制,消息队列将消息一次性传递给消费者后则将消息从队列中删除,而不会去管消息是否被消费完。(channel.basicConsume中的第二个参数 true/false)因此建议关闭。

多劳多得

只需要限定消费者每次从队列中取到的消息数量即可,消费完再拿

1
channel.basicQos(1); //每次只消费1个消息

然后在handleDelivery方法中进行手动确认消息

1
2
// 手动确认 参数1:确认信息标识 参数2:multiple 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);

3.6 第三种模型(fanout 广播)

p9-广播模型
  • 可以有多个消费者

  • 每个消费者有自己的 queue(队列)

  • 每个队列要绑定 exchange(交换机)

  • 生产者发送消息到交换机,由交换机将消息发送给绑定过的所有队列

  • 开发生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class Provider {
    public static void main(String[] args) throws IOException {
    // 获取连接对象
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    // 将通道声明指定交换机 参数1:交换机名称 参数2:交换机类型 fanout为广播
    channel.exchangeDeclare("login","fanout");

    // 发送消息 第二个参数原来为队列名,但在这种模型下消息不发送给队列
    channel.basicPublish("login","",null,"fanout type message".getBytes());

    //释放资源
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);

    }
    }
  • 开发多个消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class Consumer1 {
    public static void main(String[] args) throws IOException {
    // 获取连接对象
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    // 绑定交换机
    channel.exchangeDeclare("login","fanout");

    // 临时队列
    String queueName = channel.queueDeclare().getQueue();

    // 绑定交换机和队列 队列名 交换机名 路由Key(下一个模型用到)
    channel.queueBind(queueName, "login","");

    // 消费消息
    channel.basicConsume(queueName,true, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]body){
    System.out.println("1收到新消息:" + new String(body));
    }
    });
    }
    }

3.7 第四种模型(Routing)订阅模型——Direct(直连)

  • 在某些场景下,我们希望不同的消息被不同的队列消费,就要用到Direct类型的交换机
  • 需要指定一个 RoutingKey
  • 交换机根据消息的 RoutingKey进行判断,只有队列的 RoutingKey 与消息的一致才会接收消息
p10-路由模型之Direct
  • P:生产者,向交换机发送消息,指定一个 RoutingKey

  • X:交换机,接收生产者消息,然后把消息传递给相应 RoutingKey 的队列

  • C1:消费者1,其所在队列指定了需要 RoutingKey 为 error 的消息

  • C2:消费者2,其所在队列制定了需要RoutingKey 为 info, error, warning 的消息

  • 开发生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class Provider {
    public static void main(String[] args) throws IOException {
    // 获取连接对象
    Connection connection = RabbitMQUtils.getConnection();
    // 通过连接获取通道
    Channel channel = connection.createChannel();
    // 声明交换机 参数1:交换机名称 参数2:路由模式
    channel.exchangeDeclare("ming", "direct");
    // 发送消息
    String routingKey = "info";
    channel.basicPublish("ming", routingKey, null, ("direct模型发布routingKey为:"+routingKey).getBytes());
    // 关闭资源
    RabbitMQUtils.closeConnectionAndChannel(channel,connection);
    }
    }
  • 开发消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class Consumer1 {
    public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    // 声明交换机及类型
    channel.exchangeDeclare("ming","direct");

    // 创建临时队列
    String queue = channel.queueDeclare().getQueue();

    // 基于路由Key绑定队列和交换机 参数:队列名 交换机名 路由Key
    channel.queueBind(queue,"ming","warning");
    channel.queueBind(queue,"ming","error");

    // 消费消息
    channel.basicConsume(queue, true, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]body){
    System.out.println("1收到新消息:" + new String(body));
    }
    });
    }
    }

3.8 第四种模型(Routing)订阅模型——Topic

  • 与Direct相比,Topic类型的交换机可以让队列在绑定 RoutingKey 时使用通配符

    p11-路由模型之Topic
  • 通配符规则

    1
    2
    3
    4
    5
    6
    * 匹配恰好一个词
    # 匹配零个到多个词

    例如
    banana.* 可以匹配banana.book
    banana.# 可以匹配banana.book或者banana.hello.world或者banana
  • 开发生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class Provider {
    public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    // 声明交换机及类型
    channel.exchangeDeclare("topic_exchange","topic");

    // 发布消息
    String routingKey = "banana.book.hello";

    channel.basicPublish("topic_exchange",routingKey,null,("topic动态路由模型,routingKey:"+routingKey).getBytes());

    // 关闭资源
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);
    }
    }
  • 开发消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class Consumer1 {
    public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    // 声明交换机及类型
    channel.exchangeDeclare("topic_exchange","topic");

    // 创建临时队列
    String queueName = channel.queueDeclare().getQueue();

    // 绑定队列和交换机 动态通配符形式
    channel.queueBind(queueName,"topic_exchange","banana.*");

    // 消费消息
    channel.basicConsume(queueName,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[]body){
    System.out.println("1收到新消息:" + new String(body));
    }
    });
    }
    }

4. 在SpringBoot中使用

4.1 准备与配置

  • 第一步:添加依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 第二步:配置application.yml(按照实际情况填)

    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 192.168.115.100
    port: 5672
    username: prod
    password: 123456
    virtual-host: /prod

4.2 第一种模型(直连)

  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // HelloWorld模型测试
    @Test
    public void test(){
    rabbitTemplate.convertAndSend("hello","hello world"); // 队列名 消息
    }
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    @Component  // 默认持久化 非独占 不自动删除队列
    @RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
    public class HelloConsumer {
    @RabbitHandler
    public void receive(String message){
    System.out.println("message:"+message);
    }
    }

4.3 第二种模型(work queue)

  • 生产者

    1
    2
    3
    4
    5
    6
    7
    // work模型测试
    @Test
    public void testWork(){
    for(int i=1; i<=10; i++){
    rabbitTemplate.convertAndSend("work","work模型"+i);
    }
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Component
    public class WorkConsumer {

    // 第一个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
    System.out.println("message1:"+message);
    }

    // 第二个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
    System.out.println("message2:"+message);
    }

    }

4.3 第三种模型(fanout 广播)

  • 生产者

    1
    2
    3
    4
    5
    // fanout 广播模型测试
    @Test
    public void testFanout(){ // 交换机名 路由Key 消息
    rabbitTemplate.convertAndSend("logs","","Fanout模型发送的消息");
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Component
    public class FanoutConsumer {
    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue, // 创建临时队列 或者指定@Queue("name")
    exchange = @Exchange(value = "logs",type = "fanout")// 绑定的交换机
    )
    })
    public void receive1(String message){
    System.out.println("message1:"+ message);
    }

    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue, // 创建临时队列 或者指定@Queue("name")
    exchange = @Exchange(value = "logs",type = "fanout")// 绑定的交换机
    )
    })
    public void receive2(String message){
    System.out.println("message2:"+ message);
    }
    }

4.4 第四种模型(Routing)订阅模型——Direct(直连)

  • 生产者

    1
    2
    3
    4
    5
    // route 路由模式
    @Test
    public void testRoute(){
    rabbitTemplate.convertAndSend("direct_exchange","info","发送路由key为info的信息");
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Component
    public class RouteConsumer {

    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue, // 创建临时队列
    exchange = @Exchange(value = "direct_exchange", type = "direct"), // 指定交换机名称与类型
    key = {"info","error","warning"} // 路由key
    )
    })
    public void receive1(String message){
    System.out.println("message1:"+message);
    }
    }

4.5 第四种模型(Routing)订阅模型——Topic

  • 生产者

    1
    2
    3
    4
    5
    // Topic 动态路由 订阅模式
    @Test
    public void testTopic(){
    rabbitTemplate.convertAndSend("topic_exchange","banana.book","banana.book路由信息");
    }
  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Component
    public class TopicConsumer {
    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue,
    exchange = @Exchange(type = "topic",name = "topic_exchange"),
    key = {"banana.*"}
    )
    })
    public void receive1(String message){
    System.out.println(message);
    }
    }

5. 应用场景

5.1 异步处理

例如:用户注册后,利用消息队列分发任务(无需等待返回结果),同时发送邮件和短信验证(并行方式),提高处理的效率。

5.2 应用解耦

例如:用户网购下单,订单系统通知库存系统,传统方式为订单系统直接调用库存系统接口。

但是如果库存系统出现故障,下单会失败。

为了这两个系统之间解耦,可以引入消息队列,订单系统向队列写入消息,库存系统向队列订阅消息。

这样一来,就算系统故障,也能保证消息不会丢失。

5.3 流量削峰

例如:秒杀商品活动,容易因为流量过大导致应用宕机。

可以在秒杀业务系统前加入消息队列,超过消息队列长度最大值的用户请求直接跳转到错误界面。

最后,系统根据消息队列中的请求信息再做处理。