| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- # -*- coding: utf-8 -*-
import json
import subprocess
import sys
from urllib.parse import quote

import httpx
# Controller endpoints to inspect. Each entry is [host, [port, ...]] with the
# ports kept as strings, because they are interpolated directly into URLs.
nodes = [
    # Ports 58001 through 58010 on the LAN host.
    ["192.168.31.201", [str(port) for port in range(58001, 58011)]],
    # Ports 32001 through 32012 on the same LAN host.
    ["192.168.31.201", [str(port) for port in range(32001, 32013)]],
    # Single controller on the local machine.
    ["127.0.0.1", ["17888"]],
]

# The node group that run() iterates over.
selected_nodes = nodes[0]
def patch_config(url_and_port):
    """Switch the controller at ``url_and_port`` to Global proxy mode.

    Sends ``PATCH http://<url_and_port>/api/configs`` with the JSON body
    ``{"mode": "Global"}`` and treats HTTP 204 as success.

    Args:
        url_and_port: Controller address as ``"host:port"`` (no scheme).

    Raises:
        RuntimeError: If the controller answers with any status other
            than 204.
        SystemExit: With exit code 1 if httpx reports an HTTP/transport
            error while sending the request.
    """
    full_url = f"http://{url_and_port}/api/configs"
    data = {"mode": "Global"}
    headers = {"Content-Type": "application/json"}

    # Only the network call sits inside the try: the status check below
    # raises an error this handler is not meant to swallow.
    try:
        response = httpx.patch(full_url, json=data, headers=headers)
    except httpx.HTTPError as exc:
        print(f"请求失败: {exc}")
        # sys.exit rather than the bare exit(), which is injected by the
        # ``site`` module and is not guaranteed to exist in every runtime.
        sys.exit(1)

    if response.status_code != 204:
        raise RuntimeError(f"请求失败: {response.status_code}")
def check_proxy(proxy_url, choose_proxy):
    """Query the controller's delay endpoint for one proxy and summarize it.

    Runs ``curl`` against ``<proxy_url>/api/proxies/<name>/delay`` with
    ``timeout=5000`` and ``http://www.gstatic.com/generate_204`` as the
    probe URL, then interprets the response body.

    Args:
        proxy_url: Controller address, handed to curl unchanged.
        choose_proxy: Name of the proxy to test; it is percent-encoded
            (including ``/``) before being placed in the URL path.

    Returns:
        A human-readable status string:
        ``"No proxy selected"`` when ``choose_proxy`` is empty or falsy,
        ``"Timeout"`` when the response body contains ``Timeout``,
        ``"meanDelay: <value>"`` for a JSON object response,
        ``"Invalid response: <body>"`` when the body is not a JSON object,
        or the text of the ``CalledProcessError`` when curl exits non-zero.
    """
    # A missing name would either crash in quote() or produce a malformed
    # ".../proxies//delay" URL, so bail out before spawning curl.
    if not choose_proxy:
        return "No proxy selected"

    encoded_name = quote(choose_proxy, safe="")
    command = [
        "curl",
        "-X", "GET",
        f"{proxy_url}/api/proxies/{encoded_name}/delay?timeout=5000&url=http:%2F%2Fwww.gstatic.com%2Fgenerate_204",
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        print("Error:", e.stderr)
        return str(e)

    if "Timeout" in result.stdout:
        return "Timeout"

    # Parse as JSON instead of eval(): the body comes from a remote service,
    # and eval() would execute it as Python (and fail on null/true/false).
    try:
        payload = json.loads(result.stdout)
    except json.JSONDecodeError:
        return f"Invalid response: {result.stdout!r}"
    if not isinstance(payload, dict):
        return f"Invalid response: {result.stdout!r}"

    return f"meanDelay: {payload.get('meanDelay')}"
def check_now_proxy(url_and_port):
    """Return the proxy currently selected in the controller's GLOBAL group.

    Issues ``GET http://<url_and_port>/api/proxies`` with browser-like
    request headers and reads ``proxies -> GLOBAL -> now`` from the JSON
    response.

    Args:
        url_and_port: Controller address as ``"host:port"`` (no scheme).

    Returns:
        The value of ``now`` for the ``GLOBAL`` entry; ``None`` when the
        status is not 200, the body is empty or not a JSON object, or the
        expected keys are missing; ``False`` when httpx raises a
        ``RequestError``.
    """
    url = f"http://{url_and_port}/api/proxies"
    print(url)
    headers = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.8",
        "Connection": "keep-alive",
        "Host": url_and_port,
        "Referer": f"http://{url_and_port}/",
        "Sec-CH-UA": '"Chromium";v="134", "Not:A-Brand";v="24", "Brave";v="134"',
        "Sec-CH-UA-Mobile": "?0",
        "Sec-CH-UA-Platform": '"macOS"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "Sec-GPC": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
    }

    try:
        response = httpx.get(url, headers=headers)
    except httpx.RequestError as e:
        print(f"Request failed: {e}")
        return False

    # Check the status before decoding: an error response may not carry a
    # JSON body at all, and decoding it would raise instead of reporting.
    json_data = None
    if response.status_code == 200:
        try:
            json_data = response.json()
        except ValueError:
            # Body was not valid JSON; handled as "empty" below.
            json_data = None

    if not json_data or not isinstance(json_data, dict):
        print("JSON data is empty or request failed")
        return None

    # Each level may be absent; fall back to an empty mapping so a missing
    # key yields None rather than an AttributeError on None.
    proxies = json_data.get("proxies") or {}
    proxy_global = proxies.get("GLOBAL") or {}
    return proxy_global.get("now")
def run(node=None):
    """Check the active proxy and its delay for every port of one node.

    For each port the controller is switched to Global mode, the currently
    selected proxy is looked up, and its delay is measured; one summary
    line per port is printed.

    Args:
        node: Optional ``[host, [port, ...]]`` entry to process. Defaults
            to the module-level ``selected_nodes``.
    """
    target = selected_nodes if node is None else node
    ip = target[0]
    for port in target[1]:
        url_and_port = f"{ip}:{port}"
        # Switch the controller to the global proxy mode.
        patch_config(url_and_port)
        # Look up the proxy that is currently selected.
        now_proxy = check_now_proxy(url_and_port)
        # Measure the delay of that proxy. check_now_proxy returns a falsy
        # value on failure, which must not be used as a proxy name.
        if now_proxy:
            check_result = check_proxy(url_and_port, now_proxy)
        else:
            check_result = "skipped: current proxy unavailable"
        message = f"{url_and_port} --- {now_proxy} --- {check_result}\n{'*' * 88}\n"
        print(message)
def main() -> None:
    """Script entry point: run the check over the selected node."""
    run()


# Execute only when launched as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|