# set_global_switch_proxy_random.py (5.0 KB)
  1. import random
  2. import httpx
  3. import time
  4. import json
  5. import uuid
  6. import logging
  7. import websockets
  8. from typing import Optional, List
  9. from websockets import WebSocketCommonProtocol
  10. logging.basicConfig(level=logging.INFO)
  11. BASE_URL_LIST = [
  12. "http://192.168.31.201:52001",
  13. "http://192.168.31.201:52002",
  14. "http://192.168.31.201:52003",
  15. "http://192.168.31.201:52004",
  16. "http://192.168.31.201:52005",
  17. "http://192.168.31.201:52006",
  18. "http://192.168.31.201:52007",
  19. "http://192.168.31.201:52008",
  20. "http://192.168.31.201:52009",
  21. "http://192.168.31.201:52010",
  22. ]
  23. class ClashProxyManager:
  24. def __init__(self, base_url_list):
  25. self.key_group = 0
  26. self.base_url_list = base_url_list
  27. self.all_proxies = []
  28. self.filter_seeds = []
  29. def get_all_proxies(self, clash_tool_url: str) -> List[str]:
  30. url = f"{clash_tool_url}/api/proxies"
  31. try:
  32. response = httpx.get(url)
  33. response.raise_for_status()
  34. proxies = response.json()
  35. logging.info("Available proxies:")
  36. # 输出读取的所有代理信息
  37. # for proxy_name, proxy_info in proxies['proxies'].items():
  38. # logging.info(f"Name: {proxy_name}, Type: {proxy_info.get('type', 'Unknown')}")
  39. return list(proxies['proxies'].keys())
  40. except Exception as e:
  41. logging.error(f"Failed to get proxies: {e}")
  42. return []
  43. def switch_proxy(self, proxy_name: str, clash_api_base_url: str) -> None:
  44. url = f"{clash_api_base_url}/api/proxies/GLOBAL"
  45. data = {"name": proxy_name}
  46. try:
  47. response = httpx.put(url, json=data)
  48. if response.status_code == 204:
  49. logging.info(f"Switched to proxy: {proxy_name}")
  50. else:
  51. logging.error(f"Failed to switch proxy: {response.status_code} - {proxy_name}")
  52. except Exception as e:
  53. logging.error(f"Failed to switch proxy: {e}")
  54. def update_configs(self):
  55. for base_url in self.base_url_list:
  56. key = "/api/configs"
  57. url = base_url + key
  58. headers = {
  59. "accept": "application/json, text/plain, */*",
  60. "accept-encoding": "gzip, deflate, br, zstd",
  61. "accept-language": "zh-CN,zh",
  62. "connection": "keep-alive",
  63. "content-type": "application/json",
  64. "host": "localhost:17888",
  65. "origin": base_url,
  66. "referer": base_url,
  67. "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
  68. "sec-ch-ua-mobile": "?0",
  69. "sec-ch-ua-platform": '"macOS"',
  70. "sec-fetch-dest": "empty",
  71. "sec-fetch-mode": "cors",
  72. "sec-fetch-site": "same-origin",
  73. "sec-gpc": "1",
  74. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
  75. }
  76. # 请求体数据
  77. data = {"mode": "Global"} # 替换为实际的请求数据
  78. # 使用 httpx 发送 PATCH 请求
  79. try:
  80. with httpx.Client() as client:
  81. response = client.patch(url, headers=headers, json=data)
  82. if response.status_code == 204:
  83. print(f"{url} OK")
  84. else:
  85. print("响应内容:", response.text)
  86. except httpx.RequestError as exc:
  87. print(f"请求失败: {exc}")
  88. def main(self) -> None:
  89. # 设置全局代理
  90. self.update_configs()
  91. # 读取所有代理
  92. if not self.all_proxies:
  93. for base_url in self.base_url_list:
  94. clash_tool_url = f"{base_url}"
  95. proxies = self.get_all_proxies(clash_tool_url)
  96. if proxies:
  97. self.all_proxies = proxies
  98. break
  99. if not self.all_proxies:
  100. logging.error("Failed to get all proxies")
  101. return
  102. # 遍历所有的线路, 切换代理(需要所有线路不重复), 如果self.filter_seeds为空,则在self.all_proxies选择
  103. # 切换完之后测试一下当前代理是否正常工作,否则切换下一个代理, 测试地址: https://api.ipify.org
  104. switched_agent = []
  105. for base_url in self.base_url_list:
  106. clash_api_base_url = f"{base_url}"
  107. choice_proxy = random.choice(self.all_proxies)
  108. if choice_proxy in switched_agent:
  109. continue
  110. while True:
  111. if self.filter_seeds:
  112. pass
  113. else:
  114. self.switch_proxy(choice_proxy, clash_api_base_url)
  115. # 切换完之后测试一下当前代理是否正常工作,否则切换下一个代理
  116. switched_agent.append(choice_proxy)
  117. if __name__ == "__main__":
  118. manager = ClashProxyManager(BASE_URL_LIST)
  119. manager.main()