jack 8 months ago
parent
commit
de97c7d777
5 changed files with 215 additions and 199 deletions
  1. +1 -3
      README.md
  2. BIN
      cl.db
  3. +214 -0
      cl.py
  4. +0 -65
      cl_load.py
  5. +0 -131
      cl_save.py

+ 1 - 3
README.md

@@ -1,3 +1 @@
-# cl
-
-cl crawler
+# dddd


+ 214 - 0
cl.py

@@ -0,0 +1,214 @@
+import asyncio
+import random
+import re
+import sqlite3
+import httpx
+
+# 1: update the database; 2: read the database and filter the output by keyword
+opt = 2
+
+# proxy IP (leave empty to connect directly)
+proxy = 'http://127.0.0.1:7890'
+
+
+class GETCLDATA:
+    def __init__(self):
+        self.base_url = 'https://t66y.com/'
+        self.target_url_dict = {
+            'cavalry': 'thread0806.php?fid=15&search=&page={}',
+            'infantry': 'thread0806.php?fid=2&search=&page={}',
+        }
+        self.headers = {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
+        }
+        self.all_data = {}
+
+    async def fetch_page(self, client, url, tag, page):
+        # print(f'fetching {tag} page {page}')
+        sleep_time = random.uniform(3, 5)
+        # print(f'sleeping for {sleep_time}s')
+        await asyncio.sleep(sleep_time)
+
+        try:
+            response = await client.get(url, headers=self.headers)
+            if response.status_code != 200:
+                print('connection failed')
+                return None
+            response.encoding = 'utf-8'
+            return response.text
+        except Exception as e:
+            print(e)
+            return None
+
+    def parse_html(self, html, tag):
+        target_list = re.findall(r'<h3>(.*?)</h3>', html)
+        if not target_list:
+            print(f'no <h3> content found, tag: {tag}')
+            return
+
+        for i in target_list:
+            # skip announcement/board-rules threads and VR posts
+            if '隨時更新' in i or '免翻地址' in i or '发布原创' in i or '版規' in i or 'VR' in i or 'vr' in i:
+                continue
+            href_url_list = re.findall(r'<a href="(.*?)"', i)
+            cl_id_list = re.findall(r'id="(.*?)">', i)
+            title_list = re.findall(r'target="_blank" id=".*?">(.*?)</a>', i)
+
+            for href_url, cl_id, title in zip(href_url_list, cl_id_list, title_list):
+                self.all_data.setdefault(tag, []).append(
+                    [cl_id, self.base_url + href_url, href_url, title])
+
+    async def get_data(self):
+        # pass the proxy to the client only when one is configured
+        client_args = {'proxy': proxy} if proxy else {}
+        async with httpx.AsyncClient(**client_args) as client:
+            tasks = []
+            task_meta = []
+            for tag, target_url in self.target_url_dict.items():
+                for page in range(1, 100):
+                    url = self.base_url + target_url.format(page)
+                    tasks.append(asyncio.create_task(
+                        self.fetch_page(client, url, tag, page)))
+                    task_meta.append((tag, page))
+
+            htmls = await asyncio.gather(*tasks)
+
+            # gather preserves task order, so pair each result with its (tag, page)
+            for html, (tag, page) in zip(htmls, task_meta):
+                if html:
+                    self.parse_html(html, tag)
+
+    def save_to_db(self, tag):
+        conn = sqlite3.connect('cl.db')
+        c = conn.cursor()
+
+        c.execute('''
+        CREATE TABLE IF NOT EXISTS "CL" (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            cl_id TEXT NOT NULL,
+            full_url TEXT NOT NULL,
+            href_url TEXT NOT NULL,
+            title TEXT NOT NULL,
+            tag TEXT NOT NULL,
+            UNIQUE(cl_id)
+        )
+        ''')
+
+        conn.commit()
+
+        skip_counter = 0
+        save_line_count = 0
+        for data in self.all_data.get(tag, []):
+            cl_id, full_url, href_url, title = data
+
+            c.execute('SELECT cl_id FROM "CL" WHERE cl_id=?', (cl_id,))
+            if not c.fetchone():
+                c.execute('INSERT INTO "CL" (cl_id, full_url, href_url, title, tag) VALUES (?, ?, ?, ?, ?)',
+                          (cl_id, full_url, href_url, title, tag))
+                conn.commit()
+                save_line_count += 1
+            else:
+                skip_counter += 1
+                # print(f"row already exists, skipping ({skip_counter} skips so far)")
+                if skip_counter >= 10:
+                    break
+
+        c.close()
+        conn.close()
+        return save_line_count
+
+    async def main(self):
+        await self.get_data()
+
+        if not self.all_data:
+            print('failed to fetch any data')
+            return
+
+        save_line_count = 0
+        for tag in self.all_data:
+            save_line_count += self.save_to_db(tag)
+
+        print(f'save complete, {save_line_count} new rows saved')
+
+
+class LOADCLDATA:
+    def __init__(self, db_name='cl.db'):
+        self.db_name = db_name
+        self.conn = None
+        self.cursor = None
+
+    def connect(self):
+        """Connect to the SQLite database."""
+        self.conn = sqlite3.connect(self.db_name)
+        self.cursor = self.conn.cursor()
+
+    def fetch_all_data(self):
+        """Fetch all rows from the table."""
+        self.cursor.execute("SELECT * FROM CL")
+        rows = self.cursor.fetchall()
+        print(f'\n\nthe database holds {len(rows)} rows in total')
+        return rows
+
+    def filter_by_title(self, filter_list):
+        """
+        Fuzzy-match the title column against each filter_list element and print the matches.
+        :param filter_list: list of keywords to match
+        """
+        if not filter_list:
+            print("filter_list is empty, nothing to match.")
+            return
+
+        # build the SQL WHERE clause, one LIKE condition per keyword
+        like_conditions = " OR ".join(["title LIKE ?"] * len(filter_list))
+        query = f"SELECT * FROM CL WHERE {like_conditions}"
+
+        # wrap each keyword in % for fuzzy matching
+        params = [f'%{keyword}%' for keyword in filter_list]
+
+        # run the query
+        self.cursor.execute(query, params)
+        matched_rows = self.cursor.fetchall()
+
+        # print the matches
+        if matched_rows:
+            print("\nmatched rows:")
+            for row in matched_rows:
+                print(
+                    f"ID: {row[0]}, Tag: {row[5]}, Full_URL: {row[2]}, Title: {row[4]}")
+            print(f"total rows matched: {len(matched_rows)}")
+        else:
+            print("no rows matched.")
+
+    def close(self):
+        """Close the database connection."""
+        if self.conn:
+            self.conn.close()
+
+
+if __name__ == '__main__':
+    if opt == 1:
+        cl = GETCLDATA()
+        asyncio.run(cl.main())
+        print('done')
+    elif opt == 2:
+        # an empty keyword expands to LIKE '%%', which matches every row
+        filter_list = ['']
+        cl = LOADCLDATA()
+        cl.connect()
+        cl.filter_by_title(filter_list)
+        cl.fetch_all_data()
+        cl.close()
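
Note on save_to_db: because cl_id is declared UNIQUE, the SELECT-then-INSERT round trip could be collapsed into a single statement. A minimal sketch of that alternative, not part of this commit (the sample row values are made up):

import sqlite3

conn = sqlite3.connect('cl.db')
c = conn.cursor()
# INSERT OR IGNORE relies on UNIQUE(cl_id) to silently skip duplicate rows
c.execute(
    'INSERT OR IGNORE INTO "CL" (cl_id, full_url, href_url, title, tag) '
    'VALUES (?, ?, ?, ?, ?)',
    ('id123', 'https://t66y.com/htm_data/x.html', 'htm_data/x.html', 'a title', 'cavalry'),
)
saved = c.rowcount  # 1 if the row was inserted, 0 if it was ignored as a duplicate
conn.commit()
conn.close()

The early exit after 10 skips would still need its own counter, but the per-row SELECT disappears.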

+ 0 - 65
cl_load.py

@@ -1,65 +0,0 @@
-import sqlite3
-
-
-class CLDatabase:
-    def __init__(self, db_name='cl.db'):
-        self.db_name = db_name
-        self.conn = None
-        self.cursor = None
-
-    def connect(self):
-        """Connect to the SQLite database."""
-        self.conn = sqlite3.connect(self.db_name)
-        self.cursor = self.conn.cursor()
-
-    def fetch_all_data(self):
-        """Fetch all rows from the table."""
-        self.cursor.execute("SELECT * FROM CL")
-        rows = self.cursor.fetchall()
-        print(f'\n\nthe database holds {len(rows)} rows in total')
-        return rows
-
-    def filter_by_title(self, filter_list):
-        """
-        Fuzzy-match the title column against each filter_list element and print the matches.
-        :param filter_list: list of keywords to match
-        """
-        if not filter_list:
-            print("filter_list is empty, nothing to match.")
-            return
-
-        # build the SQL WHERE clause, one LIKE condition per keyword
-        like_conditions = " OR ".join(["Title LIKE ?"] * len(filter_list))
-        query = f"SELECT * FROM CL WHERE {like_conditions}"
-
-        # wrap each keyword in % for fuzzy matching
-        params = [f'%{keyword}%' for keyword in filter_list]
-
-        # run the query
-        self.cursor.execute(query, params)
-        matched_rows = self.cursor.fetchall()
-
-        # print the matches
-        if matched_rows:
-            print("\nmatched rows:")
-            for row in matched_rows:
-                print(
-                    f"ID: {row[0]}, Tag: {row[5]}, Full_URL: {row[2]}, Title: {row[4]}")
-            print(f"total rows matched: {len(matched_rows)}")
-        else:
-            print("no rows matched.")
-
-    def close(self):
-        """Close the database connection."""
-        if self.conn:
-            self.conn.close()
-
-
-# usage example
-if __name__ == "__main__":
-    filter_list = ['']
-    db = CLDatabase()
-    db.connect()
-    db.filter_by_title(filter_list)
-    db.fetch_all_data()
-    db.close()
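
For reference, filter_by_title assembles its WHERE clause by repeating one LIKE placeholder per keyword. A quick sketch of the expansion (the keyword values here are made up):

filter_list = ['foo', 'bar']
like_conditions = " OR ".join(["title LIKE ?"] * len(filter_list))
query = f"SELECT * FROM CL WHERE {like_conditions}"
params = [f'%{kw}%' for kw in filter_list]
print(query)   # SELECT * FROM CL WHERE title LIKE ? OR title LIKE ?
print(params)  # ['%foo%', '%bar%']

Parameter binding keeps the keywords out of the SQL string itself, so quoting inside a keyword cannot break the query.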

+ 0 - 131
cl_save.py

@@ -1,131 +0,0 @@
-import asyncio
-import random
-import re
-import sqlite3
-import httpx
-
-
-class GETCLDATA:
-    def __init__(self):
-        self.base_url = 'https://t66y.com/'
-        self.target_url_dict = {
-            'cavalry': 'thread0806.php?fid=15&search=&page={}',
-            'infantry': 'thread0806.php?fid=2&search=&page={}',
-        }
-        self.headers = {
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
-        }
-        self.all_data = {}
-
-    async def fetch_page(self, client, url, tag, page):
-        # print(f'fetching {tag} page {page}')
-        sleep_time = random.uniform(3, 5)
-        # print(f'sleeping for {sleep_time}s')
-        await asyncio.sleep(sleep_time)
-
-        try:
-            response = await client.get(url, headers=self.headers)
-            if response.status_code != 200:
-                print('connection failed')
-                return None
-            response.encoding = 'utf-8'
-            return response.text
-        except Exception as e:
-            print(e)
-            return None
-
-    def parse_html(self, html, tag):
-        target_list = re.findall(r'<h3>(.*?)</h3>', html)
-        if not target_list:
-            print(f'no <h3> content found, tag: {tag}')
-            return
-
-        for i in target_list:
-            if '隨時更新' in i or '免翻地址' in i or '发布原创' in i or '版規' in i or 'VR' in i or 'vr' in i:
-                continue
-            href_url_list = re.findall(r'<a href="(.*?)"', i)
-            cl_id_list = re.findall(r'id="(.*?)">', i)
-            title_list = re.findall(r'target="_blank" id=".*?">(.*?)</a>', i)
-
-            for herf_url, cl_id, title in zip(href_url_list, cl_id_list, title_list):
-                if not self.all_data.setdefault(tag):
-                    self.all_data[tag] = [
-                        [cl_id, self.base_url + herf_url, herf_url, title]]
-                else:
-                    self.all_data[tag].append(
-                        [cl_id, self.base_url + herf_url, herf_url, title])
-
-    async def get_data(self):
-        async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
-            tasks = []
-            for tag, target_url in self.target_url_dict.items():
-                for page in range(1, 100):
-                    url = self.base_url + target_url.format(page)
-                    task = asyncio.create_task(
-                        self.fetch_page(client, url, tag, page))
-                    tasks.append(task)
-
-            htmls = await asyncio.gather(*tasks)
-
-            for html, (tag, page) in zip(htmls, [(tag, page) for tag in self.target_url_dict for page in range(1, 100)]):
-                if html:
-                    self.parse_html(html, tag)
-
-    def save_to_db(self, tag):
-        conn = sqlite3.connect('cl.db')
-        c = conn.cursor()
-
-        c.execute('''
-        CREATE TABLE IF NOT EXISTS 'CL' (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            cl_id TEXT NOT NULL,
-            full_url TEXT NOT NULL,
-            href_url TEXT NOT NULL,
-            title TEXT NOT NULL,
-            tag TEXT NOT NULL,
-            UNIQUE(cl_id)
-        )
-        ''')
-
-        conn.commit()
-
-        skip_counter = 0
-        save_line_count = 0
-        for data in self.all_data.get(tag, []):
-            cl_id, full_url, href_url, title = data
-
-            c.execute(f'SELECT cl_id FROM "CL" WHERE cl_id=?', (cl_id,))
-            if not c.fetchone():
-                c.execute(f'INSERT INTO "CL" (cl_id, full_url, href_url, title, tag) VALUES (?, ?, ?, ?, ?)',
-                          (cl_id, full_url, href_url, title, tag))
-                conn.commit()
-                save_line_count += 1
-            else:
-                skip_counter += 1
-                # print(f"row already exists, skipping ({skip_counter} skips so far)")
-                if skip_counter >= 10:
-                    break
-
-        if save_line_count > 0:
-            print(f'save complete, {save_line_count} new rows saved')
-        else:
-            print('all rows already exist, nothing saved')
-
-        c.close()
-        conn.close()
-
-    async def main(self):
-        await self.get_data()
-
-        if not self.all_data:
-            print('failed to fetch any data')
-            return
-
-        for tag in self.all_data:
-            self.save_to_db(tag)
-
-
-if __name__ == '__main__':
-    cl = GETCLDATA()
-    asyncio.run(cl.main())
-    print('done')
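
One observation that applies to the deleted cl_save.py and equally to the new cl.py: every page task is created up front and awaited in a single gather, so the random 3-5 s sleep runs concurrently inside each task and all ~198 requests still land within about a two-second window. If requests should actually be spaced out, a semaphore can bound concurrency; a hypothetical sketch (the limit of 5 is an assumption), not part of this commit:

import asyncio
import httpx

sem = asyncio.Semaphore(5)  # at most 5 requests in flight at once

async def fetch_page(client: httpx.AsyncClient, url: str):
    async with sem:                 # wait for a free slot before requesting
        await asyncio.sleep(1)      # this pause now actually spaces requests out
        response = await client.get(url)
        return response.text if response.status_code == 200 else None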