import asyncio
import random
import re
import sqlite3
import httpx
class GETCLDATA:
    """Async scraper for t66y forum listing pages.

    Fetches up to 99 listing pages per configured section ("tag") through a
    local proxy, parses the thread links out of each page's HTML, and stores
    the results into a local SQLite database (cl.db).
    """

    def __init__(self):
        # Base site URL; the relative hrefs found in listings are joined onto it.
        self.base_url = 'https://t66y.com/'
        # Section name -> listing URL template ({} is the 1-based page number).
        self.target_url_dict = {
            'cavalry': 'thread0806.php?fid=15&search=&page={}',
            'infantry': 'thread0806.php?fid=2&search=&page={}',
        }
        # Browser-like UA so the site serves normal pages.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0'
        }
        # tag -> list of [cl_id, full_url, href_url, title] rows.
        self.all_data = {}

    async def fetch_page(self, client, url, tag, page):
        """Fetch one listing page; return its HTML text, or None on any failure.

        `tag` and `page` are unused here but kept for interface/logging parity
        with the caller.
        """
        # Randomized delay between requests to avoid hammering the site.
        await asyncio.sleep(random.uniform(3, 5))
        try:
            response = await client.get(url, headers=self.headers)
            if response.status_code != 200:
                print('连接失败')
                return None
            # Force utf-8 before reading .text (site pages are utf-8).
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            # Best effort: log and skip this page rather than abort the batch.
            print(e)
            return None

    def parse_html(self, html, tag):
        """Extract [cl_id, full_url, href_url, title] rows from one page's HTML.

        NOTE(review): the original regex literals were garbled (their HTML tags
        were stripped by whatever produced this paste) and `cl_id_list` was
        referenced but never defined. The patterns below are reconstructed from
        the t66y listing markup
        <h3><a href="htm_data/.../<thread_id>.html" target="_blank" id="...">title</a></h3>
        so cl_id is the numeric thread id from the href — confirm against live pages.
        """
        # Each thread entry lives inside an <h3> element on the listing page.
        target_list = re.findall(r'<h3>(.*?)</h3>', html, re.S)
        if not target_list:
            print(f'未找到任何h3标签内容,tag: {tag}')
            return
        # Sticky posts / ads / section headers we never want to store.
        skip_words = ('隨時更新', '免翻地址', '发布原创', '版規', 'VR', 'vr')
        for item in target_list:
            if any(word in item for word in skip_words):
                continue
            # One combined pattern keeps href, cl_id and title aligned per link:
            # group 1 = relative href, group 2 = numeric thread id, group 3 = title.
            rows = re.findall(
                r'<a\s+href="([^"]*?/(\d+)\.html)"[^>]*target="_blank"[^>]*id="[^"]*"[^>]*>(.*?)</a>',
                item, re.S)
            for href_url, cl_id, title in rows:
                # setdefault creates the per-tag list on first use.
                self.all_data.setdefault(tag, []).append(
                    [cl_id, self.base_url + href_url, href_url, title])

    async def get_data(self):
        """Fetch pages 1..99 for every section concurrently and parse them all."""
        async with httpx.AsyncClient(proxy="http://127.0.0.1:7890") as client:
            tasks = []
            meta = []  # (tag, page) aligned index-for-index with tasks.
            for tag, target_url in self.target_url_dict.items():
                for page in range(1, 100):
                    url = self.base_url + target_url.format(page)
                    tasks.append(asyncio.create_task(
                        self.fetch_page(client, url, tag, page)))
                    meta.append((tag, page))
            htmls = await asyncio.gather(*tasks)
            for html, (tag, page) in zip(htmls, meta):
                if html:
                    self.parse_html(html, tag)

    def save_to_db(self, tag):
        """Persist the scraped rows for one tag into cl.db, skipping duplicates.

        Stops early once 10 duplicates have been seen for this tag (listings
        are newest-first, so hitting known ids means the rest is old data).
        """
        conn = sqlite3.connect('cl.db')
        try:
            c = conn.cursor()
            c.execute('''
                CREATE TABLE IF NOT EXISTS "CL" (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    cl_id TEXT NOT NULL,
                    full_url TEXT NOT NULL,
                    href_url TEXT NOT NULL,
                    title TEXT NOT NULL,
                    tag TEXT NOT NULL,
                    UNIQUE(cl_id)
                )
            ''')
            conn.commit()
            skip_counter = 0
            save_line_count = 0
            for cl_id, full_url, href_url, title in self.all_data.get(tag, []):
                # INSERT OR IGNORE leans on the UNIQUE(cl_id) constraint:
                # rowcount == 0 means the row already existed (no race with a
                # separate SELECT, and one statement instead of two).
                c.execute(
                    'INSERT OR IGNORE INTO "CL" (cl_id, full_url, href_url, title, tag) '
                    'VALUES (?, ?, ?, ?, ?)',
                    (cl_id, full_url, href_url, title, tag))
                if c.rowcount:
                    save_line_count += 1
                else:
                    skip_counter += 1
                    if skip_counter >= 10:
                        break
            # Single commit for the whole batch instead of one per row.
            conn.commit()
            if save_line_count > 0:
                print(f'保存成功,共保存{save_line_count}条数据')
            else:
                print('全部数据已存在,没有保存任何数据')
            c.close()
        finally:
            # Always release the connection, even if an insert raised.
            conn.close()

    async def main(self):
        """Entry point: scrape every section, then save each tag's rows."""
        await self.get_data()
        if not self.all_data:
            print('无法获取数据')
            return
        for tag in self.all_data:
            self.save_to_db(tag)
if __name__ == '__main__':
    # Run the full scrape-and-save pipeline, then signal completion.
    asyncio.run(GETCLDATA().main())
    print('done')