在介绍RabbitMQ之前实现要介绍一下MQ,MQ是什么?MQ全称是Message Queue,可以理解为消息队列的意思,简单来说就是消息以管道的方式进行传递。RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言的。
在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。
现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?
在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。
首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?
你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。 信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。
对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。
想要真正的了解Rabbit有些名词是你必须知道的。包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。
Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足4个条件:
Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。所以使用者要根据自己的情况,选择适合自己的方式。
1.创建maven的java项目,在pom.xml中引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
2.实现连接
java实现代码分为两个类,第一个是创建Rabbit连接,第二是应用类使用最简单的方式发布和消费消息。连接分为两种方式:
方式一:
public static Connection GetRabbitConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(Config.UserName);
factory.setPassword(Config.Password);
factory.setVirtualHost(Config.VHost);
factory.setHost(Config.Host);
factory.setPort(Config.Port);
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
方式二:
public static Connection GetRabbitConnection2() {
ConnectionFactory factory = new ConnectionFactory();
// 连接格式:amqp://userName:password@hostName:portNumber/virtualHost
String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,
Config.VHost);
Connection conn = null;
try {
factory.setUri(uri);
factory.setVirtualHost(Config.VHost);
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
3.发布消息和消费消息
使用最简单的方式发布和消费消息
public static void main(String[] args) {
Publisher(); // 推送消息
Consumer(); // 消费消息
}
/**
* 推送消息
*/
public static void Publisher() {
// 创建一个连接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 创建通道
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(Config.QueueName, false, false, false, null);
String content = String.format("当前时间:%s", new Date().getTime());
// 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】
channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
System.out.println("已发送消息:" + content);
// 关闭连接
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费消息
*/
public static void Consumer() {
// 创建一个连接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 创建通道
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(Config.QueueName, false, false, false, null);
// 创建订阅器,并接受消息
channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey(); // 队列名称
String contentType = properties.getContentType(); // 内容类型
String content = new String(body, "utf-8"); // 消息正文
System.out.println("消息正文:" + content);
channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Publisher(); // 推送消息
Consumer(); // 消费消息
}
/**
* 推送消息
*/
public static void Publisher() {
// 创建一个连接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 创建通道
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(Config.QueueName, false, false, false, null);
String content = String.format("当前时间:%s", new Date().getTime());
// 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】
channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
System.out.println("已发送消息:" + content);
// 关闭连接
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费消息
*/
public static void Consumer() {
// 创建一个连接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 创建通道
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(Config.QueueName, false, false, false, null);
// 创建订阅器,并接受消息
channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey(); // 队列名称
String contentType = properties.getContentType(); // 内容类型
String content = new String(body, "utf-8"); // 消息正文
System.out.println("消息正文:" + content);
channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码小兵86504-19 19:55
代码小兵49806-21 15:40
代码小兵27905-08 15:05
代码小兵99203-29 13:44
代码小兵27905-08 16:00