| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- # -*- coding: utf-8 -*-
- import os
- import requests
# Base URL of the Qinglong panel; every API path below is appended to it.
url = "https://qinglong.erhe.link"
def login(timeout=10):
    """Log in to the Qinglong panel and return the API bearer token.

    Credentials are taken from the ``QL_USERNAME`` / ``QL_PASSWORD``
    environment variables when set, and otherwise fall back to the values
    that used to be hard-coded here so existing deployments keep working.

    Args:
        timeout: Seconds to wait for the panel before ``requests`` gives up.
            Without it a stalled connection would block forever.

    Returns:
        The token found at ``data.token`` in the JSON login response.

    Raises:
        SystemExit: With status 1 when the panel answers with a non-200
            status or the response body carries no token. Retrying a
            rejected login cannot succeed, so this deliberately bypasses
            the caller's ``except Exception`` retry handling.
        requests.RequestException: On connection errors or timeouts.
    """
    # NOTE(review): a password literal in source control is a credential
    # leak. Prefer setting the environment variables and rotating this one.
    username = os.environ.get("QL_USERNAME", "toor")
    password = os.environ.get("QL_PASSWORD", "!QAZ2wsx+0913")

    response = requests.post(
        f"{url}/api/user/login",
        json={"username": username, "password": password},
        timeout=timeout,
    )
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        # Non-zero status so schedulers and shells see the failure.
        raise SystemExit(1)

    body = response.json()
    data = body.get("data") if isinstance(body, dict) else None
    token = data.get("token") if isinstance(data, dict) else None
    if not token:
        # HTTP 200 but no token in the body: treat as a failed login.
        print(response.text)
        raise SystemExit(1)
    return token
