动力节点首页 全国咨询热线:400-8080-105

绑定手机号,登录
手机号

验证码

微信登录
手机号登录
手机号

验证码

微信登录与注册
微信扫码登录与注册

扫码关注微信公众号完成登录与注册
手机号登录
首页 > 文章

SpringBoot整合RabbitMQ

08-02 11:37 586浏览
举报 T字号
  • 大字
  • 中字
  • 小字

1.引入相关依赖

<dependencies>
    <!--amqp-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--web-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--test-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.配置RabbitMQ

首先应当确保你已安装了RabbitMQ

查看RabbitMQ自动配置类RabbitAutoConfiguration:

@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {

其中@EnableConfigurationProperties(RabbitProperties.class)是RabbitMQ的相关属性配置。

点进去RabbitProperties.class:

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
	private static final int DEFAULT_PORT = 5672;
	private static final int DEFAULT_PORT_SECURE = 5671;
	/**
	 * RabbitMQ host. Ignored if an address is set.
	 */
	private String host = "localhost";
	/**
	 * RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is
	 * enabled.
	 */
	private Integer port;
	/**
	 * Login user to authenticate to the broker.
	 */
	private String username = "guest";
	/**
	 * Login to authenticate against the broker.
	 */
	private String password = "guest";

我们可以通过spring.rabbitmq,在application.yml文件中配置相关的属性,比如host、port、username、password。

在application.yml配置RabbitMQ:

spring:
  #rabbitmq的相关配置
  rabbitmq:
    host: 192.168.204.131
    port: 5672
    username: guest
    password: guest

继续查看RabbitAutoConfiguration:

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    return template;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}

发现其向容器中注入了两个组件:RabbitTemplate和AmqpAdmin,这两个组件有什么作用呢?

RabbitTemplate:可以发送消息、接收消息。

AmqpAdmin操作Exchange、Queue、Binding等,比如创建、删除、解绑。

(1)测试RabbitTemplate

首先在容器中通过自动注入的方式获取RabbitTemplate,然后在测试类中测试:

@SpringBootTest
class SpringBoot02AmqpApplicationTests {
    @Autowired
    RabbitTemplate rabbitTemplate;
}

1)使用RabbitTemplate测试发送消息

send(String exchange, String routingKey, Message message):需要自己定义一个Message,比较麻烦。

convertAndSend(String exchange, String routingKey, Object object):只需要传入一个Object,自动序列化发送给rabbitmq,object默认被当成消息体。

//单播(点对点)发送。
@Test
public void testRabbitTemplate() {
    HashMap<String, Object> map = new HashMap<>();
    map.put("name", "zhangsan");
    map.put("age", 22);
    rabbitTemplate.convertAndSend("exchange.direct","aiguigu.news",map);
}

这种方式在接收端接收的数据是这样式的:

rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAADdAAEbmFtZXQA CHpoYW5nc2FudAAEbGlzdHNyABpqYXZhLnV0aWwuQXJyYXlzJEFycmF5TGlzdNmkPL7NiAbSAgABWwABYXQAE1tMamF2YS9sYW5nL09iamVjdDt4cHVyABdb TGphdmEuaW8uU2VyaWFsaXphYmxlO67QCaxT1+1JAgAAeHAAAAADdAAEaGFoYXNyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4cgAQ amF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAKac3IAEWphdmEubGFuZy5Cb29sZWFuzSBygNWc+u4CAAFaAAV2YWx1ZXhwAXQAA2FnZXNxAH4ACwAA
ABZ4

这是由于默认使用的是jdk的序列化方式,那么如何将消息转化为json格式的数据发送出去?接下来自定义使用Jackson2JsonMessageConverter的消息转化器。

自定义MessageConverter配置:

