# -*- coding: utf-8 -*-
"""Internal database sync script.

Copies documents from the MongoDB ``NEWS`` database into the PostgreSQL
``auto`` database (table ``news_info``). Intended to run on the same server
that hosts both databases.
"""
from datetime import datetime, timezone

from pymongo import MongoClient
import psycopg2
from psycopg2 import sql

# NOTE(review): credentials are hard-coded in source control; consider loading
# them from environment variables or an uncommitted config file.
MONGOLINK = 'mongodb://root:aaaAAA111!!!@erhe.top:38000/'
PGSQLPARAMS = {
    # 'database': 'postgres',  # default: the "postgres" superuser database
    'user': 'toor',
    'password': 'aaaAAA111',  # replace with your password
    'host': '192.168.31.177',
    'port': 5432
}

# Format of the ``create_datetime`` strings stored in MongoDB.
_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _pg_params(database):
    """Return a copy of PGSQLPARAMS targeting *database*.

    A copy is returned so the shared module-level dict is never mutated and
    one function's database choice cannot leak into another.
    """
    return {**PGSQLPARAMS, 'database': database}


def _utc_naive_from_timestamp(ts):
    """Convert a Unix timestamp to a naive UTC datetime, or None if *ts* is falsy.

    Equivalent to the deprecated ``datetime.utcfromtimestamp`` (Python 3.12+).
    """
    if not ts:
        return None
    return datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)


def mongo():
    """Read every document from every collection of the MongoDB NEWS database.

    The ``_id`` field is excluded from each document.

    Returns:
        list[dict]: all documents sorted by ``create_time`` descending (newest
        first). Documents with a missing/empty ``create_time`` sort last.
        Empty list when there is no data.
    """
    db_name = 'NEWS'  # replace with your database name
    # The context manager closes the client's connections when done.
    with MongoClient(MONGOLINK) as client:
        db = client[db_name]
        all_data = []
        for collection_name in db.list_collection_names():
            all_data.extend(db[collection_name].find({}, {'_id': 0}))
    # pg() tolerates documents without create_time, so the sort must too:
    # treat a missing/None value as 0 instead of raising KeyError/TypeError.
    return sorted(all_data, key=lambda d: d.get('create_time') or 0, reverse=True)


def _build_values(doc):
    """Map one MongoDB document to the column values stored in news_info."""
    create_time_ts = doc.get('create_time') or ''
    create_datetime_str = doc.get('create_datetime')
    return {
        'name': doc.get('title'),
        'context': doc.get('context') or '',
        'source_url': doc.get('source_url') or '',
        # NOTE(review): the key 'line' looked like a typo for 'link'; prefer
        # 'link' and keep 'line' as a fallback for documents stored that way.
        'link': doc.get('link') or doc.get('line') or '',
        'article_type': doc.get('article_type') or '',
        'article_source': doc.get('article_source') or '',
        'img_url': doc.get('img_url') or '',
        'keyword': doc.get('keyword') or '',
        'posted_date': doc.get('posted_date') or '',
        'create_time_ts': create_time_ts,
        # Unix timestamp -> naive UTC datetime for a TIMESTAMP column.
        'create_time': _utc_naive_from_timestamp(create_time_ts),
        'create_datetime': (
            datetime.strptime(create_datetime_str, _DATETIME_FORMAT)
            if create_datetime_str else None
        ),
    }


def pg(sorted_data):
    """Insert documents into ``auto.news_info``, skipping already-stored titles.

    A document whose ``title`` already exists in ``news_info.name`` counts as a
    duplicate. After 500 duplicates the sync stops early to avoid wasting
    resources — which presumes *sorted_data* is newest-first, as mongo() returns.
    Per-document errors are printed and the sync continues with the next one.

    Args:
        sorted_data: iterable of MongoDB documents (dicts).
    """
    database = 'auto'
    check_query = "SELECT id FROM news_info WHERE name = %s;"
    insert_query = """
        INSERT INTO news_info (name, context, source_url, link, article_type, article_source, img_url, keyword, posted_date, create_time, create_datetime)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    # Remaining duplicates allowed before giving up.
    same_data = 500

    conn = psycopg2.connect(**_pg_params(database))
    try:
        for doc in sorted_data:
            if same_data <= 0:
                print('相同数据计数已到最大值, 退出程序')
                break
            try:
                # The cursor context manager closes the cursor on every path.
                with conn.cursor() as cur:
                    values = _build_values(doc)
                    # Skip documents whose title is already stored.
                    cur.execute(check_query, (values['name'],))
                    if cur.fetchone():
                        same_data -= 1
                        continue
                    cur.execute(insert_query, (
                        values['name'], values['context'], values['source_url'],
                        values['link'], values['article_type'], values['article_source'],
                        values['img_url'], values['keyword'], values['posted_date'],
                        values['create_time'], values['create_datetime']
                    ))
                conn.commit()
                print(f'已保存{values}')
            except Exception as e:
                # In PostgreSQL a failed statement aborts the whole transaction;
                # without a rollback every later document would fail too.
                conn.rollback()
                print("Error during search: ", e)
    finally:
        conn.close()


def create_db():
    """Create the PostgreSQL database ``auto`` if it does not already exist.

    Connects to the default ``postgres`` database to issue CREATE DATABASE.
    The new database is owned by the configured PostgreSQL user. Errors are
    printed rather than raised.
    """
    new_db = 'auto'  # database to create
    owner = PGSQLPARAMS['user']  # database owner ('toor' in the config above)

    conn = psycopg2.connect(**_pg_params('postgres'))
    try:
        # CREATE DATABASE cannot run inside a transaction block.
        conn.autocommit = True
        with conn.cursor() as cur:
            try:
                cur.execute(
                    sql.SQL("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s"),
                    (new_db,),
                )
                if not cur.fetchone():
                    cur.execute(sql.SQL("CREATE DATABASE {} WITH OWNER {} ENCODING 'UTF8'").format(
                        sql.Identifier(new_db),
                        sql.Identifier(owner)
                    ))
                    print(f"Database '{new_db}' created successfully.")
                else:
                    print(f"Database '{new_db}' already exists.")
            except Exception as e:
                print(f"An error occurred: {e}")
    finally:
        conn.close()


def create_table():
    """Create the ``news_info`` table in database ``auto`` if it does not exist.

    Errors are printed and the transaction is rolled back rather than raised.
    """
    db_name = 'auto'
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS news_info (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        context TEXT,
        source_url VARCHAR(255),
        link VARCHAR(255),
        article_type VARCHAR(50),
        article_source VARCHAR(50),
        img_url VARCHAR(255),
        keyword VARCHAR(255),
        posted_date VARCHAR,
        create_time_ts BIGINT,
        create_time TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        create_datetime TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
    );
    """

    conn = psycopg2.connect(**_pg_params(db_name))
    try:
        with conn.cursor() as cur:
            try:
                cur.execute(create_table_sql)
                conn.commit()
                # The table is news_info; 'auto' is the database name.
                print(f"Table 'news_info' is ready in database '{db_name}'.")
            except Exception as e:
                print(f"An error occurred while creating table: {e}")
                conn.rollback()  # undo the failed transaction
    finally:
        conn.close()


if __name__ == '__main__':
    create_db()
    create_table()
    sorted_data = mongo()
    if sorted_data:
        pg(sorted_data)
    print("Done!")