cache_service.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. """
  2. 缓存服务
  3. 实现TTL缓存机制,提升数据访问性能。
  4. """
  5. import hashlib
  6. import json
  7. import time
  8. from typing import Any, Optional
  9. from threading import Lock
  10. def make_cache_key(namespace: str, **params) -> str:
  11. """
  12. 生成结构化缓存 key
  13. 通过对参数排序和哈希,确保相同参数组合总是生成相同的 key。
  14. Args:
  15. namespace: 缓存命名空间,如 "latest_news", "trending_topics"
  16. **params: 缓存参数
  17. Returns:
  18. 格式化的缓存 key,如 "latest_news:a1b2c3d4"
  19. Examples:
  20. >>> make_cache_key("latest_news", platforms=["zhihu"], limit=50)
  21. 'latest_news:8f14e45f'
  22. >>> make_cache_key("search", query="AI", mode="keyword")
  23. 'search:3c6e0b8a'
  24. """
  25. if not params:
  26. return namespace
  27. # 对参数进行规范化处理
  28. normalized_params = {}
  29. for k, v in params.items():
  30. if v is None:
  31. continue # 跳过 None 值
  32. elif isinstance(v, (list, tuple)):
  33. # 列表排序后转为字符串
  34. normalized_params[k] = json.dumps(sorted(v) if all(isinstance(i, str) for i in v) else list(v), ensure_ascii=False)
  35. elif isinstance(v, dict):
  36. # 字典按键排序后转为字符串
  37. normalized_params[k] = json.dumps(v, sort_keys=True, ensure_ascii=False)
  38. else:
  39. normalized_params[k] = str(v)
  40. # 排序参数并生成哈希
  41. sorted_params = sorted(normalized_params.items())
  42. param_str = "&".join(f"{k}={v}" for k, v in sorted_params)
  43. # 使用 MD5 生成短哈希(取前8位)
  44. hash_value = hashlib.md5(param_str.encode('utf-8')).hexdigest()[:8]
  45. return f"{namespace}:{hash_value}"
  46. class CacheService:
  47. """缓存服务类"""
  48. def __init__(self):
  49. """初始化缓存服务"""
  50. self._cache = {}
  51. self._timestamps = {}
  52. self._lock = Lock()
  53. def get(self, key: str, ttl: int = 900) -> Optional[Any]:
  54. """
  55. 获取缓存数据
  56. Args:
  57. key: 缓存键
  58. ttl: 存活时间(秒),默认15分钟
  59. Returns:
  60. 缓存的值,如果不存在或已过期则返回None
  61. """
  62. with self._lock:
  63. if key in self._cache:
  64. # 检查是否过期
  65. if time.time() - self._timestamps[key] < ttl:
  66. return self._cache[key]
  67. else:
  68. # 已过期,删除缓存
  69. del self._cache[key]
  70. del self._timestamps[key]
  71. return None
  72. def set(self, key: str, value: Any) -> None:
  73. """
  74. 设置缓存数据
  75. Args:
  76. key: 缓存键
  77. value: 缓存值
  78. """
  79. with self._lock:
  80. self._cache[key] = value
  81. self._timestamps[key] = time.time()
  82. def delete(self, key: str) -> bool:
  83. """
  84. 删除缓存
  85. Args:
  86. key: 缓存键
  87. Returns:
  88. 是否成功删除
  89. """
  90. with self._lock:
  91. if key in self._cache:
  92. del self._cache[key]
  93. del self._timestamps[key]
  94. return True
  95. return False
  96. def clear(self) -> None:
  97. """清空所有缓存"""
  98. with self._lock:
  99. self._cache.clear()
  100. self._timestamps.clear()
  101. def cleanup_expired(self, ttl: int = 900) -> int:
  102. """
  103. 清理过期缓存
  104. Args:
  105. ttl: 存活时间(秒)
  106. Returns:
  107. 清理的条目数量
  108. """
  109. with self._lock:
  110. current_time = time.time()
  111. expired_keys = [
  112. key for key, timestamp in self._timestamps.items()
  113. if current_time - timestamp >= ttl
  114. ]
  115. for key in expired_keys:
  116. del self._cache[key]
  117. del self._timestamps[key]
  118. return len(expired_keys)
  119. def get_stats(self) -> dict:
  120. """
  121. 获取缓存统计信息
  122. Returns:
  123. 统计信息字典
  124. """
  125. with self._lock:
  126. return {
  127. "total_entries": len(self._cache),
  128. "oldest_entry_age": (
  129. time.time() - min(self._timestamps.values())
  130. if self._timestamps else 0
  131. ),
  132. "newest_entry_age": (
  133. time.time() - max(self._timestamps.values())
  134. if self._timestamps else 0
  135. )
  136. }
  137. # 全局缓存实例
  138. _global_cache = None
  139. def get_cache() -> CacheService:
  140. """
  141. 获取全局缓存实例
  142. Returns:
  143. 全局缓存服务实例
  144. """
  145. global _global_cache
  146. if _global_cache is None:
  147. _global_cache = CacheService()
  148. return _global_cache