消息中间件-RabbitMQ

消息中间件-RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

1、什么是MQ?

MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递信息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

2、为什么要用MQ?

流量消峰(优点:防止系统宕机;缺点:访问速率偏慢)

31

应用解耦(如果订单系统出现故障,就会造成任意一个子系统出现下单异常;使用MQ,即使是出现异常,在MQ已经缓存完成,依然可以正常进行下单)

32

异步处理

33

分类

  • ActiveMQ

    优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据

    缺点:维护越来越少,高吞吐量场景较少使用。

  • Kafka

    优点:吞吐量高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,日志采集。

    缺点:社区更新较慢。消费失败不支持重试。

  • RocketMQ

    优点:单机吞吐量十万级,分布式架构,消息0丢失。java语言实现的。

    缺点:支持的客户端语言不多。

  • RabbitMQ

    优点:高并发的特性,性能较好,支持多语言。

    缺点:商业版需要收费,学习成本比较高。

选择

  • Kafka:大型公司,并且有日志采集功能。
  • RocketMQ:金融互联网,适合高并发场景。
  • RabbitMQ:中小型公司。

3、RabbitMQ

概念

RabbitMQ是一个消息中间件,它接受并转发消息。可以当做一个快递站点,dangni要发送包裹时,把包裹方法哦站点,最后包裹会通过快递员送到指定地方,按照这种逻辑,RabbitMQ是一个快递站点,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

四大核心组件(生产者、交换机、队列、消费者)

34

RabbitMQ核心部分

35

RabbitMQ的工作原理

36

Broker:接收和分发消息的应用,,RabbitMQ server就是Message Broker。

Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。

Connection:publisher/consumer和broker之间的TCP连接。

Channel:作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point)、topic(publish-subscribe)和 fanout(multicast)。

4、案例

  • 新建Producer.class;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.tzd;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author tianzedeng
* @date 2022/3/15 - 16:13
* 生产者:发送消息
*/
public class Producer {
//队列名称
public static final String QUEUE_NAME = "hello";

//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ的队列
factory.setHost("本地ip");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");

//创建连接
Connection connection = factory.newConnection();

//通过连接获取信道
Channel channel = connection.createChannel();

/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化(磁盘) 默认情况消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行消息共享,
* 4.是否自动删除 最后一个消费者端开连接后 该队列是否自动删除
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);

//发消息
String message = "hello world";

/*
* 1.发送到哪个交换机
* 2.路由的key是哪个
* 3.其它参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}

运行结果:

1
2
3
4
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
消息发送完毕
  • 新建Consumer.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.tzd;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author tianzedeng
* @date 2022/3/15 - 16:36
* 消费者:接收消息
*/
public class Consumer {

//队列的名称
public static final String QUEUE_NAME = "hello";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("本地IP");
factory.setUsername("username");
factory.setPassword("password");
Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//声明 接收消息
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/*
* 消费者接收消息
* */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}

运行结果:

1
2
3
4
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello world

后台控制器如图:

37

5、工作队列原理

38

重复的连接工厂方法用工具类封装RabbitMqUtils.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.tzd.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Channel;

/**
* @author tianzedeng
* @date 2022/3/15 - 18:27
* 工具类
*/
public class RabbitMqUtils {

//得到一个连接的channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("本地IP");
factory.setUsername("username");
factory.setPassword("password");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}

新建工作线程类Work01.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.tzd.rabbitmq.demo2;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.tzd.rabbitmq.util.RabbitMqUtils;

/**
* @author tianzedeng
* @date 2022/3/15 - 18:38
* 这是一个工作线程,相当于消费者
*/
public class Worker01 {

//队列名称
public static final String QUEUE_NAME = "hello";

//接收消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收到的消息:"+new String(message.getBody()));
};

CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消消费者接口回调逻辑");
};
//消息的接收
System.out.println("C2等待接收消息。。。。。");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}

新建生产者类Task01.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.tzd.rabbitmq.demo2;

import com.rabbitmq.client.Channel;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.util.Scanner;

/**
* @author tianzedeng
* @date 2022/3/15 - 18:55
* 生产者发送大量的消息
*/
public class Task01 {

//队列名称
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//从控制台接收信息
Scanner scanner = new Scanner(System.in);

while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成"+message);
}
}
}

运行结果:

Task01:

