动力节点首页 全国咨询热线:400-8080-105

绑定手机号,登录
手机号

验证码

微信登录
手机号登录
手机号

验证码

微信登录与注册
微信扫码登录与注册

扫码关注微信公众号完成登录与注册
手机号登录
首页 > 文章

RabbitMQ监控:监控队列状态

07-16 11:23 1094浏览
举报 T字号
  • 大字
  • 中字
  • 小字

验证RabbitMQ健康运行只是确保消息通信架构可靠性的一部分,同时,你也需要确保消息通信结构配置没有遭受意外修改,从而避免应用消息丢失。

RabbitMQ Management HTTP API提供了一个方法允许你查看任何vhost上的任何队列:/api/queues//。你不仅可以查看配置详情,还可以查看队列的数据统计,例如队列消耗的内存,或者队列的平均消息吞吐量。使用curl测试一下该API,这里的/%2F还是代表默认的vhost(/)。

curl -u guest:guest http://127.0.0.1:15672/api/queues/%2F/springrabbitexercise 
response
{
    "consumer_details": [
        {
            "channel_details": {
                "peer_host": "127.0.0.1",
                "peer_port": 62679,
                "connection_name": "127.0.0.1:62679 -> 127.0.0.1:5672",
                "user": "guest",
                "number": 2,
                "node": "rabbit@localhost",
                "name": "127.0.0.1:62679 -> 127.0.0.1:5672 (2)"
            },
            "arguments": [],
            "prefetch_count": 1,
            "ack_required": true,
            "exclusive": false,
            "consumer_tag": "amq.ctag-YImeU8Fm_VahDpxv8EAw2Q",
            "queue": {
                "vhost": "/",
                "name": "springrabbitexercise"
            }
        }
    ],
    "messages_details": {
        "rate": 7357
    },
    "messages": 232517,
    "messages_unacknowledged_details": {
        "rate": 0.2
    },
    "messages_unacknowledged": 5,
    "messages_ready_details": {
        "rate": 7356.8
    },
    "messages_ready": 232512,
    "reductions_details": {
        "rate": 1861021.8
    },
    "reductions": 58754154,
    ...
    "auto_delete": false,
    "durable": true,
    "vhost": "/",
    "name": "springrabbitexercise",
    "message_bytes_persistent": 2220250,
    "message_bytes_ram": 2220250,
    "message_bytes_unacknowledged": 40,
    "message_bytes_ready": 2220210,
    "message_bytes": 2220250,
    "messages_persistent": 232517,
    "messages_unacknowledged_ram": 5,
    "messages_ready_ram": 232512,
    "messages_ram": 232517,
    "garbage_collection": {
        "minor_gcs": 0,
        "fullsweep_after": 65535,
        "min_heap_size": 233,
        "min_bin_vheap_size": 46422,
        "max_heap_size": 0
    },
    "state": "running"
}

为了方便阅读,去掉了部分返回值,但是还是可以看到队列的很多信息。例如可以看到一个consumer的信息、消息占用的内存、队列的durable、auto_delete属性等。利用这些配置信息,新的健康监控程序可以通过API方法的输出来轻松监控队列的属性,并在发生变更时通知你。

就像之前编写健康检测程序那样,除了服务器、端口、vhost、用户名和密码之外,还需要知道:

* 队列的名称,以便监控其配置

* 该队列是否将durable和auto_delete选项打开

###清单3.1 检测队列配置

下面代码中的@Data和@Slf4j都是插件lombok中的注解

1.定义查看队列信息的接口RMQResource.java

@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {    
    /**
     * Return a queue`s info
     *
     * @param vhost
     * @param name
     * @return {@link QueueInfo}
     */
    @GET
    @Path("queues/{vhost}/{name}")
    Response getQueueInfo(@PathParam("vhost") String vhost, @PathParam("name") String name);
}

2.定义查看队列接口的返回值QueueInfo.java

@Data
public class QueueInfo { 
    private ConsumerDetails[] consumer_details; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object[] incoming; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object[] deliveries; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object arguments; 
    private Boolean exclusive;    
    //...
    private Boolean auto_delete; 
    private Boolean durable; 
    private String vhost; 
    private String name; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object head_message_timestamp; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object recoverable_slaves; 
    private Long memory; 
    private Double consumer_utilisation; 
    private Integer consumers; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object exclusive_consumer_tag; 
    /**
     * unknown class
     */
    @JsonIgnore
    private Object policy; 
    @JsonFormat(pattern = "yyyy-MM-dd hh:mm:ss")
    private Date idle_since;
}

3.检测队列配置QueueConfigCheck.java

