jack 8 months ago
commit adb7dfe2a2
4 changed files with 275 additions and 1 deletion
  1. README.md +5 -1
  2. cl.db BIN
  3. cl.py → cl_backup.py +0 -0
  4. main.py +270 -0

+ 5 - 1
README.md

@@ -1 +1,5 @@
-# dddd
+# dddd
+
+pip install -r requirements.txt
+
+py main.py
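
The README's install step refers to a requirements.txt that is not included in this commit. Judging from the imports in main.py, httpx is the only third-party dependency, so a minimal sketch of that file (hypothetical, not part of the commit) might look like:

# requirements.txt (sketch) -- httpx is the only non-stdlib import used by main.py
httpx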

BIN
cl.db


+ 0 - 0
cl.py → cl_backup.py


+ 270 - 0
main.py

@@ -0,0 +1,270 @@
+import asyncio
+import os
+import random
+import re
+import sqlite3
+import httpx
+import tkinter as tk
+from tkinter import messagebox
+
+
+class GETCLDATA:
+    def __init__(self, proxy=None):  # accept an optional proxy setting
+        self.base_url = 'https://t66y.com/'
+        self.target_url_dict = {
+            'cavalry': 'thread0806.php?fid=15&search=&page={}',
+            'infantry': 'thread0806.php?fid=2&search=&page={}',
+        }
+        self.headers = {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
+        }
+        self.all_data = {}
+        self.proxy = proxy  # store the proxy setting
+
+    async def fetch_page(self, client, url, tag, page):
+        sleep_time = random.uniform(3, 5)  # random delay to avoid hammering the site
+        await asyncio.sleep(sleep_time)
+
+        try:
+            response = await client.get(url, headers=self.headers)
+            if response.status_code != 200:
+                print('Request failed')
+                return None
+            response.encoding = 'utf-8'
+            return response.text
+        except Exception as e:
+            print(e)
+            return None
+
+    def parse_html(self, html, tag):
+        target_list = re.findall(r'<h3>(.*?)</h3>', html)
+        if not target_list:
+            print(f'No <h3> content found, tag: {tag}')
+            return
+
+        for i in target_list:
+            # skip pinned/announcement threads (rules, mirror addresses, etc.) and VR posts
+            if '隨時更新' in i or '免翻地址' in i or '发布原创' in i or '版規' in i or 'VR' in i or 'vr' in i:
+                continue
+            href_url_list = re.findall(r'<a href="(.*?)"', i)
+            cl_id_list = re.findall(r'id="(.*?)">', i)
+            title_list = re.findall(r'target="_blank" id=".*?">(.*?)</a>', i)
+
+            for href_url, cl_id, title in zip(href_url_list, cl_id_list, title_list):
+                self.all_data.setdefault(tag, []).append(
+                    [cl_id, self.base_url + href_url, href_url, title])
+
+    async def get_data(self):
+        # httpx treats proxies=None as "no proxy", so one client covers both cases
+        async with httpx.AsyncClient(proxies=self.proxy) as client:
+            tasks = []
+            for tag, target_url in self.target_url_dict.items():
+                for page in range(1, 100):
+                    url = self.base_url + target_url.format(page)
+                    tasks.append(asyncio.create_task(
+                        self.fetch_page(client, url, tag, page)))
+
+            htmls = await asyncio.gather(*tasks)
+
+            # gather preserves task order, so results line up with (tag, page) pairs
+            tag_pages = [(tag, page)
+                         for tag in self.target_url_dict for page in range(1, 100)]
+            for html, (tag, page) in zip(htmls, tag_pages):
+                if html:
+                    self.parse_html(html, tag)
+
+    def save_to_db(self, tag):
+        conn = sqlite3.connect('cl.db')
+        c = conn.cursor()
+
+        c.execute('''
+        CREATE TABLE IF NOT EXISTS "CL" (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            cl_id TEXT NOT NULL,
+            full_url TEXT NOT NULL,
+            href_url TEXT NOT NULL,
+            title TEXT NOT NULL,
+            tag TEXT NOT NULL,
+            UNIQUE(cl_id)
+        )
+        ''')
+
+        conn.commit()
+
+        skip_counter = 0
+        save_line_count = 0
+        for data in self.all_data.get(tag, []):
+            cl_id, full_url, href_url, title = data
+
+            c.execute('SELECT cl_id FROM "CL" WHERE cl_id=?', (cl_id,))
+            if not c.fetchone():
+                c.execute('INSERT INTO "CL" (cl_id, full_url, href_url, title, tag) VALUES (?, ?, ?, ?, ?)',
+                          (cl_id, full_url, href_url, title, tag))
+                conn.commit()
+                save_line_count += 1
+            else:
+                skip_counter += 1
+                # stop early once 10 already-saved rows have been seen
+                if skip_counter >= 10:
+                    break
+
+        c.close()
+        conn.close()
+        return save_line_count
+
+    async def main(self):
+        await self.get_data()
+
+        if not self.all_data:
+            print('Failed to fetch any data')
+            return 0
+
+        save_line_count = 0
+        for tag in self.all_data:
+            save_line_count += self.save_to_db(tag)
+
+        print(f'Saved {save_line_count} new rows')
+        return save_line_count
+
+
+class LOADCLDATA:
+    def __init__(self, db_name='cl.db'):
+        self.db_name = db_name
+        self.conn = None
+        self.cursor = None
+
+    def connect(self):
+        if not os.path.exists(self.db_name):
+            # build the database first if it does not exist yet
+            c = GETCLDATA()
+            asyncio.run(c.main())
+        self.conn = sqlite3.connect(self.db_name)
+        self.cursor = self.conn.cursor()
+
+    def fetch_all_data(self):
+        self.cursor.execute("SELECT * FROM CL")
+        rows = self.cursor.fetchall()
+        print(f'\n\nThe database contains {len(rows)} rows')
+        return rows
+
+    def filter_by_title(self, filter_list):
+        if not filter_list:
+            print("filter_list 为空,未进行匹配。")
+            return []
+
+        like_conditions = " OR ".join(["title LIKE ?"] * len(filter_list))
+        query = f"SELECT * FROM CL WHERE {like_conditions}"
+
+        params = [f'%{keyword}%' for keyword in filter_list]
+
+        self.cursor.execute(query, params)
+        matched_rows = self.cursor.fetchall()
+
+        return matched_rows
+
+    def close(self):
+        if self.conn:
+            self.conn.close()
+
+
+class ClApp:
+    def __init__(self, root):
+        self.root = root
+        self.root.title("CL")
+        screen_width = self.root.winfo_screenwidth()
+        screen_height = self.root.winfo_screenheight()
+        window_width = 800
+        window_height = 650
+        x = (screen_width - window_width) // 2 + 100
+        y = (screen_height - window_height) // 2 + 50
+        self.root.geometry(f"{window_width}x{window_height}+{x}+{y}")
+
+        self.top_frame = tk.Frame(self.root)
+        self.top_frame.pack(pady=10)
+
+        self.update_button = tk.Button(
+            self.top_frame, text="更新数据库", command=self.update_database)
+        self.update_button.pack(side=tk.LEFT, padx=5)
+
+        self.search_button = tk.Button(
+            self.top_frame, text="搜索数据库", command=self.search_database)
+        self.search_button.pack(side=tk.LEFT, padx=5)
+
+        self.search_entry = tk.Entry(self.top_frame, width=30)
+        self.search_entry.pack(side=tk.LEFT, padx=5)
+
+        self.proxy_frame = tk.Frame(self.root)
+        self.proxy_frame.pack(pady=5)
+
+        self.proxy_var = tk.BooleanVar(value=True)
+        self.proxy_checkbox = tk.Checkbutton(
+            self.proxy_frame, text="是否使用代理", variable=self.proxy_var)
+        self.proxy_checkbox.pack(side=tk.LEFT, padx=5)
+
+        self.proxy_entry = tk.Entry(self.proxy_frame, width=30)
+        self.proxy_entry.insert(0, "http://127.0.0.1:7890")
+        self.proxy_entry.pack(side=tk.LEFT, padx=5)
+
+        self.output_text = tk.Text(
+            self.root, height=35, width=100, state="disabled")
+        self.output_text.pack(pady=10)
+
+        self.clear_button = tk.Button(
+            self.root, text="清空输出", command=self.clear_output)
+        self.clear_button.pack(pady=10)
+
+    def update_database(self):
+        self.output_text.config(state="normal")
+        proxy = self.proxy_entry.get() if self.proxy_var.get() else None
+        get_cl_data = GETCLDATA(proxy=proxy)
+        asyncio.run(get_cl_data.main())  # blocks the Tk event loop until the scrape finishes
+        self.output_text.config(state="disabled")
+        messagebox.showinfo("Info", "Database update finished")
+
+    def search_database(self):
+        keyword = self.search_entry.get()
+        self.output_text.config(state="normal")
+        self.output_text.delete(1.0, tk.END)  # clear the output box
+        self.output_text.insert(tk.END, f"Search keyword: {keyword}\n\n")
+        self.output_text.config(state="disabled")
+
+        # search the database via LOADCLDATA
+        load_cl_data = LOADCLDATA()
+        load_cl_data.connect()
+        results = load_cl_data.filter_by_title([keyword])
+
+        if results:
+            self.output_text.config(state="normal")
+            output_result = ""
+            for row in results:
+                # row layout: (id, cl_id, full_url, href_url, title, tag)
+                output_result += f"{row[4]}\n{row[2]}\n\n"
+            self.output_text.insert(tk.END, output_result)
+            self.output_text.config(state="disabled")
+            messagebox.showinfo("搜索完成", f"共搜索到 {len(results)} 条数据")
+        else:
+            messagebox.showinfo("搜索完成", "没有匹配到任何结果")
+        load_cl_data.close()
+
+    def clear_output(self):
+        self.output_text.config(state="normal")
+        self.output_text.delete(1.0, tk.END)
+        self.output_text.config(state="disabled")
+
+
+if __name__ == '__main__':
+    root = tk.Tk()
+    app = ClApp(root)
+    root.mainloop()
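
For reference, the database built by the updater can also be queried without the Tk interface. A minimal sketch, assuming it is saved next to main.py ('keyword' below is a placeholder, not a value from this commit):

from main import LOADCLDATA  # safe to import; the GUI only starts under __main__

db = LOADCLDATA()            # defaults to cl.db in the working directory
db.connect()                 # scrapes the site first if cl.db does not exist yet
for row in db.filter_by_title(['keyword']):
    # row layout: (id, cl_id, full_url, href_url, title, tag)
    print(row[4], row[2])
db.close()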