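Multi-Backend Storage Design: Backup and Fallback Strategy
A multi-backend storage design gives LLM applications highly available, fault-tolerant, and durable storage for conversation history. It combines a fast primary database, a durable backup, and a write-ahead log (WAL) so that the application's chat history is not lost, even when a backend database fails or the network is interrupted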
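1. Why multiple backends are needed
- Resilience: if the primary database is unavailable, reads automatically switch to the backup
- Durability: every write is recorded in the WAL, so no message is lost even if the backup is temporarily unavailable
- Consistency: the WAL ensures that every message is eventually persisted to the backup, which gives a strong durability guarantee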
2. Architecture
Excalidraw file: https://gcntfv628ebr.feishu.cn/file/BiIPb4jIZowZBFxMw7HctS3Inor
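- Primary storage (PrimaryDB): a fast primary database used for reads and writes (for example, Redis or Postgres)
- Backup storage (BackupDB): a durable secondary database used for redundancy (for example, SQLite or a cloud database)
- WAL: an in-memory or persistent log that provides write durability and asynchronous backup
3. How it works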
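1) saveMessage: save a message
- Primary database available
  - Write to the primary database
  - Write to the WAL (this write must succeed to guarantee durability)
  - The WAL is flushed to the backup database asynchronously at a regular interval
- Primary database unavailable
  - Query the backup database before writing, to prevent duplicate writes
  - If the backup database does not already contain the message, write it to the backup database
  - Write to the WAL
    - Before a WAL entry is flushed, the backup database is queried to prevent duplicate writes
    - If the backup database does not contain the entry, it is written to the backup database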
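The write path above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the project's implementation: `ChatMessage`, `MessageStore`, `WriteAheadLog`, `MemoryStore`, and `MemoryWal` are hypothetical names, the stores are in-memory stand-ins, and the `available` flag only simulates an outage.

```typescript
// Hypothetical types for illustration; the real backends are not shown in this document.
interface ChatMessage { id: string; sessionId: string; content: string }

interface MessageStore {
  has(id: string): Promise<boolean>;
  put(msg: ChatMessage): Promise<void>;
}

interface WriteAheadLog {
  append(msg: ChatMessage): Promise<void>;
}

class MemoryStore implements MessageStore {
  available = true; // flip to false to simulate an outage
  readonly rows = new Map<string, ChatMessage>();

  async has(id: string): Promise<boolean> {
    this.assertUp();
    return this.rows.has(id);
  }

  async put(msg: ChatMessage): Promise<void> {
    this.assertUp();
    this.rows.set(msg.id, msg);
  }

  private assertUp(): void {
    if (!this.available) throw new Error('store unavailable');
  }
}

class MemoryWal implements WriteAheadLog {
  readonly entries: ChatMessage[] = [];

  async append(msg: ChatMessage): Promise<void> {
    this.entries.push(msg);
  }
}

async function saveMessage(
  primary: MessageStore,
  backup: MessageStore,
  wal: WriteAheadLog,
  msg: ChatMessage
): Promise<'primary' | 'backup'> {
  let primaryOk = true;
  try {
    await primary.put(msg);
  } catch {
    primaryOk = false;
  }

  if (!primaryOk) {
    // Primary is down: write to the backup directly, skipping messages it already holds.
    if (!(await backup.has(msg.id))) {
      await backup.put(msg);
    }
  }

  // The WAL append must succeed in both cases; an error here propagates to the caller.
  await wal.append(msg);
  return primaryOk ? 'primary' : 'backup';
}
```

The return value only reports which store accepted the write; in this sketch the later WAL flush is what skips entries the backup already holds.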
2) getHistory: fetch the history
- Read from the primary database
- If the primary database fails, read from the backup database
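The read fallback can be sketched like this. The `HistoryReader` type and the shape of the return value are assumptions made for illustration; the document does not specify the real signatures.

```typescript
// Hypothetical reader shape; a real reader would query Redis, Postgres, SQLite, and so on.
type HistoryReader = (sessionId: string) => Promise<string[]>;

async function getHistory(
  sessionId: string,
  readPrimary: HistoryReader,
  readBackup: HistoryReader
): Promise<{ source: 'primary' | 'backup'; messages: string[] }> {
  try {
    return { source: 'primary', messages: await readPrimary(sessionId) };
  } catch {
    // Primary failed: fall back to the backup. If this also throws, the caller sees the error.
    return { source: 'backup', messages: await readBackup(sessionId) };
  }
}
```

Returning the source alongside the messages is a design choice of this sketch; it lets callers log or monitor how often the fallback path is taken.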
3) clearHistory: clear the history
Clears all three stores: the primary database, the backup database, and the WAL
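One way to clear all three stores is sketched below. The `Clearable` interface is hypothetical, and the choice to keep going after a failure (rather than stop at the first error) is an assumption of this sketch, made so that an unreachable primary does not prevent the WAL and backup from being cleared.

```typescript
// Hypothetical interface: anything that can drop one session's history.
interface Clearable {
  name: string;
  clear(sessionId: string): Promise<void>;
}

/** Attempts to clear every target and returns the names of the targets that failed. */
async function clearHistory(sessionId: string, targets: Clearable[]): Promise<string[]> {
  const failed: string[] = [];
  for (const target of targets) {
    try {
      await target.clear(sessionId);
    } catch {
      failed.push(target.name); // keep going so the remaining stores are still cleared
    }
  }
  return failed;
}
```

A caller would pass the primary database, the backup database, and the WAL as the three targets, and could retry or alert on any names returned.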
4) WAL flush worker thread
Runs at a fixed interval (5 seconds by default) and flushes pending WAL entries to the backup database
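A periodic flusher along these lines could look like the sketch below. `WalEntry` and `WalFlusher` are hypothetical names, a `Map` stands in for the backup database, and marking entries with a `flushed` flag is an assumption; the document does not say how processed entries are tracked.

```typescript
// Hypothetical WAL entry shape for illustration.
interface WalEntry { id: string; payload: string; flushed: boolean }

class WalFlusher {
  private readonly wal: WalEntry[];
  private readonly backup: Map<string, string>; // stand-in for the backup database
  private readonly intervalMs: number;
  private timer: ReturnType<typeof setInterval> | undefined;
  private running = false;

  constructor(wal: WalEntry[], backup: Map<string, string>, intervalMs = 5000) {
    this.wal = wal;
    this.backup = backup;
    this.intervalMs = intervalMs; // 5 seconds, the default mentioned above
  }

  start(): void {
    if (this.timer) return;
    this.timer = setInterval(() => { void this.flushOnce(); }, this.intervalMs);
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer);
    this.timer = undefined;
  }

  /** Flushes every pending entry once; returns how many rows were written to the backup. */
  async flushOnce(): Promise<number> {
    if (this.running) return 0; // avoid overlapping runs
    this.running = true;
    let written = 0;
    try {
      for (const entry of this.wal) {
        if (entry.flushed) continue;
        if (!this.backup.has(entry.id)) { // query first to avoid duplicate writes
          this.backup.set(entry.id, entry.payload);
          written++;
        }
        entry.flushed = true;
      }
    } finally {
      this.running = false;
    }
    return written;
  }
}
```

Exposing `flushOnce` separately from the timer makes it possible to force a flush at shutdown and to test the logic without waiting for the interval.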
4. Primary/backup database synchronization
Excalidraw file: https://gcntfv628ebr.feishu.cn/file/SezSb5k2boNwFSxVJQzc9msknhe
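When the primary database loses its connection and becomes unavailable, the backup database takes over as the active database: both writes and queries go to the backup database
Because the WAL has been flushing to it all along, the backup database holds nearly the same data as the primary database, so the impact is small and the service remains available
Once the primary database recovers, it becomes the active database again, but it is missing the data written between the failure and the recovery
The sync script therefore checks on a schedule whether the two databases hold the same data and backfills whatever is missing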
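The backfill step can be illustrated with the sketch below. It assumes messages are keyed by a unique id and uses `Map` objects as stand-ins for the two databases; `findMissing` and `backfillPrimary` are hypothetical helper names, and only the backup-to-primary direction is shown.

```typescript
/** Returns the entries that exist in `source` but are missing from `target`. */
function findMissing<V>(source: Map<string, V>, target: Map<string, V>): Array<[string, V]> {
  const missing: Array<[string, V]> = [];
  source.forEach((value, id) => {
    if (!target.has(id)) missing.push([id, value]);
  });
  return missing;
}

/** Copies into the primary whatever the backup received while the primary was down. */
function backfillPrimary<V>(primary: Map<string, V>, backup: Map<string, V>): number {
  const missing = findMissing(backup, primary);
  for (const [id, value] of missing) {
    primary.set(id, value);
  }
  return missing.length;
}
```

Comparing by id keeps the check idempotent: running the script again after a successful backfill copies nothing.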
5. Code-level implementation details
Excalidraw file: https://gcntfv628ebr.feishu.cn/file/SezSb5k2boNwFSxVJQzc9msknhe
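- Base layer: builds and manages the different databases and classifies each one as either cache or persistent storage; a Manager then manages them centrally and exposes the instances to callers
- Strategy layer: a query is not a plain call to the base layer's query method but goes through a query strategy, and the same holds for writes, which include the asynchronous WAL write; the strategy layer packages this logic for the application layer to call
- Application layer: where the storage is actually used; it exposes construction parameters for instantiation and provides the read and write methods
5.1 Base-layer code design
Cache design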
/**
* Cache Backend Interface
*
* Defines the contract for cache storage implementations.
* Cache backends are optimized for fast, ephemeral storage with optional TTL support.
*
* Implementations can include:
* - Redis: Distributed caching with network access
* - In-Memory: Fast local caching with no persistence
* - Memcached: Distributed memory caching system
*
* @module storage/backend/cache-backend
*/
/**
* CacheBackend Interface
*
* Provides a unified API for different cache storage implementations.
* All methods are asynchronous to support both local and network-based backends.
*
* @example
* ```typescript
* class RedisBackend implements CacheBackend {
* async get<T>(key: string): Promise<T | undefined> {
* const value = await this.redis.get(key);
* return value ? JSON.parse(value) : undefined;
* }
* // ... other methods
* }
* ```
*/
export interface CacheBackend {
// Basic operations
/**
* Retrieves a value from the cache by key
*
* @template T - The type of the cached value
* @param key - The cache key to retrieve
* @returns The cached value if found, undefined otherwise
*
* @example
* ```typescript
* const user = await cache.get<User>('user:123');
* if (!user) {
* // Cache miss - fetch from database
* }
* ```
*/
get<T>(key: string): Promise<T | undefined>;
/**
* Stores a value in the cache with optional TTL
*
* @template T - The type of the value to cache
* @param key - The cache key
* @param value - The value to cache (will be serialized)
* @param ttlSeconds - Optional time-to-live in seconds
*
* @example
* ```typescript
* // Cache for 1 hour
* await cache.set('user:123', userData, 3600);
*
* // Cache indefinitely
* await cache.set('config', configData);
* ```
*/
set<T>(key: string, value: T, ttlSeconds?: number): Promise<void>;
/**
* Removes a value from the cache
*
* @param key - The cache key to delete
*
* @example
* ```typescript
* // Invalidate user cache after update
* await cache.delete('user:123');
* ```
*/
delete(key: string): Promise<void>;
// Connection management
/**
* Establishes connection to the cache backend
*
* Should be called before performing any operations.
* Implementations should handle reconnection logic internally.
*
* @throws {StorageConnectionError} If connection fails
*
* @example
* ```typescript
* const cache = new RedisBackend(config);
* await cache.connect();
* // Now ready to use
* ```
*/
connect(): Promise<void>;
/**
* Gracefully closes the connection to the cache backend
*
* Should clean up resources and close any open connections.
* After disconnect, connect() must be called again before use.
*
* @example
* ```typescript
* // Clean shutdown
* await cache.disconnect();
* ```
*/
disconnect(): Promise<void>;
/**
* Checks if the backend is currently connected and ready
*
* @returns true if connected and operational, false otherwise
*
* @example
* ```typescript
* if (!cache.isConnected()) {
* await cache.connect();
* }
* ```
*/
isConnected(): boolean;
/**
* Returns the backend type identifier
*
* Useful for logging, monitoring, and conditional logic based on backend type.
*
* @returns Backend type string (e.g., 'redis', 'memory', 'memcached')
*
* @example
* ```typescript
* console.log(`Using ${cache.getBackendType()} for caching`);
* ```
*/
getBackendType(): string;
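}
Methods of the Cache design
- get: retrieves the value stored under the given key from the cache
- set: stores a value in the cache, optionally with a TTL
- delete: removes the value stored under the given key from the cache
- disconnect: gracefully closes the connection to the cache
- connect: establishes the connection to the cache backend
- isConnected: checks whether the cache backend is connected and usable
- getBackendType: returns the backend type identifier, used for logging, monitoring, and similar purposes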
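To make the TTL behavior concrete, here is a minimal in-memory sketch whose method signatures mirror the `CacheBackend` interface above. `TtlMemoryCache` is a hypothetical class, not the project's in-memory backend, and the injectable `now` clock is an assumption added so that expiry can be exercised without waiting.

```typescript
// Hypothetical in-memory cache; values are serialized to JSON, as a networked backend would do.
class TtlMemoryCache {
  private readonly store = new Map<string, { value: string; expiresAt: number | undefined }>();
  private readonly now: () => number;
  private connected = false;

  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  async connect(): Promise<void> { this.connected = true; }

  async disconnect(): Promise<void> {
    this.store.clear();
    this.connected = false;
  }

  isConnected(): boolean { return this.connected; }

  getBackendType(): string { return 'memory'; }

  async get<T>(key: string): Promise<T | undefined> {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt !== undefined && entry.expiresAt <= this.now()) {
      this.store.delete(key); // lazily evict expired entries on read
      return undefined;
    }
    return JSON.parse(entry.value) as T;
  }

  async set<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
    // No TTL means the entry never expires.
    const expiresAt = ttlSeconds === undefined ? undefined : this.now() + ttlSeconds * 1000;
    this.store.set(key, { value: JSON.stringify(value), expiresAt });
  }

  async delete(key: string): Promise<void> {
    this.store.delete(key);
  }
}
```

Because TypeScript interfaces are structural, a class with these members can be passed wherever a `CacheBackend` is expected without an explicit `implements` clause.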
Database: persistence design
/**
* Database Backend Interface
*
* Defines the contract for persistent storage implementations.
* Database backends are optimized for reliable, long-term data storage.
*
* Implementations can include:
* - SQLite: Lightweight, file-based database
* - PostgreSQL: Full-featured relational database
* - In-Memory: For testing or temporary persistence
*
* @module storage/backend/database-backend
*/
/**
* DatabaseBackend Interface
*
* Provides a unified API for different database storage implementations.
* Extends basic key-value operations with list operations for collections.
*
* @example
* ```typescript
* class SqliteBackend implements DatabaseBackend {
* async get<T>(key: string): Promise<T | undefined> {
* const row = await this.db.get('SELECT value FROM store WHERE key = ?', key);
* return row ? JSON.parse(row.value) : undefined;
* }
* // ... other methods
* }
* ```
*/
export interface DatabaseBackend {
// Basic key-value operations
/**
* Retrieves a value from the database by key
*
* @template T - The type of the stored value
* @param key - The storage key to retrieve
* @returns The stored value if found, undefined otherwise
*
* @example
* ```typescript
* const settings = await db.get<AppSettings>('app:settings');
* ```
*/
get<T>(key: string): Promise<T | undefined>;
/**
* Stores a value in the database
*
* Unlike cache, database storage is persistent and doesn't support TTL.
*
* @template T - The type of the value to store
* @param key - The storage key
* @param value - The value to store (will be serialized)
*
* @example
* ```typescript
* await db.set('user:123', userData);
* ```
*/
set<T>(key: string, value: T): Promise<void>;
/**
* Removes a value from the database
*
* @param key - The storage key to delete
*
* @example
* ```typescript
* await db.delete('user:123');
* ```
*/
delete(key: string): Promise<void>;
// Collection operations
/**
* Lists all keys matching a prefix
*
* Useful for finding related data or implementing namespaces.
*
* @param prefix - The key prefix to search for
* @returns Array of keys matching the prefix
*
* @example
* ```typescript
* // Get all user keys
* const userKeys = await db.list('user:');
* // Returns: ['user:123', 'user:456', ...]
* ```
*/
list(prefix: string): Promise<string[]>;
// List/Array operations
/**
* Appends an item to a list stored at the given key
*
* Creates the list if it doesn't exist. Useful for logs, history, etc.
*
* @template T - The type of items in the list
* @param key - The storage key for the list
* @param item - The item to append
*
* @example
* ```typescript
* // Add to user's activity log
* await db.append('activity:user:123', {
* action: 'login',
* timestamp: Date.now()
* });
* ```
*/
append<T>(key: string, item: T): Promise<void>;
/**
* Retrieves a range of items from a list
*
* Supports pagination through stored lists.
*
* @template T - The type of items in the list
* @param key - The storage key for the list
* @param start - Starting index (0-based)
* @param count - Number of items to retrieve
* @returns Array of items in the specified range
*
* @example
* ```typescript
* // Get the first 10 activities (indices 0-9, in insertion order)
* const activities = await db.getRange('activity:user:123', 0, 10);
* ```
*/
getRange<T>(key: string, start: number, count: number): Promise<T[]>;
// Connection management
/**
* Establishes connection to the database backend
*
* Should be called before performing any operations.
* May create database schema/tables if needed.
*
* @throws {StorageConnectionError} If connection fails
*
* @example
* ```typescript
* const db = new SqliteBackend(config);
* await db.connect();
* ```
*/
connect(): Promise<void>;
/**
* Gracefully closes the database connection
*
* Should ensure all pending writes are completed before closing.
*
* @example
* ```typescript
* await db.disconnect();
* ```
*/
disconnect(): Promise<void>;
/**
* Checks if the backend is currently connected
*
* @returns true if connected and operational, false otherwise
*/
isConnected(): boolean;
/**
* Returns the backend type identifier
*
* @returns Backend type string (e.g., 'sqlite', 'postgresql', 'memory')
*/
getBackendType(): string;
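}
Methods of the Database persistence design
- get: retrieves the value stored under the given key from the database
- set: stores a value in the database
- delete: removes the value stored under the given key from the database
- list: lists all keys that match the given prefix; useful for implementing namespaces
- append: appends an item to a stored list; supports list data structures and is used for implementing message queues and maintaining history
- getRange: retrieves the items in a given range of a list
- disconnect: gracefully closes the database connection
- connect: establishes the connection to the database backend
- isConnected: checks whether the database backend is connected and usable
- getBackendType: returns the backend type identifier, used for logging, monitoring, and similar purposes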
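The list operations are the ones that carry chat history, so a small sketch of their semantics may help. `MemoryListStore` is a hypothetical in-memory stand-in that mirrors the `append`, `getRange`, and `list` signatures of `DatabaseBackend`; the `history:<sessionId>` key layout in the usage note is likewise only an assumption.

```typescript
// Hypothetical in-memory stand-in for the list-related part of DatabaseBackend.
class MemoryListStore {
  private readonly lists = new Map<string, string[]>();

  /** Appends an item, creating the list on first use. */
  async append<T>(key: string, item: T): Promise<void> {
    const list = this.lists.get(key) ?? [];
    list.push(JSON.stringify(item));
    this.lists.set(key, list);
  }

  /** Returns up to `count` items starting at the 0-based index `start`, in insertion order. */
  async getRange<T>(key: string, start: number, count: number): Promise<T[]> {
    const list = this.lists.get(key) ?? [];
    return list.slice(start, start + count).map(raw => JSON.parse(raw) as T);
  }

  /** Returns the sorted keys that begin with `prefix`. */
  async list(prefix: string): Promise<string[]> {
    return Array.from(this.lists.keys())
      .filter(key => key.startsWith(prefix))
      .sort();
  }
}
```

With a layout such as `history:<sessionId>`, saving a message becomes an `append`, paging through a conversation becomes `getRange(key, offset, pageSize)`, and `list('history:')` enumerates the stored sessions.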
PostgreSQL code example
/**
* PostgreSQL Database Backend Implementation
*
* High-performance PostgreSQL backend for persistent, reliable storage.
* Supports connection pooling, prepared statements, and comprehensive error handling.
*
* Features:
* - Connection pooling with configurable settings
* - Prepared statements for SQL injection protection
* - Schema creation and management
* - Comprehensive error handling
* - Performance monitoring and logging
* - Support for both connection URLs and individual parameters
*
* @module storage/backend/postgresql
*/
import { Pool, type PoolConfig } from 'pg';
import type { DatabaseBackend } from './database-backend.js';
import type { PostgresBackendConfig } from '../config.js';
import { StorageError, StorageConnectionError } from './types.js';
import { createLogger, type Logger } from '../../logger/index.js';
/**
* PostgreSQL Database Backend
*
* Implements the DatabaseBackend interface using PostgreSQL as the underlying storage.
* Provides ACID compliance, strong consistency, and enterprise-grade reliability.
*
* Key Features:
* - Connection pooling for high concurrency
* - Prepared statements for performance and security
* - Automatic schema creation and management
* - Full CRUD operations with atomic transactions
* - Comprehensive error handling and logging
*
* @example
* ```typescript
* const backend = new PostgresBackend({
* type: 'postgres',
* host: 'localhost',
* port: 5432,
* database: 'myapp',
* user: 'postgres',
* password: 'secret'
* });
*
* await backend.connect();
* await backend.set('user:123', userData);
* const user = await backend.get('user:123');
* await backend.disconnect();
* ```
*/
export class PostgresBackend implements DatabaseBackend {
private pool: Pool | undefined;
private connected = false;
private readonly config: PostgresBackendConfig;
private readonly logger: Logger;
// SQL text for each operation, keyed by name and executed as parameterized queries
private statements: Map<string, string> = new Map();
constructor(config: PostgresBackendConfig) {
this.config = config;
this.logger = createLogger();
this.initializeStatements();
}
/**
* Initialize prepared statement definitions
*/
private initializeStatements(): void {
this.statements.set('get', 'SELECT value FROM cipher_store WHERE key = $1');
this.statements.set(
'set',
`
INSERT INTO cipher_store (key, value, created_at, updated_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (key) DO UPDATE SET
value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at
`
);
this.statements.set('delete', 'DELETE FROM cipher_store WHERE key = $1');
this.statements.set('list', 'SELECT key FROM cipher_store WHERE key LIKE $1 ORDER BY key');
this.statements.set(
'listAppend',
`
INSERT INTO cipher_lists (key, value, position, created_at)
VALUES ($1, $2, COALESCE((SELECT MAX(position) + 1 FROM cipher_lists WHERE key = $1), 0), $3)
`
);
this.statements.set(
'getRange',
`
SELECT value FROM cipher_lists
WHERE key = $1
ORDER BY position
LIMIT $2 OFFSET $3
`
);
this.statements.set('deleteList', 'DELETE FROM cipher_lists WHERE key = $1');
this.statements.set(
'updateListMetadata',
`
INSERT INTO cipher_list_metadata (key, count, created_at, updated_at)
VALUES ($1, (SELECT COUNT(*) FROM cipher_lists WHERE key = $1), $2, $3)
ON CONFLICT (key) DO UPDATE SET
count = (SELECT COUNT(*) FROM cipher_lists WHERE key = $1),
updated_at = EXCLUDED.updated_at
`
);
}
/**
* Connect to PostgreSQL database
*/
async connect(): Promise<void> {
if (this.connected) {
this.logger.debug('PostgreSQL backend already connected');
return;
}
try {
this.logger.info('Connecting to PostgreSQL database');
// Build connection configuration
const poolConfig = this.buildPoolConfig();
// Create connection pool
this.pool = new Pool(poolConfig);
// Test connection
const client = await this.pool.connect();
try {
await client.query('SELECT 1');
this.logger.debug('PostgreSQL connection test successful');
} finally {
client.release();
}
// Create tables if they don't exist
await this.createTables();
this.connected = true;
this.logger.info('PostgreSQL backend connected successfully', {
host: this.config.host || 'localhost',
database: this.config.database,
pool: {
max: poolConfig.max,
min: poolConfig.min,
},
});
} catch (error) {
this.logger.error('Failed to connect to PostgreSQL database', {
error: error instanceof Error ? error.message : String(error),
});
throw new StorageConnectionError(
'Failed to connect to PostgreSQL database',
'postgres',
error as Error
);
}
}
/**
* Disconnect from PostgreSQL database
*/
async disconnect(): Promise<void> {
if (!this.connected || !this.pool) {
return;
}
try {
await this.pool.end();
this.connected = false;
this.pool = undefined;
this.logger.info('PostgreSQL backend disconnected');
} catch (error) {
this.logger.error('Error disconnecting from PostgreSQL', {
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to disconnect from PostgreSQL', 'disconnect', error as Error);
}
}
/**
* Check if backend is connected
*/
isConnected(): boolean {
return this.connected && this.pool !== undefined;
}
/**
* Get backend type identifier
*/
getBackendType(): string {
return 'postgres';
}
/**
* Get value by key
*/
async get<T>(key: string): Promise<T | undefined> {
this.checkConnection();
try {
const result = await this.pool!.query(this.statements.get('get')!, [key]);
if (result.rows.length === 0) {
return undefined;
}
const serialized = result.rows[0].value;
return JSON.parse(serialized) as T;
} catch (error) {
this.logger.error('Error getting value from PostgreSQL', {
key,
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to get value from PostgreSQL', 'get', error as Error);
}
}
/**
* Set key-value pair
*/
async set<T>(key: string, value: T): Promise<void> {
this.checkConnection();
try {
const serialized = JSON.stringify(value);
const now = new Date();
await this.pool!.query(this.statements.get('set')!, [key, serialized, now, now]);
} catch (error) {
this.logger.error('Error setting value in PostgreSQL', {
key,
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to set value in PostgreSQL', 'set', error as Error);
}
}
/**
* Delete key
*/
async delete(key: string): Promise<void> {
this.checkConnection();
try {
// Use transaction to delete from both key-value store and lists
const client = await this.pool!.connect();
try {
await client.query('BEGIN');
await client.query(this.statements.get('delete')!, [key]);
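await client.query(this.statements.get('deleteList')!, [key]);
// Also drop the list metadata row so a deleted list leaves no stale count behind
await client.query('DELETE FROM cipher_list_metadata WHERE key = $1', [key]);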
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
} catch (error) {
this.logger.error('Error deleting value from PostgreSQL', {
key,
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to delete value from PostgreSQL', 'delete', error as Error);
}
}
/**
* List keys by prefix
*/
async list(prefix: string): Promise<string[]> {
this.checkConnection();
try {
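// Escape LIKE wildcards (%, _) and backslashes so the prefix is matched literally
const pattern = prefix.replace(/[\\%_]/g, '\\$&') + '%';
const result = await this.pool!.query(this.statements.get('list')!, [pattern]);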
return result.rows.map(row => row.key);
} catch (error) {
this.logger.error('Error listing keys from PostgreSQL', {
prefix,
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to list keys from PostgreSQL', 'list', error as Error);
}
}
/**
* Append item to list
*/
async append<T>(key: string, item: T): Promise<void> {
this.checkConnection();
try {
const client = await this.pool!.connect();
try {
await client.query('BEGIN');
const serialized = JSON.stringify(item);
const now = new Date();
await client.query(this.statements.get('listAppend')!, [key, serialized, now]);
await client.query(this.statements.get('updateListMetadata')!, [key, now, now]);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
} catch (error) {
this.logger.error('Error appending to list in PostgreSQL', {
key,
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to append to list in PostgreSQL', 'append', error as Error);
}
}
/**
* Get range of items from list
*/
async getRange<T>(key: string, start: number, count: number): Promise<T[]> {
this.checkConnection();
try {
const result = await this.pool!.query(this.statements.get('getRange')!, [key, count, start]);
return result.rows.map(row => JSON.parse(row.value) as T);
} catch (error) {
this.logger.error('Error getting range from PostgreSQL', {
key,
start,
count,
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('Failed to get range from PostgreSQL', 'getRange', error as Error);
}
}
/**
* Get database information
*/
getInfo(): Record<string, any> {
const baseInfo = {
type: 'postgres',
connected: this.connected,
};
if (!this.connected || !this.pool) {
return baseInfo;
}
return {
...baseInfo,
pool: {
totalCount: this.pool.totalCount,
idleCount: this.pool.idleCount,
waitingCount: this.pool.waitingCount,
},
config: {
host: this.config.host || 'localhost',
port: this.config.port || 5432,
database: this.config.database,
ssl: this.config.ssl || false,
},
};
}
/**
* Perform database maintenance
*/
async maintenance(): Promise<void> {
this.checkConnection();
try {
this.logger.info('Running PostgreSQL maintenance');
// Analyze tables for query optimization
await this.pool!.query('ANALYZE cipher_store');
await this.pool!.query('ANALYZE cipher_lists');
await this.pool!.query('ANALYZE cipher_list_metadata');
this.logger.info('PostgreSQL maintenance completed successfully');
} catch (error) {
this.logger.error('Error during PostgreSQL maintenance', {
error: error instanceof Error ? error.message : String(error),
});
throw new StorageError('PostgreSQL maintenance failed', 'maintenance', error as Error);
}
}
// Private helper methods
/**
* Build connection pool configuration
*/
private buildPoolConfig(): PoolConfig {
// If URL is provided, use it
if (this.config.url) {
return {
connectionString: this.config.url,
max: this.config.pool?.max || this.config.maxConnections || 10,
min: this.config.pool?.min || 2,
idleTimeoutMillis:
this.config.pool?.idleTimeoutMillis || this.config.idleTimeoutMillis || 30000,
connectionTimeoutMillis:
this.config.pool?.acquireTimeoutMillis || this.config.connectionTimeoutMillis || 10000,
ssl: this.config.ssl,
};
}
// Build from individual parameters
return {
host: this.config.host || 'localhost',
port: this.config.port || 5432,
database: this.config.database,
user: this.config.user,
password: this.config.password,
max: this.config.pool?.max || this.config.maxConnections || 10,
min: this.config.pool?.min || 2,
idleTimeoutMillis:
this.config.pool?.idleTimeoutMillis || this.config.idleTimeoutMillis || 30000,
connectionTimeoutMillis:
this.config.pool?.acquireTimeoutMillis || this.config.connectionTimeoutMillis || 10000,
ssl: this.config.ssl,
};
}
/**
* Create database tables
*/
private async createTables(): Promise<void> {
if (!this.pool) {
throw new StorageError('Database not connected', 'createTables');
}
// Key-value store table
await this.pool.query(`
CREATE TABLE IF NOT EXISTS cipher_store (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
)
`);
// Lists table for list operations
await this.pool.query(`
CREATE TABLE IF NOT EXISTS cipher_lists (
key TEXT NOT NULL,
value TEXT NOT NULL,
position INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY (key, position)
)
`);
// List metadata table
await this.pool.query(`
CREATE TABLE IF NOT EXISTS cipher_list_metadata (
key TEXT PRIMARY KEY,
count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
)
`);
// Create indexes for performance
await this.pool.query(
'CREATE INDEX IF NOT EXISTS idx_cipher_store_updated_at ON cipher_store(updated_at)'
);
await this.pool.query('CREATE INDEX IF NOT EXISTS idx_cipher_lists_key ON cipher_lists(key)');
await this.pool.query(
'CREATE INDEX IF NOT EXISTS idx_cipher_lists_created_at ON cipher_lists(created_at)'
);
this.logger.debug('PostgreSQL tables and indexes created');
}
/**
* Check if backend is connected
*/
private checkConnection(): void {
if (!this.connected || !this.pool) {
throw new StorageError('PostgreSQL backend not connected', 'checkConnection');
}
}
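}
5.2 Manager-layer design
The manager is the core orchestrator of the storage system and is responsible for managing the dual-backend storage architecture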
/**
* Storage Manager Implementation
*
* Orchestrates the dual-backend storage system with cache and database backends.
* Provides lazy loading, graceful fallbacks, and connection management.
*
* @module storage/manager
*/
import type { CacheBackend, DatabaseBackend, StorageBackends, StorageConfig } from './types.js';
import { StorageSchema } from './config.js';
import { Logger, createLogger } from '../logger/index.js';
import {
LOG_PREFIXES,
ERROR_MESSAGES,
TIMEOUTS,
HEALTH_CHECK,
BACKEND_TYPES,
} from './constants.js';
/**
* Health check result for storage backends
*/
export interface HealthCheckResult {
cache: boolean;
database: boolean;
overall: boolean;
details?: {
cache?: { status: string; latency?: number; error?: string };
database?: { status: string; latency?: number; error?: string };
};
}
/**
* Storage system information
*/
export interface StorageInfo {
connected: boolean;
backends: {
cache: {
type: string;
connected: boolean;
fallback: boolean;
};
database: {
type: string;
connected: boolean;
fallback: boolean;
};
};
connectionAttempts: number;
lastError: string | undefined;
}
/**
* Storage Manager
*
* Manages the lifecycle of storage backends with lazy loading and fallback support.
* Follows the factory pattern with graceful degradation to in-memory storage.
*
* @example
* ```typescript
* const manager = new StorageManager(config);
* const { cache, database } = await manager.connect();
*
* // Use backends
* await cache.set('key', value, 300);
* await database.set('user:123', userData);
*
* // Cleanup
* await manager.disconnect();
* ```
*/
export class StorageManager {
// Core state
private cache: CacheBackend | undefined;
private database: DatabaseBackend | undefined;
private connected = false;
private readonly config: StorageConfig;
private readonly logger: Logger;
// Connection tracking
private connectionAttempts = 0;
private lastConnectionError?: Error;
// Backend metadata
private cacheMetadata = {
type: 'unknown',
isFallback: false,
connectionTime: 0,
};
private databaseMetadata = {
type: 'unknown',
isFallback: false,
connectionTime: 0,
};
// Lazy loading module references (static to share across instances)
private static redisModule?: any;
private static sqliteModule?: any;
private static postgresModule?: any;
// Health check configuration
private readonly healthCheckKey = HEALTH_CHECK.KEY;
private readonly healthCheckTimeout = TIMEOUTS.HEALTH_CHECK;
/**
* Creates a new StorageManager instance
*
* @param config - Storage configuration with cache and database backend configs
* @throws {Error} If configuration is invalid
*/
constructor(config: StorageConfig) {
// Validate configuration using Zod schema
const validationResult = StorageSchema.safeParse(config);
if (!validationResult.success) {
throw new Error(
`${ERROR_MESSAGES.INVALID_CONFIG}: ${validationResult.error.errors
.map(e => `${e.path.join('.')}: ${e.message}`)
.join(', ')}`
);
}
this.config = validationResult.data;
this.logger = createLogger({
level: process.env.LOG_LEVEL || 'info',
});
this.logger.info(`${LOG_PREFIXES.MANAGER} Initialized with configuration`, {
cacheType: this.config.cache.type,
databaseType: this.config.database.type,
});
}
/**
* Get the current storage configuration
*
* @returns The storage configuration
*/
public getConfig(): Readonly<StorageConfig> {
return this.config;
}
/**
* Get information about the storage system
*
* @returns Storage system information including connection status and backend types
*/
public getInfo(): StorageInfo {
return {
connected: this.connected,
backends: {
cache: {
type: this.cacheMetadata.type,
connected: this.cache?.isConnected() ?? false,
fallback: this.cacheMetadata.isFallback,
},
database: {
type: this.databaseMetadata.type,
connected: this.database?.isConnected() ?? false,
fallback: this.databaseMetadata.isFallback,
},
},
connectionAttempts: this.connectionAttempts,
lastError: this.lastConnectionError?.message,
};
}
/**
* Get the current storage backends if connected
*
* @returns The storage backends or null if not connected
*/
public getBackends(): StorageBackends | null {
if (!this.connected || !this.cache || !this.database) {
return null;
}
return {
cache: this.cache,
database: this.database,
};
}
/**
* Check if the storage manager is connected
*
* @returns true if both backends are connected
*/
public isConnected(): boolean {
return (
this.connected && this.cache?.isConnected() === true && this.database?.isConnected() === true
);
}
// Connection lifecycle
/**
* Connect to storage backends
*
* @returns The connected storage backends
* @throws {StorageConnectionError} If strict backends fail to connect
*/
public async connect(): Promise<StorageBackends> {
// Check if already connected
if (this.connected) {
this.logger.debug(`${LOG_PREFIXES.MANAGER} Already connected`, {
cacheType: this.cacheMetadata.type,
databaseType: this.databaseMetadata.type,
});
return {
cache: this.cache!,
database: this.database!,
};
}
this.connectionAttempts++;
this.logger.info(
`${LOG_PREFIXES.MANAGER} Starting connection attempt ${this.connectionAttempts}`
);
try {
// Create and connect cache backend
const cacheStartTime = Date.now();
try {
this.cache = await this.createCacheBackend();
await this.cache.connect();
this.cacheMetadata.connectionTime = Date.now() - cacheStartTime;
this.logger.info(`${LOG_PREFIXES.CACHE} Connected successfully`, {
type: this.cacheMetadata.type,
isFallback: this.cacheMetadata.isFallback,
connectionTime: `${this.cacheMetadata.connectionTime}ms`,
});
} catch (cacheError) {
// If the configured backend fails, try fallback to in-memory
this.logger.warn(`${LOG_PREFIXES.CACHE} Connection failed, attempting fallback`, {
error: cacheError instanceof Error ? cacheError.message : String(cacheError),
originalType: this.config.cache.type,
});
if (this.config.cache.type !== BACKEND_TYPES.IN_MEMORY) {
const { InMemoryBackend } = await import('./backend/in-memory.js');
this.cache = new InMemoryBackend();
await this.cache.connect();
this.cacheMetadata.type = BACKEND_TYPES.IN_MEMORY;
this.cacheMetadata.isFallback = true;
this.cacheMetadata.connectionTime = Date.now() - cacheStartTime;
this.logger.info(`${LOG_PREFIXES.CACHE} Connected to fallback backend`, {
type: this.cacheMetadata.type,
originalType: this.config.cache.type,
});
} else {
throw cacheError; // Re-throw if already using in-memory
}
}
// Create and connect database backend
const dbStartTime = Date.now();
try {
this.database = await this.createDatabaseBackend();
await this.database.connect();
this.databaseMetadata.connectionTime = Date.now() - dbStartTime;
this.logger.info(`${LOG_PREFIXES.DATABASE} Connected successfully`, {
type: this.databaseMetadata.type,
isFallback: this.databaseMetadata.isFallback,
connectionTime: `${this.databaseMetadata.connectionTime}ms`,
});
} catch (err) {
// Report the configured backend type instead of assuming SQLite
this.logger.error(`Failed to connect to ${this.config.database.type} database`, err);
// If the configured backend fails, try fallback to in-memory
this.logger.warn(`${LOG_PREFIXES.DATABASE} Connection failed, attempting fallback`, {
error: err instanceof Error ? err.message : String(err),
originalType: this.config.database.type,
});
if (this.config.database.type !== BACKEND_TYPES.IN_MEMORY) {
const { InMemoryBackend } = await import('./backend/in-memory.js');
this.database = new InMemoryBackend();
await this.database.connect();
this.databaseMetadata.type = BACKEND_TYPES.IN_MEMORY;
this.databaseMetadata.isFallback = true;
this.databaseMetadata.connectionTime = Date.now() - dbStartTime;
this.logger.info(`${LOG_PREFIXES.DATABASE} Connected to fallback backend`, {
type: this.databaseMetadata.type,
originalType: this.config.database.type,
});
} else {
throw err; // Re-throw if already using in-memory
}
}
this.connected = true;
this.logger.info(`${LOG_PREFIXES.MANAGER} Storage system connected`, {
cacheBackend: this.cacheMetadata.type,
databaseBackend: this.databaseMetadata.type,
totalConnectionTime: `${this.cacheMetadata.connectionTime + this.databaseMetadata.connectionTime}ms`,
});
return {
cache: this.cache!,
database: this.database!,
};
} catch (error) {
// Store error for reporting
this.lastConnectionError = error as Error;
// Disconnect any successfully connected backends
if (this.cache?.isConnected()) {
await this.cache.disconnect().catch(err =>
this.logger.error(`${LOG_PREFIXES.CACHE} Error during cleanup disconnect`, {
error: err,
})
);
}
if (this.database?.isConnected()) {
await this.database.disconnect().catch(err =>
this.logger.error(`${LOG_PREFIXES.DATABASE} Error during cleanup disconnect`, {
error: err,
})
);
}
// Reset state
this.cache = undefined;
this.database = undefined;
this.connected = false;
throw error;
}
}
/**
* Disconnect from all storage backends
*/
public async disconnect(): Promise<void> {
if (!this.connected) {
this.logger.debug(`${LOG_PREFIXES.MANAGER} Already disconnected`);
return;
}
this.logger.info(`${LOG_PREFIXES.MANAGER} Disconnecting storage backends`);
const disconnectPromises: Promise<void>[] = [];
// Disconnect cache backend
if (this.cache?.isConnected()) {
disconnectPromises.push(
this.cache
.disconnect()
.then(() => {
this.logger.info(`${LOG_PREFIXES.CACHE} Disconnected successfully`);
})
.catch(error => {
this.logger.error(`${LOG_PREFIXES.CACHE} Disconnect error`, { error });
throw error;
})
);
}
// Disconnect database backend
if (this.database?.isConnected()) {
disconnectPromises.push(
this.database
.disconnect()
.then(() => {
this.logger.info(`${LOG_PREFIXES.DATABASE} Disconnected successfully`);
})
.catch(error => {
this.logger.error(`${LOG_PREFIXES.DATABASE} Disconnect error`, { error });
throw error;
})
);
}
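// Wait for all disconnects, but give up after TIMEOUTS.SHUTDOWN
let shutdownTimer: ReturnType<typeof setTimeout> | undefined;
try {
await Promise.race([
Promise.all(disconnectPromises),
new Promise<never>((_, reject) => {
shutdownTimer = setTimeout(() => reject(new Error('Disconnect timeout')), TIMEOUTS.SHUTDOWN);
}),
]);
} finally {
// Clear the timeout so a pending timer cannot keep the process alive
clearTimeout(shutdownTimer);
// Always clean up state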
this.cache = undefined;
this.database = undefined;
this.connected = false;
// Reset metadata
this.cacheMetadata = {
type: 'unknown',
isFallback: false,
connectionTime: 0,
};
this.databaseMetadata = {
type: 'unknown',
isFallback: false,
connectionTime: 0,
};
this.logger.info(`${LOG_PREFIXES.MANAGER} Storage system disconnected`);
}
}
/**
* Perform health check on all backends
*
* @returns Health check results for each backend
*/
public async healthCheck(): Promise<HealthCheckResult> {
// Implementation in Phase 5
throw new Error('Not implemented yet - Phase 5');
}
// Private helper methods
/**
* Create cache backend based on configuration
*/
private async createCacheBackend(): Promise<CacheBackend> {
const config = this.config.cache;
this.logger.debug(`${LOG_PREFIXES.CACHE} Creating backend`, { type: config.type });
switch (config.type) {
case BACKEND_TYPES.REDIS: {
try {
// Lazy load Redis module
if (!StorageManager.redisModule) {
this.logger.debug(`${LOG_PREFIXES.CACHE} Lazy loading Redis module`);
const { RedisBackend } = await import('./backend/redis-backend.js');
StorageManager.redisModule = RedisBackend;
}
const RedisBackend = StorageManager.redisModule;
this.cacheMetadata.type = BACKEND_TYPES.REDIS;
this.cacheMetadata.isFallback = false;
return new RedisBackend(config);
} catch (error) {
this.logger.debug(`${LOG_PREFIXES.CACHE} Failed to create Redis backend`, {
error: error instanceof Error ? error.message : String(error),
});
throw error; // Let connection handler deal with fallback
}
}
case BACKEND_TYPES.IN_MEMORY:
default: {
// Use in-memory backend
const { InMemoryBackend } = await import('./backend/in-memory.js');
this.cacheMetadata.type = BACKEND_TYPES.IN_MEMORY;
this.cacheMetadata.isFallback = false;
return new InMemoryBackend();
}
}
}
/**
* Create database backend based on configuration
*/
private async createDatabaseBackend(): Promise<DatabaseBackend> {
const config = this.config.database;
this.logger.debug(`${LOG_PREFIXES.DATABASE} Creating backend`, { type: config.type });
switch (config.type) {
case BACKEND_TYPES.SQLITE: {
try {
// Lazy load SQLite module
if (!StorageManager.sqliteModule) {
this.logger.debug(`${LOG_PREFIXES.DATABASE} Lazy loading SQLite module`);
const { SqliteBackend } = await import('./backend/sqlite.js');
StorageManager.sqliteModule = SqliteBackend;
}
const SqliteBackend = StorageManager.sqliteModule;
this.databaseMetadata.type = BACKEND_TYPES.SQLITE;
this.databaseMetadata.isFallback = false;
return new SqliteBackend(config);
} catch (error) {
this.logger.debug(`${LOG_PREFIXES.DATABASE} Failed to create SQLite backend`, {
error: error instanceof Error ? error.message : String(error),
});
throw error; // Let connection handler deal with fallback
}
}
case BACKEND_TYPES.POSTGRES: {
try {
// Lazy load PostgreSQL module
if (!StorageManager.postgresModule) {
this.logger.debug(`${LOG_PREFIXES.DATABASE} Lazy loading PostgreSQL module`);
const { PostgresBackend } = await import('./backend/postgresql.js');
StorageManager.postgresModule = PostgresBackend;
}
const PostgresBackend = StorageManager.postgresModule;
this.databaseMetadata.type = BACKEND_TYPES.POSTGRES;
this.databaseMetadata.isFallback = false;
return new PostgresBackend(config);
} catch (error) {
this.logger.debug(`${LOG_PREFIXES.DATABASE} Failed to create PostgreSQL backend`, {
error: error instanceof Error ? error.message : String(error),
});
throw error; // Let connection handler deal with fallback
}
}
case BACKEND_TYPES.IN_MEMORY:
default: {
// Use in-memory backend
const { InMemoryBackend } = await import('./backend/in-memory.js');
this.databaseMetadata.type = BACKEND_TYPES.IN_MEMORY;
this.databaseMetadata.isFallback = false;
return new InMemoryBackend();
}
}
}
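}
Manager-layer method design
- connect: establishes the storage connections
- disconnect: closes the storage connections
- getBackends: returns the storage backends
- getInfo: returns system information
- isConnected: reports the connection status
- healthCheck: runs a health check (still a stub in the code above)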
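healthCheck is listed above, but the manager code still throws "Not implemented yet - Phase 5". As one possible direction, the sketch below probes a backend by writing a throwaway key and reading it back under a timeout. `Probeable`, `ProbeResult`, `probeBackend`, `withTimeout`, and the `__health__` key are hypothetical names used for illustration; the project's real `HealthCheckResult` type is not shown in this document and may look different.

```typescript
// Hypothetical minimal backend surface; the real CacheBackend and
// DatabaseBackend interfaces may expose more (or differently named) methods.
interface Probeable {
  set(key: string, value: unknown): Promise<void>;
  get(key: string): Promise<unknown>;
}

// Hypothetical result shape for a single backend probe.
interface ProbeResult {
  healthy: boolean;
  latencyMs: number;
  error?: string;
}

// Reject if `work` does not settle within `ms`; the timer is always cleared.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error('Health check timeout')), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    if (timer !== undefined) clearTimeout(timer);
  }
}

// Write a throwaway key and read it back. Never throws: failures are
// reported in the returned ProbeResult instead.
async function probeBackend(backend: Probeable, timeoutMs = 1000): Promise<ProbeResult> {
  const startedAt = Date.now();
  const probeValue = `probe-${startedAt}`;
  try {
    await withTimeout(backend.set('__health__', probeValue), timeoutMs);
    const echoed = await withTimeout(backend.get('__health__'), timeoutMs);
    if (echoed !== probeValue) {
      throw new Error('Probe value mismatch');
    }
    return { healthy: true, latencyMs: Date.now() - startedAt };
  } catch (err) {
    return {
      healthy: false,
      latencyMs: Date.now() - startedAt,
      error: err instanceof Error ? err.message : String(err),
    };
  }
}
```

A manager-level healthCheck could call `probeBackend` once for the cache and once for the database and return both results side by side.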
import { StorageManager } from './src/core/storage/manager.js';
import type { StorageConfig } from './src/core/storage/types.js';
// 1. Configure the storage system
const config: StorageConfig = {
  cache: {
    type: 'redis',
    host: 'localhost',
    port: 6379,
    db: 0,
    keyPrefix: 'app:cache:'
  },
  database: {
    type: 'sqlite',
    path: './data/app.db'
  }
};
// 2. Create the storage manager and connect
async function initializeStorage() {
  const manager = new StorageManager(config);
  try {
    // Connect to the storage backends (fallback is handled automatically)
    const { cache, database } = await manager.connect();
    // Check the connection status
    const info = manager.getInfo();
    console.log('Storage Status:', {
      cacheType: info.backends.cache.type,
      cacheFallback: info.backends.cache.fallback,
      databaseType: info.backends.database.type,
      databaseFallback: info.backends.database.fallback
    });
    return { manager, cache, database };
  } catch (error) {
    console.error('Failed to initialize storage:', error);
    throw error;
  }
}
// 3. Use the storage system
async function useStorage() {
  const { manager, cache, database } = await initializeStorage();
  try {
    // === Cache operations (ephemeral data, TTL supported) ===
    // Cache the user session (expires after 1 hour)
    await cache.set('session:user123', {
      userId: '123',
      username: 'john',
      loginTime: Date.now()
    }, 3600);
    // Read from the cache
    const session = await cache.get('session:user123');
    console.log('Session:', session);
    // === Database operations (persistent data) ===
    // Save the user settings (stored permanently)
    await database.set('config:user123', {
      theme: 'dark',
      language: 'en',
      notifications: true
    });
    // Read the settings back (named userConfig to avoid shadowing the module-level config)
    const userConfig = await database.get('config:user123');
    console.log('User config:', userConfig);
    // List operations: record user activity
    await database.append('activity:user123', {
      action: 'login',
      timestamp: Date.now(),
      ip: '192.168.1.1'
    });
    await database.append('activity:user123', {
      action: 'view_profile',
      timestamp: Date.now(),
      ip: '192.168.1.1'
    });
    // Fetch the recent activity records
    const activities = await database.getRange('activity:user123', 0, 10);
    console.log('Recent activities:', activities);
    // Find all user config keys
    const configKeys = await database.list('config:');
    console.log('All config keys:', configKeys);
  } finally {
    // Cleanup: disconnect
    await manager.disconnect();
  }
}
5.3 Strategy layer design
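The strategy factory creates and assembles the strategy components of the storage system. Its core job is to centralize the strategy creation logic so that the system can switch between and combine storage behaviors flexibly.
The main steps are:
- Create a strategy factory that instantiates the strategies
- Build the read strategy, the write strategy, and the other strategy functions and modules
// Strategy factory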
import { ReadStrategy, ReadStrategyConfig } from '../interfaces/read-strategy.js';
import { WriteStrategy, WriteStrategyConfig } from '../interfaces/write-strategy.js';
import { HealthManager, HealthManagerConfig } from '../interfaces/health-manager.js';
import { WALManager } from '../interfaces/wal-manager.js';
// Strategy implementation imports
import { PrimaryFirstReadStrategy } from '../implementations/read/primary-first-read.js';
import { IdempotentWriteStrategy } from '../implementations/write/idempotent-write.js';
import { CircuitBreakerHealthManager } from '../implementations/health/circuit-breaker-health.js';
import { RedisStreamWALManager, RedisStreamWALConfig } from '../implementations/wal/redis-stream-wal.js';
/**
* Strategy factory
*
* Creates and configures the strategy instances
*/
export class StrategyFactory {
/**
* Create a read strategy
*/
static createReadStrategy(
type: ReadStrategyType,
config: ReadStrategyConfig,
healthManager: HealthManager
): ReadStrategy {
switch (type) {
case 'primary-first':
return new PrimaryFirstReadStrategy(config, healthManager);
case 'cache-enhanced':
// TODO: implement the cache-enhanced read strategy
throw new Error('Cache-enhanced read strategy not implemented yet');
case 'failover':
// TODO: implement the failover read strategy
throw new Error('Failover read strategy not implemented yet');
default:
throw new Error(`Unknown read strategy type: ${type}`);
}
}
/**
* Create a write strategy
*/
static createWriteStrategy(
type: WriteStrategyType,
config: WriteStrategyConfig,
healthManager: HealthManager
): WriteStrategy {
switch (type) {
case 'idempotent':
return new IdempotentWriteStrategy(config, healthManager);
case 'wal-first':
// TODO: implement the WAL-first write strategy
throw new Error('WAL-first write strategy not implemented yet');
case 'dual-write':
// TODO: implement the dual-write strategy
throw new Error('Dual-write strategy not implemented yet');
default:
throw new Error(`Unknown write strategy type: ${type}`);
}
}
/**
* Create a health manager
*/
static createHealthManager(
type: HealthManagerType,
config: HealthManagerConfig
): HealthManager {
switch (type) {
case 'circuit-breaker':
return new CircuitBreakerHealthManager(config);
case 'simple':
// TODO: implement the simple health manager
throw new Error('Simple health manager not implemented yet');
default:
throw new Error(`Unknown health manager type: ${type}`);
}
}
/**
* Create a WAL manager
*/
static createWALManager(
type: WALManagerType,
config: WALManagerConfig
): WALManager {
switch (type) {
case 'redis-stream':
return new RedisStreamWALManager(config as RedisStreamWALConfig);
case 'file':
// TODO: implement the file WAL
throw new Error('File WAL manager not implemented yet');
case 'in-memory':
// TODO: implement the in-memory WAL (development only)
throw new Error('In-memory WAL manager not implemented yet');
default:
throw new Error(`Unknown WAL manager type: ${type}`);
}
}
/**
* Create the complete strategy set
*/
static createStrategySet(config: StrategySetConfig): StrategySet {
// 1. Create the health manager (the other strategies depend on it)
const healthManager = this.createHealthManager(
config.health.type,
config.health.config
);
// 2. Create the read and write strategies
const readStrategy = this.createReadStrategy(
config.read.type,
config.read.config,
healthManager
);
const writeStrategy = this.createWriteStrategy(
config.write.type,
config.write.config,
healthManager
);
// 3. Create the WAL manager (optional)
let walManager: WALManager | undefined;
if (config.wal) {
walManager = this.createWALManager(
config.wal.type,
config.wal.config
);
}
return {
readStrategy,
writeStrategy,
healthManager,
walManager
};
}
}
// Type definitions
export type ReadStrategyType = 'primary-first' | 'cache-enhanced' | 'failover';
export type WriteStrategyType = 'idempotent' | 'wal-first' | 'dual-write';
export type HealthManagerType = 'circuit-breaker' | 'simple';
export type WALManagerType = 'redis-stream' | 'file' | 'in-memory';
export interface StrategySetConfig {
read: {
type: ReadStrategyType;
config: ReadStrategyConfig;
};
write: {
type: WriteStrategyType;
config: WriteStrategyConfig;
};
health: {
type: HealthManagerType;
config: HealthManagerConfig;
};
wal?: {
type: WALManagerType;
config: WALManagerConfig;
};
}
export interface StrategySet {
readStrategy: ReadStrategy;
writeStrategy: WriteStrategy;
healthManager: HealthManager;
walManager?: WALManager;
}
export type WALManagerConfig = RedisStreamWALConfig | FileWALConfig | InMemoryWALConfig;
// WAL configuration types
export interface FileWALConfig {
filePath: string;
maxFileSize: number;
rotateOnSize: boolean;
flushIntervalMs?: number;
}
export interface InMemoryWALConfig {
maxEntries: number;
flushIntervalMs?: number;
}
// Read strategy interface design - read-strategy
export interface ReadStrategy {
/**
* Execute a read
* @param key the key to read
* @param managers the set of available storage managers
* @returns the data that was read
*/
execute(key: string, managers: StorageManagerSet): Promise<any>;
/**
* Strategy name
*/
readonly name: string;
/**
* Strategy configuration
*/
readonly config: ReadStrategyConfig;
}
export interface ReadStrategyConfig {
/** Whether to treat an empty result as a failure */
treatEmptyAsFailure?: boolean;
/** Maximum number of retries */
maxRetries?: number;
/** Retry delay (ms) */
retryDelay?: number;
/** Whether to enable cache merging */
enableCacheMerge?: boolean;
/** Read timeout (ms) */
timeout?: number;
}
export interface ReadContext {
key: string;
managers: StorageManagerSet;
attempt: number;
startTime: number;
metadata?: Record<string, any>;
}
export interface StorageManagerSet {
primary: StorageManager;
backup?: StorageManager;
cache?: StorageManager;
}
// Concrete implementation of the read strategy
import {
ReadStrategy,
ReadStrategyConfig,
ReadContext,
StorageManagerSet
} from '../interfaces/read-strategy.js';
import { HealthManager } from '../interfaces/health-manager.js';
/**
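* Primary-first read strategy
*
* Strategy:
* 1. Try the primary store first
* 2. If the primary fails (or returns no data while treatEmptyAsFailure is set), try the backup store
* 3. Use a circuit breaker to avoid hammering a failing store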
*/
export class PrimaryFirstReadStrategy implements ReadStrategy {
readonly name = 'primary-first-read';
constructor(
public readonly config: ReadStrategyConfig,
private healthManager: HealthManager
) {}
async execute(key: string, managers: StorageManagerSet): Promise<any> {
const context: ReadContext = {
key,
managers,
attempt: 1,
startTime: Date.now()
};
// Step 1: try the primary store
const primaryResult = await this.tryPrimary(context);
if (primaryResult.success) {
return primaryResult.data;
}
// Step 2: the primary failed, try the backup store
if (managers.backup) {
const backupResult = await this.tryBackup(context);
if (backupResult.success) {
return backupResult.data;
}
}
// Step 3: every backend failed, throw
throw new Error(`All storage backends failed for key: ${key}`);
}
private async tryPrimary(context: ReadContext): Promise<StorageResult> {
if (!this.healthManager.isHealthy('primary')) {
return { success: false, reason: 'primary-unhealthy' };
}
try {
const backends = context.managers.primary.getBackends();
if (!backends) {
throw new Error('Primary storage not connected');
}
const result = await this.executeWithTimeout(
() => backends.database.get(context.key),
this.config.timeout || 5000
);
// The config decides whether an empty result (null or undefined) counts as a failure
if ((result === null || result === undefined) && this.config.treatEmptyAsFailure) {
throw new Error('Primary returned empty result');
}
// Record the success
this.healthManager.markSuccess('primary', 'read', Date.now() - context.startTime);
return { success: true, data: result };
} catch (error) {
// Record the failure
this.healthManager.markFailure('primary', error as Error, 'read');
return {
success: false,
reason: 'primary-failed',
error: (error as Error).message
};
}
}
private async tryBackup(context: ReadContext): Promise<StorageResult> {
if (!this.healthManager.isHealthy('backup')) {
return { success: false, reason: 'backup-unhealthy' };
}
try {
const backends = context.managers.backup!.getBackends();
if (!backends) {
throw new Error('Backup storage not connected');
}
const result = await this.executeWithTimeout(
() => backends.database.get(context.key),
this.config.timeout || 5000
);
// Record the success
this.healthManager.markSuccess('backup', 'read', Date.now() - context.startTime);
return { success: true, data: result };
} catch (error) {
// Record the failure
this.healthManager.markFailure('backup', error as Error, 'read');
return {
success: false,
reason: 'backup-failed',
error: (error as Error).message
};
}
}
private async executeWithTimeout<T>(
operation: () => Promise<T>,
timeoutMs: number
): Promise<T> {
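let timer: ReturnType<typeof setTimeout> | undefined;
const timeout = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error('Operation timeout')), timeoutMs);
});
try {
return await Promise.race([operation(), timeout]);
} finally {
// Clear the timer so completed reads do not leave pending timeouts behind
if (timer !== undefined) clearTimeout(timer);
}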
}
}
interface StorageResult {
success: boolean;
data?: any;
reason?: string;
error?: string;
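}
Methods of the strategy factory:
- Read strategy: instantiates the read strategy in the strategy layer that matches the requested type
- Write strategy: instantiates the write strategy in the strategy layer that matches the requested type
- Health manager: monitors the health of the storage backends so that a disconnect is noticed promptly and traffic can move to another backend
- WAL manager: manages the write-ahead log
- Complete strategy set: creates all required strategy components in one call and wires up the dependencies between them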
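The `CircuitBreakerHealthManager` referenced by the factory is not shown in this document. To make the health-manager role concrete, here is a minimal sketch of a circuit breaker for a single backend. The config field names are borrowed from the usage example in section 5.4; their exact semantics, the `SimpleBreaker` class, and the injectable `now` clock are assumptions made for illustration, not the project's implementation.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

// Field names follow the usage example; the meanings below are assumed.
interface BreakerConfig {
  failureThreshold: number;         // failures inside the window that open the circuit
  failureWindow: number;            // sliding window length in ms
  circuitOpenDuration: number;      // ms to stay open before allowing trial requests
  recoverySuccessThreshold: number; // trial successes needed to close again
}

class SimpleBreaker {
  private state: BreakerState = 'closed';
  private failures: number[] = []; // timestamps of recent failures
  private openedAt = 0;
  private trialSuccesses = 0;

  constructor(
    private readonly config: BreakerConfig,
    private readonly now: () => number = () => Date.now()
  ) {}

  // Callers skip the backend while this returns false.
  isHealthy(): boolean {
    if (this.state === 'open' && this.now() - this.openedAt >= this.config.circuitOpenDuration) {
      // Cool-down elapsed: let trial requests through.
      this.state = 'half-open';
      this.trialSuccesses = 0;
    }
    return this.state !== 'open';
  }

  markSuccess(): void {
    if (this.state !== 'half-open') return;
    this.trialSuccesses += 1;
    if (this.trialSuccesses >= this.config.recoverySuccessThreshold) {
      this.state = 'closed';
      this.failures = [];
    }
  }

  markFailure(): void {
    const t = this.now();
    if (this.state === 'half-open') {
      // A failed trial request reopens the circuit immediately.
      this.open(t);
      return;
    }
    // Keep only failures that are still inside the sliding window.
    this.failures = this.failures.filter(ts => t - ts < this.config.failureWindow);
    this.failures.push(t);
    if (this.failures.length >= this.config.failureThreshold) {
      this.open(t);
    }
  }

  getState(): BreakerState {
    return this.state;
  }

  private open(at: number): void {
    this.state = 'open';
    this.openedAt = at;
    this.failures = [];
  }
}
```

In the read strategy above, `isHealthy('primary')` plays the role of `isHealthy()` here: while the circuit is open, reads go straight to the backup instead of waiting for the primary to time out.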
5.4 Application layer design
import { StorageManager } from '../../../../../src/core/storage/manager.js'; // reuse the existing StorageManager
import { ReadStrategy } from '../interfaces/read-strategy.js';
import { WriteStrategy } from '../interfaces/write-strategy.js';
import { HealthManager } from '../interfaces/health-manager.js';
import { WALManager } from '../interfaces/wal-manager.js';
/**
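* Strategic storage manager
*
* Responsibilities:
* 1. Coordinate multiple StorageManager instances
* 2. Inject and manage the strategies
* 3. Provide a unified high-level storage interface
* 4. Manage the connection lifecycle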
*/
export class StrategicStorageManager {
private managers: StorageManagerSet;
private connected = false;
constructor(
private config: StrategicStorageConfig,
private readStrategy: ReadStrategy,
private writeStrategy: WriteStrategy,
private healthManager: HealthManager,
private walManager?: WALManager
) {
this.managers = {
primary: new StorageManager(config.primary),
backup: config.backup ? new StorageManager(config.backup) : undefined,
cache: config.cache ? new StorageManager(config.cache) : undefined
};
}
/**
* Connect all storage managers
*/
async connect(): Promise<void> {
if (this.connected) return;
try {
// Connect all managers in parallel
const connections = [
this.managers.primary.connect()
];
if (this.managers.backup) {
connections.push(this.managers.backup.connect());
}
if (this.managers.cache) {
connections.push(this.managers.cache.connect());
}
await Promise.all(connections);
// Start the WAL flush worker and the health checks
if (this.walManager) {
await this.walManager.startFlushWorker();
}
this.healthManager.startHealthCheck();
this.connected = true;
console.log('Strategic storage manager connected successfully');
} catch (error) {
console.error('Failed to connect strategic storage manager:', error);
throw error;
}
}
/**
* Disconnect everything
*/
async disconnect(): Promise<void> {
if (!this.connected) return;
try {
// Stop the background services
this.healthManager.stopHealthCheck();
if (this.walManager) {
await this.walManager.stopFlushWorker();
}
// Disconnect all managers
const disconnections = [
this.managers.primary.disconnect()
];
if (this.managers.backup) {
disconnections.push(this.managers.backup.disconnect());
}
if (this.managers.cache) {
disconnections.push(this.managers.cache.disconnect());
}
await Promise.all(disconnections);
this.connected = false;
console.log('Strategic storage manager disconnected');
} catch (error) {
console.error('Error during strategic storage manager disconnect:', error);
throw error;
}
}
/**
* Strategy-driven read
*/
async get(key: string): Promise<any> {
this.ensureConnected();
return await this.readStrategy.execute(key, this.managers);
}
/**
* Strategy-driven write
*/
async set(key: string, value: any): Promise<void> {
this.ensureConnected();
return await this.writeStrategy.execute(key, value, this.managers, this.walManager);
}
/**
* Batch read
*/
async mget(keys: string[]): Promise<Record<string, any>> {
this.ensureConnected();
const results: Record<string, any> = {};
// Read all keys in parallel
const promises = keys.map(async key => {
try {
const value = await this.readStrategy.execute(key, this.managers);
return { key, value, success: true };
} catch (error) {
return { key, error, success: false };
}
});
const completed = await Promise.allSettled(promises);
for (const result of completed) {
if (result.status === 'fulfilled' && result.value.success) {
results[result.value.key] = result.value.value;
}
}
return results;
}
/**
* Batch write
*/
async mset(entries: Record<string, any>): Promise<void> {
this.ensureConnected();
// Write all entries in parallel
const promises = Object.entries(entries).map(([key, value]) =>
this.writeStrategy.execute(key, value, this.managers, this.walManager)
);
await Promise.all(promises);
}
/**
* Delete
*/
async delete(key: string): Promise<void> {
this.ensureConnected();
// Deletion goes through the write strategy as a write of null
await this.writeStrategy.execute(key, null, this.managers, this.walManager);
}
/**
* Check whether a key exists
*/
async exists(key: string): Promise<boolean> {
try {
const value = await this.get(key);
return value !== null && value !== undefined;
} catch {
return false;
}
}
/**
* Get the health status
*/
getHealthStatus(): Record<string, any> {
return {
connected: this.connected,
backends: this.healthManager.getAllHealthStats(),
walStats: this.walManager?.getStats()
};
}
/**
* Get the storage managers (for direct access)
*/
getStorageManagers(): StorageManagerSet {
return this.managers;
}
/**
* Get the underlying backends (kept for compatibility)
*/
getBackends(type: 'primary' | 'backup' | 'cache' = 'primary') {
const manager = this.managers[type];
return manager ? manager.getBackends() : null;
}
/**
* Force a WAL flush
*/
async flushWAL(): Promise<void> {
if (this.walManager) {
const unflushed = await this.walManager.getUnflushedEntries();
console.log(`Flushing ${unflushed.length} WAL entries`);
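// The actual flush is handled by the background worker
}
}
/**
* Clean up expired WAL entries
*/
async cleanupWAL(beforeTimestamp?: number): Promise<number> {
if (!this.walManager) return 0;
// Default cutoff: 24 hours ago. `??` keeps an explicit timestamp of 0.
const timestamp = beforeTimestamp ?? Date.now() - 24 * 60 * 60 * 1000;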
return await this.walManager.cleanup(timestamp);
}
private ensureConnected(): void {
if (!this.connected) {
throw new Error('Strategic storage manager not connected');
}
}
}
export interface StrategicStorageConfig {
primary: StorageConfig;
backup?: StorageConfig;
cache?: StorageConfig;
}
export interface StorageManagerSet {
primary: StorageManager;
backup?: StorageManager;
cache?: StorageManager;
}
// Config type mirrored from the existing project
interface StorageConfig {
database: {
type: 'postgresql' | 'sqlite' | 'redis' | 'in-memory';
[key: string]: any;
};
cache?: {
type: 'redis' | 'in-memory';
[key: string]: any;
};
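}
- connect: connects all storage managers, then starts the WAL flush worker and the health checks
- disconnect: stops the background services and disconnects all managers
- get: strategy-driven read; delegates to the read strategy in the strategy layer
- set: strategy-driven write; delegates to the write strategy in the strategy layer
- delete: deletes data by writing null through the write strategy
- mget: batch read, run in parallel; keys whose read fails are omitted from the result
- mset: batch write, run in parallel
- exists: checks whether a key exists
- getHealthStatus: returns the health status
- flushWAL: intended to force a WAL flush; as written it only logs the number of unflushed entries and leaves the flush to the background worker
- cleanupWAL: removes expired WAL entries (older than 24 hours by default)
- getStorageManagers: gives direct access to the storage managers
- getBackends: returns the backends of one manager (primary by default)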
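The `set` method delegates to `IdempotentWriteStrategy`, whose code is not included in this document. The sketch below illustrates the write path described in section 三 (write to the primary; if it is down, check the backup before writing to avoid duplicates; always append to the WAL). `KeyValueStore`, `WalLog`, `WalEntry`, `sameValue`, and `idempotentWrite` are hypothetical names, and the JSON-based equality check is a simplification, so treat this as one plausible shape and not the project's implementation.

```typescript
// Hypothetical minimal store and WAL surfaces used only by this sketch.
interface KeyValueStore {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown): Promise<void>;
}

interface WalEntry {
  key: string;
  value: unknown;
  timestamp: number;
}

interface WalLog {
  append(entry: WalEntry): Promise<void>;
}

// Structural equality via JSON. Adequate for plain JSON-serializable values;
// it is sensitive to property order.
function sameValue(a: unknown, b: unknown): boolean {
  return JSON.stringify(a) === JSON.stringify(b);
}

// Returns which store accepted the write.
async function idempotentWrite(
  key: string,
  value: unknown,
  primary: KeyValueStore,
  backup: KeyValueStore,
  wal: WalLog
): Promise<'primary' | 'backup'> {
  let target: 'primary' | 'backup' = 'primary';
  try {
    await primary.set(key, value);
  } catch {
    // Primary is unavailable: fall back to the backup, but skip the write
    // when the same value is already stored there.
    target = 'backup';
    const existing = await backup.get(key);
    if (!sameValue(existing, value)) {
      await backup.set(key, value);
    }
  }
  // The WAL append must succeed, so a failure here propagates to the caller.
  await wal.append({ key, value, timestamp: Date.now() });
  return target;
}
```

Retrying the same `set` during a primary outage then touches the backup only once, while every attempt is still recorded in the WAL for the flush worker to reconcile.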
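Using the application layer together with the strategy layer is straightforward: the strategy factory instantiates the strategy modules, and the application layer calls them.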
/**
* Basic usage example
*
* Shows how to use the strategic storage manager for basic reads and writes
*/
async function basicUsageExample() {
console.log('=== Basic usage example ===');
// 1. Configure storage
const storageConfig: StrategicStorageConfig = {
// Primary store: PostgreSQL
primary: {
database: {
type: 'postgresql',
url: 'postgresql://user:pass@localhost:5432/cipher_primary'
}
},
// Backup store: SQLite
backup: {
database: {
type: 'sqlite',
path: './data',
database: 'cipher-backup.db'
}
}
};
// 2. Configure the strategies
const strategyConfig: StrategySetConfig = {
read: {
type: 'primary-first',
config: {
treatEmptyAsFailure: false,
timeout: 5000
}
},
write: {
type: 'idempotent',
config: {
enableIdempotency: true,
timeout: 5000
}
},
health: {
type: 'circuit-breaker',
config: {
failureThreshold: 5,
failureWindow: 60000,
circuitOpenDuration: 30000,
recoverySuccessThreshold: 3,
healthCheckInterval: 10000,
maxFailureHistory: 100,
latencyWindowSize: 50
}
}
};
// 3. Create the strategy set
const strategies = StrategyFactory.createStrategySet(strategyConfig);
// 4. Create the manager (the application layer)
const manager = new StrategicStorageManager(
storageConfig,
strategies.readStrategy,
strategies.writeStrategy,
strategies.healthManager,
strategies.walManager
);
try {
// 5. Connect
await manager.connect();
console.log('✅ Storage manager connected');
// 6. Basic write
await manager.set('user:123', {
id: 123,
name: 'John Doe',
email: 'john@example.com',
createdAt: new Date().toISOString()
});
console.log('✅ User data written');
// 7. Basic read
const user = await manager.get('user:123');
console.log('✅ User data read:', user);
// 8. Batch operations
await manager.mset({
'session:abc': { userId: 123, token: 'xyz', expiresAt: Date.now() + 3600000 },
'session:def': { userId: 456, token: 'uvw', expiresAt: Date.now() + 3600000 }
});
console.log('✅ Batch write complete');
const sessions = await manager.mget(['session:abc', 'session:def']);
console.log('✅ Batch read complete:', sessions);
// 9. Check the health status
const health = manager.getHealthStatus();
console.log('✅ Health status:', JSON.stringify(health, null, 2));
} catch (error) {
console.error('❌ Operation failed:', error);
} finally {
// 10. Clean up
await manager.disconnect();
console.log('✅ Storage manager disconnected');
}
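}
6. Best practices
- Use a fast, low-latency database as the primary, for example Redis or Postgres
- Use a highly durable, possibly slower store as the backup, for example S3, SQLite, or a cloud database
- In production, consider a persistent WAL instead of an in-memory one to maximize durability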
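To illustrate the last point, here is a minimal sketch of a file-backed WAL that survives a process restart, using an append-only JSON Lines file and Node's built-in `fs` module. `FileWal`, `WalRecord`, and `unflushedSince` are hypothetical names, and this is separate from the `RedisStreamWALManager` the factory actually wires up. `appendFileSync` hands the data to the operating system but does not force it to disk, so a real implementation would also need an fsync policy, file rotation, and handling of a partially written last line.

```typescript
import { appendFileSync, existsSync, mkdtempSync, readFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// One WAL record per line in the file (JSON Lines).
interface WalRecord {
  id: number;
  key: string;
  value: unknown;
}

class FileWal {
  private nextId = 1;

  constructor(private readonly filePath: string) {
    // Resume numbering from the last record after a restart.
    const last = this.readAll().pop();
    if (last) this.nextId = last.id + 1;
  }

  // Append a record to the end of the log file and return it.
  append(key: string, value: unknown): WalRecord {
    const record: WalRecord = { id: this.nextId++, key, value };
    appendFileSync(this.filePath, JSON.stringify(record) + '\n', 'utf8');
    return record;
  }

  // Read every record currently in the log file.
  readAll(): WalRecord[] {
    if (!existsSync(this.filePath)) return [];
    return readFileSync(this.filePath, 'utf8')
      .split('\n')
      .filter(line => line.trim().length > 0)
      .map(line => JSON.parse(line) as WalRecord);
  }

  // Records with an id greater than the last id known to be in the backup.
  unflushedSince(lastFlushedId: number): WalRecord[] {
    return this.readAll().filter(record => record.id > lastFlushedId);
  }
}

// Usage: append two entries, then reopen the same file as if the process had restarted.
const walPath = join(mkdtempSync(join(tmpdir(), 'wal-')), 'wal.jsonl');
const wal = new FileWal(walPath);
wal.append('chat:1', { role: 'user', content: 'hello' });
wal.append('chat:2', { role: 'assistant', content: 'hi' });
const reopened = new FileWal(walPath);
const pending = reopened.unflushedSince(1); // entries written after id 1
```

A flush worker could persist the last flushed id next to the log and call `unflushedSince` on each tick to find the entries that still need to reach the backup database.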