/**
 * 检测队列配置
 */ 
@Slf4j
public class QueueConfigCheck { 
    private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class); 
    public static void checkQueueConfig(String vhost, CheckQueue queue) {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String host = config.getHost();
        Response response = null;
        try {
            response = rmqResource.getQueueInfo(vhost, queue.getQueue_name());
        } catch (Exception e) {
            log.error("UNKNOWN: Could not connect to {}, cause {}", host, e.getMessage());
            ExitUtil.exit(ExitType.UNKNOWN.getValue());
        }
        if (response == null || response.getStatus() == 404) {
            log.error("CRITICAL: Queue {} does not exist.", queue.getQueue_name());
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        } else if (response.getStatus() > 299) {
            log.error("UNKNOWN: Unexpected API error : {}", response);
            ExitUtil.exit(ExitType.UNKNOWN.getValue());
        } else {
            QueueInfo info = response.readEntity(QueueInfo.class);
            if (!info.getAuto_delete().equals(queue.getAuto_delete())) {
                log.warn("WARN: Queue {} - auto_delete flag is NOT {}", queue.getQueue_name(), info.getAuto_delete());
                ExitUtil.exit(ExitType.WARN.getValue());
            }
            if (!info.getDurable().equals(queue.getDurable())) {
                log.warn("WARN: Queue {} - durable flag is NOT {}", queue.getQueue_name(), info.getDurable());
                ExitUtil.exit(ExitType.WARN.getValue());
            }
        }
        log.info("OK: Queue {} configured correctly.", queue.getQueue_name());
        ExitUtil.exit(ExitType.OK.getValue());
    }
}

4.检测队列配置的方法参数 CheckQueue.java @Data public class CheckQueue {

    private final String queue_name; 
    private final Boolean auto_delete; 
    private final Boolean durable; 
    public CheckQueue(String queue_name, Boolean auto_delete, Boolean durable) {
        this.queue_name = queue_name;
        this.auto_delete = auto_delete;
        this.durable = durable;
    }
}

5.运行检测程序

@Test
public void testQueueConfig() {
    String queue_name = "springrabbitexercise";
    Boolean auto_delete = false;
    Boolean durable = true;
    String vhost = "/";
    CheckQueue queue = new CheckQueue(queue_name, auto_delete, durable);
    QueueConfigCheck.checkQueueConfig(vhost, queue);
}

可以看到监控正常运行:

11:38:23.286 [main] INFO com.lanxiang.rabbitmqmonitor.check.QueueConfigCheck - OK: Queue springrabbitexercise configured correctly.

11:38:23.289 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK

这段RabbitMQ队列检测的程序有一处修改,如果健康检测程序无法连接到API服务器的话,会返回EXIT_UNKNOWN。健康检测要么成功要么失败,故障代码之间没有区别,但是队列检测API方法在失败时通过HTTP状态码提供了更多信息。如果HTTP状态码是404就代表尝试验证的队列不存在,检测失败并返回EXIT_CRITICAL。对于其他大于299的HTTP状态码,退出代码为EXIT_UNKNOWN。

在获取到RabbitMQ API的response之后,使用JSON进行解码,并且把得到的durable和auto_delete参数与期望的参数进行比较,如果参数和预期不相符的话,返回EXIT_WARNING或者EXIT_CRITICAL状态码。如果队列所有的配置都正确的话,那么就正确退出。

在了解我们对RabbitMQ做监控的原理之后,可以根据RabbitMQ Management HTTP API定制更多的监控,例如:

* /api/nodes,可以获取集群中每个节点的数据

* /api/queues/<vhost>/<queue>,可以获取队列的详细情况,例如消息处理的速率、积压的消息数量等。

动力节点在线课程涵盖零基础入门,高级进阶,在职提升三大主力内容,覆盖Java从入门到就业提升的全体系学习内容。全部Java视频教程免费观看,相关学习资料免费下载!对于火爆技术,每周一定时更新!如果想了解更多相关技术,可以到动力节点在线免费观看RabbitMQ视频教程学习哦!

0人推荐
共同学习,写下你的评论
0条评论
代码小兵279
程序员代码小兵279

75篇文章贡献270037字

相关课程 更多>

作者相关文章更多>

推荐相关文章更多>

Java面试题及答案整理

提枪策马乘胜追击04-21 20:01

Spring常见面试题

代码小兵92504-17 16:07

Java零基础实战项目——五子棋

代码小兵98804-25 13:57

Java string类详解

杨晶珍05-11 14:54

6道经典算法面试题

杨晶珍05-12 16:39

发评论

举报

0/150

取消