@Configuration
@EnableRabbit   //开启基于注解的rabbitmq
public class MyAMQPConfig {
    /**
     * 设置自定义的 MessageConverter
     * 使用Jackson2JsonMessageConverter消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

然后再次测试,在接收端接收的数据如下:

{"name":"zhangsan","list":["haha",666,true],"age":22}

再发送个对象试试:

Book book = new Book("西游记", "吴承恩");
rabbitTemplate.convertAndSend("exchange.direct", "aiguigu.news", book);

使用自定义的消息转化器之后,接收端数据:

{"bookName":"西游记","author":"吴承恩"}

2)使用RabbitTemplate测试接收消息

receiveAndConvert(String queueName):接收队列名称为queueName的消息。

//接收数据
@Test
public void testReceive() {
    Object o = rabbitTemplate.receiveAndConvert("aiguigu.news");
    System.out.println(o.getClass());
    System.out.println(o);
    // 接收map
    // class java.util.HashMap
    // {name=zhangsan, list=[haha, 666, true], age=22}
    // 接收book对象
    // class com.example.bean.Book
    // Book{bookName='西游记', author='吴承恩'}
}

(2)测试AmqpAdmin

removeBinding(Binding binding):解除某个bingding

@Test
public void testRemoveBinding() {
    //解除某个bingding
    amqpAdmin.removeBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,"amqpAdmin_direct.exchange", "amqp.haha", null));
}

deleteExchange(String s):删除指定的exchange.

boolean deleteExchange = amqpAdmin.deleteExchange("amqpAdmin_direct.exchange");
System.out.println(deleteExchange); //true

deleteQueue(String s):删除指定Queue

boolean deleteQueue = amqpAdmin.deleteQueue("declaredQueue");
System.out.println("deleteQueue:"+deleteQueue); //true

getQueueInfo(String s),获取指定队列的信息。

@Test
public void getQueueInformation() {
    QueueInformation queueInformation = amqpAdmin.getQueueInfo("declaredQueue");
    int consumerCount = queueInformation.getConsumerCount();
    int messageCount = queueInformation.getMessageCount();
    String name = queueInformation.getName();
    System.out.println("consumerCount:" + consumerCount);   //0
    System.out.println("messageCount:" + messageCount); //0
    System.out.println("name:" + name); //declaredQueue
}

declareExchange(Exchange exchange):声明一个exchange.

/**
     * 以declare开头的是创建组件。
     * declareExchange(Exchange exchange):声明一个exchange
     * Exchange是一个接口,其实现类有:
     * 1.DirectExchange
     * 2.FanoutExchange
     * 3.TopicExchange
     * 4.HeadersExchange
     * 5.CustomExchange
     */
@Test
public void testCreateExchange() {
    //创建一个Exchange
    amqpAdmin.declareExchange(new DirectExchange("amqpAdmin_direct.exchange"));
    System.out.println("创建完成!");
    //创建一个queue
    String declaredQueue = amqpAdmin.declareQueue(new Queue("declaredQueue"));
    System.out.println("declaredQueue:" + declaredQueue);   //declaredQueue
    //创建绑定规则
    amqpAdmin.declareBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,
                                         "amqpAdmin_direct.exchange", "amqp.haha", null));
} 

3.监听消息队列中的内容

使用@EnableRabbit+@RabbitListener监听消息队列中的内容。

@EnableRabbit:表示开启基于注解的rabbitmq。

@RabbitListener:表示监听某个队列的内容。

@Service
public class BookServiceImpl implements BookService {
    /**
     * 注解:@RabbitListener(queues = "aiguigu.news"),表示监听aigui.news这个队列的内容。
     *
     * @param book
     */
    @RabbitListener(queues = "aiguigu.news")
    @Override
    public void receive(Book book) {
        System.out.println("收到aiguigu.news消息:" + book);
    }
    /**
     * 接收消息的第二种方式:
     *
     * @param message
     */
    @RabbitListener(queues = "aiguigu")
    @Override
    public void receive(Message message) {
        //获取消息体
        byte[] body = message.getBody();
        System.out.println(body);   //[B@fe4bdc2
        //获得消息属性
        MessageProperties properties = message.getMessageProperties();
        System.out.println(properties);
        /*
        MessageProperties [headers={__TypeId__=com.example.bean.Book}, contentType=application/json,
        contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
        redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=aiguigu, deliveryTag=1,
        consumerTag=amq.ctag-ynkD05MwnffSCo9h7W5DGA, consumerQueue=aiguigu]
         */
    }
}

实现的效果,当给某个exchange发送消息的之后,exchange按照binding规则将消息分发给对应的队列,使用 @RabbitListener可以监听到这个队列的消息,就可以获取消息进行相应的操作。

动力节点在线课程涵盖零基础入门,高级进阶,在职提升三大主力内容,覆盖Java从入门到就业提升的全体系学习内容。全部Java视频教程免费观看,相关学习资料免费下载!对于火爆技术,每周一定时更新!如果想了解更多相关技术,可以到动力节点在线免费观看SpringBoot基础教程哦!

0人推荐
共同学习,写下你的评论
0条评论
代码小兵652
程序员代码小兵652

113篇文章贡献392215字

作者相关文章更多>

推荐相关文章更多>

重启Docker容器命令

代码小兵87208-09 15:04

SpringBoot是什么框架

代码小兵99203-29 17:29

简述SpringBoot与Spring区别

杨晶珍08-02 11:09

使用Docker搭建开发环境

代码小兵98808-09 14:38

Docker集群详解

代码小兵28608-09 14:50

发评论

举报

0/150

取消