import asyncio
import os
import random
import re
import sqlite3

import httpx
# opt = 1: crawl and update the database; opt = 2: read the database and
# print rows whose titles match the keyword filter
opt = 1
# proxy address; set to '' to connect directly
proxy = 'http://127.0.0.1:7890'
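
# Note (version assumption): httpx 0.26+ accepts a single `proxy=` argument,
# as used in get_data() below; older releases spelled it `proxies=`, so
# adjust the keyword for your installed version.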


class GETCLDATA:
    def __init__(self):
        self.base_url = 'https://t66y.com/'
        # Board listing URLs, keyed by a tag name; {} is filled with the page number
        self.target_url_dict = {
            'cavalry': 'thread0806.php?fid=15&search=&page={}',
            'infantry': 'thread0806.php?fid=2&search=&page={}',
        }
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
        }
        # Parsed results, keyed by tag: lists of [cl_id, full_url, href_url, title]
        self.all_data = {}

    async def fetch_page(self, client, url, tag, page):
        # print(f'Fetching {tag} page {page}')
        # Random delay between requests to avoid hammering the server
        sleep_time = random.uniform(3, 5)
        # print(f'Sleeping for {sleep_time}s')
        await asyncio.sleep(sleep_time)
        try:
            response = await client.get(url, headers=self.headers)
            if response.status_code != 200:
                print(f'Request for {tag} page {page} failed: HTTP {response.status_code}')
                return None
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            print(e)
            return None

    def parse_html(self, html, tag):
        # Each thread row on the listing page is wrapped in an <h3> tag
        target_list = re.findall(r'<h3>(.*?)</h3>', html)
        if not target_list:
            print(f'No <h3> content found, tag: {tag}')
            return
        for item in target_list:
            # Skip pinned announcements, board rules and VR threads
            if ('隨時更新' in item or '免翻地址' in item or '发布原创' in item
                    or '版規' in item or 'VR' in item or 'vr' in item):
                continue
            href_url_list = re.findall(r'<a href="(.*?)"', item)
            cl_id_list = re.findall(r'id="(.*?)">', item)
            title_list = re.findall(r'target="_blank" id=".*?">(.*?)</a>', item)
            for href_url, cl_id, title in zip(href_url_list, cl_id_list, title_list):
                self.all_data.setdefault(tag, []).append(
                    [cl_id, self.base_url + href_url, href_url, title])

    async def get_data(self):
        # Use the proxy when configured, otherwise connect directly; the
        # request logic is identical either way.
        client_kwargs = {'proxy': proxy} if proxy else {}
        async with httpx.AsyncClient(**client_kwargs) as client:
            tasks = []
            meta = []
            for tag, target_url in self.target_url_dict.items():
                for page in range(1, 100):
                    url = self.base_url + target_url.format(page)
                    tasks.append(asyncio.create_task(
                        self.fetch_page(client, url, tag, page)))
                    meta.append((tag, page))
            htmls = await asyncio.gather(*tasks)
            # gather() preserves task order, so htmls lines up with meta
            for html, (tag, page) in zip(htmls, meta):
                if html:
                    self.parse_html(html, tag)

    def save_to_db(self, tag):
        conn = sqlite3.connect('cl.db')
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS CL (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cl_id TEXT NOT NULL,
                full_url TEXT NOT NULL,
                href_url TEXT NOT NULL,
                title TEXT NOT NULL,
                tag TEXT NOT NULL,
                UNIQUE(cl_id)
            )
        ''')
        conn.commit()
        skip_counter = 0
        save_line_count = 0
        for cl_id, full_url, href_url, title in self.all_data.get(tag, []):
            c.execute('SELECT cl_id FROM CL WHERE cl_id=?', (cl_id,))
            if not c.fetchone():
                c.execute('INSERT INTO CL (cl_id, full_url, href_url, title, tag) VALUES (?, ?, ?, ?, ?)',
                          (cl_id, full_url, href_url, title, tag))
                conn.commit()
                save_line_count += 1
            else:
                skip_counter += 1
                # print(f"Row already exists, skipping. Skips so far: {skip_counter}")
                # A long run of already-saved rows means the rest of this tag
                # was covered by an earlier crawl, so stop early.
                if skip_counter >= 10:
                    break
        c.close()
        conn.close()
        return save_line_count

    async def main(self):
        await self.get_data()
        if not self.all_data:
            print('No data could be fetched')
            return
        save_line_count = 0
        for tag in self.all_data:
            save_line_count += self.save_to_db(tag)
        print(f'Save complete: {save_line_count} new rows')


class LOADCLDATA:
    def __init__(self, db_name='cl.db'):
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def connect(self):
        """Connect to the SQLite database, crawling first if the file is missing."""
        if not os.path.exists(self.db_name):
            c = GETCLDATA()
            asyncio.run(c.main())
        self.conn = sqlite3.connect(self.db_name)
        self.cursor = self.conn.cursor()

    def fetch_all_data(self):
        """Return every row in the CL table."""
        self.cursor.execute("SELECT * FROM CL")
        rows = self.cursor.fetchall()
        print(f'\n\nThe database holds {len(rows)} rows in total')
        return rows

    def filter_by_title(self, filter_list):
        """
        Fuzzy-match the title column against each keyword in filter_list
        and print the matching rows.
        :param filter_list: list of keywords to match against titles
        """
        if not filter_list:
            print("filter_list is empty; nothing to match.")
            return
        # Build the SQL WHERE clause: one LIKE condition per keyword
        like_conditions = " OR ".join(["title LIKE ?"] * len(filter_list))
        query = f"SELECT * FROM CL WHERE {like_conditions}"
        # Wrap each keyword in % so LIKE does substring matching
        params = [f'%{keyword}%' for keyword in filter_list]
        self.cursor.execute(query, params)
        matched_rows = self.cursor.fetchall()
        if matched_rows:
            print("\nMatched rows:")
            for row in matched_rows:
                # Column order: id, cl_id, full_url, href_url, title, tag
                print(
                    f"ID: {row[0]}, Tag: {row[5]}, Full_URL: {row[2]}, Title: {row[4]}")
            print(f"Total matching rows: {len(matched_rows)}")
        else:
            print("No matches found.")

    def close(self):
        """Close the database connection."""
        if self.conn:
            self.conn.close()


if __name__ == '__main__':
    if opt == 1:
        cl = GETCLDATA()
        asyncio.run(cl.main())
        print('done')
    elif opt == 2:
        filter_list = ['']  # placeholder: '' matches every title
        cl = LOADCLDATA()
        cl.connect()
        cl.filter_by_title(filter_list)
        cl.fetch_all_data()
        cl.close()
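
# Usage sketch (hypothetical keywords): set opt = 2 above and replace the
# placeholder filter_list, e.g. filter_list = ['1080P', '4K'], to search
# previously saved titles instead of crawling again.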