
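Ops Security Platform in Practice

1. Overview of the Ops Security Platform

1.1 Why Operations Security Matters

Network threats keep growing in sophistication and frequency, which makes operations (ops) security a core part of enterprise IT infrastructure. An ops security platform contributes in six areas:

  • Threat protection: identify and defend against network security threats
  • Vulnerability management: find and remediate vulnerabilities in systems and applications
  • Compliance: meet industry and regulatory requirements
  • Security auditing: record and analyze security events so that they remain traceable
  • Risk assessment: evaluate the security risk of systems and applications
  • Security automation: reduce manual work and shorten response time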

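1.2 Platform Architecture

mermaid
flowchart TD
    subgraph detection["Security Detection Layer"]
        A[Security Scanner]
        B[Vulnerability Scanner]
        C[Compliance Checker]
        D[Intrusion Detection System]
        E[Anomaly Detection System]
    end

    subgraph management["Security Management Layer"]
        F[Vulnerability Management System]
        G[Compliance Management System]
        H[Security Audit System]
        I[Risk Assessment System]
        J[Security Configuration Management]
    end

    subgraph response["Security Response Layer"]
        K[Security Alerting System]
        L[Security Incident Response]
        M[Automated Security Response]
        N[Security Knowledge Base]
    end

    subgraph infrastructure["Infrastructure Layer"]
        O[Servers]
        P[Network Devices]
        Q[Application Systems]
        R[Databases]
        S[Cloud Services]
    end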

    A --> F
    B --> F
    C --> G
    D --> K
    E --> K
    F --> I
    G --> I
    H --> N
    K --> L
    K --> M
    L --> N
    M --> N
    A --> O
    A --> P
    A --> Q
    A --> R
    A --> S
    B --> O
    B --> P
    B --> Q
    B --> R
    B --> S
    C --> O
    C --> P
    C --> Q
    C --> R
    C --> S
    D --> O
    D --> P
    D --> Q
    D --> R
    D --> S
    E --> O
    E --> P
    E --> Q
    E --> R
    E --> S

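1.3 Common Security Tools

| Category | Tool | Function | Typical use |
| --- | --- | --- | --- |
| Vulnerability scanning | Nessus | Comprehensive vulnerability scanning | Enterprise vulnerability management |
| Vulnerability scanning | OpenVAS | Open-source vulnerability scanning | Small and medium-sized businesses |
| Vulnerability scanning | Qualys | Cloud security and compliance | Cloud environments |
| Security scanning | Nmap | Network scanning and port discovery | Network security assessment |
| Security scanning | Nikto | Web server scanning | Web application security |
| Security scanning | Burp Suite | Web application security testing | Web application penetration testing |
| Intrusion detection | Snort | Network intrusion detection | Network security monitoring |
| Intrusion detection | Suricata | Network intrusion detection and prevention | Network security monitoring |
| Log management | ELK Stack | Log collection and analysis | Security event analysis |
| Log management | Splunk | Security information and event management (SIEM) | Enterprise security monitoring |
| Compliance management | Chef InSpec | Compliance checks | Configuration compliance |
| Compliance management | OpenSCAP | SCAP (Security Content Automation Protocol) scanning | Compliance assessment |
| Security auditing | Auditd | Linux system auditing | Linux security auditing |
| Security auditing | Windows Event Log | Windows event logging | Windows security auditing |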

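2. Security Scanning Techniques

2.1 Network Scanning

2.1.1 Scanning with Nmap

bash
# Basic scan of a subnet
nmap 192.168.1.0/24

# Scan a port range
nmap -p 1-1000 192.168.1.1

# Service and version detection
nmap -sV 192.168.1.1

# OS detection (requires root privileges)
nmap -O 192.168.1.1

# Aggressive scan: OS detection, version detection, script scanning, and traceroute
nmap -A 192.168.1.1

# Timing template (T0 is the slowest, T5 the fastest)
nmap -T4 192.168.1.0/24

# Output formats: XML and grepable
nmap -oX scan.xml 192.168.1.1
nmap -oG scan.txt 192.168.1.1

# Firewall evasion: fragment packets, or set a custom MTU (must be a multiple of 8)
nmap -f 192.168.1.1
nmap --mtu 16 192.168.1.1

# NSE script categories
nmap --script vuln 192.168.1.1
nmap --script auth 192.168.1.1
nmap --script discovery 192.168.1.1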
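
The XML written by `-oX` is easier to post-process than console output. The following is a minimal sketch, assuming a report shaped like standard Nmap XML (`host` elements with `address` and `ports/port` children). The function name `open_ports` and the trimmed sample document are illustrative and not part of Nmap.

```python
import xml.etree.ElementTree as ET

# Hypothetical, trimmed-down sample of an `nmap -oX` report.
SAMPLE_XML = """<nmaprun>
  <host>
    <address addr="192.168.1.1" addrtype="ipv4"/>
    <ports>
      <port protocol="tcp" portid="22">
        <state state="open"/>
        <service name="ssh"/>
      </port>
      <port protocol="tcp" portid="23">
        <state state="closed"/>
        <service name="telnet"/>
      </port>
    </ports>
  </host>
</nmaprun>"""


def open_ports(xml_text):
    """Return {address: [(port, service), ...]} for ports in state 'open'."""
    result = {}
    root = ET.fromstring(xml_text)
    for host in root.iter("host"):
        address = host.find("address")
        if address is None:
            continue
        ports = []
        for port in host.iter("port"):
            state = port.find("state")
            if state is None or state.get("state") != "open":
                continue
            service = port.find("service")
            name = service.get("name") if service is not None else "unknown"
            ports.append((int(port.get("portid")), name))
        result[address.get("addr")] = ports
    return result


print(open_ports(SAMPLE_XML))
```

For a real report, read `scan.xml` from disk and pass its contents to `open_ports`.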

2.1.2 Scanning with Masscan

bash
# Basic scan
masscan 192.168.1.0/24 -p 80,443

# Set the transmit rate in packets per second
masscan 192.168.1.0/24 -p 1-65535 --rate 10000

# Output formats: XML and grepable
masscan 192.168.1.0/24 -p 80,443 -oX scan.xml
masscan 192.168.1.0/24 -p 80,443 -oG scan.txt

# Exclude the addresses listed in a file
masscan 192.168.1.0/24 -p 80,443 --excludefile exclude.txt
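
Because `--rate` is expressed in packets per second, a rough lower bound on scan time is (addresses × ports) / rate. The sketch below assumes one probe per address/port pair and no retransmissions; `estimate_scan_seconds` is a hypothetical helper, not a Masscan feature.

```python
import ipaddress


def estimate_scan_seconds(cidr, port_count, rate):
    """Estimate scan duration, assuming one probe per address/port pair."""
    addresses = ipaddress.ip_network(cidr, strict=False).num_addresses
    return addresses * port_count / rate


# A /24 (256 addresses), all 65535 ports, at 10000 packets per second.
seconds = estimate_scan_seconds("192.168.1.0/24", 65535, 10000)
print(f"{seconds:.0f} s (about {seconds / 60:.0f} min)")
```

For the full-port scan shown above, this comes to roughly 28 minutes; real scans take longer once retries and network limits are involved.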

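2.2 Vulnerability Scanning

2.2.1 Scanning with OpenVAS

bash
# Start OpenVAS
docker run -d -p 443:443 --name openvas mikesplain/openvas

# Open the web interface
# https://localhost
# Default username: admin
# Default password: admin (change it after the first login)

# Create a scan task from the command line. The openvas-cli package installs
# the `omp` client; newer Greenbone releases replace it with gvm-cli.
# 'target-id' and 'config-id' are placeholders for real UUIDs.
omp -u admin -w admin --xml="<create_task><name>scan</name><config id='config-id'/><target id='target-id'/></create_task>"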

2.2.2 Scanning with Nessus

bash
# Install Nessus
dpkg -i Nessus-8.13.1-debian6_amd64.deb
/etc/init.d/nessusd start

# Open the web interface
# https://localhost:8834

# nessuscli handles administration (users, licensing, plugin updates) and has
# no subcommand for scans. Create and launch scans in the web interface, or
# through the REST API (POST /scans, then POST /scans/{scan_id}/launch).

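2.3 Web Application Scanning

2.3.1 Scanning with Nikto

bash
# Basic scan
nikto -h http://example.com

# Scan multiple targets listed in a file
nikto -h targets.txt

# Test specific ports
nikto -h http://example.com -p 80,443

# Output files (the format follows the file extension)
nikto -h http://example.com -o report.html
nikto -h http://example.com -o report.txt

# Use a proxy
nikto -h http://example.com -useproxy http://proxy:8080

# Restrict the test classes with -Tuning
nikto -h http://example.com -Tuning 123
# 1: interesting files / seen in logs
# 2: misconfiguration / default files
# 3: information disclosure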

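2.3.2 Scanning with OWASP ZAP

bash
# Start ZAP in daemon mode. api.disablekey=true turns off the API key so the
# curl calls below work as written; use it only on an isolated test host.
docker run -p 8080:8080 -i owasp/zap2docker-stable zap.sh -daemon -host 0.0.0.0 -port 8080 -config api.addrs.addr.name=.* -config api.addrs.addr.regex=true -config api.disablekey=true

# Spider the target first so that it appears in the site tree
curl "http://localhost:8080/JSON/spider/action/scan/?url=http://example.com"

# Start an active scan (the response contains the scan ID)
curl "http://localhost:8080/JSON/ascan/action/scan/?url=http://example.com&recurse=true"

# Check scan progress; status is a view, not an action, and scan IDs start at 0
curl "http://localhost:8080/JSON/ascan/view/status/?scanId=0"

# Fetch the results
curl "http://localhost:8080/JSON/core/view/numberOfAlerts/?baseurl=http://example.com"
curl "http://localhost:8080/JSON/core/view/alerts/?baseurl=http://example.com"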

2.4 Automating Security Scans

python
# Security scan automation script
import json
import subprocess
import time


class SecurityScanner:
    def __init__(self):
        # Results are keyed by tool, then by target, so that scanning a
        # second target does not overwrite the first.
        self.scan_results = {'nmap': {}, 'nikto': {}, 'openvas': {}}

    def _run(self, tool, target, command):
        """Run a scanner command and store its stdout under tool/target."""
        print(f"Starting {tool} scan on {target}")
        try:
            result = subprocess.run(command, capture_output=True, text=True)
            output = result.stdout
        except FileNotFoundError:
            output = f"{command[0]} is not installed"
        self.scan_results[tool][target] = output
        return output

    def nmap_scan(self, target):
        """Run an Nmap scan and capture the XML report from stdout."""
        return self._run('nmap', target, ['nmap', '-A', '-oX', '-', target])

    def nikto_scan(self, target):
        """Run a Nikto scan; the report is read from stdout."""
        return self._run('nikto', target, ['nikto', '-h', target])

    def openvas_scan(self, target):
        """Placeholder for an OpenVAS scan."""
        # OpenVAS is driven through its management API, so a real
        # implementation is more involved than a single command.
        message = f"Scan started on {target}"
        self.scan_results['openvas'][target] = message
        return message

    def generate_report(self, output_file):
        """Write all collected results to a JSON report."""
        with open(output_file, 'w') as f:
            json.dump(self.scan_results, f, indent=2)
        print(f"Report generated: {output_file}")


# Usage example
if __name__ == "__main__":
    scanner = SecurityScanner()
    targets = ["192.168.1.1", "http://example.com"]

    for target in targets:
        if target.startswith('http'):
            # Nikto takes URLs; Nmap expects a host name or IP address.
            scanner.nikto_scan(target)
        else:
            scanner.nmap_scan(target)
            scanner.openvas_scan(target)
        time.sleep(5)  # pause between targets to limit scan load

    scanner.generate_report('security_scan_report.json')

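3. Vulnerability Management System

3.1 Overview

Vulnerability management is a core part of ops security. The process has six steps:

  1. Discovery: find vulnerabilities in systems and applications through scanning and monitoring
  2. Classification: classify each vulnerability by severity and scope of impact
  3. Prioritization: set the remediation priority from the risk level
  4. Remediation: develop and apply a fix
  5. Verification: confirm that the vulnerability has been fixed
  6. Reporting: produce vulnerability management reports and track remediation progress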
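
Steps 2 and 3 can be partly automated. The sketch below maps a CVSS v3 base score to its qualitative rating (9.0 and above critical, 7.0 and above high, 4.0 and above medium, otherwise low) and derives a fix deadline. The remediation windows in `FIX_WINDOW_DAYS` and both function names are assumptions made for illustration; they do not come from any standard.

```python
from datetime import datetime, timedelta

# Assumed remediation windows in days; adjust to your own policy.
FIX_WINDOW_DAYS = {"critical": 7, "high": 30, "medium": 90, "low": 180}


def severity_from_cvss(score):
    """Map a CVSS v3 base score to its qualitative severity rating."""
    if score >= 9.0:
        return "critical"
    if score >= 7.0:
        return "high"
    if score >= 4.0:
        return "medium"
    return "low"


def fix_deadline(score, discovered):
    """Return (severity, deadline) for a finding discovered at `discovered`."""
    severity = severity_from_cvss(score)
    return severity, discovered + timedelta(days=FIX_WINDOW_DAYS[severity])


severity, deadline = fix_deadline(9.8, datetime(2022, 3, 31))
print(severity, deadline.date())
```

A score of 0.0 (rated "None" in CVSS) falls into `low` here, which keeps the result within the four severity values used elsewhere in this guide.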

3.2 Implementing a Vulnerability Management System

3.2.1 Database Design

sql
-- Vulnerabilities
CREATE TABLE vulnerabilities (
    id INT PRIMARY KEY AUTO_INCREMENT,
    vuln_id VARCHAR(100) NOT NULL,  -- CVE ID or another vulnerability identifier
    title VARCHAR(255) NOT NULL,
    description TEXT,
    severity ENUM('low', 'medium', 'high', 'critical') NOT NULL,
    cvss_score FLOAT,
    discovered_date DATETIME NOT NULL,
    last_seen_date DATETIME,
    status ENUM('open', 'in_progress', 'fixed', 'false_positive') DEFAULT 'open',
    fix_deadline DATETIME,
    assignee VARCHAR(100),
    remediation TEXT,
    `references` TEXT,  -- quoted because REFERENCES is a reserved word in MySQL
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Assets
CREATE TABLE assets (
    id INT PRIMARY KEY AUTO_INCREMENT,
    asset_id VARCHAR(100) NOT NULL,
    name VARCHAR(255) NOT NULL,
    type ENUM('server', 'network', 'application', 'database', 'cloud') NOT NULL,
    ip_address VARCHAR(50),
    hostname VARCHAR(255),
    location VARCHAR(255),
    owner VARCHAR(100),
    environment ENUM('production', 'staging', 'development') NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Vulnerability-to-asset mapping
CREATE TABLE vulnerability_assets (
    id INT PRIMARY KEY AUTO_INCREMENT,
    vulnerability_id INT NOT NULL,
    asset_id INT NOT NULL,
    discovered_date DATETIME NOT NULL,
    fixed_date DATETIME,
    verification_date DATETIME,
    verified_by VARCHAR(100),
    FOREIGN KEY (vulnerability_id) REFERENCES vulnerabilities(id),
    FOREIGN KEY (asset_id) REFERENCES assets(id)
);

-- Scans
CREATE TABLE scans (
    id INT PRIMARY KEY AUTO_INCREMENT,
    scan_id VARCHAR(100) NOT NULL,
    scan_type ENUM('network', 'web', 'vulnerability', 'compliance') NOT NULL,
    scanner_name VARCHAR(100) NOT NULL,
    scan_date DATETIME NOT NULL,
    status ENUM('running', 'completed', 'failed') NOT NULL,
    scan_target TEXT NOT NULL,
    scan_results TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
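
A typical query against this schema counts the unresolved vulnerabilities on each asset through the mapping table. The sketch below uses an in-memory SQLite database in place of MySQL and keeps only a few columns per table; both simplifications, and the sample rows, are assumptions made so that the example runs on its own.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE vulnerabilities (
    id INTEGER PRIMARY KEY, vuln_id TEXT, severity TEXT, status TEXT);
CREATE TABLE assets (
    id INTEGER PRIMARY KEY, name TEXT, environment TEXT);
CREATE TABLE vulnerability_assets (
    id INTEGER PRIMARY KEY,
    vulnerability_id INTEGER REFERENCES vulnerabilities(id),
    asset_id INTEGER REFERENCES assets(id));

INSERT INTO vulnerabilities VALUES
    (1, 'CVE-2021-44228', 'critical', 'open'),
    (2, 'CVE-2022-0847', 'high', 'fixed');
INSERT INTO assets VALUES (1, 'web-01', 'production'), (2, 'db-01', 'staging');
INSERT INTO vulnerability_assets VALUES (1, 1, 1), (2, 2, 1), (3, 1, 2);
""")

# Count vulnerabilities that are not yet fixed, per asset.
rows = conn.execute("""
    SELECT a.name, COUNT(*) AS open_count
    FROM vulnerability_assets AS va
    JOIN vulnerabilities AS v ON v.id = va.vulnerability_id
    JOIN assets AS a ON a.id = va.asset_id
    WHERE v.status IN ('open', 'in_progress')
    GROUP BY a.name
    ORDER BY a.name
""").fetchall()
print(rows)
```

The same SELECT statement should carry over to MySQL unchanged, since it only uses joins, `IN`, and `GROUP BY`.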

3.2.2 Vulnerability Management API

python
# Vulnerability management API
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/vulnerability_management'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

class Vulnerability(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    vuln_id = db.Column(db.String(100), nullable=False)
    title = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high', 'critical'), nullable=False)
    cvss_score = db.Column(db.Float)
    discovered_date = db.Column(db.DateTime, nullable=False)
    last_seen_date = db.Column(db.DateTime)
    status = db.Column(db.Enum('open', 'in_progress', 'fixed', 'false_positive'), default='open')
    fix_deadline = db.Column(db.DateTime)
    assignee = db.Column(db.String(100))
    remediation = db.Column(db.Text)
    references = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class Asset(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    asset_id = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    type = db.Column(db.Enum('server', 'network', 'application', 'database', 'cloud'), nullable=False)
    ip_address = db.Column(db.String(50))
    hostname = db.Column(db.String(255))
    location = db.Column(db.String(255))
    owner = db.Column(db.String(100))
    environment = db.Column(db.Enum('production', 'staging', 'development'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class VulnerabilityAsset(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    vulnerability_id = db.Column(db.Integer, db.ForeignKey('vulnerability.id'), nullable=False)
    asset_id = db.Column(db.Integer, db.ForeignKey('asset.id'), nullable=False)
    discovered_date = db.Column(db.DateTime, nullable=False)
    fixed_date = db.Column(db.DateTime)
    verification_date = db.Column(db.DateTime)
    verified_by = db.Column(db.String(100))

# Vulnerability routes

@app.route('/api/vulnerabilities', methods=['GET'])
def get_vulnerabilities():
    """List all vulnerabilities."""
    vulnerabilities = Vulnerability.query.all()
    result = []
    for vuln in vulnerabilities:
        result.append({
            'id': vuln.id,
            'vuln_id': vuln.vuln_id,
            'title': vuln.title,
            'description': vuln.description,
            'severity': vuln.severity,
            'cvss_score': vuln.cvss_score,
            'discovered_date': vuln.discovered_date.isoformat(),
            'last_seen_date': vuln.last_seen_date.isoformat() if vuln.last_seen_date else None,
            'status': vuln.status,
            'fix_deadline': vuln.fix_deadline.isoformat() if vuln.fix_deadline else None,
            'assignee': vuln.assignee,
            'remediation': vuln.remediation,
            'references': vuln.references
        })
    return jsonify(result)

@app.route('/api/vulnerabilities', methods=['POST'])
def create_vulnerability():
    """Create a vulnerability."""
    data = request.json
    vulnerability = Vulnerability(
        vuln_id=data['vuln_id'],
        title=data['title'],
        description=data.get('description'),
        severity=data['severity'],
        cvss_score=data.get('cvss_score'),
        discovered_date=datetime.fromisoformat(data['discovered_date']),
        last_seen_date=datetime.fromisoformat(data.get('last_seen_date')) if data.get('last_seen_date') else None,
        status=data.get('status', 'open'),
        fix_deadline=datetime.fromisoformat(data.get('fix_deadline')) if data.get('fix_deadline') else None,
        assignee=data.get('assignee'),
        remediation=data.get('remediation'),
        references=data.get('references')
    )
    db.session.add(vulnerability)
    db.session.commit()
    return jsonify({'id': vulnerability.id}), 201

@app.route('/api/vulnerabilities/<int:id>', methods=['PUT'])
def update_vulnerability(id):
    """Update a vulnerability."""
    data = request.json
    vulnerability = Vulnerability.query.get(id)
    if not vulnerability:
        return jsonify({'error': 'Vulnerability not found'}), 404
    
    if 'vuln_id' in data:
        vulnerability.vuln_id = data['vuln_id']
    if 'title' in data:
        vulnerability.title = data['title']
    if 'description' in data:
        vulnerability.description = data['description']
    if 'severity' in data:
        vulnerability.severity = data['severity']
    if 'cvss_score' in data:
        vulnerability.cvss_score = data['cvss_score']
    if 'status' in data:
        vulnerability.status = data['status']
    if 'fix_deadline' in data:
        vulnerability.fix_deadline = datetime.fromisoformat(data['fix_deadline'])
    if 'assignee' in data:
        vulnerability.assignee = data['assignee']
    if 'remediation' in data:
        vulnerability.remediation = data['remediation']
    if 'references' in data:
        vulnerability.references = data['references']
    
    vulnerability.last_seen_date = datetime.utcnow()
    db.session.commit()
    return jsonify({'id': vulnerability.id})

@app.route('/api/vulnerabilities/<int:id>', methods=['DELETE'])
def delete_vulnerability(id):
    """Delete a vulnerability."""
    vulnerability = Vulnerability.query.get(id)
    if not vulnerability:
        return jsonify({'error': 'Vulnerability not found'}), 404
    
    db.session.delete(vulnerability)
    db.session.commit()
    return jsonify({'message': 'Vulnerability deleted'})

# Asset routes

@app.route('/api/assets', methods=['GET'])
def get_assets():
    """List all assets."""
    assets = Asset.query.all()
    result = []
    for asset in assets:
        result.append({
            'id': asset.id,
            'asset_id': asset.asset_id,
            'name': asset.name,
            'type': asset.type,
            'ip_address': asset.ip_address,
            'hostname': asset.hostname,
            'location': asset.location,
            'owner': asset.owner,
            'environment': asset.environment
        })
    return jsonify(result)

@app.route('/api/assets', methods=['POST'])
def create_asset():
    """Create an asset."""
    data = request.json
    asset = Asset(
        asset_id=data['asset_id'],
        name=data['name'],
        type=data['type'],
        ip_address=data.get('ip_address'),
        hostname=data.get('hostname'),
        location=data.get('location'),
        owner=data.get('owner'),
        environment=data['environment']
    )
    db.session.add(asset)
    db.session.commit()
    return jsonify({'id': asset.id}), 201

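if __name__ == '__main__':
    # create_all() needs an application context in Flask-SQLAlchemy 3.x
    with app.app_context():
        db.create_all()
    # Bind to localhost with debug off; the Werkzeug debugger allows code execution
    app.run(debug=False, host='127.0.0.1', port=5000)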
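
The POST handler for vulnerabilities reads required keys straight from `request.json`, so a missing or malformed field raises an exception and surfaces as an HTTP 500. A minimal validation sketch follows. The helper name `validate_vulnerability_payload` and the error wording are assumptions; the field names and allowed severities mirror the model above.

```python
from datetime import datetime

SEVERITIES = {"low", "medium", "high", "critical"}
REQUIRED_FIELDS = ("vuln_id", "title", "severity", "discovered_date")


def validate_vulnerability_payload(data):
    """Return a list of error messages; an empty list means the payload is valid."""
    if not isinstance(data, dict):
        return ["request body must be a JSON object"]
    errors = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in data]
    if "severity" in data:
        severity = data["severity"]
        if not (isinstance(severity, str) and severity in SEVERITIES):
            errors.append("severity must be one of: " + ", ".join(sorted(SEVERITIES)))
    score = data.get("cvss_score")
    if score is not None:
        is_number = isinstance(score, (int, float)) and not isinstance(score, bool)
        if not is_number or not 0.0 <= score <= 10.0:
            errors.append("cvss_score must be a number between 0.0 and 10.0")
    if "discovered_date" in data:
        try:
            datetime.fromisoformat(data["discovered_date"])
        except (TypeError, ValueError):
            errors.append("discovered_date must be an ISO 8601 timestamp")
    return errors


print(validate_vulnerability_payload({"title": "Log4Shell", "severity": "severe"}))
```

In `create_vulnerability`, one option is to call this helper first and return `jsonify({'errors': errors}), 400` when the list is not empty.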

3.2.3 Vulnerability Management Front End

vue
<template>
  <div class="vulnerability-management">
    <h1>Vulnerability Management System</h1>
    
    <!-- Vulnerability statistics -->
    <div class="stats">
      <el-card class="stat-card">
        <template #header>
          <div class="card-header">
            <span>Total vulnerabilities</span>
          </div>
        </template>
        <div class="stat-value">{{ totalVulnerabilities }}</div>
      </el-card>
      <el-card class="stat-card">
        <template #header>
          <div class="card-header">
            <span>High-risk vulnerabilities</span>
          </div>
        </template>
        <div class="stat-value high">{{ highVulnerabilities }}</div>
      </el-card>
      <el-card class="stat-card">
        <template #header>
          <div class="card-header">
            <span>Open vulnerabilities</span>
          </div>
        </template>
        <div class="stat-value critical">{{ openVulnerabilities }}</div>
      </el-card>
      <el-card class="stat-card">
        <template #header>
          <div class="card-header">
            <span>Fix rate</span>
          </div>
        </template>
        <div class="stat-value">{{ fixRate }}%</div>
      </el-card>
    </div>
    
    <!-- Vulnerability list -->
    <el-card>
      <template #header>
        <div class="card-header">
          <span>Vulnerability list</span>
          <el-button type="primary" @click="openAddDialog">Add vulnerability</el-button>
        </div>
      </template>
      <el-table :data="vulnerabilities" style="width: 100%">
        <el-table-column prop="vuln_id" label="Vulnerability ID" width="150" />
        <el-table-column prop="title" label="Title" />
        <el-table-column prop="severity" label="Severity" width="100">
          <template #default="scope">
            <el-tag :type="getSeverityType(scope.row.severity)">
              {{ scope.row.severity }}
            </el-tag>
          </template>
        </el-table-column>
        <el-table-column prop="cvss_score" label="CVSS score" width="100" />
        <el-table-column prop="discovered_date" label="Discovered" width="150" />
        <el-table-column prop="status" label="Status" width="120">
          <template #default="scope">
            <el-tag :type="getStatusType(scope.row.status)">
              {{ scope.row.status }}
            </el-tag>
          </template>
        </el-table-column>
        <el-table-column prop="assignee" label="Assignee" width="120" />
        <el-table-column label="Actions" width="150">
          <template #default="scope">
            <el-button size="small" @click="editVulnerability(scope.row)">Edit</el-button>
            <el-button size="small" type="danger" @click="deleteVulnerability(scope.row.id)">Delete</el-button>
          </template>
        </el-table-column>
      </el-table>
      <el-pagination
        v-model:current-page="currentPage"
        v-model:page-size="pageSize"
        :page-sizes="[10, 20, 50, 100]"
        layout="total, sizes, prev, pager, next, jumper"
        :total="totalVulnerabilities"
        @size-change="handleSizeChange"
        @current-change="handleCurrentChange"
        style="margin-top: 20px"
      />
    </el-card>
    
    <!-- Add/edit vulnerability dialog -->
    <el-dialog
      v-model="dialogVisible"
      :title="isEditing ? 'Edit vulnerability' : 'Add vulnerability'"
      width="600px"
    >
      <el-form :model="formData" label-width="100px">
        <el-form-item label="Vulnerability ID">
          <el-input v-model="formData.vuln_id" />
        </el-form-item>
        <el-form-item label="Title">
          <el-input v-model="formData.title" />
        </el-form-item>
        <el-form-item label="Description">
          <el-input v-model="formData.description" type="textarea" :rows="3" />
        </el-form-item>
        <el-form-item label="Severity">
          <el-select v-model="formData.severity">
            <el-option label="Low" value="low" />
            <el-option label="Medium" value="medium" />
            <el-option label="High" value="high" />
            <el-option label="Critical" value="critical" />
          </el-select>
        </el-form-item>
        <el-form-item label="CVSS score">
          <el-input v-model.number="formData.cvss_score" />
        </el-form-item>
        <el-form-item label="Discovered">
          <el-date-picker v-model="formData.discovered_date" type="datetime" />
        </el-form-item>
        <el-form-item label="Status">
          <el-select v-model="formData.status">
            <el-option label="Open" value="open" />
            <el-option label="In progress" value="in_progress" />
            <el-option label="Fixed" value="fixed" />
            <el-option label="False positive" value="false_positive" />
          </el-select>
        </el-form-item>
        <el-form-item label="Assignee">
          <el-input v-model="formData.assignee" />
        </el-form-item>
        <el-form-item label="Remediation">
          <el-input v-model="formData.remediation" type="textarea" :rows="3" />
        </el-form-item>
      </el-form>
      <template #footer>
        <span class="dialog-footer">
          <el-button @click="dialogVisible = false">Cancel</el-button>
          <el-button type="primary" @click="saveVulnerability">Save</el-button>
        </span>
      </template>
    </el-dialog>
  </div>
</template>

<script>
export default {
  data() {
    return {
      vulnerabilities: [],
      totalVulnerabilities: 0,
      highVulnerabilities: 0,
      openVulnerabilities: 0,
      fixRate: 0,
      currentPage: 1,
      pageSize: 10,
      dialogVisible: false,
      isEditing: false,
      formData: {
        id: '',
        vuln_id: '',
        title: '',
        description: '',
        severity: 'medium',
        cvss_score: 0,
        discovered_date: new Date(),
        status: 'open',
        assignee: '',
        remediation: ''
      }
    }
  },
  mounted() {
    this.loadVulnerabilities()
  },
  methods: {
    loadVulnerabilities() {
      // A real implementation would fetch the list from the API; this is mock data
      this.vulnerabilities = [
        {
          id: 1,
          vuln_id: 'CVE-2021-44228',
          title: 'Log4Shell remote code execution',
          description: 'Apache Log4j has a remote code execution vulnerability that an attacker can trigger with a crafted request',
          severity: 'critical',
          cvss_score: 10.0,
          discovered_date: '2021-12-10T00:00:00Z',
          status: 'in_progress',
          assignee: 'Zhang San',
          remediation: 'Upgrade Log4j to 2.17.0 or later'
        },
        {
          id: 2,
          vuln_id: 'CVE-2022-22965',
          title: 'Spring4Shell remote code execution',
          description: 'Spring Framework has a remote code execution vulnerability that an attacker can trigger with a crafted request',
          severity: 'critical',
          cvss_score: 9.8,
          discovered_date: '2022-03-31T00:00:00Z',
          status: 'open',
          assignee: 'Li Si',
          remediation: 'Upgrade Spring Framework to 5.3.18 or 5.2.20, or later'
        },
        {
          id: 3,
          vuln_id: 'CVE-2022-0847',
          title: 'Dirty Pipe local privilege escalation',
          description: 'The Linux kernel has a local privilege escalation vulnerability that lets an attacker gain root privileges',
          severity: 'high',
          cvss_score: 7.8,
          discovered_date: '2022-03-07T00:00:00Z',
          status: 'fixed',
          assignee: 'Wang Wu',
          remediation: 'Upgrade the Linux kernel to 5.16.11 or later'
        }
      ]
      this.calculateStats()
    },
    calculateStats() {
      this.totalVulnerabilities = this.vulnerabilities.length
      this.highVulnerabilities = this.vulnerabilities.filter(v => v.severity === 'high' || v.severity === 'critical').length
      this.openVulnerabilities = this.vulnerabilities.filter(v => v.status === 'open').length
      this.fixRate = Math.round((this.vulnerabilities.filter(v => v.status === 'fixed').length / this.totalVulnerabilities) * 100) || 0
    },
    getSeverityType(severity) {
      switch(severity) {
        case 'critical': return 'danger'
        case 'high': return 'warning'
        case 'medium': return 'info'
        case 'low': return 'success'
        default: return ''
      }
    },
    getStatusType(status) {
      switch(status) {
        case 'open': return 'danger'
        case 'in_progress': return 'warning'
        case 'fixed': return 'success'
        case 'false_positive': return 'info'
        default: return ''
      }
    },
    openAddDialog() {
      this.isEditing = false
      this.formData = {
        id: '',
        vuln_id: '',
        title: '',
        description: '',
        severity: 'medium',
        cvss_score: 0,
        discovered_date: new Date(),
        status: 'open',
        assignee: '',
        remediation: ''
      }
      this.dialogVisible = true
    },
    editVulnerability(vulnerability) {
      this.isEditing = true
      this.formData = { ...vulnerability }
      this.formData.discovered_date = new Date(vulnerability.discovered_date)
      this.dialogVisible = true
    },
    saveVulnerability() {
      // A real implementation would save the vulnerability through the API
      if (this.isEditing) {
        const index = this.vulnerabilities.findIndex(v => v.id === this.formData.id)
        if (index !== -1) {
          this.vulnerabilities[index] = { ...this.formData }
        }
      } else {
        this.formData.id = Date.now()
        this.vulnerabilities.push({ ...this.formData })
      }
      this.calculateStats()
      this.dialogVisible = false
    },
    deleteVulnerability(id) {
      // A real implementation would delete the vulnerability through the API
      this.vulnerabilities = this.vulnerabilities.filter(v => v.id !== id)
      this.calculateStats()
    },
    handleSizeChange(size) {
      this.pageSize = size
      this.loadVulnerabilities()
    },
    handleCurrentChange(current) {
      this.currentPage = current
      this.loadVulnerabilities()
    }
  }
}
</script>

<style scoped>
h1 {
  margin-bottom: 20px;
}

.stats {
  display: flex;
  gap: 20px;
  margin-bottom: 30px;
}

.stat-card {
  flex: 1;
}

.card-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
}

.stat-value {
  font-size: 36px;
  font-weight: bold;
  text-align: center;
  margin-top: 20px;
}

.stat-value.high {
  color: #e6a23c;
}

.stat-value.critical {
  color: #f56c6c;
}

.stat-value.low {
  color: #67c23a;
}

.stat-value.medium {
  color: #409eff;
}
</style>

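4. Compliance Checking and Management

4.1 Overview

Compliance checking is an important part of ops security. It verifies that systems and applications meet the following kinds of requirements:

  • Industry standards: for example PCI DSS (Payment Card Industry Data Security Standard)
  • Laws and regulations: for example GDPR (General Data Protection Regulation), HIPAA (Health Insurance Portability and Accountability Act), and SOX (Sarbanes-Oxley Act)
  • Internal policies: security policies and standards defined by the organization itself
  • Technical standards and frameworks: for example ISO 27001 (information security management systems) and the NIST (National Institute of Standards and Technology) frameworks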

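4.2 Compliance Checking Tools

4.2.1 OpenSCAP

bash
# Install OpenSCAP and the SCAP Security Guide content (RHEL/CentOS, matching the RHEL 7 content used below)
yum install openscap-scanner scap-security-guide

# List the profiles available in the data stream
oscap info /usr/share/xml/scap/ssg/content/ssg-rhel7-ds.xml

# Scan the system
oscap xccdf eval --profile xccdf_org.ssgproject.content_profile_stig-rhel7-server-upstream --results scan.xml --report scan.html /usr/share/xml/scap/ssg/content/ssg-rhel7-ds.xml

# View the report
firefox scan.html

# Show the per-rule outcome (oscap prints lowercase results on lines starting with "Result")
oscap xccdf eval --profile xccdf_org.ssgproject.content_profile_stig-rhel7-server-upstream --results scan.xml /usr/share/xml/scap/ssg/content/ssg-rhel7-ds.xml | grep -E "^Result[[:space:]]+(pass|fail|error)"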
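
The file written by `--results scan.xml` can also be summarized programmatically instead of with grep. The sketch below counts `rule-result` outcomes in an XCCDF 1.2 document. The sample XML is a hypothetical, heavily trimmed stand-in for a real results file, and `summarize_results` is an illustrative name.

```python
import xml.etree.ElementTree as ET
from collections import Counter

XCCDF_URI = "http://checklists.nist.gov/xccdf/1.2"
XCCDF_NS = {"x": XCCDF_URI}

# Hypothetical, heavily trimmed results document for illustration.
SAMPLE_RESULTS = """<Benchmark xmlns="http://checklists.nist.gov/xccdf/1.2">
  <TestResult id="xccdf_org.open-scap_testresult_example">
    <rule-result idref="rule_a"><result>pass</result></rule-result>
    <rule-result idref="rule_b"><result>fail</result></rule-result>
    <rule-result idref="rule_c"><result>notselected</result></rule-result>
    <rule-result idref="rule_d"><result>pass</result></rule-result>
  </TestResult>
</Benchmark>"""


def summarize_results(xml_text):
    """Count rule results by outcome, ignoring rules that were not selected."""
    root = ET.fromstring(xml_text)
    counts = Counter()
    for rule_result in root.iter(f"{{{XCCDF_URI}}}rule-result"):
        outcome = rule_result.findtext("x:result", default="unknown", namespaces=XCCDF_NS)
        if outcome != "notselected":
            counts[outcome] += 1
    return dict(counts)


print(summarize_results(SAMPLE_RESULTS))
```

For a real scan, read `scan.xml` and pass its contents to `summarize_results`.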

4.2.2 Chef InSpec

bash
# Install Chef InSpec
curl https://omnitruck.chef.io/install.sh | bash -s -- -P inspec

# Write the compliance controls to a file. Profile metadata (name, maintainer,
# summary, version) belongs in a profile's inspec.yml, not in a control file.
cat > compliance_profile.rb << 'EOF'
title "Linux Compliance Profile"

control "os-01" do
  title "Check OS Version"
  desc "Ensure the OS version is up to date"
  impact 1.0
  describe command('cat /etc/os-release') do
    its('stdout') { should include('Ubuntu 20.04') }
  end
end

control "ssh-01" do
  title "SSH Configuration"
  desc "Ensure SSH is properly configured"
  impact 0.7
  describe sshd_config do
    its('PermitRootLogin') { should eq('no') }
    its('PasswordAuthentication') { should eq('no') }
    its('PubkeyAuthentication') { should eq('yes') }
  end
end

control "firewall-01" do
  title "Firewall Configuration"
  desc "Ensure firewall is enabled and configured"
  impact 0.8
  describe service('ufw') do
    it { should be_installed }
    it { should be_running }
  end
end
EOF

# Run the compliance checks (the HTML reporter is named html2 in current InSpec releases)
inspec exec compliance_profile.rb --reporter html2:compliance_report.html

# View the compliance report
firefox compliance_report.html

4.3 Implementing a Compliance Management System

4.3.1 Database Design

sql
-- Compliance standards
CREATE TABLE compliance_standards (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    version VARCHAR(50),
    type VARCHAR(100),  -- industry standard, regulation, internal policy, etc.
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Compliance controls
CREATE TABLE compliance_controls (
    id INT PRIMARY KEY AUTO_INCREMENT,
    standard_id INT NOT NULL,
    control_id VARCHAR(100) NOT NULL,
    title VARCHAR(255) NOT NULL,
    description TEXT,
    requirement TEXT,
    severity ENUM('low', 'medium', 'high') NOT NULL,
    test_procedure TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (standard_id) REFERENCES compliance_standards(id)
);

-- Compliance scan results
CREATE TABLE compliance_scans (
    id INT PRIMARY KEY AUTO_INCREMENT,
    scan_id VARCHAR(100) NOT NULL,
    standard_id INT NOT NULL,
    asset_id INT NOT NULL,
    scan_date DATETIME NOT NULL,
    status ENUM('pass', 'fail', 'partial') NOT NULL,
    pass_count INT NOT NULL,
    fail_count INT NOT NULL,
    total_count INT NOT NULL,
    compliance_score FLOAT NOT NULL,
    scan_results TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (standard_id) REFERENCES compliance_standards(id),
    FOREIGN KEY (asset_id) REFERENCES assets(id)
);

-- Per-control results of a compliance scan
CREATE TABLE compliance_scan_details (
    id INT PRIMARY KEY AUTO_INCREMENT,
    scan_id INT NOT NULL,
    control_id INT NOT NULL,
    status ENUM('pass', 'fail', 'not_applicable', 'error') NOT NULL,
    message TEXT,
    evidence TEXT,
    FOREIGN KEY (scan_id) REFERENCES compliance_scans(id),
    FOREIGN KEY (control_id) REFERENCES compliance_controls(id)
);
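
The summary columns of `compliance_scans` (`status`, `pass_count`, `fail_count`, `total_count`, `compliance_score`) can be derived from the per-control statuses stored in `compliance_scan_details`. The sketch below shows one way to do that. The scoring rule is an assumption: `not_applicable` and `error` rows are excluded, and a scan with no scored controls is reported as `fail`. `summarize_scan` is a hypothetical helper name.

```python
def summarize_scan(detail_statuses):
    """Aggregate per-control statuses into the summary columns of compliance_scans."""
    pass_count = detail_statuses.count("pass")
    fail_count = detail_statuses.count("fail")
    # Assumption: 'not_applicable' and 'error' rows are left out of the score.
    total_count = pass_count + fail_count
    score = round(100.0 * pass_count / total_count, 1) if total_count else 0.0
    if total_count == 0 or pass_count == 0:
        status = "fail"
    elif fail_count == 0:
        status = "pass"
    else:
        status = "partial"
    return {
        "status": status,
        "pass_count": pass_count,
        "fail_count": fail_count,
        "total_count": total_count,
        "compliance_score": score,
    }


print(summarize_scan(["pass", "pass", "fail", "not_applicable"]))
```

Computing these values on the server rather than trusting client-supplied counts keeps the summary consistent with the detail rows.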

4.3.2 Compliance Management API

python
# Compliance management API
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/compliance_management'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

class ComplianceStandard(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text)
    version = db.Column(db.String(50))
    type = db.Column(db.String(100))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class ComplianceControl(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    standard_id = db.Column(db.Integer, db.ForeignKey('compliance_standard.id'), nullable=False)
    control_id = db.Column(db.String(100), nullable=False)
    title = db.Column(db.String(255), nullable=False)
    description = db.Column(db.Text)
    requirement = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high'), nullable=False)
    test_procedure = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class ComplianceScan(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    scan_id = db.Column(db.String(100), nullable=False)
    standard_id = db.Column(db.Integer, db.ForeignKey('compliance_standard.id'), nullable=False)
    asset_id = db.Column(db.Integer, nullable=False)
    scan_date = db.Column(db.DateTime, nullable=False)
    status = db.Column(db.Enum('pass', 'fail', 'partial'), nullable=False)
    pass_count = db.Column(db.Integer, nullable=False)
    fail_count = db.Column(db.Integer, nullable=False)
    total_count = db.Column(db.Integer, nullable=False)
    compliance_score = db.Column(db.Float, nullable=False)
    scan_results = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

class ComplianceScanDetail(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    scan_id = db.Column(db.Integer, db.ForeignKey('compliance_scan.id'), nullable=False)
    control_id = db.Column(db.Integer, db.ForeignKey('compliance_control.id'), nullable=False)
    status = db.Column(db.Enum('pass', 'fail', 'not_applicable', 'error'), nullable=False)
    message = db.Column(db.Text)
    evidence = db.Column(db.Text)

# Compliance routes

@app.route('/api/compliance/standards', methods=['GET'])
def get_compliance_standards():
    """List all compliance standards."""
    standards = ComplianceStandard.query.all()
    result = []
    for standard in standards:
        result.append({
            'id': standard.id,
            'name': standard.name,
            'description': standard.description,
            'version': standard.version,
            'type': standard.type
        })
    return jsonify(result)

@app.route('/api/compliance/standards', methods=['POST'])
def create_compliance_standard():
    """Create a compliance standard."""
    data = request.json
    standard = ComplianceStandard(
        name=data['name'],
        description=data.get('description'),
        version=data.get('version'),
        type=data.get('type')
    )
    db.session.add(standard)
    db.session.commit()
    return jsonify({'id': standard.id}), 201

@app.route('/api/compliance/standards/<int:id>/controls', methods=['GET'])
def get_compliance_controls(id):
    """Return the controls that belong to a compliance standard."""
    controls = ComplianceControl.query.filter_by(standard_id=id).all()
    result = []
    for control in controls:
        result.append({
            'id': control.id,
            'control_id': control.control_id,
            'title': control.title,
            'description': control.description,
            'requirement': control.requirement,
            'severity': control.severity,
            'test_procedure': control.test_procedure
        })
    return jsonify(result)

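@app.route('/api/compliance/scans', methods=['POST'])
def create_compliance_scan():
    """Create a compliance scan together with its per-control details."""
    data = request.get_json(silent=True) or {}
    try:
        scan = ComplianceScan(
            scan_id=data['scan_id'],
            standard_id=data['standard_id'],
            asset_id=data['asset_id'],
            scan_date=datetime.fromisoformat(data['scan_date']),
            status=data['status'],
            pass_count=data['pass_count'],
            fail_count=data['fail_count'],
            total_count=data['total_count'],
            compliance_score=data['compliance_score'],
            scan_results=data.get('scan_results')
        )
        db.session.add(scan)
        # flush() assigns scan.id without committing, so the scan and its
        # details are written in one transaction instead of two
        db.session.flush()

        # Create the scan details
        for detail in data.get('details', []):
            scan_detail = ComplianceScanDetail(
                scan_id=scan.id,
                control_id=detail['control_id'],
                status=detail['status'],
                message=detail.get('message'),
                evidence=detail.get('evidence')
            )
            db.session.add(scan_detail)

        db.session.commit()
    except (KeyError, TypeError, ValueError) as exc:
        # Missing field or malformed scan_date: discard the partial scan
        db.session.rollback()
        return jsonify({'error': f'invalid request: {exc}'}), 400
    return jsonify({'id': scan.id}), 201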

@app.route('/api/compliance/scans/<int:id>', methods=['GET'])
def get_compliance_scan(id):
    """Return a compliance scan and its details."""
    scan = ComplianceScan.query.get(id)
    if not scan:
        return jsonify({'error': 'Scan not found'}), 404
    
    details = ComplianceScanDetail.query.filter_by(scan_id=id).all()
    detail_result = []
    for detail in details:
        detail_result.append({
            'id': detail.id,
            'control_id': detail.control_id,
            'status': detail.status,
            'message': detail.message,
            'evidence': detail.evidence
        })
    
    result = {
        'id': scan.id,
        'scan_id': scan.scan_id,
        'standard_id': scan.standard_id,
        'asset_id': scan.asset_id,
        'scan_date': scan.scan_date.isoformat(),
        'status': scan.status,
        'pass_count': scan.pass_count,
        'fail_count': scan.fail_count,
        'total_count': scan.total_count,
        'compliance_score': scan.compliance_score,
        'scan_results': scan.scan_results,
        'details': detail_result
    }
    return jsonify(result)

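if __name__ == '__main__':
    # Flask-SQLAlchemy 3.x requires an application context for create_all()
    with app.app_context():
        db.create_all()
    # The Werkzeug debugger allows arbitrary code execution, so do not combine
    # debug=True with host='0.0.0.0' on a reachable network
    app.run(debug=False, host='127.0.0.1', port=5000)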
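
The scan endpoint above expects the caller to send `status`, `pass_count`, `fail_count`, `total_count`, and `compliance_score`, but the source does not say how a scanner derives them. The sketch below shows one possible derivation from the per-control results. The function name `summarize_details` and the scoring rule (passed controls as a percentage of evaluated controls, with `not_applicable` and `error` excluded) are assumptions made for illustration.

```python
from collections import Counter


def summarize_details(details):
    """Derive the summary fields of a scan from its per-control results.

    Assumption: 'not_applicable' and 'error' results do not count toward
    the score, which is the percentage of evaluated controls that passed.
    A scan with no passed control (including an empty one) is a 'fail'.
    """
    counts = Counter(d["status"] for d in details)
    passed, failed = counts["pass"], counts["fail"]
    evaluated = passed + failed
    score = round(100.0 * passed / evaluated, 2) if evaluated else 0.0
    if passed > 0 and failed == 0:
        status = "pass"
    elif passed == 0:
        status = "fail"
    else:
        status = "partial"
    return {
        "status": status,
        "pass_count": passed,
        "fail_count": failed,
        "total_count": len(details),
        "compliance_score": score,
    }


example = [
    {"control_id": 1, "status": "pass"},
    {"control_id": 2, "status": "fail"},
    {"control_id": 3, "status": "pass"},
    {"control_id": 4, "status": "not_applicable"},
]
summary = summarize_details(example)
```

The returned dictionary can be merged into the JSON body sent to `POST /api/compliance/scans`, with the original list passed as `details`.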

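5. Security Audit System

5.1 Security Audit Overview

Security auditing is a core part of operations security. It covers:

  • Event recording: record security events from systems and applications
  • Log management: collect, store, and manage security logs
  • Event analysis: analyze security events to identify potential threats
  • Compliance auditing: verify that systems and applications meet compliance requirements
  • Forensic analysis: investigate what happened after a security incident
  • Audit reporting: generate audit reports that serve as evidence of compliance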

5.2 Security Audit Tools

5.2.1 Linux Auditd

bash
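# Install auditd (Debian/Ubuntu)
apt-get install auditd audispd-plugins

# Start auditd and enable it at boot
systemctl start auditd
systemctl enable auditd

# Write the audit rules
cat > /etc/audit/rules.d/audit.rules << 'EOF'
# Record every execve system call (command execution)
-a always,exit -F arch=b64 -S execve -k exec

# Watch writes and attribute changes on account files
-w /etc/passwd -p wa -k passwd_changes
-w /etc/shadow -p wa -k shadow_changes
-w /etc/sudoers -p wa -k sudoers_changes

# Watch the authentication log
-w /var/log/auth.log -p wa -k auth_log

# Watch the network configuration
-w /etc/network/ -p wa -k network_config

# Watch the firewall configuration
-w /etc/ufw/ -p wa -k firewall_config
EOF

# Reload the rules
auditctl -R /etc/audit/rules.d/audit.rules

# Show the auditd status
auditctl -s

# List the loaded rules (auditctl, not auditd)
auditctl -l

# Search the audit log by rule key
ausearch -k passwd_changes

# Summary reports
aureport -a  # AVC message report
aureport -x  # executable report
aureport -l  # login report
aureport -f  # file access report

# Follow the audit log in real time
# (auditd -f only runs the daemon in the foreground; it does not tail the log)
tail -f /var/log/audit/audit.log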
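
Before audit records can be stored or analyzed by the platform, each line of `/var/log/audit/audit.log` has to be split into fields. The following is a minimal sketch of such a parser, not a replacement for `ausearch`. It assumes the common `type=... msg=audit(epoch.ms:serial): key=value ...` layout and does not decode hex-encoded values or nested `msg='...'` payloads. The name `parse_audit_line` and the sample record are made up for illustration.

```python
import re
from datetime import datetime, timezone

# Record header: type=<TYPE> msg=audit(<epoch>.<ms>:<serial>):
HEADER = re.compile(
    r"^type=(?P<type>\S+) msg=audit\((?P<ts>\d+\.\d+):(?P<serial>\d+)\):\s*"
)
# key=value pairs; a value is either double-quoted or a run of non-spaces
FIELD = re.compile(r'(\w+)=("[^"]*"|\S+)')


def parse_audit_line(line):
    """Parse one audit.log line into a dict, or return None if it does not match."""
    match = HEADER.match(line)
    if not match:
        return None
    record = {
        "type": match.group("type"),
        "time": datetime.fromtimestamp(float(match.group("ts")), tz=timezone.utc),
        "serial": int(match.group("serial")),
    }
    # Everything after the header is a sequence of key=value fields
    for key, value in FIELD.findall(line[match.end():]):
        record[key] = value.strip('"')
    return record


sample = (
    'type=SYSCALL msg=audit(1700000000.123:4242): arch=c000003e syscall=59 '
    'success=yes exit=0 pid=1234 uid=0 comm="cat" exe="/usr/bin/cat" key="exec"'
)
record = parse_audit_line(sample)
```

The `key` field carries the `-k` label from the rule that fired, which makes it a convenient value to group or filter on.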

5.2.2 ELK Stack

bash
# Start the ELK Stack with Docker
docker run -d --name elk -p 5601:5601 -p 9200:9200 -p 5044:5044 sebp/elk

# Configure Filebeat to collect the audit log
cat > /etc/filebeat/filebeat.yml << EOF
filebeat.inputs:
- type: log
  paths:
    - /var/log/audit/audit.log
  fields:
    log_type: audit

output.elasticsearch:
  hosts: ["localhost:9200"]

setup.kibana:
  host: "localhost:5601"
EOF

# Start Filebeat and enable it at boot
systemctl start filebeat
systemctl enable filebeat

# Open Kibana
# http://localhost:5601
# Create an index pattern: filebeat-*
# Browse the audit logs

5.3 Security Audit System Implementation

5.3.1 Security Audit Database Design

sql
-- Audit events
CREATE TABLE audit_events (
    id INT PRIMARY KEY AUTO_INCREMENT,
    event_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,  -- login, file access, command execution, etc.
    event_time DATETIME NOT NULL,
    source_ip VARCHAR(50),
    user VARCHAR(100),
    action VARCHAR(255) NOT NULL,
    object VARCHAR(255),  -- the object that was acted on
    result ENUM('success', 'failure') NOT NULL,
    details TEXT,  -- event details
    severity ENUM('low', 'medium', 'high', 'critical') DEFAULT 'medium',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Audit rules
CREATE TABLE audit_rules (
    id INT PRIMARY KEY AUTO_INCREMENT,
    rule_name VARCHAR(255) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    pattern TEXT,  -- event matching pattern
    severity ENUM('low', 'medium', 'high', 'critical') DEFAULT 'medium',
    enabled BOOLEAN DEFAULT TRUE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Audit reports
CREATE TABLE audit_reports (
    id INT PRIMARY KEY AUTO_INCREMENT,
    report_name VARCHAR(255) NOT NULL,
    report_type VARCHAR(100) NOT NULL,  -- daily, weekly, monthly, custom
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL,
    total_events INT NOT NULL,
    failed_events INT NOT NULL,
    high_severity_events INT NOT NULL,
    report_content TEXT,  -- report body
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
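
The `audit_reports` table stores three aggregate counters per reporting period. As a rough sketch of how they could be computed from rows of `audit_events`, the function below works on plain dictionaries. The name `build_report_summary` is hypothetical, and treating both `high` and `critical` as "high severity" is an assumption, since the schema does not define the threshold.

```python
from datetime import datetime


def build_report_summary(events, start_time, end_time):
    """Count the events inside [start_time, end_time] for an audit report."""
    in_range = [e for e in events if start_time <= e["event_time"] <= end_time]
    return {
        "start_time": start_time,
        "end_time": end_time,
        "total_events": len(in_range),
        "failed_events": sum(1 for e in in_range if e["result"] == "failure"),
        # Assumption: 'high' and 'critical' both count as high severity
        "high_severity_events": sum(
            1 for e in in_range if e["severity"] in ("high", "critical")
        ),
    }


events = [
    {"event_time": datetime(2024, 1, 1, 8), "result": "success", "severity": "low"},
    {"event_time": datetime(2024, 1, 1, 9), "result": "failure", "severity": "high"},
    {"event_time": datetime(2024, 1, 1, 10), "result": "failure", "severity": "critical"},
    # Outside the reporting window, so it is ignored
    {"event_time": datetime(2024, 1, 3, 0), "result": "failure", "severity": "medium"},
]
summary = build_report_summary(events, datetime(2024, 1, 1), datetime(2024, 1, 2))
```

In a real deployment the same counts would more likely come from `COUNT(*)` queries with a `WHERE event_time BETWEEN ...` clause, so that the filtering happens in the database.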

5.3.2 Security Audit API Implementation

python
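# Security audit API
import os
from datetime import datetime

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
# Read the connection string from the environment instead of hard-coding credentials.
# DATABASE_URL is an assumed variable name; the fallback is a local placeholder only.
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get(
    'DATABASE_URL', 'mysql://user:password@localhost/security_audit')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)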

class AuditEvent(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    event_id = db.Column(db.String(100), nullable=False)
    event_type = db.Column(db.String(100), nullable=False)
    event_time = db.Column(db.DateTime, nullable=False)
    source_ip = db.Column(db.String(50))
    user = db.Column(db.String(100))
    action = db.Column(db.String(255), nullable=False)
    object = db.Column(db.String(255))
    result = db.Column(db.Enum('success', 'failure'), nullable=False)
    details = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high', 'critical'), default='medium')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

class AuditRule(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    rule_name = db.Column(db.String(255), nullable=False)
    event_type = db.Column(db.String(100), nullable=False)
    pattern = db.Column(db.Text)
    severity = db.Column(db.Enum('low', 'medium', 'high', 'critical'), default='medium')
    enabled = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class AuditReport(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    report_name = db.Column(db.String(255), nullable=False)
    report_type = db.Column(db.String(100), nullable=False)
    start_time = db.Column(db.DateTime, nullable=False)
    end_time = db.Column(db.DateTime, nullable=False)
    total_events = db.Column(db.Integer, nullable=False)
    failed_events = db.Column(db.Integer, nullable=False)
    high_severity_events = db.Column(db.Integer, nullable=False)
    report_content = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

# Security audit API routes

@app.route('/api/audit/events', methods=['GET'])
def get_audit_events():
    """Return audit events, with pagination and filtering."""
    page = request.args.get('page', 1, type=int)
    # Cap per_page so that one request cannot dump the whole table
    per_page = min(request.args.get('per_page', 10, type=int), 100)
    event_type = request.args.get('event_type')
    user = request.args.get('user')
    start_time = request.args.get('start_time')
    end_time = request.args.get('end_time')

    query = AuditEvent.query

    if event_type:
        query = query.filter_by(event_type=event_type)
    if user:
        query = query.filter_by(user=user)
    try:
        if start_time:
            query = query.filter(AuditEvent.event_time >= datetime.fromisoformat(start_time))
        if end_time:
            query = query.filter(AuditEvent.event_time <= datetime.fromisoformat(end_time))
    except ValueError:
        # fromisoformat() rejects malformed timestamps; answer 400 rather than 500
        return jsonify({'error': 'start_time and end_time must be ISO 8601 timestamps'}), 400

    pagination = query.order_by(AuditEvent.event_time.desc()).paginate(page=page, per_page=per_page, error_out=False)
    events = pagination.items
    
    result = []
    for event in events:
        result.append({
            'id': event.id,
            'event_id': event.event_id,
            'event_type': event.event_type,
            'event_time': event.event_time.isoformat(),
            'source_ip': event.source_ip,
            'user': event.user,
            'action': event.action,
            'object': event.object,
            'result': event.result,
            'details': event.details,
            'severity': event.severity
        })
    
    return jsonify({
        'events': result,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': page
    })

@app.route('/api/audit/events', methods=['POST'])
def create_audit_event():
    """Create an audit event."""
    data = request.json
    event = AuditEvent(
        event_id=data['event_id'],
        event_type=data['event_type'],
        event_time=datetime.fromisoformat(data['event_time']),
        source_ip=data.get('source_ip'),
        user=data.get('user'),
        action=data['action'],
        object=data.get('object'),
        result=data['result'],
        details=data.get('details'),
        severity=data.get('severity', 'medium')
    )
    db.session.add(event)
    db.session.commit()
    return jsonify({'id': event.id}), 201

@app.route('/api/audit/rules', methods=['GET'])
def get_audit_rules():
    """Return the list of audit rules."""
    rules = AuditRule.query.all()
    result = []
    for rule in rules:
        result.append({
            'id': rule.id,
            'rule_name': rule.rule_name,
            'event_type': rule.event_type,
            'pattern': rule.pattern,
            'severity': rule.severity,
            'enabled': rule.enabled
        })
    return jsonify(result)

@app.route('/api/audit/reports', methods=['POST'])
def create_audit_report():
    """Create an audit report."""
    data = request.json
    report = AuditReport(
        report_name=data['report_name'],
        report_type=data['report_type'],
        start_time=datetime.fromisoformat(data['start_time']),
        end_time=datetime.fromisoformat(data['end_time']),
        total_events=data['total_events'],
        failed_events=data['failed_events'],
        high_severity_events=data['high_severity_events'],
        report_content=data.get('report_content')
    )
    db.session.add(report)
    db.session.commit()
    return jsonify({'id': report.id}), 201

@app.route('/api/audit/reports/<int:id>', methods=['GET'])
def get_audit_report(id):
    """Return a single audit report."""
    report = AuditReport.query.get(id)
    if not report:
        return jsonify({'error': 'Report not found'}), 404
    
    result = {
        'id': report.id,
        'report_name': report.report_name,
        'report_type': report.report_type,
        'start_time': report.start_time.isoformat(),
        'end_time': report.end_time.isoformat(),
        'total_events': report.total_events,
        'failed_events': report.failed_events,
        'high_severity_events': report.high_severity_events,
        'report_content': report.report_content,
        'created_at': report.created_at.isoformat()
    }
    return jsonify(result)

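if __name__ == '__main__':
    # Flask-SQLAlchemy 3.x requires an application context for create_all()
    with app.app_context():
        db.create_all()
    # The Werkzeug debugger allows arbitrary code execution, so do not combine
    # debug=True with host='0.0.0.0' on a reachable network
    app.run(debug=False, host='127.0.0.1', port=5000)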
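
The API above stores audit rules but never applies them. The source does not define the semantics of `pattern`, so the sketch below assumes it is a regular expression matched against the event's `action`, and that an empty pattern matches every event of the rule's `event_type`. The helper names `match_rules` and `effective_severity` are hypothetical.

```python
import re

SEVERITY_ORDER = {"low": 0, "medium": 1, "high": 2, "critical": 3}


def match_rules(event, rules):
    """Return the enabled rules whose event_type and pattern match the event."""
    matched = []
    for rule in rules:
        if not rule["enabled"] or rule["event_type"] != event["event_type"]:
            continue
        # Assumption: an empty pattern matches every event of that type
        if not rule["pattern"] or re.search(rule["pattern"], event["action"]):
            matched.append(rule)
    return matched


def effective_severity(event, rules):
    """Highest severity among the matching rules, else the event's own severity."""
    matched = match_rules(event, rules)
    if not matched:
        return event.get("severity", "medium")
    return max((r["severity"] for r in matched), key=SEVERITY_ORDER.__getitem__)


rules = [
    {"rule_name": "sudo usage", "event_type": "command",
     "pattern": r"\bsudo\b", "severity": "high", "enabled": True},
    {"rule_name": "any command", "event_type": "command",
     "pattern": None, "severity": "low", "enabled": True},
    {"rule_name": "disabled rule", "event_type": "command",
     "pattern": "rm", "severity": "critical", "enabled": False},
]
event = {"event_type": "command", "action": "sudo rm -rf /tmp/x", "severity": "medium"}
severity = effective_severity(event, rules)
```

Here the disabled rule is skipped, so the event ends up with the `high` severity of the sudo rule rather than `critical`.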

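6. Security Monitoring and Alerting

6.1 Security Monitoring Overview

Security monitoring is a core part of operations security. It covers:

  • Real-time monitoring: watch the security state of systems and applications as it changes
  • Anomaly detection: detect abnormal behavior in systems and applications
  • Threat identification: identify potential security threats
  • Security alerting: raise an alert when a security event is detected
  • Event correlation: correlate multiple security events to recognize multi-step attacks
  • Security visualization: present the security state on dashboards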
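
Event correlation, as listed above, can be illustrated with a common case: many failed logins from one source in a short period. The sketch below uses a sliding time window per source IP. The function name, the `login` event type, the threshold of 5, and the one-minute window are all assumptions chosen for the example.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def find_bruteforce_sources(events, threshold=5, window=timedelta(minutes=1)):
    """Return the source IPs with at least `threshold` failed logins inside `window`."""
    recent = defaultdict(deque)  # source_ip -> timestamps of recent failures
    flagged = set()
    for event in sorted(events, key=lambda e: e["event_time"]):
        if event["event_type"] != "login" or event["result"] != "failure":
            continue
        times = recent[event["source_ip"]]
        times.append(event["event_time"])
        # Drop failures that have fallen out of the sliding window
        while event["event_time"] - times[0] > window:
            times.popleft()
        if len(times) >= threshold:
            flagged.add(event["source_ip"])
    return flagged


base = datetime(2024, 1, 1, 12, 0, 0)
events = [
    # Five failures, 10 seconds apart, from one address
    {"event_type": "login", "result": "failure", "source_ip": "203.0.113.7",
     "event_time": base + timedelta(seconds=10 * i)}
    for i in range(5)
] + [
    # Two failures, 10 minutes apart, from another address
    {"event_type": "login", "result": "failure", "source_ip": "198.51.100.4",
     "event_time": base + timedelta(minutes=10 * i)}
    for i in range(2)
]
suspects = find_bruteforce_sources(events)
```

Each individual failure is unremarkable on its own; only the combination within the window marks `203.0.113.7` as suspicious.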

6.2 Security Monitoring Tools

6.2.1 Prometheus + Grafana

bash
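# docker-compose.yml
version: '3'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      # Example value only: set a strong admin password outside local testing
      - GF_SECURITY_ADMIN_PASSWORD=admin
  # node_exporter runs in the same Compose project so that Prometheus can
  # resolve the node-exporter host name used in prometheus.yml
  node-exporter:
    image: prom/node-exporter
    ports:
      - "9100:9100"

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']
  # Placeholder target: point this at whatever exporter publishes your auditd metrics
  - job_name: 'auditd'
    static_configs:
      - targets: ['auditd-exporter:9102']

# Start Prometheus, Grafana, and node_exporter with Docker Compose
docker-compose up -d

# Open Grafana
# http://localhost:3000
# Add Prometheus as a data source
# Import a security monitoring dashboard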
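
The `auditd` scrape job above needs something to serve metrics on port 9102, and the source does not name a specific exporter. As a hedged illustration of what such an endpoint returns, the sketch below renders counters in the Prometheus text exposition format using only the standard library. The metric name `audit_events_total`, the `key` label, and the in-memory `EVENT_COUNTS` dictionary are assumptions. Label values are not escaped, so the keys must not contain quotes or backslashes.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical in-memory counters; a real exporter would update these
# from parsed audit records
EVENT_COUNTS = {"exec": 12, "passwd_changes": 1}


def render_metrics(counts):
    """Render counters in the Prometheus text exposition format."""
    lines = [
        "# HELP audit_events_total Audit events seen, by rule key.",
        "# TYPE audit_events_total counter",
    ]
    for key in sorted(counts):
        lines.append(f'audit_events_total{{key="{key}"}} {counts[key]}')
    return "\n".join(lines) + "\n"


class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Prometheus scrapes /metrics by default
        if self.path != "/metrics":
            self.send_error(404)
            return
        body = render_metrics(EVENT_COUNTS).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port=9102):
    """Serve /metrics until interrupted."""
    HTTPServer(("0.0.0.0", port), MetricsHandler).serve_forever()
```

Calling `serve()` starts the endpoint. For anything beyond a demonstration, the official `prometheus_client` library is the more usual choice because it handles escaping and metric types.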

6.2.2 Wazuh

bash
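# Start a single-node Wazuh stack with Docker Compose
# (Wazuh distributes its containers through the wazuh-docker repository, not as one image)
git clone https://github.com/wazuh/wazuh-docker.git
# Check out the release tag for the Wazuh version you want before continuing
cd wazuh-docker/single-node
docker-compose -f generate-indexer-certs.yml run --rm generator
docker-compose up -d

# Open the Wazuh dashboard
# https://localhost
# Default username: admin
# The default password is listed in the wazuh-docker documentation; change it before exposing the dashboard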

# Install the Wazuh agent (the Wazuh APT repository must be configured first)
apt-get install wazuh-agent

# Configure the Wazuh agent (this overwrites the existing ossec.conf)
cat > /var/ossec/etc/ossec.conf << 'EOF'
<ossec_config>
  <client>
    <server>
      <address>wazuh-server-ip</address>
      <port>1514</port>
      <protocol>tcp</protocol>
    </server>
    <config-profile>ubuntu, debian</config-profile>
    <notify_time>10</notify_time>
    <time-reconnect>60</time-reconnect>
    <auto_restart>yes</auto_restart>
    <crypto_method>aes</crypto_method>
  </client>
  <syscheck>
    <directories check_all="yes" report_changes="yes">/etc,/usr/bin,/usr/sbin</directories>
    <directories check_all="yes" report_changes="yes">/bin,/sbin</directories>
  </syscheck>
  <rootcheck></rootcheck>
  <wodle name="command">
    <disabled>no</disabled>
    <command>df -h</command>
    <interval>1h</interval>
  </wodle>
</ossec_config>
EOF

# Start the Wazuh agent and enable it at boot
systemctl start wazuh-agent
systemctl enable wazuh-agent

6.3 Security Monitoring System Implementation

6.3.1 Security Monitoring Database Design

sql
-- Security events
CREATE TABLE security_events (
    id INT PRIMARY KEY AUTO_INCREMENT,
    event_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,  -- intrusion attempt, abnormal access, exploit, etc.
    event_time DATETIME NOT NULL,
    source_ip VARCHAR(50),
    destination_ip VARCHAR(50),
    source_port INT,
    destination_port INT,
    protocol VARCHAR(50),
    severity ENUM('low', 'medium', 'high', 'critical') NOT NULL,
    status ENUM('new', 'processed', 'dismissed', 'resolved') DEFAULT 'new',
    description TEXT,
    details TEXT,  -- event details
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Security alerts
CREATE TABLE security_alerts (
    id INT PRIMARY KEY AUTO_INCREMENT,
    alert_id VARCHAR(100) NOT NULL,
    event_id INT NOT NULL,
    alert_time DATETIME NOT NULL,
    severity ENUM('low', 'medium', 'high', 'critical') NOT NULL,
    title VARCHAR(255) NOT NULL,
    description TEXT,
    status ENUM('triggered', 'acknowledged', 'resolved', 'suppressed') DEFAULT 'triggered',
    assigned_to VARCHAR(100),
    notification_sent BOOLEAN DEFAULT FALSE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
