Springboot整合Rabbitmq

生产者

首先在pom文件中导入启动器:spring-boot-starter-amqp

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写配置文件 application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//生产者和消费者都必须配置的基本连接信息
spring.rabbitmq.host=www.chenghao.work
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.connection-timeout=3000ms

#producer特有,消费者中不添加
#confirmlistener mq接收到消息,会进行回调
spring.rabbitmq.publisher-confirms=true
#returnlistener 如果没有任何队列接收到消息,会退回
spring.rabbitmq.publisher-returns=true
# 如果为false,消息无法到达就会抛弃
spring.rabbitmq.template.mandatory=true

编写生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
public class Send  {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息
public void sendMsg(Person person){
//CorrelationData对象的作用是作为消息的附加信息传递,我们用它来保存消息的自定义id
CorrelationData correlationData = new CorrelationData(person.getUsername() + "" + new Date().getTime());

//第一个参数rabbitmq-test 交换机名称 第二个参数:路由名称 第三个参数:具体消息 第四个参数:消息的附加信息
rabbitTemplate.convertAndSend("rabbitmq-test","chenghao",person,correlationData);
}
}

编写对应的测试类

1
2
3
4
5
6
7
8
9
10
11
12
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
private Send send;
@Test
public void send() {
send.sendMsg(new Person("chenghao",26));
}

}
点击test运行,200无问题

上面代码测试来看,好像能运行,没有什么问题,但我们连交换机都没有注册,让我们加入mq的消息确认机制进一步查看


加入消息确认机制ConfirmCallback 这一步确认消息是否到达rabbitmq服务器,但也就是只确认是否正确到达 Exchange 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//加入消息确认机制  这一步确认消息是否到达rabbitmq服务器,但也就是只确认是否正确到达 Exchange 中
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

/**
* CorrelationData 消息的附加信息,即自定义id
* isack 代表消息是否被rabbitmq接收 true 代表接收 false代表拒收。
* cause 如果拒收cause则说明拒收的原因,帮助我们进行后续处理
*/
@Override
public void confirm(CorrelationData correlationData, boolean isack, String cause) {
System.out.println(correlationData);
System.out.println("ack:" + isack);
if (!isack) {
System.err.println(cause);
}
}
};
//发送消息时将刚创建的confirmCallback 赋值给rabbitTemplate
public void sendMsg(Person person){
////CorrelationData对象的作用是作为消息的附加信息传递,我们用它来保存消息的自定义id
CorrelationData correlationData = new CorrelationData(person.getUsername() + "" + new Date().getTime());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend("rabbitmq-test","chenghao",person,correlationData);
}
控制台结果:
CorrelationData [id=chenghao1566232281561]
ack:false
no exchange 'rabbitmq-test' in vhost '/chenghao'

根据控制台错误信息发现没有交换机,手动后台管理界面添加交换机,在用debug执行单元测试

1
2
3
控制台结果:
CorrelationData [id=chenghao1566232516637]
ack:true

但是这仅仅只能表示消息现在能够进入交换机中,我们继续加入ReturnCallback,表示当交换机找不到队列回调默认方法(相当于投递到了邮局也有可能找不到收件人地址)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {
System.err.println("Code:" + replyCode + ",Text:" + replyText );
System.err.println("Exchange:" + exchange + ",RoutingKey:" + routingkey );
}
};
//发送消息
public void sendMsg(Person person){
////CorrelationData对象的作用是作为消息的附加信息传递,我们用它来保存消息的自定义id
CorrelationData correlationData = new CorrelationData(person.getUsername() + "" + new Date().getTime());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("rabbitmq-test","chenghao",person,correlationData);
}
控制台结果:
Code:312,Text:NO_ROUTE
Exchange:rabbitmq-test,RoutingKey:chenghao
CorrelationData [id=chenghao1566231274946]
ack:true

消息并没有进入队列中,我们通过rabbitmq后台管理手动添加queue,添加上了交换机和队列,测试通过,查看后台管理界面,消息已经存入队列等待消费


消费者

消费端和生产端一样,首先导入依赖,编写配置文件,在配置文件中基本连接信息不变,补充

1
2
3
4
5
6
#手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#最小的消费者数量,一开始默认为1个消费者
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量,如果消息过多,最多创建5个消费者
spring.rabbitmq.listener.simple.max-concurrency=5

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class MessageConsumer {

//@RabbitListener注解用于声明式定义消息接受的队列与exhcange绑定的信息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value="rabbitmq-consumer" , durable="true"),
exchange = @Exchange(value = "rabbitmq-test") ,
key = "chenghao"
)
)
//@Payload 代表运行时将消息反序列化后注入到后面的参数中
public void handleMessage(@Payload Person person , Channel channel ,
@Headers Map<String,Object> headers) {
System.out.println(person.getUsername() + "-" + person.getAge());
//所有消息处理后必须进行消息的ack,channel.basicAck()
Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
try {
//false表示不批量接收
channel.basicAck(tag , false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
控制台输出:
chenghao-29

生产者代码链接product
消费者代码链接consumer


queue,exhange管理

通过上一篇springboot整合rabbitmq中已经提到,在发送端如果不创建声明队列和交换机,会出现错误,这个时候我们可以去管理控制台手动创建对应的交换机和队列解决这个问题,但我认为项目中还是加入config写清楚注释方便管理和后来接手的开发人员,在rabbitmq官网也建议在生产端和消费端都进行声明


管理交换机exhangeConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 交换机配置类
* @author chenghao
* @date 2019/9/7
*/
@Configuration
public class ExchangeConfig {
/**
* 用于测试的交换机
*/
public static final String TEST_EXCHANGE = "test";

/**
* 用与测试的交换机
* 第一个参数交换机名称,第二个参数是否持久化,第三个参数是否自动删除
* @return
*/
@Bean
public DirectExchange testExchange(){
return new DirectExchange(TEST_EXCHANGE,true,false);
}
}

管理队列QueueConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 队列配置类
* @author chenghao
* @date 2019/9/7
*/
@Configuration
public class QueueConfig {
/**
* 测试队列名称
*/
private static final String TEST_QUEUE = "test";

/**
* 用于和测试交换机绑定
* 第一个参数表示队列名称,第二个参数表示是否持久化
*/
@Bean
public Queue testQueue(){
return new Queue(TEST_QUEUE,true);
}
}

将交换机和队列进行绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 设定路由键绑定交换机和队列
* @author chenghao
* @date 2019/9/7
*/
@Configuration
public class RabbitmqConfig {

@Resource
private QueueConfig queueConfig;
@Resource
private ExchangeConfig exchangeConfig;

public static final String TEST_ROUTINGKEY = "test";

/**
* 绑定test交换机和test队列,路由键test
* @return
*/
@Bean
public Binding bing_test(){
return BindingBuilder.bind(queueConfig.testQueue())
.to(exchangeConfig.testExchange())
.with(TEST_ROUTINGKEY);
}

}

测试

1
2
3
4
5
6
7
8
9
@Test
public void send() throws Exception{
send.sendMsg(new Person("chenghao",18));
TimeUnit.SECONDS.sleep(5);
}
运行结果:
CorrelationData [id=chenghao1567846017404]
ack:true
消息也成功接收到了

此处为什么加入休眠,因为测试方法结束,rabbitmq相关的资源也就关闭了,虽然我们的消息发送出去,但异步的ConfirmCallback却由于资源关闭