1
2
3
4
5
6
7
8
9
10
11
12
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
AA
发送消息完成AA
BB
发送消息完成BB
CC
发送消息完成CC
DD
发送消息完成DD

Work01:

1
2
3
4
5
6
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
C1等待接收消息。。。。。
接收到的消息:AA
接收到的消息:CC

Work01:

1
2
3
4
5
6
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
C2等待接收消息。。。。。
接收到的消息:BB
接收到的消息:DD

6、消息应答

为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ已经处理完毕,RabbitMQ可以删除该消息了。

  • 自动应答

    仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

  • 手动应答

    • Channel.basicAck(用于肯定确认)

      1
      2
      public abstract void basicAck(long deliveryTag,
      boolean multiple)
    • Channel.basicNack(用于否定确认)

      1
      2
      3
      public abstract void basicNack(long deliveryTag,
      boolean multiple,
      boolean requeue)
    • Channel.basicReject(用于否定确认)与Channel.basicNack相比少了一个参数,不处理该消息直接拒绝,可以将其丢弃。

      1
      2
      public abstract void basicReject(long deliveryTag,
      boolean requeue)
    • Multiple的解释

      手动应答的好处是可以批量应答并且减少网络拥堵。

      1
      channel.basicAck(deliverTag,true)

      true表示应答一次,就会应答所有进程里面的消息。

      false表示只会应答当前的消息。

      39

消息应答重新入队

40

新建Task.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.tzd.rabbitmq.demo3;

import com.rabbitmq.client.Channel;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.util.Scanner;

/**
* @author tianzedeng
* @date 2022/3/15 - 20:18
* 消息在手动应答时是不丢失的,放回队列中重新消费
*/
public class task2 {

public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

//声明队列
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
//从控制台输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}

新建Work03.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.tzd.rabbitmq.demo3;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.tzd.rabbitmq.util.RabbitMqUtils;
import com.tzd.rabbitmq.util.SleepUtils;

/**
* @author tianzedeng
* @date 2022/3/15 - 20:26
* 消息在手动应答时不允许丢失,放回队列中重新消费
*/
public class Work03 {

public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");

DeliverCallback deliverCallback = (consumerTag,message)->{
//沉睡1秒
SleepUtils.sleep(1);
System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
//手动应答
/*
* 1.消息的标记 tag
* 2.是否批量应答
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
}));
}

}

新建Work04.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.tzd.rabbitmq.demo3;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.tzd.rabbitmq.util.RabbitMqUtils;
import com.tzd.rabbitmq.util.SleepUtils;

/**
* @author tianzedeng
* @date 2022/3/15 - 20:26
* 消息在手动应答时不允许丢失,放回队列中重新消费
*/
public class Work04 {

public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息处理时间较长");

DeliverCallback deliverCallback = (consumerTag,message)->{
//沉睡1秒
SleepUtils.sleep(30);
System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
//手动应答
/*
* 1.消息的标记 tag
* 2.是否批量应答
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
}));
}

}

封装睡眠工具类SleepUtils.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.tzd.rabbitmq.util;

/**
* @author tianzedeng
* @date 2022/3/15 - 20:34
* 睡眠工具类
*/
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
}catch (InterruptedException _ignored){
Thread.currentThread().interrupt();
}
}
}

运行结果:

Task2:

1
2
3
4
5
6
7
8
9
10
11
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
aa
生产者发出消息:aa
bb
生产者发出消息:bb
cc
生产者发出消息:cc
dd
生产者发出消息:dd

Work03:在Work04没有接受到消息之前,手动打断Work04时,消息不会丢失,会回调到Work3重新消费。

1
2
3
4
5
6
7
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
C1等待接收消息处理时间较短
接收到的消息:aa
接收到的消息:cc
接收到的消息:dd

Work04:

1
2
3
4
5
6
7
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
C2等待接收消息处理时间较长
接收到的消息:bb

Process finished with exit code -1

7、RabbitMQ持久化

队列持久化(生产者告知)

1
2
3
//让消息队列持久化
boolean durable = true;
channel.queueDeclare(ACK_QUEUE_NAME,durable,false,false,null);

41

消息持久化(生产者就得提前告知队列要实行消息持久化,即要求保存在磁盘中)

1
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
1
2
//当durable为true时
channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

8、不公平分发

设置参数channel.basicQos(1);

1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

9、预期值

42

10、发布确认

43

开启发布确认

1
2
Channel channel = connection.createChannel();
channel.confirmSelect();

单个确认发布

同步确认发布方式,一手交钱一手交货。但是发布速度慢,容易阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.tzd.rabbitmq.demo4;

import com.rabbitmq.client.Channel;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.awt.*;
import java.util.UUID;

/**
* @author tianzedeng
* @date 2022/3/16 - 15:01
* 验证发布确认模式:
* 使用的时间比较哪种确认方式最好
* 1.单个确认
* 2.批量确认
* 3.异步批量确认
*/
public class ConfirmMessage {

//批量发消息的个数
public static final int MESSAGE_COUNT = 1000;

public static void main(String[] args) throws Exception {
//1.单个确认
ConfirmMessage.publishMessageIndividually();//发布1000个单独确认消息896ms
//2.批量确认
ConfirmMessage.publishMessageBatch();//发布1000个批量确认消息,耗时195ms
//3.异步批量确认
ConfirmMessage.publishMessageAsync();//发布1000个异步确认消息,耗时76ms
}
//单个确认
public static void publishMessageIndividually()throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//批量发消息
for (int i = 0;i < MESSAGE_COUNT;i++){
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//单个消息马上确认
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT +"个单独确认消息"+(end - begin)+"ms");

}
}

批量确认发布

1
2
3
4
5
6
7
8
9
10
11
12
13
//批量确认消息的大小
int batchSize = 100;
//批量发送消息 并且批量确认消息
for (int i = 0;i < MESSAGE_COUNT;i++){
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());

//判断达到100条消息 批量确认一次
if ((i+1)%batchSize == 0){
//发布确认
channel.waitForConfirms();
}
}

