全面解析语义缓存系统的设计实现、去重策略和智能存储管理技术,打造高性能LLM应用
随着大语言模型(LLM)的广泛应用,如何提高响应速度并降低API调用成本成为关键挑战。语义缓存(Semantic Caching)作为一种创新技术,通过理解并存储查询的语义含义,可以显著提升LLM应用的性能和经济性。
语义缓存是一种高级缓存技术,它不仅仅存储原始数据,还能理解数据背后的语义内容和用户意图。与传统缓存仅基于精确匹配工作不同,语义缓存能够识别语义上相似的查询,即使表达方式不同,也能返回相关的缓存结果。
语义缓存通过解释和存储用户查询的语义含义,使系统能够基于意图而非仅仅是字面匹配来检索信息。这种方法使数据交互更加细致入微,缓存提供的响应比传统缓存更相关,比典型的大型语言模型(LLM)响应更快。
特性 | 传统缓存 | 语义缓存 |
---|---|---|
匹配机制 | 精确字符串匹配 | 语义相似度匹配 |
缓存命中率 | 较低(需完全相同查询) | 较高(可识别相似查询) |
存储内容 | 原始数据 | 语义表示和原始数据 |
计算复杂度 | 低(简单哈希查找) | 中等(需向量相似度计算) |
适用场景 | 结构化、确定性查询 | 自然语言、变化性查询 |
语义缓存的核心在于将文本转换为数值向量(嵌入向量),这些向量能够在高维空间中表示文本的语义含义。相似含义的文本在向量空间中的距离较近,这为相似度计算提供了基础。
语义缓存系统使用余弦相似度、欧氏距离等算法计算查询向量与缓存内容的相似度。这些度量方式决定了何时可以复用已缓存的结果。
┌───────────────────────────────────────────────────────┐ │ 用户查询请求 │ └───────────────────────────────┬───────────────────────┘ ▼ ┌───────────────────────────────────────────────────────┐ │ 精确匹配缓存查找 │ │ (Redis / Memorystore / KV存储) │ └───────────────────────────────┬───────────────────────┘ │ ┌──────────────No────┴────Yes───────┐ ▼ ▼ ┌────────────────────────┐ ┌────────────────────────┐ │ 查询向量化处理 │ │ 返回缓存结果 │ │ (Embedding Model) │ └────────────────────────┘ └──────────┬─────────────┘ ▼ ┌────────────────────────┐ │ 相似查询向量检索 │ │ (Vector Database) │ └──────────┬─────────────┘ │ ┌──────────┴─────────────┐ │ 相似度评估与阈值判断 │ └──────────┬─────────────┘ │ ┌─────┴─────┐ ▼ ▼ ┌────────┐ ┌────────────────────────┐ │命中缓存│ │ 内容检索与LLM生成 │ └───┬────┘ └──────────┬─────────────┘ │ │ │ ▼ │ ┌────────────────────────┐ │ │ 更新缓存系统 │ │ └──────────┬─────────────┘ │ │ └────────┬─────────┘ ▼ ┌───────────────────────────────────────────────────────┐ │ 返回结果给用户 │ └───────────────────────────────────────────────────────┘
高效语义缓存系统由以下关键组件构成:
语义缓存系统中的数据流经过以下几个关键步骤:
精确去重是最基本的去重策略,通过计算查询字符串的哈希值实现快速匹配:
import hashlib
def generate_query_hash(query_text):
"""生成查询文本的MD5哈希值"""
return hashlib.md5(query_text.encode('utf-8')).hexdigest()
def exact_match_dedup(query, cache_store):
"""精确匹配去重"""
query_hash = generate_query_hash(query)
if query_hash in cache_store:
return cache_store[query_hash], True
return None, False
语义去重通过计算查询的向量表示,并在向量数据库中搜索相似查询实现:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
def semantic_match_dedup(query_vector, vector_db, threshold=0.95):
"""语义匹配去重"""
results = vector_db.search(query_vector, top_k=5)
if not results:
return None, False
top_match = results[0]
similarity_score = cosine_similarity([query_vector], [top_match.vector])[0][0]
if similarity_score >= threshold:
return top_match, True
return None, False
在实际系统中,通常采用多级去重策略,先检查精确匹配,再进行语义匹配,最大限度提高缓存命中率并保证响应质量:
def multi_level_dedup(query, standard_cache, vector_cache, embedding_model, threshold=0.95):
"""多级去重策略"""
# 步骤1:尝试精确匹配
query_hash = generate_query_hash(query)
if query_hash in standard_cache:
return standard_cache[query_hash], "exact_match"
# 步骤2:语义匹配
query_vector = embedding_model.embed(query)
top_match, match_found = semantic_match_dedup(
query_vector, vector_cache, threshold
)
if match_found:
# 使用语义匹配的结果从标准缓存中获取响应
match_hash = generate_query_hash(top_match.text)
if match_hash in standard_cache:
return standard_cache[match_hash], "semantic_match"
# 无匹配,需要生成新响应
return None, "no_match"
对于基于上下文的查询(如对话历史),需要将上下文信息纳入去重考量:
def context_aware_dedup(query, context, cache_system, embedding_model, threshold=0.95):
"""上下文感知去重"""
# 结合查询和上下文创建完整的输入
combined_input = context + "\n" + query
# 生成联合向量表示
combined_vector = embedding_model.embed(combined_input)
# 执行语义查找
return semantic_match_dedup(combined_vector, cache_system.vector_db, threshold)
当查询语义高度相似时,可以直接复用现有缓存的完整结果,最大限度减少LLM调用:
def full_result_reuse(query, cache_system, threshold=0.95):
"""完整结果复用策略"""
# 检查是否有语义匹配
match, found = cache_system.find_semantic_match(query, threshold)
if found:
# 直接返回匹配结果
return match.response, True
return None, False
某些情况下,可以复用部分缓存结果,然后使用LLM补充或调整:
def partial_result_reuse(query, cache_system, llm, threshold=0.85):
"""部分结果复用策略"""
# 查找相似但不完全匹配的结果
matches = cache_system.find_similar_matches(query, threshold)
if not matches:
return None, False
# 从历史响应中提取有用信息
relevant_info = extract_relevant_info(matches, query)
# 使用LLM生成补充内容
prompt = f"基于以下信息回答问题:{relevant_info}\n\n问题:{query}"
supplemented_response = llm.generate(prompt)
return supplemented_response, True
def extract_relevant_info(matches, query):
"""从匹配结果中提取与当前查询相关的信息"""
# 实现逻辑...
pass
在多个相似查询有缓存结果的情况下,可以融合多个结果生成更完整的响应:
def result_fusion(query, cache_system, llm, threshold=0.8):
"""结果融合策略"""
# 获取多个相关匹配
matches = cache_system.find_multiple_matches(query, threshold, top_k=3)
if not matches:
return None, False
# 收集所有相关响应
responses = [match.response for match in matches]
# 使用LLM融合多个响应
fusion_prompt = f"""
基于以下几个相关回答,生成一个完整的回答:
{'\n\n'.join([f'回答 {i+1}: {resp}' for i, resp in enumerate(responses)])}
问题:{query}
"""
fused_response = llm.generate(fusion_prompt)
return fused_response, True
当新信息可用时,可以基于现有缓存结果进行增量更新,避免完全重新生成:
def incremental_update(query, cached_response, new_info, llm):
"""基于新信息增量更新缓存结果"""
update_prompt = f"""
原始回答:
{cached_response}
考虑以下新信息:
{new_info}
请更新原始回答,确保包含新信息并保持一致性。问题是:{query}
"""
updated_response = llm.generate(update_prompt)
return updated_response
高效语义缓存系统通常采用多级缓存架构,结合内存缓存和持久化存储:
class MultiLevelCacheSystem:
def __init__(self, memory_cache_size=1000, disk_cache_path="./cache"):
# 内存缓存 (快速访问)
self.memory_cache = LRUCache(capacity=memory_cache_size)
# 向量数据库 (语义搜索)
self.vector_db = VectorDatabase()
# 持久化存储 (长期存储)
self.disk_store = DiskStore(path=disk_cache_path)
def get(self, query):
"""获取缓存结果"""
# 首先检查内存缓存
query_hash = generate_query_hash(query)
if query_hash in self.memory_cache:
return self.memory_cache[query_hash], "memory_hit"
# 检查向量数据库
query_vector = self.embed(query)
match, found = self.vector_db.search(query_vector)
if found:
# 将结果加入内存缓存
match_hash = generate_query_hash(match.query)
result = self.disk_store.get(match_hash)
self.memory_cache[query_hash] = result
return result, "vector_hit"
# 缓存未命中
return None, "miss"
针对语义缓存的特性,可以实现多种高级缓存淘汰策略:
class SemanticCacheManager:
def __init__(self, capacity=10000):
self.capacity = capacity
self.cache = {} # 哈希到响应的映射
self.vector_store = {} # 哈希到向量的映射
self.access_count = {} # 访问计数
self.last_access = {} # 最近访问时间
self.creation_time = {} # 创建时间
def evict_if_needed(self):
"""如果缓存超出容量,执行淘汰"""
if len(self.cache) <= self.capacity:
return
# 可选淘汰策略:
# 1. LRU (最近最少使用)
# 2. LFU (最不经常使用)
# 3. FIFO (先进先出)
# 4. 语义聚类淘汰 (保留每个语义簇的代表)
# 5. 混合策略
# 实现 LFU + TTL 淘汰
current_time = time.time()
items_to_score = []
for key in self.cache.keys():
# 计算综合评分 (访问频率和年龄)
age = current_time - self.creation_time[key]
frequency = self.access_count[key]
recency = current_time - self.last_access.get(key, current_time)
# 计算淘汰分数 (越低越可能被淘汰)
score = frequency / (1 + 0.01 * age + 0.1 * recency)
items_to_score.append((key, score))
# 按分数排序
items_to_score.sort(key=lambda x: x[1])
# 淘汰分数最低的项目
items_to_evict = items_to_score[:len(self.cache) - self.capacity]
for key, _ in items_to_evict:
self._remove_item(key)
def _remove_item(self, key):
"""从缓存中移除项目"""
del self.cache[key]
del self.vector_store[key]
del self.access_count[key]
if key in self.last_access:
del self.last_access[key]
del self.creation_time[key]
基于访问频率和重要性自适应调整缓存条目的生存时间:
def adaptive_ttl_management(cache_system, max_ttl=604800, min_ttl=3600):
"""根据访问频率和重要性自适应调整TTL"""
for key, item in cache_system.items():
# 基础TTL
base_ttl = min_ttl
# 根据访问频率增加TTL
access_count = cache_system.access_count.get(key, 0)
frequency_factor = min(access_count / 10, 5) # 最多增加5倍
# 根据重要性增加TTL (由缓存项目元数据定义)
importance = item.get('importance', 1.0) # 默认重要性为1
# 计算最终TTL
final_ttl = min(
base_ttl * frequency_factor * importance,
max_ttl
)
# 更新TTL
cache_system.set_ttl(key, int(final_ttl))
在大规模系统中,可以实现分布式语义缓存,提高吞吐量和可用性:
class DistributedSemanticCache:
def __init__(self, redis_config, vector_db_config):
# 使用Redis作为分布式KV存储
self.redis = Redis(**redis_config)
# 使用分布式向量数据库
self.vector_db = VectorDB(**vector_db_config)
def get(self, query, threshold=0.95):
"""分布式查询处理"""
# 生成查询哈希
query_hash = generate_query_hash(query)
# 尝试精确匹配
if self.redis.exists(query_hash):
return self.redis.get(query_hash), "exact_match"
# 生成嵌入向量
query_vector = self.embed_query(query)
# 执行分布式向量搜索
results = self.vector_db.search(
collection="queries",
query_vector=query_vector,
limit=5
)
if results and results[0].score >= threshold:
top_match = results[0]
match_hash = top_match.id
# 从Redis获取匹配结果
if self.redis.exists(f"response:{match_hash}"):
return self.redis.get(f"response:{match_hash}"), "semantic_match"
return None, "miss"
嵌入模型的选择对语义缓存的性能至关重要,需要平衡精度和效率:
模型 | 维度 | 优势 | 劣势 | 适用场景 |
---|---|---|---|---|
OpenAI text-embedding-3-large | 3072 | 高精度,语义理解能力强 | 计算成本高,向量大 | 需要高精度语义匹配的场景 |
OpenAI text-embedding-3-small | 1536 | 平衡精度和效率 | 相对较大的向量大小 | 一般用途语义缓存 |
ONNX模型 (如GPTCache/paraphrase-albert-onnx) | 768 | 本地运行,无API依赖 | 精度相对较低 | 离线环境或注重隐私的场景 |
SentenceTransformers | 384-768 | 开源,灵活部署 | 需要自行维护基础设施 | 成本敏感应用 |
不同向量数据库的特性对语义缓存的性能和可扩展性有显著影响:
# 使用FAISS实现简单向量存储
import faiss
import numpy as np
class FAISSVectorStore:
def __init__(self, dimension=768):
# 初始化FAISS索引
self.dimension = dimension
self.index = faiss.IndexFlatL2(dimension) # L2距离
self.queries = []
self.responses = []
def add(self, query, query_vector, response):
"""添加新向量到索引"""
if len(query_vector) != self.dimension:
raise ValueError(f"向量维度应为{self.dimension},实际为{len(query_vector)}")
# 添加到FAISS索引
vector_np = np.array([query_vector]).astype('float32')
self.index.add(vector_np)
# 存储原始查询和响应
self.queries.append(query)
self.responses.append(response)
return len(self.queries) - 1 # 返回向量ID
def search(self, query_vector, top_k=5):
"""搜索最相似的向量"""
vector_np = np.array([query_vector]).astype('float32')
distances, indices = self.index.search(vector_np, top_k)
results = []
for i in range(len(indices[0])):
idx = indices[0][i]
distance = distances[0][i]
# 距离转换为相似度分数 (1 - 归一化距离)
max_distance = float(self.dimension) # 理论最大L2距离
similarity = 1.0 - (distance / max_distance)
if idx >= 0 and idx < len(self.queries):
results.append({
'id': idx,
'query': self.queries[idx],
'response': self.responses[idx],
'similarity': similarity
})
return results
将标准键值缓存与向量语义搜索结合,创建完整的语义缓存系统:
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class CacheItem:
"""缓存项目"""
query: str
response: Any
query_vector: List[float]
created_at: float
access_count: int = 0
last_accessed: Optional[float] = None
class SemanticCache:
def __init__(
self,
embedding_model,
vector_store,
similarity_threshold=0.95,
capacity=10000
):
self.embedding_model = embedding_model
self.vector_store = vector_store
self.similarity_threshold = similarity_threshold
self.capacity = capacity
# 标准缓存 (哈希到CacheItem的映射)
self.kv_cache: Dict[str, CacheItem] = {}
def get(self, query: str) -> Tuple[Optional[Any], str]:
"""获取缓存结果,返回(结果, 缓存类型)"""
query_hash = self._hash_query(query)
# 尝试精确匹配
if query_hash in self.kv_cache:
item = self.kv_cache[query_hash]
self._update_stats(query_hash)
return item.response, "exact_match"
# 尝试语义匹配
query_vector = self.embedding_model.embed(query)
results = self.vector_store.search(query_vector)
if results and results[0]['similarity'] >= self.similarity_threshold:
top_match = results[0]
top_query = top_match['query']
top_hash = self._hash_query(top_query)
if top_hash in self.kv_cache:
item = self.kv_cache[top_hash]
self._update_stats(top_hash)
# 同时为当前查询创建一个精确匹配项
self.put(query, item.response, query_vector)
return item.response, "semantic_match"
return None, "miss"
def put(self, query: str, response: Any, query_vector=None) -> None:
"""添加项目到缓存"""
# 检查并执行淘汰策略
if len(self.kv_cache) >= self.capacity:
self._evict()
query_hash = self._hash_query(query)
# 如果没有提供查询向量,则生成
if query_vector is None:
query_vector = self.embedding_model.embed(query)
# 创建缓存项
item = CacheItem(
query=query,
response=response,
query_vector=query_vector,
created_at=time.time()
)
# 添加到KV缓存
self.kv_cache[query_hash] = item
# 添加到向量存储
self.vector_store.add(query, query_vector, response)
def _hash_query(self, query: str) -> str:
"""生成查询哈希值"""
import hashlib
return hashlib.md5(query.encode('utf-8')).hexdigest()
def _update_stats(self, query_hash: str) -> None:
"""更新缓存项统计信息"""
if query_hash in self.kv_cache:
item = self.kv_cache[query_hash]
item.access_count += 1
item.last_accessed = time.time()
def _evict(self) -> None:
"""执行缓存淘汰"""
# 实现淘汰策略,如LRU、LFU或混合策略
if not self.kv_cache:
return
# 简单LFU实现
min_count = float('inf')
min_hash = None
for query_hash, item in self.kv_cache.items():
if item.access_count < min_count:
min_count = item.access_count
min_hash = query_hash
if min_hash:
del self.kv_cache[min_hash]
整合所有组件,创建一个完整的语义缓存系统:
from typing import Any, Dict, List, Optional, Tuple
import time
import hashlib
class SemanticCacheSystem:
def __init__(
self,
embedding_model,
llm_provider,
vector_store=None,
redis_client=None,
similarity_threshold=0.92,
ttl=86400 # 默认一天
):
self.embedding_model = embedding_model
self.llm = llm_provider
# 初始化向量存储
self.vector_store = vector_store or FAISSVectorStore(
dimension=embedding_model.dimension
)
# 初始化Redis客户端 (可选)
self.redis = redis_client
self.similarity_threshold = similarity_threshold
self.default_ttl = ttl
def process_query(self, query: str, context: str = "") -> Dict[str, Any]:
"""处理用户查询,返回回答和元数据"""
start_time = time.time()
# 结合上下文和查询
combined_input = context + "\n" + query if context else query
# 尝试从缓存获取
result, cache_type = self.get_from_cache(combined_input)
if result:
# 缓存命中
processing_time = time.time() - start_time
return {
"query": query,
"response": result,
"cache_hit": True,
"cache_type": cache_type,
"processing_time": processing_time
}
# 缓存未命中,调用LLM
llm_start_time = time.time()
response = self.llm.generate(combined_input)
llm_time = time.time() - llm_start_time
# 添加到缓存
self.add_to_cache(combined_input, response)
processing_time = time.time() - start_time
return {
"query": query,
"response": response,
"cache_hit": False,
"llm_time": llm_time,
"processing_time": processing_time
}
def get_from_cache(self, query: str) -> Tuple[Optional[str], str]:
"""从缓存获取结果"""
# 生成查询哈希
query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()
# 步骤1: 检查Redis (如果可用)
if self.redis:
result = self.redis.get(f"cache:{query_hash}")
if result:
return result.decode('utf-8'), "redis_exact"
# 步骤2: 生成嵌入向量
query_vector = self.embedding_model.embed(query)
# 步骤3: 向量搜索
results = self.vector_store.search(query_vector)
if results and results[0]['similarity'] >= self.similarity_threshold:
top_result = results[0]
# 如果使用Redis, 将结果存入Redis以加速后续访问
if self.redis:
self.redis.setex(
f"cache:{query_hash}",
self.default_ttl,
top_result['response']
)
return top_result['response'], "vector"
return None, ""
def add_to_cache(self, query: str, response: str) -> None:
"""添加结果到缓存"""
# 生成查询哈希
query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()
# 生成查询向量
query_vector = self.embedding_model.embed(query)
# 添加到向量存储
self.vector_store.add(query, query_vector, response)
# 如果使用Redis, 同时添加到Redis
if self.redis:
self.redis.setex(
f"cache:{query_hash}",
self.default_ttl,
response
)
语义缓存系统能显著提升响应速度,尤其是在处理相似查询时:
处理方式 | 平均响应时间 | 改善比例 |
---|---|---|
直接LLM调用 | 6504 ms | 基准 |
语义缓存匹配 | 1919 ms | 3.4x 提速 |
精确缓存匹配 | 53 ms | 123x 提速 |
语义缓存显著降低LLM API调用成本:
语义缓存能显著提高系统整体吞吐量,尤其是在处理大量重复或相似查询时:
语义缓存系统的效果很大程度上取决于缓存命中率,以下是影响因素:
相似度阈值是语义缓存系统的关键参数,需要根据具体应用场景进行调优:
def optimize_similarity_threshold(
test_queries,
ground_truth,
cache_system,
start_threshold=0.5,
end_threshold=1.0,
step=0.05
):
"""优化相似度阈值"""
best_threshold = start_threshold
best_f1 = 0.0
results = []
thresholds = [round(start_threshold + i * step, 2)
for i in range(int((end_threshold - start_threshold) / step) + 1)]
for threshold in thresholds:
cache_system.similarity_threshold = threshold
true_positives = 0
false_positives = 0
false_negatives = 0
for query, expected in zip(test_queries, ground_truth):
result, cache_type = cache_system.get_from_cache(query)
if result and expected == result:
true_positives += 1
elif result and expected != result:
false_positives += 1
elif not result and expected:
false_negatives += 1
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
results.append({
'threshold': threshold,
'precision': precision,
'recall': recall,
'f1': f1
})
if f1 > best_f1:
best_f1 = f1
best_threshold = threshold
return best_threshold, results
缓存预热可以提高系统初始阶段的性能,特别是对于可预测的查询模式:
def preload_cache_from_common_queries(cache_system, query_dataset, llm):
"""使用常见查询预热缓存"""
for query in query_dataset:
if not cache_system.get_from_cache(query)[0]:
# 缓存未命中,使用LLM生成回答
response = llm.generate(query)
cache_system.add_to_cache(query, response)
def generate_query_variations(base_queries, llm, variations_per_query=3):
"""生成查询变体以扩充缓存"""
variations = []
for query in base_queries:
prompt = f"""
请为以下问题生成{variations_per_query}个语义相同但表述不同的变体:
问题: {query}
仅输出变体问题,每行一个,不要有额外说明。
"""
response = llm.generate(prompt)
query_variations = response.strip().split('\n')[:variations_per_query]
variations.extend(query_variations)
return variations
动态调整缓存参数以适应不同的负载和查询模式:
class AdaptiveSemanticCache:
def __init__(self, base_threshold=0.92, min_threshold=0.85, max_threshold=0.98):
self.base_threshold = base_threshold
self.current_threshold = base_threshold
self.min_threshold = min_threshold
self.max_threshold = max_threshold
self.hits = 0
self.misses = 0
self.total_queries = 0
self.adjustment_interval = 100 # 每处理100个查询调整一次
def adjust_threshold(self):
"""根据命中率动态调整阈值"""
if self.total_queries < self.adjustment_interval:
return
hit_rate = self.hits / self.total_queries if self.total_queries > 0 else 0
# 根据命中率调整阈值
if hit_rate < 0.2: # 命中率过低
# 降低阈值以增加命中率
self.current_threshold = max(self.current_threshold - 0.02, self.min_threshold)
elif hit_rate > 0.8: # 命中率过高
# 提高阈值以增加精度
self.current_threshold = min(self.current_threshold + 0.01, self.max_threshold)
# 重置计数
self.hits = 0
self.misses = 0
self.total_queries = 0
在处理包含特定实体(如日期、数字、名称)的查询时,可以实现实体感知缓存:
import spacy
class EntityAwareCache:
def __init__(self, embedding_model, vector_store, similarity_threshold=0.95):
self.embedding_model = embedding_model
self.vector_store = vector_store
self.similarity_threshold = similarity_threshold
# 加载NER模型
self.nlp = spacy.load("zh_core_web_sm")
def get_from_cache(self, query):
"""实体感知的缓存查找"""
# 提取查询中的实体
doc = self.nlp(query)
query_entities = self._extract_entities(doc)
# 生成查询向量
query_vector = self.embedding_model.embed(query)
# 查找相似向量
results = self.vector_store.search(query_vector)
if not results:
return None, "miss"
top_match = results[0]
top_similarity = top_match['similarity']
if top_similarity >= self.similarity_threshold:
# 检查实体一致性
top_query = top_match['query']
top_doc = self.nlp(top_query)
top_entities = self._extract_entities(top_doc)
# 检查关键实体类型是否一致
entities_compatible = self._check_entities_compatibility(query_entities, top_entities)
if entities_compatible:
return top_match['response'], "semantic_match"
return None, "miss"
def _extract_entities(self, doc):
"""提取文本中的实体"""
entities = {}
for ent in doc.ents:
if ent.label_ not in entities:
entities[ent.label_] = []
entities[ent.label_].append(ent.text)
return entities
def _check_entities_compatibility(self, entities1, entities2):
"""检查两组实体是否兼容"""
critical_types = {"DATE", "MONEY", "CARDINAL", "PERCENT", "TIME"}
for entity_type in critical_types:
# 如果两个查询都包含此类型的实体,需要至少有一个匹配
if entity_type in entities1 and entity_type in entities2:
if not set(entities1[entity_type]) & set(entities2[entity_type]):
return False
return True
扩展缓存系统以支持多模态输入(如文本+图像):
class MultiModalCache:
def __init__(
self,
text_embedding_model,
image_embedding_model,
fusion_strategy="concat"
):
self.text_embedding_model = text_embedding_model
self.image_embedding_model = image_embedding_model
self.fusion_strategy = fusion_strategy
# 确定融合后的向量维度
self.dimension = self._get_fusion_dimension()
# 初始化向量存储
self.vector_store = FAISSVectorStore(dimension=self.dimension)
# 哈希到响应的映射
self.responses = {}
def _get_fusion_dimension(self):
"""根据融合策略确定最终向量维度"""
text_dim = self.text_embedding_model.dimension
image_dim = self.image_embedding_model.dimension
if self.fusion_strategy == "concat":
return text_dim + image_dim
elif self.fusion_strategy == "average":
return max(text_dim, image_dim)
else:
raise ValueError(f"不支持的融合策略: {self.fusion_strategy}")
def _fuse_embeddings(self, text_embedding, image_embedding):
"""融合文本和图像嵌入向量"""
if self.fusion_strategy == "concat":
# 简单拼接
return text_embedding + image_embedding
elif self.fusion_strategy == "average":
# 如果维度不同,需要先调整
# 这里简化处理,假设维度相同
import numpy as np
return list((np.array(text_embedding) + np.array(image_embedding)) / 2)
以下是一个简单的语义缓存系统演示,展示了从初始化到处理查询的完整流程:
import os
import time
import hashlib
import numpy as np
import json
from typing import Dict, List, Any, Optional, Tuple
# 模拟嵌入模型
class SimpleEmbeddingModel:
def __init__(self, dimension=768):
self.dimension = dimension
def embed(self, text):
"""简单模拟,实际应使用真实嵌入模型"""
# 注意:这只是演示用,不是真实的嵌入
hash_obj = hashlib.md5(text.encode('utf-8'))
hash_bytes = hash_obj.digest()
# 使用哈希生成伪随机向量
np.random.seed(int.from_bytes(hash_bytes[:4], byteorder='little'))
vector = np.random.random(self.dimension).tolist()
return vector
# 模拟LLM服务
class MockLLMService:
def __init__(self, response_time=2.0):
self.response_time = response_time
self.qa_pairs = {
"什么是语义缓存?": "语义缓存是一种先进的缓存技术,它根据查询的语义含义而非精确匹配来存储和检索数据。",
"如何实现语义缓存?": "实现语义缓存需要嵌入模型、向量数据库、相似度计算和缓存管理等组件。",
"语义缓存有什么优势?": "语义缓存可以提高响应速度,降低API调用成本,增加系统吞吐量。"
}
def generate(self, query):
"""模拟LLM生成回答"""
# 模拟处理延迟
time.sleep(self.response_time)
# 检查是否有预定义答案
for q, a in self.qa_pairs.items():
if self._similarity(query, q) > 0.8:
return a
# 默认回答
return f"针对您的问题「{query}」,我需要进一步思考..."
def _similarity(self, a, b):
"""简单字符串相似度"""
common = set(a) & set(b)
return len(common) / len(set(a) | set(b))
# 简单向量存储
class SimpleVectorStore:
def __init__(self):
self.vectors = []
self.data = []
def add(self, query, vector, response):
"""添加向量和数据"""
self.vectors.append(vector)
self.data.append({
"query": query,
"response": response
})
return len(self.vectors) - 1
def search(self, query_vector, top_k=5):
"""搜索最相似的向量"""
if not self.vectors:
return []
# 计算余弦相似度
similarities = []
for idx, vector in enumerate(self.vectors):
sim = self._cosine_similarity(query_vector, vector)
similarities.append((idx, sim))
# 按相似度排序
similarities.sort(key=lambda x: x[1], reverse=True)
# 返回前K个结果
results = []
for idx, sim in similarities[:top_k]:
item = {**self.data[idx], "similarity": sim}
results.append(item)
return results
def _cosine_similarity(self, vec1, vec2):
"""计算余弦相似度"""
dot = sum(a * b for a, b in zip(vec1, vec2))
norm1 = sum(a * a for a in vec1) ** 0.5
norm2 = sum(b * b for b in vec2) ** 0.5
return dot / (norm1 * norm2) if norm1 * norm2 > 0 else 0
# 演示语义缓存系统
class DemoSemanticCache:
def __init__(self, similarity_threshold=0.9):
self.embedding_model = SimpleEmbeddingModel()
self.llm = MockLLMService()
self.vector_store = SimpleVectorStore()
self.similarity_threshold = similarity_threshold
self.cache = {} # 哈希到响应的映射
def query(self, question):
"""处理用户查询"""
start_time = time.time()
# 步骤1: 生成查询哈希,尝试精确匹配
query_hash = hashlib.md5(question.encode('utf-8')).hexdigest()
if query_hash in self.cache:
# 精确匹配命中
exact_time = time.time() - start_time
return {
"question": question,
"answer": self.cache[query_hash],
"source": "exact_cache",
"time": exact_time
}
# 步骤2: 生成查询向量
vector_start = time.time()
query_vector = self.embedding_model.embed(question)
vector_time = time.time() - vector_start
# 步骤3: 搜索相似问题
search_start = time.time()
results = self.vector_store.search(query_vector)
search_time = time.time() - search_start
if results and results[0]["similarity"] >= self.similarity_threshold:
# 语义缓存命中
top_result = results[0]
answer = top_result["response"]
# 将当前问题添加到精确缓存
self.cache[query_hash] = answer
semantic_time = time.time() - start_time
return {
"question": question,
"answer": answer,
"source": "semantic_cache",
"similarity": top_result["similarity"],
"matched_question": top_result["query"],
"time": semantic_time
}
# 步骤4: 缓存未命中,调用LLM
llm_start = time.time()
answer = self.llm.generate(question)
llm_time = time.time() - llm_start
# 步骤5: 将问答对添加到缓存
self.cache[query_hash] = answer
self.vector_store.add(question, query_vector, answer)
total_time = time.time() - start_time
return {
"question": question,
"answer": answer,
"source": "llm",
"time": total_time,
"llm_time": llm_time,
"vector_time": vector_time,
"search_time": search_time
}
# 演示使用
def main():
print("初始化语义缓存系统...")
cache_system = DemoSemanticCache(similarity_threshold=0.85)
# 测试问题集
questions = [
"什么是语义缓存?",
"语义缓存是什么?",
"语义缓存的工作原理是什么?",
"如何实现语义缓存系统?",
"语义缓存系统有哪些优点?",
"使用语义缓存有什么好处?"
]
for i, question in enumerate(questions):
print(f"\n[查询 {i+1}] {question}")
result = cache_system.query(question)
print(f"回答: {result['answer']}")
print(f"来源: {result['source']}")
if result['source'] == "semantic_cache":
print(f"匹配问题: {result['matched_question']}")
print(f"相似度: {result['similarity']:.4f}")
print(f"处理时间: {result['time']:.4f} 秒")
# 显示分隔线
print("-" * 50)
if __name__ == "__main__":
main()
以下是一个具体语义缓存系统的性能数据:
相似度阈值 | 命中率 | 准确率 | F1分数 |
---|---|---|---|
0.80 | 78.5% | 87.2% | 82.6% |
0.85 | 69.3% | 92.8% | 79.4% |
0.90 | 58.7% | 96.5% | 72.9% |
0.95 | 42.1% | 99.1% | 59.2% |
0.98 | 23.8% | 99.8% | 38.4% |
在实施语义缓存系统时,建议遵循以下步骤:
语义缓存技术的未来发展可能包括以下方向:
通过本文介绍的原理和技术,您现在已经具备了设计和实现高效语义缓存系统的基础知识和实践指导。希望这些内容能够帮助您构建性能更优、成本更低的LLM应用。