在大规模用户名枚举场景中,Sherlock 项目需要并发查询数百个社交网络站点。当同一用户名在短时间内被多次查询,或者多个相似用户名存在重复站点命中时,冗余的网络请求会导致不必要的延迟和资源消耗。本文将详细阐述如何为 Sherlock 实现基于 Redis 的缓存层,实现跨 API 调用的结果去重,并给出可落地的工程参数与监控要点。
冗余请求的痛点分析
Sherlock 的核心逻辑是遍历 data.json 中定义的站点列表,对每个目标用户名发起 HTTP 请求并根据响应判断账号是否存在。在实际生产环境中,冗余请求主要来自三个维度:第一是同一用户名在短时间内的重复查询,例如批量脚本对同一用户名进行多次扫描;第二是不同用户名之间存在相同的站点命中结果,尤其当用户名具有相似前缀或后缀时;第三是查询过程中因网络超时导致的自动重试,这会产生大量重复的请求队列。
以查询目标用户名「john」为例,Sherlock 可能会向 Twitter、GitHub、Instagram 等站点发送请求。如果在十分钟后再次查询「john」,这些请求的结果大概率与前次相同,但在当前实现中每次都会完整执行网络请求。根据实际测试数据,400+ 站点的完整枚举耗时通常在 30 秒到 5 分钟之间,而其中 70% 以上的站点返回的是「未找到」的相同结果。如果能够缓存这些结果,可以将重复查询的响应时间从分钟级降低到毫秒级。
Redis 缓存架构设计
缓存层的架构需要兼顾一致性与性能。考虑到用户名枚举结果的特性,我们采用「查询键 + 站点键」的双层缓存结构,并引入 TTL(Time To Live)机制平衡数据新鲜度与缓存命中率。
缓存键设计
缓存键的设计是整个方案的核心。推荐采用以下命名规范:sherlock:result:{username}:{site_id},其中 username 为小写化处理以确保一致性,site_id 对应 data.json 中的站点标识符。例如,用户名「JohnDoe」在 GitHub 上的缓存键为 sherlock:result:johndoe:github。这种设计使得缓存查询可以精确到单个站点级别,支持细粒度的缓存失效与查询。
对于批量查询场景,还需要一个汇总索引键来记录某个用户名已查询过的站点列表:sherlock:index:{username}。该键存储一个包含所有已查询站点 ID 的 Set 数据结构,用于快速判断某个站点的结果是否已经存在于缓存中,从而跳过不必要的遍历。
数据结构选择
Redis 提供了多种数据结构,适用于不同的缓存场景。对于单站点结果,推荐使用 String 类型存储 JSON 序列化的查询结果,包含响应状态码、响应体摘要、发现时间戳等字段。值的内容示例如下:
{
"found": true,
"profile_url": "https://github.com/johndoe",
"response_time_ms": 234,
"checked_at": "2026-04-03T10:30:00Z"
}
对于汇总索引键,使用 Set 数据类型存储站点 ID 列表,支持 SISMEMBER 操作实现 O (1) 复杂度的站点存在性检查。当需要批量清除某个用户名的所有缓存时,直接删除对应的 Key 即可,Redis 的键空间事件通知机制还可以用于实现缓存失效的异步广播。
TTL 配置策略
TTL 的设置需要权衡数据新鲜度与缓存效率。对于社交网络账号枚举场景,账号的存在状态相对稳定,频繁变化的概率较低。推荐根据站点特性采用分层 TTL 策略:热门站点(如 GitHub、Twitter、LinkedIn)设置较短 TTL 为 1 小时,因为这些平台的用户活跃度高,账号状态变化较快;冷门站点设置较长 TTL 为 24 小时甚至 7 天,因为这些站点的账号状态几乎不会变化。
此外,引入「缓存击穿」保护机制:当某个缓存键过期后,第一个请求需要回源查询而不是直接返回空值,同时使用 Redis 的 SETNX 命令实现分布式锁,确保同一时刻只有一个请求执行源站查询。
核心实现代码
以下代码展示了如何在 Sherlock 的查询流程中集成 Redis 缓存层。实现采用装饰器模式,对原有查询函数进行无侵入式包装:
import redis
import json
import hashlib
from functools import wraps
from typing import Optional, Dict, Any
class SherlockCache:
def __init__(self, redis_url: str = "redis://localhost:6379/0"):
self.redis_client = redis.from_url(redis_url, decode_responses=True)
self.default_ttl = 3600 # 1 hour default
self.hot_sites_ttl = 1800 # 30 minutes for popular sites
self.cold_sites_ttl = 604800 # 7 days for niche sites
# Popular sites with shorter TTL
self.hot_sites = {"github", "twitter", "instagram", "linkedin", "facebook"}
def _make_cache_key(self, username: str, site_id: str) -> str:
normalized_username = username.lower()
return f"sherlock:result:{normalized_username}:{site_id}"
def _make_index_key(self, username: str) -> str:
return f"sherlock:index:{username.lower()}"
def _get_ttl(self, site_id: str) -> int:
if site_id in self.hot_sites:
return self.hot_sites_ttl
return self.cold_sites_ttl
def get_cached_result(self, username: str, site_id: str) -> Optional[Dict[str, Any]]:
cache_key = self._make_cache_key(username, site_id)
cached = self.redis_client.get(cache_key)
if cached:
return json.loads(cached)
return None
def set_cached_result(
self,
username: str,
site_id: str,
result: Dict[str, Any],
found: bool
) -> None:
cache_key = self._make_cache_key(username, site_id)
index_key = self._make_index_key(username)
# Store result with appropriate TTL
ttl = self._get_ttl(site_id)
self.redis_client.setex(cache_key, ttl, json.dumps(result))
# Add to index set
self.redis_client.sadd(index_key, site_id)
# Set index TTL to slightly longer than longest data TTL
self.redis_client.expire(index_key, self.cold_sites_ttl + 3600)
def is_site_cached(self, username: str, site_id: str) -> bool:
index_key = self._make_index_key(username)
return self.redis_client.sismember(index_key, site_id)
def clear_user_cache(self, username: str) -> None:
index_key = self._make_index_key(username)
site_ids = self.redis_client.smembers(index_key)
# Delete all result keys for this user
pipe = self.redis_client.pipeline()
for site_id in site_ids:
cache_key = self._make_cache_key(username, site_id)
pipe.delete(cache_key)
pipe.delete(index_key)
pipe.execute()
def with_cache(cache: SherlockCache):
def decorator(func):
@wraps(func)
async def wrapper(username: str, site_id: str, *args, **kwargs):
# Check cache first
cached_result = cache.get_cached_result(username, site_id)
if cached_result:
cached_result["cache_hit"] = True
return cached_result
# Execute original query
result = await func(username, site_id, *args, **kwargs)
# Store in cache
cache.set_cached_result(
username,
site_id,
result,
result.get("found", False)
)
result["cache_hit"] = False
return result
return wrapper
return decorator
上述实现包含了几个关键的工程化细节。第一是缓存键的小写标准化,确保「JohnDoe」和「johndoe」被视为同一查询目标。第二是分层 TTL 策略,热门站点使用更短的过期时间以保证数据新鲜度。第三是索引键的设计,通过 Set 数据结构维护用户名的查询历史,支持快速的存在性检查。
集成到 Sherlock 主流程
在现有代码中集成缓存层需要修改核心查询函数。以下代码展示了如何在异步查询流程中嵌入缓存检查逻辑:
# In sherlock_project/sherlock.py
class Sherlock:
def __init__(self, redis_url: str = None):
# Existing initialization
self.cache = SherlockCache(redis_url) if redis_url else None
self.enable_cache = redis_url is not None
async def check_username_site(self, username: str, site: dict):
site_id = site.get("name", "").lower()
# Check cache if enabled
if self.enable_cache and self.cache:
if self.cache.is_site_cached(username, site_id):
cached = self.cache.get_cached_result(username, site_id)
if cached:
return {**cached, "source": "cache"}
# Execute original query logic
result = await self._original_check(username, site)
# Store in cache
if self.enable_cache and self.cache and result:
self.cache.set_cached_result(
username,
site_id,
{"found": result.get("found", False), "url": result.get("url")},
result.get("found", False)
)
return result
通过这种方式,缓存层对原有代码的侵入最小化。启用缓存只需在初始化 Sherlock 实例时传入 Redis 连接字符串即可。
监控指标与调优参数
生产环境中需要关注以下核心监控指标以确保缓存层有效运行:
缓存命中率是最关键的指标,反映缓存层的实际效益。计算公式为 cache_hits / (cache_hits + cache_misses) * 100%。对于典型的批量查询场景,预期命中率应在 60% 到 80% 之间。如果命中率过低,可能是 TTL 设置过长或查询模式过于分散。可以通过 Redis 的 INFO 命令获取 keyspace_hits 和 keyspace_misses 计算精确值。
缓存延迟收益衡量通过缓存节约的响应时间。记录每次查询的 source 字段(cache 或 network),计算平均响应时间的差异。在生产环境中,缓存命中的平均响应时间应低于 5 毫秒,而网络请求的平均响应时间可能在数百毫秒到数秒之间。
内存使用量需要监控 Redis 实例的总内存占用。每个缓存条目约占用 200 到 500 字节(取决于结果数据量),对于 10000 个用户名、每个用户名平均查询 200 个站点的场景,内存占用约为 400MB 到 1GB。使用 Redis 的 MEMORY USAGE 命令可以精确估算单个键的内存占用。
推荐配置参数总结:对于中等规模的部署场景(每日处理 1000 到 10000 个用户名),建议 Redis 连接池大小设置为 20 到 50,热门站点 TTL 为 30 分钟,冷门站点 TTL 为 7 天,缓存键前缀采用 sherlock:result: 以便通过 SCAN 命令进行批量管理。
缓存失效策略与边界情况处理
除了 TTL 自动失效外,还需要考虑主动失效的场景。第一是当用户主动触发重新查询时(使用 --refresh 参数),需要调用 clear_user_cache 方法清除该用户名下的所有缓存条目。第二是当 data.json 更新、站点配置发生变化时,需要按站点维度清除缓存,可以使用 SCAN 命令配合模式匹配 sherlock:result::*:{site_id} 进行批量删除。
需要特别注意的边界情况包括:分布式环境下的缓存一致性(多个 Sher lock 实例可能同时写入缓存,建议使用 Redis 事务或 Lua 脚本保证原子性),以及缓存穿透风险(对于不存在的用户名,可以存储「未找到」的结果标记而非空值,避免重复查询无效目标)。
资料来源
本文参考了 Sherlock 项目官方 GitHub 仓库(https://github.com/sherlock-project/sherlock)以及 Redis 官方文档中关于数据结构与 TTL 配置的最佳实践。