# -*- coding: utf-8 -*-
# Internal data sync script: runs on the same server and copies MongoDB data into PostgreSQL.
from datetime import datetime

from pymongo import MongoClient
import psycopg2
from psycopg2 import sql

MONGOLINK = 'mongodb://root:aaaAAA111!!!@erhe.top:38000/'
PGSQLPARAMS = {
    # 'database': 'postgres',  # default superuser database; set per connection below
    'user': 'toor',
    'password': 'aaaAAA111',  # replace with your password
    'host': '192.168.31.177',
    'port': 5432
}
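
# The MongoDB documents are expected to look roughly like the example below.
# This shape is inferred from the fields accessed in pg() and is illustrative
# only; actual field names and values come from the source collections.
# {
#     'title': 'Some headline',
#     'context': 'Article body ...',
#     'source_url': 'https://example.com/article',
#     'line': 'https://example.com/article',    # pg() reads 'line' for the link column
#     'article_type': 'news',
#     'article_source': 'example',
#     'img_url': 'https://example.com/img.jpg',
#     'keyword': 'keyword1,keyword2',
#     'posted_date': '2024-01-01',
#     'create_time': 1704067200,                # Unix timestamp in seconds
#     'create_datetime': '2024-01-01 08:00:00'  # '%Y-%m-%d %H:%M:%S'
# }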


def mongo():
    """Read every document from every collection in the NEWS database."""
    client = MongoClient(MONGOLINK)
    # Database name to read from
    db_name = 'NEWS'  # replace with your database name
    db = client[db_name]
    # List all collections in the database
    collections = db.list_collection_names()
    all_data = []
    for collection_name in collections:
        collection = db[collection_name]
        # Read every document in the collection, excluding the _id field
        for document in collection.find({}, {'_id': 0}):
            all_data.append(document)
    sorted_data = []
    if all_data:
        # Newest first; documents without create_time sort last
        sorted_data = sorted(all_data, key=lambda x: x.get('create_time', 0), reverse=True)
    return sorted_data


def pg(sorted_data):
    db_name = 'auto'
    PGSQLPARAMS.update({'database': db_name})
    conn = psycopg2.connect(**PGSQLPARAMS)
    # Duplicate-record counter: once this many existing records are seen, stop to avoid wasting resources
    same_data = 500
    for doc in sorted_data:
        if same_data > 0:
            try:
                cur = conn.cursor()
                # Convert the Unix timestamp to a datetime the database can store
                create_time_dt = None
                if doc.get('create_time'):
                    create_time_dt = datetime.utcfromtimestamp(doc['create_time'])
                # Convert the create_datetime string to a datetime the database can store
                create_datetime_str = doc.get('create_datetime')
                create_datetime_dt = (datetime.strptime(create_datetime_str, '%Y-%m-%d %H:%M:%S')
                                      if create_datetime_str else None)
                values = {
                    'name': doc.get('title'),
                    'context': doc.get('context') or '',
                    'source_url': doc.get('source_url') or '',
                    'link': doc.get('line') or '',
                    'article_type': doc.get('article_type') or '',
                    'article_source': doc.get('article_source') or '',
                    'img_url': doc.get('img_url') or '',
                    'keyword': doc.get('keyword') or '',
                    'posted_date': doc.get('posted_date') or '',
                    'create_time_ts': doc.get('create_time') or '',
                    'create_time': create_time_dt,
                    'create_datetime': create_datetime_dt
                }
                # Check whether a record with the same title already exists
                check_query = "SELECT id FROM news_info WHERE name = %s;"
                cur.execute(check_query, (values['name'],))
                # Insert a new record only if none was found
                if not cur.fetchone():
                    insert_query = """
                        INSERT INTO news_info (name, context, source_url, link, article_type, article_source,
                                               img_url, keyword, posted_date, create_time, create_datetime)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """
                    cur.execute(insert_query, (
                        values['name'],
                        values['context'],
                        values['source_url'],
                        values['link'],
                        values['article_type'],
                        values['article_source'],
                        values['img_url'],
                        values['keyword'],
                        values['posted_date'],
                        values['create_time'],
                        values['create_datetime']
                    ))
                    conn.commit()
                    print(f'Saved {values}')
                else:
                    same_data -= 1
            except Exception as e:
                print("Error while syncing record: ", e)
                conn.rollback()  # clear the aborted transaction so later inserts can proceed
            finally:
                # Close the cursor
                if 'cur' in locals():
                    cur.close()
        else:
            print('Duplicate-record limit reached, exiting')
            break
    conn.close()
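
# Note: the per-document SELECT above queries news_info once per record. A common
# alternative (a sketch, not part of the original script) is to add a UNIQUE
# constraint on name and let PostgreSQL skip duplicates in a single statement:
#
#     ALTER TABLE news_info ADD CONSTRAINT news_info_name_key UNIQUE (name);
#
#     INSERT INTO news_info (name, context, ...)
#     VALUES (%s, %s, ...)
#     ON CONFLICT (name) DO NOTHING;
#
# With that approach, the duplicate counter would decrement whenever cur.rowcount
# is 0 after the insert.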


def create_db():
    db_name = 'postgres'
    PGSQLPARAMS.update({'database': db_name})
    # Connect to the default postgres database
    conn = psycopg2.connect(**PGSQLPARAMS)
    conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction block
    # Create a cursor object
    cur = conn.cursor()
    # Name of the database to create
    new_db = 'auto'
    # Database owner
    owner = 'toor'
    try:
        # Check whether the database already exists
        cur.execute(sql.SQL("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s"), (new_db,))
        exists = cur.fetchone()
        if not exists:
            # Create the database if it does not exist
            cur.execute(sql.SQL("CREATE DATABASE {} WITH OWNER {} ENCODING 'UTF8'").format(
                sql.Identifier(new_db),
                sql.Identifier(owner)
            ))
            print(f"Database '{new_db}' created successfully.")
        else:
            print(f"Database '{new_db}' already exists.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the cursor and connection
        cur.close()
        conn.close()


def create_table():
    db_name = 'auto'
    PGSQLPARAMS.update({'database': db_name})
    # Connect to the target database
    conn = psycopg2.connect(**PGSQLPARAMS)
    # Create a cursor object
    cur = conn.cursor()
    # SQL statement defining the table structure
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS news_info (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            context TEXT,
            source_url VARCHAR(255),
            link VARCHAR(255),
            article_type VARCHAR(50),
            article_source VARCHAR(50),
            img_url VARCHAR(255),
            keyword VARCHAR(255),
            posted_date VARCHAR,
            create_time_ts BIGINT,
            create_time TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            create_datetime TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
        );
    """
    try:
        # Execute the SQL statement to create the table
        cur.execute(create_table_sql)
        conn.commit()  # commit the transaction
        print("Table 'news_info' created successfully.")
    except Exception as e:
        print(f"An error occurred while creating table: {e}")
        conn.rollback()  # roll back the transaction on error
    finally:
        # Close the cursor and connection
        cur.close()
        conn.close()


if __name__ == '__main__':
    create_db()
    create_table()
    sorted_data = mongo()
    if sorted_data:
        pg(sorted_data)
    print("Done!")
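
# The script is meant to run on the server that hosts both databases (see the
# header comment). One possible way to run it periodically is a cron entry such
# as the line below; the interpreter and script path are placeholders, not part
# of the original script:
#
#     */30 * * * * /usr/bin/python3 /path/to/this_script.py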