# -*- coding: utf-8 -*-
# @Time : 2023/6/17 18:11
# @Author : 南宫乘风
# @Email : 1794748404@qq.com
# @File : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedelta
import requests
from elasticsearch import Elasticsearch
from monitor.es_ding import send_pretty_message
# Module-level Elasticsearch client instance used by search_errors().
# NOTE(review): the host and credentials below look like redacted
# placeholders -- presumably replaced per deployment; confirm before running.
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),
                   sniff_on_start=True,  # probe the cluster before connecting
                   sniff_on_connection_fail=True,  # refresh the node list when a node stops responding
                   sniff_timeout=300,  # timeout setting for sniffing -- TODO confirm units against the client docs
                   headers={'Content-Type': 'application/json'})
def format_timestamp(timestamp: datetime) -> str:
    """Format a datetime as the string form used in the Elasticsearch range query.

    Args:
        timestamp: The point in time to format.

    Returns:
        The timestamp rendered as ``YYYY-MM-DD HH:MM:SS`` (no timezone suffix),
        matching the ``yyyy-MM-dd HH:mm:ss`` format declared in the query.
    """
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")
def _build_error_query(window_start_str, window_end_str):
    """Build the search body selecting ERROR-level logs inside a time window."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": window_start_str,
                                "lt": window_end_str,
                                "format": "yyyy-MM-dd HH:mm:ss",
                                # The bounds above are wall-clock strings in UTC+8.
                                "time_zone": "+08:00",
                            }
                        }
                    },
                    # Only error-level log entries.
                    {"match": {"loglevel": "ERROR"}},
                    {
                        "bool": {
                            "must_not": [
                                # Project excluded from alerting.
                                {"match": {"projectname": "fox-data-spiderman"}}
                            ]
                        }
                    },
                ]
            }
        },
        # Fields returned for each hit.
        "_source": ["date", "projectname", "threadname", "msg"],
        "from": 0,
        # Upper bound on the number of hits returned by one search.
        "size": 10000,
    }


def _dedupe_by_message(documents):
    """Keep the first document per distinct ``msg``, annotated with its occurrence count."""
    msg_counts = Counter(doc['msg'] for doc in documents if 'msg' in doc)
    seen = set()
    unique = []
    for doc in documents:
        if 'msg' not in doc:
            # Documents without a message cannot be grouped and are dropped.
            continue
        msg = doc['msg']
        if msg in seen:
            continue
        seen.add(msg)
        doc['count'] = msg_counts[msg]
        # Counting uses the full message; only the displayed text is shortened.
        doc['msg'] = msg[:100] + ('...' if len(msg) > 100 else '')
        unique.append(doc)
    return unique


def search_errors():
    """Query Elasticsearch for ERROR logs from the last 10 minutes.

    The time window is computed in UTC+8 so that it agrees with the
    ``time_zone`` declared in the range filter, regardless of the timezone
    the host running this script is configured with.

    Returns:
        A list of ``_source`` dicts, one per distinct ``msg`` (the first hit
        carrying that message). Each dict gains a ``count`` key holding the
        number of hits with the same message, and its ``msg`` is truncated
        to 100 characters followed by ``...`` when longer.
    """
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    query_tz = timezone(timedelta(hours=8))
    window_end = datetime.now(query_tz)
    window_start = window_end - timedelta(minutes=10)

    index = 'app-prod-*'  # replace with the actual index name
    query = _build_error_query(format_timestamp(window_start),
                               format_timestamp(window_end))

    result = es.search(index=index, body=query)
    total_documents = result["hits"]["total"]["value"]
    print(f"总共匹配到 {total_documents} 条文档")

    documents = [hit['_source'] for hit in result['hits']['hits']]
    return _dedupe_by_message(documents)
def aggregate_errors(results):
    """Group error records by project name.

    Args:
        results: Iterable of dicts, each optionally carrying ``projectname``,
            ``date``, ``msg`` and ``count`` keys.

    Returns:
        A dict mapping each project name to a list of
        ``{'date': ..., 'msg': ..., 'count': ...}`` dicts in input order.
        Missing fields are stored as ``None``; records whose ``projectname``
        is missing or empty are skipped.
    """
    aggregated_data = {}
    for record in results:
        projectname = record.get('projectname')
        if not projectname:
            continue
        aggregated_data.setdefault(projectname, []).append({
            'date': record.get('date'),
            'msg': record.get('msg'),
            'count': record.get('count'),
        })
    return aggregated_data
def generate_summary(projectname, messages):
    """Render a project's error messages as a Markdown summary.

    Args:
        projectname: Name shown in the level-3 heading.
        messages: Iterable of dicts with ``date``, ``count`` and ``msg`` keys.

    Returns:
        A Markdown string: the heading, then for every message its time, its
        alert count (highlighted in red) and its text, each message followed
        by a horizontal rule.

    Raises:
        KeyError: If a message lacks ``date``, ``count`` or ``msg``.
    """
    parts = [f'### {projectname} \n\n']
    for message in messages:
        parts.append(f"**时间:** {message['date']}\n\n")
        parts.append(f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n")
        parts.append(f"{message['msg']}\n\n---\n\n")
    # Join once instead of growing the string with repeated +=.
    return ''.join(parts)
def send_message_summary(projectname, messages):
    """Send a project's error summary to the DingTalk robot webhook.

    The summary is built with generate_summary() and posted as a markdown
    message. The outcome is reported on stdout.

    Args:
        projectname: Project name used in the message title and heading.
        messages: Message dicts accepted by generate_summary().

    Raises:
        requests.RequestException: If the request cannot be completed,
            including when the timeout below expires.
    """
    summary = generate_summary(projectname, messages)
    data = {
        'msgtype': 'markdown',
        'markdown': {
            'title': f'{projectname}消息告警',
            'text': summary
        }
    }
    webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx'  # replace with the actual webhook URL
    # Without a timeout a stalled connection would block the script forever.
    response = requests.post(webhook_url, json=data, timeout=10)

    delivered = response.status_code == 200
    if delivered:
        # The webhook answers HTTP 200 even for rejected messages and reports
        # the real outcome in the JSON body's errcode (0 means accepted).
        try:
            payload = response.json()
        except ValueError:
            # Non-JSON body: fall back to the status code alone.
            payload = None
        if isinstance(payload, dict) and payload.get('errcode', 0) != 0:
            delivered = False

    if delivered:
        print('消息发送成功')
    else:
        print('消息发送失败')
if __name__ == '__main__':
    # Script entry point: query recent errors, group them by project and
    # print each project's messages.
    # NOTE(review): send_message_summary() is not called here, so running the
    # script only prints the aggregated errors -- confirm whether alerts are
    # meant to be sent from this entry point.
    errors = search_errors()
    aggregated_errors = aggregate_errors(errors)
    for projectname, messages in aggregated_errors.items():
        print(f"{projectname}:")
        print(messages)