Springboot整合Rabbitmq
生产者
首先在pom文件中导入启动器:spring-boot-starter-amqp
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
编写配置文件 application.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| //生产者和消费者都必须配置的基本连接信息 spring.rabbitmq.host=www.chenghao.work spring.rabbitmq.username=test spring.rabbitmq.password=test spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=3000ms
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
|
编写生产者代码
1 2 3 4 5 6 7 8 9 10 11 12
| public class Send { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(Person person){ CorrelationData correlationData = new CorrelationData(person.getUsername() + "" + new Date().getTime());
rabbitTemplate.convertAndSend("rabbitmq-test","chenghao",person,correlationData); } }
|
编写对应的测试类
1 2 3 4 5 6 7 8 9 10 11 12
| @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private Send send; @Test public void send() { send.sendMsg(new Person("chenghao",26)); }
} 点击test运行,200无问题
|
上面代码测试来看,好像能运行,没有什么问题,但我们连交换机都没有注册,让我们加入mq的消息确认机制进一步查看
加入消息确认机制ConfirmCallback 这一步确认消息是否到达rabbitmq服务器,但也就是只确认是否正确到达 Exchange 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean isack, String cause) { System.out.println(correlationData); System.out.println("ack:" + isack); if (!isack) { System.err.println(cause); } } }; public void sendMsg(Person person){ CorrelationData correlationData = new CorrelationData(person.getUsername() + "" + new Date().getTime()); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.convertAndSend("rabbitmq-test","chenghao",person,correlationData); } 控制台结果: CorrelationData [id=chenghao1566232281561] ack:false no exchange 'rabbitmq-test' in vhost '/chenghao'
|
根据控制台错误信息发现没有交换机,手动后台管理界面添加交换机,在用debug执行单元测试
1 2 3
| 控制台结果: CorrelationData [id=chenghao1566232516637] ack:true
|
但是这仅仅只能表示消息现在能够进入交换机中,我们继续加入ReturnCallback,表示当交换机找不到队列回调默认方法(相当于投递到了邮局也有可能找不到收件人地址)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) { System.err.println("Code:" + replyCode + ",Text:" + replyText ); System.err.println("Exchange:" + exchange + ",RoutingKey:" + routingkey ); } };
public void sendMsg(Person person){ CorrelationData correlationData = new CorrelationData(person.getUsername() + "" + new Date().getTime()); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend("rabbitmq-test","chenghao",person,correlationData); } 控制台结果: Code:312,Text:NO_ROUTE Exchange:rabbitmq-test,RoutingKey:chenghao CorrelationData [id=chenghao1566231274946] ack:true
|
消息并没有进入队列中,我们通过rabbitmq后台管理手动添加queue,添加上了交换机和队列,测试通过,查看后台管理界面,消息已经存入队列等待消费
消费者
消费端和生产端一样,首先导入依赖,编写配置文件,在配置文件中基本连接信息不变,补充
1 2 3 4 5 6
| spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
|
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component public class MessageConsumer {
@RabbitListener( bindings = @QueueBinding( value = @Queue(value="rabbitmq-consumer" , durable="true"), exchange = @Exchange(value = "rabbitmq-test") , key = "chenghao" ) ) public void handleMessage(@Payload Person person , Channel channel , @Headers Map<String,Object> headers) { System.out.println(person.getUsername() + "-" + person.getAge()); Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); try { channel.basicAck(tag , false); } catch (IOException e) { e.printStackTrace(); } } } 控制台输出: chenghao-29
|
生产者代码链接product
消费者代码链接consumer
queue,exhange管理
通过上一篇springboot整合rabbitmq中已经提到,在发送端如果不创建声明队列和交换机,会出现错误,这个时候我们可以去管理控制台手动创建对应的交换机和队列解决这个问题,但我认为项目中还是加入config写清楚注释方便管理和后来接手的开发人员,在rabbitmq官网也建议在生产端和消费端都进行声明
管理交换机exhangeConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Configuration public class ExchangeConfig {
public static final String TEST_EXCHANGE = "test";
@Bean public DirectExchange testExchange(){ return new DirectExchange(TEST_EXCHANGE,true,false); } }
|
管理队列QueueConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
@Configuration public class QueueConfig {
private static final String TEST_QUEUE = "test";
@Bean public Queue testQueue(){ return new Queue(TEST_QUEUE,true); } }
|
将交换机和队列进行绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Configuration public class RabbitmqConfig {
@Resource private QueueConfig queueConfig; @Resource private ExchangeConfig exchangeConfig;
public static final String TEST_ROUTINGKEY = "test";
@Bean public Binding bing_test(){ return BindingBuilder.bind(queueConfig.testQueue()) .to(exchangeConfig.testExchange()) .with(TEST_ROUTINGKEY); }
}
|
测试
1 2 3 4 5 6 7 8 9
| @Test public void send() throws Exception{ send.sendMsg(new Person("chenghao",18)); TimeUnit.SECONDS.sleep(5); } 运行结果: CorrelationData [id=chenghao1567846017404] ack:true 消息也成功接收到了
|
此处为什么加入休眠,因为测试方法结束,rabbitmq相关的资源也就关闭了,虽然我们的消息发送出去,但异步的ConfirmCallback却由于资源关闭