| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # -*- coding: utf-8 -*-
- import sys
- import os
- import time
- from httpx import Client
- from concurrent.futures import ThreadPoolExecutor, as_completed
- sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
- from utils.utils_send_gotify import *
- from utils.utils_daily_task_scheduler import *
- DEBUG_MODE = 1
- def daily_claim_3dos(email, password):
- # client = Client(proxy="http://127.0.0.1:7890")
- client = Client()
- login_url = "https://api.dashboard.3dos.io/api/auth/login"
- login_headers = {
- "Accept": "application/json, text/plain, */*",
- "Accept-Encoding": "gzip, deflate, br, zstd",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Content-Type": "application/json",
- "Origin": "https://dashboard.3dos.io",
- "Priority": "u=1, i",
- "Referer": "https://dashboard.3dos.io/",
- "Sec-CH-UA": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
- "Sec-CH-UA-Mobile": "?0",
- "Sec-CH-UA-Platform": '"Windows"',
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-site",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
- }
- login_payload = {
- "email": email,
- "password": password
- }
- try:
- login_response = client.post(login_url, json=login_payload, headers=login_headers)
- except Exception as e:
- print(f"登录请求失败:{e}")
- return False
- if login_response.status_code == 200:
- # print(f"{email}: 登录成功!")
- login_data = login_response.json()
- # print("登录响应内容:", login_data)
- data = login_data.get("data")
- token = data.get("access_token") or data.get("token")
- token_type = data.get("token_type")
- if not token or not token_type:
- print("登录响应中未找到 Token 或 Token 类型!")
- return False
- authorization_header = f"{token_type} {token}"
- else:
- print("登录失败!")
- print("登录响应状态码:", login_response.status_code)
- exit(1)
- options_url = "https://api.dashboard.3dos.io/api/claim-reward"
- options_headers = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br, zstd",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Access-Control-Request-Headers": "authorization,cache-control,content-type,expires,pragma",
- "Access-Control-Request-Method": "POST",
- "Origin": "https://dashboard.3dos.io",
- "Priority": "u=1, i",
- "Referer": "https://dashboard.3dos.io/",
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-site",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
- }
- try:
- options_response = client.options(options_url, headers=options_headers)
- except Exception as e:
- print(f"OPTIONS 请求失败:{e}")
- exit(1)
- # print("OPTIONS 请求响应状态码:", options_response.status_code)
- claim_url = "https://api.dashboard.3dos.io/api/claim-reward"
- claim_headers = {
- "Accept": "application/json, text/plain, */*",
- "Authorization": authorization_header,
- "Cache-Control": "no-cache",
- "Content-Type": "application/json",
- "Expires": "0",
- "Origin": "https://dashboard.3dos.io",
- "Pragma": "no-cache",
- "Priority": "u=1, i",
- "Referer": "https://dashboard.3dos.io/",
- "Sec-CH-UA": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
- "Sec-CH-UA-Mobile": "?0",
- "Sec-CH-UA-Platform": '"Windows"',
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-site",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
- }
- claim_payload = {
- "id": "daily-reward-api"
- }
- try:
- claim_response = client.post(claim_url, json=claim_payload, headers=claim_headers)
- except Exception as e:
- print(f"{email} claim-reward 请求失败:{e}")
- exit(1)
- message = claim_response.json().get("message")
- if claim_response.status_code not in [200, 429]:
- print(f"{email} claim 报错: {message}, 状态码{claim_response.status_code}")
- exit(1)
- result = f"{email}: {message}"
- return result
- def main():
- print("开始执行...")
- account_list = [
- "jack0210_@hotmail.com|||aaaAAA111!!!",
- "yujieccyj01@hotmail.com|||aaaAAA111!!!",
- "yujieccyj02@hotmail.com|||aaaAAA111!!!",
- "yujieccyj03@hotmail.com|||aaaAAA111!!!",
- "yujieccyj04@hotmail.com|||aaaAAA111!!!",
- "yujieccyj05@hotmail.com|||aaaaAA111!!!",
- "yujieccyj06@hotmail.com|||aaaAAA111!!!",
- "yujieccyj07@hotmail.com|||aaaAAA111!!!",
- "yujieccyj08@hotmail.com|||aaaAAA111!!!",
- "yujieccyj09@hotmail.com|||aaaAAA111!!!",
- "yujieccyj10@hotmail.com|||aaaAAA111!!!",
- ]
- send_str = ''
- with ThreadPoolExecutor(max_workers=len(account_list)) as executor:
- futures = []
- for account in account_list:
- email, password = account.split("|||")
- futures.append(executor.submit(daily_claim_3dos, email, password))
- for future in as_completed(futures):
- result = future.result()
- if result:
- send_str += result + '\n'
- print(result)
- time.sleep(1)
- if send_str:
- gotify_notifier = GotifyNotifier(title='3dos daily', message=send_str, token_name='CheckAndRemind')
- gotify_notifier.send_message()
- print(result)
- else:
- print("No news found.")
- if __name__ == "__main__":
- print('开始 3dos 每日签到')
- if DEBUG_MODE:
- file_path = os.path.join(os.getcwd(), "daily_3dos.txt")
- else:
- file_path = os.path.join(os.getcwd(), "auto", "daily", "daily_3dos.txt")
- T = TaskScheduler(file_path)
- # 执行前读取执行时间, 判断是否到执行时间
- if not T.load_start_time():
- print('3dos 任务未到执行时间')
- exit(1)
- # 开始执行
- main()
- # 执行完成后, 将执行时间 + 24 小时写入文件
- T.save_start_time()
|