Multi-Backend Storage Design: Backup and Fallback Strategy

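A multi-backend storage design gives LLM applications highly available, fault-tolerant, and durable storage for conversation history. It combines a fast primary database, a persistent backup, and a write-ahead log (WAL) so that chat history is not lost even when a backend database fails or the network is interrupted.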

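1. Why Multiple Backends

  • Resilience: if the primary database is unavailable, reads automatically switch to the backup.
  • Durability: all writes are first recorded in the WAL, so no message is lost even if the backup is temporarily unavailable.
  • Consistency: the WAL ensures that every message is eventually persisted to the backup, which gives a strong durability guarantee.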

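2. Architecture

Excalidraw file: https://gcntfv628ebr.feishu.cn/file/BiIPb4jIZowZBFxMw7HctS3Inor

  1. Primary storage (PrimaryDB): a fast primary database that serves reads and writes (for example, Redis or Postgres).
  2. Backup storage (BackupDB): a durable secondary database for redundancy (for example, SQLite or a cloud database).
  3. WAL: an in-memory or persistent log that provides write durability and drives the asynchronous backup.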

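3. How It Works

3.1 saveMessage: saving a message

  • Primary database available
    • Write to the primary database.
    • Write to the WAL (this write must succeed to guarantee durability).
    • The WAL is flushed to the backup database asynchronously at a regular interval.
  • Primary database unavailable
    • Query the backup database before writing, to prevent duplicate writes.
    • If the backup database does not contain the message, write it to the backup database.
    • Write to the WAL.
    • Before it flushes, the WAL queries the backup database again to prevent duplicate writes.
    • If the backup database does not contain the message, the WAL writes it to the backup database.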
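
The write path above can be sketched roughly as follows. This is a minimal illustration, not the project's implementation: `Message`, `MessageStore`, `WalEntry`, and this `saveMessage` signature are hypothetical, and the sketch assumes every message carries a unique `id` that the duplicate check can use.

```typescript
// Hypothetical types for illustration; none of these names come from the project.
interface Message {
	id: string; // assumed unique per message, used for de-duplication
	content: string;
}

interface MessageStore {
	has(sessionId: string, messageId: string): Promise<boolean>;
	append(sessionId: string, message: Message): Promise<void>;
}

interface WalEntry {
	sessionId: string;
	message: Message;
}

async function saveMessage(
	primary: MessageStore,
	backup: MessageStore,
	wal: WalEntry[],
	sessionId: string,
	message: Message
): Promise<'primary' | 'backup'> {
	let target: 'primary' | 'backup' = 'primary';
	try {
		// Normal path: the primary database takes the write.
		await primary.append(sessionId, message);
	} catch {
		// Degraded path: check the backup first so a retry is not stored twice.
		target = 'backup';
		if (!(await backup.has(sessionId, message.id))) {
			await backup.append(sessionId, message);
		}
	}
	// Both paths record the message in the WAL. The flush worker is assumed to
	// run the same has() check before it copies an entry to the backup.
	wal.push({ sessionId, message });
	return target;
}
```

The return value only reports which store took the write, which can be handy for logging or metrics. A real WAL would likely be persisted rather than held in an array.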

3.2 getHistory: reading history

  • Read from the primary database.
  • If the primary database fails, read from the backup database.
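
A read with fallback could look like the sketch below. `HistoryReader` and `getHistoryWithFallback` are hypothetical names chosen for this example. The `source` field in the result is an addition of the sketch, included so a caller can tell when it is being served from the backup.

```typescript
// Hypothetical reader interface; the name is an assumption for this sketch.
interface HistoryReader<T> {
	getHistory(sessionId: string): Promise<T[]>;
}

async function getHistoryWithFallback<T>(
	primary: HistoryReader<T>,
	backup: HistoryReader<T>,
	sessionId: string
): Promise<{ source: 'primary' | 'backup'; messages: T[] }> {
	try {
		return { source: 'primary', messages: await primary.getHistory(sessionId) };
	} catch {
		// The primary read failed, so serve the request from the backup instead.
		// If the backup also fails, its error propagates to the caller.
		return { source: 'backup', messages: await backup.getHistory(sessionId) };
	}
}
```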

3.3 clearHistory: clearing history

Clears all three stores: the primary database, the backup database, and the WAL.
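
One way to clear all three stores is sketched below. The `Clearable` interface and the order (WAL first) are assumptions of this sketch, not something the design specifies. Clearing the WAL first is meant to keep a later flush from copying already-cleared messages back into the backup.

```typescript
// Hypothetical interface; anything with a per-session clear() fits this sketch.
interface Clearable {
	clear(sessionId: string): Promise<void>;
}

async function clearHistory(
	sessionId: string,
	wal: Clearable,
	primary: Clearable,
	backup: Clearable
): Promise<void> {
	const failures: string[] = [];
	// WAL first (an assumption of this sketch), then the two databases.
	const targets: Array<[string, Clearable]> = [
		['wal', wal],
		['primary', primary],
		['backup', backup],
	];
	for (const [name, store] of targets) {
		try {
			await store.clear(sessionId);
		} catch {
			failures.push(name); // keep going so one outage does not block the rest
		}
	}
	if (failures.length > 0) {
		throw new Error(`clearHistory incomplete for: ${failures.join(', ')}`);
	}
}
```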

3.4 WAL flush worker thread

Runs at a fixed interval (5 seconds by default) and flushes pending WAL entries to the backup database.
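
The flush worker might be structured like this sketch. `WalFlusher`, the `flushed` flag, and the `writeToBackup` callback are hypothetical. The sketch assumes entries are flushed in order and that a failed write means the backup is down, so the remaining entries wait for the next tick.

```typescript
// Hypothetical WAL entry shape; `flushed` marks entries already copied to the backup.
interface WalEntry<T> {
	id: string;
	payload: T;
	flushed: boolean;
}

class WalFlusher<T> {
	private readonly wal: WalEntry<T>[];
	private readonly writeToBackup: (entry: WalEntry<T>) => Promise<void>;
	private readonly intervalMs: number;
	private timer: ReturnType<typeof setInterval> | undefined;
	private running = false;

	constructor(
		wal: WalEntry<T>[],
		writeToBackup: (entry: WalEntry<T>) => Promise<void>,
		intervalMs = 5000 // the 5-second default described above
	) {
		this.wal = wal;
		this.writeToBackup = writeToBackup;
		this.intervalMs = intervalMs;
	}

	/** Flushes pending entries in order and returns how many were written. */
	async flushOnce(): Promise<number> {
		if (this.running) return 0; // skip if the previous flush is still in progress
		this.running = true;
		let written = 0;
		try {
			for (const entry of this.wal) {
				if (entry.flushed) continue;
				try {
					await this.writeToBackup(entry);
				} catch {
					break; // backup unavailable: keep the rest for the next tick
				}
				entry.flushed = true;
				written++;
			}
		} finally {
			this.running = false;
		}
		return written;
	}

	start(): void {
		if (this.timer === undefined) {
			this.timer = setInterval(() => void this.flushOnce(), this.intervalMs);
		}
	}

	stop(): void {
		if (this.timer !== undefined) {
			clearInterval(this.timer);
			this.timer = undefined;
		}
	}
}
```

Calling `start()` schedules `flushOnce()` on the interval and `stop()` cancels it. Exposing `flushOnce()` separately also allows a final flush during shutdown.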

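4. Primary/Backup Database Synchronization

Excalidraw file: https://gcntfv628ebr.feishu.cn/file/SezSb5k2boNwFSxVJQzc9msknhe

When the primary database loses its connection and becomes unavailable, the backup database takes over as the active database: both writes and queries go to the backup.

Because the WAL has been flushing to the backup all along, the backup's data closely tracks the primary's, so the impact is small and the service stays available.

Once the primary database recovers, it becomes the active database again, but it is missing the data written during the outage (from failure to recovery).

The sync script therefore periodically checks whether the two databases hold the same data and, where they differ, copies the missing records across.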
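
A single pass of such a sync script could be sketched as below. `SyncStore` and `syncMissing` are hypothetical names, and the sketch assumes each message has a unique `id`. It only appends missing messages and does not try to restore their original order, which a real script would probably need to handle.

```typescript
// Hypothetical types for illustration only.
interface Message {
	id: string; // assumed unique per message
	content: string;
}

interface SyncStore {
	listSessions(): Promise<string[]>;
	getHistory(sessionId: string): Promise<Message[]>;
	append(sessionId: string, message: Message): Promise<void>;
}

/** Copies messages that exist in `source` but not in `target`; returns the count. */
async function syncMissing(source: SyncStore, target: SyncStore): Promise<number> {
	let copied = 0;
	for (const sessionId of await source.listSessions()) {
		// Index the target's message IDs once per session for fast lookups.
		const targetHistory = await target.getHistory(sessionId);
		const existing = new Set(targetHistory.map(message => message.id));
		for (const message of await source.getHistory(sessionId)) {
			if (!existing.has(message.id)) {
				await target.append(sessionId, message);
				existing.add(message.id);
				copied++;
			}
		}
	}
	return copied;
}
```

After the primary recovers, calling `syncMissing(backup, primary)` on a timer would backfill the messages written during the outage.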


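5. Code-Level Implementation Details

Excalidraw file: https://gcntfv628ebr.feishu.cn/file/SezSb5k2boNwFSxVJQzc9msknhe

  1. Base layer: builds and manages the different database backends, classifies each one as either cache or persistent storage, and places them under a single Manager that instantiates them for external use.
  2. Strategy layer: a query is not a plain call to the base layer's query method but goes through a query strategy. Writes likewise involve asynchronous WAL writes. A strategy layer is therefore built for the application layer to call.
  3. Application layer: where the storage is actually used. It exposes constructor parameters for instantiation and provides read and write methods.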

5.1 Base Layer Code Design

Cache design

typescript
/**
 * Cache Backend Interface
 *
 * Defines the contract for cache storage implementations.
 * Cache backends are optimized for fast, ephemeral storage with optional TTL support.
 *
 * Implementations can include:
 * - Redis: Distributed caching with network access
 * - In-Memory: Fast local caching with no persistence
 * - Memcached: Distributed memory caching system
 *
 * @module storage/backend/cache-backend
 */

/**
 * CacheBackend Interface
 *
 * Provides a unified API for different cache storage implementations.
 * All methods are asynchronous to support both local and network-based backends.
 *
 * @example
 * ```typescript
 * class RedisBackend implements CacheBackend {
 *   async get<T>(key: string): Promise<T | undefined> {
 *     const value = await this.redis.get(key);
 *     return value ? JSON.parse(value) : undefined;
 *   }
 *   // ... other methods
 * }
 * ```
 */
export interface CacheBackend {
	// Basic operations

	/**
	 * Retrieves a value from the cache by key
	 *
	 * @template T - The type of the cached value
	 * @param key - The cache key to retrieve
	 * @returns The cached value if found, undefined otherwise
	 *
	 * @example
	 * ```typescript
	 * const user = await cache.get<User>('user:123');
	 * if (!user) {
	 *   // Cache miss - fetch from database
	 * }
	 * ```
	 */
	get<T>(key: string): Promise<T | undefined>;

	/**
	 * Stores a value in the cache with optional TTL
	 *
	 * @template T - The type of the value to cache
	 * @param key - The cache key
	 * @param value - The value to cache (will be serialized)
	 * @param ttlSeconds - Optional time-to-live in seconds
	 *
	 * @example
	 * ```typescript
	 * // Cache for 1 hour
	 * await cache.set('user:123', userData, 3600);
	 *
	 * // Cache indefinitely
	 * await cache.set('config', configData);
	 * ```
	 */
	set<T>(key: string, value: T, ttlSeconds?: number): Promise<void>;

	/**
	 * Removes a value from the cache
	 *
	 * @param key - The cache key to delete
	 *
	 * @example
	 * ```typescript
	 * // Invalidate user cache after update
	 * await cache.delete('user:123');
	 * ```
	 */
	delete(key: string): Promise<void>;

	// Connection management

	/**
	 * Establishes connection to the cache backend
	 *
	 * Should be called before performing any operations.
	 * Implementations should handle reconnection logic internally.
	 *
	 * @throws {StorageConnectionError} If connection fails
	 *
	 * @example
	 * ```typescript
	 * const cache = new RedisBackend(config);
	 * await cache.connect();
	 * // Now ready to use
	 * ```
	 */
	connect(): Promise<void>;

	/**
	 * Gracefully closes the connection to the cache backend
	 *
	 * Should clean up resources and close any open connections.
	 * After disconnect, connect() must be called again before use.
	 *
	 * @example
	 * ```typescript
	 * // Clean shutdown
	 * await cache.disconnect();
	 * ```
	 */
	disconnect(): Promise<void>;

	/**
	 * Checks if the backend is currently connected and ready
	 *
	 * @returns true if connected and operational, false otherwise
	 *
	 * @example
	 * ```typescript
	 * if (!cache.isConnected()) {
	 *   await cache.connect();
	 * }
	 * ```
	 */
	isConnected(): boolean;

	/**
	 * Returns the backend type identifier
	 *
	 * Useful for logging, monitoring, and conditional logic based on backend type.
	 *
	 * @returns Backend type string (e.g., 'redis', 'memory', 'memcached')
	 *
	 * @example
	 * ```typescript
	 * console.log(`Using ${cache.getBackendType()} for caching`);
	 * ```
	 */
	getBackendType(): string;
}

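Methods of the Cache interface

  1. get: retrieves the value for a given key from the cache
  2. set: stores a value in the cache, with an optional TTL
  3. delete: removes the value for a given key from the cache
  4. disconnect: gracefully closes the connection to the cache
  5. connect: establishes the connection to the cache backend
  6. isConnected: checks whether the cache backend is connected and usable
  7. getBackendType: returns the backend type identifier, used for logging, monitoring, and so on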
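
To make the contract concrete, here is a minimal in-memory implementation with TTL support. `MemoryCacheSketch` is a hypothetical class written for this example. It is not the project's `InMemoryBackend`, whose source is not shown here. The interface is repeated in condensed form only so the block compiles on its own.

```typescript
// Condensed copy of the CacheBackend interface above, repeated for self-containment.
interface CacheBackend {
	get<T>(key: string): Promise<T | undefined>;
	set<T>(key: string, value: T, ttlSeconds?: number): Promise<void>;
	delete(key: string): Promise<void>;
	connect(): Promise<void>;
	disconnect(): Promise<void>;
	isConnected(): boolean;
	getBackendType(): string;
}

class MemoryCacheSketch implements CacheBackend {
	private readonly entries = new Map<string, { value: unknown; expiresAt?: number }>();
	private connected = false;

	async get<T>(key: string): Promise<T | undefined> {
		this.ensureConnected();
		const entry = this.entries.get(key);
		if (!entry) return undefined;
		// Expired entries are evicted lazily, on read.
		if (entry.expiresAt !== undefined && entry.expiresAt <= Date.now()) {
			this.entries.delete(key);
			return undefined;
		}
		return entry.value as T;
	}

	async set<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
		this.ensureConnected();
		// No TTL means the entry stays until it is deleted or the cache disconnects.
		const expiresAt = ttlSeconds !== undefined ? Date.now() + ttlSeconds * 1000 : undefined;
		this.entries.set(key, { value, expiresAt });
	}

	async delete(key: string): Promise<void> {
		this.ensureConnected();
		this.entries.delete(key);
	}

	async connect(): Promise<void> {
		this.connected = true;
	}

	async disconnect(): Promise<void> {
		this.entries.clear(); // in-memory data does not survive a disconnect
		this.connected = false;
	}

	isConnected(): boolean {
		return this.connected;
	}

	getBackendType(): string {
		return 'memory';
	}

	private ensureConnected(): void {
		if (!this.connected) throw new Error('cache not connected');
	}
}
```

Unlike a Redis-backed implementation, this sketch stores values by reference and skips serialization, so callers share the same object instance.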

Database (persistent storage) design

typescript
/**
 * Database Backend Interface
 *
 * Defines the contract for persistent storage implementations.
 * Database backends are optimized for reliable, long-term data storage.
 *
 * Implementations can include:
 * - SQLite: Lightweight, file-based database
 * - PostgreSQL: Full-featured relational database
 * - In-Memory: For testing or temporary persistence
 *
 * @module storage/backend/database-backend
 */

/**
 * DatabaseBackend Interface
 *
 * Provides a unified API for different database storage implementations.
 * Extends basic key-value operations with list operations for collections.
 *
 * @example
 * ```typescript
 * class SqliteBackend implements DatabaseBackend {
 *   async get<T>(key: string): Promise<T | undefined> {
 *     const row = await this.db.get('SELECT value FROM store WHERE key = ?', key);
 *     return row ? JSON.parse(row.value) : undefined;
 *   }
 *   // ... other methods
 * }
 * ```
 */
export interface DatabaseBackend {
	// Basic key-value operations

	/**
	 * Retrieves a value from the database by key
	 *
	 * @template T - The type of the stored value
	 * @param key - The storage key to retrieve
	 * @returns The stored value if found, undefined otherwise
	 *
	 * @example
	 * ```typescript
	 * const settings = await db.get<AppSettings>('app:settings');
	 * ```
	 */
	get<T>(key: string): Promise<T | undefined>;

	/**
	 * Stores a value in the database
	 *
	 * Unlike cache, database storage is persistent and doesn't support TTL.
	 *
	 * @template T - The type of the value to store
	 * @param key - The storage key
	 * @param value - The value to store (will be serialized)
	 *
	 * @example
	 * ```typescript
	 * await db.set('user:123', userData);
	 * ```
	 */
	set<T>(key: string, value: T): Promise<void>;

	/**
	 * Removes a value from the database
	 *
	 * @param key - The storage key to delete
	 *
	 * @example
	 * ```typescript
	 * await db.delete('user:123');
	 * ```
	 */
	delete(key: string): Promise<void>;

	// Collection operations

	/**
	 * Lists all keys matching a prefix
	 *
	 * Useful for finding related data or implementing namespaces.
	 *
	 * @param prefix - The key prefix to search for
	 * @returns Array of keys matching the prefix
	 *
	 * @example
	 * ```typescript
	 * // Get all user keys
	 * const userKeys = await db.list('user:');
	 * // Returns: ['user:123', 'user:456', ...]
	 * ```
	 */
	list(prefix: string): Promise<string[]>;

	// List/Array operations

	/**
	 * Appends an item to a list stored at the given key
	 *
	 * Creates the list if it doesn't exist. Useful for logs, history, etc.
	 *
	 * @template T - The type of items in the list
	 * @param key - The storage key for the list
	 * @param item - The item to append
	 *
	 * @example
	 * ```typescript
	 * // Add to user's activity log
	 * await db.append('activity:user:123', {
	 *   action: 'login',
	 *   timestamp: Date.now()
	 * });
	 * ```
	 */
	append<T>(key: string, item: T): Promise<void>;

	/**
	 * Retrieves a range of items from a list
	 *
	 * Supports pagination through stored lists.
	 *
	 * @template T - The type of items in the list
	 * @param key - The storage key for the list
	 * @param start - Starting index (0-based)
	 * @param count - Number of items to retrieve
	 * @returns Array of items in the specified range
	 *
	 * @example
	 * ```typescript
	 * // Get the first 10 activities (index 0 is the oldest appended item)
	 * const activities = await db.getRange('activity:user:123', 0, 10);
	 * ```
	 */
	getRange<T>(key: string, start: number, count: number): Promise<T[]>;

	// Connection management

	/**
	 * Establishes connection to the database backend
	 *
	 * Should be called before performing any operations.
	 * May create database schema/tables if needed.
	 *
	 * @throws {StorageConnectionError} If connection fails
	 *
	 * @example
	 * ```typescript
	 * const db = new SqliteBackend(config);
	 * await db.connect();
	 * ```
	 */
	connect(): Promise<void>;

	/**
	 * Gracefully closes the database connection
	 *
	 * Should ensure all pending writes are completed before closing.
	 *
	 * @example
	 * ```typescript
	 * await db.disconnect();
	 * ```
	 */
	disconnect(): Promise<void>;

	/**
	 * Checks if the backend is currently connected
	 *
	 * @returns true if connected and operational, false otherwise
	 */
	isConnected(): boolean;

	/**
	 * Returns the backend type identifier
	 *
	 * @returns Backend type string (e.g., 'sqlite', 'postgresql', 'memory')
	 */
	getBackendType(): string;
}

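Methods of the Database interface

  1. get: retrieves the value for a given key from the database
  2. set: stores a value in the database (persistent, no TTL)
  3. delete: removes the value for a given key from the database
  4. list: lists all keys that match a given prefix, which is useful for namespaces
  5. append: appends an item to a stored list; this supports list data structures and can be used to implement message queues and maintain history
  6. getRange: retrieves the items in a given range of a list
  7. disconnect: gracefully closes the database connection
  8. connect: establishes the connection to the database backend
  9. isConnected: checks whether the database backend is connected and usable
  10. getBackendType: returns the backend type identifier, used for logging, monitoring, and so on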
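
The collection methods (`list`, `append`, `getRange`) are what set the database interface apart from the cache. The sketch below shows their intended semantics on an in-memory stand-in. `MemoryListSketch` is a hypothetical class that covers only these methods plus `set`, so it deliberately does not implement the full `DatabaseBackend` interface.

```typescript
// Hypothetical in-memory stand-in that covers only the collection methods.
class MemoryListSketch {
	private readonly values = new Map<string, unknown>();
	private readonly lists = new Map<string, unknown[]>();

	async set<T>(key: string, value: T): Promise<void> {
		this.values.set(key, value);
	}

	/** Returns the key-value keys that start with `prefix`, sorted for stable output. */
	async list(prefix: string): Promise<string[]> {
		return Array.from(this.values.keys())
			.filter(key => key.startsWith(prefix))
			.sort();
	}

	/** Appends to the list at `key`, creating the list on first use. */
	async append<T>(key: string, item: T): Promise<void> {
		const items = this.lists.get(key) ?? [];
		items.push(item);
		this.lists.set(key, items);
	}

	/** Returns up to `count` items starting at the 0-based index `start`. */
	async getRange<T>(key: string, start: number, count: number): Promise<T[]> {
		const items = this.lists.get(key) ?? [];
		return items.slice(start, start + count) as T[];
	}
}
```

With chat history stored under a key such as `history:<sessionId>` (a key scheme assumed here for illustration), `append` adds each new message and `getRange(key, 0, 20)` pages through the conversation from its start.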

PostgreSQL code example

typescript
/**
 * PostgreSQL Database Backend Implementation
 *
 * High-performance PostgreSQL backend for persistent, reliable storage.
 * Supports connection pooling, prepared statements, and comprehensive error handling.
 *
 * Features:
 * - Connection pooling with configurable settings
 * - Prepared statements for SQL injection protection
 * - Schema creation and management
 * - Comprehensive error handling
 * - Performance monitoring and logging
 * - Support for both connection URLs and individual parameters
 *
 * @module storage/backend/postgresql
 */

import { Pool, type PoolConfig } from 'pg';
import type { DatabaseBackend } from './database-backend.js';
import type { PostgresBackendConfig } from '../config.js';
import { StorageError, StorageConnectionError } from './types.js';
import { createLogger, type Logger } from '../../logger/index.js';

/**
 * PostgreSQL Database Backend
 *
 * Implements the DatabaseBackend interface using PostgreSQL as the underlying storage.
 * Provides ACID compliance, strong consistency, and enterprise-grade reliability.
 *
 * Key Features:
 * - Connection pooling for high concurrency
 * - Prepared statements for performance and security
 * - Automatic schema creation and management
 * - Full CRUD operations with atomic transactions
 * - Comprehensive error handling and logging
 *
 * @example
 * ```typescript
 * const backend = new PostgresBackend({
 *   type: 'postgres',
 *   host: 'localhost',
 *   port: 5432,
 *   database: 'myapp',
 *   user: 'postgres',
 *   password: 'secret'
 * });
 *
 * await backend.connect();
 * await backend.set('user:123', userData);
 * const user = await backend.get('user:123');
 * await backend.disconnect();
 * ```
 */
export class PostgresBackend implements DatabaseBackend {
	private pool: Pool | undefined;
	private connected = false;
	private readonly config: PostgresBackendConfig;
	private readonly logger: Logger;

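	// SQL text for each operation, keyed by name. Values are always passed as
	// query parameters ($1, $2, ...) and never interpolated into the SQL string.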
	private statements: Map<string, string> = new Map();

	constructor(config: PostgresBackendConfig) {
		this.config = config;
		this.logger = createLogger();
		this.initializeStatements();
	}

	/**
	 * Initialize prepared statement definitions
	 */
	private initializeStatements(): void {
		this.statements.set('get', 'SELECT value FROM cipher_store WHERE key = $1');
		this.statements.set(
			'set',
			`
			INSERT INTO cipher_store (key, value, created_at, updated_at)
			VALUES ($1, $2, $3, $4)
			ON CONFLICT (key) DO UPDATE SET
				value = EXCLUDED.value,
				updated_at = EXCLUDED.updated_at
		`
		);
		this.statements.set('delete', 'DELETE FROM cipher_store WHERE key = $1');
		this.statements.set('list', 'SELECT key FROM cipher_store WHERE key LIKE $1 ORDER BY key');
		this.statements.set(
			'listAppend',
			`
			INSERT INTO cipher_lists (key, value, position, created_at)
			VALUES ($1, $2, COALESCE((SELECT MAX(position) + 1 FROM cipher_lists WHERE key = $1), 0), $3)
		`
		);
		this.statements.set(
			'getRange',
			`
			SELECT value FROM cipher_lists
			WHERE key = $1
			ORDER BY position
			LIMIT $2 OFFSET $3
		`
		);
		this.statements.set('deleteList', 'DELETE FROM cipher_lists WHERE key = $1');
		this.statements.set(
			'updateListMetadata',
			`
			INSERT INTO cipher_list_metadata (key, count, created_at, updated_at)
			VALUES ($1, (SELECT COUNT(*) FROM cipher_lists WHERE key = $1), $2, $3)
			ON CONFLICT (key) DO UPDATE SET
				count = (SELECT COUNT(*) FROM cipher_lists WHERE key = $1),
				updated_at = EXCLUDED.updated_at
		`
		);
	}

	/**
	 * Connect to PostgreSQL database
	 */
	async connect(): Promise<void> {
		if (this.connected) {
			this.logger.debug('PostgreSQL backend already connected');
			return;
		}

		try {
			this.logger.info('Connecting to PostgreSQL database');

			// Build connection configuration
			const poolConfig = this.buildPoolConfig();

			// Create connection pool
			this.pool = new Pool(poolConfig);

			// Test connection
			const client = await this.pool.connect();
			try {
				await client.query('SELECT 1');
				this.logger.debug('PostgreSQL connection test successful');
			} finally {
				client.release();
			}

			// Create tables if they don't exist
			await this.createTables();

			this.connected = true;
			this.logger.info('PostgreSQL backend connected successfully', {
				host: this.config.host || 'localhost',
				database: this.config.database,
				pool: {
					max: poolConfig.max,
					min: poolConfig.min,
				},
			});
		} catch (error) {
			this.logger.error('Failed to connect to PostgreSQL database', {
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageConnectionError(
				'Failed to connect to PostgreSQL database',
				'postgres',
				error as Error
			);
		}
	}

	/**
	 * Disconnect from PostgreSQL database
	 */
	async disconnect(): Promise<void> {
		if (!this.connected || !this.pool) {
			return;
		}

		try {
			await this.pool.end();
			this.connected = false;
			this.pool = undefined;
			this.logger.info('PostgreSQL backend disconnected');
		} catch (error) {
			this.logger.error('Error disconnecting from PostgreSQL', {
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to disconnect from PostgreSQL', 'disconnect', error as Error);
		}
	}

	/**
	 * Check if backend is connected
	 */
	isConnected(): boolean {
		return this.connected && this.pool !== undefined;
	}

	/**
	 * Get backend type identifier
	 */
	getBackendType(): string {
		return 'postgres';
	}

	/**
	 * Get value by key
	 */
	async get<T>(key: string): Promise<T | undefined> {
		this.checkConnection();

		try {
			const result = await this.pool!.query(this.statements.get('get')!, [key]);

			if (result.rows.length === 0) {
				return undefined;
			}

			const serialized = result.rows[0].value;
			return JSON.parse(serialized) as T;
		} catch (error) {
			this.logger.error('Error getting value from PostgreSQL', {
				key,
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to get value from PostgreSQL', 'get', error as Error);
		}
	}

	/**
	 * Set key-value pair
	 */
	async set<T>(key: string, value: T): Promise<void> {
		this.checkConnection();

		try {
			const serialized = JSON.stringify(value);
			const now = new Date();
			await this.pool!.query(this.statements.get('set')!, [key, serialized, now, now]);
		} catch (error) {
			this.logger.error('Error setting value in PostgreSQL', {
				key,
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to set value in PostgreSQL', 'set', error as Error);
		}
	}

	/**
	 * Delete key
	 */
	async delete(key: string): Promise<void> {
		this.checkConnection();

		try {
			// Use transaction to delete from both key-value store and lists
			const client = await this.pool!.connect();
			try {
				await client.query('BEGIN');
				await client.query(this.statements.get('delete')!, [key]);
				await client.query(this.statements.get('deleteList')!, [key]);
				await client.query('COMMIT');
			} catch (error) {
				await client.query('ROLLBACK');
				throw error;
			} finally {
				client.release();
			}
		} catch (error) {
			this.logger.error('Error deleting value from PostgreSQL', {
				key,
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to delete value from PostgreSQL', 'delete', error as Error);
		}
	}

	/**
	 * List keys by prefix
	 */
	async list(prefix: string): Promise<string[]> {
		this.checkConnection();

		try {
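			// Escape LIKE wildcards so a prefix such as 'user_' is matched literally
			const pattern = prefix.replace(/[\\%_]/g, '\\$&') + '%';
			const result = await this.pool!.query(this.statements.get('list')!, [pattern]);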
			return result.rows.map(row => row.key);
		} catch (error) {
			this.logger.error('Error listing keys from PostgreSQL', {
				prefix,
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to list keys from PostgreSQL', 'list', error as Error);
		}
	}

	/**
	 * Append item to list
	 */
	async append<T>(key: string, item: T): Promise<void> {
		this.checkConnection();

		try {
			const client = await this.pool!.connect();
			try {
				await client.query('BEGIN');

				const serialized = JSON.stringify(item);
				const now = new Date();

				await client.query(this.statements.get('listAppend')!, [key, serialized, now]);
				await client.query(this.statements.get('updateListMetadata')!, [key, now, now]);

				await client.query('COMMIT');
			} catch (error) {
				await client.query('ROLLBACK');
				throw error;
			} finally {
				client.release();
			}
		} catch (error) {
			this.logger.error('Error appending to list in PostgreSQL', {
				key,
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to append to list in PostgreSQL', 'append', error as Error);
		}
	}

	/**
	 * Get range of items from list
	 */
	async getRange<T>(key: string, start: number, count: number): Promise<T[]> {
		this.checkConnection();

		try {
			const result = await this.pool!.query(this.statements.get('getRange')!, [key, count, start]);
			return result.rows.map(row => JSON.parse(row.value) as T);
		} catch (error) {
			this.logger.error('Error getting range from PostgreSQL', {
				key,
				start,
				count,
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('Failed to get range from PostgreSQL', 'getRange', error as Error);
		}
	}

	/**
	 * Get database information
	 */
	getInfo(): Record<string, any> {
		const baseInfo = {
			type: 'postgres',
			connected: this.connected,
		};

		if (!this.connected || !this.pool) {
			return baseInfo;
		}

		return {
			...baseInfo,
			pool: {
				totalCount: this.pool.totalCount,
				idleCount: this.pool.idleCount,
				waitingCount: this.pool.waitingCount,
			},
			config: {
				host: this.config.host || 'localhost',
				port: this.config.port || 5432,
				database: this.config.database,
				ssl: this.config.ssl || false,
			},
		};
	}

	/**
	 * Perform database maintenance
	 */
	async maintenance(): Promise<void> {
		this.checkConnection();

		try {
			this.logger.info('Running PostgreSQL maintenance');

			// Analyze tables for query optimization
			await this.pool!.query('ANALYZE cipher_store');
			await this.pool!.query('ANALYZE cipher_lists');
			await this.pool!.query('ANALYZE cipher_list_metadata');

			this.logger.info('PostgreSQL maintenance completed successfully');
		} catch (error) {
			this.logger.error('Error during PostgreSQL maintenance', {
				error: error instanceof Error ? error.message : String(error),
			});
			throw new StorageError('PostgreSQL maintenance failed', 'maintenance', error as Error);
		}
	}

	// Private helper methods

	/**
	 * Build connection pool configuration
	 */
	private buildPoolConfig(): PoolConfig {
		// If URL is provided, use it
		if (this.config.url) {
			return {
				connectionString: this.config.url,
				max: this.config.pool?.max || this.config.maxConnections || 10,
				min: this.config.pool?.min || 2,
				idleTimeoutMillis:
					this.config.pool?.idleTimeoutMillis || this.config.idleTimeoutMillis || 30000,
				connectionTimeoutMillis:
					this.config.pool?.acquireTimeoutMillis || this.config.connectionTimeoutMillis || 10000,
				ssl: this.config.ssl,
			};
		}

		// Build from individual parameters
		return {
			host: this.config.host || 'localhost',
			port: this.config.port || 5432,
			database: this.config.database,
			user: this.config.user,
			password: this.config.password,
			max: this.config.pool?.max || this.config.maxConnections || 10,
			min: this.config.pool?.min || 2,
			idleTimeoutMillis:
				this.config.pool?.idleTimeoutMillis || this.config.idleTimeoutMillis || 30000,
			connectionTimeoutMillis:
				this.config.pool?.acquireTimeoutMillis || this.config.connectionTimeoutMillis || 10000,
			ssl: this.config.ssl,
		};
	}

	/**
	 * Create database tables
	 */
	private async createTables(): Promise<void> {
		if (!this.pool) {
			throw new StorageError('Database not connected', 'createTables');
		}

		// Key-value store table
		await this.pool.query(`
			CREATE TABLE IF NOT EXISTS cipher_store (
				key TEXT PRIMARY KEY,
				value TEXT NOT NULL,
				created_at TIMESTAMP WITH TIME ZONE NOT NULL,
				updated_at TIMESTAMP WITH TIME ZONE NOT NULL
			)
		`);

		// Lists table for list operations
		await this.pool.query(`
			CREATE TABLE IF NOT EXISTS cipher_lists (
				key TEXT NOT NULL,
				value TEXT NOT NULL,
				position INTEGER NOT NULL,
				created_at TIMESTAMP WITH TIME ZONE NOT NULL,
				PRIMARY KEY (key, position)
			)
		`);

		// List metadata table
		await this.pool.query(`
			CREATE TABLE IF NOT EXISTS cipher_list_metadata (
				key TEXT PRIMARY KEY,
				count INTEGER NOT NULL DEFAULT 0,
				created_at TIMESTAMP WITH TIME ZONE NOT NULL,
				updated_at TIMESTAMP WITH TIME ZONE NOT NULL
			)
		`);

		// Create indexes for performance
		await this.pool.query(
			'CREATE INDEX IF NOT EXISTS idx_cipher_store_updated_at ON cipher_store(updated_at)'
		);
		await this.pool.query('CREATE INDEX IF NOT EXISTS idx_cipher_lists_key ON cipher_lists(key)');
		await this.pool.query(
			'CREATE INDEX IF NOT EXISTS idx_cipher_lists_created_at ON cipher_lists(created_at)'
		);

		this.logger.debug('PostgreSQL tables and indexes created');
	}

	/**
	 * Check if backend is connected
	 */
	private checkConnection(): void {
		if (!this.connected || !this.pool) {
			throw new StorageError('PostgreSQL backend not connected', 'checkConnection');
		}
	}
}

5.2 Manager Layer Design

The manager is the core orchestrator of the storage system and manages the dual-backend storage architecture.

typescript
/**
 * Storage Manager Implementation
 *
 * Orchestrates the dual-backend storage system with cache and database backends.
 * Provides lazy loading, graceful fallbacks, and connection management.
 *
 * @module storage/manager
 */

import type { CacheBackend, DatabaseBackend, StorageBackends, StorageConfig } from './types.js';
import { StorageSchema } from './config.js';
import { Logger, createLogger } from '../logger/index.js';
import {
	LOG_PREFIXES,
	ERROR_MESSAGES,
	TIMEOUTS,
	HEALTH_CHECK,
	BACKEND_TYPES,
} from './constants.js';

/**
 * Health check result for storage backends
 */
export interface HealthCheckResult {
	cache: boolean;
	database: boolean;
	overall: boolean;
	details?: {
		cache?: { status: string; latency?: number; error?: string };
		database?: { status: string; latency?: number; error?: string };
	};
}

/**
 * Storage system information
 */
export interface StorageInfo {
	connected: boolean;
	backends: {
		cache: {
			type: string;
			connected: boolean;
			fallback: boolean;
		};
		database: {
			type: string;
			connected: boolean;
			fallback: boolean;
		};
	};
	connectionAttempts: number;
	lastError: string | undefined;
}

/**
 * Storage Manager
 *
 * Manages the lifecycle of storage backends with lazy loading and fallback support.
 * Follows the factory pattern with graceful degradation to in-memory storage.
 *
 * @example
 * ```typescript
 * const manager = new StorageManager(config);
 * const { cache, database } = await manager.connect();
 *
 * // Use backends
 * await cache.set('key', value, 300);
 * await database.set('user:123', userData);
 *
 * // Cleanup
 * await manager.disconnect();
 * ```
 */
export class StorageManager {
	// Core state
	private cache: CacheBackend | undefined;
	private database: DatabaseBackend | undefined;
	private connected = false;
	private readonly config: StorageConfig;
	private readonly logger: Logger;

	// Connection tracking
	private connectionAttempts = 0;
	private lastConnectionError?: Error;

	// Backend metadata
	private cacheMetadata = {
		type: 'unknown',
		isFallback: false,
		connectionTime: 0,
	};

	private databaseMetadata = {
		type: 'unknown',
		isFallback: false,
		connectionTime: 0,
	};

	// Lazy loading module references (static to share across instances)
	private static redisModule?: any;
	private static sqliteModule?: any;
	private static postgresModule?: any;

	// Health check configuration
	private readonly healthCheckKey = HEALTH_CHECK.KEY;
	private readonly healthCheckTimeout = TIMEOUTS.HEALTH_CHECK;

	/**
	 * Creates a new StorageManager instance
	 *
	 * @param config - Storage configuration with cache and database backend configs
	 * @throws {Error} If configuration is invalid
	 */
	constructor(config: StorageConfig) {
		// Validate configuration using Zod schema
		const validationResult = StorageSchema.safeParse(config);
		if (!validationResult.success) {
			throw new Error(
				`${ERROR_MESSAGES.INVALID_CONFIG}: ${validationResult.error.errors
					.map(e => `${e.path.join('.')}: ${e.message}`)
					.join(', ')}`
			);
		}

		this.config = validationResult.data;
		this.logger = createLogger({
			level: process.env.LOG_LEVEL || 'info',
		});

		this.logger.info(`${LOG_PREFIXES.MANAGER} Initialized with configuration`, {
			cacheType: this.config.cache.type,
			databaseType: this.config.database.type,
		});
	}

	/**
	 * Get the current storage configuration
	 *
	 * @returns The storage configuration
	 */
	public getConfig(): Readonly<StorageConfig> {
		return this.config;
	}

	/**
	 * Get information about the storage system
	 *
	 * @returns Storage system information including connection status and backend types
	 */
	public getInfo(): StorageInfo {
		return {
			connected: this.connected,
			backends: {
				cache: {
					type: this.cacheMetadata.type,
					connected: this.cache?.isConnected() ?? false,
					fallback: this.cacheMetadata.isFallback,
				},
				database: {
					type: this.databaseMetadata.type,
					connected: this.database?.isConnected() ?? false,
					fallback: this.databaseMetadata.isFallback,
				},
			},
			connectionAttempts: this.connectionAttempts,
			lastError: this.lastConnectionError?.message,
		};
	}

	/**
	 * Get the current storage backends if connected
	 *
	 * @returns The storage backends or null if not connected
	 */
	public getBackends(): StorageBackends | null {
		if (!this.connected || !this.cache || !this.database) {
			return null;
		}

		return {
			cache: this.cache,
			database: this.database,
		};
	}

	/**
	 * Check if the storage manager is connected
	 *
	 * @returns true if both backends are connected
	 */
	public isConnected(): boolean {
		return (
			this.connected && this.cache?.isConnected() === true && this.database?.isConnected() === true
		);
	}

	// Connection lifecycle

	/**
	 * Connect to storage backends
	 *
	 * @returns The connected storage backends
	 * @throws {StorageConnectionError} If strict backends fail to connect
	 */
	public async connect(): Promise<StorageBackends> {
		// Check if already connected
		if (this.connected) {
			this.logger.debug(`${LOG_PREFIXES.MANAGER} Already connected`, {
				cacheType: this.cacheMetadata.type,
				databaseType: this.databaseMetadata.type,
			});

			return {
				cache: this.cache!,
				database: this.database!,
			};
		}

		this.connectionAttempts++;
		this.logger.info(
			`${LOG_PREFIXES.MANAGER} Starting connection attempt ${this.connectionAttempts}`
		);

		try {
			// Create and connect cache backend
			const cacheStartTime = Date.now();
			try {
				this.cache = await this.createCacheBackend();
				await this.cache.connect();
				this.cacheMetadata.connectionTime = Date.now() - cacheStartTime;

				this.logger.info(`${LOG_PREFIXES.CACHE} Connected successfully`, {
					type: this.cacheMetadata.type,
					isFallback: this.cacheMetadata.isFallback,
					connectionTime: `${this.cacheMetadata.connectionTime}ms`,
				});
			} catch (cacheError) {
				// If the configured backend fails, try fallback to in-memory
				this.logger.warn(`${LOG_PREFIXES.CACHE} Connection failed, attempting fallback`, {
					error: cacheError instanceof Error ? cacheError.message : String(cacheError),
					originalType: this.config.cache.type,
				});

				if (this.config.cache.type !== BACKEND_TYPES.IN_MEMORY) {
					const { InMemoryBackend } = await import('./backend/in-memory.js');
					this.cache = new InMemoryBackend();
					await this.cache.connect();
					this.cacheMetadata.type = BACKEND_TYPES.IN_MEMORY;
					this.cacheMetadata.isFallback = true;
					this.cacheMetadata.connectionTime = Date.now() - cacheStartTime;

					this.logger.info(`${LOG_PREFIXES.CACHE} Connected to fallback backend`, {
						type: this.cacheMetadata.type,
						originalType: this.config.cache.type,
					});
				} else {
					throw cacheError; // Re-throw if already using in-memory
				}
			}

			// Create and connect database backend
			const dbStartTime = Date.now();
			try {
				this.database = await this.createDatabaseBackend();
				await this.database.connect();
				this.databaseMetadata.connectionTime = Date.now() - dbStartTime;

				this.logger.info(`${LOG_PREFIXES.DATABASE} Connected successfully`, {
					type: this.databaseMetadata.type,
					isFallback: this.databaseMetadata.isFallback,
					connectionTime: `${this.databaseMetadata.connectionTime}ms`,
				});
			} catch (err) {
				// If the configured backend fails, try fallback to in-memory
				this.logger.warn(`${LOG_PREFIXES.DATABASE} Connection failed, attempting fallback`, {
					error: err instanceof Error ? err.message : String(err),
					originalType: this.config.database.type,
				});

				if (this.config.database.type !== BACKEND_TYPES.IN_MEMORY) {
					const { InMemoryBackend } = await import('./backend/in-memory.js');
					this.database = new InMemoryBackend();
					await this.database.connect();
					this.databaseMetadata.type = BACKEND_TYPES.IN_MEMORY;
					this.databaseMetadata.isFallback = true;
					this.databaseMetadata.connectionTime = Date.now() - dbStartTime;

					this.logger.info(`${LOG_PREFIXES.DATABASE} Connected to fallback backend`, {
						type: this.databaseMetadata.type,
						originalType: this.config.database.type,
					});
				} else {
					throw err; // Re-throw if already using in-memory
				}
			}

			this.connected = true;

			this.logger.info(`${LOG_PREFIXES.MANAGER} Storage system connected`, {
				cacheBackend: this.cacheMetadata.type,
				databaseBackend: this.databaseMetadata.type,
				totalConnectionTime: `${this.cacheMetadata.connectionTime + this.databaseMetadata.connectionTime}ms`,
			});

			return {
				cache: this.cache!,
				database: this.database!,
			};
		} catch (error) {
			// Store error for reporting
			this.lastConnectionError = error as Error;

			// Disconnect any successfully connected backends
			if (this.cache?.isConnected()) {
				await this.cache.disconnect().catch(err =>
					this.logger.error(`${LOG_PREFIXES.CACHE} Error during cleanup disconnect`, {
						error: err,
					})
				);
			}

			if (this.database?.isConnected()) {
				await this.database.disconnect().catch(err =>
					this.logger.error(`${LOG_PREFIXES.DATABASE} Error during cleanup disconnect`, {
						error: err,
					})
				);
			}

			// Reset state
			this.cache = undefined;
			this.database = undefined;
			this.connected = false;

			throw error;
		}
	}

	/**
	 * Disconnect from all storage backends
	 */
	public async disconnect(): Promise<void> {
		if (!this.connected) {
			this.logger.debug(`${LOG_PREFIXES.MANAGER} Already disconnected`);
			return;
		}

		this.logger.info(`${LOG_PREFIXES.MANAGER} Disconnecting storage backends`);

		const disconnectPromises: Promise<void>[] = [];

		// Disconnect cache backend
		if (this.cache?.isConnected()) {
			disconnectPromises.push(
				this.cache
					.disconnect()
					.then(() => {
						this.logger.info(`${LOG_PREFIXES.CACHE} Disconnected successfully`);
					})
					.catch(error => {
						this.logger.error(`${LOG_PREFIXES.CACHE} Disconnect error`, { error });
						throw error;
					})
			);
		}

		// Disconnect database backend
		if (this.database?.isConnected()) {
			disconnectPromises.push(
				this.database
					.disconnect()
					.then(() => {
						this.logger.info(`${LOG_PREFIXES.DATABASE} Disconnected successfully`);
					})
					.catch(error => {
						this.logger.error(`${LOG_PREFIXES.DATABASE} Disconnect error`, { error });
						throw error;
					})
			);
		}

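		// Wait for all disconnects with timeout
		let shutdownTimer: ReturnType<typeof setTimeout> | undefined;
		try {
			await Promise.race([
				Promise.all(disconnectPromises),
				new Promise<never>((_, reject) => {
					shutdownTimer = setTimeout(() => reject(new Error('Disconnect timeout')), TIMEOUTS.SHUTDOWN);
				}),
			]);
		} finally {
			// Cancel the pending timeout so it cannot fire after a clean disconnect
			if (shutdownTimer !== undefined) clearTimeout(shutdownTimer);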
			// Always clean up state
			this.cache = undefined;
			this.database = undefined;
			this.connected = false;

			// Reset metadata
			this.cacheMetadata = {
				type: 'unknown',
				isFallback: false,
				connectionTime: 0,
			};

			this.databaseMetadata = {
				type: 'unknown',
				isFallback: false,
				connectionTime: 0,
			};

			this.logger.info(`${LOG_PREFIXES.MANAGER} Storage system disconnected`);
		}
	}

	/**
	 * Perform health check on all backends
	 *
	 * @returns Health check results for each backend
	 */
	public async healthCheck(): Promise<HealthCheckResult> {
		// Implementation in Phase 5
		throw new Error('Not implemented yet - Phase 5');
	}

	// Private helper methods

	/**
	 * Create cache backend based on configuration
	 */
	private async createCacheBackend(): Promise<CacheBackend> {
		const config = this.config.cache;

		this.logger.debug(`${LOG_PREFIXES.CACHE} Creating backend`, { type: config.type });

		switch (config.type) {
			case BACKEND_TYPES.REDIS: {
				try {
					// Lazy load Redis module
					if (!StorageManager.redisModule) {
						this.logger.debug(`${LOG_PREFIXES.CACHE} Lazy loading Redis module`);
						const { RedisBackend } = await import('./backend/redis-backend.js');
						StorageManager.redisModule = RedisBackend;
					}

					const RedisBackend = StorageManager.redisModule;
					this.cacheMetadata.type = BACKEND_TYPES.REDIS;
					this.cacheMetadata.isFallback = false;

					return new RedisBackend(config);
				} catch (error) {
					this.logger.debug(`${LOG_PREFIXES.CACHE} Failed to create Redis backend`, {
						error: error instanceof Error ? error.message : String(error),
					});
					throw error; // Let connection handler deal with fallback
				}
			}

			case BACKEND_TYPES.IN_MEMORY:
			default: {
				// Use in-memory backend
				const { InMemoryBackend } = await import('./backend/in-memory.js');
				this.cacheMetadata.type = BACKEND_TYPES.IN_MEMORY;
				this.cacheMetadata.isFallback = false;

				return new InMemoryBackend();
			}
		}
	}

	/**
	 * Create database backend based on configuration
	 */
	private async createDatabaseBackend(): Promise<DatabaseBackend> {
		const config = this.config.database;

		this.logger.debug(`${LOG_PREFIXES.DATABASE} Creating backend`, { type: config.type });

		switch (config.type) {
			case BACKEND_TYPES.SQLITE: {
				try {
					// Lazy load SQLite module
					if (!StorageManager.sqliteModule) {
						this.logger.debug(`${LOG_PREFIXES.DATABASE} Lazy loading SQLite module`);
						const { SqliteBackend } = await import('./backend/sqlite.js');
						StorageManager.sqliteModule = SqliteBackend;
					}

					const SqliteBackend = StorageManager.sqliteModule;
					this.databaseMetadata.type = BACKEND_TYPES.SQLITE;
					this.databaseMetadata.isFallback = false;

					return new SqliteBackend(config);
				} catch (error) {
					this.logger.debug(`${LOG_PREFIXES.DATABASE} Failed to create SQLite backend`, {
						error: error instanceof Error ? error.message : String(error),
					});
					throw error; // Let connection handler deal with fallback
				}
			}

			case BACKEND_TYPES.POSTGRES: {
				try {
					// Lazy load PostgreSQL module
					if (!StorageManager.postgresModule) {
						this.logger.debug(`${LOG_PREFIXES.DATABASE} Lazy loading PostgreSQL module`);
						const { PostgresBackend } = await import('./backend/postgresql.js');
						StorageManager.postgresModule = PostgresBackend;
					}

					const PostgresBackend = StorageManager.postgresModule;
					this.databaseMetadata.type = BACKEND_TYPES.POSTGRES;
					this.databaseMetadata.isFallback = false;

					return new PostgresBackend(config);
				} catch (error) {
					this.logger.debug(`${LOG_PREFIXES.DATABASE} Failed to create PostgreSQL backend`, {
						error: error instanceof Error ? error.message : String(error),
					});
					throw error; // Let connection handler deal with fallback
				}
			}

			case BACKEND_TYPES.IN_MEMORY:
			default: {
				// Use in-memory backend
				const { InMemoryBackend } = await import('./backend/in-memory.js');
				this.databaseMetadata.type = BACKEND_TYPES.IN_MEMORY;
				this.databaseMetadata.isFallback = false;

				return new InMemoryBackend();
			}
		}
	}
}

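Manager-layer method design

  1. connect: establishes the storage connections
  2. disconnect: closes the storage connections
  3. getBackends: returns the connected storage backends
  4. getInfo: returns system information
  5. isConnected: checks the connection status
  6. healthCheck: runs a health check (still a stub that throws in the class above)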
typescript
  import { StorageManager } from './src/core/storage/manager.js';
  import type { StorageConfig } from './src/core/storage/types.js';

  // 1. Configure the storage system
  const config: StorageConfig = {
    cache: {
      type: 'redis',
      host: 'localhost',
      port: 6379,
      db: 0,
      keyPrefix: 'app:cache:'
    },
    database: {
      type: 'sqlite',
      path: './data/app.db'
    }
  };

  // 2. Create and connect the storage manager
  async function initializeStorage() {
    const manager = new StorageManager(config);

    try {
      // Connect to the storage backends (fallback is handled automatically)
      const { cache, database } = await manager.connect();

      // Inspect the connection status
      const info = manager.getInfo();
      console.log('Storage Status:', {
        cacheType: info.backends.cache.type,
        cacheFallback: info.backends.cache.fallback,
        databaseType: info.backends.database.type,
        databaseFallback: info.backends.database.fallback
      });

      return { manager, cache, database };
    } catch (error) {
      console.error('Failed to initialize storage:', error);
      throw error;
    }
  }

  // 3. Use the storage system
  async function useStorage() {
    const { manager, cache, database } = await initializeStorage();

    try {
      // === Cache operations (ephemeral data, TTL supported) ===

      // Cache a user session (expires after 1 hour)
      await cache.set('session:user123', {
        userId: '123',
        username: 'john',
        loginTime: Date.now()
      }, 3600);

      // Read from the cache
      const session = await cache.get('session:user123');
      console.log('Session:', session);

      // === Database operations (persistent data) ===

      // Save the user's settings (stored permanently)
      await database.set('config:user123', {
        theme: 'dark',
        language: 'en',
        notifications: true
      });

      // Read the settings back (named userConfig to avoid shadowing the outer config)
      const userConfig = await database.get('config:user123');
      console.log('User config:', userConfig);

      // List operations: record user activity
      await database.append('activity:user123', {
        action: 'login',
        timestamp: Date.now(),
        ip: '192.168.1.1'
      });

      await database.append('activity:user123', {
        action: 'view_profile',
        timestamp: Date.now(),
        ip: '192.168.1.1'
      });

      // Fetch the most recent activity records
      const activities = await database.getRange('activity:user123', 0, 10);
      console.log('Recent activities:', activities);

      // Find all user config keys
      const configKeys = await database.list('config:');
      console.log('All config keys:', configKeys);

    } finally {
      // Cleanup: disconnect
      await manager.disconnect();
    }
  }
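
`healthCheck` appears in the method list, but the manager class above still throws `Not implemented yet`. The following is a minimal sketch of one possible shape, not the project's implementation. It assumes each backend exposes `isConnected()`, `set` and `get` as in the usage example; the `Pingable` interface, the fields of `HealthCheckResult`, and the probe key `health:probe` are hypothetical.

```typescript
// Hypothetical minimal backend surface; the real CacheBackend/DatabaseBackend may differ.
interface Pingable {
  isConnected(): boolean;
  set(key: string, value: unknown): Promise<void>;
  get(key: string): Promise<unknown>;
}

// Hypothetical result shapes; the project's HealthCheckResult is not shown in this document.
interface BackendHealth {
  healthy: boolean;
  latencyMs: number;
  error?: string;
}

interface HealthCheckResult {
  cache: BackendHealth;
  database: BackendHealth;
  overall: boolean;
}

// Probe one backend with a write/read round trip and time it.
async function probe(backend: Pingable | undefined): Promise<BackendHealth> {
  const start = Date.now();
  if (!backend || !backend.isConnected()) {
    return { healthy: false, latencyMs: 0, error: 'not connected' };
  }
  try {
    const token = String(start);
    await backend.set('health:probe', token);
    const echoed = await backend.get('health:probe');
    const latencyMs = Date.now() - start;
    return echoed === token
      ? { healthy: true, latencyMs }
      : { healthy: false, latencyMs, error: 'probe value mismatch' };
  } catch (err) {
    return {
      healthy: false,
      latencyMs: Date.now() - start,
      error: err instanceof Error ? err.message : String(err),
    };
  }
}

// Check both backends in parallel; the system counts as healthy only if both are.
async function healthCheck(cache?: Pingable, database?: Pingable): Promise<HealthCheckResult> {
  const [cacheHealth, databaseHealth] = await Promise.all([probe(cache), probe(database)]);
  return {
    cache: cacheHealth,
    database: databaseHealth,
    overall: cacheHealth.healthy && databaseHealth.healthy,
  };
}
```

A round-trip probe catches more than `isConnected()` alone, because a backend can hold an open connection and still reject writes. The cost is one extra key per backend.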

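5.3 Strategy layer design

The strategy factory creates and assembles the strategy components of the storage system. Its core role is to centralize strategy-creation logic so that the system can flexibly switch and combine storage behaviors.

The main steps are:

  1. Create the strategy factory, which instantiates the strategies
  2. Build the read strategy, the write strategy, and the other strategy functions and modules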
typescript
// Strategy factory
import { ReadStrategy, ReadStrategyConfig } from '../interfaces/read-strategy.js';
import { WriteStrategy, WriteStrategyConfig } from '../interfaces/write-strategy.js';
import { HealthManager, HealthManagerConfig } from '../interfaces/health-manager.js';
import { WALManager } from '../interfaces/wal-manager.js';

// Strategy implementation imports
import { PrimaryFirstReadStrategy } from '../implementations/read/primary-first-read.js';
import { IdempotentWriteStrategy } from '../implementations/write/idempotent-write.js';
import { CircuitBreakerHealthManager } from '../implementations/health/circuit-breaker-health.js';
import { RedisStreamWALManager, RedisStreamWALConfig } from '../implementations/wal/redis-stream-wal.js';

/**
 * Strategy factory
 *
 * Creates and configures the strategy instances
 */
export class StrategyFactory {
    /**
     * Create a read strategy
     */
    static createReadStrategy(
        type: ReadStrategyType,
        config: ReadStrategyConfig,
        healthManager: HealthManager
    ): ReadStrategy {
        switch (type) {
            case 'primary-first':
                return new PrimaryFirstReadStrategy(config, healthManager);
            
            case 'cache-enhanced':
                // TODO: implement the cache-enhanced read strategy
                throw new Error('Cache-enhanced read strategy not implemented yet');

            case 'failover':
                // TODO: implement the failover read strategy
                throw new Error('Failover read strategy not implemented yet');
            
            default:
                throw new Error(`Unknown read strategy type: ${type}`);
        }
    }
    
    /**
     * Create a write strategy
     */
    static createWriteStrategy(
        type: WriteStrategyType,
        config: WriteStrategyConfig,
        healthManager: HealthManager
    ): WriteStrategy {
        switch (type) {
            case 'idempotent':
                return new IdempotentWriteStrategy(config, healthManager);
            
            case 'wal-first':
                // TODO: implement the WAL-first write strategy
                throw new Error('WAL-first write strategy not implemented yet');

            case 'dual-write':
                // TODO: implement the dual-write strategy
                throw new Error('Dual-write strategy not implemented yet');
            
            default:
                throw new Error(`Unknown write strategy type: ${type}`);
        }
    }
    
    /**
     * Create a health manager
     */
    static createHealthManager(
        type: HealthManagerType,
        config: HealthManagerConfig
    ): HealthManager {
        switch (type) {
            case 'circuit-breaker':
                return new CircuitBreakerHealthManager(config);
            
            case 'simple':
                // TODO: implement the simple health manager
                throw new Error('Simple health manager not implemented yet');
            
            default:
                throw new Error(`Unknown health manager type: ${type}`);
        }
    }
    
    /**
     * Create a WAL manager
     */
    static createWALManager(
        type: WALManagerType,
        config: WALManagerConfig
    ): WALManager {
        switch (type) {
            case 'redis-stream':
                return new RedisStreamWALManager(config as RedisStreamWALConfig);
            
            case 'file':
                // TODO: implement the file WAL
                throw new Error('File WAL manager not implemented yet');

            case 'in-memory':
                // TODO: implement the in-memory WAL (development only)
                throw new Error('In-memory WAL manager not implemented yet');
            
            default:
                throw new Error(`Unknown WAL manager type: ${type}`);
        }
    }
    
    /**
     * Create the complete strategy set
     */
    static createStrategySet(config: StrategySetConfig): StrategySet {
        // 1. Create the health manager (the other strategies depend on it)
        const healthManager = this.createHealthManager(
            config.health.type,
            config.health.config
        );
        
        // 2. Create the read and write strategies
        const readStrategy = this.createReadStrategy(
            config.read.type,
            config.read.config,
            healthManager
        );
        
        const writeStrategy = this.createWriteStrategy(
            config.write.type,
            config.write.config,
            healthManager
        );
        
        // 3. Create the WAL manager (optional)
        let walManager: WALManager | undefined;
        if (config.wal) {
            walManager = this.createWALManager(
                config.wal.type,
                config.wal.config
            );
        }
        
        return {
            readStrategy,
            writeStrategy,
            healthManager,
            walManager
        };
    }
}

// Type definitions
export type ReadStrategyType = 'primary-first' | 'cache-enhanced' | 'failover';
export type WriteStrategyType = 'idempotent' | 'wal-first' | 'dual-write';
export type HealthManagerType = 'circuit-breaker' | 'simple';
export type WALManagerType = 'redis-stream' | 'file' | 'in-memory';

export interface StrategySetConfig {
    read: {
        type: ReadStrategyType;
        config: ReadStrategyConfig;
    };
    write: {
        type: WriteStrategyType;
        config: WriteStrategyConfig;
    };
    health: {
        type: HealthManagerType;
        config: HealthManagerConfig;
    };
    wal?: {
        type: WALManagerType;
        config: WALManagerConfig;
    };
}

export interface StrategySet {
    readStrategy: ReadStrategy;
    writeStrategy: WriteStrategy;
    healthManager: HealthManager;
    walManager?: WALManager;
}

export type WALManagerConfig = RedisStreamWALConfig | FileWALConfig | InMemoryWALConfig;

// WAL configuration types
export interface FileWALConfig {
    filePath: string;
    maxFileSize: number;
    rotateOnSize: boolean;
    flushIntervalMs?: number;
}

export interface InMemoryWALConfig {
    maxEntries: number;
    flushIntervalMs?: number;
}
typescript
// Read strategy interface design - read-strategy
export interface ReadStrategy {
    /**
     * Execute a read
     * @param key The key to read
     * @param managers The set of available storage managers
     * @returns The data that was read
     */
    execute(key: string, managers: StorageManagerSet): Promise<any>;
    
    /**
     * Strategy name
     */
    readonly name: string;

    /**
     * Strategy configuration
     */
    readonly config: ReadStrategyConfig;
}

export interface ReadStrategyConfig {
    /** Whether an empty result counts as a failure */
    treatEmptyAsFailure?: boolean;

    /** Maximum number of retries */
    maxRetries?: number;

    /** Retry delay (ms) */
    retryDelay?: number;

    /** Whether to enable cache merging */
    enableCacheMerge?: boolean;

    /** Read timeout (ms) */
    timeout?: number;
}

export interface ReadContext {
    key: string;
    managers: StorageManagerSet;
    attempt: number;
    startTime: number;
    metadata?: Record<string, any>;
}

export interface StorageManagerSet {
    primary: StorageManager;
    backup?: StorageManager;
    cache?: StorageManager;
}


// Concrete implementation of the read strategy
import { 
    ReadStrategy, 
    ReadStrategyConfig, 
    ReadContext, 
    StorageManagerSet 
} from '../interfaces/read-strategy.js';
import { HealthManager } from '../interfaces/health-manager.js';

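/**
 * Primary-first read strategy
 *
 * Strategy:
 * 1. Try the primary store first
 * 2. If the primary fails (or returns nothing while treatEmptyAsFailure is set), try the backup store
 * 3. Supports the circuit-breaker pattern to avoid repeatedly hitting a failed store
 */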
export class PrimaryFirstReadStrategy implements ReadStrategy {
    readonly name = 'primary-first-read';
    
    constructor(
        public readonly config: ReadStrategyConfig,
        private healthManager: HealthManager
    ) {}
    
    async execute(key: string, managers: StorageManagerSet): Promise<any> {
        const context: ReadContext = {
            key,
            managers,
            attempt: 1,
            startTime: Date.now()
        };
        
        // Step 1: try the primary store
        const primaryResult = await this.tryPrimary(context);
        if (primaryResult.success) {
            return primaryResult.data;
        }
        
        // Step 2: the primary failed, try the backup store
        if (managers.backup) {
            const backupResult = await this.tryBackup(context);
            if (backupResult.success) {
                return backupResult.data;
            }
        }
        
        // Step 3: both failed, throw an error
        throw new Error(`All storage backends failed for key: ${key}`);
    }
    
    private async tryPrimary(context: ReadContext): Promise<StorageResult> {
        if (!this.healthManager.isHealthy('primary')) {
            return { success: false, reason: 'primary-unhealthy' };
        }
        
        try {
            const backends = context.managers.primary.getBackends();
            if (!backends) {
                throw new Error('Primary storage not connected');
            }
            
            const result = await this.executeWithTimeout(
                () => backends.database.get(context.key),
                this.config.timeout || 5000
            );
            
            // Configurable: whether an empty result counts as a failure
            if ((result === null || result === undefined) && this.config.treatEmptyAsFailure) {
                throw new Error('Primary returned empty result');
            }

            // Record the success
            this.healthManager.markSuccess('primary', 'read', Date.now() - context.startTime);
            
            return { success: true, data: result };
            
        } catch (error) {
            // Record the failure
            this.healthManager.markFailure('primary', error as Error, 'read');
            
            return { 
                success: false, 
                reason: 'primary-failed',
                error: (error as Error).message 
            };
        }
    }
    
    private async tryBackup(context: ReadContext): Promise<StorageResult> {
        if (!this.healthManager.isHealthy('backup')) {
            return { success: false, reason: 'backup-unhealthy' };
        }
        
        try {
            const backends = context.managers.backup!.getBackends();
            if (!backends) {
                throw new Error('Backup storage not connected');
            }
            
            const result = await this.executeWithTimeout(
                () => backends.database.get(context.key),
                this.config.timeout || 5000
            );
            
            // Record the success
            this.healthManager.markSuccess('backup', 'read', Date.now() - context.startTime);
            
            return { success: true, data: result };
            
        } catch (error) {
            // Record the failure
            this.healthManager.markFailure('backup', error as Error, 'read');
            
            return { 
                success: false, 
                reason: 'backup-failed',
                error: (error as Error).message 
            };
        }
    }
    
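    private async executeWithTimeout<T>(
        operation: () => Promise<T>,
        timeoutMs: number
    ): Promise<T> {
        let timer: ReturnType<typeof setTimeout> | undefined;
        try {
            return await Promise.race([
                operation(),
                new Promise<T>((_, reject) => {
                    timer = setTimeout(() => reject(new Error('Operation timeout')), timeoutMs);
                })
            ]);
        } finally {
            // Clear the timer so a finished read does not leave it pending
            if (timer !== undefined) clearTimeout(timer);
        }
    }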
}

interface StorageResult {
    success: boolean;
    data?: any;
    reason?: string;
    error?: string;
}

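Methods in the strategy factory:

  1. Read strategy: instantiates the read strategy that matches the given type
  2. Write strategy: instantiates the write strategy that matches the given type
  3. Health manager: monitors the health of the storage backends so that a lost connection is detected promptly and traffic can be moved to another backend
  4. WAL manager: the write-ahead log
  5. Complete strategy set: creates all required strategy components in one call and wires up the dependencies between them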
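
The factory returns a `CircuitBreakerHealthManager`, whose implementation is not shown in this document. The sketch below illustrates the circuit-breaker idea behind `isHealthy`, `markSuccess` and `markFailure`, under stated assumptions. The class name `SketchCircuitBreaker`, the three-state model, and the simplified method signatures are hypothetical. The four option names are taken from the configuration example in this document, and the injectable `now` clock is added only to make the behavior testable.

```typescript
// Subset of the circuit-breaker options used in this document's configuration example.
interface BreakerConfig {
  failureThreshold: number;         // failures inside the window that open the circuit
  failureWindow: number;            // sliding window length in ms
  circuitOpenDuration: number;      // ms the circuit stays open before trial requests
  recoverySuccessThreshold: number; // successes needed in half-open state to close again
}

type CircuitState = 'closed' | 'open' | 'half-open';

interface BackendState {
  state: CircuitState;
  failures: number[]; // timestamps of recent failures
  openedAt: number;
  successes: number;  // successes counted while half-open
}

// Hypothetical, simplified stand-in for CircuitBreakerHealthManager.
class SketchCircuitBreaker {
  private backends = new Map<string, BackendState>();
  private config: BreakerConfig;
  private now: () => number;

  constructor(config: BreakerConfig, now: () => number = () => Date.now()) {
    this.config = config;
    this.now = now;
  }

  private stateOf(name: string): BackendState {
    let s = this.backends.get(name);
    if (!s) {
      s = { state: 'closed', failures: [], openedAt: 0, successes: 0 };
      this.backends.set(name, s);
    }
    return s;
  }

  // A backend is usable unless its circuit is open and still cooling down.
  isHealthy(name: string): boolean {
    const s = this.stateOf(name);
    if (s.state === 'open' && this.now() - s.openedAt >= this.config.circuitOpenDuration) {
      s.state = 'half-open'; // cool-down elapsed: allow trial requests
      s.successes = 0;
    }
    return s.state !== 'open';
  }

  markSuccess(name: string): void {
    const s = this.stateOf(name);
    if (s.state !== 'half-open') return;
    s.successes += 1;
    if (s.successes >= this.config.recoverySuccessThreshold) {
      s.state = 'closed';
      s.failures = [];
    }
  }

  markFailure(name: string): void {
    const s = this.stateOf(name);
    const t = this.now();
    // Keep only failures inside the sliding window, then record this one.
    s.failures = s.failures.filter(ts => t - ts < this.config.failureWindow);
    s.failures.push(t);
    // A failed trial request reopens the circuit immediately.
    if (s.state === 'half-open' || s.failures.length >= this.config.failureThreshold) {
      s.state = 'open';
      s.openedAt = t;
    }
  }
}
```

With this shape, `tryPrimary` skips the primary while its circuit is open, so reads go straight to the backup instead of waiting for a timeout on every request.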

5.4 Application layer design

typescript
import { StorageManager } from '../../../../../src/core/storage/manager.js';  // Reuses the existing StorageManager
import { ReadStrategy } from '../interfaces/read-strategy.js';
import { WriteStrategy } from '../interfaces/write-strategy.js';
import { HealthManager } from '../interfaces/health-manager.js';
import { WALManager } from '../interfaces/wal-manager.js';

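/**
 * Strategy-driven storage manager
 *
 * Responsibilities:
 * 1. Coordinate multiple StorageManager instances
 * 2. Inject and manage the strategies
 * 3. Provide a unified high-level storage interface
 * 4. Maintain the connection lifecycle
 */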
export class StrategicStorageManager {
    private managers: StorageManagerSet;
    private connected = false;
    
    constructor(
        private config: StrategicStorageConfig,
        private readStrategy: ReadStrategy,
        private writeStrategy: WriteStrategy,
        private healthManager: HealthManager,
        private walManager?: WALManager
    ) {
        this.managers = {
            primary: new StorageManager(config.primary),
            backup: config.backup ? new StorageManager(config.backup) : undefined,
            cache: config.cache ? new StorageManager(config.cache) : undefined
        };
    }
    
    /**
     * Connect all storage managers
     */
    async connect(): Promise<void> {
        if (this.connected) return;

        try {
            // Connect all managers in parallel
            const connections = [
                this.managers.primary.connect()
            ];
            
            if (this.managers.backup) {
                connections.push(this.managers.backup.connect());
            }
            
            if (this.managers.cache) {
                connections.push(this.managers.cache.connect());
            }
            
            await Promise.all(connections);
            
            // Start the WAL flush worker and the health checks
            if (this.walManager) {
                await this.walManager.startFlushWorker();
            }
            
            this.healthManager.startHealthCheck();
            
            this.connected = true;
            console.log('Strategic storage manager connected successfully');
            
        } catch (error) {
            console.error('Failed to connect strategic storage manager:', error);
            throw error;
        }
    }
    
    /**
     * Disconnect all connections
     */
    async disconnect(): Promise<void> {
        if (!this.connected) return;

        try {
            // Stop the background services
            this.healthManager.stopHealthCheck();
            
            if (this.walManager) {
                await this.walManager.stopFlushWorker();
            }
            
            // Disconnect all managers
            const disconnections = [
                this.managers.primary.disconnect()
            ];
            
            if (this.managers.backup) {
                disconnections.push(this.managers.backup.disconnect());
            }
            
            if (this.managers.cache) {
                disconnections.push(this.managers.cache.disconnect());
            }
            
            await Promise.all(disconnections);
            
            this.connected = false;
            console.log('Strategic storage manager disconnected');
            
        } catch (error) {
            console.error('Error during strategic storage manager disconnect:', error);
            throw error;
        }
    }
    
    /**
     * Strategy-driven read
     */
    async get(key: string): Promise<any> {
        this.ensureConnected();
        return await this.readStrategy.execute(key, this.managers);
    }
    
    /**
     * Strategy-driven write
     */
    async set(key: string, value: any): Promise<void> {
        this.ensureConnected();
        return await this.writeStrategy.execute(key, value, this.managers, this.walManager);
    }
    
    /**
     * Batch read
     */
    async mget(keys: string[]): Promise<Record<string, any>> {
        this.ensureConnected();

        const results: Record<string, any> = {};

        // Read all keys in parallel
        const promises = keys.map(async key => {
            try {
                const value = await this.readStrategy.execute(key, this.managers);
                return { key, value, success: true };
            } catch (error) {
                return { key, error, success: false };
            }
        });
        
        const completed = await Promise.allSettled(promises);
        
        for (const result of completed) {
            if (result.status === 'fulfilled' && result.value.success) {
                results[result.value.key] = result.value.value;
            }
        }
        
        return results;
    }
    
    /**
     * Batch write
     */
    async mset(entries: Record<string, any>): Promise<void> {
        this.ensureConnected();

        // Write all entries in parallel
        const promises = Object.entries(entries).map(([key, value]) => 
            this.writeStrategy.execute(key, value, this.managers, this.walManager)
        );
        
        await Promise.all(promises);
    }
    
    /**
     * Delete
     */
    async delete(key: string): Promise<void> {
        this.ensureConnected();

        // Route the delete through the write strategy by passing null as the value
        await this.writeStrategy.execute(key, null, this.managers, this.walManager);
    }
    
    /**
     * Check whether a key exists
     */
    async exists(key: string): Promise<boolean> {
        try {
            const value = await this.get(key);
            return value !== null && value !== undefined;
        } catch {
            return false;
        }
    }
    
    /**
     * Get the health status
     */
    getHealthStatus(): Record<string, any> {
        return {
            connected: this.connected,
            backends: this.healthManager.getAllHealthStats(),
            walStats: this.walManager?.getStats()
        };
    }
    
    /**
     * Get the storage managers (for direct access)
     */
    getStorageManagers(): StorageManagerSet {
        return this.managers;
    }
    
    /**
     * Get the underlying backends (kept for compatibility)
     */
    getBackends(type: 'primary' | 'backup' | 'cache' = 'primary') {
        const manager = this.managers[type];
        return manager ? manager.getBackends() : null;
    }
    
    /**
     * Force a WAL flush
     */
    async flushWAL(): Promise<void> {
        if (this.walManager) {
            const unflushed = await this.walManager.getUnflushedEntries();
            console.log(`Flushing ${unflushed.length} WAL entries`);
            // This method only reports the pending count; the flush itself is carried out by the background worker
        }
    }
    
    /**
     * Clean up expired WAL entries
     */
    async cleanupWAL(beforeTimestamp?: number): Promise<number> {
        if (!this.walManager) return 0;

        const timestamp = beforeTimestamp ?? Date.now() - 24 * 60 * 60 * 1000; // default: 24 hours ago
        return await this.walManager.cleanup(timestamp);
    }
    
    private ensureConnected(): void {
        if (!this.connected) {
            throw new Error('Strategic storage manager not connected');
        }
    }
}

export interface StrategicStorageConfig {
    primary: StorageConfig;
    backup?: StorageConfig;
    cache?: StorageConfig;
}

export interface StorageManagerSet {
    primary: StorageManager;
    backup?: StorageManager;
    cache?: StorageManager;
}

// Configuration type, mirroring the one in the existing project
interface StorageConfig {
    database: {
        type: 'postgresql' | 'sqlite' | 'redis' | 'in-memory';
        [key: string]: any;
    };
    cache?: {
        type: 'redis' | 'in-memory';
        [key: string]: any;
    };
}
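  1. connect: connects all storage managers
  2. disconnect: closes all connections
  3. get: strategy-driven read, delegating to the strategy layer
  4. set: strategy-driven write, also delegating to the strategy layer
  5. delete: deletes data
  6. mget: batch read, executed in parallel
  7. mset: batch write, executed in parallel
  8. exists: checks whether a key exists
  9. getHealthStatus: returns the health status
  10. flushWAL: forces a WAL flush
  11. cleanupWAL: removes expired WAL entries
  12. getStorageManagers: exposes the storage managers
  13. getBackends: returns the backends of a specific manager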
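
`set` delegates to the write strategy, but the `WriteStrategy` interface and `IdempotentWriteStrategy` are not shown in this document. The sketch below illustrates the write path described under "saveMessage" in simplified form: write to the primary when it is healthy, otherwise check the backup first to avoid a duplicate write, and append to the WAL in both cases. The names `KeyValueStore`, `WriteAheadLog`, `WriteTargets` and `idempotentWrite` are hypothetical, and comparing values with `JSON.stringify` is an assumption chosen for brevity.

```typescript
// Hypothetical minimal key-value surface of a storage backend.
interface KeyValueStore {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown): Promise<void>;
}

// Hypothetical WAL surface: append must succeed for the write to count as durable.
interface WriteAheadLog {
  append(entry: { key: string; value: unknown; timestamp: number }): Promise<void>;
}

interface WriteTargets {
  primary: KeyValueStore;
  backup: KeyValueStore;
  wal: WriteAheadLog;
  isPrimaryHealthy: () => boolean;
}

// Writes one value and returns the backend that accepted it.
async function idempotentWrite(
  key: string,
  value: unknown,
  targets: WriteTargets
): Promise<'primary' | 'backup'> {
  let acceptedBy: 'primary' | 'backup' = 'backup';

  if (targets.isPrimaryHealthy()) {
    try {
      await targets.primary.set(key, value);
      acceptedBy = 'primary';
    } catch {
      // Primary failed mid-write: fall through to the backup path.
    }
  }

  if (acceptedBy === 'backup') {
    // Idempotency check: skip the write if the backup already holds this value.
    const existing = await targets.backup.get(key);
    if (JSON.stringify(existing) !== JSON.stringify(value)) {
      await targets.backup.set(key, value);
    }
  }

  // The WAL append is not optional: if it throws, the caller sees the error.
  await targets.wal.append({ key, value, timestamp: Date.now() });
  return acceptedBy;
}
```

Because the backup path checks before writing, retrying the same call after a timeout does not produce a second write to the backup. A stricter write-ahead design would append to the WAL before touching either store; the order here follows the steps listed in this document.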

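Using the application layer together with the strategy layer is straightforward: the strategy factory instantiates the strategy modules, and the application layer calls them.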

typescript
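/**
 * Basic usage example
 *
 * Shows how to use the strategy-driven storage manager for basic reads and writes
 */

async function basicUsageExample() {
    console.log('=== Basic usage example ===');

    // 1. Configure storage
    const storageConfig: StrategicStorageConfig = {
        // Primary store: PostgreSQL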
        primary: {
            database: {
                type: 'postgresql',
                url: 'postgresql://user:pass@localhost:5432/cipher_primary'
            }
        },
        // Backup store: SQLite
        backup: {
            database: {
                type: 'sqlite',
                path: './data',
                database: 'cipher-backup.db'
            }
        }
    };
    
    // 2. Configure the strategies
    const strategyConfig: StrategySetConfig = {
        read: {
            type: 'primary-first',
            config: {
                treatEmptyAsFailure: false,
                timeout: 5000
            }
        },
        write: {
            type: 'idempotent',
            config: {
                enableIdempotency: true,
                timeout: 5000
            }
        },
        health: {
            type: 'circuit-breaker',
            config: {
                failureThreshold: 5,
                failureWindow: 60000,
                circuitOpenDuration: 30000,
                recoverySuccessThreshold: 3,
                healthCheckInterval: 10000,
                maxFailureHistory: 100,
                latencyWindowSize: 50
            }
        }
    };
    
    // 3. Create the strategy set
    const strategies = StrategyFactory.createStrategySet(strategyConfig);

    // 4. Create the manager (the application-layer entry point)
    const manager = new StrategicStorageManager(
        storageConfig,
        strategies.readStrategy,
        strategies.writeStrategy,
        strategies.healthManager,
        strategies.walManager
    );
    
    try {
        // 5. Connect
        await manager.connect();
        console.log('✅ Storage manager connected');

        // 6. Basic write
        await manager.set('user:123', {
            id: 123,
            name: 'John Doe',
            email: 'john@example.com',
            createdAt: new Date().toISOString()
        });
        console.log('✅ User data written');

        // 7. Basic read
        const user = await manager.get('user:123');
        console.log('✅ User data read:', user);

        // 8. Batch operations
        await manager.mset({
            'session:abc': { userId: 123, token: 'xyz', expiresAt: Date.now() + 3600000 },
            'session:def': { userId: 456, token: 'uvw', expiresAt: Date.now() + 3600000 }
        });
        console.log('✅ Batch write finished');

        const sessions = await manager.mget(['session:abc', 'session:def']);
        console.log('✅ Batch read finished:', sessions);

        // 9. Check the health status
        const health = manager.getHealthStatus();
        console.log('✅ Health status:', JSON.stringify(health, null, 2));

    } catch (error) {
        console.error('❌ Operation failed:', error);
    } finally {
        // 10. Cleanup
        await manager.disconnect();
        console.log('✅ Storage manager disconnected');
    }
}

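6. Best Practices

  • Use a fast, low-latency database as the primary, for example Redis or Postgres
  • Use a highly durable store, which may be slower, as the backup, for example S3, SQLite, or a cloud database
  • In production, consider a persistent WAL rather than an in-memory one, to maximize durability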
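
To make the last point concrete, here is a minimal sketch of a WAL flush pass over an in-memory log. Every name in it (`InMemoryWal`, `BackupStore`, `flushOnce`, `startFlushWorker`) is hypothetical, except `getUnflushedEntries`, which mirrors the call used by `flushWAL` above. The 5000 ms default follows the flush interval stated earlier in this document. Entries live only in process memory, so anything still pending when the process crashes is lost. A durable variant would persist `append` and `markFlushed` to disk or to a Redis stream.

```typescript
interface WalEntry {
  id: number;
  key: string;
  value: unknown;
  flushed: boolean;
}

// Hypothetical in-memory WAL, for illustration only: pending entries do not survive a crash.
class InMemoryWal {
  private entries: WalEntry[] = [];
  private nextId = 1;

  append(key: string, value: unknown): WalEntry {
    const entry: WalEntry = { id: this.nextId++, key, value, flushed: false };
    this.entries.push(entry);
    return entry;
  }

  getUnflushedEntries(): WalEntry[] {
    return this.entries.filter(e => !e.flushed);
  }

  markFlushed(id: number): void {
    const entry = this.entries.find(e => e.id === id);
    if (entry) entry.flushed = true;
  }
}

interface BackupStore {
  set(key: string, value: unknown): Promise<void>;
}

// One flush pass: replay pending entries in order and stop at the first failure,
// so later entries are never applied ahead of earlier ones.
async function flushOnce(wal: InMemoryWal, backup: BackupStore): Promise<number> {
  let flushed = 0;
  for (const entry of wal.getUnflushedEntries()) {
    try {
      await backup.set(entry.key, entry.value);
    } catch {
      break; // backup unavailable: keep the remaining entries for the next pass
    }
    // Mark as flushed only after the backup write has succeeded.
    wal.markFlushed(entry.id);
    flushed += 1;
  }
  return flushed;
}

// Periodic worker; returns a function that stops it.
function startFlushWorker(wal: InMemoryWal, backup: BackupStore, intervalMs = 5000): () => void {
  let running = false;
  const timer = setInterval(() => {
    if (running) return; // skip this tick if the previous pass is still in progress
    running = true;
    flushOnce(wal, backup).finally(() => { running = false; });
  }, intervalMs);
  return () => clearInterval(timer);
}
```

Marking an entry only after the backup write succeeds gives at-least-once delivery: a crash between the write and the mark replays that entry on the next pass, which is why the backup write should be idempotent.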