Redis 全面整合
Redis
一款基于内存的高性能非关系(NoSql)型数据库,以key-value作为存储形式,并支持丰富的存储数据结构(stirng,list,set,zset,hash 特殊结构:Geospatial,Hyperloglog,Bitmap)
Redis的数据是存在内存中的,因此它的读写速度非常快,每秒可以处理超过10万次读写操作,广泛应用于缓存,而它为什么快呢?
- 基于内存存储实现
- 高效的数据结构
- 采用单线程(6.0后引入多线程,但数据操作任然是单线程)
- 合理的线程模型
I/O 多路复用: 可以让单个线程高效的处理多个连接请求,而Redis使用用epoll作为I/O多路复用技术的实现。并且,Redis自身的事件处理模型将epoll中的连接、读写、关闭都转换为事件,不在网络I/O上浪费过多的时间
- 单线程模型
Redis是单线程模型的,而单线程避免了CPU不必要的上下文切换和竞争锁的消耗
Redis 6.0 引入了多线程提速,它的执行命令操作内存的仍然是个单线程
同时也拥有很多其他功能和使用场景:比如 分布式锁, LUA 脚本, 消息队列, 定时器, 全局计数器, 窗口滑动计时 等等
配置解析(redis.conf)
#1、是否开启守护线程,默认关闭,也就是指你启动redis的时候,停留在当前操作命令界面,没开启守护,退了服务就挂了,开启就不会挂,后台一直运行,(一般会打开)
daemonize no
#2、绑定的主机地址(想要远程连接需要注释掉)
bind 127.0.0.1
-----------以上两个基本都是会改的,下面按需选择-------------------
#3、当Redis以守护进程方式运行时,Redis默认会把pid写入/var/run/redis.pid文件,可以通过pidfile指定
pidfile /var/run/redis.pid
#4、指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解释了为什么选用6379作为默认端口,因为6379在手机按键上MERZ对应的号码,而MERZ取自意大利歌女Alessia Merz的名字
port 6379
#5、当 客户端闲置多长时间后关闭连接,如果指定为0,表示关闭该功能
timeout 300
#6、指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为verbose
loglevel verbose
#7、日志记录方式,默认为标准输出,如果配置Redis为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给/dev/null
logfile stdout
#8、设置数据库的数量,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id
databases 16
#9、指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合save
Redis默认配置文件中提供了三个条件:
save 900 1
save 300 10
save 60 10000
分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更
改。
#10、指定存储至本地数据库时是否压缩数据,默认为yes,Redis采用LZF压缩,如果为了节省CPU时
间,可以关闭该选项,但会导致数据库文件变的巨大
rdbcompression yes
#11、指定本地数据库文件名,默认值为dump.rdb
dbfilename dump.rdb
#12、指定本地数据库存放目录
dir ./
#13、设置当本机为slav服务时,设置master服务的IP地址及端口,在Redis启动时,它会自动从master进行数据同步
slaveof
#14、当master服务设置了密码保护时,slav服务连接master的密码
masterauth
#15、设置Redis连接密码,如果配置了连接密码,客户端在连接Redis时需要通过AUTH 命令提供密码,默认关闭
requirepass foobared
#16、设置同一时间最大客户端连接数,默认无限制,Redis可以同时打开的客户端连接数为Redis进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis会关闭新的连接并向客户端返回max number of clients reached错误信息
maxclients 128
#17、指定Redis最大内存限制,Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区
maxmemory
#18、指定是否在每次更新操作后进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis本身同步数据文件是按上面save条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为no
appendonly no
#19、指定更新日志文件名,默认为appendonly.aof
appendfilename appendonly.aof
#20、指定更新日志条件,共有3个可选值:
no:表示等操作系统进行数据缓存同步到磁盘(快)
always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)
everysec:表示每秒同步一次(折衷,默认值)
appendfsync everysec
#21、指定是否启用虚拟内存机制,默认值为no,简单的介绍一下,VM机制将数据分页存放,由Redis将访问量较少的页即冷数据swap到磁盘上********,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析Redis的VM机制)
vm-enabled no
#22、虚拟内存文件路径,默认值为/tmp/redis.swap,不可多个Redis实例共享
vm-swap-file /tmp/redis.swap
#23、将所有大于vm-max-memory的数据存入虚拟内存,无论vm-max-memory设置多小,所有索引数据都是内存存储的(Redis的索引数据 就是keys),也就是说,当vm-max-memory设置为0的时候,其实是所有value都存在于磁盘。默认值为0
vm-max-memory 0
#24、Redis swap文件分成了很多的page,一个对象可以保存在多个page上面,但一个page上不能被多个对象共享,vm-page-size是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page大小最好设置为32或者64bytes;如果存储很大大对象,则可以使用更大的page,如果不 确定,就使用默认值
vm-page-size 32
#25、设置swap文件中的page数量,由于页表(一种表示页面空闲或使用的bitmap)是在放在内存中的,在磁盘上每8个pages将消耗1byte的内存。
vm-pages 134217728
#26、设置访问swap文件的线程数,最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为4
vm-max-threads 4
#27、设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启
glueoutputbuf yes
#28、指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法
hash-max-zipmap-entries 64
hash-max-zipmap-value 512
#29、指定是否激活重置哈希,默认为开启(后面在介绍Redis的哈希算法时具体介绍)
activerehashing yes
30、指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件
include /path/to/local.conf
数据结构
这里会采用原生命令和springboot相结合的案例放出来,而springboot在用redistemplate存储对象的时候,一定要序列号,否则会出现异常,同时要配置一下序列化规则,方便正常阅读
KEY
#=================================
# EXISTS(是否存在) move(移除) expire(设置过期时间) ttl(查看过期时间) type(查看类型)
FLUSHALL(清空整个redis数据,慎用)
#=================================
# keys * 查看所有的key
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> set name test1
OK
127.0.0.1:6379> keys *
1) "name"
# exists key 的名字,判断某个key是否存在
127.0.0.1:6379> EXISTS name
(integer) 1
127.0.0.1:6379> EXISTS name1
(integer) 0
# move key db ---> 当前库就没有了,被移除了
127.0.0.1:6379> move name 1
(integer) 1
127.0.0.1:6379> keys *
(empty list or set)
# expire key 秒钟:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删
除。
# ttl key 查看还有多少秒过期,-1 表示永不过期,-2 表示已过期
127.0.0.1:6379> set name qinjiang
OK
127.0.0.1:6379> EXPIRE name 10
(integer) 1
127.0.0.1:6379> ttl name
(integer) 3
127.0.0.1:6379> ttl name
(integer) 2
127.0.0.1:6379> ttl name
(integer) 1
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> keys *
(empty list or set)
# type key 查看你的key是什么类型
127.0.0.1:6379> set name test1
OK
127.0.0.1:6379> get name
"test1"
127.0.0.1:6379> type name
string
# 测试环境清空所有数据
127.0.0.1:6379> FLUSHALL
OK
String
String类型是二进制安全的,意思是redis的string可以包含任何数据,比如jpg图片或者序列化的对象
String类型是redis最基本的数据类型,一个redis中字符串value最多可以是512M
# ===================================================
# set(设值)、get(取值)、del(删值)、append(追加值)、strlen(长度)
# ===================================================
127.0.0.1:6379> set key1 value1 # 设置值
OK
127.0.0.1:6379> get key1 # 获得key
"value1"
127.0.0.1:6379> del key1 # 删除key
(integer) 1
127.0.0.1:6379> keys * # 查看全部的key
(empty list or set)
127.0.0.1:6379> exists key1 # 确保 key1 不存在
(integer) 0
127.0.0.1:6379> append key1 "hello" # 对不存在的 key 进行 APPEND ,等同于 SET
key1 "hello"
(integer) 5 # 字符长度
127.0.0.1:6379> APPEND key1 "-2333" # 对已存在的字符串进行 APPEND
(integer) 10 # 长度从 5 个字符增加到 10 个字符
127.0.0.1:6379> get key1
"hello-2333"
127.0.0.1:6379> STRLEN key1 # # 获取字符串的长度
(integer) 10
#===================================================
# incr、decr 一定要是数字才能进行加减,+1 和 -1。
# incrby、decrby 命令将 key 中储存的数字加上指定的增量值。
# incrbyfloat 指定增加浮点数
#===================================================
127.0.0.1:6379> set views 0 # 设置浏览量为0
OK
127.0.0.1:6379> incr views # 浏览 + 1
(integer) 1
127.0.0.1:6379> incr views # 浏览 + 1
(integer) 2
127.0.0.1:6379> decr views # 浏览 - 1
(integer) 1
127.0.0.1:6379> incrby views 10 # +10
(integer) 11
127.0.0.1:6379> decrby views 10 # -10
(integer) 1
127.0.0.1:6379> incrbyfloat money 12.5
"12.5"
127.0.0.1:6379> incrbyfloat money 1.5
"14"
#===================================================
# range [范围]
# getrange 获取指定区间范围内的值,类似between...and的关系,从零到负一表示全部
#===================================================
127.0.0.1:6379> set key2 abcd123456 # 设置key2的值
OK
127.0.0.1:6379> getrange key2 0 -1 # 获得全部的值
"abcd123456"
127.0.0.1:6379> getrange key2 0 2 # 截取部分字符串
"abc"
#===================================================
#substr 截取
# setrange 修改指定区间范围内的值,格式是setrange key值 具体值
#===================================================
127.0.0.1:6379> substr user 0 3
"a234"
127.0.0.1:6379> get key2
"abcd123456"
127.0.0.1:6379> SETRANGE key2 1 xx # 替换值
(integer) 10
127.0.0.1:6379> get key2
"axxd123456"
#===================================================
# setex(set with expire)设值并设置过期时间(单位秒)
# psetex 设值并设置过期时间(单位毫秒)
# setnx(set if not exist) key不存在就插入,存在就返回0
# set key value [XX] key不存在就返回0,存在的时候就替换
#===================================================
127.0.0.1:6379> setex key3 60 value # 设值并设置过期时间
OK
127.0.0.1:6379> ttl key3 # 查看剩余的时间
(integer) 55
127.0.0.1:6379> setnx mykey "redis" # 如果不存在就设置,成功返回1
(integer) 1
127.0.0.1:6379> setnx mykey "mongodb" # 如果存在就设置,失败返回0
(integer) 0
127.0.0.1:6379> get mykey
"redis"
127.0.0.1:6379> set mykey1 "redis" xx # 如果不存在返回0
(integer) 0
127.0.0.1:6379> set mykey "mongodb" xx # 如果存在就替换
(integer) 1
127.0.0.1:6379> get mykey
"redis"
#===================================================
# mset Mset 命令用于同时设置一个或多个 key-value 对。
# mget Mget 命令返回所有(一个或多个)给定 key 的值。如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil
# msetnx 当所有 key 都成功设置,返回 1 。如果所有给定 key 都设置失败(至少有一个 key 已经存在),那么返回 0 。原子操作
#===================================================
127.0.0.1:6379> mset k10 v10 k11 v11 k12 v12
OK
127.0.0.1:6379> keys *
1) "k12"
2) "k11"
3) "k10"
127.0.0.1:6379> mget k10 k11 k12 k13
1) "v10"
2) "v11"
3) "v12"
4) (nil)
127.0.0.1:6379> msetnx k10 v10 k15 v15 # 原子性操作!
(integer) 0
127.0.0.1:6379> get key15
(nil)
# 传统对象缓存
set user:1 value(json数据)
# 可以用来缓存对象
mset user:1:name zhangsan user:1:age 2
mget user:1:name user:1:age
#===================================================
# getset(先get再set)
#===================================================
127.0.0.1:6379> getset db mongodb # 没有旧值,返回 nil
(nil)
127.0.0.1:6379> get db
"mongodb"
127.0.0.1:6379> getset db redis # 返回旧值 mongodb
"mongodb"
127.0.0.1:6379> get db
"redis"
应用场景:⼀般常⽤在需要计数的场景,⽐如⽤户的访问次数、热点⽂章的点赞转发数量, 粉丝数等等。
案例1:统计数量
@Controller
public class BbsTopicPvController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/bbs/detail/{id}")
public String pvindex(@PathVariable("id") Integer id, ModelMap modelMap) {
Long increment = redisTemplate.opsForValue().increment("bbs:" + id);
modelMap.put("increment", increment);
return "pv01/index";
}
}
案例2:分布式全局id
#1:全局唯一性,不能出现重复的ID
#2:单调递增,保证下一个ID一定大于上一个ID
#3:范围趋势递增。在一个时间段内,生成的ID是递增趋势的比如:202012120001 202012120002…. 第二天的时候又要从1开始计数。202012130001 202012130002…..
#4:安全性,在不同的领域中我们有些业务不要出现连续的递增,可以很好的保护数据格式和形态。因为很容易让竞争对手套取数据。
@RestController
public class ProductController {
@Autowired
private SequenceUtils sequenceUtils;
@PostMapping("/product/creator2")
public R creatorIds2() {
// 1:创建产品的分布式全局ID
List<String> ids = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String sequence = sequenceUtils.getSequence();
ids.add(sequence);
}
return R.ok().data("ids", ids);
}
}
@Component
public class CacheService {
@Autowired
RedisTemplate redisTemplate;
public Long getIncrementNum(String key) {
// 不存在准备创建 键值对
RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());//0
// 计数器累加
Long counter = entityIdCounter.incrementAndGet();
System.out.println("=========================>"+ counter);
if ((null counter || counter.longValue() 1)) {// 初始设置过期时间
System.out.println("设置过期时间为1天!");
// 设置清除的目的,是让每天的计数器都从0开始
entityIdCounter.expire(1, TimeUnit.DAYS);// 单位天
}
return counter;
}
}
@Component
public class SequenceUtils {
@Autowired
CacheService cacheService;
// 1:每天公司的订单数量,如果一天是几百写 3 几千就 4 几万 5
static final int DEFAULT_LENGTH = 3;
public String getSequence() {
// 1: 时间前缀
String currentDate = new SimpleDateFormat("yyyyMMdd").format(new Date());
// 2:redis递增获取每天的递增数量
Long num = cacheService.getIncrementNum("id:generator:order:" + currentDate);
// 3:获取递增长度,是否小于DEFAULT_LENGTH 如果小于就前面补零。如果大于就递增即可
String str = String.valueOf(num);
// 4:补零操作
int len = str.length();
// 4-1:是否小于DEFAULT_LENGTH 如果小于就前面补零。如果大于就递增即可
StringBuilder sb = new StringBuilder();
// 5:添加时间
sb.append(currentDate);
if (len >= DEFAULT_LENGTH) {
sb.append(str);
return sb.toString();
}
int rest = DEFAULT_LENGTH - len;
for (int i = 0; i < rest; i++) {
sb.append('0');
}
sb.append(str);
// 6: 时间+补零操作返回订单流水号
return sb.toString();
}
}
Hash
Redis hash 是一个String类型的field和value的映射表,hash特别适合用于存储对象
注意点:如果开发使用hgetall,哈希元素比较多的话,可能导致Redis阻塞,可以使用hscan。而如果只是获取部分field,建议使用hmget。
# ===================================================
# hset、hget 命令用于为哈希表中的字段赋值 。
# hmset、hmget 同时将多个field-value对设置到哈希表中。会覆盖哈希表中已存在的字段。
# hgetall 用于返回哈希表中,所有的字段和值。
# hdel 用于删除哈希表 key 中的一个或多个指定字段
# hscan 刷选哈希键中的键值对,语法:HSCAN key cursor [MATCH pattern] [COUNT count],count默认为10 参考[文章](https://blog.51cto.com/u_13581092/2149178)
# ===================================================
127.0.0.1:6379> hset myhash field1 "test1"
(integer) 1
127.0.0.1:6379> hget myhash field1
"test1"
127.0.0.1:6379> HMSET myhash field1 "Hello" field2 "World"
OK
127.0.0.1:6379> HGET myhash field1
"Hello"
127.0.0.1:6379> HGET myhash field2
"World"
127.0.0.1:6379> hgetall myhash
1) "field1"
2) "Hello"
3) "field2"
4) "World"
127.0.0.1:6379> HDEL myhash field1
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "World"
127.0.0.1:6379> HMSET site twle "twle.cn" bing "bing.com" qq "qq.com" baidu "baidu.com"
(integer) 4
127.0.0.1:6379> HSCAN site 0 match "t*"
1) "0"
2) 1) "twle"
2) "twle.cn"
# ===================================================
# hlen 获取哈希表中字段的数量。
# ===================================================
127.0.0.1:6379> hlen myhash
(integer) 1
127.0.0.1:6379> HMSET myhash field1 "Hello" field2 "World"
OK
127.0.0.1:6379> hlen myhash
(integer) 2
# ===================================================
# hexists 查看哈希表的指定字段是否存在。
# ===================================================
127.0.0.1:6379> hexists myhash field1
(integer) 1
127.0.0.1:6379> hexists myhash field3
(integer) 0
# ===================================================
# hkeys 获取哈希表中的所有域(field)。
# hvals 返回哈希表所有域(field)的值。
# ===================================================
127.0.0.1:6379> HKEYS myhash
1) "field2"
2) "field1"
127.0.0.1:6379> HVALS myhash
1) "World"
2) "Hello"
# ===================================================
# hincrby 为哈希表中的字段值加上指定增量值。
# ===================================================
127.0.0.1:6379> hset myhash field 5
(integer) 1
127.0.0.1:6379> HINCRBY myhash field 1
(integer) 6
127.0.0.1:6379> HINCRBY myhash field -1
(integer) 5
127.0.0.1:6379> HINCRBY myhash field -10
(integer) -5
#===================================================
# hsetnx 为哈希表中不存在的的字段赋值 。
# ===================================================
127.0.0.1:6379> HSETNX myhash field1 "hello"
(integer) 1 # 设置成功,返回 1 。
127.0.0.1:6379> HSETNX myhash field1 "world"
(integer) 0 # 如果给定字段已经存在,返回 0 。
127.0.0.1:6379> HGET myhash field1
"hello"
List
它的子元素底层实际是个双向链表
Redis列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)
- 如果键不存在,创建新的链表
- 如果键已存在,新增内容
- 如果值全移除,对应的键也就消失了
- 链表的操作无论是头和尾效率都极高,但假如是对中间元素进行操作,效率就很惨淡了
一般应用在栈、队列、消息队列、列表消息等场景(支持分页)
# ===================================================
# Lpush:将一个或多个值插入到列表头部。(左)
# rpush:将一个或多个值插入到列表尾部。(右)
# lrange:返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。
# 其中 0 表示列表的第一个元素, 1 表示列表的第二个元素,以此类推。
# 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此
类推。
# ===================================================
127.0.0.1:6379> LPUSH list "one"
(integer) 1
127.0.0.1:6379> LPUSH list "two"
(integer) 2
127.0.0.1:6379> RPUSH list "right"
(integer) 3
127.0.0.1:6379> Lrange list 0 -1
1) "two"
2) "one"
3) "right"
127.0.0.1:6379> Lrange list 0 1
1) "two"
2) "one"
# ===================================================
# lpop 命令用于移除并返回列表的第一个元素。当列表 key 不存在时,返回 nil 。
# rpop 移除列表的最后一个元素,返回值为移除的元素。
# ===================================================
127.0.0.1:6379> Lpop list
"two"
127.0.0.1:6379> Rpop list
"right"
127.0.0.1:6379> Lrange list 0 -1
1) "one"
# ===================================================
# Lindex,按照索引下标获得元素(-1代表最后一个,0代表是第一个)
# ===================================================
127.0.0.1:6379> Lindex list 1
(nil)
127.0.0.1:6379> Lindex list 0
"one"
127.0.0.1:6379> Lindex list -1
"one"
# ===================================================
# llen 用于返回列表的长度。
# ===================================================
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> Lpush list "one"
(integer) 1
127.0.0.1:6379> Lpush list "two"
(integer) 2
127.0.0.1:6379> Lpush list "three"
(integer) 3
127.0.0.1:6379> Llen list # 返回列表的长度
(integer) 3
# ===================================================
# lrem key 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素。
# ===================================================
127.0.0.1:6379> lrem list 1 "two"
(integer) 1
127.0.0.1:6379> Lrange list 0 -1
1) "three"
2) "one"
# ===================================================
# Ltrim key 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区
间之内的元素都将被删除。
# ===================================================
127.0.0.1:6379> RPUSH mylist "hello"
(integer) 1
127.0.0.1:6379> RPUSH mylist "hello"
(integer) 2
127.0.0.1:6379> RPUSH mylist "hello2"
(integer) 3
127.0.0.1:6379> RPUSH mylist "hello3"
(integer) 4
127.0.0.1:6379> ltrim mylist 1 2
OK
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "hello2"
# ===================================================
# rpoplpush 移除列表的最后一个元素,并将该元素添加到另一个列表并返回。
# ===================================================
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "foo"
(integer) 2
127.0.0.1:6379> rpush mylist "bar"
(integer) 3
127.0.0.1:6379> rpoplpush mylist myotherlist
"bar"
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "foo"
127.0.0.1:6379> lrange myotherlist 0 -1
1) "bar"
# ===================================================
# lset key index value 将列表 key 下标为 index 的元素的值设置为 value 。
# ===================================================
127.0.0.1:6379> exists list # 对空列表(key 不存在)进行 LSET
(integer) 0
127.0.0.1:6379> lset list 0 item # 报错
(error) ERR no such key
127.0.0.1:6379> lpush list "value1" # 对非空列表进行 LSET
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 "new" # 更新值
OK
127.0.0.1:6379> lrange list 0 0
1) "new"
127.0.0.1:6379> lset list 1 "new" # index 超出范围报错
(error) ERR index out of range
# ===================================================
# linsert key before/after pivot value 用于在列表的元素前或者后插入元素。
# 将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。
# ===================================================
redis> RPUSH mylist "Hello"
(integer) 1
redis> RPUSH mylist "World"
(integer) 2
redis> LINSERT mylist BEFORE "World" "There"
(integer) 3
redis> LRANGE mylist 0 -1
1) "Hello"
2) "There"
3) "World"
案例1:特价商品展示
商品存入缓存中
public class ProductListService {
@Autowired
private RedisTemplate redisTemplate;
@PostConstruct
public void initProducts() {
log.info("启动定时加载特价商品到redis的list中...");
new Thread(() -> runCourse()).start();
}
public void runCourse() {
while (true) {
// 1:从数据库中查询出特价商品
List<Product> productList = this.findProductsDB();
// 2:删除原来的特价商品
this.redisTemplate.delete("product:list");
// 3:把特价商品添加到集合中
this.redisTemplate.opsForList().leftPushAll("product:list", productList);
try {
// 4: 每隔一分钟执行一遍。
Thread.sleep(1000 * 60);
log.info("定时刷新特价商品....");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
/**
* 数据库中查询特价商品
*/
public List<Product> findProductsDB() {
List<Product> productList = new ArrayList<>();
for (long i = 1; i <= 100; i++) {
Product product = new Product();
product.setId((long) new Random().nextInt(1000));
product.setPrice((double) i);
product.setTitle("特价商品" + (i));
productList.add(product);
}
return productList;
}
}
从缓存中取数据
@RestController
public class ProductListController {
@Autowired
private RedisTemplate redisTemplate;
/**
* 查询产品信息
*
* @param pageNo
* @param pageSize
* @return
*/
@GetMapping("/findproducts")
public R findProducts(int pageNo, int pageSize) {
// 1: 计算分页的起始页
int start = (pageNo - 1) * pageSize;
// 2:计算分页的结束页
int end = start + pageSize - 1;
try {
// 3: 查询列表中对应的产品信息进行返回
List<Product> productList = this.redisTemplate.opsForList().range("product:list", start, end);
// 4: 如果缓存查询不到就去数据库查询
if (CollectionUtils.isEmpty(productList)) {
//todo: 查询数据库,存在缓存击穿的情况,大量的并发请求进来,可能把数据库冲垮,这里涉及到缓存击穿,穿透,雪崩等问题,后面有详情介绍
productList = productListService.findProductsDB();
}
// 5: 查询产品集合
return R.ok().data("products", productList);
} catch (Exception ex) {
//todo: 查询数据库,存在缓存击穿的情况,大量的并发请求进来,可能把数据库冲垮
return R.error().message("服务器忙!!!");
}
}
}
如上这种缓存击穿的解决方案之一,比较简单,就是存两份,一个master,一个salve,操作也是错开完成,这样就不会遇到直接冲到数据库的情况了,只贴改动的部分
public void runCourse() {
while (true) {
// 1:从数据库中查询出特价商品
List<Product> productList = this.findProductsDB();
// 2:删除Slave原来的特价商品 --- Slave集合 -- 注意一定要先缓存B。因为击穿了就会进行到B缓存,所以先让它去缓存数据
this.redisTemplate.delete("product:list:slave");
// 3:把特价商品添加到Slave集合中
this.redisTemplate.opsForList().leftPushAll("product:list:slave", productList);
// 4:删除Master原来的特价商品 --- Master集合
this.redisTemplate.delete("product:list:master");
// 5:把特价商品添加到Master集合中
this.redisTemplate.opsForList().leftPushAll("product:list:master", productList);
try {
// 6: 每隔一分钟执行一遍。
Thread.sleep(1000 * 60);
log.info("定时刷新特价商品....");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
@GetMapping("/findproducts/masterslave")
public R findProductsmasterslave(int pageNo, int pageSize) {
// 1: 计算分页的起始页
int start = (pageNo - 1) * pageSize;
// 2:计算分页的结束页
int end = start + pageSize - 1;
try {
// 3: 查询列表中对应的产品信息进行返回
List<Product> productList = this.redisTemplate.opsForList().range("product:list:master", start, end);
// 4: 如果master缓存查询不到就去salve缓存查询
if (CollectionUtils.isEmpty(productList)) {
// todo 存在缓存击穿的情况,就查询缓存B的数据
productList = this.redisTemplate.opsForList().range("product:list:slave", start, end);
}
// 5: 查询产品集合
return R.ok().data("products", productList);
} catch (Exception ex) {
//todo: 查询数据库,存在缓存击穿的情况,大量的并发请求进来,可能把数据库冲
return R.error().message("服务器忙!!!");
}
}
案例2:针对短时间中大量请求小数据体的解决思路
这里就是提供思路了,补贴代码,太长了
就比如只是增加浏览量或者点击量之类小数据体,在某时时间段会有很大的并发量进来,如果直接访问redis可能会导致cpu飙升,像这种不是强同步要求,且数据小,那么就可采用两级的缓存机制,什么意思呢?
就是一级缓存用程序中map存,二级缓存就是redis
然后以时间片作为间隔进行同步,具体步骤如下:
1,新建map对象,对增加请求进行判断,有就直接数量+1,没有就以id为key,值存1
#注释:Map<时间分片, Map<文章的ID, 文章的PV数>> PV_MAP = new ConcurrentHashMap<>();
Map<Long, Map<Integer, Integer>> PV_MAP = new ConcurrentHashMap<>();
2,定时任务,每隔一段时间(比如6分钟),去遍历一级缓存map,根据当前时间片去比较,小于5分钟的存入二级缓存redis中,然后再把map同步后的数据删掉
long mill1 = System.currentTimeMillis() / (1000 60 1);
3,和第二步同理,定时任务,定时同步到数据库中,但时间需要比上一步的时间长一点(7分钟),不然就同步空数据了
Set
Redis的Set是String类型的无序集合,它底层是通过HashTable实现的(不允许重复元素)
注意点:smembers和lrange、hgetall都属于比较重的命令,如果元素过多存在阻塞Redis的可能性,可以使用sscan来完成
应用场景:用户标签,生成随机数抽奖、社交需求
# ===================================================
# sadd 将一个或多个成员元素加入到集合中,不能重复
# smembers 返回集合中的所有的成员。
# sismember 命令判断成员元素是否是集合的成员。
# SSCAN key cursor [MATCH pattern] [COUNT count] 遍历集合中键的元素
- cursor - 游标
- pattern - 匹配的模式
- count - 指定从数据集里返回多少元素,默认值为 10
返回值: 数组列表
# ===================================================
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 0
127.0.0.1:6379> SMEMBERS myset
1) "kuangshen"
2) "hello"
127.0.0.1:6379> SISMEMBER myset "hello"
(integer) 1
127.0.0.1:6379> SISMEMBER myset "world"
(integer) 0
127.0.0.1:6379> SADD myset1 "Google"
(integer) 1
127.0.0.1:6379> SADD myset1 "Redis"
(integer) 1
127.0.0.1:6379> SADD myset1 "Taobao"
(integer) 1
127.0.0.1:6379> SSCAN myset1 0 match R*
1) "0"
2) 1) "Redis"
# ===================================================
# scard,获取集合里面的元素个数
# ===================================================
127.0.0.1:6379> scard myset
(integer) 2
# ===================================================
# srem key value 用于移除集合中的一个或多个成员元素
# ===================================================
127.0.0.1:6379> srem myset "kuangshen"
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "hello"
# ===================================================
# srandmember key 命令用于返回集合中的一个随机元素。
# ===================================================
127.0.0.1:6379> SMEMBERS myset
1) "kuangshen"
2) "world"
3) "hello"
127.0.0.1:6379> SRANDMEMBER myset
"hello"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "world"
2) "kuangshen"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "kuangshen"
2) "hello"
# ===================================================
# spop key 用于移除集合中的指定 key 的一个或多个随机元素
# ===================================================
127.0.0.1:6379> SMEMBERS myset
1) "kuangshen"
2) "world"
3) "hello"
127.0.0.1:6379> spop myset
"world"
127.0.0.1:6379> spop myset
"kuangshen"
127.0.0.1:6379> spop myset
"hello"
# ===================================================
# smove SOURCE DESTINATION MEMBER
# 将指定成员 member 元素从 source 集合移动到 destination 集合。
# ===================================================
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "world"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset2 "set2"
(integer) 1
127.0.0.1:6379> smove myset myset2 "kuangshen"
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "world"
2) "hello"
127.0.0.1:6379> SMEMBERS myset2
1) "kuangshen"
2) "set2"
# ===================================================
- 数字集合类
- 差集: sdiff / 多个集合的差集放入targetkey中: sdiffstore targetkey key1 key2 ….
- 交集: sinter / 多个集合的交集放入targetkey中: sinterstore targetkey key1 key2 ….
- 并集: sunion / 多个集合的并集放入targetkey中: sunionstore targetkey key1 key2 ….
# ===================================================
127.0.0.1:6379> sadd key1 "a"
(integer) 1
127.0.0.1:6379> sadd key1 "b"
(integer) 1
127.0.0.1:6379> sadd key1 "c"
(integer) 1
127.0.0.1:6379> sadd key2 "c"
(integer) 1
127.0.0.1:6379> sadd key2 "d"
(integer) 1
127.0.0.1:6379> sadd key2 "e"
(integer) 1
127.0.0.1:6379> SDIFF key1 key2 # 差集
1) "a"
2) "b"
127.0.0.1:6379> SINTER key1 key2 # 交集
1) "c"
127.0.0.1:6379> SUNION key1 key2 # 并集
1) "a"
2) "b"
3) "c"
4) "e"
5) "d"
案例1:年会抽奖
@Service
@Log4j2
public class RandomService2 {
@Autowired
private RedisTemplate redisTemplate;
public static final String RANDOM_SET_KEY = "pop:set";
// 1 : 初始化抽奖的信息
@PostConstruct
public void initData() {
log.info("初始化奖品等级信息...");
boolean flag = this.redisTemplate.hasKey(RANDOM_SET_KEY);
if (!flag) {
List<Integer> initDataList = initDataList();
initDataList.forEach(data -> this.redisTemplate.opsForSet().add(RANDOM_SET_KEY, data));
}
}
/**
* 模拟100用户抽奖
* 比如:公司搞年会,参与的小伙伴要进行抽奖,这个时候我们就把所有参与的小伙伴
* 加入到set集合中即可。比如把小伙伴的ID或者工号加入到集合中。
* @return
*/
private List<Integer> initDataList() {
List<Integer> listdata = new ArrayList<>();
for (int i = 0; i < 100; i++) {
listdata.add(i + 1);
}
return listdata;
}
}
@RestController
@Log4j2
public class RandomController2 {
@Autowired
private RandomService2 randomService2;
@Autowired
private RedisTemplate redisTemplate;
/**
* 抽奖
*
* @param num 是代表获取奖品的人数,比如1等奖多少人 2等奖多少人。。。。
* @return
*/
@GetMapping("/random/paynum")
public R randomData(Integer num) {
try {
String result = null;
// 1: 随机从集合中获取指定num数量的元素出来,并删除
List<Integer> persons = this.redisTemplate.opsForSet().pop(RandomService.RANDOM_SET_KEY, num);
return R.ok().data("persons", persons).message("抽到奖品的员工有:" + persons);
} catch (Exception ex) {
log.error("抽奖失败");
return R.error().message("服务忙!!!");
}
}
}
案例2:
实现共同好友,关注列表,粉丝列表,互相关注等等,,,直接调用相应api即可
Zset
Redis zset 和 set 一样,也是String类型元素的集合,且不允许重复的成员
不同的是每个元素都会关联一个double类型的分数,Redis正是通过分数来为集合中的成员进行从小到大的排序,zset的成员是唯一的,但是分数(Score)却可以重复
在set基础上,加一个score值。之前set是k1 v1 v2 v3,现在zset是 k1 score1 v1 score2 v2
最经典的应用场景就是:排行榜, 也可以实现滑动计数的效果
# ===================================================
# zadd 将一个或多个成员元素及其分数值加入到有序集当中。
# zrange 返回有序集中,指定区间内的成员
# ===================================================
127.0.0.1:6379> zadd myset 1 "one"
(integer) 1
127.0.0.1:6379> zadd myset 2 "two" 3 "three"
(integer) 2
127.0.0.1:6379> ZRANGE myset 0 -1
1) "one"
2) "two"
3) "three"
# ===================================================
# zrangebyscore 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大)
次序排列。
# ===================================================
127.0.0.1:6379> zadd salary 2500 xiaoming
(integer) 1
127.0.0.1:6379> zadd salary 5000 xiaohong
(integer) 1
127.0.0.1:6379> zadd salary 500 kuangshen
(integer) 1
# Inf无穷大量+∞,同样地,-∞可以表示为-Inf。
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf # 显示整个有序集
1) "kuangshen"
2) "xiaoming"
3) "xiaohong"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores # 递增排列
1) "kuangshen"
2) "500"
3) "xiaoming"
4) "2500"
5) "xiaohong"
6) "5000"
127.0.0.1:6379> ZREVRANGE salary 0 -1 WITHSCORES # 递减排列
1) "xiaohong"
2) "5000"
3) "xiaoming"
4) "2500"
5) "kuangshen"
6) "500"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 WITHSCORES # 显示工资 <=2500
的所有成员
1) "kuangshen"
2) "500"
3) "xiaoming"
4) "2500"
# ===================================================
# zrem 移除有序集中的一个或多个成员
# ===================================================
127.0.0.1:6379> ZRANGE salary 0 -1
1) "kuangshen"
2) "xiaoming"
3) "xiaohong"
127.0.0.1:6379> zrem salary kuangshen
(integer) 1
127.0.0.1:6379> ZRANGE salary 0 -1
1) "xiaoming"
2) "xiaohong"
# ===================================================
# zcard 命令用于计算集合中元素的数量。
# ===================================================
127.0.0.1:6379> zcard salary
(integer) 2
OK
# ===================================================
# zcount 计算有序集合中指定分数区间的成员数量。
# ===================================================
127.0.0.1:6379> zadd myset 1 "hello"
(integer) 1
127.0.0.1:6379> zadd myset 2 "world" 3 "kuangshen"
(integer) 2
127.0.0.1:6379> ZCOUNT myset 1 3
(integer) 3
127.0.0.1:6379> ZCOUNT myset 1 2
(integer) 2
# ===================================================
# zrank 返回有序集中指定成员的排名。其中有序集成员按分数值递增(从小到大)顺序排列。
# ===================================================
127.0.0.1:6379> zadd salary 2500 xiaoming
(integer) 1
127.0.0.1:6379> zadd salary 5000 xiaohong
(integer) 1
127.0.0.1:6379> zadd salary 500 kuangshen
(integer) 1
127.0.0.1:6379> ZRANGE salary 0 -1 WITHSCORES # 显示所有成员及其 score 值
1) "kuangshen"
2) "500"
3) "xiaoming"
4) "2500"
5) "xiaohong"
6) "5000"
127.0.0.1:6379> zrank salary kuangshen # 显示 kuangshen 的薪水排名,最少
(integer) 0
127.0.0.1:6379> zrank salary xiaohong # 显示 xiaohong 的薪水排名,第三
(integer) 2
# ===================================================
# zrevrank 返回有序集中成员的排名。其中有序集成员按分数值递减(从大到小)排序。
# ===================================================
127.0.0.1:6379> ZREVRANK salary kuangshen #
(integer) 2
127.0.0.1:6379> ZREVRANK salary xiaohong # 小红第一
(integer) 0
案例1:热榜排行
步骤1:先初始化一个月历史的数据
@Service
@Log4j2
public class WeiboxInitService {
@Autowired
private RedisTemplate redisTemplate;
/**
* 先初始化一个月的历史数据
*/
public void initData() {
// 1: 计算当前的小时key
long hour = System.currentTimeMillis() / (1000 60 60);
// 2: 把近一个月的数据放入到zset中
for (int i = 1; i <= 24 * 30; i++) {
// 3: 为什么是hour-1 ,因为每个小时的key是一样的,多一个小时就是多1,倒推的话:当前小时减去1
String key = "rank:hour:" + (hour - i);
this.initBbsMember(key);
}
}
/**
* 初始化文章数据
*
* @param key
*/
public void initBbsMember(String key) {
Random random = new Random();
// 采用26个字母来实现排行榜,随机为每个字母生成一个随机数作为score,注意:我这里只是用26个字母代替完整的数据
// 在真实的开发中肯定是从数据库表吧文章对应的数据查询出来,在进行存储到zset中。
for (int i = 1; i <= 26; i++) {
this.redisTemplate.opsForZSet().add(key, String.valueOf((char) (96 + i)), random.nextInt(20));
}
}
}
步骤2:定时刷新数据
@Service
@Log4j2
public class WeiboxTaskService {
@Autowired
private RedisTemplate redisTemplate;
@PostConstruct
public void init() {
log.info("模拟微博排行榜单-启动初始化开始....");
// 1:定时5秒钟,模拟微博的热度刷新(比如:模拟点赞,收藏,评论数热度值的更新)
new Thread(() -> this.refreshDataHour()).start();
// 2:定时1个小时合并天、周、月的排行榜。
new Thread(() -> this.refreshAllData()).start();
}
// 定时五秒去刷新一次,模拟微博的点赞,评论,收藏数
public void refreshAllData() {
while (true) {
// 刷新天
this.refreshDay();
// 刷新周
this.refreshWeek();
// 刷新月
this.refreshMonth();
// 如果是在分布式系统中:建议使用xxljob来实现定时刷新
try {
Thread.sleep(1000 60 60);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
// 定时五秒去刷新一次,模拟微博的点赞,评论,收藏数
public void refreshDataHour() {
while (true) {
this.refreshHour();
// 如果是在分布式系统中:建议使用xxljob来实现定时刷新
try {
Thread.sleep(5000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
/*
采用26个字母实现排行,随机为每篇文章生成一个随机的score,并且模拟点赞,收藏和评论
*/
public void refreshHour() {
// 1: 计算当前的小时key
long hour = System.currentTimeMillis() / (1000 60 60);
// 2: 把近一个月的数据放入到zset中
Random random = new Random();
for (int i = 1; i <= 26; i++) {
this.redisTemplate.opsForZSet().incrementScore("rank:hour:" + hour, String.valueOf((char) (96 + i)), random.nextInt(10));
}
}
/*
刷新近30天的统计数
*/
public void refreshMonth() {
// 1: 计算当前的小时key
long hour = System.currentTimeMillis() / (1000 60 60);
List<String> otherkeys = new ArrayList<>();
// 2 :算出近24小时内的key
for (int i = 0; i < 24 * 30 - 1; i++) {
String key = "rank:hour:" + (hour - i);
otherkeys.add(key);
}
// 把当前的小时时间的key,并且往后推24*30-1小时,共计24*30小时,并求出并集存入"rank:month"中
this.redisTemplate.opsForZSet().unionAndStore("rank:hour:" + hour, otherkeys, "rank:month");
log.info("月刷新合并完成...");
}
/*
刷新近7天的统计数
*/
public void refreshWeek() {
// 1: 计算当前的小时key
long hour = System.currentTimeMillis() / (1000 60 60);
List<String> otherkeys = new ArrayList<>();
// 2 :算出近24小时内的key
for (int i = 0; i < 24 * 7 - 1; i++) {
String key = "rank:hour:" + (hour - i);
otherkeys.add(key);
}
// 把当前的小时时间的key,并且往后推24*7-1小时,共计24*7小时,并求出并集存入"rank:week"中
this.redisTemplate.opsForZSet().unionAndStore("rank:hour:" + hour, otherkeys, "rank:week");
log.info("天刷新合并完成...");
}
/*
刷新当天的统计数据
*/
public void refreshDay() {
// 1: 计算当前的小时key
long hour = System.currentTimeMillis() / (1000 60 60);
List<String> otherkeys = new ArrayList<>();
// 2 :算出近24小时内的key
for (int i = 1; i <= 23; i++) {
String key = "rank:hour:" + (hour - i);
otherkeys.add(key);
}
//3:把当前的时间key和后推的23小时的key,求出并集存入到rank:day集合中
// 这里就是利用redis的zunionstore把当前时间和24小时之前时间
this.redisTemplate.opsForZSet().unionAndStore("rank:hour:" + hour, otherkeys, "rank:day");
//4:设置当天的key,40天过期,不然历史数据浪费内存
for (int i = 0; i < 24; i++) {
String key = "rank:hour:" + (hour - i);
this.redisTemplate.expire(key, 40, TimeUnit.DAYS);
}
log.info("天刷新合并完成...");
}
}
步骤3:排行榜查询接口
@RestController
@Log4j2
public class WeiboController {
@Autowired
private RedisTemplate redisTemplate;
// 24小时
@GetMapping("/bbs/hour")
public Set hourdata() {
// 1: 计算当前小时的key
long hour = System.currentTimeMillis() / (1000 60 60);
// 2: 从redis缓存中获取近24小时的数据
Set set = this.redisTemplate.opsForZSet().reverseRangeWithScores("rank:hour:" + hour, 0, 20);
return set;
}
// 一天
@GetMapping("/bbs/day")
public Set daydata() {
Set set = this.redisTemplate.opsForZSet().reverseRangeWithScores("rank:day", 0, 20);
return set;
}
// 周
@GetMapping("/bbs/week")
public Set weekdata() {
Set set = this.redisTemplate.opsForZSet().reverseRangeWithScores("rank:week", 0, 20);
return set;
}
// 月
@GetMapping("/bbs/month")
public Set monthdata() {
Set set = this.redisTemplate.opsForZSet().reverseRangeWithScores("rank:month", 0, 20);
return set;
}
}
案例2:关于关注列表和粉丝列表中,发布新消息的推送思路
这可以分两种思路:
第一种:push(消息发布),针对数量不大的情况,就是当up发布新消息后,直接将消息推送给所有的粉丝列表
第二种:pull(消息接收),针对大数据用户量,当自己上线后访问首页或者关注者发布信息时,会自动去查找关注者最新的消息动态进行展示,而判断依据就是根据自己是上次退出时间和本次登陆时间
案例3: 滑动窗口计时, 实现窗口滑动效果: 0-10 1-11.
public Response limitRequest(){
Long currentTime = new Date().getTime();
if(redisTemplate.hasKey("limit")) {
// intervalTime 限流的时间(滑动窗口)
Integer count = redisTemplate.opsForZSet().rangeByScore("limit", currentTime - intervalTime, currentTime).size();
if (count != null && count > 10) {
return Response.ok("每分钟最多只能访问10次");
}
}
redisTemplate.opsForZSet().add("limit", getID(), currentTime);
return Response.ok("成功访问");
}
Redis 的三种特殊数据类型
GEO地理位置
Redis 的 GEO 特性在 Redis 3.2 版本中推出, 这个功能可以将用户给定的地理位置信息储存起来, 并对这些信息进行操作。来实现诸如附近位置、摇一摇这类依赖于地理位置信息的功能。geo的数据类型为zset
GEO 的数据结构总共有六个常用命令:geoadd、geopos、geodist、georadius、georadiusbymember、gethash
1. geoadd
# 语法
geoadd key longitude latitude member ...
# 将给定的空间元素(纬度、经度、名字)添加到指定的键里面。
# 这些数据会以有序集he的形式被储存在键里面,从而使得georadius和georadiusbymember这样的
命令可以在之后通过位置查询取得这些元素。
# geoadd命令以标准的x,y格式接受参数,所以用户必须先输入经度,然后再输入纬度。
# geoadd能够记录的坐标是有限的:非常接近两极的区域无法被索引。
# 有效的经度介于-180-180度之间,有效的纬度介于-85.05112878 度至 85.05112878 度之间。,
当用户尝试输入一个超出范围的经度或者纬度时,geoadd命令将返回一个错误。
127.0.0.1:6379> geoadd china:city 116.23 40.22 北京
(integer) 1
127.0.0.1:6379> geoadd china:city 121.48 31.40 上海 113.88 22.55 深圳 120.21
30.20 杭州
(integer) 3
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02
30.58 武汉
(integer) 3
2. geopos
# 语法
geopos key member [member...]
#从key里返回所有给定位置元素的位置(经度和纬度)
127.0.0.1:6379> geopos china:city 北京
1) 1) "116.23000055551528931"
2) "40.2200010338739844"
127.0.0.1:6379> geopos china:city 上海 重庆
1) 1) "121.48000091314315796"
2) "31.40000025319353938"
2) 1) "106.54000014066696167"
2) "29.39999880018641676"
127.0.0.1:6379> geopos china:city 新疆
1) (nil)
3. geodist
# 语法
geodist key member1 member2 [unit]
# 返回两个给定位置之间的距离,如果两个位置之间的其中一个不存在,那么命令返回空值。
# 指定单位的参数unit必须是以下单位的其中一个:
# m表示单位为米
# km表示单位为千米
# mi表示单位为英里
# ft表示单位为英尺
# 如果用户没有显式地指定单位参数,那么geodist默认使用米作为单位。
#geodist命令在计算距离时会假设地球为完美的球形,在极限情况下,这一假设最大会造成0.5%的误
差。
127.0.0.1:6379> geodist china:city 北京 上海
"1088785.4302"
127.0.0.1:6379> geodist china:city 北京 上海 km
"1088.7854"
127.0.0.1:6379> geodist china:city 重庆 北京 km
"1491.6716"
4. georadius
# 语法
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist]
[withhash][asc|desc][count count]
# 以给定的经纬度为中心, 找出某一半径内的元素
127.0.0.1:6379> georadius china:city 100 30 1000 km
重庆
西安
# withdist 返回位置名称和中心距离
127.0.0.1:6379> georadius china:city 100 30 1000 km withdist
重庆
635.2850
西安
963.3171
# withcoord 返回位置名称和经纬度
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord
重庆
106.54000014066696167
29.39999880018641676
西安
108.92999857664108276
34.23000121926852302
# withdist withcoord 返回位置名称 距离 和经纬度 count 限定寻找个数
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count
1
重庆
635.2850
106.54000014066696167
29.39999880018641676
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count
2
重庆
635.2850
106.54000014066696167
29.39999880018641676
西安
963.3171
108.92999857664108276
34.23000121926852302
5. georadiusbymember
# 语法
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist]
[withhash][asc|desc][count count]
# 找出位于指定范围内的元素,中心点是由给定的位置元素决定
127.0.0.1:6379> GEORADIUSBYMEMBER china:city 北京 1000 km
北京
西安
127.0.0.1:6379> GEORADIUSBYMEMBER china:city 上海 400 km
杭州
上海
6. geohash
# 语法
geohash key member [member...]
# Redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似
表示距离越近。
27.0.0.1:6379> geohash china:city 北京 重庆
wx4sucu47r0
wm5z22h53v0
127.0.0.1:6379> geohash china:city 北京 上海
wx4sucu47r0
wtw6sk5n300
zrem
GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位置信息的删除
127.0.0.1:6379> geoadd china:city 116.23 40.22 beijin
1
127.0.0.1:6379> zrange china:city 0 -1 # 查看全部的元素
重庆
西安
深圳
武汉
杭州
上海
beijin
北京
127.0.0.1:6379> zrem china:city beijin # 移除元素
1
127.0.0.1:6379> zrem china:city 北京 # 移除元素
1
127.0.0.1:6379> zrange china:city 0 -1
重庆
西安
深圳
武汉
杭州
上海
HyperLogLog基数统计
用来做基数统计算法的数据结构,它提供了不精确的去重计数方案, 如统计网站的UV, 优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。
PFADD
PFCOUNT
PFMERGE
[PFADD key element [element ...] 添加指定元素到 HyperLogLog 中。
[PFCOUNT key [key ...] 返回给定 HyperLogLog 的基数估算值
[PFMERGE destkey sourcekey ...] 将多个 HyperLogLog 合并为一个HyperLogLog,并集计算
127.0.0.1:6379> PFADD mykey a b c d e f g h i j
1
127.0.0.1:6379> PFCOUNT mykey
10
127.0.0.1:6379> PFADD mykey2 i j z x c v b n m
1
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2
OK
127.0.0.1:6379> PFCOUNT mykey3
15
Bitmaps二进制位图
底层是通过对字符串的操作来实现. 可能会遇到这种情况:需要统计用户的某些信息,如活跃或不活跃,登录或者不登录;又如需要记录用户一年的打卡情况,打卡了是1, 没有打卡是0,如果使用普通的 key/value存储,则要记录365条记录,如果用户量很大,需要的空间也会很大,所以 Redis 提供了 Bitmap 位图这中数据结构,Bitmap 就是通过操作二进制位来进行记录,即为 0 和 1;如果要记录 365 天的打卡情况,使用 Bitmap表示的形式大概如下:0101000111000111...........................,这样有什么好处呢?当然就是节约内存了,365 天相当于 365 bit,又 1 字节 = 8 bit , 所以相当于使用 46 个字节即可
setbit 设置操作
SETBIT key offset value : 设置 key 的第 offset 位为value (1或0)
# 使用 bitmap 来记录上述事例中一周的打卡记录如下所示:
# 周一:1,周二:0,周三:0,周四:1,周五:1,周六:0,周天:0 (1 为打卡,0 为不打卡)
127.0.0.1:6379> setbit sign 0 1
0
127.0.0.1:6379> setbit sign 1 0
0
127.0.0.1:6379> setbit sign 2 0
0
127.0.0.1:6379> setbit sign 3 1
0
127.0.0.1:6379> setbit sign 4 1
0
127.0.0.1:6379> setbit sign 5 0
0
127.0.0.1:6379> setbit sign 6 0
0
getbit 获取操作
GETBIT key offset 获取offset设置的值,未设置过默认返回0
127.0.0.1:6379> getbit sign 3 # 查看周四是否打卡
1
127.0.0.1:6379> getbit sign 6 # 查看周七是否打卡
0
bitcount 统计操作
bitcount key [start, end] 统计 key 上位为1的个数
# 统计这周打卡的记录,可以看到只有3天是打卡的状态:
127.0.0.1:6379> bitcount sign
3
分布式锁
setnx key
命令 --插入key,如果存在就失败(0),成功就插入(1)
缺点:key会一直占用,搭配expire key使用,虽然两个命令单独都是原子操作,但组合在一个就不是原子操作
可改为如下命令,保证有效的原子性: set key value ex 10 nx
Springboot中可使用过redission提供的分布式锁,因为它提供了一个锁的守护线程(看门狗)防止锁释放了业务还没有执行完, redisson所有指令都通过lua脚本执行,可参考:[传送门]
布隆过滤器
布隆过滤器是一种占用空间很小的数据结构,它由一个很长的二进制向量和一组Hash映射函数组成,它用于检索一个元素是否在一个集合中,空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难
- 布隆过滤器:你可以先理解为一个不怎么精准的set数据结构,就是用于过滤数据是否重复的。
- Redis官方提供的布隆过滤器到Redis4.0提供了插件功能才正式使用。
- 布隆过滤器作为一个插件加载到Redis Server中,给Redis提供强大的布隆去重功能
就像下面这张图,它将数据根据算法转换为二进制类型,在对应的hash位置标记,比如"java"就是对应了hash槽的这三个位置,那么后续判断是否有存在"java"就直接找对应的位置就行了,如果是1,那就是存在,0则不存在,因此这种也会存在一个问题,hash碰撞导致误判
如何减少这种误差呢?
- 搞多几个哈希函数映射,降低哈希碰撞的概率
- 同时增加B数组的bit长度,可以增大hash函数生成的数据的范围,也可以降低哈希碰撞的概率
布隆过滤器适应于一些不需要完全精准的去重场景。比如
1、判断一个用户访问了一篇文章,比如:抖音,今日头条的推荐引擎去重
2、判断ip是否访问过本网站
不适合的场景:
1、判断一个用户是否收藏过一篇文章
2、判断一个用户是否签到过
3、判断一个用户是否点赞过一篇文章
布隆过滤器需要额外安装,就像插件一样,项目中使用也需要额外配置一下,这里不写了,自行百度,看命令
BF.RESERVE {key} {error_rate} {capacity}
error_rate:错误率,允许布隆过滤器的错误率。这个值越低。过滤器的位数组的大小越大,占用空间也就越大。
capacity:存储的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降。
注意:必须在add之前使用bf.reserve命令显示的创建。
如果对应的key,已经存在,bf.reserve会报错。
如果不使用bf.reserve,默认的:error_rate是0.01。默认的;capacity是100.
bf.add 添加元素
bf.exists 查询元素是否存在
bf.madd 一次添加多个元素
bf.mexists 一次查询多个元素是否存在
127.0.0.1:6379> bf.reserve filter 0.01 100
OK
127.0.0.1:6379> bf.add filter 1
(integer) 1
127.0.0.1:6379> bf.exists filter 1
(integer) 1
127.0.0.1:6379> bf.madd filter 1
1) (integer) 0
127.0.0.1:6379> bf.madd filter 1 2 3 4 5 6
1) (integer) 0
2) (integer) 1
3) (integer) 1
4) (integer) 1
5) (integer) 1
6) (integer) 1
127.0.0.1:6379> bf.madd filter 1 2 3 4 5 6
1) (integer) 0
2) (integer) 0
3) (integer) 0
4) (integer) 0
5) (integer) 0
6) (integer) 0
127.0.0.1:6379> bf.madd filter 1 2 3 4 5 6 7 6
1) (integer) 0
2) (integer) 0
3) (integer) 0
4) (integer) 0
5) (integer) 0
6) (integer) 0
7) (integer) 1
8) (integer) 0
127.0.0.1:6379> bf.mexists filter 1 2 3 4 5 6 7 6
1) (integer) 1
2) (integer) 1
3) (integer) 1
4) (integer) 1
5) (integer) 1
6) (integer) 1
7) (integer) 1
8) (integer) 1
Lua脚本
Lua是一个轻量、简洁、可扩展的脚本语音,它的特点有:(额,其实说白了,这个脚本是原子性的操作,快捷复用,这也是用它的原因)
- 轻量:编译后提交很小。
- 简洁:由C编写,启动快,运行快
- 可扩展:可内嵌到各种编程语言或者系统中运行。提升静态语言的灵活性。而且完全不需要担心语法问题。
使用建议: 额,其实这玩意儿看着脑袋还是有点大的,如果只是简单使用就看看基本的语法意思,能百度出来就百度cv吧,不然得花一点时间去慢慢调慢慢试了,适得其反了,这有两篇参考文章:[文章1],[文章2]
基础语法:
EVAL script numkeys KEYS [KEYS....] ARGV [ARGV....] --要注意的是:Lua的数组坐标不是从0开始,而是从1开始。
- script代表的是:参数是一段Lua脚本程序,脚本不必(也不应该)定义未一个Lua函数。
- numkeys :用于指定key参数的个数
- KEYS[KEYS….]:代表redis的KEYS,从evel的第三个参数开始算起,标识在脚本中所用到的redis键(KEY)
- ARGV[ARGV….]:代表lua的入参,在Lua中通过全局变量的argv数组访问,访问的形式呵呵KEYS变量类似(ARGV[1],ARGV[2])以此类推。
linux 命令案例:
127.0.0.1:6379> EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 tt1 tt2 vv1 vv2
1) "tt1"
2) "tt2"
3) "vv1"
4) "vv2"
- eval是redis执行lua脚本的命令
- 双引号是lua执行的字符串脚本(注意是:字符串类型)
- 2 代表的是2个key
- key1,key2代表的是KEYS[1],KEYS[2]的入参
- feige,achao是ARGV[1],ARGV[2]的入参.
- 这里-脚本return只是返回输出,没有做任何操作
springboot中lua案列:
原生代码写法:
@PostMapping("/user/update")
public R updateUser(User user) {
String key = "user:" + user.getId();
// 1: 第一次:先到缓存中根据key去查找一次,看是否存在
User olduser = JsonUtil.string2Obj(this.stringRedisTemplate.opsForValue().get(key), User.class);
if (olduser == null) {
// 2:第二次:发送第二次redis请求。不存在就新增
this.stringRedisTemplate.opsForValue().set(key, JsonUtil.obj2String(olduser));
return R.ok();
}
if (olduser.getNickname().equals(user.getNickname())) {
log.info("用户对象:{},无须修改!", key);
} else {
log.info("用户对象:{},修改成功!!", key);
// 2:第二次:发送第二次redis请求。不存在就新增
this.stringRedisTemplate.opsForValue().set(key, JsonUtil.obj2String(olduser));
}
return R.ok();
}
lua脚本改进:
-- 成功返回1、没有设置返回0
-- 如果redis没找到。就直接添加
if redis.call('get',KEYS[1]) == nil then
redis.call('set',KEYS[1],ARGV[1]);
return 1;
end
-- 如果旧值等于新值,不进行操作,如果不相同就执行更新
if redis.call('get',KEYS[1]) == ARGV[1] then
return 0;
else
redis.call('set',KEYS[1],ARGV[1]);
return 1;
end
> 完整的案例使用:
1,将脚本保存到resource下面,命名updateuser.lua
2,配置脚本对象
@SpringBootConfiguration
public class LuaConfiguration {
/**
* 将lua脚本的内容加载出来放入到DefaultRedisScript
* @return
*/
@Bean
public DefaultRedisScript<Long> initluascript() {
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/updateuser.lua")));
defaultRedisScript.setResultType(Long.class);
return defaultRedisScript;
}
}
3.编写controller层
@Autowired
private DefaultRedisScript<Long> defaultRedisScript;
@PostMapping("/user/lua/update")
public R luaupdateUser(Integer userid, String nickname) {
String key = "user:" + userid;
// 1: 设置lua的key
List<String> keysList = Arrays.asList(key);
// 2 : execute 参数1:执行的lua脚本的对象 参数2:参数的key列表 参数3:执行lua每个key对应的参数
Long execute = this.stringRedisTemplate.execute(defaultRedisScript, keysList, nickname);
return R.ok().data("result", execute);
}
> 类似的案例还有IP限制,接口限流等等之类的,都差不多,比较简单也方便复用,在提供一个IP限流的脚本案例
-- 为某个接口的请求IP设置计数器,比如:127.0.0.1请求某接口
-- KEYS[1] = 127.0.0.1 也就是用户的IP
-- ARGV[1] = 过期时间 30m
-- ARGV[2] = 限制的次数
local limitCount = redis.call('incr',KEYS[1]);
if limitCount == 1 then
redis.call("expire",KEYS[1],ARGV[1])
end
-- 如果次数还没有过期,并且还在规定的次数内,说明还在请求同一接口
if limitCount > tonumber(ARGV[2]) then
return 0
end
return 1
缓存击穿、缓存穿透、缓存雪崩
这三种情况都是指当前有大量请求涌入没有命中缓存,直接冲击数据库而造成的奔溃,只是场景的不同而已
缓存穿透
指查询一个一定不存在的数据,由于缓存是不命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,进而给数据库带来压力
缓存穿透一般都是这几种情况产生的:
- 业务/运维/开发失误的操作,比如缓存和数据库的数据都被误删除了
- 黑客非法请求攻击,比如黑客故意捏造大量非法请求,以读取不存在的业务数据
如何避免缓存穿透呢? 一般有三种方法
- 如果是非法请求,我们在API入口,对参数进行校验,过滤非法值
- 缓存空对象,设置过期时间
- 使用布隆过滤器快速判断数据是否存在,再继续后面的操作
缓存雪崩
是指大量的请求进来,这个时候Redis缓存成片的key过期,而导致数据无法命中Redis缓存,这个时候就会冲入db中,导致宕机
比较好的解决方法就是将数据key的过期时间错开一些,避免同一时间大量过期
缓存击穿
指热点key在某个时间点过期的时候,而恰好在这个时间点对这个Key有大量的并发请求过来,从而大量的请求打到db
缓存击穿看着有点像,其实它两区别是,缓存雪奔是指数据库压力过大甚至down机,缓存击穿只是大量并发请求到了DB数据库层面。可以认为击穿是缓存雪奔的一个子集吧。有些文章认为它俩区别,是区别在于击穿针对某一热点key缓存,雪奔则是很多key
解决方案:
- 设置热点数据永不过期
- 加互斥锁: 分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可
- 主备缓存,参考文章 [List] 数据结构部分,商品展示案例
redis 发布订阅
Redis提供了发布/订阅及阻塞队列功能,能实现一个简单的消息队列系统.
- 缺点: 安全性和准确性不保证, 不能作为专业的消息组件
- 优点: 轻便快捷, 可用于要求性不高的消息通知情况
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息
Redis 的 subscribe 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
Redis 发布订阅命令
由于使用场景较少,不做过多介绍,可参考其他文章了解:[传送门],[传送门2]
redis 持久化
Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!
RDB
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照(什么是快照?可以这样理解,给当前时刻的数据,拍一张照片,然后保存下来),它恢复时是将快照文件直接读到内存里, 它是Redis默认的持久化方式
原理: Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失
执行完操作后,在指定目录下会生成一个dump.rdb文件,Redis 重启的时候,通过加载dump.rdb文件来恢复数据
触发方式: 一般采用配置方式自动触发,可参考上面的redis.conf 配置中save 命令(第9)
- RDB 的优点
适合大规模的数据恢复场景,如备份,全量复制等
- RDB缺点
没办法做到实时持久化/秒级持久化, 在一定间隔时间做一次备份,所以如果redis意外down掉的话,就会丢失最后一次快照后的所有修改
新老版本存在RDB格式兼容问题
Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
AOF
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是 appendonly.aof 文件 , 文件配置参考redis,conf 中 第 18,19,20
- 优点:
1、每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差但数据完整性比较好
2、每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失
3、不同步: appendfsync no 从不同步
- 缺点:
1、相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。
2、Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同
两中持久化方式只要配置是开启的,如果遇到异常宕机后,重启后会自动加载备份
说明:
1. 同时开启两种持久化方式
在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段
2. 性能建议
因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则
redis 高可用
主从模式,哨兵模式,集群模式
- 主从模式
主从模式中,Redis部署了多台机器,有主节点,负责读写操作,有从节点,只负责读操作。从节点的数据来自主节点,实现原理就是主从复制机制
- 哨兵模式
主从模式中,一旦主节点由于故障不能提供服务,需要人工将从节点晋升为主节点,同时还要通知应用方更新主节点地址. 哨兵模式它可以监视所有的Redis主节点和从节点,并在被监视的主节点进入下线状态时,自动将下线主服务器属下的某个从节点升级为新的主节点. 但是呢,一个哨兵进程对Redis节点进行监控,就可能会出现问题(单点问题),因此,可以使用多个哨兵来进行监控Redis节点,并且各个哨兵之间还会进行监控
- Cluster集群模式
哨兵模式基于主从模式,实现读写分离,它还可以自动切换,系统可用性更高。但是它每个节点存储的数据是一样的,浪费内存,并且不好在线扩容。
Cluster集群模式实现了Redis的分布式存储。对数据进行分片,也就是说每台Redis节点上存储不同的内容,来解决在线扩容的问题。并且,**它也提供复制和故障转移的功能**
Hash Slot插槽算法, 把整个数据库被分为16384个slot(槽),每个进入Redis的键值对,根据key进行散列,分配到这16384插槽中的一个, 集群中的每个节点负责一部分的hash槽,比如当前集群有A、B、C个节点,每个节点上的哈希槽数 =16384/3,那么就有:
- 节点A负责0~5460号哈希槽
- 节点B负责5461~10922号哈希槽
- 节点C负责10923~16383号哈希槽
redis集群通过ping/pong消息,实现故障发现, 为了保证高可用,Cluster集群引入了主从复制,一个主节点对应一个或者多个从节点, 如果主节点宕机时,就会启用从节点 (与纯主从模式不同的是,主从节点之间并没有读写分离, Slave 只用作 Master 宕机的高可用备份,所以更合理来说应该是主备模式。如果主节点没有从节点,那么一旦发生故障时,集群将完全处于不可用状态。)
相关问题
1. 开始为什么redis是单线程,6.0后怎么又引入多线程了
① 官方表示,因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了!
② redis使用多线程并非是完全摒弃单线程,redis还是使用单线程模型来处理客户端的请求,只是使用多线程来处理数据的读写和协议解析,执行命令还是使用单线程,这样做的目的是因为redis的性能瓶颈在于网络IO而非CPU,使用多线程能提升IO读写的效率,从而整体提高redis的性能
注意: Redis6 的多线程默认是禁⽤的,只使⽤主线程, 如需开启需要修改 Redis 配置⽂件 redis.conf 设置开启多线程及线程数使得多线程生效:
io-threads-do-reads yes
io-threads 4 # 官⽹建议4核的机器建议设置为2或3个线程,8核的建议设置为6个线程
2. 如何保证与数据库的一致性
比较常用且典型的就是 "延时双删" 了,但这并不是强一致性的,只是相对较优的方案
为什么使用?
一般在更新数据库数据时,需要同步redis中缓存的数据,所以存在两种方法:
- 第一种方案:先执行update操作,再执行缓存清除
- 第二种方案:先执行缓存清除,再执行update操作
这两种方案的弊端是当存在并发请求时,很容易出现以下问题:
- 第一种方案:当请求1执行update操作后,还未来得及进行缓存清除,此时请求2查询到并使用了redis中的旧数据
- 第二种方案:当请求1执行清除缓存后,还未进行update操作,此时请求2进行查询到了旧数据并写入了redis
> 延时双删也很好理解,
> 1.先删除缓存 2.再更新数据库,然后休眠一会(比如1秒)3.再次删除缓存
> 为什么要休眠一会呢,因为在并发量大的情况, 第一步执行完,在要执行第二步的时候(还没有执行),现在用户访问了旧数据,那接下下来发现缓存是没有数据的(第一步删了),就需要更新缓存了
> 而同时第二步执行完了,可能就会存在如果不等待一下,等旧数据写入缓存的话,那第三部执行删除缓存其实是删的空值,删完了这时候旧数据才写入到了缓存,那就很尴尬,当没来