异步确认发布

44

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
1.deliveryTag:消息的标记
2.multiple:是否为批量确认
* */
//消息确认成功 回调函数
ConfirmCallback ackCallback = (deliveryTag,multiple)->{
System.out.println("确认的消息:"+ deliveryTag );
};
//消息确认失败 回调函数
ConfirmCallback nackCallback = (deliveryTag,multiple)->{
System.out.println("未确认的消息:"+ deliveryTag );
};
//准备消息的监听器 哪些消息成功 哪些消息失败
channel.addConfirmListener(ackCallback,nackCallback);

//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
}

如何处理异步未确认消息

最好的解决方法就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

1
2
3
4
5
6
7
8
/*
* 线程安全有序的一个哈希表 适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.可以批量删除条目 只要给到序号即可
* 3.支持高并发
* */
ConcurrentSkipListMap<Long,String> outstandingConfirms =
new ConcurrentSkipListMap<>();
1
2
3
4
5
6
7
8
9
10
11
12
//消息确认成功 回调函数
ConfirmCallback ackCallback = (deliveryTag,multiple)->{
if (multiple){
//2.删除已经确认的消息 剩下就是未确认的消息
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
outstandingConfirms.headMap(deliveryTag);
longStringConcurrentNavigableMap.clear();
}else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:"+ deliveryTag );
};
1
2
3
4
5
6
//消息确认失败 回调函数
ConfirmCallback nackCallback = (deliveryTag,multiple)->{
//3.打印一下未确认的消息都有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息:"+message+"::::::未确认的消息tag:"+ deliveryTag );
};
1
2
3
4
5
6
7
//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//1.记录所有要发送到的消息 消息的总和
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}

11、交换机

45

RabbitMQ消息传递模型的核心思想:生产者的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递到了哪个队列。

相反,生产者只能将消息发送到交换机。交换机工作的内容很简单,一方面它接收来自生产者的消息,另一方变将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把消息放到特定队列还是说把他们放到许多队列中还是说应该丢弃它们。这就是交换机来决定的。

46

Exchanges的类型

直接(direct)、主题(topic)、标题(header)、扇出(fanout)。

创建临时队列

1
String queueName = channel.queueDeclare().getQueue();

绑定(binding)

绑定就是exchange和queue之间的桥梁,它告诉我们exchange和哪个队列进行绑定关系。x与Q1和Q2进行绑定。

47

扇出(Fanout)—-发布订阅模式

它将接收到的消息广播到它知道的所有队列中。