def get_tasks(token, timeout=10):
    """Fetch the scheduled tasks currently registered on the panel.

    Args:
        token: Bearer token obtained from ``login()``.
        timeout: Seconds to wait for the panel before ``requests`` gives up.

    Returns:
        The value stored at ``data.data`` in the JSON response. The caller
        in this file iterates it and reads a ``name`` key from each item.

    Raises:
        requests.HTTPError: When the panel answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
        KeyError: When the response JSON lacks the ``data.data`` path.
    """
    response = requests.get(
        f"{url}/api/crons",
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    # Report HTTP failures as such instead of as a confusing KeyError below.
    response.raise_for_status()
    return response.json()['data']['data']
def create_task(task_template, token, timeout=10):
    """Create one scheduled task on the panel.

    Args:
        task_template: Mapping with the keys ``name``, ``command`` and
            ``schedule``; ``labels`` is optional and defaults to an empty
            list.
        token: Bearer token obtained from ``login()``.
        timeout: Seconds to wait for the panel before ``requests`` gives up.

    Returns:
        The parsed JSON body of the panel's response, whatever its status,
        so the caller can print the outcome.

    Raises:
        KeyError: When a required key is missing from ``task_template``.
        requests.RequestException: On connection errors or timeouts.
    """
    payload = {
        "name": task_template["name"],
        "command": task_template["command"],
        "schedule": task_template["schedule"],
        "labels": task_template.get("labels", []),
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        f"{url}/api/crons",
        headers=headers,
        json=payload,
        timeout=timeout,
    )
    return response.json()
def create_view_type(token, timeout=10):
    """Create one task-list view per label category.

    Each view is named after the category and filters tasks whose
    ``labels`` property matches that name (operation ``Reg``).

    Args:
        token: Bearer token obtained from ``login()``.
        timeout: Seconds to wait for the panel before ``requests`` gives up.

    Returns:
        A list with the parsed JSON response for each view, in the order
        the views were submitted.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    view_type_list = ['base', 'spider_common']
    # The headers are the same for every request, so build them once.
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    results = []
    for view_type in view_type_list:
        payload = {
            "name": view_type,
            # NOTE(review): sent as a list of filter objects, which is what
            # the views endpoint appears to expect - confirm against the
            # panel version in use.
            "filters": [
                {
                    'property': 'labels',
                    'operation': 'Reg',
                    'value': view_type,
                },
            ],
            'filterRelation': 'and',
        }
        # Views have their own endpoint; "/api/crons" creates a cron task
        # and does not accept a view payload.
        response = requests.post(
            f"{url}/api/crons/views",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        results.append(response.json())
    return results
def _build_task_templates(project_path='/ql/data/scripts/AutoInfo/'):
    """Return the task definitions to register, grouped by category."""
    base_path = os.path.join(project_path, 'base')
    message_path = os.path.join(project_path, 'message')
    return {
        'base_tasks': [
            {
                "name": "每天开始自动创建日志",
                "command": f"python3 {base_path}/base_daily_logs_generate.py",
                "schedule": "0 0 * * *",
                "labels": ["base"],
            },
            {
                "name": "每天结束自动发送日志",
                "command": f"python3 {base_path}/base_daily_logs_send.py",
                "schedule": "58 23 * * *",
                "labels": ["base"],
            },
            {
                "name": "每天自动删除旧数据",
                "command": f"python3 {base_path}/base_timing_remove_data.py",
                "schedule": "1 0 * * *",
                "labels": ["base"],
            },
            {
                "name": "每天新闻汇总,发送到邮箱",
                "command": f"python3 {base_path}/base_news_data_collation.py",
                "schedule": "0 10 6,12,18 * * *",
                "labels": ["base"],
            },
        ],
        'message_tasks': [
            {
                "name": "获取coin实时数据",
                "command": f"python3 {message_path}/message_coin_detail.py",
                "schedule": "0 * * * *",
                "labels": ["message"],
            },
            {
                "name": "对比大乐透最新一期数据,匹配已购买号码,发送消息",
                "command": f"python3 {message_path}/message_dlt.py",
                "schedule": "30 22 * * 1,3,6",
                "labels": ["message"],
            },
            {
                "name": "获取未来 7 天的天气预报",
                "command": f"python3 {message_path}/message_get_one_week_weather.py",
                "schedule": "0 0 6,22 * * *",
                "labels": ["message"],
            },
            {
                "name": "从 freshrss-psql 数据库中读取数据并发送",
                "command": f"python3 {message_path}/message_rss_data_handel.py",
                "schedule": "30 6,9,12,18,22 * * *",
                "labels": ["message"],
            },
            {
                "name": "空投任务消息",
                "command": f"python3 {message_path}/message_airdrop_tasks.py",
                "schedule": "0 8,20 * * *",
                "labels": ["message"],
            },
            {
                "name": "链捕手快讯消息推送",
                "command": f"python3 {message_path}/message_chaincatcher.py",
                "schedule": "0 */2 * * *",
                "labels": ["message"],
            },
        ],
        'manual': [],
    }


def _sync_tasks(token):
    """Create every template task whose name is not yet on the panel."""
    tasks = get_tasks(token)
    tasks_names = [task['name'] for task in tasks]
    if tasks:
        print("Current tasks name: \n{}, \ntotal: {}".format('\n'.join(tasks_names), str(len(tasks_names))))
    else:
        print("Tasks list is empty")

    # Set lookup keeps the existence check O(1) per template task.
    existing_names = set(tasks_names)
    for task_list in _build_task_templates().values():
        for task in task_list:
            task_name = task["name"]
            if task_name in existing_names:
                print("Task {} already exists.".format(task_name))
                continue
            result = create_task(task, token)
            print("Task creation result:", result)


def main(max_retries=None, retry_delay=5):
    """Log in to the panel and register any missing scheduled tasks.

    The whole sequence (login, list tasks, create missing ones) is retried
    when it raises, pausing between attempts so a panel that is down is not
    hammered in a tight loop. Tasks are matched by name, so a retry does not
    duplicate tasks created by an earlier, partially successful attempt.

    Args:
        max_retries: Maximum number of retries after the first attempt.
            ``None`` (the default) retries indefinitely.
        retry_delay: Seconds to sleep before each retry.

    Raises:
        Exception: The last error, once ``max_retries`` is exhausted.
        SystemExit: Propagated from ``login()`` when the login is rejected.
    """
    import time

    attempt = 0
    while True:
        attempt += 1
        try:
            token = login()
            print(f"已连接到 {url}")
            _sync_tasks(token)
            # View creation after task creation is currently disabled:
            # create_view_type(token)
            return  # finished normally, stop retrying
        except Exception as e:
            print("An error occurred: ", e)
            if max_retries is not None and attempt > max_retries:
                raise
            print("Retrying...")
            time.sleep(retry_delay)
# Script entry point: run the task synchronisation, then report completion.
if __name__ == "__main__":
    main()
    print('done!')
|