之前做了个批量下载好看视频的小工具,但是很快啊,就不能用了,原因是服务端采用了新的IP访问限制策略,在短时间内频繁请求就会被临时封禁IP,虽然只会封几个小时,但这个工具显然是不能用了。
后来我尝试了使用免费代理,随机使用代理请求,但是效果不佳,一来代理请求速度慢,二来我这个程序的下载部分还依赖于you-get,还要设置you-get的代理,相当麻烦。
再后来我尝试降低调用频率,每次请求后延迟3秒进行下次请求,程序倒是可以正常运行了,但显然每次重新查询都要等很久。
于是今天给这个程序加了一个简单的缓存机制,缓存本身使用的是最简单的文件缓存,为了扩展性考虑使用策略模式,设置了抽象基类,如果想的话可以扩展成其它形式的缓存。
显然这种通用功能在pypi或者Github上会有大量的现成模块可以使用,但是因为实现起来相当简单,不费多大事,以及我目前也并不需要太强大的功能,所以就自己实现了。
缓存的抽象基类:
from abc import ABC, abstractmethod
from typing import Any
class CacheInterface(ABC):
def addCachedData(self, key: str, value: Any) -> None:
"""缓存数据
key: 索引
value: 被缓存的数据
"""
pass
def getCachedData(self, key, expired: float = 1800) -> Any:
"""获取缓存数据
key: 索引
expired: 过期时间,单位秒,默认半小时
"""
pass
def save(self)->None:
"""将缓存数据持久化保存"""
pass
实现的一个文件缓存:
from typing import Any
from .cache_interface import CacheInterface
from ..config import config
import time
import os
import json
class FileCache(CacheInterface):
"""文件缓存"""
def __init__(self) -> None:
super().__init__()
self._cachedData: dict = {} # 加载到内存中的缓存数据
self._fileName: str = "cached.tmp" # 缓存文件名称
self._fileDir: str = "{}{}tmp".format(
config["sysHome"], config["path"]) # 缓存文件目录
self._filePath: str = "{}{}{}".format(
self._fileDir, config["path"], self._fileName) # 缓存文件保存路径
# 从缓存文件加载缓存数据到内存
self._loadCachedDataFromFile()
def addCachedData(self, key: str, value: Any) -> None:
currentTime = time.time()
self._cachedData[key] = {"time": currentTime, "data": value}
def getCachedData(self, key, expired: float = 1800) -> Any:
if key not in self._cachedData:
return None
cachedData = self._cachedData[key]
cachedTime = cachedData["time"]
if(time.time() - cachedTime > expired):
# 被缓存的数据已经过期
self._cachedData.pop(key)
return None
else:
# 没有过期
return cachedData["data"]
def save(self) -> None:
self._saveCachedDataToFile()
def _loadCachedDataFromFile(self):
if not os.path.exists(self._filePath):
# 缓存文件不存在,不加载
pass
else:
# 从缓存文件加载到内存
with open(file=self._filePath, mode='r', encoding='UTF-8') as fopen:
text = fopen.read()
if len(text) == 0:
pass
else:
self._cachedData = json.loads(text)
def _saveCachedDataToFile(self):
if not os.path.exists(self._fileDir):
os.mkdir(self._fileDir)
fopen = open(file=self._filePath, mode='w', encoding='UTF-8')
print(json.dumps(self._cachedData), file=fopen)
fopen.close()
缓存管理类,充当缓存的工厂:
from .cache_interface import CacheInterface
from .file_cache import FileCache
class CacheManager:
"""缓存管理"""
cache: CacheInterface = None
def getCache(cls) -> CacheInterface:
"""获取当前系统使用的缓存模块"""
if (cls.cache is None):
cls.cache = FileCache()
return cls.cache
如果你想在应用中灵活地选择使用或者不使用缓存,可以创建一个空的缓存:
from typing import Any
from .cache_interface import CacheInterface
class NoneCache(CacheInterface):
def addCachedData(self, key: str, value: Any) -> None:
pass
def getCachedData(self, key, expired: float) -> Any:
return None
def save(self) -> None:
pass
并且在缓存管理类中依据相应的配置或者参数返回一个空缓存的实例即可,这样可以避免在所有使用缓存的地方使用if/else
进行判断。
具体的缓存使用示例如下:
def getAuthorNameFromWeb(self):
# 优先从缓存获取数据
cachedKey = "getAuthorNameFromWeb.{}".format(self.authorId)
cache = CacheManager.getCache()
cachedData = cache.getCachedData(cachedKey, self.__class__.TEN_HOURS)
if cachedData is not None:
return cachedData
url = "https://haokan.hao123.com/author/"+self.authorId
resp = self.tryRequestGet(url)
# 中文页面有时候需要指定页面编码,否则会乱码
resp.encoding = 'UTF-8'
authorPageHtml = resp.text
authorPageDom = BeautifulSoup(authorPageHtml, features="html5lib")
# 获取视频作者名字
authorNameTag = authorPageDom.find("h1", class_="uinfo-head-name")
authorName = authorNameTag.string
# 缓存数据
cache.addCachedData(cachedKey, authorName)
cache.save()
return authorName
完整代码见
文章评论