新建消息的接收类:ReceiveLogs01和ReceiveLogs02

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.tzd.rabbitmq.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.tzd.rabbitmq.util.RabbitMqUtils;

/**
* @author tianzedeng
* @date 2022/3/16 - 19:27
*/
public class ReceiveLogs01 {

public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME ,"fanout");
//声明一个队列 临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");
//接收消息
DeliverCallback deliverCallback = (consumeTag, message)->{
System.out.println("ReceiveLogs01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
//消费者取消消息时回调接口
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} );
}
}

新建生产者类:EmitLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.tzd.rabbitmq.demo5;

import com.rabbitmq.client.Channel;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.util.Scanner;

/**
* @author tianzedeng
* @date 2022/3/16 - 19:46
* 发消息给交换机
*/
public class EmitLog {

public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

Scanner scanner = new Scanner(System.in);

while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}


运行结果:

48

49

50

Direct Exchange—–路由模式

Routing key不相同。

51

1
2
3
4
5
6
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");

channel.queueBind("disk",EXCHANGE_NAME,"error");

channel.basicPublish(EXCHANGE_NAME,"error/info/warning",null,message.getBytes("UTF-8"));

12、Topics

当想接收不同类型的日志时,可以使用topic类型。

52

53

53

注意事项:

当一个队列绑定键是#,那么这个队列将接收所有数据,跟fanout类似。

如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct。

新建ReceiveLogsTopic01和ReceiveLogsTopic02类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.tzd.rabbitmq.demo7;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.tzd.rabbitmq.util.RabbitMqUtils;

/**
* @author tianzedeng
* @date 2022/3/16 - 21:02
* 声明主题交换机及相关队列
* 消费者C1
*/
public class ReceiveLogsTopic01 {

//交换机的名称
public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
//创建信道
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接收消息......");

DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:"+queueName+"绑定键"+message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
}
}

新建生产者类EmitLogTopic:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.tzd.rabbitmq.demo7;

import com.rabbitmq.client.Channel;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/**
* @author tianzedeng
* @date 2022/3/16 - 21:17
* 生产者
*/
public class EmiLogTopic {

//交换机的名称
public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
//创建信道
Channel channel = RabbitMqUtils.getChannel();
//发送消息
Map<String,String> bindingMap = new HashMap<>();
bindingMap.put("quick.orange.rabbit","被队列Q1Q2接收");
bindingMap.put("lazy.orange.elephant","被队列Q1Q2接收");
bindingMap.put("quick.orange.fox","被队列Q1接收");
bindingMap.put("lazy.brown.fox","被队列Q2接收");
bindingMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被Q2接收");
bindingMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");

for (Map.Entry<String, String> bindingKeyEntry : bindingMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}

}
}

运行结果:

55

56

57

13、死信队列

死信的概念

无法被消费的消息叫死信。一般来说,producer将消息投递到broker或者直接到queue,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法消费,这样的消息如果没有后续的处理,就变成死信,有死信就自然有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息TTL过期
  • 队列达到最大限度
  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false

58

