Featured image of post 集中管理与实时审计:构建Linux集群(1300台服务器)日志平台的最佳实践

集中管理与实时审计:构建Linux集群(1300台服务器)日志平台的最佳实践

随着企业IT基础设施的不断扩大,Linux服务器的数量也日益增多,传统的单机日志管理方式已无法满足对日志数据集中管理、审计和分析的需求。尤其是在大型集群环境中,如何高效地收集、存储和分析日志成为了一项重要的技术挑战。

简介

随着企业IT基础设施的不断扩大,Linux服务器的数量也日益增多,传统的单机日志管理方式已无法满足对日志数据集中管理、审计和分析的需求。尤其是在大型集群环境中,如何高效地收集、存储和分析日志成为了一项重要的技术挑战。

背景

在实现对大型Linux集群的日志集中管理与审计,特别是针对rsyslog日志的收集、操作命令日志以及登录日志的审计。通过部署集中的rsyslog服务端,能够统一收集1300多台Linux服务器的系统日志,确保日志数据的集中化管理。使用Filebeat作为日志收集工具,将日志数据推送至Logstash进行清洗和转换,最终存储到Elasticsearch中,并通过Kibana实现实时数据可视化展示。这种方式不仅简化了日志管理流程,还提高了系统的监控效率和安全性。

需求

统一管理 1300 台服务器的Linux系统日志,能够及时发现问题和告警。 在这里插入图片描述

解决方案

  1. 集中管理:通过统一的服务端收集所有Linux服务器的日志数据,减少单独配置每台服务器的工作量。
  2. 日志审计:对操作命令日志、登录日志等进行审计,确保系统行为的可追溯性。
  3. 数据清洗与分析:通过日志清洗与格式转换,确保日志数据的标准化,便于后续的分析和可视化展示。
  4. 实时展示:利用Kibana将清洗后的数据实时可视化,帮助运维人员快速发现潜在问题。

整体架构

为实现这些目标,我们设计了以下的系统架构:

  • rsyslog:作为日志收集的核心组件,它负责将来自Linux系统的各种日志(包括/var/log/messages等)统一推送到中心化的日志服务端。
  • Filebeat:作为轻量级的日志收集器,部署在各个Linux节点上,负责将日志文件传输到Logstash。
  • Logstash:对收集到的日志进行清洗、解析和转换,确保数据符合预定格式,便于存入Elasticsearch。
  • Elasticsearch:存储经过清洗和转换的日志数据,提供强大的全文搜索和数据查询功能。
  • Kibana:通过Kibana仪表盘实时展示存储在Elasticsearch中的日志数据,帮助运维人员进行数据分析和可视化展示。

rsyslog汇总

rsyslog服务端

