redis 是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决 redis 持久化的问题
我们在 redis.conf 文件中增加两个配置
	
		
			| 1 2 | appendonly yes appendfilename appendonly.aof | 
	
	- appenonly 表示只追加
- appendfilename 表示追加到那什么文件中
指令: *3\r\n$3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nvalue\r\n 落在 appendonly.aof 文件中
	
		
			| 1 2 3 4 5 6 7 | *3 $3 SET $3 KEY $5 value | 
	
这里要实现的功能就是把用户发过来的指令,用 RESP 的形式记录在 appendonly.aof 文件中
这个文件是在机器的硬盘上,当 redis 停了之后,内存中的数据都没了,但这个文件会保存下
redis 重启后,会读取这个文件,把之前内存中的数据再次加载回来
定义 AofHandler
在项目下新建文件 aof/aof.go
在里面定义一个 AofHandler 结构体,它的作用就是用来处理 appendonly.aof 文件
	
		
			| 1 2 3 4 5 6 7 | type AofHandler struct {     database    databaseface.Database // 持有 db,db 有业务核心     aofFile     *os.File              // 持有 aof 文件     aofFilename string                // aof 文件名     currentDB   int                   // 当前 db     aofChan     chan *payload         // 写文件的缓冲区 } | 
	
这里有注意的是 aofChan,它是写文件的缓冲区
因为从文件中读取指令,指令是非常密集的,但是将指令写入硬盘时非常慢的,我们又不可能每次都等待指令写完成后再去操作 redis
这时我们就把所有想写入 aof 文件的指令放到 aofChan 中,然后在另一个 goroutine 中去写入硬盘
所以这个 aofChan 的类型是 payload 结构体
	
		
			| 1 2 3 4 5 | type CmdLine = [][]byte type payload struct {   cmdLine CmdLine // 指令   dbIndex int     // db 索引 } | 
	
AofHandler 结构体定义好之后,我们需要定义一个 NewAofHandler 函数来初始化 AofHandler 结构体
还需要定义一个 AddAof 方法,用来往 aofChan 中添加指令
放到缓冲区之后,还需要一个方法 HandleAof 将指令写入硬盘
最后还要实现一个从硬盘加载 aof 文件到内存的的函数 LoadAof
实现 NewAofHandler
NewAofHandler 函数用来初始化 AofHandler 结构体
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | func NewAofHandler(database databaseface.Database) (*AofHandler, error) {   // 初始化 AofHandler 结构体   handler := &AofHandler{}   // 从配置文件中读取 aof 文件名   handler.aofFilename = config.Properties.AppendFilename   // 持有 db   handler.database = database   // 从硬盘加载 aof 文件   handler.LoadAof()   // 打开 aof 文件, 如果不存在则创建   aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)   if err != nil {     return nil, err   }   // 持有 aof 文件   handler.aofFile = aofFile   // 初始化 aofChan   handler.aofChan = make(chan *payload, aofBufferSize)   // 启动一个 goroutine 处理 aofChan   go func() {     handler.HandleAof()   }()   // 返回 AofHandler 结构体   return handler, nil } | 
	
实现 AddAof
AddAof 方法用来往 aofChan 中添加指令,它不做落盘的操作
因为在执行指令的时候,等待它落盘的话,效率太低了,所以我们把指令放到 aofChan 中,然后在另一个 goroutine 中去处理
	
		
			| 1 2 3 4 5 6 7 8 9 10 | func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {   // 如果配置文件中的 appendonly 为 true 并且 aofChan 不为 nil   if config.Properties.AppendOnly && handler.aofChan != nil {     // 往 aofChan 中添加指令     handler.aofChan <- &payload{       cmdLine: cmdLine,       dbIndex: dbIndex,     }   } } | 
	
实现 HandleAof
HandleAof 方法用来处理 aofChan 中的指令,将指令写入硬盘
currentDB 记录的是当前工作的 DB,如果切换了 DB,会在 aof 文件中插入 select 0 这样切换 DB 的语句
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | func (handler *AofHandler) HandleAof() {   // 初始化 currentDB   handler.currentDB = 0   // 遍历 chan   for p := range handler.aofChan {     // 如果当前 db 不等于上一次工作的 db,就要插入一条 select 语句     if p.dbIndex != handler.currentDB {       // 我们要把 select 0 编码成 RESP 格式       // 也就是 *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n       data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()       // 写入 aof 文件       _, err := handler.aofFile.Write(data)       if err != nil {         logger.Warn(err)         continue       }       // 更新 currentDB       handler.currentDB = p.dbIndex     }     // 这里是插入正常的指令     data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()     // 写入 aof 文件     _, err := handler.aofFile.Write(data)     if err != nil {       logger.Warn(err)     }   } } | 
	
实现 Aof 落盘功能
我们之前在实现指令的部分,都是直接执行指令,现在我们要把指令写入 aof 文件
我们在 StandaloneDatabase 结构体中增加一个 aofHandler 字段
	
		
			| 1 2 3 4 | type StandaloneDatabase struct {   dbSet      []*DB   aofHandler *aof.AofHandler // 增加落盘功能 } | 
	
然后新建 database 时需要对 aofHandler 进行初始化
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | func NewStandaloneDatabase() *StandaloneDatabase {   // ...   // 先看下配置文件中的 appendonly 是否为 true   if config.Properties.AppendOnly {     // 初始化 aofHandler     aofHandler, err := aof.NewAofHandler(database)     if err != nil {       panic(err)     }     // 持有 aofHandler     database.aofHandler = aofHandler     // 遍历 dbSet     for _, db := range database.dbSet {       // 解决闭包问题       sdb := db       // 为每个 db 添加 AddAof 方法       // 这个 addAof 方法是在执行指令的时候调用的       sdb.addAof = func(line CmdLine) {         database.aofHandler.AddAof(sdb.index, line)       }     }   }   return database } | 
	
这里要注意的是 addAof 方法,它是在执行指令的时候调用的
因为我们需要在指令中调用 Addaof 函数,实现指令写入 aof 文件
但是在指令中,???们只能拿到 db,db 上又没有操作 aof 相关的方法,所以我们需要在 db 中增加一个 addAof 方法
	
		
			| 1 2 3 4 5 | type DB struct {     index  int           // 数据的编号     data   dict.Dict     // 数据类型     addAof func(CmdLine) // 每个 db 都有一个 addAof 方法 } | 
	
然后就在需要落盘的指令中调用 addAof 方法
DEL 方法需要记录下来,因为 DEL 方法是删除数据的,如果不记录下来,那么 aof 文件中的数据就会和内存中的数据不一致
	
		
			| 1 2 3 4 5 6 7 8 | // DEL K1 K2 K3 func DEL(db *DB, args [][]byte) resp.Reply {   deleted := db.Removes(keys...)   // delete 大于 0 说明有数据被删除   if deleted > 0 {     db.addAof(utils.ToCmdLine2("DEL", args...))   } } | 
	
FLUSHDB 方法也需要记录下来,因为 FLUSHDB 方法是删除当前 DB 中的所有数据
	
		
			| 1 2 3 4 | // FLUSHDB func FLUSHDB(db *DB, args [][]byte) resp.Reply {     db.addAof(utils.ToCmdLine2("FLUSEHDB", args...)) } | 
	
RENAME 和 RENAMENX 方法也需要记录下来,因为这两个方法是修改 key 的名字
	
		
			| 1 2 3 4 5 6 7 8 9 | // RENAME K1 K2 func RENAME(db *DB, args [][]byte) resp.Reply {   db.addAof(utils.ToCmdLine2("RENAME", args...)) }   // RENAMENX K1 K2 func RENAMENX(db *DB, args [][]byte) resp.Reply {   db.addAof(utils.ToCmdLine2("RENAMENX", args...)) } | 
	
SET 和 SETNX 方法也需要记录下来,因为这两个方法是设置数据的
	
		
			| 1 2 3 4 5 6 7 8 9 | // SET K1 v func SET(db *DB, args [][]byte) resp.Reply {   db.addAof(utils.ToCmdLine2("SET", args...)) }   // SETNX K1 v func SETNX(db *DB, args [][]byte) resp.Reply {   db.addAof(utils.ToCmdLine2("SETNX", args...)) } | 
	
GETSET 方法也需要记录下来,因为这个方法是设置数据的同时返回旧数据
	
		
			| 1 2 3 4 | // GETSET K1 v1 func GETSET(db *DB, args [][]byte) resp.Reply {   db.addAof(utils.ToCmdLine2("GETSET", args...)) } | 
	
实现 LoadAof
LoadAof 方法用来从硬盘加载 aof 文件到内存
aof 中的指令是符合 RESP 协议的,我们就可以把这些指令当成用户发过来的指令,执行就可以了
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | func (handler *AofHandler) LoadAof() {   // 打开 aof 文件   file, err := os.Open(handler.aofFilename)   if err != nil {     logger.Error(err)     return   }   // 关闭文件   defer func() {     _ = file.Close()   }()   // 创建一个 RESP 解析器,将 file 传入,解析后的指令会放到 chan 中   ch := parser.ParseStream(file)   fackConn := &connection.Connection{}   // 遍历 chan,执行指令   for p := range ch {     if p.Err != nil {       // 如果是 EOF,说明文件读取完毕       if p.Err == io.EOF {         break       }       logger.Error(err)       continue     }     if p.Data == nil {       logger.Error("empty payload")       continue     }     // 将指令转换成 MultiBulkReply 类型     r, ok := p.Data.(*reply.MultiBulkReply)     if !ok {       logger.Error("exec multi mulk")       continue     }     // 执行指令     rep := handler.database.Exec(fackConn, r.Args)     if reply.IsErrReply(rep) {       logger.Error(rep)     }   } } |