1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
# -*- coding: utf-8 -*-
# @Time : 2022/7/12 10:44
# @File : MongoDB.py
# @Software: PyCharm
import configparser
import time
import pymongo
from alarm.tool.Loggers import logger
class mongo(object):
def __init__(self, config_file, db):
"""
:param config_file: 配置文件路径
:param db: 获取配置的环境
"""
# 实例化configparser
config = configparser.ConfigParser()
# 从配置文件中读取数据库的相关信息
config.read(config_file, encoding='utf-8')
self.host = config[db]['host']
self.port = int(config[db]['port'])
self.user = config[db]['user']
self.password = config[db]['password']
self.database = config[db]['database']
self.db = db
self.conn = None
self._conn()
def _conn(self):
try:
logger.info(f"读取环境:{self.db},连接信息:主机ip:{self.host},端口:{self.port},用户:{self.user},连接数据库:{self.database}")
# self.conn = pymongo.MongoClient(host=self.host,
# port=self.port)
self.conn = pymongo.MongoClient(host=self.host,
port=self.port) # username=self.user, password=self.password
# self.db_conn = self.conn[self.database]
# self.db_conn=self.db_conn.authenticate(self.user,self.password)
self.conn[self.database].authenticate(self.user, self.password, self.database)
self.db_conn = self.conn[self.database]
if self.conn.server_info():
logger.info(f"数据库: {self.database}初始化连接成功")
return True
except Exception as e:
logger.error(f"数据库: {self.database}初始化连接失败,错误:{e}")
return False
# MongoDB数据库关闭
def close(self):
self.conn.close()
logger.info(f"数据库关闭成功")
# 查询调用状态
def get_state(self):
return self.conn is not None # and self.db_conn is not None
def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
_number = 0
_status = True
logger.info(f"检查数据库{self.database}连通性,连接IP:{self.host}")
while _status and _number <= num:
try:
self.conn.server_info() # 检查数据库是否正常连通
_status = False
logger.info(f"数据库{self.database}连接============正常,连接IP:{self.host} ")
except:
if self._conn() == True: # 重新连接,成功退出
_status = False
break
_number += 1
logger.info(f"数据库{self.database}连接============失败,连接IP:{self.host} ")
time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
def insert_one(self, collection, data):
self._reConn()
if self.get_state():
ret = self.db_conn[collection].insert_one(data)
return ret.inserted_id
else:
return ""
def insert_many(self, collection, data):
if self.get_state():
ret = self.db_conn[collection].insert_many(data)
return ret.inserted_id
else:
return ""
def update(self, collection, data):
# data format:
# {key:[old_data,new_data]}
data_filter = {}
data_revised = {}
for key in data.keys():
data_filter[key] = data[key][0]
data_revised[key] = data[key][1]
if self.get_state():
return self.db_conn[collection].update_many(data_filter, {"$set": data_revised}).modified_count
return 0
def find(self, col, condition, column=None):
"""
查询数据代码
:param col: 数据库中的集合
:param condition: 查询条件,查询条件必须是个字典
:param column: find 的第二个参数是可选的,可以指定需要返回的键。这个特别的 "$slice" 运算符可以返回一个数组键中元素的子集。
:return: list 返回查询到记录的列表
"""
# print(col, condition)
# data= self.db_conn["sms_log"]
# data=self.db_conn["sms_log"].find({"status":"2","createTime":{"$gte": "2022/07/12 22:18:26"}},{"status":1,"channelCode":1,"_id":0})
# data = self.db_conn["authCode"].find({"use": False,"createdTime": {"$gte": 1657865035}})
# print(list(data))
self._reConn()
if self.get_state():
if column is None:
return list(self.db_conn[col].find(condition))
else:
return list(self.db_conn[col].find(condition, column))
else:
return None
def get_last_data(self, col, number=1):
if self.get_state():
# last_data = list(self.db_conn["authCode"].find().sort("_id", -1 ).limit(50))
last_data = list(self.db_conn[col].find().sort("_id", -1).limit(number))
return last_data
def delete(self, col, condition):
if self.get_state():
return self.db_conn[col].delete_many(filter=condition).deleted_count
return 0
def aggregate(self, col, condition):
if self.get_state():
return list(self.db_conn[col].aggregate(condition))
# 时间戳转换时间
def timestamp_to_time(timestamp):
timeArray = time.localtime(timestamp) # 转换为可用的时间,就是下面的%Y %m %d
# day_time = time.strftime("%Y-%m-%d", timeArray) # 取上面的timeArray中的对应值0
second_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray) # 这个一样的
return second_time # 返回相应的值
# 时间转换时间戳
def time_to_timestamp(time_str):
# 转换成时间数组
timeArray = time.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# 转换成时间戳
timestamp = time.mktime(timeArray)
# print(timestamp)
return timestamp
if __name__ == '__main__':
logger.info("开始实例化数据库对象 ")
config_file = '../config/dbconfig.conf'
db = 'Devfubaodai'
db = mongo(config_file, db)
# 获取时间
data_time = '2022-07-18 00:00:00'
timestamp = time_to_timestamp(data_time)
# print(timestamp)
data = db.get_last_data("authCode", 1)
last_time_data = data[0]["createdTime"]
satrt_time_data = data[0]["createdTime"]-600
print(timestamp_to_time(last_time_data))
print(timestamp_to_time(satrt_time_data))
data = db.find("authCode", {"createdTime": {"$gte": satrt_time_data, "$lte": last_time_data}}, {"name": 1, "use": 1, "_id": 0})
num, fail = 0, 0
for i in data:
num += 1
print(i)
print(num)
# logger.info(f"短信---最新50条数据,成功使用数量:{num},未使用的数量:{fail}")
db.close()
logger.info("--------------------------------------------------------------------------------------------------")
|