首先配置rsyslog服务器,可以统一收集集群内部的日志。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 加载本地系统日志模块(例如通过 logger 命令发送的日志)
$ModLoad imuxsock
# 加载内核日志模块(之前由 rklogd 处理)
$ModLoad imklog
# 加载 UDP 模块,支持通过 UDP 协议接收日志
$ModLoad imudp
# 配置 UDP 服务器在 514 端口接收日志
$UDPServerRun 514
# 加载 TCP 模块,支持通过 TCP 协议接收日志
$ModLoad imtcp
# 配置 TCP 服务器在 514 端口接收日志
$InputTCPServerRun 514
# 定义一个自定义的日志格式模板 'myFormat'
$template myFormat,"%timestamp:::date-rfc3339% %fromhost-ip% %HOSTNAME% [%programname%] %syslogseverity-text%:%msg%\n"
# 设置默认的文件格式为传统的 rsyslog 格式
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
# 加载 /etc/rsyslog.d/ 目录下的所有配置文件
$IncludeConfig /etc/rsyslog.d/*.conf
# 配置收集 info 级别的日志,排除 mail、authpriv、cron 类别的日志,输出到 /var/log/messages 文件,并使用 myFormat 格式
*.info;mail.none;authpriv.none;cron.none /var/log/messages;myFormat
# 收集所有 authpriv 类别的日志(通常是认证相关的日志),输出到 /var/log/secure 文件,并使用 myFormat 格式
authpriv.* /var/log/secure;myFormat
# 收集所有 mail 类别的日志,输出到 /var/log/maillog 文件,使用异步写入(- 表示异步)
mail.* -/var/log/maillog
# 收集所有 cron 类别的日志(定时任务日志),输出到 /var/log/cron 文件
cron.* /var/log/cron
# 收集所有紧急级别(emerg)的日志,将其通过系统消息发送
*.emerg :omusrmsg:*
# 收集 uucp 和 news 类别的严重级别(crit)日志,输出到 /var/log/spooler 文件
uucp,news.crit /var/log/spooler
# 收集所有 local7 类别的日志,输出到 /var/log/boot.log 文件
local7.* /var/log/boot.log

重启服务端

1
systemctl restart rsyslog

修改前: 在这里插入图片描述

修改后: 在这里插入图片描述

rsyslog客户端

所有集群的客户端配置最后一行IP,就可以把数据汇总在一切

1
2
3
[root@zabbix ~]# cat /etc/rsyslog.conf  |tail -n 2
#authpriv.*      @10.10.10.17
*.* @@192.168.102.20   # rsyslog 服务端的IP

重启

1
systemctl restart rsyslog.service

日志已经打印到rsyslog 服务端的 /var/log/messages /var/log/secure 等文件

在这里插入图片描述

filebeaet收集

在rsyslog 服务端的安装filebeaet,并且使用如下配置启动

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
filebeat.inputs:
- type: log
  enabled: true
  tail_files: true
  paths:
    - /var/log/messages
output.logstash:
  hosts: ["192.168.1.100:5514"]  
  # 把日志发送到logstatsh中

logstatsh配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
[root@game logstash]# cat config/rsyslog.conf
input {
   beats {
    port => 5514
    type => syslog
  }
}

filter {
    grok {
        match => { 
		"message" => "%{TIMESTAMP_ISO8601:time} %{IP:client_ip} %{HOSTNAME:host_name} \[%{DATA:type}\] %{GREEDYDATA:info}"
		}
		overwrite => ["message"]
		
    }
	mutate {  
	split => ["type",","]     
	}
	mutate{
                add_field =>   {
                 "types" => "%{[type][1]}"
} }
    mutate{remove_field => [ "tags","agent","host","log","ecs","type" ]}
	

	date {
        match => ["time", "yyyy-MM-dd HH:mm:ss,SSS", "UNIX"]
        target => "@timestamp"
        locale => "cn"
    }
}

output {
  stdout {
    codec=> rubydebug
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "message"
  }
}

数据格式

1
2024-12-03T18:35:30+08:00 192.168.102.30 master01 [kubelet] info: E1203 18:35:30.827608    1065 summary_sys_containers.go:83] "Failed to get system container stats" err="failed to get cgroup stats for \"/system.slice/docker.service\": failed to get container info for \"/system.slice/docker.service\": unknown container \"/system.slice/docker.service\"" containerName="/system.slice/docker.service"

GROK解析

1
%{TIMESTAMP_ISO8601:time} %{IP:client_ip} %{HOSTNAME:host_name} \[%{DATA:type}\] %{GREEDYDATA:info}

输出格式

1
2
3
4
5
6
7
{
  "client_ip": "192.168.102.30",
  "time": "2024-12-03T18:35:30+08:00",
  "type": "kubelet",
  "host_name": "master01",
  "info": "info: E1203 18:35:30.827608    1065 summary_sys_containers.go:83] \"Failed to get system container stats\" err=\"failed to get cgroup stats for \\\"/system.slice/docker.service\\\": failed to get container info for \\\"/system.slice/docker.service\\\": unknown container \\\"/system.slice/docker.service\\\"\" containerName=\"/system.slice/docker.service\"\r"
}

在这里插入图片描述

操作系统命令审计

Linux一般都是终端执行命令,我们可以让命令写到message日志上,在通过过滤,获取操作命令记录。

Linux终端配置

1
2
3
4
5
6
7
8
[root@zabbix ~]# cat /etc/profile | tail -n 2
unset MAILCHECK
export PROMPT_COMMAND='{ msg=$(history 1 | { read x y; echo $y; });logger -p local2.info "euid=$(whoami)" $(who am i) `pwd` "$msg"; }'


日志格式
Oct 11 17:32:41 zabbix root: euid=root root pts/0 2021-10-11 15:13 (10.10.10.3) /root cat /etc/profile
Oct 11 17:32:47 zabbix root: euid=root root pts/0 2021-10-11 15:13 (10.10.10.3) /root cat /etc/profile | tail -n 2

rsyslog服务端展示 在这里插入图片描述

filebeat配置

1
2
3
4
5
6
7
8
[root@logserver01 filebeat]# cat system_messages.yml 
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  tail_files: true
  paths:
    - /var/log/messages

logstatsh配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
[root@logserver01 config]# cat system_userlog_FromKafkaInES.conf
input{
    beats {
        host => '172.17.9.200'
        port => 5046
    }

#	kafka{
#		bootstrap_servers => ["172.17.8.232:6667"]
#		topics => ["sys_os_exe"]
#		codec => "json"
#		group_id => "ELK_SYSTEM_EXE_GROUP"
#		consumer_threads => 3
#		client_id => "logstash"
#		decorate_events => false
#		auto_offset_reset => "earliest"
#		request_timeout_ms => "300000"
#		session_timeout_ms => "20000"
#		max_poll_interval_ms => "600000"
#	}
}
filter{
	if ([message] =~ "euid"){
		grok{
			match => {"message" => '^(?<exetime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s)(?<deshost>[^ ]+)(?:\s)(?<name>[^ ]+)(?:\s\[)(?<loginuser>[^ |\]]*)(?:\]\s[^ ]+\seuid=)(?<exeuser>[^ ]+)(?:\s+)(?<userinfo>[^\(]+)(?:\s\()(?<srchost>[^\)]+)(?:\)\s)(?<exepath>[^ ]+)(\s+)(?<exeinfo>.*)'}
		}
		if "_grokparsefailure" in [tags] { drop { } }
		mutate{
			add_field => ["tmp_exeinfo","%{exeinfo}"]
		}
		mutate{
			split => ["exetime","-"]
			split => ["tmp_exeinfo"," "]
                }
		mutate{
			add_field => ["indextime","%{[exetime][0]}%{[exetime][1]}"]
			add_field => ["evtTime","%{[exetime][0]}-%{[exetime][1]}-%{[exetime][2]} %{hhmmss}"]
			add_field => ["cmd","%{[tmp_exeinfo][0]}"]

		}
		#Retention log insertion time to ES..............
		ruby { code => "event.set('inserttime', event.get('@timestamp').time.to_i)" }
		#replace InsertTime with evtTime "yyyy-MM-dd HH:mm:ss eg:2020-06-29 09:24:29"
		date{
                        match => ["evtTime","yyyy-MM-dd HH:mm:ss"]
              		#kibana use this time....................
			target => "@timestamp"
                }
		mutate{replace => ["evtTime","%{evtTime} +0800"]}
		date{
			match => ["evtTime","yyyy-MM-dd HH:mm:ss +0800"]
                       	timezone =>"UTC"
			#log event time timestamp................
                       	target => "logtimestamp"
                }
		#log event time long string......................
		ruby { code => "event.set('longtime', event.get('logtimestamp').time.to_i)" }

		mutate{remove_field => [ "tmp_exeinfo","evtTime","host","ecs","log","hhmmss","input","agent","exetime" ]}
	}
	else{
                drop{}
        }
}
output{
	stdout{codec => rubydebug}
	if [indextime] !~ "index"{
		elasticsearch{
			hosts => ["http://172.17.9.176:9200"]
			#hosts => "172.17.9.176"
			index => "sys_os_userlog_%{[indextime]}"
                        user => "*********"
                        password => "*********"
		}
	}
}

数据格式

1
2024-12-03T18:39:05+08:00 192.168.102.30 master01 [root] info: euid=root root pts/0 2024-12-03 18:21 (192.168.96.19) /root [2024-12-03 18:39:05]ip a

GROK解析

1
^(?<exetime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s)(?<deshost>[^ ]+)(?:\s)(?<name>[^ ]+)(?:\s\[)(?<loginuser>[^ |\]]*)(?:\]\s[^ ]+\seuid=)(?<exeuser>[^ ]+)(?:\s+)(?<userinfo>[^\(]+)(?:\s\()(?<srchost>[^\)]+)(?:\)\s)(?<exepath>[^ ]+)(\s+)(?<exeinfo>.*)

输出格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
{
  "loginuser": "root",
  "hhmmss": "18:39:05",
  "exepath": "/root",
  "deshost": "192.168.102.30",
  "srchost": "192.168.96.19",
  "name": "master01",
  "exeinfo": "[2024-12-03 18:39:05]ip a\r",
  "exeuser": "root",
  "userinfo": "root pts/0 2024-12-03 18:21",
  "exetime": "2024-12-03"
}

在这里插入图片描述

系统用户登录审计

filebeat配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
[root@logserver01 filebeat]# cat system_secure.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  tail_files: true
  paths:
    - /var/log/secure
#=========================== Filebeat outppp_id: messuts =============================
output.logstash:
  hosts: ["172.17.9.200:5045"]

logstatsh配置

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
[root@logserver01 config]# cat system_login_FromKafkaInES.conf
input{
    beats {
        host => '172.17.9.200'
        port => 5045
    }

#
#	kafka{
#		bootstrap_servers => ["172.17.8.232:6667"]
#		topics => ["sys_os_login"]
#		codec => "json"
#		group_id => "ELK_SYSTEM_LOGIN_GROUP"
#		consumer_threads => 3
#		client_id => "logstash"
#		decorate_events => false
#		auto_offset_reset => "earliest"
#		request_timeout_ms => "300000"
#		session_timeout_ms => "20000"
#		max_poll_interval_ms => "600000"
#	}
}
filter{
	#login successed log
	if ([message] =~ "Accepted"){
		grok{
			match => {"message" => '^(?<atime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s+)(?<deshost>\d+\.\d+\.\d+\.\d+)(?:\s)(?<name>[^ ]+)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?<loginuser>[^ ]+)(?:\sfrom\s)(?<srchost>[\d.]+)(?:\s\w+\s\d+\s)(?<loginmode>\w*)'}
		}
		if "_grokparsefailure" in [tags] { drop { } }
		mutate{
			add_field => ["type","systemlogin"]
			split => ["atime","-"]
                }
		mutate{
			add_field => ["indextime","%{[atime][0]}%{[atime][1]}"]
			add_field => ["evtTime","%{[atime][0]}-%{[atime][1]}-%{[atime][2]} %{hhmmss}"]
		}
		#Retention log insertion time to ES..............
		ruby { code => "event.set('inserttime', event.get('@timestamp').time.to_i)" }

		#replace InsertTime with evtTime "yyyy-MM-dd HH:mm:ss eg:2020-06-29 09:24:29"
		date{
                        match => ["evtTime","yyyy-MM-dd HH:mm:ss"]
              		#kibana use this time....................
			target => "@timestamp"
                }
		mutate{replace => ["evtTime","%{evtTime} +0800"]}
		date{
			match => ["evtTime","yyyy-MM-dd HH:mm:ss +0800"]
                       	timezone =>"UTC"
			#log event time timestamp................
                       	target => "logtimestamp"
                }
		#log event time long string......................
		ruby { code => "event.set('longtime', event.get('logtimestamp').time.to_i)" }
		mutate{remove_field => [ "evtTime","host","ecs","log","hhmmss","input","agent","atime" ]}
	}
	#login failed log
	else if ([message] =~ "Failed password for"){
		grok{
			match => {"message" => '^(?<atime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s+)(?<deshost>\d+\.\d+\.\d+\.\d+)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?<loginuser>[^ ]+)(?:\sfrom\s)(?<srchost>[\d.]+)(?:\s\w+\s\d+\s)(?<loginmode>\w*)'}
		}
		if "_grokparsefailure" in [tags] { drop { } }
                mutate{
                        add_field => ["type","systemloginfailed"]
                        split => ["atime","-"]
                }
                mutate{
                        add_field => ["indextime","%{[atime][0]}%{[atime][1]}"]
                        add_field => ["evtTime","%{[atime][0]}-%{[atime][1]}-%{[atime][2]} %{hhmmss}"]
                }
                #Retention log insertion time to ES..............
                ruby { code => "event.set('inserttime', event.get('@timestamp').time.to_i)" }

                #replace InsertTime with evtTime "yyyy-MM-dd HH:mm:ss eg:2020-06-29 09:24:29"
                date{
                        match => ["evtTime","yyyy-MM-dd HH:mm:ss"]
                        #kibana use this time....................
                        target => "@timestamp"
                }
                mutate{replace => ["evtTime","%{evtTime} +0800"]}
                date{
                        match => ["evtTime","yyyy-MM-dd HH:mm:ss +0800"]
                        timezone =>"UTC"
                        #log event time timestamp................
                        target => "logtimestamp"
                }
                #log event time long string......................
                ruby { code => "event.set('longtime', event.get('logtimestamp').time.to_i)" }

                mutate{remove_field => [ "evtTime","host","ecs","log","hhmmss","input","agent","atime" ]}
	}
	#other log
	else{
		drop{}
	}
}
output{
	if [type] == "systemlogin"{
		if [indextime] !~ "index"{
			stdout{codec => rubydebug}
			elasticsearch{
				hosts => "172.17.9.176"
				index => "sys_os_systemlogin_%{[indextime]}"
				user => "elastic"
				password => "f5OPbv6sqfstmc+"
                	}
		}
		
	}
	else if [type] == "systemloginfailed"{
                if [indextime] !~ "index"{
                        stdout{codec => rubydebug}
                        elasticsearch{
                               hosts => "172.17.9.176"
                               index => "sys_os_systemloginfailed_%{[indextime]}"
                               user => "elastic"
                               password => "xxxxxxx+"
                        }
                }

        }
}

成功日志解析

日志格式

1
2024-12-03T18:45:09+08:00 192.168.102.42 node03 [sshd] info: Accepted password for root from 192.168.96.19 port 14347 ssh2

GROK语法

1
%{TIMESTAMP_ISO8601:atime} %{IP:deshost} %{HOSTNAME:name} \[%{WORD:type}\] %{DATA:loglevel}: Accepted password for %{WORD:loginuser} from %{IP:srchost} port %{NUMBER:port} %{WORD:loginmode}

日志格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
{
  "loginuser": "root",
  "atime": "2024-12-03T18:45:09+08:00",
  "type": "sshd",
  "deshost": "192.168.102.42",
  "srchost": "192.168.96.19",
  "port": "14347",
  "loglevel": "info",
  "name": "node03",
  "loginmode": "ssh2"
}

在这里插入图片描述

失败日志格式

日志格式

1
2024-12-03T19:01:57+08:00 192.168.102.42 node03 [sshd] info: Failed password for invalid user 123 from 192.168.96.19 port 12103 ssh2

GROK语法

1
^(?<atime>\d+-\d+-\d+)(?:[^\d]+)(?<hhmmss>\d+:\d+:\d+)(?:[^\d]+\d+:\d+)(?:\s+)(?<deshost>\d+\.\d+\.\d+\.\d+)(?:[\S\s]*Failed\spassword\sfor[\sinvalid\suser]*\s)(?<loginuser>[^ ]+)(?:\sfrom\s)(?<srchost>[\d.]+)(?:\s\w+\s\d+\s)(?<loginmode>\w*)

日志输出

1
2
3
4
5
6
7
8
{
  "loginuser": "123",
  "atime": "2024-12-03",
  "hhmmss": "19:01:57",
  "deshost": "192.168.102.42",
  "srchost": "192.168.96.19",
  "loginmode": "ssh2"
}

在这里插入图片描述