KVStore v3 – 功能总览与执行流程梳理 版本定位:
单机、单线程、WAL + B+Tree + Snapshot 的 KV 存引擎
功能 0:系统初始化与冷启动恢复 功能描述
启动时恢复已有数据
尽量减少 WAL replay 的成本
流程
创建 kvstore 实例
初始化 B+ 树(空)
尝试加载 snapshot (如果存在)
打开 WAL 日志文件
replay WAL 中 snapshot 之后的增量日志
进入 NORMAL 模式,提供对外服务
函数调用链(关键路径)
1 2 3 4 5 6 7 8 9 10 kvstore_open() ├── bptree_create() ├── kvstore_load_snapshot() │ └── kvstore_apply_put_internal() ├── fopen(data.log ) ├── kvstore_replay_log() │ ├── kvstore_crc_check() │ ├── kvstore_apply_put_internal() │ └── kvstore_apply_del_internal() └── store->mode = KVSTORE_MODE_NORMAL
功能 1:PUT(写入 / 更新) 功能描述
写入一个 key-value
保证 crash 后可恢复(WAL)
写路径遵循:先落盘,再改内存
PUT 执行流程
1 2 3 4 5 6 7 8 9 kvstore_put() ├── kvstore_log_put() │ ├── crc32() │ ├── fprintf () │ └── fflush() ├── kvstore_maybe_compact() │ └── kvstore_compact() [条件触发] └── kvstore_apply_put() └── bptree_insert()
功能 2:DEL (删除) 功能描述
删除一个 key
删除本质上也是一次“写操作”
通过 WAL 保证删除可重放
DEL 执行流程
检查 store 是否为空
检查是否处于只读模式
先写删除日志
更新统计信息
判断是否需要 compaction
从 B+ 树中删除 key
函数调用链
1 2 3 4 5 6 7 8 9 kvstore_del() ├── kvstore_log_del() │ ├── crc32() │ ├── fprintf () │ └── fflush() ├── kvstore_maybe_compact() │ └── kvstore_compact() [条件触发] └── kvstore_apply_del() └── bptree_delete()
功能 3:GET (查询) 功能描述
只读操作
不涉及 WAL
完全依赖内存结构(B+ 树)
GET 执行流程
1 2 kvstore_search() └── bptree_search()
功能 4. WAL 写入与校验(持久化基础设施) 功能描述
所有写操作的持久化入口
使用 CRC32 保证单条日志完整性
为 replay 提供可信输入
WAL 写入流程 (PUT / DEL)
构造 payload (逻辑操作)
对 payload 计算 crc32
写入 payload|crc\n
flush 到 OS
关键函数
1 2 3 kvstore_log_put() kvstore_log_del() └── crc32()
功能 5:WAL Replay (崩溃恢复核心) 功能描述
冷启动时恢复 WAL 中的历史操作
跳过损坏的尾部日志
不经过 public API (绕过 readonly)
Replay 执行流程
切换为 REPLAY 模式
rewind 日志文件
校验 header
逐行读取日志
校验 CRC
解析操作类型
调用 apply 内部函数修改内存
恢复完成,切回 NORMAL 模式
函数调用链
1 2 3 4 5 6 7 8 kvstore_replay_log() ├── rewind() ├── kvstore_log_header() ├── fgets() ├── kvstore_crc_check() │ └── crc32() ├── kvstore_apply_put_internal() └── kvstore_apply_del_internal()
功能 6:Snapshot (冷启动加速) 功能描述
将当前完整内存状态持久化为快照
减少 replay WAL 的成本
本质识“全量导出”
Snapshot 创建流程
创建临时 snapshot 文件
顺序扫描 B+ 树
对每个 KV 写入 snapshot
flush + fsync
原子 rename 成正式版 snapshot
函数调用链
1 2 3 4 5 6 7 8 kvstore_create_snapshot() ├── fopen(tmp) ├── bptree_scan() │ └── snapshot_write_cb() │ └── crc32() ├── fflush() ├── fsync() └── rename()
功能 7:Snapshot 加载(冷启动) 功能描述
优先恢复 snapshot
snapshot 中的数据视为“可信全量”
WAL 只负责补增量
Snapshot 记载流程
1 2 3 4 5 kvstore_load_snapshot() ├── fopen(data.snapshot) ├── fgets() ├── sscanf ("PUT %d %ld" ) └── kvstore_apply_put_internal()
功能 8:日志压缩 (Compaction) 功能描述
清理冗余 WAL
把“历史操作”压缩为“当前状态”
控制日志体积和 replay 时间
Compaction 执行流程
判断是否满足压缩条件
创建 compact 临时文件
写入日志 header
顺序扫描 B+ 树
写入每个 KV 的 PUT 记录
flush + fsync
原子替换旧日志
重新打开日志文件
生成 snapshot
函数调用链
1 2 3 4 5 6 7 8 9 10 11 12 kvstore_maybe_compact() └── kvstore_compact() ├── fopen(tmp) ├── fprintf (header) ├── bptree_scan() │ └── compact_write_cb() │ └── crc32() ├── fflush() ├── fsync() ├── rename() ├── reopen log_fp └── kvstore_create_snapshot()
功能 9: 错误码与错误说明 功能描述
将底层错误抽象为稳定接口
方便调试、日志、上层调用
函数
总结
kvstore v3 是一个基于B+ 树内存索引 + WAL 持久化 + Snapshot 冷启动优化 的单机 KV 存储系统
写路径遵循 WA-first 原则
通过 CRC 校验、Replay策略与日志压缩 保证崩溃可恢复性与长期运行稳定性
复盘后的 kvstore.c 中的函数 kvstore_create 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 kvstore* kvstore_create (const char * log_path) { kvstore* store = malloc (sizeof (*store)); if (!store) return NULL ; store->tree = NULL ; store->log_fp = NULL ; store->log_size = 0 ; store->ops_count = 0 ; store->mode = KVSTORE_MODE_NORMAL; store->readonly = 0 ; store->tree = bptree_create(); if (!store->tree) goto fail; kvstore_load_snapshot(store); if (kvstore_open_log(store, log_path) != KVSTORE_OK) { goto fail; } if (kvstore_replay_log(store) != KVSTORE_OK) { fprintf (stderr , "[DEBUG] replay failed\n" ); goto fail; } return store; fail: kvstore_destroy(store); return NULL ; }
kvstore_destroy 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void kvstore_destroy (kvstore* store) { if (!store) { return ; } if (store->log_fp) { fclose(store->log_fp); store->log_fp = NULL ; } if (store->tree) { bptree_destroy(store->tree); store->tree = NULL ; } free (store); }
kvstore_search 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 int kvstore_search (kvstore* store, int key, long * value) { if (!store || !store->tree) return KVSTORE_ERR_NULL; return bptree_search(store->tree, key, value); }
kvstore_put 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 int kvstore_put (kvstore* store, int key, long value) { if (!store) return KVSTORE_ERR_NULL; if (store->mode != KVSTORE_MODE_NORMAL) return KVSTORE_ERR_READONLY; if (kvstore_log_put(store, key, value) != KVSTORE_OK) return KVSTORE_ERR_IO; store->ops_count++; kvstore_maybe_compact(store); return kvstore_apply_put(store, key, value); }
kvstore_del 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 int kvstore_del (kvstore* store, int key) { if (!store) return KVSTORE_ERR_NULL; if (store->readonly) return KVSTORE_ERR_READONLY; if (kvstore_log_del(store, key) != 0 ) return KVSTORE_ERR_IO; store->ops_count++; kvstore_maybe_compact(store); return kvstore_apply_del(store, key); }
kvstore_apply_put 1 2 3 4 5 6 7 8 9 10 11 12 static int kvstore_apply_put (kvstore* store, int key, long value) { if (!store || !store->tree) return KVSTORE_ERR_NULL; return kvstore_apply_put_internal(store->tree, key, value, store->mode); }
kvstore_apply_del 1 2 3 4 5 6 7 static int kvstore_apply_del (kvstore* store, int key) { if (!store || !store->tree) return KVSTORE_ERR_NULL; return kvstore_apply_del_internal(store->tree, key); }
kvstore_apply_put_internal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static int kvstore_apply_put_internal (bptree* tree, int key, long value, kvstore_mode_t mode) { if (!tree) return KVSTORE_ERR_NULL; int ret = bptree_insert(tree, key, value); if (ret == BPTREE_OK && mode == KVSTORE_MODE_NORMAL) { printf ("键 %d 已存在,已更新值为 %ld\n" , key, value); } return ret; }
kvstore_apply_del_internal 1 2 3 4 5 6 7 8 9 static int kvstore_apply_del_internal (bptree* tree, int key) { if (!tree) return KVSTORE_ERR_NULL; return bptree_delete(tree, key); }
kvstore_open_log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static int kvstore_open_log (kvstore* store, const char * path) { if (!store) return KVSTORE_ERR_NULL; if (!path) return KVSTORE_ERR_INTERNAL; FILE* fp = fopen(path, "a+" ); if (!fp) { perror("fopen log failed" ); return KVSTORE_ERR_IO; } store->log_fp = fp; snprintf (store->log_path, sizeof (store->log_path), "%s" , path); fseek(fp, 0 , SEEK_END); long size = ftell(fp); if (size == 0 ) { fprintf (fp, "%s\n" , KVSTORE_LOG_VERSION); fflush(fp); } fseek(fp, 0 , SEEK_SET); return KVSTORE_OK; }
kvstore_log_put 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 static int kvstore_log_put (kvstore* store, int key, long value) { if (!store->log_fp) return KVSTORE_ERR_NULL; char buf[128 ]; snprintf (buf, sizeof (buf), "PUT %d %ld" , key, value); uint32_t crc = crc32(buf); int bytes = fprintf (store->log_fp, "%s|%u\n" , buf, crc); fflush(store->log_fp); store->ops_count += 1 ; store->log_size += bytes; return KVSTORE_OK; }
kvstore_log_del 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 static int kvstore_log_del (kvstore* store, int key) { if (!store->log_fp) return KVSTORE_ERR_NULL; char buf[128 ]; snprintf (buf, sizeof (buf), "DEL %d" , key); uint32_t crc = crc32(buf); int bytes = fprintf (store->log_fp, "%s|%u\n" , buf, crc); fflush(store->log_fp); store->log_size += bytes; store->ops_count += 1 ; return KVSTORE_OK; }
compact_write_cb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static int compact_write_cb (int key, long value, void * arg) { FILE* fp = (FILE*)arg; char payload[128 ]; snprintf (payload, sizeof (payload), "PUT %d %ld" , key, value); uint32_t crc_val = crc32(payload); if (fprintf (fp, "%s|%u\n" , payload, crc_val) < 0 ) { return KVSTORE_ERR_INTERNAL; } return KVSTORE_OK; }
kvstore_compact 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 int kvstore_compact (kvstore* store) { if (!store) return KVSTORE_ERR_NULL; const char * tmp_path = "data.compact" ; const char * data_path = store->log_path; FILE* fp = fopen("data.compact" , "w" ); if (!fp) { perror("fopen compact" ); return KVSTORE_ERR_IO; } fprintf (fp, "%s\n" , KVSTORE_LOG_VERSION); if (bptree_scan(store->tree, compact_write_cb, fp) != 0 ) { fclose(fp); unlink(tmp_path); return KVSTORE_ERR_INTERNAL; } fflush(fp); fsync(fileno(fp)); fclose(fp); FILE* old_fp = store->log_fp; store->log_fp = NULL ; if (rename(tmp_path, data_path) != 0 ) { perror("rename" ); store->log_fp = old_fp; return KVSTORE_ERR_INTERNAL; } fclose(old_fp); store->log_fp = fopen(data_path, "a+" ); if (!store->log_fp) { perror("reopen data.log" ); return KVSTORE_ERR_INTERNAL; } kvstore_create_snapshot(store); return KVSTORE_OK; }
kvstore_maybe_compact 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static void kvstore_maybe_compact (kvstore* store) { if (!store) return ; if (store->log_size >= KVSTORE_MAX_LOG_SIZE || store->ops_count >= KVSTORE_MAX_OPS) { int rc = kvstore_compact(store); if (rc == KVSTORE_OK) { store->log_size = 0 ; store->ops_count = 0 ; } else { fprintf (stderr , "[WARN] kvstore compaction failed, will retry later\n" ); } } }
snapshot_write_cb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 static int snapshot_write_cb (int key, long value, void * arg) { FILE* fp = (FILE*)arg; char payload[128 ]; snprintf (payload, sizeof (payload), "PUT %d %ld" , key, value); uint32_t crc_val = crc32(payload); fprintf (fp, "%s|%u\n" , payload, crc_val); return KVSTORE_OK; }
kvstore_create_snapshot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 static int kvstore_create_snapshot (kvstore* store) { if (!store) return KVSTORE_ERR_NULL; const char * tmp = "data.snapshot.tmp" ; const char * snap = "data.snapshot" ; FILE* fp = fopen(tmp, "w" ); if (!fp) { perror("fopen snapshot" ); return KVSTORE_ERR_INTERNAL; } if (bptree_scan(store->tree, snapshot_write_cb, fp) != 0 ) { fclose(fp); return KVSTORE_ERR_INTERNAL; } fflush(fp); fsync(fileno(fp)); fclose(fp); if (rename(tmp, snap) != 0 ) { perror("rename snapshot" ); return KVSTORE_ERR_NULL; } return KVSTORE_OK; }
kvstore_load_snapshot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 static int kvstore_load_snapshot (kvstore* store) { if (!store) return KVSTORE_ERR_NULL; FILE* fp = fopen("data.snapshot" , "r" ); if (!fp) { return KVSTORE_OK; } char line[256 ]; int key; long value; kvstore_mode_t old_mode = store->mode; store->mode = KVSTORE_MODE_REPLAY; while (fgets(line, sizeof (line), fp)) { if (line[0 ] == '\n' || line[0 ] == '\r' ) continue ; if (sscanf (line, "PUT %d %ld" , &key, &value) == 2 ) { kvstore_apply_put_internal(store->tree, key, value, store->mode); } } store->mode = old_mode; fclose(fp); return KVSTORE_OK; }
uint32_r crc32 1 2 3 4 5 6 7 8 9 10 11 12 uint32_t crc32 (const char * s) { uint32_t crc = 0xFFFFFFFF ; while (*s) { crc ^= (unsigned char )*s++; for (int i = 0 ; i < 8 ; i++) { crc = (crc >> 1 ) ^ (0xEDB88320 & -(crc & 1 )); } } return ~crc; }
1 2 3 4 5 6 7 8 9 static int kvstore_log_header (const char * line) { if (strncmp (line, KVSTORE_LOG_VERSION, strlen (KVSTORE_LOG_VERSION)) != 0 ) { fprintf (stderr , "Invalid log version. Expected: %s\n" , KVSTORE_LOG_VERSION); return KVSTORE_ERR_CORRUPTED; } return KVSTORE_OK; }
kvstore_crc_check 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static int kvstore_crc_check (const char * payload, const char * crc_str) { if (!payload || !crc_str) return 0 ; char * end = NULL ; uint32_t crc_stored = (uint32_t )strtoul(crc_str, &end, 10 ); if (end == crc_str || *end != '\0' ) return 0 ; uint32_t crc_calc = crc32(payload); return crc_calc == crc_stored; }
kvstore_replay_log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 static int kvstore_replay_log (kvstore* store) { if (!store || !store->tree || !store->log_fp) RETURN_ERR(KVSTORE_ERR_NULL); store->mode = KVSTORE_MODE_REPLAY; char line[256 ]; int rc = KVSTORE_OK; rewind(store->log_fp); if (!fgets(line, sizeof (line), store->log_fp)) { goto out; } if (kvstore_log_header(line) != KVSTORE_OK) { rc = KVSTORE_ERR_CORRUPTED; goto out; } while (fgets(line, sizeof (line), store->log_fp)) { if (line[0 ] == '\n' || line[0 ] == '\r' || line[0 ] == ' ' ) continue ; line[strcspn (line, "\r\n" )] = '\0' ; char * sep = strchr (line, '|' ); if (!sep) { rc = KVSTORE_OK; break ; } *sep = '\0' ; char * crc_str = sep + 1 ; if (!kvstore_crc_check(line, crc_str)) { if (feof(store->log_fp)) { rc = KVSTORE_OK; break ; } rc = KVSTORE_ERR_CORRUPTED; fprintf (stderr , "[DEBUG] CRC failed! Payload: [%s], Stored CRC: [%s]\n" , line, crc_str); break ; } int key; long val; if (sscanf (line, "PUT %d %ld" , &key, &val) == 2 ) { kvstore_apply_put_internal(store->tree, key, val, store->mode); } else if (sscanf (line, "DEL %d" , &key) == 1 ) { kvstore_apply_del_internal(store->tree, key); } else { rc = KVSTORE_ERR_CORRUPTED; break ; } } out: store->mode = KVSTORE_MODE_NORMAL; return rc; }
kvstore_strerror 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 const char * kvstore_strerror (int err) { switch (err) { case KVSTORE_OK: return "OK" ; case KVSTORE_ERR_NULL: return "null pointer" ; case KVSTORE_ERR_INTERNAL: return "internal error" ; case KVSTORE_ERR_IO: return "io error" ; case KVSTORE_ERR_READONLY: return "read-only mode" ; case KVSTORE_ERR_NOT_FOUND: return "key not found" ; case KVSTORE_ERR_CORRUPTED: return "data corrupted" ; default : return "unknown error" ; } }
kvstore_debug_set_mode 1 2 3 4 5 6 7 8 void kvstore_debug_set_mode (kvstore* store, kvstore_mode_t mode) { if (store) { store->mode = mode; fprintf (stderr , "[debug] 手动切换至 %d模式\n" , mode); } }