博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习笔记3-使用topic交换器
阅读量:6871 次
发布时间:2019-06-26

本文共 7183 字,大约阅读时间需要 23 分钟。

topic的路由规则里使用【.】号分隔单词,使用【*】号匹配1个单词,使用【#】匹配多个.和多个*。

在下面的例子中:

logger.*可以匹配logger.error和logger.warning,但logger*.error只能匹配logger.error

logger#可以匹配到logger.error和logger.warning。

下面的例子使用topic接收警告、错误的日志,并根据匹配的路由规则发送给不同的Queue队列来处理的例子:

日志生产者SenderWithTopicExchange

1 package com.yzl.test2; 2  3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6  7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory;10 11 /**12  * 使用topic交换器发送消息13  * 分为警告和错误2种日志14  * @author: yzl15  * @date: 2016-10-2216  */17 public class SenderWithTopicExchange {18     //交换器名称19     private static final String EXCHANGE_NAME = "myTopicExchange";20     //路由键的前缀21     private static final String BASE_ROUTING_KEY = "logger.";22     23     public static void main(String[] args) throws Exception {24         //使用CountDownLatch控制2个线程一起运行25         final CountDownLatch cdl = new CountDownLatch(2);26         //连接到rabbitmq服务器27         ConnectionFactory factory = new ConnectionFactory();28         factory.setHost("localhost");29         Connection connection = factory.newConnection();30         //创建一个信道31         final Channel channel = connection.createChannel();32         //定义一个名字为topicExchange的topic类型的exchange33         channel.exchangeDeclare(EXCHANGE_NAME, "topic");34         35         ExecutorService pool = Executors.newFixedThreadPool(2);36         pool.submit(new Runnable() {37             @Override38             public void run() {39                 try {40                     cdl.await();41                     //发送警告日志,绑定路由键:logger.warning42                     String warningMsg = "warning message is :";43                     for(int i=1; i<800; i++){44                         System.out.println("发送警告消息:" + warningMsg+i);45                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());46                         Thread.sleep(2000L);47                     }48                 } catch (Exception e) {49                     e.printStackTrace();50                 }51             }52         });53         pool.submit(new Runnable() {54             @Override55             public void run() {56                 try {57                     cdl.await();58                     //发送错误日志,绑定路由键:logger.error59                     String errorMsg = "error message is :";60                     for(int i=1; i<1000; i++){61                         System.out.println("发送错误消息:" + errorMsg+i);62                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());63                         Thread.sleep(2000L);64                     }65                 } catch (Exception e) {66                     e.printStackTrace();67                 }68             }69         });70         71         cdl.countDown();72         cdl.countDown();73     }74 }

消息消费者ReceiverWithTopicExchange

1 package com.yzl.test2; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.DefaultConsumer;10 import com.rabbitmq.client.Envelope;11 12 /**13  * 使用topic交换器接收消息14  * 15  * @author: yzl16  * @date: 2016-10-2217  */18 public class ReceiverWithTopicExchange {19     // 交换器名称20     private static final String EXCHANGE_NAME = "myTopicExchange";21 22     public static void main(String[] args) throws Exception {23         // 连接到rabbitmq服务器24         ConnectionFactory factory = new ConnectionFactory();25         factory.setHost("localhost");26         Connection connection = factory.newConnection();27         // 创建一个信道28         final Channel channel = connection.createChannel();29         // 定义一个名字为topicExchange的topic类型的exchange30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");31         32         //定义接收警告消息的队列33         channel.queueDeclare("warningQueue", false, false, false, null);34         //定义接收错误消息的队列35         channel.queueDeclare("errorQueue", false, false, false, null);36         //定义接收所有级别日志消息的队列37         channel.queueDeclare("allLoggerQueue", false, false, false, null);38         39         //使用logger.warning路由键绑定myTopicExchange与warningQueue40         channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");41         //使用logger.error路由键绑定myTopicExchange与errorQueue42         channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");43         //使用logger.*路由规则绑定myTopicExchange与allLoggerQueue44         channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*");45         46         //只处理警告日志,使用手动ack确认47         channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){48             @Override49             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)50                     throws IOException {51                 String msg = new String(body);52                 System.out.println("warningQueue accept a warning msg :" + msg);53                 channel.basicAck(envelope.getDeliveryTag(), false);54             }55         });56         //只处理错误日志,使用手动ack确认57         channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){58             @Override59             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)60                     throws IOException {61                 String msg = new String(body);62                 System.out.println("errorQueue accept a error msg :" + msg);63                 channel.basicAck(envelope.getDeliveryTag(), false);64             }65         });66         //处理警告和错误日志,使用手动ack确认67         channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){68             @Override69             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)70                     throws IOException {71                 String msg = new String(body);72                 System.out.println("allLoggerQueue accept a logger msg :" + msg);73                 channel.basicAck(envelope.getDeliveryTag(), false);74             }75         });76     }77 }

结果输出:

发送警告消息:warning message is :1发送错误消息:error message is :1发送警告消息:warning message is :2发送错误消息:error message is :2发送错误消息:error message is :3发送警告消息:warning message is :3
allLoggerQueue accept a logger msg :error message is :1allLoggerQueue accept a logger msg :warning message is :1errorQueue accept a error msg :error message is :1warningQueue accept a warning msg :warning message is :1warningQueue accept a warning msg :warning message is :2errorQueue accept a error msg :error message is :2allLoggerQueue accept a logger msg :warning message is :2allLoggerQueue accept a logger msg :error message is :2allLoggerQueue accept a logger msg :warning message is :3errorQueue accept a error msg :error message is :3warningQueue accept a warning msg :warning message is :3allLoggerQueue accept a logger msg :error message is :3

消息处理流程:

转载地址:http://oesfl.baihongyu.com/

你可能感兴趣的文章
javascript中null与undefined的区别
查看>>
mysql之select(二)
查看>>
万能分页存储过程
查看>>
jQuery模板插件jsrender
查看>>
内部类概述
查看>>
linux ln 命令使用参数详解(ln -s 软链接)
查看>>
结队开发项目—NABC模型
查看>>
qt5.4解决输出中文乱码问题
查看>>
深入分析Java ClassLoader原理
查看>>
Vim编辑器
查看>>
Codevs 3304 水果姐逛水果街Ⅰ 线段树
查看>>
linux共享windows资料
查看>>
前端UI框架总结
查看>>
( component 标签元素,及其 :is 属性 )的使用样例(组件切换的一个简单样例,不过,最好使用动画来实现组件的切换)...
查看>>
这7个人生捷径,一定不要走!
查看>>
Koa2+Mysql搭建简易博客
查看>>
Atom 初识
查看>>
Servlet、Filter和Listener
查看>>
高中数学运算能力训练题【基础中阶高阶辅导】
查看>>
Bean的装配方式
查看>>