import asyncio
import os
import random
import re
import sqlite3
import tkinter as tk
from tkinter import messagebox


class GETCLDATA:
    """Asynchronously scrape forum thread listings and persist them to SQLite.

    Parsed rows accumulate in ``self.all_data`` as
    ``{tag: [[cl_id, full_url, href_url, title], ...]}`` before being written
    to ``cl.db`` by :meth:`save_to_db`.
    """

    def __init__(self, proxy=None):
        # Optional proxy URL (e.g. "http://127.0.0.1:7890"); None disables it.
        self.proxy = proxy
        self.base_url = 'https://t66y.com/'
        # Board tag -> page-templated listing path ({} is the page number).
        self.target_url_dict = {
            'cavalry': 'thread0806.php?fid=15&search=&page={}',
            'infantry': 'thread0806.php?fid=2&search=&page={}',
        }
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
        }
        self.all_data = {}

    async def fetch_page(self, client, url, tag, page):
        """Fetch one listing page; return its text, or None on any failure."""
        # Randomized delay so concurrent requests don't hammer the server.
        await asyncio.sleep(random.uniform(3, 5))
        try:
            response = await client.get(url, headers=self.headers)
            if response.status_code != 200:
                print('连接失败')
                return None
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            # Best-effort crawl: log and skip the page rather than abort.
            print(e)
            return None

    def parse_html(self, html, tag):
        """Extract (cl_id, url, title) rows from a listing page into all_data.

        NOTE(review): the regex literals in the original file were corrupted
        (literal HTML inside the patterns was stripped by whatever extracted
        this source, and the statement defining ``cl_id_list`` was lost
        entirely). The patterns below are reconstructed from the surviving
        fragments — verify them against a live page before relying on them.
        """
        target_list = re.findall(r'<h3>(.*?)</h3>', html, re.S)
        if not target_list:
            print(f'未找到任何h3标签内容,tag: {tag}')
            return
        # Sticky/announcement rows that should never be stored.
        skip_words = ('隨時更新', '免翻地址', '发布原创', '版規', 'VR', 'vr')
        for item in target_list:
            if any(word in item for word in skip_words):
                continue
            href_url_list = re.findall(r'href="(.*?)"', item)
            # TODO confirm: cl_id presumed to come from the anchor's id attr.
            cl_id_list = re.findall(r'id="(.*?)"', item)
            title_list = re.findall(
                r'target="_blank" id=".*?">(.*?)</a>', item)
            for href_url, cl_id, title in zip(
                    href_url_list, cl_id_list, title_list):
                # setdefault(tag, []) both creates the per-tag list on first
                # use and returns the existing one afterwards. (The original
                # called setdefault(tag) with no default, which first stored
                # None and then branched on it.)
                self.all_data.setdefault(tag, []).append(
                    [cl_id, self.base_url + href_url, href_url, title])

    async def get_data(self):
        """Fetch every configured listing page concurrently and parse it."""
        # Imported lazily so the database/GUI parts of this module work even
        # when httpx is not installed.
        import httpx

        # The original duplicated this whole body for the proxy / no-proxy
        # cases; a single kwargs dict removes the duplication.
        client_kwargs = {'proxies': self.proxy} if self.proxy else {}
        # Build (tag, page) metadata once so results can be matched back to
        # their tag after gather() (the original rebuilt this list inline).
        meta = [(tag, page)
                for tag in self.target_url_dict
                for page in range(1, 100)]
        async with httpx.AsyncClient(**client_kwargs) as client:
            tasks = [
                asyncio.create_task(self.fetch_page(
                    client,
                    self.base_url + self.target_url_dict[tag].format(page),
                    tag, page))
                for tag, page in meta
            ]
            htmls = await asyncio.gather(*tasks)
        for html, (tag, page) in zip(htmls, meta):
            if html:
                self.parse_html(html, tag)

    def save_to_db(self, tag):
        """Insert new rows for *tag* into cl.db; return the number written.

        Gives up after encountering 10 already-stored ids, on the assumption
        that listings are mostly newest-first so a run of known ids means the
        remainder has probably been stored already.
        """
        conn = sqlite3.connect('cl.db')
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS "CL" (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cl_id TEXT NOT NULL,
                full_url TEXT NOT NULL,
                href_url TEXT NOT NULL,
                title TEXT NOT NULL,
                tag TEXT NOT NULL,
                UNIQUE(cl_id)
            )
        ''')
        conn.commit()
        skip_counter = 0
        save_line_count = 0
        for cl_id, full_url, href_url, title in self.all_data.get(tag, []):
            c.execute('SELECT cl_id FROM "CL" WHERE cl_id=?', (cl_id,))
            if c.fetchone():
                skip_counter += 1
                if skip_counter >= 10:
                    break
                continue
            c.execute(
                'INSERT INTO "CL" (cl_id, full_url, href_url, title, tag) '
                'VALUES (?, ?, ?, ?, ?)',
                (cl_id, full_url, href_url, title, tag))
            conn.commit()
            save_line_count += 1
        c.close()
        conn.close()
        return save_line_count

    async def main(self):
        """Run the full scrape-and-save cycle; return total rows written."""
        await self.get_data()
        if not self.all_data:
            print('无法获取数据')
            return 0
        save_line_count = sum(self.save_to_db(tag) for tag in self.all_data)
        print(f'保存成功,共保存{save_line_count}条数据')
        return save_line_count


class LOADCLDATA:
    """Read-side helper around the cl.db SQLite database."""

    def __init__(self, db_name='cl.db'):
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def connect(self):
        """Open the database, scraping it into existence first if missing."""
        if not os.path.exists(self.db_name):
            asyncio.run(GETCLDATA().main())
        self.conn = sqlite3.connect(self.db_name)
        self.cursor = self.conn.cursor()

    def fetch_all_data(self):
        """Return every row in CL (also prints the total count)."""
        self.cursor.execute("SELECT * FROM CL")
        rows = self.cursor.fetchall()
        print(f'\n\n数据库共有{len(rows)}条数据')
        return rows

    def filter_by_title(self, filter_list):
        """Return rows whose title contains any keyword in *filter_list*."""
        if not filter_list:
            print("filter_list 为空,未进行匹配。")
            return []
        # Parameterized LIKE clauses -- keywords never touch the SQL text.
        conditions = " OR ".join(["title LIKE ?"] * len(filter_list))
        params = [f'%{keyword}%' for keyword in filter_list]
        self.cursor.execute(f"SELECT * FROM CL WHERE {conditions}", params)
        return self.cursor.fetchall()

    def close(self):
        if self.conn:
            self.conn.close()


class ClApp:
    """Tkinter front end: update the database, search it, show results."""

    def __init__(self, root):
        self.root = root
        self.root.title("CL")

        # Center-ish placement (nudged right/down by 100/50 px).
        screen_width = self.root.winfo_screenwidth()
        screen_height = self.root.winfo_screenheight()
        window_width, window_height = 800, 650
        x = (screen_width - window_width) // 2 + 100
        y = (screen_height - window_height) // 2 + 50
        self.root.geometry(f"{window_width}x{window_height}+{x}+{y}")

        # Row 1: update / search controls.
        self.top_frame = tk.Frame(self.root)
        self.top_frame.pack(pady=10)
        self.update_button = tk.Button(
            self.top_frame, text="更新数据库", command=self.update_database)
        self.update_button.pack(side=tk.LEFT, padx=5)
        self.search_button = tk.Button(
            self.top_frame, text="搜索数据库", command=self.search_database)
        self.search_button.pack(side=tk.LEFT, padx=5)
        self.search_entry = tk.Entry(self.top_frame, width=30)
        self.search_entry.pack(side=tk.LEFT, padx=5)

        # Row 2: proxy toggle + address.
        self.proxy_frame = tk.Frame(self.root)
        self.proxy_frame.pack(pady=5)
        self.proxy_var = tk.BooleanVar(value=True)
        self.proxy_checkbox = tk.Checkbutton(
            self.proxy_frame, text="是否使用代理", variable=self.proxy_var)
        self.proxy_checkbox.pack(side=tk.LEFT, padx=5)
        self.proxy_entry = tk.Entry(self.proxy_frame, width=30)
        self.proxy_entry.insert(0, "http://127.0.0.1:7890")
        self.proxy_entry.pack(side=tk.LEFT, padx=5)

        # Read-only output area; temporarily re-enabled when writing to it.
        self.output_text = tk.Text(
            self.root, height=35, width=100, state="disabled")
        self.output_text.pack(pady=10)
        self.clear_button = tk.Button(
            self.root, text="清空输出", command=self.clear_output)
        self.clear_button.pack(pady=10)

    def update_database(self):
        """Run the scraper with the configured proxy (if enabled).

        NOTE(review): asyncio.run() blocks the Tk event loop for the whole
        crawl, freezing the UI -- consider a worker thread.
        """
        self.output_text.config(state="normal")
        proxy = self.proxy_entry.get() if self.proxy_var.get() else None
        asyncio.run(GETCLDATA(proxy=proxy).main())
        self.output_text.config(state="disabled")
        messagebox.showinfo("提示", "数据库已更新完成")

    def search_database(self):
        """Search titles for the entered keyword and display the matches."""
        keyword = self.search_entry.get()
        self.output_text.config(state="normal")
        self.output_text.delete("1.0", tk.END)
        self.output_text.insert(tk.END, f"搜索关键词: {keyword}\n\n")
        self.output_text.config(state="disabled")

        loader = LOADCLDATA()
        loader.connect()
        results = loader.filter_by_title([keyword])
        if results:
            self.output_text.config(state="normal")
            # Row layout: (id, cl_id, full_url, href_url, title, tag).
            output_result = "".join(
                f"{row[4]}\n{row[2]}\n\n" for row in results)
            self.output_text.insert(tk.END, output_result)
            self.output_text.config(state="disabled")
            messagebox.showinfo("搜索完成", f"共搜索到 {len(results)} 条数据")
        else:
            messagebox.showinfo("搜索完成", "没有匹配到任何结果")
        loader.close()

    def clear_output(self):
        self.output_text.config(state="normal")
        self.output_text.delete("1.0", tk.END)
        self.output_text.config(state="disabled")


if __name__ == '__main__':
    # Guarded so importing this module no longer opens a window as a side
    # effect (the original ran these three statements at import time).
    root = tk.Tk()
    app = ClApp(root)
    root.mainloop()