博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ中实现延时消息
阅读量:3655 次
发布时间:2019-05-21

本文共 6927 字,大约阅读时间需要 23 分钟。

平常项目中很多场景需要使用延时消息处理,例如订单超过多久没有支付需要取消等。如何在消息中间件RabbitMQ中实现该功能?下面描述下使用Dead Letter Exchange实现延时消息场景,当然会有别的其他实现方式。

1. 什么是Dead Letter Exchange?

RabbitMQ中通常消息被直接发送到队列中或者从Exchange中Route到队列上后,消息如果被消费者消费完毕并确认后消息就会从Broker中被删除。

如果存在以下三种情况,同时队列上设置了Dead Letter Exchange,消息会被转送到Dead Letter Exchange中。

  • 消息被拒绝(basicReject或者basicNack) requeue=false

  • 消息存活时间超过了TTL预设值(x-message-ttl)

  • 队列满了

Dead Letter Exchange像平常的Exchange一样,可以设置它的BuiltinExchangeType,也可以为它绑定队列。

这里我们可以通过设定Dead Letter Exchange,并为它绑定一个队列,然后定义Consumer消费这个队列,就可以达到处理延时消息的功能。

2. 代码实例

流程先

I. 定义消息生产者

/*** * 消息发送者 */static class NormalEXSend {    private Connection conn;    private Channel chnl;    public NormalEXSend(String tag) throws IOException, TimeoutException {        ConnectionFactory connFact = initConnFac();        conn = connFact.newConnection();        chnl = conn.createChannel();        // 定义正常工作Exchange        chnl.exchangeDeclare(WORKER_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 定义 dead letter exchange        chnl.exchangeDeclare(DELAY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        Map
args = new HashMap<>(); args.put("x-message-ttl", 60000); // timeout 1min args.put("x-dead-letter-exchange", DELAY_EXCHANGE_NAME); args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY); // 定义正常工作Queue同时设置dead letter exchange chnl.queueDeclare(WORKER_QUEUE_NAME, false, false, false, args); // 绑定到正常工作Exchange chnl.queueBind(WORKER_QUEUE_NAME, WORKER_EXCHANGE_NAME, tag); } /** * 发送消息 * @param key * @param msg * @throws IOException */ public void send(String key, String msg) throws IOException { AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN; // send a message to a exchange chnl.basicPublish(WORKER_EXCHANGE_NAME, key, props, msg.getBytes()); System.out.println(String.format("[%s|%s|Sender] send 【%s】 to exchange:%s", Thread.currentThread().getName(), System.currentTimeMillis(), msg, WORKER_EXCHANGE_NAME)); }}

II. 定义延时消息处理者

其中receive方法中consumerhandleDelivery方法参数properties可以获取到消息的death原因properties.getHeaders().get("x-first-death-reason"),可能值rejected | expired | maxlen。此处可以根据判断此值去处理由于超时而引起death的消息(就是我们想要处理的延时消息)。

/**     * 延时消息处理者     */    static class DelayEXRecv {        private Connection conn;        private Channel chnl;        public DelayEXRecv() throws IOException, TimeoutException {            ConnectionFactory connFact = initConnFac();            conn = connFact.newConnection();            chnl = conn.createChannel();            // 定义延时消息队列            chnl.queueDeclare(DELAY_QUEUE_NAME, false, false, false, null);            // 绑定到延时消息Exchange            chnl.queueBind(DELAY_QUEUE_NAME, DELAY_EXCHANGE_NAME, DEAD_ROUTING_KEY);        }        /**         * 接收消息         * @throws IOException         */        public void receive() throws IOException {            chnl.basicQos(1);            // no auto ack            boolean autoAck = false;            chnl.basicConsume(DELAY_QUEUE_NAME, autoAck, new DefaultConsumer(chnl) {                public void handleDelivery(String consumerTag, Envelope envelope,                                           AMQP.BasicProperties properties, byte[] body) throws IOException {                    String message = new String(body, "UTF-8");                    // 打印出延时原因 rejected | expired | maxlen                    // 项目中可以根据原因处理目标消息                    System.out.println(String.format("[%s|%s|Delay_Receiver] received the delay msg 【%s】 from EXCHANGE: %s, the delay reason is: %s", Thread.currentThread().getName(), System.currentTimeMillis(), message, envelope.getExchange(), properties.getHeaders().get("x-first-death-reason")));                    // 确认消息                    chnl.basicAck(envelope.getDeliveryTag(), false);                }            });        }    }

III. 试验一把

 
private static final String WORKER_EXCHANGE_NAME = "exchange.worker";    private static final String DELAY_EXCHANGE_NAME = "exchange.delay";    private static final String WORKER_QUEUE_NAME = "queue.worker";    private static final String DELAY_QUEUE_NAME = "queue.delay";    private static final String DEAD_ROUTING_KEY = "dead.key.message";    public static void main(String[] args) {        ExecutorService exec = Executors.newFixedThreadPool(2);        exec.execute(new Runnable() {            @Override            public void run() {                try {                    String key = "worker";                    NormalEXSend sender = new NormalEXSend(key);                    for (int i =0; i < 5; i++) {                        sender.send(key, String.format("YaYYY, one message, No.:%s!", i));                        Thread.sleep(3000);                    }                } catch (IOException | TimeoutException | InterruptedException e) {                    e.printStackTrace();                }            }        });        exec.execute(new Runnable() {            @Override            public void run() {                try {                    DelayEXRecv receiver = new DelayEXRecv();                    receiver.receive();                    System.out.println(String.format("[%s|%s|Delay_Receiver] Starting the Delay Msg Receiver process...", Thread.currentThread().getName(), System.currentTimeMillis()));                } catch (IOException | TimeoutException e) {                    e.printStackTrace();                }            }        });        exec.shutdown();    }

IV. 打印结果

[pool-1-thread-2|1515750089010|Delay_Receiver] Starting the Delay Msg Receiver process...[pool-1-thread-1|1515750089020|Sender] send 【YaYYY, one message, No.:0!】 to exchange:exchange.worker[pool-1-thread-1|1515750092020|Sender] send 【YaYYY, one message, No.:1!】 to exchange:exchange.worker[pool-1-thread-1|1515750095020|Sender] send 【YaYYY, one message, No.:2!】 to exchange:exchange.worker[pool-1-thread-1|1515750098021|Sender] send 【YaYYY, one message, No.:3!】 to exchange:exchange.worker[pool-1-thread-1|1515750101022|Sender] send 【YaYYY, one message, No.:4!】 to exchange:exchange.worker[pool-2-thread-4|1515750149038|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:0!】 from EXCHANGE: exchange.delay, the delay reason is: expired[pool-2-thread-5|1515750152035|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:1!】 from EXCHANGE: exchange.delay, the delay reason is: expired[pool-2-thread-6|1515750155035|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:2!】 from EXCHANGE: exchange.delay, the delay reason is: expired[pool-2-thread-7|1515750158036|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:3!】 from EXCHANGE: exchange.delay, the delay reason is: expired[pool-2-thread-8|1515750161036|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:4!】 from EXCHANGE: exchange.delay, the delay reason is: expired

可以看出消息是在制定延时的1min后才被获取消费。

Yayy, 至此结束。

参考:http://www.rabbitmq.com/dlx.html

转载地址:http://mkcfn.baihongyu.com/

你可能感兴趣的文章
前端实现视频在线预览插件之video.js上手
查看>>
【Unity】删除所有子物体保留父物体的2种方式
查看>>
基本组件操作
查看>>
Time模块
查看>>
InputModule
查看>>
Unity3D Waypoint (路点)
查看>>
同步延时问题解决方案
查看>>
面试题总结
查看>>
简易小地图制作重点
查看>>
Lua中的元表元方法
查看>>
第九章 质量与变更管理
查看>>
Rabbitmq高级特性及集群
查看>>
RocketMq入门
查看>>
RocketMQ高级原理详解
查看>>
RocketMQ应用
查看>>
kafka搭建与使用
查看>>
docke学习内容之二
查看>>
SpringDataJpa学习一
查看>>
springboot中的日志框架
查看>>
springboot的MVC自动配置
查看>>