新建Consumer01类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.tzd.rabbitmq.demo8;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/**
* @author tianzedeng
* @date 2022/3/17 - 14:38
* 死信队列
*
* 消费者1
*/
public class Consumer01 {

//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE = " normal_queue";
//死信队列名称
public static final String DEAD_QUEUE = " dead_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();
//声明死信交换机和普通交换机 类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

//声明普通队列
Map<String, Object> arguments = new HashMap<>();
//过期时间
//arguments.put("x-message-ttl",10000);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routing key
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");

DeliverCallback deliverCallback = ( consumerTag, message)->{
System.out.println("Consumer01接收的消息是:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag -> {});
}
}

新建Consumer02类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.tzd.rabbitmq.demo8;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.tzd.rabbitmq.util.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

/**
* @author tianzedeng
* @date 2022/3/17 - 14:38
* 死信队列
*
* 消费者2
*/
public class Consumer02 {

//死信队列名称
public static final String DEAD_QUEUE = " dead_queue";

public static void main(String[] args) throws Exception {

Channel channel = RabbitMqUtils.getChannel();

System.out.println("等待接收消息......");

DeliverCallback deliverCallback = ( consumerTag, message)->{
System.out.println("Consumer02接收的消息是:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
}
}

新建Producer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.tzd.rabbitmq.demo8;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.tzd.rabbitmq.util.RabbitMqUtils;

/**
* @author tianzedeng
* @date 2022/3/17 - 15:08
* 死信队列----生产者
*/
public class Producer {

public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//死信消息 设置TTL时间(time to live) 单位是ms
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();

for (int i = 1;i < 11;i++){
String message = "info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
}
}
}

运行结果:

59

14、延迟队列

延时队列内部是有序的。最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天之内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,如果三天内没有登录则进行短信提醒
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  • 预订会议后,需要在预订的时间点前十分钟通知各个与会人员参加会议

15、整合SpringBoot

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.tzd.rabbitmq.springboot_rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
* @author tianzedeng
* @date 2022/3/17 - 18:33
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {

@Bean
public Docket webApiConfig(){

return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {

return new ApiInfoBuilder()
.title("rabbitmq接口文档")
.description("本文描述了rabbitmq微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288","http://atguigu.com","1129783206@qq.com"))
.build();
}

}

添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>

<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

修改配置

1
2
3
4
spring.rabbitmq.host=本地ip
spring.rabbitmq.port=5672
spring.rabbitmq.username=username
spring.rabbitmq.password=password

16、队列TTL

60

在pom.xml中添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

新建配置文件类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.tzd.rabbitmq.springboot_rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* @author tianzedeng
* @date 2022/3/17 - 18:59
* ttl----配置文件类
*/
@Configuration
public class TtlQueueConfig {

//普通交换机的名称
public static final String X_EXCHANGE = "X";
//死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信队列的名称
public static final String DEAD_LETTER_QUEUE = "QD";

//声明xExchange 别名
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}

//声明yExchange 别名
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

//声明普通队列 TTL为10s
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routing-key
arguments.put("x-dead-letter-routing-key","YD");
//设置TTL
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}

//声明普通队列 TTL为40s
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routing-key
arguments.put("x-dead-letter-routing-key","YD");
//设置TTL
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}

//声明死信队列
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

//绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}

新建消费者类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.tzd.rabbitmq.springboot_rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @author tianzedeng
* @date 2022/3/17 - 19:37
* 队列TTL 消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {

//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
}
}

新建转发请求类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.tzd.rabbitmq.springboot_rabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
* @author tianzedeng
* @date 2022/3/17 - 19:28
* 发送延迟消息
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

@Autowired
private RabbitTemplate rabbitTemplate;
//开始发消息
@GetMapping("/sendMsg/{message}")
public void senMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);

rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s:"+message);
}
}

输出:http://localhost:8080/ttl/sendMsg/你好

17、发布确认高级

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?

1
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE:是禁用发布确认模式,是默认值
  • CORRELATED: 是发布消息成功到交换器后会触发回调方法。
  • SIMPLE :经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

添加配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

//声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}

消息生产者回调接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机不管是否收到消息的一个回调方法
*
* @param correlationData 消息相关数据
* @param ack 交换机是否收到消息
* @param cause 为收到消息的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
}
}

}

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;

//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
}

/**
* 消息回调和退回
*
* @param message
*/
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {

//指定消息 id 为 1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
log.info(routingKey + "发送消息内容:{}", message + routingKey);

CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
log.info(routingKey + "发送消息内容:{}", message + routingKey);

}

}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("接受到队列 confirm.queue 消息:{}", msg);
}

}

访问:http://localhost:8080/confirm/sendMessage/

18、幂等性

概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。 举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等.

消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。

业界主流的幂等性有两种操作:

·唯一 ID+指纹码机制,用数据库主键去重;
·利用 redis 的原子性去实现。
·唯一ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

19、优先级队列

使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧。

但是,天猫商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果、小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。

队列中添加优先级

1
2
3
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);

消息中添加优先级

1
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();

注意事项:

要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class PriorityProducer {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();

//给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();

for (int i = 1; i < 11; i++) {
String message = "info" + i;
if (i == 5) {
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
} else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class PriorityConsumer {
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();

//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);

//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};

channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}

}

20、惰性队列

使用场景

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

两种模式

队列具备两种模式:default 和 lazy。默认的为default 模式,在3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map<String, Object> args = new HashMap<String, Object>();

args.put(“x-queue-mode”, “lazy”);
channel.queueDeclare(“myqueue”, false, false, false, args);

-------------本文结束感谢您的阅读-------------