`
taiwei.peng
  • 浏览: 228721 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Rabbit Mq Topic 交换机

阅读更多
1.TopicRabbitConfig
package com.soft.rabbit.server.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

public final static String man = "topic.man";
public final static String woman = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}

// 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
// 这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

// 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

2.rabbitconfig
package com.soft.rabbit.server.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback:     " + "确认情况:" + ack);
System.out.println("ConfirmCallback:     " + "原因:" + cause);
}
});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,String routingKey) {
System.out.println("ReturnCallback:     " + "消息:" + message);
System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
System.out.println("ReturnCallback:     " + "交换机:" + exchange);
System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
}
});

return rabbitTemplate;
}

}


3.TopicService
package com.soft.rabbit.server.service;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicService {

@Autowired
    private AmqpTemplate rabbitTemplate;

    public String sendTopicMessage1() {
    String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: MAN";
    String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    Map<String, Object> manMap = new HashMap<String, Object>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
        return "ok";
    }
   
    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Map<String, Object> womanMap = new HashMap<String, Object>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
        return "ok";
    }

}

4.消费者TopicReceiver
package com.soft.rabbit.client.service;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.man")
public class TopicReceiver {

@RabbitHandler
public void process(Map testMessage) {
        System.out.println("TopicManReceiver消费者收到消息  : " + testMessage.toString());
    }

}

5.消费者TopicTotalReceiver
package com.soft.rabbit.client.service;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

@RabbitHandler
public void process(Map testMessage) {
        System.out.println("TopicTotalReceiver消费者收到消息  : " + testMessage.toString());
    }

}


6.生产者bootstrap.yml
#http端口配置
server:
  port: 6001
  connection-timeout: 5000
  tomcat:
    max-http-post-size: -1
    max-threads: 1000
    max-connections: 1000

spring:
  application:
    name: soft-rabbit-server
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true
               
# eureka注册中心配置
eureka:
  client:
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:7001/eureka/
  instance:
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
    prefer-ip-address: true  
    hostname: ${spring.cloud.client.ip-address}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics