| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import random
- import httpx
- import time
- import json
- import uuid
- import logging
- import websockets
- from typing import Optional, List
- from websockets import WebSocketCommonProtocol
# Configure the root logger once at import time so INFO records are emitted.
logging.basicConfig(level=logging.INFO)

# Base URLs handed to ClashProxyManager by the script entry point; each entry
# is used as the prefix for the "/api/..." endpoints the manager calls.
BASE_URL_LIST = [
    "http://192.168.31.201:52001",
    "http://192.168.31.201:52002",
    "http://192.168.31.201:52003",
    "http://192.168.31.201:52004",
    "http://192.168.31.201:52005",
    "http://192.168.31.201:52006",
    "http://192.168.31.201:52007",
    "http://192.168.31.201:52008",
    "http://192.168.31.201:52009",
    "http://192.168.31.201:52010",
]
class ClashProxyManager:
    """Switch a group of proxy-controller instances to distinct proxies.

    Each entry of ``base_url_list`` is the base URL of one instance exposing
    ``/api/configs``, ``/api/proxies`` and ``/api/proxies/GLOBAL``.
    ``main`` sets every instance to Global mode and then gives each one a
    proxy name that no other instance received during that run.
    """

    # Upper bound on candidate proxies tried for one instance before giving
    # up on it, so an unreachable instance cannot drain the candidate pool.
    MAX_SWITCH_ATTEMPTS = 3

    def __init__(self, base_url_list):
        """Remember the instance base URLs; proxy names are fetched by ``main``."""
        self.key_group = 0  # not read anywhere in this class
        self.base_url_list = base_url_list
        self.all_proxies = []  # proxy names cached by main()
        self.filter_seeds = []  # selection by seeds is not implemented; see main()

    def get_all_proxies(self, clash_tool_url: str) -> List[str]:
        """Return the proxy names listed by ``GET {clash_tool_url}/api/proxies``.

        Any failure (transport error, error status, unexpected payload) is
        logged and reported as an empty list so the caller can try another
        instance.
        """
        url = f"{clash_tool_url}/api/proxies"
        try:
            response = httpx.get(url)
            response.raise_for_status()
            names = list(response.json()["proxies"].keys())
        except Exception as e:  # best-effort boundary: log and report "nothing"
            logging.error(f"Failed to get proxies: {e}")
            return []
        logging.info("Available proxies: %d", len(names))
        return names

    def _try_switch_proxy(self, proxy_name: str, clash_api_base_url: str) -> bool:
        """PUT ``proxy_name`` to ``/api/proxies/GLOBAL``; True only on HTTP 204."""
        url = f"{clash_api_base_url}/api/proxies/GLOBAL"
        try:
            response = httpx.put(url, json={"name": proxy_name})
        except Exception as e:  # best-effort boundary: a failed switch is not fatal
            logging.error(f"Failed to switch proxy: {e}")
            return False
        if response.status_code == 204:
            logging.info(f"Switched to proxy: {proxy_name}")
            return True
        logging.error(f"Failed to switch proxy: {response.status_code} - {proxy_name}")
        return False

    def switch_proxy(self, proxy_name: str, clash_api_base_url: str) -> None:
        """Select ``proxy_name`` as the GLOBAL proxy of one instance.

        The outcome is only logged; nothing is returned and nothing is raised.
        """
        self._try_switch_proxy(proxy_name, clash_api_base_url)

    @staticmethod
    def _config_headers(base_url: str) -> dict:
        """Build the browser-like headers sent with the ``/api/configs`` PATCH."""
        return {
            "accept": "application/json, text/plain, */*",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "zh-CN,zh",
            "connection": "keep-alive",
            "content-type": "application/json",
            # NOTE(review): fixed Host value that does not match base_url;
            # looks like it was copied from a browser capture - confirm it is
            # really required by the server.
            "host": "localhost:17888",
            "origin": base_url,
            "referer": base_url,
            "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "sec-gpc": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
        }

    def update_configs(self):
        """PATCH ``{"mode": "Global"}`` to ``/api/configs`` on every instance.

        A request error or an unexpected status is logged for that instance
        and the loop continues with the next one.
        """
        payload = {"mode": "Global"}
        for base_url in self.base_url_list:
            url = base_url + "/api/configs"
            try:
                with httpx.Client() as client:
                    response = client.patch(
                        url, headers=self._config_headers(base_url), json=payload
                    )
            except httpx.RequestError as exc:
                logging.error("请求失败: %s", exc)
                continue
            if response.status_code == 204:
                logging.info("%s OK", url)
            else:
                logging.error("响应内容: %s", response.text)

    def _assign_unique_proxy(
        self, clash_api_base_url: str, candidates: List[str]
    ) -> Optional[str]:
        """Pop candidates until one switch succeeds; return its name or None.

        Every popped name is removed from ``candidates`` for good, whether the
        switch worked or not, so no name is ever offered to two instances.
        """
        attempts = 0
        while candidates and attempts < self.MAX_SWITCH_ATTEMPTS:
            proxy_name = candidates.pop()
            attempts += 1
            if self._try_switch_proxy(proxy_name, clash_api_base_url):
                return proxy_name
        return None

    def main(self) -> None:
        """Set Global mode everywhere, then give each instance a distinct proxy.

        The proxy name list is taken from the first instance that returns a
        non-empty one and cached in ``self.all_proxies``. Names are handed out
        in random order without repetition; a failed switch moves on to the
        next unused name, up to ``MAX_SWITCH_ATTEMPTS`` per instance.
        """
        # Put every instance in Global mode first.
        self.update_configs()

        # Load the proxy names once; any instance that answers will do.
        if not self.all_proxies:
            for base_url in self.base_url_list:
                proxies = self.get_all_proxies(base_url)
                if proxies:
                    self.all_proxies = proxies
                    break
        if not self.all_proxies:
            logging.error("Failed to get all proxies")
            return

        if self.filter_seeds:
            # NOTE(review): choosing proxies by filter_seeds was never
            # implemented (the original branch was an empty placeholder);
            # the intended matching rule needs to be confirmed by the author.
            logging.warning("filter_seeds selection is not implemented; nothing switched")
            return

        # Shuffle a de-duplicated copy so pop() yields random, unique names.
        candidates = list(dict.fromkeys(self.all_proxies))
        random.shuffle(candidates)
        for base_url in self.base_url_list:
            if not candidates:
                logging.error("Ran out of unused proxies before reaching %s", base_url)
                break
            if self._assign_unique_proxy(base_url, candidates) is None:
                logging.error("Could not switch %s to any candidate proxy", base_url)
        # TODO: after a successful switch, verify the proxy actually works
        # (the original note suggests https://api.ipify.org) and fall through
        # to the next candidate when it does not.
if __name__ == "__main__":
    # Script entry point: run one switching pass over the configured instances.
    manager = ClashProxyManager(BASE_URL_LIST)
    manager.main()
|