| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- # -*-coding: utf-8 -*-
import os
import sys
import time
from urllib.parse import quote_plus, unquote_plus

import pymongo
from pymongo import errors
# Make the directory ending in the first 'auto' segment of this file's
# absolute path importable (presumably the project root) so that the
# `base` package below can be resolved.
# NOTE(review): split('auto') cuts at the first occurrence of the substring
# anywhere in the path (e.g. '/home/automation/...'); confirm the deployment
# layout never contains 'auto' earlier in the path.
sys.path.append(os.path.abspath(__file__).split('auto')[0] + 'auto')
from base.base_load_config import load_config, get_base_path  # noqa: E402

config_json = load_config()
base_project = get_base_path()

# Raw connection settings exactly as stored in the project configuration.
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')


def _escape_userinfo(value):
    """Return *value* percent-escaped for the userinfo part of a MongoDB URI.

    PyMongo requires the username and password to be escaped with
    ``urllib.parse.quote_plus``; raw characters such as ``@``, ``:``, ``/``
    or ``%`` otherwise make the URI invalid or mis-parsed.  The value is
    unquoted first so that credentials already stored in escaped form are
    not escaped twice (the normalisation is idempotent).
    """
    return quote_plus(unquote_plus(str(value)))


# Connection URI shared by every MongoHandle instance.
MONGO_LINK = (
    f'mongodb://{_escape_userinfo(DB_USER)}:{_escape_userinfo(DB_PASSWORD)}'
    f'@{DB_IP}:{DB_PORT}/'
)
class MongoHandle:
    """Small convenience wrapper around one MongoDB collection.

    On construction it connects with the module-level ``MONGO_LINK`` URI,
    optionally drops the target database and/or collection, and optionally
    purges old documents.  The instance can be used as a context manager so
    the underlying client connection is released deterministically::

        with MongoHandle('my_db', 'my_collection') as handle:
            handle.write_data({'name': 'x', 'create_time': int(time.time())})
    """

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        """Connect and select ``db`` / ``collection``.

        Args:
            db: Name of the database to use.
            collection: Name of the collection to use.
            del_db: When true, drop the database first if it already exists.
            del_collection: When true, drop the collection first if it
                already exists.
            auto_remove: When non-zero, immediately delete documents older
                than this many days (see ``auto_remove_data``).
        """
        self.client = pymongo.MongoClient(MONGO_LINK)
        try:
            # Only issue the drop when the database is actually present.
            if del_db and db and db in self.client.list_database_names():
                self.client.drop_database(db)
            self.db = self.client[db]

            # Same existence check for the collection.
            if (del_collection and collection
                    and collection in self.db.list_collection_names()):
                self.db.drop_collection(collection)
            self.collection = self.db[collection]

            if auto_remove:
                self.auto_remove_data(auto_remove)
        except Exception:
            # Do not leak the client's connections if set-up fails part-way.
            self.client.close()
            raise

    def __enter__(self):
        """Return the handle itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def close(self):
        """Close the underlying MongoDB client and release its connections."""
        self.client.close()

    def write_data(self, data):
        """Insert the single document ``data`` into the collection.

        MongoDB creates the database and collection automatically on the
        first write.  Documents that should be eligible for
        ``auto_remove_data`` need a numeric ``create_time`` field; this
        method does not add one.
        """
        self.collection.insert_one(data)

    def load_data(self):
        """Return every document in the collection as a list, without ``_id``."""
        return list(self.collection.find({}, {'_id': False}))

    def auto_remove_data(self, day):
        """Delete documents whose ``create_time`` is more than ``day`` days old.

        ``create_time`` is compared against a cutoff expressed in Unix epoch
        seconds (``int(time.time()) - day * 86400``).

        Args:
            day: Age threshold in days.

        Returns:
            The number of documents deleted.
        """
        cutoff = int(time.time()) - day * 24 * 60 * 60
        # One server-side bulk delete instead of a find followed by a
        # delete_one round trip per matching document.
        result = self.collection.delete_many({'create_time': {'$lt': cutoff}})
        return result.deleted_count
- # if __name__ == '__main__':
- # mongo = MongoHandle('test_db', 'test_collection', False, False, 0)
- # mongo.collection.insert_one({'name': 'test'})
- # mongo.collection.insert_many([{'name': 'test1'}, {'name': 'test2'}])
- # print(mongo.collection.find_one())
- # print(mongo.collection.find())
- # print('done!')
|