主题
CMDB系统开发实战
1. CMDB系统概述
1.1 CMDB的定义和价值
CMDB(Configuration Management Database)是指配置管理数据库,是一个存储和管理企业IT基础设施和应用服务配置信息的综合性系统。它是IT服务管理(ITSM)的核心组件,也是DevOps和自动化运维的重要基础。
核心价值:
- 资产可视化:全面掌握IT资产的数量、分布和状态
- 配置管理:集中管理配置信息,确保配置的一致性和可追溯性
- 关系管理:记录IT组件之间的依赖关系,支持影响分析
- 变更管理:跟踪配置变更,评估变更影响,减少变更风险
- 合规审计:提供完整的配置历史,支持合规审计和风险管理
- 自动化基础:为自动化运维和DevOps提供数据支撑
- 成本优化:基于资产信息,优化资源配置,降低IT成本
1.2 CMDB的应用场景
| 场景 | 传统方式 | CMDB系统 | 效率提升 |
|---|---|---|---|
| 资产管理 | Excel表格,手动维护 | 自动发现,实时更新 | 90%+ |
| 配置管理 | 分散存储,版本混乱 | 集中管理,版本控制 | 85%+ |
| 变更影响分析 | 经验判断,准确性低 | 依赖关系分析,科学评估 | 95%+ |
| 故障排查 | 人工分析,耗时长 | 快速定位,关联分析 | 80%+ |
| 合规审计 | 手动收集,工作量大 | 自动生成报告,实时审计 | 85%+ |
| 资源优化 | 凭感觉,缺乏数据 | 基于数据,科学决策 | 70%+ |
2. CMDB系统技术栈
2.1 核心技术选型
| 技术 | 用途 | 优势 | 适用场景 |
|---|---|---|---|
| Python | 后端开发、脚本开发 | 语法简洁,库丰富 | 大多数CMDB场景 |
| Go | 高性能服务、API开发 | 编译型,性能优异 | 高并发场景 |
| Vue.js | 前端开发 | 响应式,开发效率高 | CMDB前端界面 |
| React | 前端开发 | 组件化,生态成熟 | 复杂CMDB前端 |
| Flask | Python Web框架 | 轻量级,灵活 | 小型CMDB系统 |
| Django | Python Web框架 | 全功能,ORM强大 | 大型CMDB系统 |
| Gin | Go Web框架 | 高性能,简洁 | 高并发CMDB系统 |
| MySQL | 关系型数据库 | 成熟稳定,适合结构化数据 | 传统CMDB数据存储 |
| PostgreSQL | 关系型数据库 | 功能强大,支持JSON | 复杂CMDB数据存储 |
| MongoDB | 文档数据库 | 灵活,适合半结构化数据 | 动态配置信息存储 |
| Redis | 缓存数据库 | 高性能,支持过期时间 | 缓存热点数据,会话管理 |
| Elasticsearch | 搜索引擎 | 全文检索,聚合分析 | 资产搜索,日志分析 |
2.2 技术架构设计
典型CMDB系统架构:
mermaid
graph TD
subgraph 数据源层
A[服务器] -->|自动发现| B
C[网络设备] -->|自动发现| B
D[应用服务] -->|自动发现| B
E[云资源] -->|API集成| B
F[手动录入] -->|UI界面| B
end
subgraph 采集层
B[采集器] -->|数据处理| C
C[数据清洗] -->|数据标准化| D
end
subgraph 存储层
D[数据标准化] -->|存储| E[关系数据库]
D -->|存储| F[文档数据库]
D -->|存储| G[缓存数据库]
end
subgraph 服务层
E -->|提供数据| H[API服务]
F -->|提供数据| H
G -->|提供数据| H
H -->|业务逻辑| I[业务服务]
I -->|认证授权| J[安全服务]
I -->|工作流| K[工作流服务]
end
subgraph 应用层
H -->|调用| L[前端应用]
I -->|调用| L
J -->|调用| L
K -->|调用| L
L -->|展示| M[资产管理]
L -->|展示| N[配置管理]
L -->|展示| O[关系管理]
L -->|展示| P[变更管理]
L -->|展示| Q[报表分析]
end
subgraph 集成层
H -->|集成| R[监控系统]
H -->|集成| S[发布系统]
H -->|集成| T[自动化系统]
H -->|集成| U[云平台]
end3. CMDB系统核心功能设计
3.1 资产管理
功能模块:
- 资产录入:手动录入资产信息,支持批量导入
- 资产发现:自动发现网络内的IT资产,如服务器、网络设备等
- 资产分类:根据类型、用途、位置等维度对资产进行分类
- 资产标签:为资产添加标签,支持多维度管理
- 资产状态:管理资产的生命周期状态,如在用、闲置、报废等
- 资产查询:支持多条件组合查询,快速定位资产
- 资产导出:支持导出资产信息,生成报表
数据模型:
python
# 资产模型
class Asset(Base):
__tablename__ = 'assets'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
asset_type = Column(String(50), nullable=False) # server, network, storage, application
asset_model = Column(String(100))
serial_number = Column(String(100), unique=True)
status = Column(String(20), default='in_use') # in_use, idle, maintenance, retired
location = Column(String(100))
department = Column(String(100))
owner = Column(String(100))
purchase_date = Column(Date)
warranty_end_date = Column(Date)
price = Column(DECIMAL(10, 2))
manufacturer = Column(String(100))
supplier = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
server = relationship('Server', back_populates='asset', uselist=False)
network_device = relationship('NetworkDevice', back_populates='asset', uselist=False)
storage = relationship('Storage', back_populates='asset', uselist=False)
application = relationship('Application', back_populates='asset', uselist=False)
tags = relationship('AssetTag', back_populates='asset')3.2 配置管理
功能模块:
- 配置项管理:管理IT基础设施和应用服务的配置项
- 配置版本:跟踪配置项的版本变更,支持版本回滚
- 配置基线:建立配置基线,用于变更比较和合规检查
- 配置差异:自动检测配置差异,生成差异报告
- 配置审计:定期审计配置,确保配置的合规性
- 配置模板:基于模板快速创建配置项
- 配置导入导出:支持配置的导入导出,便于迁移和备份
数据模型:
python
# 配置项模型
class ConfigurationItem(Base):
__tablename__ = 'configuration_items'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
ci_type = Column(String(50), nullable=False) # hardware, software, service
asset_id = Column(Integer, ForeignKey('assets.id'))
version = Column(String(20))
status = Column(String(20), default='active') # active, inactive, deprecated
description = Column(Text)
attributes = Column(JSON) # 存储动态配置属性
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
asset = relationship('Asset', backref='configuration_items')
versions = relationship('CI Version', back_populates='ci')
baselines = relationship('CIBaseline', back_populates='ci')3.3 关系管理
功能模块:
- 依赖关系:记录IT组件之间的依赖关系,如应用依赖服务器
- 拓扑关系:可视化展示IT组件之间的拓扑结构
- 影响分析:基于依赖关系,分析变更的潜在影响
- 根因分析:基于拓扑关系,快速定位故障的根因
- 关系自动发现:通过自动发现,识别和更新关系
- 关系验证:定期验证关系的准确性,确保关系数据的质量
数据模型:
python
# 关系模型
class Relationship(Base):
__tablename__ = 'relationships'
id = Column(Integer, primary_key=True, autoincrement=True)
source_id = Column(Integer, ForeignKey('configuration_items.id'), nullable=False)
target_id = Column(Integer, ForeignKey('configuration_items.id'), nullable=False)
relationship_type = Column(String(50), nullable=False) # depends_on, contains, hosted_on
description = Column(Text)
strength = Column(Integer, default=1) # 关系强度,1-5
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
source = relationship('ConfigurationItem', foreign_keys=[source_id], backref='outgoing_relationships')
target = relationship('ConfigurationItem', foreign_keys=[target_id], backref='incoming_relationships')3.4 变更管理
功能模块:
- 变更申请:提交变更申请,包括变更内容、影响范围等
- 变更审批:基于工作流,执行变更审批流程
- 变更执行:执行变更,更新配置信息
- 变更记录:记录变更的详细信息,包括变更前后的配置差异
- 变更回滚:当变更失败时,支持快速回滚到之前的配置状态
- 变更统计:统计变更的数量、类型、成功率等指标
数据模型:
python
# 变更模型
class Change(Base):
__tablename__ = 'changes'
id = Column(Integer, primary_key=True, autoincrement=True)
change_id = Column(String(50), unique=True, nullable=False) # 变更单号
title = Column(String(200), nullable=False)
description = Column(Text)
change_type = Column(String(50), nullable=False) # standard, emergency, normal
priority = Column(String(20), default='medium') # low, medium, high, critical
status = Column(String(20), default='draft') # draft, pending, approved, rejected, implemented, closed
requested_by = Column(String(100), nullable=False)
approved_by = Column(String(100))
implemented_by = Column(String(100))
impact_assessment = Column(Text)
risk_assessment = Column(Text)
implementation_plan = Column(Text)
rollback_plan = Column(Text)
start_date = Column(DateTime)
end_date = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
ci_changes = relationship('CIChange', back_populates='change')
approvals = relationship('Approval', back_populates='change')3.5 自动发现
功能模块:
- 网络扫描:通过网络扫描,发现网络内的设备和服务
- Agent发现:通过部署Agent,收集详细的配置信息
- API集成:通过云平台和第三方系统的API,获取配置信息
- 定期发现:设置定期发现任务,保持配置信息的实时性
- 发现规则:配置发现规则,过滤和分类发现的资产
- 发现结果处理:自动处理发现结果,更新CMDB数据
数据模型:
python
# 发现任务模型
class DiscoveryTask(Base):
__tablename__ = 'discovery_tasks'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
task_type = Column(String(50), nullable=False) # network_scan, agent, api
target = Column(Text, nullable=False) # 扫描目标或API地址
schedule = Column(String(100)) # cron表达式
status = Column(String(20), default='inactive') # active, inactive
created_by = Column(String(100), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
discoveries = relationship('Discovery', back_populates='task')
# 发现结果模型
class Discovery(Base):
__tablename__ = 'discoveries'
id = Column(Integer, primary_key=True, autoincrement=True)
task_id = Column(Integer, ForeignKey('discovery_tasks.id'), nullable=False)
discovery_time = Column(DateTime, default=datetime.utcnow)
status = Column(String(20), default='running') # running, completed, failed
result = Column(JSON) # 发现结果
error_message = Column(Text) # 错误信息
# 关联关系
task = relationship('DiscoveryTask', back_populates='discoveries')3.6 报表分析
功能模块:
- 资产报表:生成资产的数量、分布、状态等报表
- 配置报表:生成配置的变更、合规性等报表
- 变更报表:生成变更的数量、类型、成功率等报表
- 趋势分析:分析资产、配置、变更的趋势
- 自定义报表:支持用户自定义报表,满足特定需求
- 报表导出:支持导出报表为Excel、PDF等格式
数据模型:
python
# 报表模型
class Report(Base):
__tablename__ = 'reports'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
report_type = Column(String(50), nullable=False) # asset, configuration, change, custom
description = Column(Text)
query = Column(Text, nullable=False) # 报表查询语句
parameters = Column(JSON) # 报表参数
created_by = Column(String(100), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
report_instances = relationship('ReportInstance', back_populates='report')3.7 集成接口
功能模块:
- API接口:提供RESTful API,支持与其他系统集成
- Webhook:支持配置Webhook,接收外部系统的事件通知
- 数据同步:支持与其他系统的数据双向同步
- 认证授权:实施API认证和授权,确保接口安全
- 接口文档:提供详细的API文档,便于集成开发
数据模型:
python
# API密钥模型
class APIKey(Base):
__tablename__ = 'api_keys'
id = Column(Integer, primary_key=True, autoincrement=True)
key = Column(String(100), unique=True, nullable=False)
name = Column(String(100), nullable=False)
description = Column(Text)
permissions = Column(JSON) # 权限列表
status = Column(String(20), default='active') # active, inactive
created_by = Column(String(100), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
expires_at = Column(DateTime)4. CMDB系统技术实现
4.1 基于Python的CMDB后端
4.1.1 使用Flask开发
python
# 安装依赖
pip install flask flask-sqlalchemy flask-restful flask-jwt-extended flask-cors pymysql python-dotenv
# 创建CMDB后端
cat > app.py << 'EOF'
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_restful import Api, Resource
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
from flask_cors import CORS
from datetime import datetime
import json
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:password@localhost/cmdb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['JWT_SECRET_KEY'] = 'your-secret-key'
CORS(app)
db = SQLAlchemy(app)
api = Api(app)
jwt = JWTManager(app)
# 数据模型
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False)
role = db.Column(db.String(20), nullable=False)
class Asset(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
asset_type = db.Column(db.String(50), nullable=False)
asset_model = db.Column(db.String(100))
serial_number = db.Column(db.String(100), unique=True)
status = db.Column(db.String(20), default='in_use')
location = db.Column(db.String(100))
department = db.Column(db.String(100))
owner = db.Column(db.String(100))
purchase_date = db.Column(db.DateTime)
warranty_end_date = db.Column(db.DateTime)
price = db.Column(db.Float)
manufacturer = db.Column(db.String(100))
supplier = db.Column(db.String(100))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class ConfigurationItem(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
ci_type = db.Column(db.String(50), nullable=False)
asset_id = db.Column(db.Integer, db.ForeignKey('asset.id'))
version = db.Column(db.String(20))
status = db.Column(db.String(20), default='active')
description = db.Column(db.Text)
attributes = db.Column(db.JSON)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Relationship(db.Model):
id = db.Column(db.Integer, primary_key=True)
source_id = db.Column(db.Integer, db.ForeignKey('configuration_item.id'))
target_id = db.Column(db.Integer, db.ForeignKey('configuration_item.id'))
relationship_type = db.Column(db.String(50), nullable=False)
description = db.Column(db.Text)
strength = db.Column(db.Integer, default=1)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Change(db.Model):
id = db.Column(db.Integer, primary_key=True)
change_id = db.Column(db.String(50), unique=True, nullable=False)
title = db.Column(db.String(200), nullable=False)
description = db.Column(db.Text)
change_type = db.Column(db.String(50), nullable=False)
priority = db.Column(db.String(20), default='medium')
status = db.Column(db.String(20), default='draft')
requested_by = db.Column(db.String(100), nullable=False)
approved_by = db.Column(db.String(100))
implemented_by = db.Column(db.String(100))
impact_assessment = db.Column(db.Text)
risk_assessment = db.Column(db.Text)
implementation_plan = db.Column(db.Text)
rollback_plan = db.Column(db.Text)
start_date = db.Column(db.DateTime)
end_date = db.Column(db.DateTime)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 创建数据库表
with app.app_context():
db.create_all()
# API资源
class LoginResource(Resource):
def post(self):
data = request.get_json()
user = User.query.filter_by(username=data['username']).first()
if not user or user.password != data['password']: # 实际应使用密码哈希
return {'error': '用户名或密码错误'}, 401
access_token = create_access_token(identity=user.username)
return {'access_token': access_token, 'role': user.role}
class AssetResource(Resource):
@jwt_required()
def get(self):
assets = Asset.query.all()
return jsonify([{
'id': asset.id,
'name': asset.name,
'asset_type': asset.asset_type,
'asset_model': asset.asset_model,
'serial_number': asset.serial_number,
'status': asset.status,
'location': asset.location,
'department': asset.department,
'owner': asset.owner,
'purchase_date': asset.purchase_date.isoformat() if asset.purchase_date else None,
'warranty_end_date': asset.warranty_end_date.isoformat() if asset.warranty_end_date else None,
'price': asset.price,
'manufacturer': asset.manufacturer,
'supplier': asset.supplier,
'created_at': asset.created_at.isoformat(),
'updated_at': asset.updated_at.isoformat()
} for asset in assets])
@jwt_required()
def post(self):
data = request.get_json()
asset = Asset(
name=data['name'],
asset_type=data['asset_type'],
asset_model=data.get('asset_model'),
serial_number=data.get('serial_number'),
status=data.get('status', 'in_use'),
location=data.get('location'),
department=data.get('department'),
owner=data.get('owner'),
purchase_date=datetime.fromisoformat(data['purchase_date']) if data.get('purchase_date') else None,
warranty_end_date=datetime.fromisoformat(data['warranty_end_date']) if data.get('warranty_end_date') else None,
price=data.get('price'),
manufacturer=data.get('manufacturer'),
supplier=data.get('supplier')
)
db.session.add(asset)
db.session.commit()
return {'id': asset.id, 'status': 'created'}, 201
# 注册API资源
api.add_resource(LoginResource, '/api/auth/login')
api.add_resource(AssetResource, '/api/assets')
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
EOF
# 运行后端服务
python app.py4.2 基于Vue.js的CMDB前端
4.2.1 初始化项目
bash
# 安装Vue CLI
npm install -g @vue/cli
# 创建项目
vue create cmdb-frontend
cd cmdb-frontend
# 安装依赖
npm install axios vue-router element-plus echarts vuex
# 创建前端项目结构
mkdir -p src/views src/components src/api src/utils src/store
# 修改主文件
cat > src/main.js << 'EOF'
import { createApp } from 'vue'
import App from './App.vue'
import router from './router'
import store from './store'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
const app = createApp(App)
app.use(ElementPlus)
app.use(router)
app.use(store)
app.mount('#app')
EOF
# 创建路由
cat > src/router/index.js << 'EOF'
import { createRouter, createWebHistory } from 'vue-router'
import Login from '../views/Login.vue'
import Home from '../views/Home.vue'
import Assets from '../views/Assets.vue'
import Configurations from '../views/Configurations.vue'
import Relationships from '../views/Relationships.vue'
import Changes from '../views/Changes.vue'
import Reports from '../views/Reports.vue'
import Settings from '../views/Settings.vue'
const routes = [
{
path: '/login',
name: 'Login',
component: Login
},
{
path: '/',
name: 'Home',
component: Home,
meta: { requiresAuth: true }
},
{
path: '/assets',
name: 'Assets',
component: Assets,
meta: { requiresAuth: true }
},
{
path: '/configurations',
name: 'Configurations',
component: Configurations,
meta: { requiresAuth: true }
},
{
path: '/relationships',
name: 'Relationships',
component: Relationships,
meta: { requiresAuth: true }
},
{
path: '/changes',
name: 'Changes',
component: Changes,
meta: { requiresAuth: true }
},
{
path: '/reports',
name: 'Reports',
component: Reports,
meta: { requiresAuth: true }
},
{
path: '/settings',
name: 'Settings',
component: Settings,
meta: { requiresAuth: true }
}
]
const router = createRouter({
history: createWebHistory(process.env.BASE_URL),
routes
})
// 路由守卫
router.beforeEach((to, from, next) => {
const requiresAuth = to.matched.some(record => record.meta.requiresAuth)
const token = localStorage.getItem('token')
if (requiresAuth && !token) {
next('/login')
} else {
next()
}
})
export default router
EOF
# 创建存储
cat > src/store/index.js << 'EOF'
import { createStore } from 'vuex'
import axios from 'axios'
export default createStore({
state: {
user: null,
token: localStorage.getItem('token') || '',
assets: [],
configurations: [],
relationships: [],
changes: []
},
mutations: {
setUser(state, user) {
state.user = user
},
setToken(state, token) {
state.token = token
localStorage.setItem('token', token)
},
setAssets(state, assets) {
state.assets = assets
},
setConfigurations(state, configurations) {
state.configurations = configurations
},
setRelationships(state, relationships) {
state.relationships = relationships
},
setChanges(state, changes) {
state.changes = changes
}
},
actions: {
async login({ commit }, credentials) {
const response = await axios.post('http://localhost:5000/api/auth/login', credentials)
commit('setToken', response.data.access_token)
commit('setUser', { username: credentials.username, role: response.data.role })
return response.data
},
logout({ commit }) {
commit('setToken', '')
commit('setUser', null)
localStorage.removeItem('token')
},
async fetchAssets({ commit }) {
const response = await axios.get('http://localhost:5000/api/assets', {
headers: {
Authorization: `Bearer ${localStorage.getItem('token')}`
}
})
commit('setAssets', response.data)
}
},
getters: {
isLoggedIn: state => !!state.token,
currentUser: state => state.user
}
})
EOF
# 创建登录页面
cat > src/views/Login.vue << 'EOF'
<template>
<div class="login-container">
<el-card class="login-card">
<template #header>
<div class="login-header">
<span>CMDB系统登录</span>
</div>
</template>
<el-form :model="loginForm" @submit.prevent="login">
<el-form-item label="用户名" prop="username">
<el-input v-model="loginForm.username" placeholder="请输入用户名" />
</el-form-item>
<el-form-item label="密码" prop="password">
<el-input v-model="loginForm.password" type="password" placeholder="请输入密码" />
</el-form-item>
<el-form-item>
<el-button type="primary" native-type="submit" class="login-button">登录</el-button>
</el-form-item>
</el-form>
</el-card>
</div>
</template>
<script>
import { ref } from 'vue'
import { useRouter } from 'vue-router'
import { useStore } from 'vuex'
export default {
name: 'Login',
setup() {
const router = useRouter()
const store = useStore()
const loginForm = ref({
username: '',
password: ''
})
const login = async () => {
try {
await store.dispatch('login', loginForm.value)
router.push('/')
} catch (error) {
console.error('登录失败:', error)
}
}
return {
loginForm,
login
}
}
}
</script>
<style scoped>
.login-container {
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
background-color: #f5f7fa;
}
.login-card {
width: 400px;
box-shadow: 0 2px 12px 0 rgba(0, 0, 0, 0.1);
}
.login-header {
text-align: center;
font-size: 18px;
font-weight: bold;
}
.login-button {
width: 100%;
}
</style>
EOF
# 创建首页
cat > src/views/Home.vue << 'EOF'
<template>
<div class="home">
<el-card>
<template #header>
<div class="card-header">
<span>CMDB系统概览</span>
<el-button type="primary" size="small" @click="refresh">刷新</el-button>
</div>
</template>
<div class="overview-grid">
<div class="overview-item">
<el-card shadow="hover">
<template #header>
<div class="item-header">
<span>资产总数</span>
<el-icon><DataAnalysis /></el-icon>
</div>
</template>
<div class="item-value">{{ assetCount }}</div>
</el-card>
</div>
<div class="overview-item">
<el-card shadow="hover">
<template #header>
<div class="item-header">
<span>配置项总数</span>
<el-icon><Setting /></el-icon>
</div>
</template>
<div class="item-value">{{ configCount }}</div>
</el-card>
</div>
<div class="overview-item">
<el-card shadow="hover">
<template #header>
<div class="item-header">
<span>关系总数</span>
<el-icon><Link /></el-icon>
</div>
</template>
<div class="item-value">{{ relationshipCount }}</div>
</el-card>
</div>
<div class="overview-item">
<el-card shadow="hover">
<template #header>
<div class="item-header">
<span>待处理变更</span>
<el-icon><Timer /></el-icon>
</div>
</template>
<div class="item-value">{{ pendingChanges }}</div>
</el-card>
</div>
</div>
<div class="chart-container">
<el-card shadow="hover" class="chart-card">
<template #header>
<div class="item-header">
<span>资产类型分布</span>
</div>
</template>
<div ref="assetChart" class="chart"></div>
</el-card>
<el-card shadow="hover" class="chart-card">
<template #header>
<div class="item-header">
<span>资产状态分布</span>
</div>
</template>
<div ref="statusChart" class="chart"></div>
</el-card>
</div>
</el-card>
</div>
</template>
<script>
import { ref, onMounted, nextTick, computed } from 'vue'
import { useStore } from 'vuex'
import * as echarts from 'echarts'
export default {
name: 'Home',
setup() {
const store = useStore()
const assetChart = ref(null)
const statusChart = ref(null)
// 计算属性
const assetCount = computed(() => store.state.assets.length)
const configCount = computed(() => store.state.configurations.length)
const relationshipCount = computed(() => store.state.relationships.length)
const pendingChanges = computed(() => store.state.changes.filter(c => c.status === 'pending').length)
// 刷新数据
const refresh = async () => {
await store.dispatch('fetchAssets')
initCharts()
}
// 初始化图表
const initCharts = () => {
nextTick(() => {
// 资产类型分布图表
const assetChartInstance = echarts.init(assetChart.value)
assetChartInstance.setOption({
title: {
text: '资产类型分布',
left: 'center'
},
tooltip: {
trigger: 'item'
},
series: [{
name: '资产类型',
type: 'pie',
radius: '60%',
data: [
{ value: 30, name: '服务器' },
{ value: 20, name: '网络设备' },
{ value: 15, name: '存储设备' },
{ value: 10, name: '安全设备' },
{ value: 25, name: '应用服务' }
],
emphasis: {
itemStyle: {
shadowBlur: 10,
shadowOffsetX: 0,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
}]
})
// 资产状态分布图表
const statusChartInstance = echarts.init(statusChart.value)
statusChartInstance.setOption({
title: {
text: '资产状态分布',
left: 'center'
},
tooltip: {
trigger: 'item'
},
series: [{
name: '资产状态',
type: 'pie',
radius: '60%',
data: [
{ value: 60, name: '在用' },
{ value: 15, name: '闲置' },
{ value: 10, name: '维护' },
{ value: 15, name: '报废' }
],
emphasis: {
itemStyle: {
shadowBlur: 10,
shadowOffsetX: 0,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
}]
})
// 响应式调整
window.addEventListener('resize', () => {
assetChartInstance.resize()
statusChartInstance.resize()
})
})
}
// 生命周期
onMounted(async () => {
await store.dispatch('fetchAssets')
initCharts()
})
return {
assetCount,
configCount,
relationshipCount,
pendingChanges,
assetChart,
statusChart,
refresh
}
}
}
</script>
<style scoped>
.home {
padding: 20px;
}
.card-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.overview-grid {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 20px;
margin-bottom: 20px;
}
.overview-item {
flex: 1;
}
.item-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.item-value {
font-size: 36px;
font-weight: bold;
text-align: center;
margin-top: 20px;
color: #409eff;
}
.chart-container {
display: grid;
grid-template-columns: repeat(2, 1fr);
gap: 20px;
}
.chart-card {
height: 400px;
}
.chart {
width: 100%;
height: 350px;
}
@media (max-width: 1200px) {
.overview-grid {
grid-template-columns: repeat(2, 1fr);
}
.chart-container {
grid-template-columns: 1fr;
}
}
</style>
EOF
# 运行前端服务
npm run serve5. CMDB系统自动发现实现
5.1 网络扫描实现
技术选型:
- Nmap:网络扫描工具,用于发现网络设备和服务
- Python-nmap:Python的Nmap接口,用于编程实现网络扫描
- SNMP:简单网络管理协议,用于获取网络设备的详细信息
- SSH:安全Shell,用于远程登录服务器,获取配置信息
实现代码:
python
# 网络扫描实现
import nmap
import snmpwalk
import paramiko
import json
def scan_network(network_range):
"""扫描网络,发现设备"""
nm = nmap.PortScanner()
nm.scan(network_range, arguments='-sV -O')
devices = []
for host in nm.all_hosts():
device = {
'ip': host,
'hostname': nm[host].hostname(),
'state': nm[host].state(),
'os': nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'),
'ports': []
}
for proto in nm[host].all_protocols():
lport = nm[host][proto].keys()
for port in lport:
device['ports'].append({
'port': port,
'state': nm[host][proto][port]['state'],
'service': nm[host][proto][port]['name'],
'product': nm[host][proto][port].get('product', ''),
'version': nm[host][proto][port].get('version', '')
})
devices.append(device)
return devices
def get_snmp_info(ip, community='public'):
"""通过SNMP获取设备信息"""
try:
info = {
'sysDescr': snmpwalk.snmpwalk(ip, community, '1.3.6.1.2.1.1.1.0')[0][1],
'sysName': snmpwalk.snmpwalk(ip, community, '1.3.6.1.2.1.1.5.0')[0][1],
'sysUpTime': snmpwalk.snmpwalk(ip, community, '1.3.6.1.2.1.1.3.0')[0][1],
'sysContact': snmpwalk.snmpwalk(ip, community, '1.3.6.1.2.1.1.4.0')[0][1],
'sysLocation': snmpwalk.snmpwalk(ip, community, '1.3.6.1.2.1.1.6.0')[0][1]
}
return info
except Exception as e:
print(f"SNMP获取失败: {e}")
return {}
def get_ssh_info(ip, username, password):
"""通过SSH获取服务器信息"""
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=username, password=password)
# 获取系统信息
stdin, stdout, stderr = client.exec_command('uname -a')
uname = stdout.read().decode('utf-8').strip()
# 获取CPU信息
stdin, stdout, stderr = client.exec_command('cat /proc/cpuinfo | grep model name | head -1')
cpu = stdout.read().decode('utf-8').strip().split(':')[1].strip() if stdout else 'Unknown'
# 获取内存信息
stdin, stdout, stderr = client.exec_command('free -h')
memory = stdout.read().decode('utf-8').strip()
# 获取磁盘信息
stdin, stdout, stderr = client.exec_command('df -h')
disk = stdout.read().decode('utf-8').strip()
client.close()
return {
'uname': uname,
'cpu': cpu,
'memory': memory,
'disk': disk
}
except Exception as e:
print(f"SSH获取失败: {e}")
return {}
# 测试网络扫描
if __name__ == '__main__':
network_range = '192.168.1.0/24'
devices = scan_network(network_range)
print(json.dumps(devices, indent=2, ensure_ascii=False))
# 测试SNMP
for device in devices:
snmp_info = get_snmp_info(device['ip'])
if snmp_info:
device['snmp_info'] = snmp_info
# 测试SSH
for device in devices:
if 'Linux' in device.get('os', ''):
ssh_info = get_ssh_info(device['ip'], 'root', 'password')
if ssh_info:
device['ssh_info'] = ssh_info
print(json.dumps(devices, indent=2, ensure_ascii=False))5.2 Agent实现
技术选型:
- Python:开发Agent脚本
- psutil:获取系统信息
- requests:与CMDB服务器通信
- schedule:定期执行任务
实现代码:
python
# Agent实现
import psutil
import platform
import socket
import uuid
import requests
import schedule
import time
import json
def get_system_info():
"""获取系统信息"""
info = {
'hostname': socket.gethostname(),
'ip_address': socket.gethostbyname(socket.gethostname()),
'os': platform.system(),
'os_version': platform.version(),
'architecture': platform.architecture(),
'machine': platform.machine(),
'processor': platform.processor(),
'system_uuid': hex(uuid.getnode()),
'cpu': {
'count': psutil.cpu_count(logical=True),
'percent': psutil.cpu_percent(interval=1),
'freq': psutil.cpu_freq()._asdict() if hasattr(psutil, 'cpu_freq') else {}
},
'memory': {
'total': psutil.virtual_memory().total,
'available': psutil.virtual_memory().available,
'used': psutil.virtual_memory().used,
'percent': psutil.virtual_memory().percent
},
'disk': [],
'network': []
}
# 获取磁盘信息
for partition in psutil.disk_partitions():
try:
usage = psutil.disk_usage(partition.mountpoint)
info['disk'].append({
'device': partition.device,
'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'opts': partition.opts,
'total': usage.total,
'used': usage.used,
'free': usage.free,
'percent': usage.percent
})
except Exception as e:
pass
# 获取网络信息
for interface, addrs in psutil.net_if_addrs().items():
addresses = []
for addr in addrs:
if addr.family == socket.AF_INET:
addresses.append({
'address': addr.address,
'netmask': addr.netmask,
'broadcast': addr.broadcast
})
if addresses:
info['network'].append({
'interface': interface,
'addresses': addresses
})
return info
def send_to_cmdb(info, cmdb_url='http://localhost:5000/api/assets/discovery'):
"""发送信息到CMDB"""
try:
response = requests.post(cmdb_url, json=info, headers={
'Content-Type': 'application/json'
})
print(f"发送成功: {response.status_code}")
return response.status_code
except Exception as e:
print(f"发送失败: {e}")
return 500
def job():
"""定时任务"""
print(f"执行发现任务: {time.strftime('%Y-%m-%d %H:%M:%S')}")
info = get_system_info()
send_to_cmdb(info)
# 主函数
def main():
# 立即执行一次
job()
# 定时执行
schedule.every(1).hour.do(job)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
main()5.3 API集成实现
技术选型:
- Python:开发集成脚本
- boto3:AWS SDK,用于获取AWS资源信息
- azure-mgmt:Azure SDK,用于获取Azure资源信息
- google-cloud:GCP SDK,用于获取GCP资源信息
- aliyun-python-sdk:阿里云SDK,用于获取阿里云资源信息
- tencentcloud-sdk-python:腾讯云SDK,用于获取腾讯云资源信息
实现代码:
python
# AWS集成实现
import boto3
import json
def get_aws_ec2_instances():
"""获取AWS EC2实例"""
ec2 = boto3.resource('ec2', region_name='us-east-1')
instances = []
for instance in ec2.instances.all():
instances.append({
'id': instance.id,
'name': next((tag['Value'] for tag in instance.tags if tag['Key'] == 'Name'), ''),
'state': instance.state['Name'],
'instance_type': instance.instance_type,
'public_ip': instance.public_ip_address,
'private_ip': instance.private_ip_address,
'vpc_id': instance.vpc_id,
'subnet_id': instance.subnet_id,
'launch_time': instance.launch_time.isoformat(),
'tags': instance.tags
})
return instances
def get_aws_s3_buckets():
"""获取AWS S3存储桶"""
s3 = boto3.resource('s3')
buckets = []
for bucket in s3.buckets.all():
buckets.append({
'name': bucket.name,
'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else ''
})
return buckets
# 阿里云集成实现
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526 import DescribeInstancesRequest
def get_aliyun_ecs_instances(access_key, secret_key, region_id):
"""获取阿里云ECS实例"""
client = AcsClient(access_key, secret_key, region_id)
request = DescribeInstancesRequest.DescribeInstancesRequest()
request.set_PageSize(100)
response = client.do_action_with_exception(request)
data = json.loads(response)
instances = []
for instance in data['Instances']['Instance']:
instances.append({
'id': instance['InstanceId'],
'name': instance.get('InstanceName', ''),
'status': instance['Status'],
'instance_type': instance['InstanceType'],
'public_ip': instance.get('PublicIpAddress', {}).get('IpAddress', []),
'private_ip': instance.get('InnerIpAddress', {}).get('IpAddress', []),
'vpc_id': instance.get('VpcAttributes', {}).get('VpcId', ''),
'launch_time': instance['CreationTime']
})
return instances
# 测试API集成
if __name__ == '__main__':
# 测试AWS
ec2_instances = get_aws_ec2_instances()
print("AWS EC2实例:")
print(json.dumps(ec2_instances, indent=2, ensure_ascii=False))
s3_buckets = get_aws_s3_buckets()
print("AWS S3存储桶:")
print(json.dumps(s3_buckets, indent=2, ensure_ascii=False))
# 测试阿里云
access_key = 'your-access-key'
secret_key = 'your-secret-key'
region_id = 'cn-beijing'
aliyun_instances = get_aliyun_ecs_instances(access_key, secret_key, region_id)
print("阿里云ECS实例:")
print(json.dumps(aliyun_instances, indent=2, ensure_ascii=False))6. CMDB系统集成与扩展
6.1 与监控系统集成
集成方式:
- 数据同步:将CMDB中的资产信息同步到监控系统,自动创建监控对象
- 状态关联:将监控系统的告警状态关联到CMDB中的资产,展示资产健康状态
- 事件关联:将监控系统的事件关联到CMDB中的配置项,支持影响分析
- 性能数据:将监控系统的性能数据关联到CMDB中的资产,支持性能分析
示例配置:
yaml
# 监控系统集成配置
integration:
monitoring:
- name: "prometheus"
type: "prometheus"
url: "http://prometheus:9090"
credentials:
username: "admin"
password: "password"
sync:
enabled: true
interval: "3600" # 同步间隔(秒)
mapping:
asset_type: "server"
metrics:
- name: "cpu_usage"
query: "100 - (avg by(instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100"
- name: "memory_usage"
query: "(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100"
- name: "disk_usage"
query: "(node_filesystem_size_bytes{mountpoint=\"/\"} - node_filesystem_avail_bytes{mountpoint=\"/\"}) / node_filesystem_size_bytes{mountpoint=\"/\"} * 100"6.2 与发布系统集成
集成方式:
- 环境管理:将CMDB中的环境信息同步到发布系统,支持环境选择
- 配置管理:将发布系统的配置变更同步到CMDB,保持配置一致性
- 影响分析:基于CMDB的依赖关系,分析发布的影响范围
- 变更记录:将发布系统的发布记录同步到CMDB的变更管理模块
示例配置:
yaml
# 发布系统集成配置
integration:
deployment:
- name: "jenkins"
type: "jenkins"
url: "http://jenkins:8080"
credentials:
username: "admin"
password: "password"
sync:
enabled: true
interval: "1800" # 同步间隔(秒)
mapping:
job_name: "app-deploy"
environment: "production"
config_items:
- name: "app-config"
path: "config/application.yml"6.3 与自动化系统集成
集成方式:
- 资产数据:为自动化系统提供资产信息,支持自动化任务的目标选择
- 配置数据:为自动化系统提供配置信息,支持配置自动化
- 关系数据:为自动化系统提供依赖关系,支持影响分析
- 执行结果:将自动化执行结果同步到CMDB,更新配置状态
示例配置:
yaml
# 自动化系统集成配置
integration:
automation:
- name: "ansible"
type: "ansible"
url: "http://ansible:8080"
credentials:
username: "admin"
password: "password"
sync:
enabled: true
interval: "1800" # 同步间隔(秒)
mapping:
inventory: "cmdb_inventory"
variables:
- name: "cmdb_id"
value: "{{ asset.id }}"
- name: "cmdb_name"
value: "{{ asset.name }}"
- name: "cmdb_location"
value: "{{ asset.location }}"7. CMDB系统最佳实践
7.1 数据模型设计最佳实践
设计原则:
- 灵活性:支持扩展,适应业务变化
- 一致性:数据模型统一,避免数据冗余
- 完整性:确保数据的完整性和准确性
- 性能:考虑查询性能,合理设计索引
- 可扩展性:支持未来的功能扩展
设计建议:
- 分层设计:将数据模型分为基础层、业务层和扩展层
- 关系设计:合理设计表之间的关系,避免过度关联
- 字段设计:使用合适的字段类型,避免使用大字段存储频繁查询的数据
- 索引设计:为常用查询字段创建索引,提高查询性能
- 版本控制:对配置项实施版本控制,支持历史追溯
7.2 数据质量保障
保障措施:
- 数据验证:在数据录入时进行验证,确保数据的准确性
- 数据清洗:定期清洗数据,处理重复、错误的数据
- 数据同步:确保与其他系统的数据同步,保持数据一致性
- 数据审计:定期审计数据,发现和纠正数据问题
- 数据治理:建立数据治理机制,明确数据责任
质量指标:
- 数据完整性:必填字段的完整率
- 数据准确性:数据的准确程度
- 数据一致性:不同系统之间数据的一致程度
- 数据时效性:数据的更新及时程度
- 数据可用性:数据的可访问和可使用程度
7.3 自动发现最佳实践
实施策略:
- 多种发现方式:结合网络扫描、Agent和API集成,提高发现的全面性
- 定期发现:设置合理的发现周期,保持数据的实时性
- 发现规则:配置发现规则,过滤和分类发现的资产
- 冲突解决:建立冲突解决机制,处理自动发现和手动录入的冲突
- 增量发现:支持增量发现,只处理变更的资产
最佳实践:
- 先试点后推广:先在小范围试点,验证效果后再推广
- 持续优化:根据发现结果,持续优化发现策略
- 与变更管理结合:将自动发现与变更管理结合,确保配置变更的可控性
- 安全考虑:在实施自动发现时,考虑网络安全和权限控制
7.4 集成与扩展最佳实践
集成策略:
- API优先:优先使用API进行集成,确保接口的标准化
- 松耦合:保持系统之间的松耦合,减少相互依赖
- 数据同步:建立明确的数据同步机制,确保数据一致性
- 错误处理:实施完善的错误处理机制,确保集成的可靠性
- 监控告警:监控集成状态,及时发现和解决集成问题
扩展建议:
- 模块化设计:采用模块化设计,便于功能扩展
- 插件机制:支持插件机制,方便添加新功能
- 配置驱动:使用配置驱动的方式,减少硬编码
- 文档化:提供详细的集成文档,便于其他系统集成
7.5 性能优化最佳实践
优化策略:
- 数据库优化:合理设计索引,优化查询语句,定期清理数据
- 缓存策略:使用缓存,减少数据库查询压力
- 负载均衡:实施负载均衡,分散系统压力
- 异步处理:对耗时操作采用异步处理,提高系统响应速度
- 批量操作:支持批量操作,减少网络往返次数
监控指标:
- 响应时间:API和页面的响应时间
- 吞吐量:系统的处理能力
- 资源利用率:CPU、内存、磁盘等资源的使用情况
- 错误率:系统的错误率
- 并发数:系统的并发处理能力
8. 小结
8.1 CMDB系统开发的关键要素
- 明确需求:理解业务需求,确定CMDB的范围和功能
- 数据模型:设计合理的数据模型,支持业务需求的变化
- 自动发现:实施自动发现,减少手动维护工作量
- 数据质量:建立数据质量保障机制,确保数据的准确性和完整性
- 集成扩展:设计良好的集成接口,支持与其他系统的集成
- 性能优化:优化系统性能,确保系统的响应速度和稳定性
- 安全保障:实施安全措施,确保系统和数据的安全
- 用户体验:设计良好的用户界面,提高用户体验
- 持续改进:基于用户反馈,持续优化系统功能
- 培训推广:加强用户培训,促进系统的推广和使用
8.2 CMDB系统的未来发展
- 智能化:结合AI技术,实现智能发现、智能分析和智能预测
- 云原生:适应云原生环境,支持容器、微服务和Serverless
- DevOps集成:深度集成DevOps工具链,支持持续集成和持续部署
- 自助服务:提供自助服务门户,让用户自主管理资产和配置
- 多租户:支持多租户架构,满足大型企业的需求
- 实时分析:支持实时数据分析,提供实时的业务洞察
- 边缘计算:支持边缘计算场景,管理边缘设备和服务
- 区块链:利用区块链技术,提高配置数据的可信度和不可篡改性
8.3 学习建议
- 循序渐进:从简单的资产登记开始,逐步扩展到完整的CMDB功能
- 实践为主:通过实际项目,积累CMDB系统的开发和实施经验
- 持续学习:关注CMDB领域的新技术和最佳实践
- 系统思考:从整体架构角度,设计和优化CMDB系统
- 协作交流:与团队成员和行业专家交流,学习经验和教训
- 总结反思:定期总结CMDB系统的实施经验,持续改进
通过本课程的学习,你已经掌握了CMDB系统开发的核心技能和最佳实践。在实际工作中,应根据具体业务需求灵活运用这些知识,构建适合自己企业的CMDB系统,为IT服务管理和自动化运维提供有力的数据支撑。