|
|
@@ -1,6 +1,8 @@
|
|
|
# -*- coding: utf-8 -*-
|
|
|
import os
|
|
|
-import httpx
|
|
|
+import json
|
|
|
+import urllib.request
|
|
|
+import urllib.parse
|
|
|
|
|
|
# 青龙面板的地址
|
|
|
url = "https://ql.erhe.link"
|
|
|
@@ -8,19 +10,33 @@ url = "https://ql.erhe.link"
|
|
|
|
|
|
# 登录青龙面板
|
|
|
def login():
|
|
|
- response = httpx.post(f"{url}/api/user/login", json={"username": "toor", "password": "!QAZ2wsx+0913"})
|
|
|
- print(response)
|
|
|
- if response.status_code != 200:
|
|
|
- print(response.status_code)
|
|
|
- print(response.text)
|
|
|
+ data = json.dumps({"username": "toor", "password": "!QAZ2wsx+0913"}).encode('utf-8')
|
|
|
+ req = urllib.request.Request(
|
|
|
+ f"{url}/api/user/login",
|
|
|
+ data=data,
|
|
|
+ headers={'Content-Type': 'application/json'}
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ with urllib.request.urlopen(req) as response:
|
|
|
+ result = json.loads(response.read().decode('utf-8'))
|
|
|
+ return result['data']['token']
|
|
|
+ except urllib.error.HTTPError as e:
|
|
|
+ print(f"Login failed with status code: {e.code}")
|
|
|
+ print(e.read().decode('utf-8'))
|
|
|
exit(0)
|
|
|
- return response.json()['data']['token']
|
|
|
|
|
|
|
|
|
# 获取任务列表
|
|
|
def get_tasks(token):
|
|
|
- response = httpx.get(f"{url}/api/crons", headers={"Authorization": f"Bearer {token}"})
|
|
|
- return response.json()['data']['data']
|
|
|
+ req = urllib.request.Request(
|
|
|
+ f"{url}/api/crons",
|
|
|
+ headers={"Authorization": f"Bearer {token}"}
|
|
|
+ )
|
|
|
+
|
|
|
+ with urllib.request.urlopen(req) as response:
|
|
|
+ result = json.loads(response.read().decode('utf-8'))
|
|
|
+ return result['data']['data']
|
|
|
|
|
|
|
|
|
# 创建任务
|
|
|
@@ -31,12 +47,19 @@ def create_task(task_template, token):
|
|
|
"schedule": task_template["schedule"],
|
|
|
"labels": task_template["labels"]
|
|
|
}
|
|
|
- headers = {
|
|
|
- "Authorization": f"Bearer {token}",
|
|
|
- "Content-Type": "application/json"
|
|
|
- }
|
|
|
- response = httpx.post(f"{url}/api/crons", headers=headers, json=payload)
|
|
|
- return response.json()
|
|
|
+
|
|
|
+ data = json.dumps(payload).encode('utf-8')
|
|
|
+ req = urllib.request.Request(
|
|
|
+ f"{url}/api/crons",
|
|
|
+ data=data,
|
|
|
+ headers={
|
|
|
+ "Authorization": f"Bearer {token}",
|
|
|
+ "Content-Type": "application/json"
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ with urllib.request.urlopen(req) as response:
|
|
|
+ return json.loads(response.read().decode('utf-8'))
|
|
|
|
|
|
|
|
|
# 创建视图分类
|
|
|
@@ -52,11 +75,23 @@ def create_view_type(token):
|
|
|
},
|
|
|
'filterRelation': 'and'
|
|
|
}
|
|
|
- headers = {
|
|
|
- "Authorization": f"Bearer {token}",
|
|
|
- "Content-Type": "application/json"
|
|
|
- }
|
|
|
- response = httpx.post(f"{url}/api/crons", headers=headers, json=payload)
|
|
|
+
|
|
|
+ data = json.dumps(payload).encode('utf-8')
|
|
|
+ req = urllib.request.Request(
|
|
|
+ f"{url}/api/crons",
|
|
|
+ data=data,
|
|
|
+ headers={
|
|
|
+ "Authorization": f"Bearer {token}",
|
|
|
+ "Content-Type": "application/json"
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ with urllib.request.urlopen(req) as response:
|
|
|
+ result = json.loads(response.read().decode('utf-8'))
|
|
|
+ print(f"View type {view_type} created: {result}")
|
|
|
+ except urllib.error.HTTPError as e:
|
|
|
+ print(f"Failed to create view type {view_type}: {e}")
|
|
|
|
|
|
|
|
|
# 主逻辑
|
|
|
@@ -200,4 +235,4 @@ def main():
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
|
- print('done!')
|
|
|
+ print('done!')
|