cl.py 7.2 KB

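"""Thread-list scraper and reader for t66y.com.

With opt = 1 the script crawls the configured forum sections concurrently
with httpx, extracts thread links from the HTML, and stores them in a local
SQLite database (cl.db). With opt = 2 it reads cl.db back and prints the
rows whose title matches a keyword list.
"""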
import asyncio
import random
import re
import sqlite3

import httpx

# opt = 1: update the database; opt = 2: read the database and print keyword-filtered results
opt = 2
# proxy address (set to '' or None to connect directly)
proxy = 'http://127.0.0.1:7890'

class GETCLDATA:
    def __init__(self):
        self.base_url = 'https://t66y.com/'
        # Forum sections to crawl; the {} placeholder is filled with the page number
        self.target_url_dict = {
            'cavalry': 'thread0806.php?fid=15&search=&page={}',
            'infantry': 'thread0806.php?fid=2&search=&page={}',
        }
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
        }
        self.all_data = {}
    async def fetch_page(self, client, url, tag, page):
        # print(f'Fetching {tag} page {page}')
        # Random delay between requests to avoid hammering the server
        sleep_time = random.uniform(3, 5)
        # print(f'Sleeping for {sleep_time}s')
        await asyncio.sleep(sleep_time)
        try:
            response = await client.get(url, headers=self.headers)
            if response.status_code != 200:
                print('Connection failed')
                return None
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            print(e)
            return None
    def parse_html(self, html, tag):
        target_list = re.findall(r'<h3>(.*?)</h3>', html)
        if not target_list:
            print(f'No <h3> content found, tag: {tag}')
            return
        for i in target_list:
            # Skip pinned announcements and unwanted categories
            # (the literals are matched against the site's own Chinese titles)
            if '隨時更新' in i or '免翻地址' in i or '发布原创' in i or '版規' in i or 'VR' in i or 'vr' in i:
                continue
            href_url_list = re.findall(r'<a href="(.*?)"', i)
            cl_id_list = re.findall(r'id="(.*?)">', i)
            title_list = re.findall(r'target="_blank" id=".*?">(.*?)</a>', i)
            for href_url, cl_id, title in zip(href_url_list, cl_id_list, title_list):
                self.all_data.setdefault(tag, []).append(
                    [cl_id, self.base_url + href_url, href_url, title])
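    # Illustrative only: the three regexes above assume each <h3> block has
    # roughly this shape (reconstructed from the patterns themselves, not
    # captured from the live site):
    #   <h3><a href="..." target="_blank" id="...">thread title</a></h3>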
    async def get_data(self):
        # Route through the proxy when one is configured, otherwise connect directly
        async with httpx.AsyncClient(proxy=proxy or None) as client:
            tasks = []
            meta = []
            for tag, target_url in self.target_url_dict.items():
                for page in range(1, 100):
                    url = self.base_url + target_url.format(page)
                    tasks.append(asyncio.create_task(
                        self.fetch_page(client, url, tag, page)))
                    meta.append((tag, page))
            # All tasks start together; each sleeps 3-5 s once before its request
            htmls = await asyncio.gather(*tasks)
            for html, (tag, page) in zip(htmls, meta):
                if html:
                    self.parse_html(html, tag)
    def save_to_db(self, tag):
        conn = sqlite3.connect('cl.db')
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS "CL" (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cl_id TEXT NOT NULL,
                full_url TEXT NOT NULL,
                href_url TEXT NOT NULL,
                title TEXT NOT NULL,
                tag TEXT NOT NULL,
                UNIQUE(cl_id)
            )
        ''')
        conn.commit()
        skip_counter = 0
        save_line_count = 0
        for data in self.all_data.get(tag, []):
            cl_id, full_url, href_url, title = data
            c.execute('SELECT cl_id FROM "CL" WHERE cl_id=?', (cl_id,))
            if not c.fetchone():
                c.execute('INSERT INTO "CL" (cl_id, full_url, href_url, title, tag) VALUES (?, ?, ?, ?, ?)',
                          (cl_id, full_url, href_url, title, tag))
                conn.commit()
                save_line_count += 1
            else:
                skip_counter += 1
                # print(f"Row already exists, skipping. Skips so far: {skip_counter}")
                # After 10 duplicates, assume the rest of this section is already stored
                if skip_counter >= 10:
                    break
        c.close()
        conn.close()
        return save_line_count
    async def main(self):
        await self.get_data()
        if not self.all_data:
            print('Failed to fetch any data')
            return
        save_line_count = 0
        for tag in self.all_data:
            save_line_count += self.save_to_db(tag)
        print(f'Saved successfully, {save_line_count} new rows in total')

class LOADCLDATA:
    def __init__(self, db_name='cl.db'):
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def connect(self):
        """Connect to the SQLite database."""
        self.conn = sqlite3.connect(self.db_name)
        self.cursor = self.conn.cursor()

    def fetch_all_data(self):
        """Query all rows in the CL table."""
        self.cursor.execute("SELECT * FROM CL")
        rows = self.cursor.fetchall()
        print(f'\n\nThe database holds {len(rows)} rows in total')
        return rows

    def filter_by_title(self, filter_list):
        """
        Fuzzy-match the title column against every keyword in filter_list
        and print the matching rows.
        :param filter_list: list of keywords to match
        """
        if not filter_list:
            print("filter_list is empty; nothing to match.")
            return
        # Build the SQL WHERE clause
        like_conditions = " OR ".join(["title LIKE ?"] * len(filter_list))
        query = f"SELECT * FROM CL WHERE {like_conditions}"
        # Build the parameter list; wrap each keyword in % for fuzzy matching
        params = [f'%{keyword}%' for keyword in filter_list]
        # Run the query
        self.cursor.execute(query, params)
        matched_rows = self.cursor.fetchall()
        # Print the matches
        if matched_rows:
            print("\nMatched rows:")
            for row in matched_rows:
                print(
                    f"ID: {row[0]}, Tag: {row[5]}, Full_URL: {row[2]}, Title: {row[4]}")
            print(f"Total matched rows: {len(matched_rows)}")
        else:
            print("No matches found.")

    def close(self):
        """Close the database connection."""
        if self.conn:
            self.conn.close()

if __name__ == '__main__':
    if opt == 1:
        cl = GETCLDATA()
        asyncio.run(cl.main())
        print('done')
    elif opt == 2:
        filter_list = ['']  # '' matches every title; replace with real keywords
        cl = LOADCLDATA()
        cl.connect()
        cl.filter_by_title(filter_list)
        cl.fetch_all_data()
        cl.close()
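
# Usage sketch (assumes Python 3.8+, httpx installed, and a proxy listening
# on 127.0.0.1:7890 whenever `proxy` is set):
#   1. Set opt = 1 and run `python cl.py` to crawl and populate cl.db.
#   2. Set opt = 2, put real keywords in filter_list, and run again to print
#      the matching rows.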