| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import json
- import random
- import subprocess
- import httpx
def load_config(path="config.json"):
    """Read the JSON configuration file and return the parsed object.

    Args:
        path: Location of the configuration file. Defaults to
            ``config.json`` relative to the current working directory.

    Returns:
        Whatever ``json.load`` produces for the file contents.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
# Configuration is read once, at import time.
config = load_config()

# Number of containers to drive and the first port of each port range.
END = config["total_container"]
START_PORT = config["start_web_port"]
START_API_PORT = config["start_port"]

# One URL per container, built from consecutive ports starting at START_PORT.
base_url_list = [
    f'http://192.168.31.201:{START_PORT + offset}'
    for offset in range(END)
]

# Same layout, starting at START_API_PORT.
base_api_list = [
    f'http://192.168.31.201:{START_API_PORT + offset}'
    for offset in range(END)
]
class ProxyManager:
    """Control one proxy instance through the HTTP API rooted at ``base_url``.

    NOTE(review): the endpoints used here (``/api/configs``, ``/api/proxies``,
    ``/api/proxies/GLOBAL``) look like a Clash-compatible controller -- confirm
    against the actual service.
    """

    def __init__(self, base_url):
        """Remember the root URL (scheme, host and port) of the control API."""
        self.base_url = base_url

    def _global_mode_headers(self):
        """Return the browser-like headers sent with the mode-switch request."""
        # Derive host/origin/referer from this instance's own URL rather than
        # from a module-level variable that merely happens to share the name.
        return {
            "accept": "application/json, text/plain, */*",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "zh-CN,zh",
            "connection": "keep-alive",
            "content-type": "application/json",
            "host": self.base_url.replace('http://', ''),
            "origin": self.base_url,
            "referer": self.base_url,
            "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "sec-gpc": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
        }

    def switch_to_global(self):
        """PATCH ``/api/configs`` with ``{"mode": "Global"}``.

        Raises:
            SystemExit: With code 1 when the request fails or the response
                status is anything other than 204.
        """
        print(f"{self.base_url}/api/configs 切换全局代理")
        headers = self._global_mode_headers()
        data = {"mode": "Global"}
        try:
            with httpx.Client() as client:
                response = client.patch(self.base_url + "/api/configs", json=data, headers=headers)
        except httpx.RequestError as exc:
            print(f"请求失败: {exc}")
            # SystemExit is what the ``exit`` helper raises, without depending
            # on the ``site`` module having installed that helper.
            raise SystemExit(1) from exc

        if response.status_code == 204:
            print(f"{self.base_url}/api/configs 切换全局代理成功")
        else:
            print(f"{self.base_url}/api/configs 切换全局代理失败 {response.status_code}")
            raise SystemExit(1)

    def get_proxy_name(self):
        """Return the selectable proxy names of the ``GLOBAL`` group.

        Reads ``proxies.GLOBAL.all`` from the ``/api/proxies`` response and
        drops the ``DIRECT`` and ``REJECT`` entries.

        Raises:
            SystemExit: With code 1 on any request, HTTP-status or parsing
                failure.
        """
        print(f'{self.base_url}/api/proxies 获取代理名称')
        try:
            response = httpx.get(self.base_url + "/api/proxies")
            response.raise_for_status()
            proxies = response.json()
            return [
                item
                for item in proxies['proxies']['GLOBAL']['all']
                if item not in ('DIRECT', 'REJECT')
            ]
        except Exception as e:
            # Deliberately broad: any failure here is fatal for the script.
            print(f'get_proxy_name: {e}')
            raise SystemExit(1) from e

    def switch_proxy(self, proxy_name, base_api):
        """Select ``proxy_name`` in the ``GLOBAL`` group and verify it works.

        Args:
            proxy_name: Name sent as ``{"name": ...}`` to ``/api/proxies/GLOBAL``.
            base_api: Proxy address handed to :meth:`check_proxy` once the
                switch has been accepted.

        Returns:
            The result of :meth:`check_proxy` when the switch returned 204,
            otherwise ``False``.
        """
        print(f'{self.base_url}/api/proxies/GLOBAL 切换代理')
        try:
            target_url = self.base_url + '/api/proxies/GLOBAL'
            response = httpx.put(target_url, json={"name": proxy_name})
            if response.status_code == 204:
                # Switch accepted; now verify the proxy actually carries traffic.
                return self.check_proxy(base_api)
            print(f"切换代理失败: {response.status_code} - {proxy_name}\n重试...")
            return False
        except Exception as e:
            # Best effort: report the failure and let the caller try another proxy.
            print(f"切换代理失败: {e}\n重试...")
            return False

    def check_proxy(self, proxy_url, timeout=30):
        """Fetch ``ip.sb`` through ``proxy_url`` with ``curl``.

        Args:
            proxy_url: Proxy address passed to ``curl -x`` (no credentials).
            timeout: Seconds to wait for ``curl`` before giving up.

        Returns:
            ``True`` when ``curl`` exits successfully and printed something,
            ``False`` otherwise.
        """
        command = ["curl", "-x", proxy_url, "ip.sb"]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=True,
                timeout=timeout,
            )
        except subprocess.CalledProcessError as e:
            # curl ran but exited non-zero; show what it wrote to stderr.
            print("Error:", e.stderr)
            return False
        except (subprocess.TimeoutExpired, OSError) as e:
            # curl hung past the timeout, or could not be started at all.
            print("Error:", e)
            return False

        # A CompletedProcess object is always truthy, so inspect the captured
        # output itself to detect an empty reply.
        if not result.stdout.strip():
            return False
        print("Output:", result.stdout)
        return True
# Proxy names already assigned to an earlier container; each container is
# given a proxy that no previous container is using.
used_proxy = []
for base_url, base_api in zip(base_url_list, base_api_list):
    manager = ProxyManager(base_url)
    manager.switch_to_global()
    proxy_name_list = manager.get_proxy_name()

    # Try every still-unused proxy once, in random order. A single bounded
    # pass guarantees termination even when all proxies are taken or failing.
    candidates = [name for name in proxy_name_list if name not in used_proxy]
    random.shuffle(candidates)
    for proxy_name in candidates:
        if manager.switch_proxy(proxy_name, base_api):
            # Record the proxy only once it has been switched to and verified.
            used_proxy.append(proxy_name)
            break
    else:
        print(f"{base_url}: no unused working proxy available")
|