消息中间件-RabbitMQ RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang 语言编写的,而集群和故障转移是构建在开放电信平台 框架上的。所有主要的编程语言 均有与代理接口通讯的客户端 库。
1、什么是MQ? MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递信息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
2、为什么要用MQ? 流量消峰(优点:防止系统宕机;缺点:访问速率偏慢)
应用解耦(如果订单系统出现故障,就会造成任意一个子系统出现下单异常;使用MQ,即使是出现异常,在MQ已经缓存完成,依然可以正常进行下单)
异步处理
分类
ActiveMQ
优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点:维护越来越少,高吞吐量场景较少使用。
Kafka
优点:吞吐量高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,日志采集。
缺点:社区更新较慢。消费失败不支持重试。
RocketMQ
优点:单机吞吐量十万级,分布式架构,消息0丢失。java语言实现的。
缺点:支持的客户端语言不多。
RabbitMQ
优点:高并发的特性,性能较好,支持多语言。
缺点:商业版需要收费,学习成本比较高。
选择
3、RabbitMQ 概念 RabbitMQ是一个消息中间件,它接受并转发消息。可以当做一个快递站点,dangni要发送包裹时,把包裹方法哦站点,最后包裹会通过快递员送到指定地方,按照这种逻辑,RabbitMQ是一个快递站点,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
四大核心组件(生产者、交换机、队列、消费者)
RabbitMQ核心部分
RabbitMQ的工作原理
Broker:接收和分发消息的应用,,RabbitMQ server就是Message Broker。
Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。
Connection:publisher/consumer和broker之间的TCP连接。
Channel:作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point)、topic(publish-subscribe)和 fanout(multicast)。
4、案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package com.tzd;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("本地ip" ); factory.setUsername("username" ); factory.setPassword("password" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true ,false ,false ,null ); String message = "hello world" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送完毕" ); } }
运行结果:
1 2 3 4 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: 消息发送完毕
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.tzd;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("本地IP" ); factory.setUsername("username" ); factory.setPassword("password" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
运行结果:
1 2 3 4 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: hello world
后台控制器如图:
5、工作队列原理
重复的连接工厂方法用工具类封装RabbitMqUtils.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.tzd.rabbitmq.util;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Channel;public class RabbitMqUtils { public static Channel getChannel () throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("本地IP" ); factory.setUsername("username" ); factory.setPassword("password" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
新建工作线程类Work01.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.tzd.rabbitmq.demo2;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.tzd.rabbitmq.util.RabbitMqUtils;public class Worker01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("接收到的消息:" +new String (message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消费者取消消费者接口回调逻辑" ); }; System.out.println("C2等待接收消息。。。。。" ); channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
新建生产者类Task01.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.tzd.rabbitmq.demo2;import com.rabbitmq.client.Channel;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.util.Scanner;public class Task01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,true ,false ,false ,null ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("发送消息完成" +message); } } }
运行结果:
Task01:
1 2 3 4 5 6 7 8 9 10 11 12 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: AA 发送消息完成AA BB 发送消息完成BB CC 发送消息完成CC DD 发送消息完成DD
Work01:
1 2 3 4 5 6 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: C1等待接收消息。。。。。 接收到的消息:AA 接收到的消息:CC
Work01:
1 2 3 4 5 6 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: C2等待接收消息。。。。。 接收到的消息:BB 接收到的消息:DD
6、消息应答 为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ已经处理完毕,RabbitMQ可以删除该消息了。
消息应答重新入队
新建Task.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.tzd.rabbitmq.demo3;import com.rabbitmq.client.Channel;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.util.Scanner;public class task2 { public static final String TASK_QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME,false ,false ,false ,null ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("" ,TASK_QUEUE_NAME,null ,message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" +message); } } }
新建Work03.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.tzd.rabbitmq.demo3;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.tzd.rabbitmq.util.RabbitMqUtils;import com.tzd.rabbitmq.util.SleepUtils;public class Work03 { public static final String TASK_QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1等待接收消息处理时间较短" ); DeliverCallback deliverCallback = (consumerTag,message)->{ SleepUtils.sleep(1 ); System.out.println("接收到的消息:" +new String (message.getBody(),"UTF-8" )); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }; boolean autoAck = false ; channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> { System.out.println(consumerTag+"消费者取消消费接口回调逻辑" ); })); } }
新建Work04.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.tzd.rabbitmq.demo3;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.tzd.rabbitmq.util.RabbitMqUtils;import com.tzd.rabbitmq.util.SleepUtils;public class Work04 { public static final String TASK_QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2等待接收消息处理时间较长" ); DeliverCallback deliverCallback = (consumerTag,message)->{ SleepUtils.sleep(30 ); System.out.println("接收到的消息:" +new String (message.getBody(),"UTF-8" )); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }; boolean autoAck = false ; channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> { System.out.println(consumerTag+"消费者取消消费接口回调逻辑" ); })); } }
封装睡眠工具类SleepUtils.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.tzd.rabbitmq.util;public class SleepUtils { public static void sleep (int second) { try { Thread.sleep(1000 *second); }catch (InterruptedException _ignored){ Thread.currentThread().interrupt(); } } }
运行结果:
Task2:
1 2 3 4 5 6 7 8 9 10 11 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: aa 生产者发出消息:aa bb 生产者发出消息:bb cc 生产者发出消息:cc dd 生产者发出消息:dd
Work03:在Work04没有接受到消息之前,手动打断Work04时,消息不会丢失,会回调到Work3重新消费。
1 2 3 4 5 6 7 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: C1等待接收消息处理时间较短 接收到的消息:aa 接收到的消息:cc 接收到的消息:dd
Work04:
1 2 3 4 5 6 7 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: C2等待接收消息处理时间较长 接收到的消息:bb Process finished with exit code -1
7、RabbitMQ持久化 队列持久化(生产者告知) 1 2 3 boolean durable = true ;channel.queueDeclare(ACK_QUEUE_NAME,durable,false ,false ,null );
消息持久化(生产者就得提前告知队列要实行消息持久化,即要求保存在磁盘中) 1 channel.basicPublish("" ,TASK_QUEUE_NAME,null ,message.getBytes("UTF-8" ));
1 2 channel.basicPublish("" ,TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8" ));
8、不公平分发 设置参数channel.basicQos(1);
1 2 int prefetchCount = 1 ;channel.basicQos(prefetchCount);
9、预期值
10、发布确认
开启发布确认 1 2 Channel channel = connection.createChannel();channel.confirmSelect();
单个确认发布 同步确认发布方式,一手交钱一手交货。但是发布速度慢,容易阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package com.tzd.rabbitmq.demo4;import com.rabbitmq.client.Channel;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.awt.*;import java.util.UUID;public class ConfirmMessage { public static final int MESSAGE_COUNT = 1000 ; public static void main (String[] args) throws Exception { ConfirmMessage.publishMessageIndividually(); ConfirmMessage.publishMessageBatch(); ConfirmMessage.publishMessageAsync(); } public static void publishMessageIndividually () throws Exception{ Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0 ;i < MESSAGE_COUNT;i++){ String message = i + "" ; channel.basicPublish("" ,queueName,null ,message.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功" ); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT +"个单独确认消息" +(end - begin)+"ms" ); } }
批量确认发布 1 2 3 4 5 6 7 8 9 10 11 12 13 int batchSize = 100 ; for (int i = 0 ;i < MESSAGE_COUNT;i++){ String message = i + "" ; channel.basicPublish("" ,queueName,null ,message.getBytes()); if ((i+1 )%batchSize == 0 ){ channel.waitForConfirms(); } }
异步确认发布
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ConfirmCallback ackCallback = (deliveryTag,multiple)->{ System.out.println("确认的消息:" + deliveryTag ); }; ConfirmCallback nackCallback = (deliveryTag,multiple)->{ System.out.println("未确认的消息:" + deliveryTag ); }; channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" ,queueName,null ,message.getBytes()); }
如何处理异步未确认消息 最好的解决方法就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。
1 2 3 4 5 6 7 8 ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap <>();
1 2 3 4 5 6 7 8 9 10 11 12 ConfirmCallback ackCallback = (deliveryTag,multiple)->{ if (multiple){ ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = outstandingConfirms.headMap(deliveryTag); longStringConcurrentNavigableMap.clear(); }else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag ); };
1 2 3 4 5 6 ConfirmCallback nackCallback = (deliveryTag,multiple)->{ String message = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息:" +message+"::::::未确认的消息tag:" + deliveryTag ); };
1 2 3 4 5 6 7 for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" ,queueName,null ,message.getBytes()); outstandingConfirms.put(channel.getNextPublishSeqNo(),message); }
11、交换机
RabbitMQ消息传递模型的核心思想:生产者的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递到了哪个队列。
相反,生产者只能将消息发送到交换机。交换机工作的内容很简单,一方面它接收来自生产者的消息,另一方变将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把消息放到特定队列还是说把他们放到许多队列中还是说应该丢弃它们。这就是交换机来决定的。
Exchanges的类型 直接(direct)、主题(topic)、标题(header)、扇出(fanout)。
创建临时队列 1 String queueName = channel.queueDeclare().getQueue();
绑定(binding) 绑定就是exchange和queue之间的桥梁,它告诉我们exchange和哪个队列进行绑定关系。x与Q1和Q2进行绑定。
扇出(Fanout)—-发布订阅模式 它将接收到的消息广播到它知道的所有队列中。
新建消息的接收类:ReceiveLogs01和ReceiveLogs02
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.tzd.rabbitmq.demo5;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.tzd.rabbitmq.util.RabbitMqUtils;public class ReceiveLogs01 { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME ,"fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"" ); System.out.println("等待接收消息,把接收到的消息打印在屏幕上......" ); DeliverCallback deliverCallback = (consumeTag, message)->{ System.out.println("ReceiveLogs01控制台打印接收到的消息:" +new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(queueName,true ,deliverCallback,consumerTag ->{} ); } }
新建生产者类:EmitLog
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.tzd.rabbitmq.demo5;import com.rabbitmq.client.Channel;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.util.Scanner;public class EmitLog { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout" ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"" ,null ,message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" +message); } } }
运行结果:
Direct Exchange—–路由模式 Routing key不相同。
1 2 3 4 5 6 channel.queueBind("console" ,EXCHANGE_NAME,"info" ); channel.queueBind("console" ,EXCHANGE_NAME,"warning" ); channel.queueBind("disk" ,EXCHANGE_NAME,"error" ); channel.basicPublish(EXCHANGE_NAME,"error/info/warning" ,null ,message.getBytes("UTF-8" ));
12、Topics 当想接收不同类型的日志时,可以使用topic类型。
注意事项:
当一个队列绑定键是#,那么这个队列将接收所有数据,跟fanout类似。
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct。
新建ReceiveLogsTopic01和ReceiveLogsTopic02类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.tzd.rabbitmq.demo7;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.tzd.rabbitmq.util.RabbitMqUtils;public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q1" ; channel.queueDeclare(queueName,false ,false ,false ,null ); channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*" ); System.out.println("等待接收消息......" ); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println(new String (message.getBody(),"UTF-8" )); System.out.println("接收队列:" +queueName+"绑定键" +message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true ,deliverCallback,consumerTag -> {}); } }
新建生产者类EmitLogTopic:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.tzd.rabbitmq.demo7;import com.rabbitmq.client.Channel;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.util.HashMap;import java.util.Map;public class EmiLogTopic { public static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Map<String,String> bindingMap = new HashMap <>(); bindingMap.put("quick.orange.rabbit" ,"被队列Q1Q2接收" ); bindingMap.put("lazy.orange.elephant" ,"被队列Q1Q2接收" ); bindingMap.put("quick.orange.fox" ,"被队列Q1接收" ); bindingMap.put("lazy.brown.fox" ,"被队列Q2接收" ); bindingMap.put("lazy.pink.rabbit" ,"虽然满足两个绑定但只被Q2接收" ); bindingMap.put("quick.brown.fox" ,"不匹配任何绑定不会被任何队列接收到会被丢弃" ); bindingMap.put("quick.orange.male.rabbit" ,"是四个单词不匹配任何绑定会被丢弃" ); bindingMap.put("lazy.orange.male.rabbit" ,"是四个单词但匹配Q2" ); for (Map.Entry<String, String> bindingKeyEntry : bindingMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,routingKey,null ,message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" +message); } } }
运行结果:
13、死信队列 死信的概念 无法被消费的消息叫死信。一般来说,producer将消息投递到broker或者直接到queue,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法消费,这样的消息如果没有后续的处理,就变成死信,有死信就自然有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信的来源
消息TTL过期
队列达到最大限度
消息被拒绝(basic.reject或basic.nack)并且requeue=false
新建Consumer01类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.tzd.rabbitmq.demo8;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.util.HashMap;import java.util.Map;public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = " normal_queue" ; public static final String DEAD_QUEUE = " dead_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"lisi" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,arguments); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接收消息......" ); DeliverCallback deliverCallback = ( consumerTag, message)->{ System.out.println("Consumer01接收的消息是:" +new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(NORMAL_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
新建Consumer02类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.tzd.rabbitmq.demo8;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.tzd.rabbitmq.util.RabbitMqUtils;import java.util.HashMap;import java.util.Map;public class Consumer02 { public static final String DEAD_QUEUE = " dead_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息......" ); DeliverCallback deliverCallback = ( consumerTag, message)->{ System.out.println("Consumer02接收的消息是:" +new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(DEAD_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
新建Producer类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.tzd.rabbitmq.demo8;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.tzd.rabbitmq.util.RabbitMqUtils;public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); AMQP.BasicProperties properties = new AMQP .BasicProperties() .builder().expiration("10000" ).build(); for (int i = 1 ;i < 11 ;i++){ String message = "info" +i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan" ,null ,message.getBytes()); } } }
运行结果:
14、延迟队列 延时队列内部是有序的。最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
15、整合SpringBoot 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.tzd.rabbitmq.springboot_rabbitmq.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import springfox.documentation.builders.ApiInfoBuilder;import springfox.documentation.service.ApiInfo;import springfox.documentation.service.Contact;import springfox.documentation.spi.DocumentationType;import springfox.documentation.spring.web.plugins.Docket;import springfox.documentation.swagger2.annotations.EnableSwagger2;@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig () { return new Docket (DocumentationType.SWAGGER_2) .groupName("webApi" ) .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo () { return new ApiInfoBuilder () .title("rabbitmq接口文档" ) .description("本文描述了rabbitmq微服务接口定义" ) .version("1.0" ) .contact(new Contact ("enjoy6288" ,"http://atguigu.com" ,"1129783206@qq.com" )) .build(); } }
添加依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency >
修改配置
1 2 3 4 spring.rabbitmq.host =本地ip spring.rabbitmq.port =5672 spring.rabbitmq.username =username spring.rabbitmq.password =password
16、队列TTL
在pom.xml中添加依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 3.0.0</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 3.0.0</version > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency >
新建配置文件类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package com.tzd.rabbitmq.springboot_rabbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X" ; public static final String Y_DEAD_LETTER_EXCHANGE = "Y" ; public static final String QUEUE_A = "QA" ; public static final String QUEUE_B = "QB" ; public static final String DEAD_LETTER_QUEUE = "QD" ; @Bean("xExchange") public DirectExchange xExchange () { return new DirectExchange (X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange () { return new DirectExchange (Y_DEAD_LETTER_EXCHANGE); } @Bean("queueA") public Queue queueA () { Map<String, Object> arguments = new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); arguments.put("x-message-ttl" ,10000 ); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB () { Map<String, Object> arguments = new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); arguments.put("x-message-ttl" ,40000 ); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueD") public Queue queueD () { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } @Bean public Binding queueABindingX (@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA" ); } @Bean public Binding queueBBindingX (@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB" ); } @Bean public Binding queueDBindingY (@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD" ); } }
新建消费者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.tzd.rabbitmq.springboot_rabbitmq.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD (Message message, Channel channel) { String msg = new String (message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}" ,new Date ().toString(),msg); } }
新建转发请求类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.tzd.rabbitmq.springboot_rabbitmq.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void senMsg (@PathVariable String message) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}" ,new Date ().toString(),message); rabbitTemplate.convertAndSend("X" ,"XA" ,"消息来自ttl为10s:" +message); rabbitTemplate.convertAndSend("X" ,"XB" ,"消息来自ttl为40s:" +message); } }
输出:http://localhost:8080/ttl/sendMsg/你好
17、发布确认高级 在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
1 spring.rabbitmq.publisher-confirm-type =correlated
NONE:是禁用发布确认模式,是默认值
CORRELATED: 是发布消息成功到交换器后会触发回调方法。
SIMPLE :经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
添加配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange" ; public static final String CONFIRM_QUEUE_NAME = "confirm.queue" ; @Bean("confirmExchange") public DirectExchange confirmExchange () { return new DirectExchange (CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue () { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBinding (@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("key1" ); } }
消息生产者回调接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public class MyCallBack implements RabbitTemplate .ConfirmCallback { @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : "" ; if (ack) { log.info("交换机已经收到 id 为:{}的消息" , id); } else { log.info("交换机还未收到 id 为:{}消息,原因:{}" , id, cause); } } }
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @RestController @RequestMapping("/confirm") @Slf4j public class ProducerController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange" ; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(myCallBack); } @GetMapping("sendMessage/{message}") public void sendMessage (@PathVariable String message) { CorrelationData correlationData1 = new CorrelationData ("1" ); String routingKey = "key1" ; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1); log.info(routingKey + "发送消息内容:{}" , message + routingKey); CorrelationData correlationData2 = new CorrelationData ("2" ); routingKey = "key2" ; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2); log.info(routingKey + "发送消息内容:{}" , message + routingKey); } }
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm.queue" ; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMsg (Message message) { String msg = new String (message.getBody()); log.info("接受到队列 confirm.queue 消息:{}" , msg); } }
访问:http://localhost:8080/confirm/sendMessage/
18、幂等性 概念 用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。 举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等.
消息重复消费 消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
解决思路 消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
消费端的幂等性保障 在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。
业界主流的幂等性有两种操作:
·唯一 ID+指纹码机制,用数据库主键去重; ·利用 redis 的原子性去实现。 ·唯一ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
19、优先级队列 使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧。
但是,天猫商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果、小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。
队列中添加优先级
1 2 3 Map<String, Object> params = new HashMap (); params.put("x-max-priority" , 10 ); channel.queueDeclare("hello" , true , false , false , params);
消息中添加优先级
1 AMQP.BasicProperties properties = new AMQP .BasicProperties().builder().priority(10 ).build();
注意事项: 要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class PriorityProducer { private static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); AMQP.BasicProperties properties = new AMQP .BasicProperties().builder().priority(10 ).build(); for (int i = 1 ; i < 11 ; i++) { String message = "info" + i; if (i == 5 ) { channel.basicPublish("" , QUEUE_NAME, properties, message.getBytes()); } else { channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); } System.out.println("发送消息完成:" + message); } } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class PriorityConsumer { private final static String QUEUE_NAME = "hello" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Map<String, Object> params = new HashMap (); params.put("x-max-priority" , 10 ); channel.queueDeclare(QUEUE_NAME, true , false , false , params); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME, true , deliverCallback, cancelCallback); } }
20、惰性队列 使用场景 RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。
两种模式 队列具备两种模式:default 和 lazy。默认的为default 模式,在3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String, Object>(); args.put(“x-queue-mode”, “lazy”); channel.queueDeclare(“myqueue”, false, false, false, args);