proxy 源代码

import re

from quart import Response, stream_with_context

from api.core.anime import AnimeInfo
from api.core.helper import HtmlParseHelper
from api.utils.logger import logger
from api.utils.tool import extract_domain
from api.utils.useragent import get_random_ua

__all__ = ["RequestProxy", "AnimeProxy"]


class HttpProxy(HtmlParseHelper):

    def __init__(self):
        super().__init__()

    def set_proxy_headers(self, url: str) -> dict:
        """
        为特定的直链设置代理 Headers, 如果服务器存在防盗链, 可以尝试重写本方法,
        若本方法返回空则使用默认 Headers,
        若设置的 Headers 不包含 User-Agent 则随机生成一个
        """
        return {}

    def _get_proxy_headers(self, url: str) -> dict:
        """获取代理访问使用的 Headers"""
        headers = self.set_proxy_headers(url)
        if not headers:
            return {"User-Agent": get_random_ua()}
        if "user-agent" not in (key.lower() for key in headers.keys()):
            headers["User-Agent"] = get_random_ua()
        return headers


[文档]class RequestProxy(HttpProxy):
[文档] async def make_response(self, url: str) -> Response: """代理访问远程请求""" await self.init_session() resp = await self.get(url, allow_redirects=True) if not resp or resp.status != 200: return Response("resource maybe not available", status=404) data = await resp.read() resp_headers = { "Content-Type": resp.content_type, "Content-Length": resp.content_length } return Response(data, headers=resp_headers, status=200)
[文档] async def close(self): await self.close_session()
class BaseAnimeProxy(HttpProxy): def __init__(self, info: AnimeInfo): super().__init__() self._info = info def is_available(self) -> bool: return self._info.is_available() def get_stream_format(self) -> str: return self._info.format def get_url_info(self) -> AnimeInfo: """获取资源信息""" return self._info def get_real_url(self) -> str: """获取被代理的资源直链""" if self.is_available(): return self._info.real_url return "" def enforce_proxy(self, url: str) -> bool: """ 对于某些已知的无法直接访问的资源, 强制代理其流量 :param url: 资源的链接 :return: 是否强制代理 """ return False def is_enforce_proxy(self) -> bool: if self.enforce_proxy(self.get_real_url()): logger.info(f"Forced proxy the video stream: {self.get_real_url()}") return True return False class StreamProxy(BaseAnimeProxy): """ 支持 MP4, FLV 之类的数据流代理播放 """ async def make_response_with_range(self, range_field: str = None) -> Response: """ 读取远程的视频流,并伪装成本地的响应返回给客户端, 206 连续请求会导致连接中断, asyncio 库在 Windows 平台触发 ConnectionAbortedError, 偶尔出现 LocalProtocolError, 是 RFC2616 与 RFC7231 HEAD 请求冲突导致, See: https://bugs.python.org/issue26509 https://gitlab.com/pgjones/quart/-/issues/45 """ url = self._info.real_url proxy_headers = self._get_proxy_headers(url) if range_field is not None: proxy_headers["range"] = range_field logger.debug(f"Client request stream range: {range_field}") await self.init_session() resp = await self.get(url, headers=proxy_headers) if not resp: return Response(b"", status=404) @stream_with_context async def stream_iter(): while chunk := await resp.content.read(4096): yield chunk status = 206 if self._info.format == "mp4" else 200 # 否则无法拖到进度条 return Response(stream_iter(), headers=dict(resp.headers), status=status) class M3U8Proxy(BaseAnimeProxy): """ 支持 M3u8 视频代理播放 """ def __init__(self, info: AnimeInfo): super().__init__(info) self._chunk_proxy_router = "" self._cache_m3u8_text = "" def set_chunk_proxy_router(self, domain: str): """设置 m3u8 数据块代理路由地址""" self._chunk_proxy_router = domain async def read_data(self, url: str) -> bytes: """使用设置的 Header 读取 URL 对应的资源, 返回原始二进制数据""" await self.init_session() proxy_headers = self._get_proxy_headers(url) resp = await self.get(url, headers=proxy_headers, allow_redirects=True) if not resp or resp.status != 200: return b"" return await resp.read() async def read_text(self, url: str, encoding="utf-8") -> str: """使用设置的 Header 读取 URL 对应的资源, 返回编码后的文本""" return (await self.read_data(url)).decode(encoding) async def get_m3u8_text(self, index_url: str) -> str: """ 获取 index.m3u8 文件的内容, 如果该文件需要进一步处理, 比如需要跳转一次才能得到 m3u8 的内容, 或者接口返回的数据经过加密、压缩时, 请重写本方法以获取 m3u8 文件的真实内容 :param index_url: index.m3u8 文件的链接 :return: index.m3u8 的内容 """ return await self.read_text(index_url) def fix_m3u8_key_url(self, index_url: str, key_url: str) -> str: """ 修复 m3u8 密钥的链接(通常使用 AES-128 加密数据流), 默认以 index.m3u8 同级路径补全 key 的链接, 其它情况请重写本方法 :param index_url: index.m3u8 的链接 :param key_url: 密钥的链接(可能不完整) :return: 密钥的完整链接 """ if key_url.startswith("http"): return key_url path = '/'.join(index_url.split('/')[:-1]) return path + '/' + key_url def fix_m3u8_chunk_url(self, index_url: str, chunk_url: str) -> str: """ 替换 m3u8 文件中数据块的链接, 通常需要补全域名, 默认情况使用 index.m3u8 的域名补全数据块域名部分, 其它情况请重新此方法 :param index_url: index.m3u8 的链接 :param chunk_url: m3u8 文件中数据块的链接(通常不完整) :return: 修复完成的 m3u8 文件 """ if chunk_url.startswith("http"): # url 无需补全 return chunk_url elif chunk_url.startswith('/'): return extract_domain(index_url) + chunk_url else: return extract_domain(index_url) + '/' + chunk_url def fix_chunk_data(self, url: str, chunk: bytes) -> bytes: """ 修复数 m3u8 数据据块, 用于解除数据混淆 比如常见的图片隐写, 每一段视频数据存放于一张图片中, 需要剔除图片的数据 可使用 binwalk 等工具对二进制数据进行分析, 以确定图像与视频流的边界位置 :param url: 数据块的链接 :param chunk: 数据块的二进制数据 :return: 修复完成的二进制数据 """ return chunk async def _fix_m3u8_text(self, text: str) -> str: fixed_m3u8_text = [] for line in text.splitlines(): if line.startswith("#EXT-X-KEY"): # 修复密钥链接 key_url = re.search(r'URI="(.+?)"', line).group(1) fixed_key_url = self._chunk_proxy_router + '/' + self.fix_m3u8_key_url(self._info.real_url, key_url) line = line.replace(key_url, fixed_key_url) if not line.startswith("#"): # 修复数据块链接 line = self._chunk_proxy_router + '/' + self.fix_m3u8_chunk_url(self._info.real_url, line) fixed_m3u8_text.append(line) return '\n'.join(fixed_m3u8_text) async def _get_fixed_m3u8_text(self) -> str: text = await self.get_m3u8_text(self._info.real_url) return await self._fix_m3u8_text(text) async def make_response_for_m3u8(self) -> Response: if not self._cache_m3u8_text: self._cache_m3u8_text = await self._get_fixed_m3u8_text() logger.debug(f"Cache m3u8 text, size: {len(self._cache_m3u8_text) // 1024}kb") return Response(self._cache_m3u8_text, mimetype="application/vnd.apple.mpegurl") async def make_response_for_chunk(self, url: str, params: dict = None) -> Response: proxy_headers = self._get_proxy_headers(url) resp = await self.get(url, params=params, headers=proxy_headers) if not resp: return Response(b"", status=404) chunk = self.fix_chunk_data(url, await resp.read()) return Response(chunk, headers=dict(resp.headers), status=200)
[文档]class AnimeProxy(StreamProxy, M3U8Proxy): """ 代理访问视频数据流, 以绕过资源服务器的防盗链和本地浏览器跨域策略 """ def __init__(self, info: AnimeInfo): super(AnimeProxy, self).__init__(info)