# -*- coding: utf-8 -*-
"""Register the AutoInfo cron tasks on a Qinglong panel.

The script logs in to the panel, lists the cron tasks that already exist and
creates every task from the built-in template table whose name is not present
yet.  Running it repeatedly is safe: existing task names are skipped.
"""
import os
import sys
import time

import httpx

# Base address of the Qinglong panel.
url = "https://ql.erhe.link"


def login():
    """Log in to the Qinglong panel and return the API token.

    The username and password are taken from the ``QL_USERNAME`` and
    ``QL_PASSWORD`` environment variables when they are set, and fall back to
    the built-in defaults otherwise.

    Returns:
        The token string found at ``data.token`` in the login response.

    Exits the process with status 1 when the panel answers with a non-200
    HTTP status, after printing the status code and response body.
    """
    # NOTE(review): the fallback credentials are kept only so existing
    # deployments keep working; a password should not live in source control.
    credentials = {
        "username": os.environ.get("QL_USERNAME", "toor"),
        "password": os.environ.get("QL_PASSWORD", "!QAZ2wsx+0913"),
    }
    response = httpx.post(f"{url}/api/user/login", json=credentials)
    print(response)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        # A failed login is an error, so report a non-zero exit status.
        sys.exit(1)
    return response.json()['data']['token']


def get_tasks(token):
    """Return the list of cron tasks currently defined on the panel.

    Args:
        token: API token returned by :func:`login`.

    Returns:
        The value at ``data.data`` of the ``GET /api/crons`` response.

    Raises:
        httpx.HTTPStatusError: if the panel answers with a 4xx/5xx status.
    """
    response = httpx.get(
        f"{url}/api/crons",
        headers={"Authorization": f"Bearer {token}"},
    )
    # Fail with the real HTTP error instead of an opaque KeyError below.
    response.raise_for_status()
    return response.json()['data']['data']


def create_task(task_template, token):
    """Create one cron task on the panel.

    Args:
        task_template: dict with the keys ``name``, ``command``, ``schedule``
            and ``labels``.
        token: API token returned by :func:`login`.

    Returns:
        The decoded JSON body of the ``POST /api/crons`` response.
    """
    payload = {
        key: task_template[key]
        for key in ("name", "command", "schedule", "labels")
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    response = httpx.post(f"{url}/api/crons", headers=headers, json=payload)
    return response.json()


def create_view_type(token):
    """Create one cron view per label so tasks can be browsed by category.

    Args:
        token: API token returned by :func:`login`.
    """
    # NOTE(review): the original posted this payload to /api/crons, which is
    # the cron-creation endpoint used by create_task().  The views endpoint
    # and the list-shaped ``filters`` below follow the Qinglong view API as
    # the reviewer understands it - confirm against the deployed panel.
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    for view_type in ('base', 'spider_common'):
        payload = {
            "name": view_type,
            "filters": [
                {
                    'property': 'labels',
                    'operation': 'Reg',
                    'value': view_type,
                },
            ],
            'filterRelation': 'and',
        }
        response = httpx.post(
            f"{url}/api/crons/views", headers=headers, json=payload
        )
        if response.status_code != 200:
            print("View creation failed:", view_type, response.status_code)


def _build_task_templates():
    """Return the task templates grouped by label.

    The result maps each label to a list of task dicts ready to be passed to
    :func:`create_task`.
    """
    project_path = '/ql/data/scripts/AutoInfo/'
    # label -> (script sub-directory, [(task name, script file, schedule)])
    groups = {
        'base': ('base', [
            ("每天开始自动创建日志", "base_daily_logs_generate.py",
             "0 0 * * *"),
            ("每天结束自动发送日志", "base_daily_logs_send.py",
             "58 23 * * *"),
            ("每天自动删除旧数据", "base_timing_remove_data.py",
             "1 0 * * *"),
            ("每天新闻汇总,发送到邮箱", "base_news_data_collation.py",
             "0 10 6,12,18 * * *"),
        ]),
        'to-email': ('to_email', [
            ("对比大乐透最新一期数据,匹配已购买号码,发送消息", "dlt.py",
             "30 22 * * 1,3,6"),
            ("链捕手快讯消息推送", "chaincatcher.py", "0 */2 * * *"),
            ("anyknew聚合新闻消息推送", "anyknew.py", "0 */3 * * *"),
            ("反斗限免消息推送", "apprcn.py", "0 */12 * * *"),
            ("chiphell消息推送", "chiphell.py", "0 */12 * * *"),
            ("hello-github消息推送", "hello_github.py", "0 */12 * * *"),
        ]),
        'to-gotify': ('to_gotify', [
            ("获取未来 7 天的天气预报", "weather7day.py",
             "0 0 6,22 * * *"),
            ("获取coin实时数据", "coin_detail.py", "0 * * * *"),
            ("空投任务消息", "airdrop_tasks.py", "0 8,20 * * *"),
            ("币界网消息推送", "coin_world.py", "0 8,20 * * *"),
            # NOTE(review): the leading space in this name is in the
            # original; confirm the panel stores it unchanged, otherwise the
            # "already exists" check will never match this task.
            (" web3新闻消息推送", "web3_news.py", "0 9,21 * * *"),
        ]),
        # Placeholder group: no manually triggered tasks are defined yet.
        'manual': ('manual', []),
    }

    templates = {}
    for label, (subdir, entries) in groups.items():
        script_dir = os.path.join(project_path, subdir)
        templates[label] = [
            {
                "name": name,
                "command": "python3 {}/{}".format(script_dir, script),
                "schedule": schedule,
                "labels": [label],
            }
            for name, script, schedule in entries
        ]
    return templates


def _sync_tasks(templates):
    """Log in, list the existing tasks and create the missing ones."""
    token = login()
    print(f"已连接到 {url}")

    tasks = get_tasks(token)
    tasks_names = [task['name'] for task in tasks]
    if tasks:
        print("Current tasks name: \n{}, \ntotal: {}".format(
            '\n'.join(tasks_names), str(len(tasks_names))))
    else:
        print("Tasks list is empty")

    existing = set(tasks_names)
    for task_list in templates.values():
        for task in task_list:
            task_name = task["name"]
            if task_name in existing:
                print("Task {} already exists.".format(task_name))
            else:
                result = create_task(task, token)
                print("Task creation result:", result)

    # View creation is currently disabled; call create_view_type(token) here
    # once all tasks exist to enable it.


def main(max_retries=None, retry_delay=5):
    """Create all missing tasks, retrying the whole run on failure.

    Each retry logs in again and re-reads the task list, so tasks created by
    an earlier, partially successful attempt are skipped.

    Args:
        max_retries: number of retries allowed after the first failed
            attempt; ``None`` (the default) retries without limit.
        retry_delay: seconds to wait before each retry, so that a panel that
            is down is not hit in a tight loop.

    Raises:
        Exception: the last error, once ``max_retries`` is exhausted.
    """
    templates = _build_task_templates()
    failures = 0
    while True:
        try:
            _sync_tasks(templates)
        except Exception as e:
            print("An error occurred: ", e)
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            print("Retrying...")
            time.sleep(retry_delay)
        else:
            # Finished without errors: leave the retry loop.
            break


if __name__ == "__main__":
    main()
    print('done!')