将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,则可以使用发布者确认。
通道是吃资源的,而且大多数应用情景下同一个JVM进程很少会开放小几百的通道出来。设想我们应用的每个线程都持有一个通道,单个JVM里上千个线程已经会是一个相当大的开销,这些开销本来是可以避免的。此外,一小部分快速的发布者可以很轻松地占满网络接口和代理节点。
自动应答
对于运行在本地的RabbitMQ节点而言,这些参数都有合适的默认值。
消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在AMQP0-9-1模型中,有两种途径可以达到此目的:
许多Channel接口方法都是被重载的。这里用到的关于exchangeDeclare,queueDeclare和queueBind的短结构的重载方法使用了合适的默认值,更易于使用。当然也有更多参数的长结构的重载方法,使用那些方法可以将一些必要的默认参数进行重写,进行更全面的控制。
队列和交换机可以被动地进行声明。被动声明会简单地检查提供的名称所对应的实体是否存在。对成功检测到的队列来说,被动声明会返回跟非被动声明同样的信息,即队列中处于就绪状态的消费者和消息数量。
exchange自动删除的条件,有队列或者交换器绑定了本交换器,然后所有队列或交换器都与本交换器解除绑定,autoDelete=true时,此交换器就会被自动删除。队列自动删除的条件,有消息者订阅本队列,然后所有消费者都解除订阅此队列,autoDelete=true时,此队列会自动删除,即使此队列中还有消息。
连接和通道的寿命
队列和交换机的被动声明
或者当它不再被使用的时候:
channel.queueDelete("queue-name", true, false)
消费者应用-用来接受和处理消息的应用-在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的?AMQP0-9-1规范给我们两种建议:
autoDelete属性问题
消费者操作线程池
客户端独占队列
类似于尝试从一个不存在的队列里消费消息这种通道级别的异常会导致通道关闭。已经关闭的通道不可以再被使用,也不会再接收到如消息投递之类的服务器事件。RabbitMQ会记录下通道级别的异常,并且会为通道初始化一个关闭顺序
为了解决这个问题,我们可以使用具有预取计数=1设置的基本Qos方法。这告诉RabbitMQ不要一次向一个工人发送多条消息。或者,换句话说,在处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将其分派给下一个仍然不繁忙的工作人员。
channel.basicQos(1)
可以看出默认消费者线程大小是cpu核心数*2
当我们向某个交换机发送消息后,交换机发现消息无法被路由到任何一个绑定到该交换机的队列上,那么如果publiher发送消息时,将mandatory属性设置为了false,那么消息会被转交给alternateexchange兜底交换机,前提是该交换机存在,不存在会记录警告日志。
扇型交换机例:
前者被称作自动确认模式,后者被称作显式确认模式。在显式模式下,由消费者应用来选择什么时候发送确认回执。应用可以在收到消息后立即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执。
操作成功后,消息代理使用exchange.declare-ok方法进行回应:
实现Consumer最简单的方式是子类化DefaultConsumer。此子类的实例化对象可以当做basicConsume调用时的参数进行传递,用于设置订阅:
@Slf4j
public class Publisher implements Runnable {
@Override
public void run() {
try {
RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.prepareChannel();
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"你好,我是生产者".getBytes(StandardCharsets.UTF_8));
log.info("发送消息...");
} catch (IOException | TimeoutException e) {
log.error("出现异常: ",e);
}
}
}
@Slf4j
public class Consumer implements Runnable{
@Override
public void run() {
RabbitmqUtil rabbitmqUtil = null;
try {
rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.prepareChannel();
//不开启自动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//手动确认消息收到
channel.basicAck(deliveryTag, false);
log.info("接收到消息: {} , 路由key为: {} ,类型为: {}",new String(body),routingKey,contentType);
}
});
} catch (IOException | TimeoutException e) {
log.error("出现异常: ",e);
}
}
}
简单队列模式
一些常见的操作还带有“非等待”版本,这种版本的操作不会等待服务器的响应。例如,以下方法会声明一个队列并且通知服务器不要发送任何响应
channel.queueDeclareNoWait(queueName, true, false, false, null);
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
工作队列的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
由于网络的不确定性和应用失败的可能性,处理确认回执就变的十分重要。有时我们确认消费者收到消息就可以了,有时确认回执意味着消息已被验证并且处理完毕,例如对某些数据已经验证完毕并且进行了数据存储或者索引操作。
将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。
主机列表的使用
队列持久化
AMQP的消息除属性外,也含有一个有效载荷-Payload,它被AMQP代理当作不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似JSON这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们通常使用"content-type"和“content-encoding”这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。
AMQP模型中的消息对象是带有属性的。有些属性及其常见,以至于AMQP0-9-1明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:
实战演示:
quick.orange.rabbit:被队列Q1Q2接收到quick.orange.fox:被队列Q1接收到lazy.brown.fox:被队列Q2接收到lazy.pink.rabbit:虽然满足队列Q2的两个绑定但是只会被接收一次quick.orange.male.rabbit:四个单词不匹配任何绑定会被丢弃
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。
消费者
以下例子会发布一条具有过期时间属性的消息:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build(),
"hello4".getBytes(StandardCharsets.UTF_8));
默认交换机实际上是一个由消息代理预先声明好的没有名字的直连交换机。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列都会自动绑定到默认交换机上,绑定的路由键名称与队列名称相同。
初始化
当调用Consumers相关的接口方法时,单个订阅始终由其消费者标签引用。消费者标签可以由客户端或者服务器来生成,用于消费者的身份识别。想让RabbitMQ生成一个节点范围内的唯一标签,可以使用不含有消费者标签属性的Channel#basicConsume重载,或者传递一个空字符串做为消费者标签,然后使用Channel#basicConsume返回的值。消费者标签同样用于清除消费者之用。
什么是绑定模式,后面我们讲到RabbitMQ对AMQP协议具体实现的时候会看到
消费者:
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消息为: {}",new String(body));
}
});
让我们来看看交换机类,有一组方法被关联到了交换机的操作上。这些方法如下所示:
从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知一下消息代理,这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执。
路由模式
分发有关于特定地理位置的数据,例如销售点由多个工作者完成的后台任务,每个工作者负责处理某些特定的任务股票价格更新涉及到分类或者标签的新闻更新云端的不同种类服务的协调分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
AMQP0-9-1方法
上面的路由模式本质就是利用了直接交换机的routeKey绑定特性,但是直接交换机还是有一个坏处,就是无法模糊匹配,必须精确指定routekey才行,因此这就有了主体模式,提供模糊匹配的功能
简单来讲,对任何一种对象类型进行声明的目的是为了确保它们已经存在,并在需要的时候对其进行创建。
注意,当只有一个客户端打算独占队列时,这是一个典型的队列声明方式。队列不需要既定的名称,没有其他客户端使用此队列,队列会被自动清理掉。如果有多个客户端消费打算消费一个既定名称的队列,一下代码更为合适:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
常量准备
/**
* 直接交换机
*/
public static final String DIRECT_EXCHANGE="direct";
/**
* 队列一
*/
public static final String QUEUE_ONE="queue_one";
public static final String ONE_KEY="one";
/**
* 队列二
*/
public static final String QUEUE_TWO="queue_two";
public static final String TWO_KEY="two";
交换机:和生产者建立连接并接受生产者投递的消息
通过订阅来接收消息
Consumers同样可以通过实现handleCancelOk和handleCancel方法来分别被告知是通过显式还是隐式方式进行取消。
绑定是交换机将消息路由给队列所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
实际是将消息放入默认直连交换机中,然后该交换机绑定指定队列
将消息投递给应用应用根据需要主动获取消息
使用AddressResolver接口实现服务发现
工具类
public class RabbitmqUtil {
private final String keyPrefix="spring.rabbitmq.";
private final YamlUtil yamlUtil;
private final RabbitmqClient rabbitmqClient;
public RabbitmqUtil(String ymlPath) {
this.yamlUtil =new YamlUtil(ymlPath);
this.rabbitmqClient=RabbitmqClient.builder()
.userName(yamlUtil.get(keyPrefix+"username"))
.password(yamlUtil.get(keyPrefix+"password"))
.host(yamlUtil.get(keyPrefix+"host"))
.port(Integer.valueOf(yamlUtil.get(keyPrefix+"port")))
.virtualHost(yamlUtil.get(keyPrefix+"virtual-host"))
.build();
}
public Connection getConnection() throws IOException, TimeoutException {
return rabbitmqClient.getConnection();
}
}
RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
Connection connection = rabbitmqUtil.getConnection();
发布者发布消息时可以给消息指定各种消息属性。有些属性有可能会被消息代理使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
交换机和交换机类型
getAddresses方法会在newConnection方法中被回调
就像发布者一样,这里同样也需要考虑到消费者的并发安全性。
以上的操作来自逻辑上的配对:exchange.declare和exchange.declare-ok,exchange.delete和exchange.delete-ok.这些操作分为“请求-requests”和“响应-responses”。
在共享的通道上并发执行发布会导致错误的帧交错在网络上,触发连接级别的协议异常并导致连接被代理直接关闭。需要在应用程序代码中进行显式同步。
多个RouteKey可能共同指向同一个queue,也可能一个RouteKey指向多个队列,因此两者之间是多对多的关系
我们可以使用AddressResolver接口实现来改变连接时的端点解析算法:
Connection conn = factory.newConnection(addressResolver);
AMQP是一个网络协议。它支持符合要求的客户端应用和消息中间件代理之间进行通信。
主题交换机
直连交换机经常用来循环分发任务给多个工作者。当这样做的时候,我们需要明白一点,在AMQP0-9-1中,消息的负载均衡是发生在消费者之间的,而不是队列之间。
不同的消费者实例必须持有不同的消费者标签。非常不建议在同一个连接上出现重复的消费者标签,这回导致自动连接覆盖问题,并在监控消费者时混淆监控数据。
Java客户端开发指南
交换机可以有两个状态:持久、暂存。持久化的交换机会在消息代理重启后依旧存在,而暂存的交换机则不会。然而并不是所有的应用场景都需要持久化的交换机。
以下示例发送消息的时候会指定投递模式为优先级为1并且消息体类型为"text/plain"。使用Builder类去创建一个需要指定多个属性的消息属性对象,例如:
//发布消息到交换机
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("dhy")
.build(),
"hello2".getBytes(StandardCharsets.UTF_8));
以下代码声明了一个交换机以及一个服务端命名的队列,然后将它们绑定到一起
channel.exchangeDeclare("dhy-exchange", "direct", true);
//queueDeclare创建的队列名为""
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "dhy-exchange", "dhy");
尝试去拉取消息,如果当前存在消息则返回,否则返回null,因此客户端大多需要不断轮询来获取消息,所以这种方式不推荐。
就拿上面那副举个例子:
有些应用需要与AMQP代理建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP0-9-1提供了通道来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。
把交换机里的消息发送给所有绑定该交换机的队列,忽略routingKey。
消费者的回调的调度是在一个独立的线程池里完成的,这个线程池跟通道实例化的那个池是分开的。这表示Consumers可以安全的调用类似于Channel#queueDeclare和Channel#basicCancel这种链接和通道的阻塞方法。
Rabbitmq初识
消息确认
AMQP0.1协议解析
消费者生产者
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
该模型很简单,就是生产者直接将消息放入队列中,消费者从队列取出消息进行消费
AMQP是一个可编程的协议
队列:Exchange将消息分发到指定的Queue
交换机和队列在使用事前必须对他们进行声明。
例如,客户端发布了一条带有mandatory标识的消息,此消息设置了交换机类型为“直连”,但是交换机并没有绑定到队列上,此时退还监听就会被调用。
Thread publisher = new Thread(new Publisher(),"生产者线程");
publisher.start();
相关源码全部发布在下面的仓库中
拒绝消息
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不查看使用者的未确认消息的数量。它只是盲目地将第n条消息分派给第n个使用者。
“非等待”版本的操作会更具效率,但是安全保障较低,例如,它们更依赖心跳机制去检测失败的操作。如果不确定,就从标准版本的操作用起。“非等待”版本只是在高级拓扑结构的情况下需要。
当消息重新排队时,如果可能,它将被放置在其队列中的原始位置。如果不是,则消息将重新排队到更靠近队列头的位置。
RabbitMQJava客户端使用com.rabbitmq.client作为它的顶级包。关键的类和接口有:
Channel#queueDeclarePassive和Channel#exchangeDeclarePassive方法被用来进行被动声明。下边演示Channel#queueDeclarePassive的使用:
AMQP.Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
response.getMessageCount();
response.getConsumerCount();
主题交换机通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由。
需要注意的是,默认情况下guest用户只能用本地进行连接。目的是为了限制已知凭证在生产系统中的使用。
#在配置文件中设置loopback_users为none,那么guest账号就可以进行远程连接了
loopback_users = none
消费者:监听RabbitMQ中的Queue中的消息
如果发布的消息设置了mandatory标识,但是没有被成功路由,代理会将其返回给发送的客户端。
如果同时也提供了ExecutorService,那线程池也是对应的第一次成功连接的那个。
如果在创建连接前没有指定参数值,则会使用默认参数:
Username | “guest” |
Password | “guest” |
Virtual host | “/” |
Hostname | “localhost” |
port | 5672正常通信端口,5671用于SSL加密通信 |
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定的规则所决定的。AMQP0-9-1的代理提供了四种交换机
Direct exchange(直连交换机) | (Empty string) and amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
就跟主机列表一样,先尝试返回的第一个Address,如果失败了再试第二个,直到成功为止。
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。
实体和消息的清除
接收消息最高效的方式是使用Consumer消息推送接口设置订阅。消息在到达时被自动投递到其中,而不是显示的去请求。
每个消费者都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。
消息代理和他们所扮演的角色
持久化、非自动删除的“直连”交换机拥有既定名称的,持久化、非独占、非自动删除的队列
客户端的TCP节点使用的凭证
上面这种交换机被称为扇形交换机
发布订阅模式很简单,这里给出一个简单的例子:
以下是发布带有自定义headers消息的示例:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("dhy", 18);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
"hello3".getBytes(StandardCharsets.UTF_8));
Channels虽然也是长期存活的,但是由于有大量的可恢复的协议错误会导致通道关闭,通道的存活期会比连接短一些。虽然每个操作都打开和关闭一个通道不是必须的操作,但是也不是不可行。有的选的情况下,还是优先考虑通道的复用为好。
也可以通过通道池的方式来避免在共享通道上并发发布消息:一旦一个线程使用完了某个通道,就将通道归还到池中,使得通道可以被其他线程再次使用。通道池可以视为一个特殊的同步解决方案。建议使用现成的池库来实现,而不是自己实现。例如开箱即用的SpringAMQP。
相同的executor服务可能会被多个连接共享,或者接连不断的重复使用、重复连接,但是无论如何当它关闭后是不可以再用的。
协议内置的消息确认功能将帮助开发者建立强大的软件。
使用案例:
交换机和队列
消息确认机制
主题模式
通过Channel的接口可以对协议进行操作。Connection用于开启通道,注册连接的生命周期内的处理事件,并且关闭不再需要的连接。ConnectionFactory用于实例化Connection对象,并且可以通过ConnectionFactory来进行诸如vhost、username等属性的设置。
使用Channel.basicGet来进行消息的“拉取”。返回值是包含有头信息和消息体的GetResponse对象实例。
@Slf4j
public class Consumer implements Runnable{
@Override
public void run() {
RabbitmqUtil rabbitmqUtil = null;
try {
rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.prepareChannel();
boolean autoAck = false;
while(true){
GetResponse response = channel.basicGet(QUEUE_NAME, autoAck);
if (response == null) {
log.info("当前无消息....");
} else {
byte[] body = response.getBody();
log.info("msg: {}",new String(body));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
break;
}
}
} catch (IOException | TimeoutException e) {
log.error("出现异常: ",e);
}
}
}
生产者代码准备@Slf4jpublicclassPublisherimplementsRunnable{@Overridepublicvoidrun{try{RabbitmqUtilrabbitmqUtil=newRabbitmqUtil;Channelchannel=rabbitmqUtil.createChannel;//声明直接交换机channel.exchangeDeclare;//分别发送两个消息,对应的路由key为one和twochannel.basicPublish);channel.basicPublish);}catch{log.error;}}}消费者代码准备
@Slf4j
public class ConsumerOne implements Runnable{
@Override
public void run() {
RabbitmqUtil rabbitmqUtil = null;
try {
rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.createChannel();
channel.queueDeclare(QUEUE_ONE,false,false,false,null);
//绑定别忘了
channel.queueBind(QUEUE_ONE,DIRECT_EXCHANGE,ONE_KEY);
channel.basicConsume(QUEUE_ONE,true,new DefaultConsumer(channel){
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消息为: {}",new String(body));
}
});
} catch (IOException | TimeoutException e) {
log.error("出现异常: ",e);
}
}
}
@Slf4j
public class ConsumerTwo implements Runnable{
@Override
public void run() {
RabbitmqUtil rabbitmqUtil = null;
try {
rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.createChannel();
channel.queueDeclare(QUEUE_TWO,false,false,false,null);
channel.queueBind(QUEUE_TWO,DIRECT_EXCHANGE,TWO_KEY);
channel.basicConsume(QUEUE_TWO,true,new DefaultConsumer(channel){
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消息为: {}",new String(body));
}
});
} catch (IOException | TimeoutException e) {
log.error("出现异常: ",e);
}
}
}
RabbitMQ节点可以持有有限的的客户端信息:
头交换机
扇型交换机
ChannIN类中源码其实也非常简单:
@Override
public GetResponse basicGet(String queue, boolean autoAck)
throws IOException
{
validateQueueNameLength(queue);
//构造命令
AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
.queue(queue)
.noAck(autoAck)
.build());
//命令执行
Method method = replyCommand.getMethod();
//判断执行结果
if (method instanceof Basic.GetOk) {
Basic.GetOk getOk = (Basic.GetOk)method;
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
getOk.getRedelivered(),
getOk.getExchange(),
getOk.getRoutingKey());
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
byte[] body = replyCommand.getContentBody();
int messageCount = getOk.getMessageCount();
metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
//有结果,那么构造成GetResponse后返回
return new GetResponse(envelope, props, body, messageCount);
} else if (method instanceof Basic.GetEmpty) {
//没有消息,那么返回的结果为空,会返回null
return null;
} else {
throw new UnexpectedMethodError(method);
}
}
mandatory是强制的意思
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的多消费者/多应用的时候,主题交换机都可以被列入考虑范围。
如果AMQP的消息无法路由到队列,消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。
先启动消费者,再启动生产者,因为需要先创建队列才行,否则交换机收到消息找不到队列,那么会直接丢弃消息
Thread consumer1 = new Thread(new ConsumerOne(),"消费者1");
Thread consumer2 = new Thread(new ConsumerTwo(),"消费者2");
Thread publisher = new Thread(new Publisher(),"生产者");
consumer1.start();
consumer2.start();
Thread.sleep(1000);
publisher.start();
线程之间共享通道也会干扰发布者确认。最好能够完全避免在共享的通道上上进行并发发布,例如通过每个线程使用一个通道的方式实现并发。
AddressResolver是实现自定义服务发现逻辑的最佳方式,客户端可以自动连接到首次启动时尚未出现故障的节点。
也可以做到当队列为空时对其进行删除:
channel.queueDelete("queue-name", false, true)
更复杂的消费者需要去覆写其他方法。特别说明的是,当通道和连接关闭时,handleShutdownSignal会被调用,handleConsumeOk会在调用其他Consumer回调之前被传递给消费者标签。
@Override
public void handleConsumeOk(String consumerTag) {
this._consumerTag = consumerTag;
}
队列,交换机和绑定统称为AMQP实体。
公平调度和预取值
使用pushAPI,应用需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者。
提供本次连接的标记名称
消息属性和有效载荷
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。
NameDurabilityAuto-deleteArguments
以"amq."开始的队列名称被预留做消息代理内部使用。如果试在队列声明时打破这一规则的话,一个通道级的403错误会被抛出。
大规模多用户在线游戏可以使用它来处理排行榜更新等全局事件体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端分发系统使用它来广播各种状态和配置更新在群聊的时候,它被用来分发消息给参与群聊的用户。
如上所示,exchange.declare方法携带了好几个参数。这些参数可以允许客户端指定交换机名称、类型、是否持久化等等。
消息代理从发布者亦称生产者那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者。
消息持久化需要将相关的队列先进行持久化,然后在发布消息时,将消息标记为持久化。
channel.basicPublish("",QUEUE_NAME,true,
//消息添加持久化属性
MessageProperties.PERSISTENT_TEXT_PLAIN,("序号"+i).getBytes(StandardCharsets.UTF_8));
注意BasicProperties是AMQP自动生成的持有类的内置类。
生产者:发布消息到RabbitMQ中的交换机上
我们一共发送了6条消息,下面看看每个消费者都接收到了多少消息:
扇型交换机将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由。
客户端可以通过实现ReturnListener接口并调用Channel.addReturnListener来收到此类退还通知。如果客户端没有为特定的通道配置退还监听,那返回的相应消息会被默默地丢弃掉。
@Slf4j
public class Publisher implements Runnable {
@Override
public void run() {
try {
RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.prepareChannel();
channel.addReturnListener(new RouteFailListener());
channel.basicPublish(EXCHANGE_NAME,UNKNOWN_ROUTING_KEY,true,null,"你好,我是生产者".getBytes(StandardCharsets.UTF_8));
log.info("发送消息...");
} catch (IOException | TimeoutException e) {
log.error("出现异常: ",e);
}
}
}
Channel#exchangeDeclarePassive方法的返回值没包含什么有用的信息。只要方法正确返回,并且没有通道异常发生,就意味着交换机已经存在了。
持久化、非自动删除的“直连”形交换机具有系统生成的名称的,非持久化、独占、自动删除的队列
注意,RabbitMQ只支持通道级的预取计数,而不是连接级的或者基于大小的预取。
源码仓库
手动应答
ContenttypeContentencodingRoutingkeyDeliverymode投递模式MessagepriorityMessagepublishingtimestampExpirationperiodPublisherapplicationid
我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。
如果同时也提供了ExecutorService,那线程池也是对应的第一次成功连接的那个。
工作队列模式
虽然可以安全地并发调用通道上的某些操作,但有些操作则不能并发调用,如果那样做会导致错误的帧交错在网络上,或造成重复确认等问题。
生产者准备@Slf4jpublicclassPublisherimplementsRunnable{@Overridepublicvoidrun{try{RabbitmqUtilrabbitmqUtil=newRabbitmqUtil;Channelchannel=rabbitmqUtil.createChannel;//声明主题交换机channel.exchangeDeclare;channel.basicPublish);channel.basicPublish);channel.basicPublish);}catch{log.error;}}}消费者准备
@Slf4j
public class ConsumerOne implements Runnable{
@Override
public void run() {
RabbitmqUtil rabbitmqUtil = null;
try {
rabbitmqUtil = new RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.createChannel();
channel.queueDeclare(Q1_QUEUE,false,false,false,null);
channel.queueBind(Q1_QUEUE,TOPIC_EXCHANGE,Q1_ROUTE_KEY);
channel.basicConsume(Q1_QUEUE,true,new DefaultConsumer(channel){
@SneakyThrows
由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理可以存在于不同的设备上。
直连型交换机例:
虚拟主机
Channel:代表AMQP0-9-1通道,并提供了大多数操作。Connection:代表AMQP0-9-1连接ConnectionFactory:构建Connection实例Consumer:代表消息的消费者DefaultConsumer:消费者通用的基类BasicProperties:消息的属性BasicProperties.Builder:BasicProperties的构建器
发布订阅模式
建议不要采用自动应答
有些属性是被AMQP代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头。他们跟HTTP协议的X-Headers很相似。消息属性需要在消息被发布的时候定义。
如果消费者在指定超时时间内没有对某个消息做出应答,那么会强制关闭当前通道,并抛出PRECONDITION_FAILED通道级异常PRECONDITION_FAILED默认超时时间为30分钟
每个通道都有自己的调度线程。对于大多数常见的每个Channel一个Consumer的场景下,这意味着消费者之间不会相互影响。需要注意,如果一个通道里有多个消费者,长时间运行的消费者会阻挡通道中其他消费者回调方法的调度。
此类消息可以被丢弃或死信或重新排队。此行为由requeue字段控制。当该字段设置为true时,代理将使用指定的传递标记将传递重新排队。或者,当此字段设置为false时,如果已配置,则消息将被路由到死信交换,否则将被丢弃。
准备好交换机和队列
public static final String EXCHANGE_NAME="dhy-exchange";
public static final String QUEUE_NAME="dhy-queue";
public static final String ROUTING_KEY="dhy";
public static final String TEMP_QUEUE="";
public static final String UNKNOWN_ROUTING_KEY ="unknown";
public Channel prepareChannel() throws IOException, TimeoutException {
RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml","dhy-connection");
Channel channel = rabbitmqUtil.createChannel();
//声明交换机和队列
//非持久化、非自动删除的“扇形”形交换机
channel.exchangeDeclare(EXCHANGE_NAME, FANOUT, false);
//拥有既定名称的,非持久化、非独占、非自动删除的队列
channel.queueDeclare(TEMP_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(TEMP_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
log.info("准备channel中..");
return channel;
}
可以把上面交换机和队列的关系理解为map集合的关系:
Map<List<RouteKey>,List<Queue>> exchange;
newConnection方法提供了很多重载方法,其中一部分提供了此次连接名称的设置
public Connection getConnection(String connectionName) throws IOException, TimeoutException {
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
return connectionFactory.newConnection(connectionName);
}
把一个Address数组传给newConnection是没问题的。Address是一个com.rabbitmq.clientpackage中包含主机和端口组件的简单的便捷类。
AMQP连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供TLS保护。当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。
应该尽量避免在线程间共享通道对象。应用应该尽可能为每个线程都使用单独的通道,而不是将通道共享给多个线程。
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。
应该在有证据表明处理消费回调存在严重瓶颈时才去考虑使用这个功能。如果没有或者只有少量消费者回调需要执行,那默认分配的线程就足够了。即使偶尔会有消费者活动陡增的情况,最初的负载是很小的,并且线程资源的分配和不能无限扩大。
您可能已经注意到,调度仍然不完全符合我们的要求。例如,在有两个worker的情况下并且二者处理速度相差很大的情况下,一个worker会一直很忙,而另一个worker几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地调度消息。
队列名称
传递消费者标签。
例如:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
如果对应的实体不存在,操作会抛出一个通道级别的异常。然后通道就不可以继续使用了,需要打开一个新的通道。通常在进行被动声明的时候使用临时的一次性通道。
消息能够以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式。将消息以持久化方式发布时,会对性能造成一定的影响。
需要注意的是,将通道关闭掉不是必须的操作。因为无论何种情况,通道都会在底层的连接关闭时自动关闭掉。
默认用户名和密码为guest
你可以通过Channel.basicCancel显式地取消一个指定的Consumer。
channel.basicCancel(consumerTag);
当连接关闭时,默认提供的ExecutorService也会执行shutdown,但是用户提供的ExecutorService则不会执行shutdown。提供自定义ExecutorService的客户端必须确保其最终会被关闭,否则线程池会影响JVM的中止。
这里我们并没有展示所有的可能性。
消费者代码不变,生产者代码同样不变,我们只需要同时启动两个消费者即可,并且通过web界面,手动往队列中塞入消息进行测试:
客户端connections是长连接。底层协议的设计和优化都考虑到了长连接的需求。这意味着对诸如消息发送之类的每个操作都建立一个连接的形式是极其不推荐的,那样做会产生大量的网络往返和开销。
举例:消费端程序调用了channel.basicQos,之后订阅了某个队列进行消费。RabbitMq会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,计数达到5后,那么RabbitMQ就不会向这个消费者再发消息。消费者确认了某条消息处理完后,RabbitMQ将相应的计数减1之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于TCPIP中的"滑动窗口"
为了在一个单独的代理上实现多个隔离的环境,AMQP提供了一个虚拟主机的概念。这跟Webservers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。
队列在声明后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
持久化队列会被存储在磁盘上,当消息代理重启的时候,它依旧存在。没有被持久化的队列称作暂存队列。并不是所有的场景和案例都需要将队列持久化。
当消息代理将消息发送给应用后立即删除。待应用发送一个确认回执后再删除消息。
发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”,“nyse.vmw”,"quick.orange.rabbit"这种类型的。当然这个单词列表最多不能超过255个字节。在这个规则列表中,其中有两个替换符:
* 可以代替一个单词
# 可以代替零个或多个单词
如果需要更大的控制权,可以使用newConnection去应用ExecutorService以进行替代。这是一个应用一个比常规分配额更大的线程池的示例:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
NameDurableExclusive使用,而且当连接关闭后队列即被删除)Auto-deleteArguments
这虽然能让开发人员自由发挥,但也需要他们注意潜在的定义冲突。当然这在实践中很少会发生,如果发生,会以配置错误的形式表现出来。
AMQP0-9-1的工作过程如下:消息被发布者发送给交换机,交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
Forexample:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定的路由键名称也是为"search-indexing-online"。当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。
Rabbitmq七种模式
上面是官网给出的三幅示例,其实就是利用了直连交换机通过路由key去完成消息投递的特性,构建出了上面这种路由模式
队列可以被清除:
channel.queuePurge("queue-name")
如果一个消费者在尚未发送确认回执的情况下挂掉了,那AMQP代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
RabbitmqClient封装建立连接用的相关属性
@Builder
public class RabbitmqClient {
private String userName;
private String password;
private String virtualHost;
private String host;
private Integer port;
public Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
return connectionFactory.newConnection();
}
}
默认情况下,消费者线程会通过一个新的ExecutorService线程池分配。
当我们向某个交换机发送消息后,交换机发现消息无法被路由到任何一个绑定到该交换机的队列上,那么如果publiher发送消息时,将mandatory属性设置为了true,该消息会被返回给publisher,对应的消息发送方需要提供一个处理回退消息的回调接口,可以通过该接口完成对路由失败消息的记录或者尝试将其转交给其他交换机发送。
建立连接
包括RabbitMQJava客户端在内的AMQP0-9-1客户端链接可以提供一个自定义标识符,一遍在服务器日志和管理界面中方便地对客户端进行区分。设置好后,日志内容管理界面中便会对标识符有所体现。标识符即为客户端提供的连接名称。名称可以用于标识应用或应用中特定的组件。虽然名称是可选的,但是强烈建议提供一个,这将大大简化某些操作任务。
可以显示地将队列和交换机删除:
channel.queueDelete("queue-name")
应用程序声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。
发布消息
//发布消息到交换机
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"hello rabbitmq".getBytes(StandardCharsets.UTF_8));
消费者确认
一个线程用于消费,另一个线程在共享通道上推送是安全的。
如下的例子:客户端要求消息代理使用exchange.declare方法声明一个新的交换机:
AMQP中的队列跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。
消息确认
通过简单的对通道和连接进行关闭即可关闭掉RabbitMQ的连接:
public void close(){
log.info("关闭rabbitmq连接中...");
try {
//channel.close(); 非必须
connection.close();
} catch (IOException e) {
log.error("关闭连接过程中出现错误: ",e);
}
}
这将会主动声明以下对象,这两个对象都可以使用附加参数进行自定义。但在这里,没有给他们俩定义特殊的参数。
这种情形很常见,所以AMQP0-9-1内置了一个功能叫做消息确认,消费者用它来确认消息已经被接收或者处理。如果一个应用崩溃掉,而且消息的确认回执功能已经被开启,但是消息代理尚未获得确认回执,那么消息会被从新放入队列。
测试:
Thread consumer = new Thread(new Consumer(),"消费者线程");
Thread publisher = new Thread(new Publisher(),"生产者线程");
consumer.start();
publisher.start();
直连交换机
处理无法路由的消息
下面给出使用演示:
默认交换机
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
类比:
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
AMQP0-9-1是一个可编程协议,某种意义上说AMQP的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
当交换机拿到一个RouteKey后,需要知道该把这个消息路由给哪些队列,怎么办呢?
List<Queue> queues=exchange.get(Arrays.asList(key1,key2...))
这样会先去尝试连接hostname1:portnumber失败的话会再尝试hostname2:portnumber返回的连接对象是第一次成功的数组元素的。这跟分别设置主机和端口然后依次调用factory.newConnection直到成功的操作一毛一样。
路由:交换机转换消息到队列的规则
下面先讲讲具体是如何进行模糊匹配的:
创建通道
public Channel createChannel() throws IOException {
log.info("通道创建中...");
return connection.createChannel();
}
AMQP队列类的配对方法-queue.declare方法和queue.declare-ok有着与其他配对方法非常相似的一系列事件:
在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
想要实现更完善的控制,可以使用重载的变体来指定mandatory标识,或者发送预设好消息属性的消息。
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
日志如下:
19:52:56.103 [生产者线程] INFO com.dhy.util.RabbitmqUtil - 连接建立
19:52:56.232 [生产者线程] INFO com.dhy.util.RabbitmqUtil - 连接建立,客户端设置的连接名为dhy-connection
19:52:56.234 [生产者线程] INFO com.dhy.util.RabbitmqUtil - 通道创建中...
19:52:56.369 [生产者线程] INFO com.dhy.util.RabbitmqUtil - 准备channel中..
19:52:56.374 [生产者线程] INFO com.dhy.Publisher - 发送消息...
19:52:56.403 [AMQP Connection 110.40.155.17:5672] WARN com.dhy.RouteFailListener - 路由失败消息信息如下: replyCode=312 ,replyText=NO_ROUTE ,exchange=dhy-exchange ,routingKey=unknown ,body=你好,我是生产者
获取单条消息
确认消息//第一个参数:确认哪一个消息//第二个参数:是否开启消息批量应答channel.basicAck,false);批量应答怎么玩?可以对手动确认进行批处理以减少网络流量。这是通过将确认方法的多个字段设置为true来完成的。当批处理字段设置为true时:例如,假设通道Ch上有未确认的传递标记7和当确认帧到达该通道时,delivery_tag设置为8且批处理标记设置为true,则将确认从5到8的所有标记。如果将批处理标记设置为false,则交付6和7仍将不被确认。如果消费者拿到了消息但是直到断开连接前,都没有对消息进行应答,那么消息会重新入队拒绝消息//第一个参数:拒绝哪一个消息//第二个参数:是否将拒绝的消息重新入队channel.basicReject,true);//第一个参数:拒绝哪一个消息//第二个参数:是否批量拒绝//第三个参数:是否将拒绝的消息重新入队//basic.nack方法可以一次拒绝或重新排队多条消息。这就是它与basic.reject的区别。channel.basicNack,true,true);使用者无法立即处理交付,但其他实例可能能够处理。在这种情况下,可能需要将其重新排队,让另一个消费者接收并处理它。basic.reject和basic.nack是用于此目的的两种协议方法。
消息持久性
不是所有的AMQP方法都有与其配对的“另一半”。许多都没有相对应的“响应”方法,另外一些有着一种以上与之对应的“响应”方法。
Notice!!!
exchange.declare-ok方法除了通道号之外没有携带任何其他参数。
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息”的原因处理失败了。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
给出的示例中,两个队列都是临时队列,即队列名由服务端生成的队列,与唯一的客户端绑定,客户端断开连接后,队列自动被删除
AddressResolver接口类似于:
public interface AddressResolver {
List<Address> getAddresses() throws IOException;
}
一个需要避免的经典的反模式就是为每个发布的消息开放单独的通道。通道应该是长时间存活的。
关闭Rabbitmq连接
Java客户端附带了以下实现:
默认只有一个主机地址是走DnsRecordIpAddressResolver
不等待服务器响应
在涉及多线程/进程的应用中,为每个线程/进程开启一个通道是很常见的,并且这些通道不能被线程/进程共享。
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了
通道和并发
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
生产者:
//如果使用默认交换机,传入空字符串即可
//对于默认交换机而言,路由key就是绑定到其上的队列名
channel.basicPublish("",QUEUE_NAME,true,null,"你好,我是生产者".getBytes(StandardCharsets.UTF_8));
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值等。
文章为作者独立观点,不代表股票交易接口观点