博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
订阅多个主题_SpringBoot整合Redis,怎么实现发布/订阅?
阅读量:6914 次
发布时间:2019-06-27

本文共 3309 字,大约阅读时间需要 11 分钟。

86fb053338015666fc93709e4a3944b6.png

一、简介

  • 1、发布订阅
    SUBSCRIBE, UNSUBSCRIBE 和 PUBLISH 实现了 发布/订阅消息范例,发送者 (publishers) 不用编程就可以向特定的接受者发送消息 (subscribers). Rather, 发布的消息进入通道,不需要知道有没有订阅者. 订阅者发表感兴趣的一个或多个通道,并且只接受他们感兴趣的消息,不管发布者是不是存在. 发布者和订阅者的解耦可以允许更大的伸缩性和更多动态的网络拓扑。

二、注入消息发布/订阅

  • 1、添加消息监听器
/** * 消息监听 * 

* 可以传入多个 MessageListenerAdapter */@BeanRedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 监听所有库的key过期事件 container.setConnectionFactory(connectionFactory); // 可以添加多个 messageListener,配置不同的通道 container.addMessageListener(messageListenerAdapter, new PatternTopic("user")); return container;}

所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(“user”),表示发布的主题信息

  • 小插曲
    前面我们学习了监听 key 过期事件,如果我们只需要监听当前库的 key 过期事件,可以这样写:
@Value("${spring.redis.database}")public String redisDatabaseIndex;

先拿到我们项目中使用的 Redis 的库索引

// 监听当前库的key过期container.addMessageListener(messageListenerAdapter, new PatternTopic("__keyevent@" + redisDatabaseIndex + "__:expired"));

然后使用发布/订阅模式,订阅主题为:keyevent@0:expired 的消息,则表示订阅数据库索引为 0 的 key 过期事件,监听所有的库则为:keyevent@*:expired

  • 2、绑定消息处理器
/** * 消息监听器适配器,绑定消息处理器 * 

* 可以配置多个 listenerAdapter,监听不同的通道 */@BeanMessageListenerAdapter listenerAdapter(RedisMessageListener receiver) { return new MessageListenerAdapter(receiver, "onMessage");}

也就是说,当我们订阅的频道,当有消息进来时,指定它的处理类以及处理方法

三、注入消息处理器

上面我们已经注入了 RedisMessageListener 消息处理器,并指定了处理方法 onMessage(),代码如下:

package com.zyxx.common.redis;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.stereotype.Component;/** * Redis 消息接收 * * @Author Lizhou **/@Slf4j@Componentpublic class RedisMessageListener implements MessageListener {    @Override    public void onMessage(Message message, byte[] pattern) {        // 接收的topic        log.info("channel:" + new String(pattern));        // 消息的POJO        log.info("message:" + message.toString());    }}

需要实现 MessageListener 接口,重写 onMessage() 方法,然后就可以获取到通道以及消息了,从而进行我们的一些业务逻辑处理

四、操作API

在 RedisUtils 中,我们增加一个操作方法

/** * 向通道发布消息 */public boolean convertAndSend(String channel, Object message) {    if (StringUtils.isBlank(channel)) {        return false;    }    try {        template.convertAndSend(channel, message);        log.info("发送消息成功,channel:{},message:{}", channel, message);        return true;    } catch (Exception e) {        log.info("发送消息失败,channel:{},message:{}", channel, message);        e.printStackTrace();    }    return false;}

这里的 channel 相当于 我们存入数据的时候的 key,如果该通道不存在,则会新建一个通道

五、测试

  • 1、测试用例
package com.zyxx.redistest;import com.zyxx.redistest.common.RedisUtils;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass RedisTestApplicationTests {    @Autowired    private RedisUtils redisUtil;    @Test    void contextLoads() {        String message = "Hello World!";        // 发送消息        redisUtil.convertAndSend("user", message);    }}

我们向通道 user 发送了一条 “Hello World!” 的消息

  • 2、测试结果

930ea03b3541aaa832ad8025a78609ad.png

可以看出,我们的消息发送成功,再看控制台

d1be9aff8e01c7bc52358a11e9994b95.png

我们接收到通道 user 发送了一条 “Hello World!” 的消息

作者:Asurplus、
原文链接: https:// lizhou.blog.csdn.net/ar ticle/details/109238701
你可能感兴趣的文章
ReactNative WebView组件详解
查看>>
iOS -- 拨打电话
查看>>
模仿CyclicBarrier,自定义自己屏障类
查看>>
Vue+Vue-router微信分享功能
查看>>
1.数码相框-相框框架分析(1)
查看>>
Javascript中的原型继承具体解释
查看>>
Python基础之(三)----PyGame安装步骤
查看>>
MYSQL SHOW VARIABLES简介
查看>>
Win8Metro(C#)数字图像处理--2.8图像线性变换
查看>>
解决eclipse不识别Android手机的问题
查看>>
axel命令 文件下载
查看>>
python基础训练题1-列表操作
查看>>
编程学习资源
查看>>
selenium+python自动化95-弹出框死活定位不到
查看>>
[Asp.net core]使用Polly网络请求异常重试
查看>>
user-agent
查看>>
C#使用Xamarin开发可移植移动应用(1.入门与Xamarin.Forms页面),附源码
查看>>
java 正则例子
查看>>
SpringBoot乱码
查看>>
MySQL远程连接失败(错误码:2003)
查看>>