jack 3 months ago
parent
commit
c4ffdf51df

+ 1 - 1
config.py

@@ -24,7 +24,7 @@ class AppConfig(BaseModel):
     data_dir: str = "data"
     downloads_dir: str = "data/downloads"
     targets_file: str = "data/targets.txt"
-    proxy_file: str = "proxy.txt"
+    proxy_file: str = "data/proxy.txt"
     
     # Crawler configuration
     concurrency: int = 20
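
The commit relocates proxy.txt under data/ so all runtime inputs live in one directory (see the file deletion below). A minimal sketch of how the relocated `proxy_file` might be read; the loader itself and the `http://` scheme are assumptions, since the deleted file stored a bare host:port and this commit shows no consumer:

```python
from pathlib import Path
from typing import Optional

from config import config

def load_proxy() -> Optional[str]:
    """Hypothetical loader for the relocated proxy file."""
    path = Path(config.proxy_file)  # now "data/proxy.txt"
    if not path.exists():
        return None
    lines = path.read_text(encoding="utf-8").strip().splitlines()
    # the deleted proxy.txt stored a bare host:port, so the scheme is assumed
    return f"http://{lines[0]}" if lines else None
```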

+ 0 - 1
proxy.txt

@@ -1 +0,0 @@
-127.0.0.1:7890

+ 2 - 0
src/__init__.py

@@ -0,0 +1,2 @@
+# src package initialization
+# This file makes Python treat the directory as a package

+ 2 - 0
src/core/__init__.py

@@ -0,0 +1,2 @@
+# core package initialization
+# Contains core business logic

+ 83 - 0
src/core/step1.py

@@ -0,0 +1,83 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import os
+import re
+import sys
+from pathlib import Path
+from typing import Dict, List, Optional
+
+import httpx
+from bs4 import BeautifulSoup
+from tqdm.asyncio import tqdm_asyncio
+
+from config import config
+from ..logging.logger import get_logger
+from ..logging.realtime_logger import realtime_logger
+
+log = get_logger("step1")
+
+ILLEGAL_CHARS = re.compile(r'[\\/:*?"<>|]')
+
+def clean_folder_name(name: str) -> str:
+    """清理文件夹名称中的非法字符"""
+    return ILLEGAL_CHARS.sub('', name).strip()
+
+async def fetch_page(url: str, client: httpx.AsyncClient) -> Optional[str]:
+    """获取页面内容"""
+    try:
+        resp = await client.get(url)
+        resp.raise_for_status()
+        return resp.text
+    except httpx.HTTPError as e:
+        log.error(f"Failed to fetch {url}: {str(e)}")
+        return None
+
+async def crawl_single_gallery(gallery_url: str, client: httpx.AsyncClient, sem: asyncio.Semaphore):
+    """爬取单个画廊"""
+    async with sem:
+        try:
+            gallery_url = gallery_url.rstrip()
+            base_url = gallery_url.split('/g/')[0]
+            folder_name = clean_folder_name(gallery_url.split('/')[-1])
+            folder_path = Path(config.targets_path) / folder_name
+            json_path = folder_path / "info.json"
+
+            if json_path.exists():
+                log.info(f"Skipping existing gallery: {gallery_url}")
+                return
+
+            os.makedirs(folder_path, exist_ok=True)
+
+            log.info(f"Processing gallery: {gallery_url}")
+            page_content = await fetch_page(gallery_url, client)
+            if not page_content:
+                return
+
+            soup = BeautifulSoup(page_content, 'html.parser')
+            title_el = soup.select_one('h1#gn')
+            json_data = {
+                "gallery_url": gallery_url,
+                "base_url": base_url,
+                # Fall back to the folder name if the title element is missing
+                "title": title_el.text.strip() if title_el else folder_name,
+                "images": {}  # keyed by page URL; step2's scan_tasks iterates .items()
+            }
+
+            # Extract image info and populate json_data["images"]
+            # ... (implementation omitted)
+
+            json_path.write_text(json.dumps(json_data, indent=2, ensure_ascii=False), encoding="utf-8")
+            realtime_logger.broadcast_log_sync(f"Processed gallery: {gallery_url}", "INFO", "step1")
+
+        except Exception as e:
+            log.error(f"Error processing {gallery_url}: {str(e)}")
+            realtime_logger.broadcast_log_sync(f"Error in {gallery_url}: {str(e)}", "ERROR", "step1")
+            raise
+
+async def main(proxy: Optional[str] = None):
+    """主函数"""
+    # ... (省略具体实现)
+
+if __name__ == "__main__":
+    asyncio.run(main())
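
The image-extraction step is elided above. For orientation, a hypothetical sketch of what could fill it, producing the dict-of-dicts shape that step2's `scan_tasks` reads (one entry per page URL with `real_url` and `filename`); the CSS selector and default filenames are assumptions, not taken from this commit:

```python
from typing import Dict

from bs4 import BeautifulSoup

def extract_image_stubs(soup: BeautifulSoup) -> Dict[str, Dict[str, str]]:
    """Hypothetical: build the per-page entries that step2's scan_tasks reads."""
    images: Dict[str, Dict[str, str]] = {}
    # 'div#gdt a' is an assumed selector for the gallery's thumbnail links
    for idx, a_tag in enumerate(soup.select('div#gdt a'), start=1):
        page_url = a_tag.get('href', '')
        if page_url:
            images[page_url] = {
                "real_url": "",                # resolved later, page by page
                "filename": f"{idx:04d}.jpg",  # extension corrected after download
            }
    return images
```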

+ 63 - 0
src/core/step2.py

@@ -0,0 +1,63 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import os
+import re
+import sys
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import aiofiles
+import httpx
+from tqdm.asyncio import tqdm_asyncio
+
+from config import config
+from ..logging.logger import get_logger
+from ..logging.realtime_logger import realtime_logger
+
+log = get_logger("step2")
+
+async def download_one(url: str, client: httpx.AsyncClient, sem: asyncio.Semaphore):
+    """下载单个图片"""
+    async with sem:
+        try:
+            resp = await client.get(url)
+            resp.raise_for_status()
+            
+            # Extract the real image URL and file extension
+            # ... (implementation omitted; it derives the final_path used below)
+            
+            async with aiofiles.open(final_path, 'wb') as fp:
+                async for chunk in resp.aiter_bytes():
+                    await fp.write(chunk)
+            
+            realtime_logger.broadcast_log_sync(f"Downloaded {url}", "INFO", "step2")
+            return True
+        except Exception as e:
+            log.error(f"Failed to download {url}: {str(e)}")
+            realtime_logger.broadcast_log_sync(f"Error downloading {url}: {str(e)}", "ERROR", "step2")
+            return False
+
+async def scan_tasks(root: Path) -> List[Dict[str, Any]]:
+    """Scan gallery folders for info.json files and build the download task list."""
+    tasks = []
+    for json_path in root.rglob("info.json"):
+        try:
+            data = json.loads(json_path.read_text(encoding="utf-8"))
+            for url, info in data["images"].items():
+                tasks.append({
+                    "url": info["real_url"],
+                    "save_path": json_path.parent / info["filename"]
+                })
+        except Exception as e:
+            log.warning(f"Error reading {json_path}: {str(e)}")
+    return tasks
+
+async def main(proxy: Optional[str] = None):
+    """主函数"""
+    # ... (省略具体实现)
+
+if __name__ == "__main__":
+    asyncio.run(main())
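
step2's `main` is elided as well. A hypothetical sketch of how it could wire `scan_tasks` and `download_one` together, using `config.concurrency` from the config hunk above; the timeout, the download root, and the client settings are assumptions:

```python
async def main(proxy: Optional[str] = None):
    """Hypothetical wiring; timeout and client settings are assumptions."""
    tasks = await scan_tasks(Path(config.downloads_dir))
    sem = asyncio.Semaphore(config.concurrency)
    async with httpx.AsyncClient(proxies=proxy, timeout=30.0) as client:
        # tqdm_asyncio.gather renders one progress bar across all coroutines
        results = await tqdm_asyncio.gather(
            *(download_one(t["url"], client, sem) for t in tasks)
        )
    log.info(f"Downloaded {sum(bool(r) for r in results)} of {len(tasks)} images")
```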

+ 2 - 0
src/logging/__init__.py

@@ -0,0 +1,2 @@
+# logging package initialization
+# Contains logging and real-time logging functionality

+ 59 - 0
src/logging/logger.py

@@ -0,0 +1,59 @@
+import logging
+import sys
+from typing import Optional
+
+# Safe to import at module level: realtime_logger does not import this
+# module back, so there is no circular dependency
+from .realtime_logger import realtime_logger
+
+class LoggerManager:
+    @staticmethod
+    def setup_root_logger(log_file: Optional[str] = None, level: str = "INFO"):
+        logger = logging.getLogger()
+        logger.setLevel(level.upper())
+
+        console_handler = logging.StreamHandler(sys.stdout)
+        console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+        logger.addHandler(console_handler)
+
+        if log_file:
+            file_handler = logging.FileHandler(log_file)
+            file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+            logger.addHandler(file_handler)
+
+    @staticmethod
+    def get_logger(name: str, log_file: Optional[str] = None, level: str = "INFO"):
+        logger = logging.getLogger(name)
+        logger.setLevel(level.upper())
+
+        if not logger.handlers:
+            console_handler = logging.StreamHandler(sys.stdout)
+            console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+            logger.addHandler(console_handler)
+
+            if log_file:
+                file_handler = logging.FileHandler(log_file)
+                file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+                logger.addHandler(file_handler)
+
+        return logger
+
+class WebSocketLogHandler(logging.Handler):
+    def __init__(self):
+        super().__init__()
+        self.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+
+    def emit(self, record):
+        try:
+            msg = self.format(record)
+            realtime_logger.broadcast_log_sync(msg, record.levelname, record.name)
+        except Exception:
+            self.handleError(record)
+
+def get_logger(name: str, log_file: Optional[str] = None, level: str = "INFO"):
+    return LoggerManager.get_logger(name, log_file, level)
+
+def setup_root_logger(log_file: Optional[str] = None, level: str = "INFO"):
+    LoggerManager.setup_root_logger(log_file, level)
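
The handler classes compose like this: attach `WebSocketLogHandler` to the root logger once, and every module logger's records (which propagate to the root) are also broadcast to websocket clients. A usage sketch; the attach point and log path are assumptions, since this commit never shows where the handler is registered:

```python
import logging

from src.logging.logger import WebSocketLogHandler, get_logger, setup_root_logger

setup_root_logger(log_file="data/app.log", level="INFO")  # log path is an assumption
logging.getLogger().addHandler(WebSocketLogHandler())     # mirror root records to websockets

log = get_logger("example")
log.info("reaches stdout, the log file, and every connected websocket client")
```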

+ 77 - 0
src/logging/realtime_logger.py

@@ -0,0 +1,77 @@
+import asyncio
+import json
+import time
+import threading
+from typing import List, Dict, Any, Optional
+from pathlib import Path
+import logging
+
+from config import config
+
+class RealtimeLogger:
+    _instance = None
+    _lock = threading.Lock()
+
+    def __init__(self):
+        self.logger = logging.getLogger("realtime_logger")
+        self._loop = None
+        self._connections: List[Dict[str, Any]] = []
+        self._log_buffer: List[Dict[str, Any]] = []
+        self._buffer_lock = threading.Lock()
+        self._max_buffer_size = 100
+
+    def set_loop(self, loop):
+        self._loop = loop
+
+    def add_connection(self, websocket):
+        with self._buffer_lock:
+            self._connections.append({"websocket": websocket, "last_active": time.time()})
+
+    def remove_connection(self, websocket):
+        with self._buffer_lock:
+            self._connections = [conn for conn in self._connections if conn["websocket"] != websocket]
+
+    async def broadcast_log(self, message: str, level: str = "INFO", source: str = "system"):
+        log_entry = {
+            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
+            "level": level,
+            "source": source,
+            "message": message
+        }
+
+        # Hold the threading lock only for bookkeeping: never across an
+        # await, and never while calling remove_connection, which acquires
+        # the same non-reentrant lock
+        with self._buffer_lock:
+            self._log_buffer.append(log_entry)
+            if len(self._log_buffer) > self._max_buffer_size:
+                self._log_buffer.pop(0)
+            connections = list(self._connections)
+
+        disconnected = []
+        for connection in connections:
+            try:
+                await connection["websocket"].send_text(json.dumps(log_entry))
+                connection["last_active"] = time.time()
+            except Exception as e:
+                self.logger.warning(f"Failed to send log to websocket: {e}")
+                disconnected.append(connection["websocket"])
+
+        for ws in disconnected:
+            self.remove_connection(ws)
+
+    def broadcast_log_sync(self, message: str, level: str = "INFO", source: str = "system"):
+        if self._loop is None:
+            self.logger.warning("Event loop not set for RealtimeLogger")
+            return
+
+        asyncio.run_coroutine_threadsafe(
+            self.broadcast_log(message, level, source),
+            self._loop
+        )
+
+    def get_recent_logs(self, count: int = 10) -> List[Dict[str, Any]]:
+        with self._buffer_lock:
+            return self._log_buffer[-count:].copy()
+
+    def clear_buffer(self):
+        with self._buffer_lock:
+            self._log_buffer.clear()
+
+realtime_logger = RealtimeLogger()
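
`broadcast_log_sync` can only schedule work once the server's event loop has been registered via `set_loop`. A sketch of the intended wiring, assuming a FastAPI app with a websocket endpoint (neither is part of this commit):

```python
# Hypothetical wiring, assuming FastAPI (not part of this commit)
import asyncio

from fastapi import FastAPI, WebSocket

from src.logging.realtime_logger import realtime_logger

app = FastAPI()

@app.on_event("startup")
async def register_loop():
    # Give worker threads a loop that broadcast_log_sync can schedule onto
    realtime_logger.set_loop(asyncio.get_running_loop())

@app.websocket("/ws/logs")
async def ws_logs(websocket: WebSocket):
    await websocket.accept()
    realtime_logger.add_connection(websocket)
    try:
        while True:
            await websocket.receive_text()  # keep the socket open
    finally:
        realtime_logger.remove_connection(websocket)
```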

+ 2 - 0
src/services/__init__.py

@@ -0,0 +1,2 @@
+# services package initialization
+# Contains service classes and components

+ 43 - 0
src/services/downloader.py

@@ -0,0 +1,43 @@
+import aiofiles
+import httpx
+from typing import Optional
+import os
+
+from config import config
+
+class Downloader:
+    def __init__(self, proxy_str: Optional[str] = None):
+        self.proxy = proxy_str
+
+    async def download(self, url: str, save_path: str):
+        os.makedirs(os.path.dirname(save_path), exist_ok=True)
+        
+        async with httpx.AsyncClient(proxies=self.proxy) as client:
+            response = await client.get(url)
+            response.raise_for_status()
+            
+            async with aiofiles.open(save_path, 'wb') as f:
+                await f.write(response.content)
+
+    def _get_filename(self, url: str, content_type: Optional[str] = None) -> str:
+        if not url:
+            raise ValueError("URL cannot be empty")
+        
+        filename = url.split('/')[-1]
+        if content_type:
+            ext = content_type.split('/')[-1]
+            if '.' not in filename:
+                filename = f"{filename}.{ext}"
+        
+        return filename
+
+    async def download_image(self, url: str, save_dir: str):
+        async with httpx.AsyncClient(proxies=self.proxy) as client:
+            response = await client.get(url)
+            response.raise_for_status()
+            
+            filename = self._get_filename(url, response.headers.get('content-type'))
+            save_path = os.path.join(save_dir, filename)
+            
+            # Write the body we already fetched instead of fetching the URL a second time
+            os.makedirs(save_dir, exist_ok=True)
+            async with aiofiles.open(save_path, 'wb') as f:
+                await f.write(response.content)
+            return save_path
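
A usage sketch for the new Downloader; the proxy value mirrors the entry deleted from proxy.txt, with the scheme assumed, and the image URL is a placeholder:

```python
import asyncio

from src.services.downloader import Downloader

async def demo():
    # host:port mirrors the deleted proxy.txt entry; the scheme is assumed
    dl = Downloader(proxy_str="http://127.0.0.1:7890")
    path = await dl.download_image("https://example.com/pic.jpg", "data/downloads")
    print(f"saved to {path}")

asyncio.run(demo())
```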

+ 2 - 0
src/utils/__init__.py

@@ -0,0 +1,2 @@
+# utils package initialization
+# Contains utility functions and helpers

+ 57 - 0
src/utils/performance.py

@@ -0,0 +1,57 @@
+import asyncio
+import time
+from typing import Dict, Any
+from functools import wraps
+
+from ..logging.logger import get_logger
+
+class PerformanceMonitor:
+    def __init__(self):
+        self.logger = get_logger("performance")
+        self._timers: Dict[str, Dict[str, Any]] = {}
+
+    def monitor_performance(self, func):
+        @wraps(func)
+        async def async_wrapper(*args, **kwargs):
+            start_time = time.time()
+            self.logger.info(f"Starting {func.__name__}")
+            try:
+                result = await func(*args, **kwargs)
+                elapsed = time.time() - start_time
+                self.logger.info(f"Completed {func.__name__} in {elapsed:.2f}s")
+                return result
+            except Exception as e:
+                self.logger.error(f"Error in {func.__name__}: {str(e)}")
+                raise
+
+        @wraps(func)
+        def sync_wrapper(*args, **kwargs):
+            start_time = time.time()
+            self.logger.info(f"Starting {func.__name__}")
+            try:
+                result = func(*args, **kwargs)
+                elapsed = time.time() - start_time
+                self.logger.info(f"Completed {func.__name__} in {elapsed:.2f}s")
+                return result
+            except Exception as e:
+                self.logger.error(f"Error in {func.__name__}: {str(e)}")
+                raise
+
+        if asyncio.iscoroutinefunction(func):
+            return async_wrapper
+        return sync_wrapper
+
+    def start_timer(self, name: str):
+        self._timers[name] = {"start": time.time()}
+
+    def end_timer(self, name: str) -> float:
+        if name not in self._timers:
+            raise KeyError(f"No timer with name {name}")
+        
+        elapsed = time.time() - self._timers[name]["start"]
+        self._timers[name]["end"] = time.time()
+        self._timers[name]["elapsed"] = elapsed
+        return elapsed
+
+    def get_summary(self) -> Dict[str, float]:
+        return {name: data["elapsed"] for name, data in self._timers.items() if "elapsed" in data}
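
A usage sketch for PerformanceMonitor, covering both the decorator (which picks the async or sync wrapper automatically) and the manual timers; the workload function is hypothetical:

```python
import asyncio

from src.utils.performance import PerformanceMonitor

monitor = PerformanceMonitor()

@monitor.monitor_performance
async def crawl_batch():  # hypothetical workload
    await asyncio.sleep(0.1)

monitor.start_timer("whole_run")
asyncio.run(crawl_batch())
monitor.end_timer("whole_run")
print(monitor.get_summary())  # e.g. {'whole_run': 0.10...}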

+ 25 - 0
src/utils/utils.py

@@ -0,0 +1,25 @@
+from typing import Optional
+
+from ..logging.logger import get_logger
+from ..core.step1 import main as step1_main
+from ..core.step2 import main as step2_main
+
+log = get_logger("utils")
+
+async def run_step1(proxy: Optional[str] = None):
+    try:
+        log.info("Running step1")
+        await step1_main(proxy)
+        log.info("Step1 completed successfully")
+    except Exception as e:
+        log.exception(f"Error in step1: {str(e)}")
+        raise
+
+async def run_step2(proxy: Optional[str] = None):
+    try:
+        log.info("Running step2")
+        await step2_main(proxy)
+        log.info("Step2 completed successfully")
+    except Exception as e:
+        log.exception(f"Error in step2: {str(e)}")
+        raise
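
These wrappers give the rest of the app a single entry point per step. A minimal sequential pipeline might look like this; running both steps back to back is an assumption, since this commit shows no caller:

```python
import asyncio

from src.utils.utils import run_step1, run_step2

async def pipeline(proxy=None):
    await run_step1(proxy)  # step 1: collect gallery metadata
    await run_step2(proxy)  # step 2: download the images

asyncio.run(pipeline())
```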