import json
import logging
import random
import time
import uuid
from typing import Optional, List

import httpx
import websockets
from websockets import WebSocketCommonProtocol

logging.basicConfig(level=logging.INFO)

# Base URLs of the Clash controller endpoints managed by this script.
BASE_URL_LIST = [
    "http://192.168.31.201:52001",
    "http://192.168.31.201:52002",
    "http://192.168.31.201:52003",
    "http://192.168.31.201:52004",
    "http://192.168.31.201:52005",
    "http://192.168.31.201:52006",
    "http://192.168.31.201:52007",
    "http://192.168.31.201:52008",
    "http://192.168.31.201:52009",
    "http://192.168.31.201:52010",
]


class ClashProxyManager:
    """Switch a list of Clash controller endpoints to distinct proxies.

    Each endpoint is first patched to ``{"mode": "Global"}`` and then has its
    ``GLOBAL`` selector pointed at a proxy name read from ``/api/proxies``.
    """

    def __init__(self, base_url_list):
        """Store the endpoint list and initialise empty proxy state.

        Args:
            base_url_list: Base URLs (scheme, host and port) of the endpoints.
        """
        self.key_group = 0  # not read by any method shown here
        self.base_url_list = base_url_list
        self.all_proxies = []  # proxy names, filled lazily by main()
        self.filter_seeds = []  # when non-empty, main() switches nothing

    def get_all_proxies(self, clash_tool_url: str) -> List[str]:
        """Return the proxy names reported by one endpoint.

        Args:
            clash_tool_url: Base URL of the endpoint to query.

        Returns:
            The keys of the ``proxies`` object in the JSON response, or an
            empty list when the request or the parsing fails (the failure is
            logged, never raised).
        """
        url = f"{clash_tool_url}/api/proxies"
        try:
            response = httpx.get(url)
            response.raise_for_status()
            proxies = response.json()
            logging.info("Available proxies:")
            return list(proxies['proxies'].keys())
        except Exception as e:
            # Deliberately best-effort: the caller treats [] as
            # "try the next endpoint".
            logging.error(f"Failed to get proxies: {e}")
            return []

    def switch_proxy(self, proxy_name: str, clash_api_base_url: str) -> None:
        """Point the ``GLOBAL`` selector of one endpoint at ``proxy_name``.

        Only HTTP 204 is treated as success; any other status or any
        exception is logged and swallowed.

        Args:
            proxy_name: Name of the proxy to select.
            clash_api_base_url: Base URL of the endpoint to update.
        """
        url = f"{clash_api_base_url}/api/proxies/GLOBAL"
        data = {"name": proxy_name}
        try:
            response = httpx.put(url, json=data)
            if response.status_code == 204:
                logging.info(f"Switched to proxy: {proxy_name}")
            else:
                logging.error(f"Failed to switch proxy: {response.status_code} - {proxy_name}")
        except Exception as e:
            logging.error(f"Failed to switch proxy: {e}")

    def update_configs(self):
        """PATCH ``{"mode": "Global"}`` to ``/api/configs`` on every endpoint.

        The outcome of each request is printed; a request error on one
        endpoint does not stop the remaining endpoints from being processed.
        """
        key = "/api/configs"
        # Request body.
        data = {"mode": "Global"}
        # One client is reused for every endpoint instead of one per request.
        with httpx.Client() as client:
            for base_url in self.base_url_list:
                url = base_url + key
                headers = {
                    "accept": "application/json, text/plain, */*",
                    "accept-encoding": "gzip, deflate, br, zstd",
                    "accept-language": "zh-CN,zh",
                    "connection": "keep-alive",
                    "content-type": "application/json",
                    # NOTE(review): fixed Host value that does not match
                    # base_url; kept as-is, confirm the endpoints rely on it.
                    "host": "localhost:17888",
                    "origin": base_url,
                    "referer": base_url,
                    "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
                    "sec-ch-ua-mobile": "?0",
                    "sec-ch-ua-platform": '"macOS"',
                    "sec-fetch-dest": "empty",
                    "sec-fetch-mode": "cors",
                    "sec-fetch-site": "same-origin",
                    "sec-gpc": "1",
                    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
                }
                try:
                    response = client.patch(url, headers=headers, json=data)
                    if response.status_code == 204:
                        print(f"{url} OK")
                    else:
                        print("响应内容:", response.text)
                except httpx.RequestError as exc:
                    print(f"请求失败: {exc}")

    def main(self) -> None:
        """Put every endpoint in global mode and give each a distinct proxy.

        Steps:
            1. Patch all endpoints to global mode.
            2. If no proxy names are cached, read them from the first
               endpoint that returns a non-empty list.
            3. Shuffle the names and assign one unused name per endpoint.

        Returns early (after logging) when no proxy names can be fetched or
        when ``filter_seeds`` is non-empty, because seed-based selection is
        not implemented.
        """
        # Set global proxy mode on every endpoint.
        self.update_configs()

        # Read the proxy names once; the first endpoint that answers wins.
        if not self.all_proxies:
            for base_url in self.base_url_list:
                proxies = self.get_all_proxies(base_url)
                if proxies:
                    self.all_proxies = proxies
                    break

        if not self.all_proxies:
            logging.error("Failed to get all proxies")
            return

        if self.filter_seeds:
            # The original branch was an empty placeholder inside an endless
            # loop; bail out explicitly instead of hanging.
            logging.warning(
                "filter_seeds selection is not implemented; no proxy was switched"
            )
            return

        # Every endpoint must get a different proxy: drawing from a shuffled
        # pool guarantees uniqueness without retry loops.
        candidates = list(self.all_proxies)
        random.shuffle(candidates)
        for base_url in self.base_url_list:
            if not candidates:
                logging.warning(
                    "Ran out of unused proxies; remaining endpoints were left unchanged"
                )
                break
            self.switch_proxy(candidates.pop(), base_url)
            # TODO: after switching, verify the proxy actually works (the
            # original note suggests https://api.ipify.org) and fall back to
            # the next candidate when it does not.


if __name__ == "__main__":
    manager = ClashProxyManager(BASE_URL_LIST)
    manager.main()