# cl_save.py
import asyncio
import random
import re
import sqlite3
from contextlib import closing

import httpx
  6. class GETCLDATA:
  7. def __init__(self):
  8. self.base_url = 'https://t66y.com/'
  9. self.target_url_dict = {
  10. 'cavalry': 'thread0806.php?fid=15&search=&page={}',
  11. 'infantry': 'thread0806.php?fid=2&search=&page={}',
  12. }
  13. self.headers = {
  14. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
  15. }
  16. self.all_data = {}
  17. async def fetch_page(self, client, url, tag, page):
  18. # print(f'正在获取 {tag} 第 {page} 页数据')
  19. sleep_time = random.uniform(3, 5)
  20. # print(f'程序暂停{sleep_time}')
  21. await asyncio.sleep(sleep_time)
  22. try:
  23. response = await client.get(url, headers=self.headers)
  24. if response.status_code != 200:
  25. print('连接失败')
  26. return None
  27. response.encoding = 'utf-8'
  28. return response.text
  29. except Exception as e:
  30. print(e)
  31. return None
  32. def parse_html(self, html, tag):
  33. target_list = re.findall(r'<h3>(.*?)</h3>', html)
  34. if not target_list:
  35. print(f'未找到任何h3标签内容,tag: {tag}')
  36. return
  37. for i in target_list:
  38. if '隨時更新' in i or '免翻地址' in i or '发布原创' in i or '版規' in i or 'VR' in i or 'vr' in i:
  39. continue
  40. href_url_list = re.findall(r'<a href="(.*?)"', i)
  41. cl_id_list = re.findall(r'id="(.*?)">', i)
  42. title_list = re.findall(r'target="_blank" id=".*?">(.*?)</a>', i)
  43. for herf_url, cl_id, title in zip(href_url_list, cl_id_list, title_list):
  44. if not self.all_data.setdefault(tag):
  45. self.all_data[tag] = [
  46. [cl_id, self.base_url + herf_url, herf_url, title]]
  47. else:
  48. self.all_data[tag].append(
  49. [cl_id, self.base_url + herf_url, herf_url, title])
  50. async def get_data(self):
  51. async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
  52. tasks = []
  53. for tag, target_url in self.target_url_dict.items():
  54. for page in range(1, 100):
  55. url = self.base_url + target_url.format(page)
  56. task = asyncio.create_task(
  57. self.fetch_page(client, url, tag, page))
  58. tasks.append(task)
  59. htmls = await asyncio.gather(*tasks)
  60. for html, (tag, page) in zip(htmls, [(tag, page) for tag in self.target_url_dict for page in range(1, 100)]):
  61. if html:
  62. self.parse_html(html, tag)
  63. def save_to_db(self, tag):
  64. conn = sqlite3.connect('cl.db')
  65. c = conn.cursor()
  66. c.execute('''
  67. CREATE TABLE IF NOT EXISTS 'CL' (
  68. id INTEGER PRIMARY KEY AUTOINCREMENT,
  69. cl_id TEXT NOT NULL,
  70. full_url TEXT NOT NULL,
  71. href_url TEXT NOT NULL,
  72. title TEXT NOT NULL,
  73. tag TEXT NOT NULL,
  74. UNIQUE(cl_id)
  75. )
  76. ''')
  77. conn.commit()
  78. skip_counter = 0
  79. save_line_count = 0
  80. for data in self.all_data.get(tag, []):
  81. cl_id, full_url, href_url, title = data
  82. c.execute(f'SELECT cl_id FROM "CL" WHERE cl_id=?', (cl_id,))
  83. if not c.fetchone():
  84. c.execute(f'INSERT INTO "CL" (cl_id, full_url, href_url, title, tag) VALUES (?, ?, ?, ?, ?)',
  85. (cl_id, full_url, href_url, title, tag))
  86. conn.commit()
  87. save_line_count += 1
  88. else:
  89. skip_counter += 1
  90. # print(f"数据已存在,跳过。当前跳过次数:{skip_counter}")
  91. if skip_counter >= 10:
  92. break
  93. if save_line_count > 0:
  94. print(f'保存成功,共保存{save_line_count}条数据')
  95. else:
  96. print('全部数据已存在,没有保存任何数据')
  97. c.close()
  98. conn.close()
  99. async def main(self):
  100. await self.get_data()
  101. if not self.all_data:
  102. print('无法获取数据')
  103. return
  104. for tag in self.all_data:
  105. self.save_to_db(tag)
  106. if __name__ == '__main__':
  107. cl = GETCLDATA()
  108. asyncio.run(cl.main())
  109. print('done')