topic的路由规则里使用【.】号分隔单词,使用【*】号匹配1个单词,使用【#】匹配多个.和多个*。
在下面的例子中:
logger.*可以匹配logger.error和logger.warning,但logger*.error只能匹配logger.error
logger#可以匹配到logger.error和logger.warning。
下面的例子使用topic接收警告、错误的日志,并根据匹配的路由规则发送给不同的Queue队列来处理的例子:
日志生产者SenderWithTopicExchange
1 package com.yzl.test2; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory;10 11 /**12 * 使用topic交换器发送消息13 * 分为警告和错误2种日志14 * @author: yzl15 * @date: 2016-10-2216 */17 public class SenderWithTopicExchange {18 //交换器名称19 private static final String EXCHANGE_NAME = "myTopicExchange";20 //路由键的前缀21 private static final String BASE_ROUTING_KEY = "logger.";22 23 public static void main(String[] args) throws Exception {24 //使用CountDownLatch控制2个线程一起运行25 final CountDownLatch cdl = new CountDownLatch(2);26 //连接到rabbitmq服务器27 ConnectionFactory factory = new ConnectionFactory();28 factory.setHost("localhost");29 Connection connection = factory.newConnection();30 //创建一个信道31 final Channel channel = connection.createChannel();32 //定义一个名字为topicExchange的topic类型的exchange33 channel.exchangeDeclare(EXCHANGE_NAME, "topic");34 35 ExecutorService pool = Executors.newFixedThreadPool(2);36 pool.submit(new Runnable() {37 @Override38 public void run() {39 try {40 cdl.await();41 //发送警告日志,绑定路由键:logger.warning42 String warningMsg = "warning message is :";43 for(int i=1; i<800; i++){44 System.out.println("发送警告消息:" + warningMsg+i);45 channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());46 Thread.sleep(2000L);47 }48 } catch (Exception e) {49 e.printStackTrace();50 }51 }52 });53 pool.submit(new Runnable() {54 @Override55 public void run() {56 try {57 cdl.await();58 //发送错误日志,绑定路由键:logger.error59 String errorMsg = "error message is :";60 for(int i=1; i<1000; i++){61 System.out.println("发送错误消息:" + errorMsg+i);62 channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());63 Thread.sleep(2000L);64 }65 } catch (Exception e) {66 e.printStackTrace();67 }68 }69 });70 71 cdl.countDown();72 cdl.countDown();73 }74 }
消息消费者ReceiverWithTopicExchange
1 package com.yzl.test2; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.DefaultConsumer;10 import com.rabbitmq.client.Envelope;11 12 /**13 * 使用topic交换器接收消息14 * 15 * @author: yzl16 * @date: 2016-10-2217 */18 public class ReceiverWithTopicExchange {19 // 交换器名称20 private static final String EXCHANGE_NAME = "myTopicExchange";21 22 public static void main(String[] args) throws Exception {23 // 连接到rabbitmq服务器24 ConnectionFactory factory = new ConnectionFactory();25 factory.setHost("localhost");26 Connection connection = factory.newConnection();27 // 创建一个信道28 final Channel channel = connection.createChannel();29 // 定义一个名字为topicExchange的topic类型的exchange30 channel.exchangeDeclare(EXCHANGE_NAME, "topic");31 32 //定义接收警告消息的队列33 channel.queueDeclare("warningQueue", false, false, false, null);34 //定义接收错误消息的队列35 channel.queueDeclare("errorQueue", false, false, false, null);36 //定义接收所有级别日志消息的队列37 channel.queueDeclare("allLoggerQueue", false, false, false, null);38 39 //使用logger.warning路由键绑定myTopicExchange与warningQueue40 channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");41 //使用logger.error路由键绑定myTopicExchange与errorQueue42 channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");43 //使用logger.*路由规则绑定myTopicExchange与allLoggerQueue44 channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*");45 46 //只处理警告日志,使用手动ack确认47 channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){48 @Override49 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)50 throws IOException {51 String msg = new String(body);52 System.out.println("warningQueue accept a warning msg :" + msg);53 channel.basicAck(envelope.getDeliveryTag(), false);54 }55 });56 //只处理错误日志,使用手动ack确认57 channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){58 @Override59 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)60 throws IOException {61 String msg = new String(body);62 System.out.println("errorQueue accept a error msg :" + msg);63 channel.basicAck(envelope.getDeliveryTag(), false);64 }65 });66 //处理警告和错误日志,使用手动ack确认67 channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){68 @Override69 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)70 throws IOException {71 String msg = new String(body);72 System.out.println("allLoggerQueue accept a logger msg :" + msg);73 channel.basicAck(envelope.getDeliveryTag(), false);74 }75 });76 }77 }
结果输出:
发送警告消息:warning message is :1发送错误消息:error message is :1发送警告消息:warning message is :2发送错误消息:error message is :2发送错误消息:error message is :3发送警告消息:warning message is :3
allLoggerQueue accept a logger msg :error message is :1allLoggerQueue accept a logger msg :warning message is :1errorQueue accept a error msg :error message is :1warningQueue accept a warning msg :warning message is :1warningQueue accept a warning msg :warning message is :2errorQueue accept a error msg :error message is :2allLoggerQueue accept a logger msg :warning message is :2allLoggerQueue accept a logger msg :error message is :2allLoggerQueue accept a logger msg :warning message is :3errorQueue accept a error msg :error message is :3warningQueue accept a warning msg :warning message is :3allLoggerQueue accept a logger msg :error message is :3
消息处理流程: