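Topic
Operations Security Platform in Practice
1. Overview of the Operations Security Platform
1.1 Why Operations Security Matters
Network security threats are growing more complex and more frequent, and operations security has become a core part of enterprise IT infrastructure. An operations security platform provides:
- Threat protection: identify and defend against network security threats
- Vulnerability management: find and fix security vulnerabilities in systems and applications
- Compliance: meet industry and regulatory requirements
- Security auditing: record and analyze security events so that they remain traceable
- Risk assessment: evaluate the security risk of systems and applications
- Security automation: reduce manual work and shorten security response time
1.2 Operations Security Platform Architecture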
mermaid
flowchart TD
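subgraph detection["Security Detection Layer"]
A[Security Scanner]
B[Vulnerability Scanner]
C[Compliance Checker]
D[Intrusion Detection System]
E[Anomaly Detection System]
end
subgraph management["Security Management Layer"]
F[Vulnerability Management System]
G[Compliance Management System]
H[Security Audit System]
I[Risk Assessment System]
J[Security Configuration Management]
end
subgraph response["Security Response Layer"]
K[Security Alerting System]
L[Security Incident Response]
M[Automated Security Response]
N[Security Knowledge Base]
end
subgraph infra["Infrastructure Layer"]
O[Servers]
P[Network Devices]
Q[Application Systems]
R[Databases]
S[Cloud Services]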
end
A --> F
B --> F
C --> G
D --> K
E --> K
F --> I
G --> I
H --> N
K --> L
K --> M
L --> N
M --> N
A --> O
A --> P
A --> Q
A --> R
A --> S
B --> O
B --> P
B --> Q
B --> R
B --> S
C --> O
C --> P
C --> Q
C --> R
C --> S
D --> O
D --> P
D --> Q
D --> R
D --> S
E --> O
E --> P
E --> Q
E --> R
E --> S
1.3 Mainstream Security Tools
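| Tool type | Tool | Function | Typical use |
|---|---|---|---|
| Vulnerability scanning | Nessus | Comprehensive vulnerability scanning | Enterprise vulnerability management |
| Vulnerability scanning | OpenVAS | Open-source vulnerability scanning | Small and medium-sized businesses |
| Vulnerability scanning | Qualys | Cloud security and compliance | Cloud environments |
| Security scanning | Nmap | Network scanning and port probing | Network security assessment |
| Security scanning | Nikto | Web server scanning | Web application security |
| Security scanning | Burp Suite | Web application security testing | Web application penetration testing |
| Intrusion detection | Snort | Network intrusion detection | Network security monitoring |
| Intrusion detection | Suricata | Network intrusion detection and prevention | Network security monitoring |
| Log management | ELK Stack | Log collection and analysis | Security event analysis |
| Log management | Splunk | Security information and event management | Enterprise security monitoring |
| Compliance management | Chef InSpec | Compliance checks | Configuration compliance |
| Compliance management | OpenSCAP | SCAP (Security Content Automation Protocol) scanning | Compliance assessment |
| Security auditing | Auditd | Linux system auditing | Linux security auditing |
| Security auditing | Windows Event Log | Windows event logging | Windows security auditing |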
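2. Security Scanning Techniques
2.1 Network Scanning
2.1.1 Nmap Scanning
bash
# Basic scan
nmap 192.168.1.0/24
# Port scan
nmap -p 1-1000 192.168.1.1
# Service version detection
nmap -sV 192.168.1.1
# OS detection
nmap -O 192.168.1.1
# Combined scan (OS detection, version detection, scripts, traceroute)
nmap -A 192.168.1.1
# Timing template (T4 = aggressive)
nmap -T4 192.168.1.0/24
# Output formats (XML and greppable)
nmap -oX scan.xml 192.168.1.1
nmap -oG scan.txt 192.168.1.1
# Firewall evasion (packet fragmentation)
nmap -f 192.168.1.1
nmap --mtu 16 192.168.1.1
# Script scans
nmap --script vuln 192.168.1.1
nmap --script auth 192.168.1.1
nmap --script discovery 192.168.1.1
2.1.2 Masscan Scanning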
bash
# Basic scan
masscan 192.168.1.0/24 -p 80,443
# Adjust the scan rate (packets per second)
masscan 192.168.1.0/24 -p 1-65535 --rate 10000
# Output formats
masscan 192.168.1.0/24 -p 80,443 -oX scan.xml
masscan 192.168.1.0/24 -p 80,443 -oG scan.txt
# Exclude IP addresses listed in a file
masscan 192.168.1.0/24 -p 80,443 --excludefile exclude.txt
2.2 Vulnerability Scanning
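2.2.1 OpenVAS Scanning
bash
# Start OpenVAS (mikesplain/openvas is an older community image)
docker run -d -p 443:443 --name openvas mikesplain/openvas
# Open the web interface
# https://localhost
# Default username: admin
# Default password: admin (change it after the first login)
# Create a scan task from the command line with omp, the client shipped with OpenVAS up to version 9
# (newer Greenbone releases replace omp with gvm-cli from gvm-tools)
# A task needs both a scan config and a target; replace the placeholder ids
omp -u admin -w admin --xml="<create_task><name>scan</name><config id='config-id'/><target id='target-id'/></create_task>"
2.2.2 Nessus Scanning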
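bash
# Install Nessus
dpkg -i Nessus-8.13.1-debian6_amd64.deb
/etc/init.d/nessusd start
# Open the web interface
# https://localhost:8834
# nessuscli covers administration (users, licensing, updates); it does not create or launch scans.
# Create scans in the web interface. Where the REST API is available for your edition,
# an existing scan can be launched with API keys (placeholders below):
curl -k -X POST -H "X-ApiKeys: accessKey=<access-key>; secretKey=<secret-key>" https://localhost:8834/scans/<scan-id>/launch
2.3 Web Application Scanning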
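2.3.1 Nikto Scanning
bash
# Basic scan
nikto -h http://example.com
# Scan multiple targets listed in a file
nikto -h targets.txt
# Scan specific ports
nikto -h http://example.com -p 80,443
# Output formats (inferred from the file extension)
nikto -h http://example.com -o report.html
nikto -h http://example.com -o report.txt
# Use a proxy
nikto -h http://example.com -useproxy http://proxy:8080
# Tune the scan to selected test classes
nikto -h http://example.com -Tuning 123
# 1: Interesting file / seen in logs
# 2: Misconfiguration / default file
# 3: Information disclosure
2.3.2 OWASP ZAP Scanning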
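bash
# Start ZAP in daemon mode (api.disablekey=true turns off the API key; use it only on an isolated test host)
docker run -p 8080:8080 -i owasp/zap2docker-stable zap.sh -daemon -host 0.0.0.0 -port 8080 -config api.addrs.addr.name=.* -config api.addrs.addr.regex=true -config api.disablekey=true
# The target must be in ZAP's site tree before an active scan, so crawl it first
curl "http://localhost:8080/JSON/spider/action/scan/?url=http://example.com"
# Start an active scan (optional parameters are omitted rather than passed as the string "null")
curl "http://localhost:8080/JSON/ascan/action/scan/?url=http://example.com&recurse=true"
# Check scan progress (status is a view; use the scan id returned by the call above)
curl "http://localhost:8080/JSON/ascan/view/status/?scanId=0"
# Retrieve the results
curl "http://localhost:8080/JSON/core/view/numberOfAlerts/?baseurl=http://example.com"
curl "http://localhost:8080/JSON/core/view/alerts/?baseurl=http://example.com"
2.4 Security Scan Automation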
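Automation usually starts from machine-readable scanner output. The following is a minimal sketch, not a complete parser, of extracting open ports from Nmap's XML output (`-oX`). The function name `parse_open_ports` and the sample document are illustrative assumptions; only a few elements of the Nmap XML format (`host`, `address`, `port`, `state`, `service`) are read.

```python
import xml.etree.ElementTree as ET


def parse_open_ports(nmap_xml: str) -> list:
    """Return one record per open port found in Nmap XML output."""
    root = ET.fromstring(nmap_xml)
    findings = []
    for host in root.iter("host"):
        address = host.find("address")
        addr = address.get("addr") if address is not None else None
        for port in host.iter("port"):
            state = port.find("state")
            # Skip closed and filtered ports
            if state is None or state.get("state") != "open":
                continue
            service = port.find("service")
            findings.append({
                "host": addr,
                "port": int(port.get("portid")),
                "protocol": port.get("protocol"),
                "service": service.get("name") if service is not None else None,
            })
    return findings


# Hand-written sample that mimics the structure of real Nmap XML output
SAMPLE_XML = """<nmaprun>
  <host>
    <address addr="192.168.1.1" addrtype="ipv4"/>
    <ports>
      <port protocol="tcp" portid="22"><state state="open"/><service name="ssh"/></port>
      <port protocol="tcp" portid="23"><state state="closed"/></port>
      <port protocol="tcp" portid="443"><state state="open"/><service name="https"/></port>
    </ports>
  </host>
</nmaprun>"""

if __name__ == "__main__":
    for finding in parse_open_ports(SAMPLE_XML):
        print(finding)
```

Records in this shape can be stored or compared between runs more easily than raw scanner text.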
python
# Security scan automation script
import json
import subprocess
import time
from urllib.parse import urlparse


class SecurityScanner:
    def __init__(self):
        # Results are stored per target so later scans do not overwrite earlier ones
        self.scan_results = {}

    def _store(self, target, tool, output):
        self.scan_results.setdefault(target, {})[tool] = output

    def nmap_scan(self, target):
        """Run an Nmap scan and keep its XML output."""
        print(f"Starting Nmap scan on {target}")
        result = subprocess.run(
            ['nmap', '-A', '-oX', '-', target],
            capture_output=True,
            text=True
        )
        self._store(target, 'nmap', result.stdout)
        return result.stdout

    def nikto_scan(self, target):
        """Run a Nikto scan and keep its console output."""
        print(f"Starting Nikto scan on {target}")
        result = subprocess.run(
            ['nikto', '-h', target],
            capture_output=True,
            text=True
        )
        self._store(target, 'nikto', result.stdout)
        return result.stdout

    def openvas_scan(self, target):
        """Placeholder for an OpenVAS scan."""
        print(f"Starting OpenVAS scan on {target}")
        # OpenVAS is driven through its management API, so a real implementation is more involved
        message = f"Scan started on {target}"
        self._store(target, 'openvas', message)
        return message

    def generate_report(self, output_file):
        """Write all collected results to a JSON report."""
        with open(output_file, 'w') as f:
            json.dump(self.scan_results, f, indent=2)
        print(f"Report generated: {output_file}")


# Usage example
if __name__ == "__main__":
    scanner = SecurityScanner()
    targets = ["192.168.1.1", "http://example.com"]
    for target in targets:
        is_url = target.startswith('http')
        # Nmap expects a host name or address, so strip the scheme from URL targets
        host = urlparse(target).hostname if is_url else target
        scanner.nmap_scan(host)
        if is_url:
            scanner.nikto_scan(target)
        scanner.openvas_scan(host)
        time.sleep(5)  # Pause between targets to avoid scanning too aggressively
    scanner.generate_report('security_scan_report.json')
3. Vulnerability Management System
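3.1 Vulnerability Management Overview
Vulnerability management is a core part of operations security. It follows this process:
- Discovery: find vulnerabilities in systems and applications through scanning and monitoring
- Classification: classify vulnerabilities by severity and scope of impact
- Prioritization: set the fix priority according to each vulnerability's risk level
- Remediation: develop and apply fixes
- Verification: confirm that the vulnerability has actually been fixed
- Reporting: generate vulnerability management reports and track remediation progress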
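The classification and prioritization steps can be sketched as follows. The severity bands follow the CVSS v3.x qualitative rating scale; everything else (the `FIX_SLA_DAYS` table, halving the window for production assets, the function names) is a hypothetical policy chosen for illustration, not a standard.

```python
from datetime import datetime, timedelta

# Hypothetical remediation windows in days per severity (an assumption, not a standard)
FIX_SLA_DAYS = {"critical": 7, "high": 30, "medium": 90, "low": 180}


def severity_from_cvss(score: float) -> str:
    """Map a CVSS v3.x base score to its qualitative severity rating."""
    if score >= 9.0:
        return "critical"
    if score >= 7.0:
        return "high"
    if score >= 4.0:
        return "medium"
    # CVSS rates 0.0 as "None"; it is folded into "low" here for simplicity
    return "low"


def fix_deadline(discovered: datetime, severity: str, environment: str) -> datetime:
    """Compute a fix deadline; production assets get half the window (assumed policy)."""
    days = FIX_SLA_DAYS[severity]
    if environment == "production":
        days = max(1, days // 2)
    return discovered + timedelta(days=days)


def prioritize(vulns: list) -> list:
    """Order by CVSS score (highest first), production assets first on ties."""
    return sorted(vulns, key=lambda v: (-v["cvss_score"], v["environment"] != "production"))


SAMPLE = [
    {"vuln_id": "CVE-2022-0847", "cvss_score": 7.8, "environment": "staging"},
    {"vuln_id": "CVE-2021-44228", "cvss_score": 10.0, "environment": "production"},
    {"vuln_id": "CVE-2022-22965", "cvss_score": 9.8, "environment": "production"},
]

if __name__ == "__main__":
    for vuln in prioritize(SAMPLE):
        print(vuln["vuln_id"], severity_from_cvss(vuln["cvss_score"]))
```

In practice the ordering would also weigh exploitability and asset criticality; the score-only sort keeps the sketch short.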
3.2 Vulnerability Management System Implementation
3.2.1 Vulnerability Management Database Design
sql
-- Vulnerabilities table
CREATE TABLE vulnerabilities (
id INT PRIMARY KEY AUTO_INCREMENT,
vuln_id VARCHAR(100) NOT NULL, -- CVE ID or other vulnerability identifier
title VARCHAR(255) NOT NULL,
description TEXT,
severity ENUM('low', 'medium', 'high', 'critical') NOT NULL,
cvss_score FLOAT,
discovered_date DATETIME NOT NULL,
last_seen_date DATETIME,
status ENUM('open', 'in_progress', 'fixed', 'false_positive') DEFAULT 'open',
fix_deadline DATETIME,
assignee VARCHAR(100),
remediation TEXT,
`references` TEXT, -- REFERENCES is a reserved word in MySQL, so the column name must be quoted
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Assets table
CREATE TABLE assets (
id INT PRIMARY KEY AUTO_INCREMENT,
asset_id VARCHAR(100) NOT NULL,
name VARCHAR(255) NOT NULL,
type ENUM('server', 'network', 'application', 'database', 'cloud') NOT NULL,
ip_address VARCHAR(50),
hostname VARCHAR(255),
location VARCHAR(255),
owner VARCHAR(100),
environment ENUM('production', 'staging', 'development') NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Vulnerability-asset association table
CREATE TABLE vulnerability_assets (
id INT PRIMARY KEY AUTO_INCREMENT,
vulnerability_id INT NOT NULL,
asset_id INT NOT NULL,
discovered_date DATETIME NOT NULL,
fixed_date DATETIME,
verification_date DATETIME,
verified_by VARCHAR(100),
FOREIGN KEY (vulnerability_id) REFERENCES vulnerabilities(id),
FOREIGN KEY (asset_id) REFERENCES assets(id)
);
-- Scans table
CREATE TABLE scans (
id INT PRIMARY KEY AUTO_INCREMENT,
scan_id VARCHAR(100) NOT NULL,
scan_type ENUM('network', 'web', 'vulnerability', 'compliance') NOT NULL,
scanner_name VARCHAR(100) NOT NULL,
scan_date DATETIME NOT NULL,
status ENUM('running', 'completed', 'failed') NOT NULL,
scan_target TEXT NOT NULL,
scan_results TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
3.2.2 Vulnerability Management API Implementation
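Because the API accepts JSON from clients, it helps to validate a payload before it reaches the database. The following is a minimal sketch of such a check, assuming the field names and enum values of the `vulnerabilities` table; the function `validate_vulnerability_payload` is hypothetical and is not part of Flask or SQLAlchemy.

```python
from datetime import datetime

SEVERITIES = {"low", "medium", "high", "critical"}
STATUSES = {"open", "in_progress", "fixed", "false_positive"}
REQUIRED_FIELDS = ("vuln_id", "title", "severity", "discovered_date")
DATE_FIELDS = ("discovered_date", "last_seen_date", "fix_deadline")


def validate_vulnerability_payload(data: dict) -> list:
    """Return a list of validation errors; an empty list means the payload is acceptable."""
    errors = []
    for field in REQUIRED_FIELDS:
        if not data.get(field):
            errors.append(f"missing required field: {field}")
    if data.get("severity") and data["severity"] not in SEVERITIES:
        errors.append("severity must be one of: " + ", ".join(sorted(SEVERITIES)))
    if data.get("status", "open") not in STATUSES:
        errors.append("status must be one of: " + ", ".join(sorted(STATUSES)))
    score = data.get("cvss_score")
    if score is not None and not (isinstance(score, (int, float)) and 0.0 <= score <= 10.0):
        errors.append("cvss_score must be a number between 0.0 and 10.0")
    for field in DATE_FIELDS:
        value = data.get(field)
        if value:
            try:
                datetime.fromisoformat(value)
            except (TypeError, ValueError):
                errors.append(f"{field} is not a valid ISO 8601 timestamp")
    return errors


if __name__ == "__main__":
    payload = {
        "vuln_id": "CVE-2021-44228",
        "title": "Log4Shell",
        "severity": "critical",
        "cvss_score": 10.0,
        "discovered_date": "2021-12-10T00:00:00",
    }
    print(validate_vulnerability_payload(payload))
```

A route handler could call this first and return the error list with HTTP 400 instead of failing with a server error on a missing key.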
python
# Vulnerability management API
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/vulnerability_management'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)


class Vulnerability(db.Model):
    __tablename__ = 'vulnerabilities'  # match the table name used in the SQL schema
    id = db.Column(db.Integer, primary_key=True)
    vuln_id = db.Column(db.String(100), nullable=False)
    title = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high', 'critical'), nullable=False)
    cvss_score = db.Column(db.Float)
    discovered_date = db.Column(db.DateTime, nullable=False)
    last_seen_date = db.Column(db.DateTime)
    status = db.Column(db.Enum('open', 'in_progress', 'fixed', 'false_positive'), default='open')
    fix_deadline = db.Column(db.DateTime)
    assignee = db.Column(db.String(100))
    remediation = db.Column(db.Text)
    references = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class Asset(db.Model):
    __tablename__ = 'assets'
    id = db.Column(db.Integer, primary_key=True)
    asset_id = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    type = db.Column(db.Enum('server', 'network', 'application', 'database', 'cloud'), nullable=False)
    ip_address = db.Column(db.String(50))
    hostname = db.Column(db.String(255))
    location = db.Column(db.String(255))
    owner = db.Column(db.String(100))
    environment = db.Column(db.Enum('production', 'staging', 'development'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class VulnerabilityAsset(db.Model):
    __tablename__ = 'vulnerability_assets'
    id = db.Column(db.Integer, primary_key=True)
    vulnerability_id = db.Column(db.Integer, db.ForeignKey('vulnerabilities.id'), nullable=False)
    asset_id = db.Column(db.Integer, db.ForeignKey('assets.id'), nullable=False)
    discovered_date = db.Column(db.DateTime, nullable=False)
    fixed_date = db.Column(db.DateTime)
    verification_date = db.Column(db.DateTime)
    verified_by = db.Column(db.String(100))


# Vulnerability management API routes
@app.route('/api/vulnerabilities', methods=['GET'])
def get_vulnerabilities():
    """Return the list of vulnerabilities."""
    vulnerabilities = Vulnerability.query.all()
    result = []
    for vuln in vulnerabilities:
        result.append({
            'id': vuln.id,
            'vuln_id': vuln.vuln_id,
            'title': vuln.title,
            'description': vuln.description,
            'severity': vuln.severity,
            'cvss_score': vuln.cvss_score,
            'discovered_date': vuln.discovered_date.isoformat(),
            'last_seen_date': vuln.last_seen_date.isoformat() if vuln.last_seen_date else None,
            'status': vuln.status,
            'fix_deadline': vuln.fix_deadline.isoformat() if vuln.fix_deadline else None,
            'assignee': vuln.assignee,
            'remediation': vuln.remediation,
            'references': vuln.references
        })
    return jsonify(result)


@app.route('/api/vulnerabilities', methods=['POST'])
def create_vulnerability():
    """Create a vulnerability."""
    data = request.json
    vulnerability = Vulnerability(
        vuln_id=data['vuln_id'],
        title=data['title'],
        description=data.get('description'),
        severity=data['severity'],
        cvss_score=data.get('cvss_score'),
        discovered_date=datetime.fromisoformat(data['discovered_date']),
        last_seen_date=datetime.fromisoformat(data.get('last_seen_date')) if data.get('last_seen_date') else None,
        status=data.get('status', 'open'),
        fix_deadline=datetime.fromisoformat(data.get('fix_deadline')) if data.get('fix_deadline') else None,
        assignee=data.get('assignee'),
        remediation=data.get('remediation'),
        references=data.get('references')
    )
    db.session.add(vulnerability)
    db.session.commit()
    return jsonify({'id': vulnerability.id}), 201


@app.route('/api/vulnerabilities/<int:id>', methods=['PUT'])
def update_vulnerability(id):
    """Update a vulnerability."""
    data = request.json
    vulnerability = db.session.get(Vulnerability, id)
    if not vulnerability:
        return jsonify({'error': 'Vulnerability not found'}), 404
    # Copy over only the plain fields that are present in the request body
    for field in ('vuln_id', 'title', 'description', 'severity', 'cvss_score',
                  'status', 'assignee', 'remediation', 'references'):
        if field in data:
            setattr(vulnerability, field, data[field])
    if 'fix_deadline' in data:
        vulnerability.fix_deadline = datetime.fromisoformat(data['fix_deadline'])
    vulnerability.last_seen_date = datetime.utcnow()
    db.session.commit()
    return jsonify({'id': vulnerability.id})


@app.route('/api/vulnerabilities/<int:id>', methods=['DELETE'])
def delete_vulnerability(id):
    """Delete a vulnerability."""
    vulnerability = db.session.get(Vulnerability, id)
    if not vulnerability:
        return jsonify({'error': 'Vulnerability not found'}), 404
    db.session.delete(vulnerability)
    db.session.commit()
    return jsonify({'message': 'Vulnerability deleted'})


# Asset management API routes
@app.route('/api/assets', methods=['GET'])
def get_assets():
    """Return the list of assets."""
    assets = Asset.query.all()
    result = []
    for asset in assets:
        result.append({
            'id': asset.id,
            'asset_id': asset.asset_id,
            'name': asset.name,
            'type': asset.type,
            'ip_address': asset.ip_address,
            'hostname': asset.hostname,
            'location': asset.location,
            'owner': asset.owner,
            'environment': asset.environment
        })
    return jsonify(result)


@app.route('/api/assets', methods=['POST'])
def create_asset():
    """Create an asset."""
    data = request.json
    asset = Asset(
        asset_id=data['asset_id'],
        name=data['name'],
        type=data['type'],
        ip_address=data.get('ip_address'),
        hostname=data.get('hostname'),
        location=data.get('location'),
        owner=data.get('owner'),
        environment=data['environment']
    )
    db.session.add(asset)
    db.session.commit()
    return jsonify({'id': asset.id}), 201


if __name__ == '__main__':
    # create_all() needs an application context in current Flask-SQLAlchemy releases
    with app.app_context():
        db.create_all()
    # Debug mode stays off: together with host='0.0.0.0' it would expose the interactive debugger
    app.run(debug=False, host='0.0.0.0', port=5000)
3.2.3 Vulnerability Management Frontend Implementation
vue
<template>
<div class="vulnerability-management">
<h1>Vulnerability Management System</h1>
<!-- Vulnerability statistics -->
<div class="stats">
<el-card class="stat-card">
<template #header>
<div class="card-header">
<span>Total vulnerabilities</span>
</div>
</template>
<div class="stat-value">{{ totalVulnerabilities }}</div>
</el-card>
<el-card class="stat-card">
<template #header>
<div class="card-header">
<span>High-risk vulnerabilities</span>
</div>
</template>
<div class="stat-value high">{{ highVulnerabilities }}</div>
</el-card>
<el-card class="stat-card">
<template #header>
<div class="card-header">
<span>Open vulnerabilities</span>
</div>
</template>
<div class="stat-value critical">{{ openVulnerabilities }}</div>
</el-card>
<el-card class="stat-card">
<template #header>
<div class="card-header">
<span>Fix rate</span>
</div>
</template>
<div class="stat-value">{{ fixRate }}%</div>
</el-card>
</div>
<!-- Vulnerability list -->
<el-card>
<template #header>
<div class="card-header">
<span>Vulnerability list</span>
<el-button type="primary" @click="openAddDialog">Add vulnerability</el-button>
</div>
</template>
<el-table :data="vulnerabilities" style="width: 100%">
<el-table-column prop="vuln_id" label="Vulnerability ID" width="150" />
<el-table-column prop="title" label="Title" />
<el-table-column prop="severity" label="Severity" width="100">
<template #default="scope">
<el-tag :type="getSeverityType(scope.row.severity)">
{{ scope.row.severity }}
</el-tag>
</template>
</el-table-column>
<el-table-column prop="cvss_score" label="CVSS score" width="100" />
<el-table-column prop="discovered_date" label="Discovered" width="150" />
<el-table-column prop="status" label="Status" width="120">
<template #default="scope">
<el-tag :type="getStatusType(scope.row.status)">
{{ scope.row.status }}
</el-tag>
</template>
</el-table-column>
<el-table-column prop="assignee" label="Assignee" width="120" />
<el-table-column label="Actions" width="150">
<template #default="scope">
<el-button size="small" @click="editVulnerability(scope.row)">Edit</el-button>
<el-button size="small" type="danger" @click="deleteVulnerability(scope.row.id)">Delete</el-button>
</template>
</el-table-column>
</el-table>
<el-pagination
v-model:current-page="currentPage"
v-model:page-size="pageSize"
:page-sizes="[10, 20, 50, 100]"
layout="total, sizes, prev, pager, next, jumper"
:total="totalVulnerabilities"
@size-change="handleSizeChange"
@current-change="handleCurrentChange"
style="margin-top: 20px"
/>
</el-card>
<!-- Add/edit vulnerability dialog -->
<el-dialog
v-model="dialogVisible"
:title="isEditing ? 'Edit vulnerability' : 'Add vulnerability'"
width="600px"
>
<el-form :model="formData" label-width="100px">
<el-form-item label="Vulnerability ID">
<el-input v-model="formData.vuln_id" />
</el-form-item>
<el-form-item label="Title">
<el-input v-model="formData.title" />
</el-form-item>
<el-form-item label="Description">
<el-input v-model="formData.description" type="textarea" :rows="3" />
</el-form-item>
<el-form-item label="Severity">
<el-select v-model="formData.severity">
<el-option label="Low" value="low" />
<el-option label="Medium" value="medium" />
<el-option label="High" value="high" />
<el-option label="Critical" value="critical" />
</el-select>
</el-form-item>
<el-form-item label="CVSS score">
<el-input v-model.number="formData.cvss_score" />
</el-form-item>
<el-form-item label="Discovered">
<el-date-picker v-model="formData.discovered_date" type="datetime" />
</el-form-item>
<el-form-item label="Status">
<el-select v-model="formData.status">
<el-option label="Open" value="open" />
<el-option label="In progress" value="in_progress" />
<el-option label="Fixed" value="fixed" />
<el-option label="False positive" value="false_positive" />
</el-select>
</el-form-item>
<el-form-item label="Assignee">
<el-input v-model="formData.assignee" />
</el-form-item>
<el-form-item label="Remediation">
<el-input v-model="formData.remediation" type="textarea" :rows="3" />
</el-form-item>
</el-form>
<template #footer>
<span class="dialog-footer">
<el-button @click="dialogVisible = false">Cancel</el-button>
<el-button type="primary" @click="saveVulnerability">Save</el-button>
</span>
</template>
</el-dialog>
</div>
</template>
<script>
export default {
data() {
return {
vulnerabilities: [],
totalVulnerabilities: 0,
highVulnerabilities: 0,
openVulnerabilities: 0,
fixRate: 0,
currentPage: 1,
pageSize: 10,
dialogVisible: false,
isEditing: false,
formData: {
id: '',
vuln_id: '',
title: '',
description: '',
severity: 'medium',
cvss_score: 0,
discovered_date: new Date(),
status: 'open',
assignee: '',
remediation: ''
}
}
},
mounted() {
this.loadVulnerabilities()
},
methods: {
loadVulnerabilities() {
// In a real implementation, fetch the vulnerability list from the API here
this.vulnerabilities = [
{
id: 1,
vuln_id: 'CVE-2021-44228',
title: 'Log4Shell remote code execution',
description: 'Apache Log4j has a remote code execution vulnerability that an attacker can trigger with a specially crafted request',
severity: 'critical',
cvss_score: 10.0,
discovered_date: '2021-12-10T00:00:00Z',
status: 'in_progress',
assignee: 'Zhang San',
remediation: 'Upgrade Log4j to 2.17.0 or later'
},
{
id: 2,
vuln_id: 'CVE-2022-22965',
title: 'Spring4Shell (Spring Framework) remote code execution',
description: 'Spring Framework has a remote code execution vulnerability that an attacker can trigger with a specially crafted request',
severity: 'critical',
cvss_score: 9.8,
discovered_date: '2022-03-31T00:00:00Z',
status: 'open',
assignee: 'Li Si',
remediation: 'Upgrade Spring Framework to 5.3.18 or later (5.2.20 or later on the 5.2.x line)'
},
{
id: 3,
vuln_id: 'CVE-2022-0847',
title: 'Dirty Pipe local privilege escalation',
description: 'The Linux kernel has a local privilege escalation vulnerability that lets an attacker gain root privileges',
severity: 'high',
cvss_score: 7.8,
discovered_date: '2022-03-07T00:00:00Z',
status: 'fixed',
assignee: 'Wang Wu',
remediation: 'Upgrade the Linux kernel to 5.16.11 or later'
}
]
this.calculateStats()
},
calculateStats() {
this.totalVulnerabilities = this.vulnerabilities.length
this.highVulnerabilities = this.vulnerabilities.filter(v => v.severity === 'high' || v.severity === 'critical').length
this.openVulnerabilities = this.vulnerabilities.filter(v => v.status === 'open').length
this.fixRate = Math.round((this.vulnerabilities.filter(v => v.status === 'fixed').length / this.totalVulnerabilities) * 100) || 0
},
getSeverityType(severity) {
switch(severity) {
case 'critical': return 'danger'
case 'high': return 'warning'
case 'medium': return 'info'
case 'low': return 'success'
default: return ''
}
},
getStatusType(status) {
switch(status) {
case 'open': return 'danger'
case 'in_progress': return 'warning'
case 'fixed': return 'success'
case 'false_positive': return 'info'
default: return ''
}
},
openAddDialog() {
this.isEditing = false
this.formData = {
id: '',
vuln_id: '',
title: '',
description: '',
severity: 'medium',
cvss_score: 0,
discovered_date: new Date(),
status: 'open',
assignee: '',
remediation: ''
}
this.dialogVisible = true
},
editVulnerability(vulnerability) {
this.isEditing = true
this.formData = { ...vulnerability }
this.formData.discovered_date = new Date(vulnerability.discovered_date)
this.dialogVisible = true
},
saveVulnerability() {
// In a real implementation, call the API to save the vulnerability here
if (this.isEditing) {
const index = this.vulnerabilities.findIndex(v => v.id === this.formData.id)
if (index !== -1) {
this.vulnerabilities[index] = { ...this.formData }
}
} else {
this.formData.id = Date.now()
this.vulnerabilities.push({ ...this.formData })
}
this.calculateStats()
this.dialogVisible = false
},
deleteVulnerability(id) {
// In a real implementation, call the API to delete the vulnerability here
this.vulnerabilities = this.vulnerabilities.filter(v => v.id !== id)
this.calculateStats()
},
handleSizeChange(size) {
this.pageSize = size
this.loadVulnerabilities()
},
handleCurrentChange(current) {
this.currentPage = current
this.loadVulnerabilities()
}
}
}
</script>
<style scoped>
h1 {
margin-bottom: 20px;
}
.stats {
display: flex;
gap: 20px;
margin-bottom: 30px;
}
.stat-card {
flex: 1;
}
.card-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.stat-value {
font-size: 36px;
font-weight: bold;
text-align: center;
margin-top: 20px;
}
.stat-value.high {
color: #e6a23c;
}
.stat-value.critical {
color: #f56c6c;
}
.stat-value.low {
color: #67c23a;
}
.stat-value.medium {
color: #409eff;
}
</style>
4. Compliance Checking and Management
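4.1 Compliance Checking Overview
Compliance checking is a key part of operations security. It verifies that systems and applications meet:
- Industry standards: for example PCI DSS (Payment Card Industry Data Security Standard)
- Regulations: for example GDPR (General Data Protection Regulation), SOX (Sarbanes-Oxley Act), and HIPAA (Health Insurance Portability and Accountability Act)
- Internal policies: security policies and standards defined by the organization itself
- Technical standards: for example ISO 27001 (information security management systems) and the NIST (National Institute of Standards and Technology) frameworks
4.2 Compliance Checking Tools
4.2.1 OpenSCAP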
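bash
# Install OpenSCAP (package names as on RHEL/CentOS, matching the RHEL 7 content used below)
yum install openscap-scanner scap-security-guide
# List the profiles available in the data stream before choosing one
oscap info /usr/share/xml/scap/ssg/content/ssg-rhel7-ds.xml
# Scan the system
oscap xccdf eval --profile xccdf_org.ssgproject.content_profile_stig-rhel7-server-upstream --results scan.xml --report scan.html /usr/share/xml/scap/ssg/content/ssg-rhel7-ds.xml
# View the report
firefox scan.html
# Summarize rule results (oscap prints lowercase results such as pass, fail and error)
oscap xccdf eval --profile xccdf_org.ssgproject.content_profile_stig-rhel7-server-upstream --results scan.xml /usr/share/xml/scap/ssg/content/ssg-rhel7-ds.xml | grep "^Result" | sort | uniq -c
4.2.2 Chef InSpec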
bash
# Install Chef InSpec
curl https://omnitruck.chef.io/install.sh | bash -s -- -P inspec
# Create the compliance controls file
# (profile metadata such as name, maintainer, summary and version belongs in a profile's inspec.yml, not in the controls file)
cat > compliance_profile.rb << 'EOF'
title "Linux Compliance Profile"

control "os-01" do
  title "Check OS Version"
  desc "Ensure the OS version is up to date"
  impact 1.0
  describe command('cat /etc/os-release') do
    its('stdout') { should include('Ubuntu 20.04') }
  end
end

control "ssh-01" do
  title "SSH Configuration"
  desc "Ensure SSH is properly configured"
  impact 0.7
  describe sshd_config do
    its('PermitRootLogin') { should eq('no') }
    its('PasswordAuthentication') { should eq('no') }
    its('PubkeyAuthentication') { should eq('yes') }
  end
end

control "firewall-01" do
  title "Firewall Configuration"
  desc "Ensure firewall is enabled and configured"
  impact 0.8
  describe service('ufw') do
    it { should be_installed }
    it { should be_running }
  end
end
EOF
# Run the compliance check (html2 is the HTML reporter in current InSpec releases)
inspec exec compliance_profile.rb --reporter html2:compliance_report.html
# View the compliance report
firefox compliance_report.html
4.3 Compliance Management System Implementation
4.3.1 Compliance Management Database Design
sql
-- Compliance standards table
CREATE TABLE compliance_standards (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
description TEXT,
version VARCHAR(50),
type VARCHAR(100), -- industry standard, regulation, internal policy, etc.
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Compliance controls table
CREATE TABLE compliance_controls (
id INT PRIMARY KEY AUTO_INCREMENT,
standard_id INT NOT NULL,
control_id VARCHAR(100) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT,
requirement TEXT,
severity ENUM('low', 'medium', 'high') NOT NULL,
test_procedure TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (standard_id) REFERENCES compliance_standards(id)
);
-- Compliance scan results table
CREATE TABLE compliance_scans (
id INT PRIMARY KEY AUTO_INCREMENT,
scan_id VARCHAR(100) NOT NULL,
standard_id INT NOT NULL,
asset_id INT NOT NULL,
scan_date DATETIME NOT NULL,
status ENUM('pass', 'fail', 'partial') NOT NULL,
pass_count INT NOT NULL,
fail_count INT NOT NULL,
total_count INT NOT NULL,
compliance_score FLOAT NOT NULL,
scan_results TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (standard_id) REFERENCES compliance_standards(id),
FOREIGN KEY (asset_id) REFERENCES assets(id)
);
-- Compliance scan detail results table
CREATE TABLE compliance_scan_details (
id INT PRIMARY KEY AUTO_INCREMENT,
scan_id INT NOT NULL,
control_id INT NOT NULL,
status ENUM('pass', 'fail', 'not_applicable', 'error') NOT NULL,
message TEXT,
evidence TEXT,
FOREIGN KEY (scan_id) REFERENCES compliance_scans(id),
FOREIGN KEY (control_id) REFERENCES compliance_controls(id)
);
4.3.2 Compliance Management API Implementation
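The `compliance_scans` table stores `pass_count`, `fail_count`, `total_count`, `compliance_score` and an overall `status`, but the schema does not say how they are derived from the per-control results. The following is one possible derivation, sketched under stated assumptions: `not_applicable` controls are excluded from the total, `error` counts as a failure, and the score is the percentage of applicable controls that passed. The function name `summarize_scan` is hypothetical.

```python
def summarize_scan(detail_statuses: list) -> dict:
    """Aggregate per-control statuses into the summary fields of a compliance scan."""
    # Assumption: controls that do not apply are left out of the score entirely
    applicable = [s for s in detail_statuses if s != "not_applicable"]
    pass_count = applicable.count("pass")
    # Assumption: a control that errored is treated as failed
    fail_count = sum(1 for s in applicable if s in ("fail", "error"))
    total_count = len(applicable)
    score = round(100.0 * pass_count / total_count, 2) if total_count else 0.0
    if total_count and fail_count == 0:
        status = "pass"
    elif pass_count == 0:
        # Includes the empty case, so an empty scan is not mistaken for a clean result
        status = "fail"
    else:
        status = "partial"
    return {
        "pass_count": pass_count,
        "fail_count": fail_count,
        "total_count": total_count,
        "compliance_score": score,
        "status": status,
    }


if __name__ == "__main__":
    print(summarize_scan(["pass", "pass", "fail", "not_applicable"]))
```

Computing the summary on the server from the detail rows, rather than trusting client-supplied counts, keeps the stored numbers consistent with the details.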
python
# Compliance management API
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/compliance_management'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)


class ComplianceStandard(db.Model):
    __tablename__ = 'compliance_standards'  # match the table name used in the SQL schema
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text)
    version = db.Column(db.String(50))
    type = db.Column(db.String(100))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class ComplianceControl(db.Model):
    __tablename__ = 'compliance_controls'
    id = db.Column(db.Integer, primary_key=True)
    standard_id = db.Column(db.Integer, db.ForeignKey('compliance_standards.id'), nullable=False)
    control_id = db.Column(db.String(100), nullable=False)
    title = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text)
    requirement = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high'), nullable=False)
    test_procedure = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class ComplianceScan(db.Model):
    __tablename__ = 'compliance_scans'
    id = db.Column(db.Integer, primary_key=True)
    scan_id = db.Column(db.String(100), nullable=False)
    standard_id = db.Column(db.Integer, db.ForeignKey('compliance_standards.id'), nullable=False)
    asset_id = db.Column(db.Integer, nullable=False)
    scan_date = db.Column(db.DateTime, nullable=False)
    status = db.Column(db.Enum('pass', 'fail', 'partial'), nullable=False)
    pass_count = db.Column(db.Integer, nullable=False)
    fail_count = db.Column(db.Integer, nullable=False)
    total_count = db.Column(db.Integer, nullable=False)
    compliance_score = db.Column(db.Float, nullable=False)
    scan_results = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)


class ComplianceScanDetail(db.Model):
    __tablename__ = 'compliance_scan_details'
    id = db.Column(db.Integer, primary_key=True)
    scan_id = db.Column(db.Integer, db.ForeignKey('compliance_scans.id'), nullable=False)
    control_id = db.Column(db.Integer, db.ForeignKey('compliance_controls.id'), nullable=False)
    status = db.Column(db.Enum('pass', 'fail', 'not_applicable', 'error'), nullable=False)
    message = db.Column(db.Text)
    evidence = db.Column(db.Text)


# Compliance management API routes
@app.route('/api/compliance/standards', methods=['GET'])
def get_compliance_standards():
    """Return the list of compliance standards."""
    standards = ComplianceStandard.query.all()
    result = []
    for standard in standards:
        result.append({
            'id': standard.id,
            'name': standard.name,
            'description': standard.description,
            'version': standard.version,
            'type': standard.type
        })
    return jsonify(result)


@app.route('/api/compliance/standards', methods=['POST'])
def create_compliance_standard():
    """Create a compliance standard."""
    data = request.json
    standard = ComplianceStandard(
        name=data['name'],
        description=data.get('description'),
        version=data.get('version'),
        type=data.get('type')
    )
    db.session.add(standard)
    db.session.commit()
    return jsonify({'id': standard.id}), 201


@app.route('/api/compliance/standards/<int:id>/controls', methods=['GET'])
def get_compliance_controls(id):
    """Return the controls that belong to a compliance standard."""
    controls = ComplianceControl.query.filter_by(standard_id=id).all()
    result = []
    for control in controls:
        result.append({
            'id': control.id,
            'control_id': control.control_id,
            'title': control.title,
            'description': control.description,
            'requirement': control.requirement,
            'severity': control.severity,
            'test_procedure': control.test_procedure
        })
    return jsonify(result)


@app.route('/api/compliance/scans', methods=['POST'])
def create_compliance_scan():
    """Create a compliance scan together with its detail rows."""
    data = request.json
    scan = ComplianceScan(
        scan_id=data['scan_id'],
        standard_id=data['standard_id'],
        asset_id=data['asset_id'],
        scan_date=datetime.fromisoformat(data['scan_date']),
        status=data['status'],
        pass_count=data['pass_count'],
        fail_count=data['fail_count'],
        total_count=data['total_count'],
        compliance_score=data['compliance_score'],
        scan_results=data.get('scan_results')
    )
    db.session.add(scan)
    # flush() assigns scan.id without committing, so the scan and its details are saved in one transaction
    db.session.flush()
    # Create the scan details
    for detail in data.get('details', []):
        scan_detail = ComplianceScanDetail(
            scan_id=scan.id,
            control_id=detail['control_id'],
            status=detail['status'],
            message=detail.get('message'),
            evidence=detail.get('evidence')
        )
        db.session.add(scan_detail)
    db.session.commit()
    return jsonify({'id': scan.id}), 201


@app.route('/api/compliance/scans/<int:id>', methods=['GET'])
def get_compliance_scan(id):
    """Return a compliance scan with its details."""
    scan = db.session.get(ComplianceScan, id)
    if not scan:
        return jsonify({'error': 'Scan not found'}), 404
    details = ComplianceScanDetail.query.filter_by(scan_id=id).all()
    detail_result = []
    for detail in details:
        detail_result.append({
            'id': detail.id,
            'control_id': detail.control_id,
            'status': detail.status,
            'message': detail.message,
            'evidence': detail.evidence
        })
    result = {
        'id': scan.id,
        'scan_id': scan.scan_id,
        'standard_id': scan.standard_id,
        'asset_id': scan.asset_id,
        'scan_date': scan.scan_date.isoformat(),
        'status': scan.status,
        'pass_count': scan.pass_count,
        'fail_count': scan.fail_count,
        'total_count': scan.total_count,
        'compliance_score': scan.compliance_score,
        'scan_results': scan.scan_results,
        'details': detail_result
    }
    return jsonify(result)


if __name__ == '__main__':
    # create_all() needs an application context in current Flask-SQLAlchemy releases
    with app.app_context():
        db.create_all()
    # Debug mode stays off: together with host='0.0.0.0' it would expose the interactive debugger
    app.run(debug=False, host='0.0.0.0', port=5000)
5. Security Audit System
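5.1 Security Auditing Overview
Security auditing is a key part of operations security. It covers:
- Event recording: record security events from systems and applications
- Log management: collect, store, and manage security logs
- Event analysis: analyze security events to identify potential threats
- Compliance auditing: verify that systems and applications meet compliance requirements
- Forensic analysis: investigate after a security incident has occurred
- Audit reporting: generate security audit reports that serve as compliance evidence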
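Event recording and analysis both depend on turning raw log lines into structured events. The following is a minimal sketch, assuming the common Linux audit log layout `type=... msg=audit(<epoch>:<serial>): key=value ...`. The function name `parse_audit_record` and the sample line are illustrative, and records that nest a quoted `msg='...'` payload would need extra handling.

```python
import re
from datetime import datetime, timezone

# Header: record type, epoch timestamp and event serial number
HEADER_RE = re.compile(
    r"^type=(?P<type>\S+) msg=audit\((?P<epoch>\d+\.\d+):(?P<serial>\d+)\):\s*(?P<body>.*)$"
)
# Body: key=value pairs whose values are either double-quoted or unquoted tokens
FIELD_RE = re.compile(r'(\w+)=("[^"]*"|\S+)')


def parse_audit_record(line: str) -> dict:
    """Parse one audit log line into its type, UTC timestamp, serial and fields."""
    match = HEADER_RE.match(line.strip())
    if match is None:
        raise ValueError("not an audit record")
    fields = {key: value.strip('"') for key, value in FIELD_RE.findall(match["body"])}
    return {
        "type": match["type"],
        "time": datetime.fromtimestamp(float(match["epoch"]), tz=timezone.utc),
        "serial": int(match["serial"]),
        "fields": fields,
    }


SAMPLE_LINE = (
    'type=SYSCALL msg=audit(1364481363.243:24287): arch=c000003e syscall=2 '
    'success=no exit=-13 comm="cat" exe="/bin/cat" key="sshd_config"'
)

if __name__ == "__main__":
    record = parse_audit_record(SAMPLE_LINE)
    print(record["type"], record["time"].isoformat(), record["fields"]["exe"])
```

Records that share a serial number belong to the same audit event, so the serial is the natural key for grouping them.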
5.2 Security Auditing Tools
5.2.1 Linux Auditd
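bash
# Install auditd
apt-get install auditd audispd-plugins
# Start auditd and enable it at boot
systemctl start auditd
systemctl enable auditd
# Configure audit rules
cat > /etc/audit/rules.d/audit.rules << EOF
# Monitor command execution (execve system call)
-a always,exit -F arch=b64 -S execve -k exec
# Monitor changes to account and privilege files
-w /etc/passwd -p wa -k passwd_changes
-w /etc/shadow -p wa -k shadow_changes
-w /etc/sudoers -p wa -k sudoers_changes
# Monitor login activity
-w /var/log/auth.log -p wa -k auth_log
# Monitor network configuration
-w /etc/network/ -p wa -k network_config
# Monitor firewall configuration
-w /etc/ufw/ -p wa -k firewall_config
EOF
# Reload the rules
auditctl -R /etc/audit/rules.d/audit.rules
# Show auditd status
auditctl -s
# Search and summarize the audit log
ausearch -k exec
aureport -a # AVC (access control) message report
aureport -x # executable report
aureport -l # login report
aureport -f # file access report
# Follow the audit log in real time
tail -f /var/log/audit/audit.log
5.2.2 ELK Stack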
bash
# Start the ELK Stack with Docker
docker run -d --name elk -p 5601:5601 -p 9200:9200 -p 5044:5044 sebp/elk
# Configure Filebeat to collect the audit log
cat > /etc/filebeat/filebeat.yml << EOF
filebeat.inputs:
  - type: log
    paths:
      - /var/log/audit/audit.log
    fields:
      log_type: audit
output.elasticsearch:
  hosts: ["localhost:9200"]
setup.kibana:
  host: "localhost:5601"
EOF
# Start Filebeat and enable it at boot
systemctl start filebeat
systemctl enable filebeat
# Open Kibana
# http://localhost:5601
# Create the index pattern: filebeat-*
# Browse the audit logs
5.3 Security Audit System Implementation
5.3.1 Security Audit Database Design
sql
-- Audit events table
CREATE TABLE audit_events (
    id INT PRIMARY KEY AUTO_INCREMENT,
    event_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL, -- login, file access, command execution, etc.
    event_time DATETIME NOT NULL,
    source_ip VARCHAR(50),
    user VARCHAR(100),
    action VARCHAR(255) NOT NULL,
    object VARCHAR(255), -- the object that was acted on
    result ENUM('success', 'failure') NOT NULL,
    details TEXT, -- detailed event information
    severity ENUM('low', 'medium', 'high', 'critical') DEFAULT 'medium',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- Audit rules table
CREATE TABLE audit_rules (
    id INT PRIMARY KEY AUTO_INCREMENT,
    rule_name VARCHAR(255) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    pattern TEXT, -- event matching pattern
    severity ENUM('low', 'medium', 'high', 'critical') DEFAULT 'medium',
    enabled BOOLEAN DEFAULT TRUE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Audit reports table
CREATE TABLE audit_reports (
    id INT PRIMARY KEY AUTO_INCREMENT,
    report_name VARCHAR(255) NOT NULL,
    report_type VARCHAR(100) NOT NULL, -- daily, weekly, monthly, custom
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL,
    total_events INT NOT NULL,
    failed_events INT NOT NULL,
    high_severity_events INT NOT NULL,
    report_content TEXT, -- report body
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
5.3.2 Security Audit API Implementation
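The create endpoint in this section reads required keys straight from the request body, so a malformed payload would surface as an unhandled exception rather than a clear client error. As a minimal sketch, assuming a hypothetical helper named `validate_event_payload` (it is not part of the API listing), the required fields and enum values of the `audit_events` table could be checked before a model object is built:

```python
from datetime import datetime

# Columns declared NOT NULL without a default in audit_events
REQUIRED_FIELDS = ("event_id", "event_type", "event_time", "action", "result")
ALLOWED_RESULTS = ("success", "failure")
ALLOWED_SEVERITIES = ("low", "medium", "high", "critical")


def validate_event_payload(data):
    """Return a list of error messages; an empty list means the payload is usable."""
    if not isinstance(data, dict):
        return ["payload must be a JSON object"]
    errors = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in data]
    if "result" in data and data["result"] not in ALLOWED_RESULTS:
        errors.append("result must be 'success' or 'failure'")
    # severity is optional and defaults to 'medium', as in the schema
    if data.get("severity", "medium") not in ALLOWED_SEVERITIES:
        errors.append("severity must be one of low, medium, high, critical")
    if "event_time" in data:
        try:
            datetime.fromisoformat(str(data["event_time"]))
        except ValueError:
            errors.append("event_time must be an ISO 8601 timestamp")
    return errors


good = {
    "event_id": "evt-1",
    "event_type": "login",
    "event_time": "2024-01-01T12:00:00",
    "action": "ssh login",
    "result": "success",
}
bad = {"event_id": "evt-2", "result": "maybe", "severity": "urgent"}
print(validate_event_payload(good))
print(validate_event_payload(bad))
```

A route could call this helper first and return the error list with status 400 when it is non-empty. The full API listing follows.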
python
# Security audit API
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/security_audit'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)


class AuditEvent(db.Model):
    # Explicit name so the model maps to the table created by the SQL schema
    __tablename__ = 'audit_events'
    id = db.Column(db.Integer, primary_key=True)
    event_id = db.Column(db.String(100), nullable=False)
    event_type = db.Column(db.String(100), nullable=False)
    event_time = db.Column(db.DateTime, nullable=False)
    source_ip = db.Column(db.String(50))
    user = db.Column(db.String(100))
    action = db.Column(db.String(255), nullable=False)
    object = db.Column(db.String(255))
    result = db.Column(db.Enum('success', 'failure'), nullable=False)
    details = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high', 'critical'), default='medium')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)


class AuditRule(db.Model):
    __tablename__ = 'audit_rules'
    id = db.Column(db.Integer, primary_key=True)
    rule_name = db.Column(db.String(255), nullable=False)
    event_type = db.Column(db.String(100), nullable=False)
    pattern = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high', 'critical'), default='medium')
    enabled = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class AuditReport(db.Model):
    __tablename__ = 'audit_reports'
    id = db.Column(db.Integer, primary_key=True)
    report_name = db.Column(db.String(255), nullable=False)
    report_type = db.Column(db.String(100), nullable=False)
    start_time = db.Column(db.DateTime, nullable=False)
    end_time = db.Column(db.DateTime, nullable=False)
    total_events = db.Column(db.Integer, nullable=False)
    failed_events = db.Column(db.Integer, nullable=False)
    high_severity_events = db.Column(db.Integer, nullable=False)
    report_content = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)


# Security audit API routes
@app.route('/api/audit/events', methods=['GET'])
def get_audit_events():
    """Return a list of audit events."""
    # Pagination and filtering are supported through query parameters
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    event_type = request.args.get('event_type')
    user = request.args.get('user')
    start_time = request.args.get('start_time')
    end_time = request.args.get('end_time')
    query = AuditEvent.query
    if event_type:
        query = query.filter_by(event_type=event_type)
    if user:
        query = query.filter_by(user=user)
    if start_time:
        query = query.filter(AuditEvent.event_time >= datetime.fromisoformat(start_time))
    if end_time:
        query = query.filter(AuditEvent.event_time <= datetime.fromisoformat(end_time))
    # Newest events first
    pagination = query.order_by(AuditEvent.event_time.desc()).paginate(
        page=page, per_page=per_page, error_out=False)
    events = pagination.items
    result = []
    for event in events:
        result.append({
            'id': event.id,
            'event_id': event.event_id,
            'event_type': event.event_type,
            'event_time': event.event_time.isoformat(),
            'source_ip': event.source_ip,
            'user': event.user,
            'action': event.action,
            'object': event.object,
            'result': event.result,
            'details': event.details,
            'severity': event.severity
        })
    return jsonify({
        'events': result,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': page
    })


@app.route('/api/audit/events', methods=['POST'])
def create_audit_event():
    """Create an audit event."""
    data = request.json
    event = AuditEvent(
        event_id=data['event_id'],
        event_type=data['event_type'],
        event_time=datetime.fromisoformat(data['event_time']),
        source_ip=data.get('source_ip'),
        user=data.get('user'),
        action=data['action'],
        object=data.get('object'),
        result=data['result'],
        details=data.get('details'),
        severity=data.get('severity', 'medium')
    )
    db.session.add(event)
    db.session.commit()
    return jsonify({'id': event.id}), 201


@app.route('/api/audit/rules', methods=['GET'])
def get_audit_rules():
    """Return a list of audit rules."""
    rules = AuditRule.query.all()
    result = []
    for rule in rules:
        result.append({
            'id': rule.id,
            'rule_name': rule.rule_name,
            'event_type': rule.event_type,
            'pattern': rule.pattern,
            'severity': rule.severity,
            'enabled': rule.enabled
        })
    return jsonify(result)


@app.route('/api/audit/reports', methods=['POST'])
def create_audit_report():
    """Create an audit report."""
    data = request.json
    report = AuditReport(
        report_name=data['report_name'],
        report_type=data['report_type'],
        start_time=datetime.fromisoformat(data['start_time']),
        end_time=datetime.fromisoformat(data['end_time']),
        total_events=data['total_events'],
        failed_events=data['failed_events'],
        high_severity_events=data['high_severity_events'],
        report_content=data.get('report_content')
    )
    db.session.add(report)
    db.session.commit()
    return jsonify({'id': report.id}), 201


@app.route('/api/audit/reports/<int:id>', methods=['GET'])
def get_audit_report(id):
    """Return a single audit report."""
    # Look up by primary key; returns None when no row matches
    report = db.session.get(AuditReport, id)
    if not report:
        return jsonify({'error': 'Report not found'}), 404
    result = {
        'id': report.id,
        'report_name': report.report_name,
        'report_type': report.report_type,
        'start_time': report.start_time.isoformat(),
        'end_time': report.end_time.isoformat(),
        'total_events': report.total_events,
        'failed_events': report.failed_events,
        'high_severity_events': report.high_severity_events,
        'report_content': report.report_content,
        'created_at': report.created_at.isoformat()
    }
    return jsonify(result)
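

if __name__ == '__main__':
    # create_all() needs an application context in Flask-SQLAlchemy 3.x
    with app.app_context():
        db.create_all()
    # Debug mode exposes an interactive debugger, so it stays off when listening on all interfaces
    app.run(debug=False, host='0.0.0.0', port=5000)
6. Security Monitoring and Alerting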
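6.1 Security Monitoring Overview
Security monitoring is a core part of operations security. It covers:
- Real-time monitoring: track the security state of systems and applications as it changes
- Anomaly detection: detect abnormal behavior in systems and applications
- Threat identification: identify potential security threats
- Security alerting: raise an alert when a security event is detected
- Event correlation: link multiple security events to recognize complex attacks
- Security visualization: present the security state on dashboards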
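Event correlation in the list above can be illustrated with a small example. The sketch below flags source IPs that produce several failed logins within a short sliding window, a simple brute-force heuristic. The function name `find_bruteforce_sources`, the tuple layout of the events, and the threshold and window values are assumptions chosen for illustration, not settings taken from any particular tool.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def find_bruteforce_sources(events, threshold=5, window=timedelta(minutes=1)):
    """Return the set of source IPs with `threshold` or more failures inside `window`.

    `events` is an iterable of (timestamp, source_ip, result) tuples in
    chronological order.
    """
    recent = defaultdict(deque)  # source_ip -> timestamps of recent failures
    flagged = set()
    for timestamp, source_ip, result in events:
        if result != "failure":
            continue
        failures = recent[source_ip]
        failures.append(timestamp)
        # Drop failures that have fallen out of the sliding window
        while failures and timestamp - failures[0] > window:
            failures.popleft()
        if len(failures) >= threshold:
            flagged.add(source_ip)
    return flagged


# Hypothetical data: one IP fails five times in 40 seconds, another once every 5 minutes
base = datetime(2024, 1, 1, 12, 0, 0)
events = [(base + timedelta(seconds=10 * i), "10.0.0.5", "failure") for i in range(5)]
events += [(base + timedelta(minutes=5 * i), "10.0.0.9", "failure") for i in range(5)]
events.sort()
print(find_bruteforce_sources(events))
```

Keeping only the timestamps inside the window bounds memory per source and lets the check run incrementally as events arrive.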
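6.2 Security Monitoring Tools
6.2.1 Prometheus + Grafana
bash
# Start Prometheus and Grafana with Docker Compose
# (run this after creating the two files shown below)
docker-compose up -d
# docker-compose.yml
version: '3'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      # Default admin password; change it before exposing Grafana
      - GF_SECURITY_ADMIN_PASSWORD=admin
# prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']
  # 'auditd-exporter' is a placeholder target; this example does not start such an exporter
  - job_name: 'auditd'
    static_configs:
      - targets: ['auditd-exporter:9102']
# Run node_exporter
# (Prometheus can resolve the name node-exporter only if both containers share a Docker network)
docker run -d --name node-exporter -p 9100:9100 prom/node-exporter
# Open Grafana
# http://localhost:3000
# Add Prometheus as a data source
# Import a security monitoring dashboard
6.2.2 Wazuh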
bash
# Start Wazuh with Docker
# (confirm the image name and ports against the Wazuh Docker documentation for your version)
docker run -d --name wazuh -p 5601:5601 -p 9200:9200 -p 50000:50000 wazuh/wazuh-docker
# Open the Wazuh web interface
# http://localhost:5601
# Default username: admin
# Default password: admin
# Install the Wazuh agent (the Wazuh APT repository must already be configured)
apt-get install wazuh-agent
# Configure the Wazuh agent
cat > /var/ossec/etc/ossec.conf << 'EOF'
<ossec_config>
  <client>
    <server>
      <address>wazuh-server-ip</address>
      <port>1514</port>
      <protocol>tcp</protocol>
    </server>
    <config-profile>ubuntu, debian</config-profile>
    <notify_time>10</notify_time>
    <time-reconnect>60</time-reconnect>
    <auto_restart>yes</auto_restart>
    <crypto_method>aes</crypto_method>
  </client>
  <syscheck>
    <directories check_all="yes" report_changes="yes">/etc,/usr/bin,/usr/sbin</directories>
    <directories check_all="yes" report_changes="yes">/bin,/sbin</directories>
  </syscheck>
  <rootcheck></rootcheck>
  <!-- Run "df -h" once an hour; the command wodle schedules with <interval> -->
  <wodle name="command">
    <disabled>no</disabled>
    <command>df -h</command>
    <interval>1h</interval>
  </wodle>
</ossec_config>
EOF
# Start the Wazuh agent and enable it at boot
systemctl start wazuh-agent
systemctl enable wazuh-agent
6.3 Security Monitoring System Implementation
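The monitoring system in this section stores detections in a `security_events` table and derives rows for a `security_alerts` table from them. As a minimal sketch of that step, assuming a hypothetical function `build_alert` and a severity threshold of `high` (both illustrative, not part of the schema), an event can be turned into an alert record only when it is severe enough:

```python
import uuid
from datetime import datetime, timezone

# Severity levels in ascending order, matching the ENUM used by both tables
SEVERITY_ORDER = ("low", "medium", "high", "critical")


def build_alert(event, min_severity="high"):
    """Return an alert dict for `event`, or None if it is below `min_severity`.

    `event` is a dict with at least 'id', 'event_type' and 'severity' keys.
    """
    if SEVERITY_ORDER.index(event["severity"]) < SEVERITY_ORDER.index(min_severity):
        return None
    return {
        "alert_id": str(uuid.uuid4()),
        "event_id": event["id"],  # refers to security_events.id
        "alert_time": datetime.now(timezone.utc),
        "severity": event["severity"],
        "title": f"{event['event_type']} from {event.get('source_ip', 'unknown')}",
        "description": event.get("description"),
        "status": "triggered",  # initial state of a new alert
        "notification_sent": False,
    }


# Hypothetical events
critical_event = {"id": 7, "event_type": "intrusion_attempt",
                  "severity": "critical", "source_ip": "203.0.113.9"}
low_event = {"id": 8, "event_type": "port_scan", "severity": "low"}
print(build_alert(critical_event)["title"])
print(build_alert(low_event))
```

Filtering on severity before an alert row is created keeps low-priority events queryable in `security_events` without paging anyone for them.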
6.3.1 Security Monitoring Database Design
sql
-- Security events table
CREATE TABLE security_events (
    id INT PRIMARY KEY AUTO_INCREMENT,
    event_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL, -- intrusion attempt, abnormal access, exploit, etc.
    event_time DATETIME NOT NULL,
    source_ip VARCHAR(50),
    destination_ip VARCHAR(50),
    source_port INT,
    destination_port INT,
    protocol VARCHAR(50),
    severity ENUM('low', 'medium', 'high', 'critical') NOT NULL,
    status ENUM('new', 'processed', 'dismissed', 'resolved') DEFAULT 'new',
    description TEXT,
    details TEXT, -- detailed event information
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Security alerts table
CREATE TABLE security_alerts (
    id INT PRIMARY KEY AUTO_INCREMENT,
    alert_id VARCHAR(100) NOT NULL,
    event_id INT NOT NULL,
    alert_time DATETIME NOT NULL,
    severity ENUM('low', 'medium', 'high', 'critical') NOT NULL,
    title VARCHAR(255) NOT NULL,
    description TEXT,
    status ENUM('triggered', 'acknowledged', 'resolved', 'suppressed') DEFAULT 'triggered',
    assigned_to VARCHAR(100),
    notification_sent BOOLEAN DEFAULT FALSE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATET