utils_sync_psql.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. # -*- coding: utf-8 -*-
  2. # 内部数据库同步数据脚本, 用于同一台服务器, 将mongodb数据同步到pgsql
import os
from datetime import datetime, timezone

import psycopg2
from psycopg2 import sql
from pymongo import MongoClient
  7. MONGOLINK = 'mongodb://root:aaaAAA111!!!@erhe.top:38000/'
  8. PGSQLPARAMS = {
  9. # 'database': 'postgres', # 默认连接到postgres数据库,超级用户数据库
  10. 'user': 'toor',
  11. 'password': 'aaaAAA111', # 替换为你的密码
  12. 'host': '192.168.31.177',
  13. 'port': 5432
  14. }
  15. def mongo():
  16. # mongodb
  17. client = MongoClient(MONGOLINK)
  18. # 指定数据库名称
  19. db_name = 'NEWS' # 替换为你的数据库名称
  20. # 选择数据库
  21. db = client[db_name]
  22. # 列出数据库中的所有集合
  23. collections = db.list_collection_names()
  24. all_data = []
  25. for collection_name in collections:
  26. # 选择集合
  27. collection = db[collection_name]
  28. # 读取集合中的所有数据
  29. for document in collection.find({}, {'_id': 0}):
  30. all_data.append(document)
  31. sorted_data = []
  32. if all_data:
  33. sorted_data = sorted(all_data, key=lambda x: x['create_time'], reverse=True)
  34. return sorted_data
  35. def pg(sorted_data):
  36. table_name = 'auto'
  37. PGSQLPARAMS.update({'database': table_name})
  38. conn = psycopg2.connect(**PGSQLPARAMS)
  39. # 记录相同数据, 如果查过指定数量, 则退出程序, 避免浪费资源
  40. same_data = 500
  41. for doc in sorted_data:
  42. if same_data > 0:
  43. try:
  44. cur = conn.cursor()
  45. create_time_dt = None
  46. if doc.get('create_time'):
  47. create_time_dt = datetime.utcfromtimestamp(doc['create_time'])
  48. values = {
  49. 'name': doc.get('title'),
  50. 'context': doc.get('context') or '',
  51. 'source_url': doc.get('source_url') or '',
  52. 'link': doc.get('line') or '',
  53. 'article_type': doc.get('article_type') or '',
  54. 'article_source': doc.get('article_source') or '',
  55. 'img_url': doc.get('img_url') or '',
  56. 'keyword': doc.get('keyword') or '',
  57. 'posted_date': doc.get('posted_date') or '',
  58. 'create_time_ts': doc.get('create_time') or '',
  59. 'create_time': create_time_dt,
  60. 'create_datetime': datetime.strptime(doc['create_datetime'], '%Y-%m-%d %H:%M:%S') if doc.get(
  61. 'create_datetime') else None
  62. }
  63. # 将create_time转换为适合数据库的时间戳格式
  64. create_time_dt = datetime.utcfromtimestamp(values['create_time_ts']) if values[
  65. 'create_time_ts'] else None
  66. values['create_time'] = create_time_dt
  67. # 将create_datetime转换为适合数据库的时间戳格式
  68. create_datetime_str = doc.get('create_datetime')
  69. values['create_datetime'] = datetime.strptime(create_datetime_str,
  70. '%Y-%m-%d %H:%M:%S') if create_datetime_str else None
  71. # 检查数据库中是否已存在相同title的记录
  72. check_query = "SELECT id FROM news_info WHERE name = %s;"
  73. cur.execute(check_query, (values['name'],))
  74. # 如果没有找到记录,则插入新记录
  75. if not cur.fetchone():
  76. insert_query = """
  77. INSERT INTO news_info (name, context, source_url, link, article_type, article_source, img_url, keyword, posted_date, create_time, create_datetime)
  78. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  79. """
  80. cur.execute(insert_query, (
  81. values['name'],
  82. values['context'],
  83. values['source_url'],
  84. values['link'],
  85. values['article_type'],
  86. values['article_source'],
  87. values['img_url'],
  88. values['keyword'],
  89. values['posted_date'],
  90. values['create_time'],
  91. values['create_datetime']
  92. ))
  93. conn.commit()
  94. print(f'已保存{values}')
  95. else:
  96. same_data -= 1
  97. except Exception as e:
  98. print("Error during search: ", e)
  99. finally:
  100. # 关闭游标和连接
  101. if 'cur' in locals():
  102. cur.close()
  103. else:
  104. print('相同数据计数已到最大值, 退出程序')
  105. break
  106. conn.close()
  107. def create_db():
  108. db_name = 'postgres'
  109. PGSQLPARAMS.update({'database': db_name})
  110. # 连接到数据库
  111. conn = psycopg2.connect(**PGSQLPARAMS)
  112. conn.autocommit = True # 确保创建数据库的命令可以立即执行
  113. # 创建一个cursor对象
  114. cur = conn.cursor()
  115. # 要创建的数据库名
  116. new_db = 'auto'
  117. # 数据库所有者
  118. owner = 'toor'
  119. try:
  120. # 检查数据库是否存在
  121. cur.execute(sql.SQL("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s"), (new_db,))
  122. exists = cur.fetchone()
  123. if not exists:
  124. # 如果数据库不存在,创建它
  125. cur.execute(sql.SQL("CREATE DATABASE {} WITH OWNER {} ENCODING 'UTF8'").format(
  126. sql.Identifier(new_db),
  127. sql.Identifier(owner)
  128. ))
  129. print(f"Database '{new_db}' created successfully.")
  130. else:
  131. print(f"Database '{new_db}' already exists.")
  132. except Exception as e:
  133. print(f"An error occurred: {e}")
  134. finally:
  135. # 关闭cursor和连接
  136. cur.close()
  137. conn.close()
  138. def create_table():
  139. db_name = 'auto'
  140. PGSQLPARAMS.update({'database': db_name})
  141. # 创建数据库连接
  142. conn = psycopg2.connect(**PGSQLPARAMS)
  143. # 创建一个 cursor 对象
  144. cur = conn.cursor()
  145. # 定义表结构的 SQL 语句
  146. create_table_sql = """
  147. CREATE TABLE IF NOT EXISTS news_info (
  148. id SERIAL PRIMARY KEY,
  149. name VARCHAR(255) NOT NULL,
  150. context TEXT,
  151. source_url VARCHAR(255),
  152. link VARCHAR(255),
  153. article_type VARCHAR(50),
  154. article_source VARCHAR(50),
  155. img_url VARCHAR(255),
  156. keyword VARCHAR(255),
  157. posted_date VARCHAR,
  158. create_time_ts BIGINT,
  159. create_time TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  160. create_datetime TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
  161. );
  162. """
  163. try:
  164. # 执行 SQL 语句来创建表
  165. cur.execute(create_table_sql)
  166. conn.commit() # 提交事务
  167. print("Table 'auto' created successfully.")
  168. except Exception as e:
  169. print(f"An error occurred while creating table: {e}")
  170. conn.rollback() # 发生错误时回滚事务
  171. finally:
  172. # 关闭 cursor 和连接
  173. cur.close()
  174. conn.close()
  175. if __name__ == '__main__':
  176. create_db()
  177. create_table()
  178. sorted_data = mongo()
  179. if sorted_data:
  180. pg(sorted_data)
  181. print("Done!")