主题
Python常用库与实战应用
课程目标
通过本课程的学习,你将掌握Python中常用的第三方库及其在运维开发中的实战应用,包括网络请求、远程操作、自动化部署等场景。
1. 常用库概览
1.1 标准库与第三方库
Python拥有丰富的标准库和第三方库,为不同场景提供了强大的支持:
| 类型 | 用途 | 代表库 |
|---|---|---|
| 网络请求 | HTTP/HTTPS请求 | requests |
| 远程操作 | SSH远程执行 | paramiko |
| 自动化部署 | 应用部署工具 | fabric |
| 数据处理 | 数据结构处理 | pandas |
| 配置管理 | 配置文件解析 | configparser |
| 日志处理 | 日志记录 | logging |
| 时间处理 | 时间日期操作 | datetime |
| 命令行参数 | 命令行解析 | argparse |
1.2 库的安装方法
使用pip安装第三方库:
bash
# 安装指定库
pip install requests paramiko fabric
# 安装指定版本
pip install requests==2.31.0
# 从requirements.txt安装
pip install -r requirements.txt2. requests库详解
2.1 基本用法
requests是Python中最流行的HTTP客户端库,用于发送HTTP请求:
python
import requests
# 发送GET请求
response = requests.get('https://api.github.com')
print(response.status_code) # 状态码
print(response.json()) # JSON响应
# 发送POST请求
payload = {'key1': 'value1', 'key2': 'value2'}
response = requests.post('https://httpbin.org/post', data=payload)
print(response.text) # 文本响应2.2 高级特性
2.2.1 请求头设置
python
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get('https://api.github.com', headers=headers)2.2.2 超时设置
python
# 设置超时时间
response = requests.get('https://api.github.com', timeout=5)
# 分别设置连接超时和读取超时
response = requests.get('https://api.github.com', timeout=(3, 5))2.2.3 会话管理
python
# 使用会话保持连接
session = requests.Session()
session.auth = ('username', 'password')
session.headers.update({'x-test': 'true'})
# 发送多个请求
response1 = session.get('https://httpbin.org/cookies/set/sessioncookie/123456')
response2 = session.get('https://httpbin.org/cookies')
print(response2.text)2.3 实战案例:API接口测试
python
import requests
import json
def test_api_endpoints():
"""测试API接口"""
endpoints = [
'https://api.github.com',
'https://httpbin.org/get',
'https://jsonplaceholder.typicode.com/posts/1'
]
for endpoint in endpoints:
try:
response = requests.get(endpoint, timeout=5)
print(f"\nTesting {endpoint}:")
print(f"Status code: {response.status_code}")
print(f"Response time: {response.elapsed.total_seconds():.2f}s")
if response.status_code == 200:
data = response.json()
print(f"Response type: {type(data).__name__}")
if isinstance(data, dict):
print(f"Keys: {list(data.keys())[:5]}...")
elif isinstance(data, list):
print(f"Items count: {len(data)}")
if data:
print(f"First item: {data[0]}")
except Exception as e:
print(f"Error testing {endpoint}: {e}")
if __name__ == "__main__":
test_api_endpoints()3. paramiko库详解
3.1 基本用法
paramiko是Python中用于SSH远程连接的库:
python
import paramiko
# 创建SSH客户端
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(
hostname='192.168.1.100',
port=22,
username='root',
password='your_password'
)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls -la')
print(stdout.read().decode('utf-8'))
# 关闭连接
ssh.close()3.2 高级特性
3.2.1 密钥认证
python
import paramiko
# 使用密钥文件认证
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
hostname='192.168.1.100',
username='root',
key_filename='/path/to/private_key'
)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df -h')
print(stdout.read().decode('utf-8'))
ssh.close()3.2.2 SFTP文件传输
python
import paramiko
# 创建SSH客户端
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('192.168.1.100', username='root', password='your_password')
# 创建SFTP客户端
sftp = ssh.open_sftp()
# 上传文件
sftp.put('/local/path/file.txt', '/remote/path/file.txt')
# 下载文件
sftp.get('/remote/path/file.txt', '/local/path/file.txt')
# 关闭连接
sftp.close()
ssh.close()3.3 实战案例:批量执行命令
python
import paramiko
import time
def execute_commands(host, username, password, commands):
"""在远程服务器执行命令"""
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=username, password=password, timeout=10)
results = []
for command in commands:
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
results.append({
'command': command,
'output': output,
'error': error
})
ssh.close()
return results
except Exception as e:
return [{'error': str(e)}]
def batch_execute():
"""批量执行命令"""
servers = [
{'host': '192.168.1.100', 'username': 'root', 'password': 'pass123'},
{'host': '192.168.1.101', 'username': 'root', 'password': 'pass123'}
]
commands = [
'uname -a',
'uptime',
'df -h',
'free -m'
]
for server in servers:
print(f"\n=== Executing on {server['host']} ===")
results = execute_commands(
server['host'],
server['username'],
server['password'],
commands
)
for result in results:
if 'error' in result and result['error']:
print(f"Error: {result['error']}")
else:
print(f"Command: {result['command']}")
print(f"Output: {result['output']}")
if result['error']:
print(f"Error: {result['error']}")
if __name__ == "__main__":
batch_execute()4. fabric库详解
4.1 基本用法
fabric是一个高级Python库,用于简化SSH命令执行和应用部署:
python
from fabric import Connection
# 创建连接
c = Connection(
host='192.168.1.100',
user='root',
connect_kwargs={'password': 'your_password'}
)
# 执行命令
result = c.run('uname -a', hide=True)
print(f"System: {result.stdout.strip()}")
# 上传文件
c.put('/local/path/file.txt', '/remote/path/file.txt')
# 下载文件
c.get('/remote/path/file.txt', '/local/path/file.txt')
# 执行本地命令
local_result = c.local('ls -la', capture=True)
print(f"Local files: {local_result.stdout.strip()}")4.2 高级特性
4.2.1 上下文管理器
python
from fabric import Connection
# 使用上下文管理器
with Connection('192.168.1.100', user='root', connect_kwargs={'password': 'your_password'}) as c:
# 执行多个命令
c.run('echo "Starting deployment..."')
c.run('git pull origin master')
c.run('echo "Deployment completed!"')4.2.2 并行执行
python
from fabric import Connection
from fabric.group import ThreadingGroup
def deploy_to_server(c):
"""部署到服务器"""
with c.cd('/app'):
c.run('git pull')
c.run('docker-compose up -d --build')
# 创建服务器组
group = ThreadingGroup(
'192.168.1.100', '192.168.1.101',
user='root',
connect_kwargs={'password': 'your_password'}
)
# 并行执行
results = group.run('uname -a')
for connection, result in results.items():
print(f"{connection.host}: {result.stdout.strip()}")
# 执行部署函数
group.run(deploy_to_server)4.3 实战案例:自动化部署脚本
python
from fabric import Connection
from fabric.group import ThreadingGroup
import time
def deploy_application():
"""自动化部署应用"""
servers = ['192.168.1.100', '192.168.1.101']
# 创建服务器组
group = ThreadingGroup(
*servers,
user='root',
connect_kwargs={'password': 'your_password'}
)
print("=== 开始部署应用 ===")
print(f"目标服务器: {', '.join(servers)}")
# 1. 检查服务器状态
print("\n1. 检查服务器状态...")
results = group.run('uptime', hide=True)
for conn, result in results.items():
print(f" {conn.host}: {result.stdout.strip()}")
# 2. 更新代码
print("\n2. 更新代码...")
with group.cd('/app/my-project'):
group.run('git pull origin master')
# 3. 安装依赖
print("\n3. 安装依赖...")
with group.cd('/app/my-project'):
group.run('pip install -r requirements.txt')
# 4. 重启服务
print("\n4. 重启服务...")
group.run('systemctl restart my-service')
# 5. 验证服务状态
print("\n5. 验证服务状态...")
time.sleep(3) # 等待服务启动
results = group.run('systemctl status my-service --no-pager', hide=True)
for conn, result in results.items():
if 'active (running)' in result.stdout:
print(f" ✅ {conn.host}: 服务运行正常")
else:
print(f" ❌ {conn.host}: 服务启动失败")
print(f" 错误: {result.stdout[:200]}...")
print("\n=== 部署完成 ===")
if __name__ == "__main__":
deploy_application()5. 其他常用库
5.1 pandas库
pandas是Python中用于数据处理和分析的强大库:
python
import pandas as pd
# 读取CSV文件
df = pd.read_csv('data.csv')
# 基本操作
print(df.head()) # 查看前5行
print(df.info()) # 查看数据信息
print(df.describe()) # 统计描述
# 数据过滤
filtered_df = df[df['age'] > 30]
# 数据分组
grouped = df.groupby('department').mean()
# 保存结果
filtered_df.to_csv('filtered_data.csv', index=False)5.2 configparser库
configparser用于解析配置文件:
python
import configparser
# 创建配置解析器
config = configparser.ConfigParser()
# 读取配置文件
config.read('config.ini')
# 获取配置值
database_host = config['database']['host']
database_port = config['database']['port']
# 写入配置
config['new_section'] = {
'key1': 'value1',
'key2': 'value2'
}
with open('config.ini', 'w') as f:
config.write(f)5.3 logging库
logging用于日志记录:
python
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='app.log',
filemode='a'
)
# 创建日志记录器
logger = logging.getLogger('myapp')
# 记录不同级别的日志
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')6. 综合实战:系统监控平台
6.1 项目结构
system-monitor/
├── config.ini # 配置文件
├── monitor.py # 主监控脚本
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── ssh_utils.py # SSH工具
│ ├── api_utils.py # API工具
│ └── data_utils.py # 数据处理工具
└── requirements.txt # 依赖文件6.2 配置文件
ini
[general]
interval = 60 # 监控间隔(秒)
log_file = monitor.log
[database]
host = localhost
port = 3306
user = monitor
password = password
database = monitoring
[servers]
server1 = 192.168.1.100
server2 = 192.168.1.101
server3 = 192.168.1.102
[ssh]
username = root
password = your_password6.3 主监控脚本
python
import configparser
import logging
import time
from utils.ssh_utils import SSHConnection
from utils.data_utils import collect_system_metrics
from utils.api_utils import send_metrics_to_api
def setup_logging(log_file):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename=log_file,
filemode='a'
)
return logging.getLogger('system-monitor')
def load_config(config_file):
"""加载配置"""
config = configparser.ConfigParser()
config.read(config_file)
return config
def monitor_servers(config, logger):
"""监控服务器"""
# 获取配置
interval = int(config['general']['interval'])
servers = list(config['servers'].values())
ssh_username = config['ssh']['username']
ssh_password = config['ssh']['password']
logger.info(f"Starting system monitor for servers: {servers}")
logger.info(f"Monitoring interval: {interval} seconds")
while True:
try:
for server in servers:
logger.info(f"Collecting metrics from {server}")
# 创建SSH连接
with SSHConnection(server, ssh_username, ssh_password) as conn:
# 收集系统指标
metrics = collect_system_metrics(conn)
if metrics:
# 添加服务器信息
metrics['server'] = server
metrics['timestamp'] = time.time()
# 打印指标
logger.info(f"Metrics for {server}: {metrics}")
# 发送指标到API
# send_metrics_to_api(metrics)
# 检查阈值
check_thresholds(metrics, logger)
else:
logger.error(f"Failed to collect metrics from {server}")
except Exception as e:
logger.error(f"Error during monitoring: {e}")
logger.info(f"Sleeping for {interval} seconds...")
time.sleep(interval)
def check_thresholds(metrics, logger):
"""检查阈值"""
thresholds = {
'cpu_usage': 80.0,
'memory_usage': 85.0,
'disk_usage': 90.0
}
for metric, value in metrics.items():
if metric in thresholds and isinstance(value, (int, float)):
if value > thresholds[metric]:
logger.warning(f"{metric} threshold exceeded: {value}% (threshold: {thresholds[metric]}%)")
if __name__ == "__main__":
# 加载配置
config = load_config('config.ini')
# 设置日志
logger = setup_logging(config['general']['log_file'])
# 启动监控
try:
monitor_servers(config, logger)
except KeyboardInterrupt:
logger.info("Monitoring stopped by user")
except Exception as e:
logger.critical(f"Critical error: {e}")6.4 工具函数
python
# utils/ssh_utils.py
import paramiko
class SSHConnection:
"""SSH连接上下文管理器"""
def __init__(self, host, username, password, port=22):
self.host = host
self.username = username
self.password = password
self.port = port
self.ssh = None
def __enter__(self):
"""建立连接"""
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(
hostname=self.host,
port=self.port,
username=self.username,
password=self.password
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""关闭连接"""
if self.ssh:
self.ssh.close()
def execute(self, command):
"""执行命令"""
stdin, stdout, stderr = self.ssh.exec_command(command)
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
return output, error
# utils/data_utils.py
def collect_system_metrics(ssh_conn):
"""收集系统指标"""
metrics = {}
try:
# CPU使用率
output, _ = ssh_conn.execute('top -bn1 | grep "Cpu(s)" | sed "s/.*, *\([0-9.]*\)%* id.*/\1/" | awk "{print 100 - $1}"')
metrics['cpu_usage'] = float(output.strip()) if output.strip() else 0.0
# 内存使用率
output, _ = ssh_conn.execute('free | grep Mem | awk "{print $3/$2 * 100.0}"')
metrics['memory_usage'] = float(output.strip()) if output.strip() else 0.0
# 磁盘使用率
output, _ = ssh_conn.execute('df -h / | tail -n1 | awk "{print $5}" | sed "s/%//"')
metrics['disk_usage'] = float(output.strip()) if output.strip() else 0.0
# 负载平均值
output, _ = ssh_conn.execute('uptime | awk "{print $10 $11 $12}"')
metrics['load_average'] = output.strip() if output.strip() else "0.00 0.00 0.00"
# 进程数量
output, _ = ssh_conn.execute('ps aux | wc -l')
metrics['process_count'] = int(output.strip()) if output.strip() else 0
# 网络连接数
output, _ = ssh_conn.execute('netstat -tuln | wc -l')
metrics['network_connections'] = int(output.strip()) if output.strip() else 0
except Exception as e:
print(f"Error collecting metrics: {e}")
return None
return metrics
# utils/api_utils.py
import requests
def send_metrics_to_api(metrics):
"""发送指标到API"""
try:
url = 'http://localhost:5000/api/metrics'
response = requests.post(url, json=metrics, timeout=10)
response.raise_for_status()
return True
except Exception as e:
print(f"Error sending metrics: {e}")
return False6.5 依赖文件
paramiko==3.4.0
requests==2.31.0
fabric==2.7.1
pandas==2.1.47. 性能优化和最佳实践
7.1 性能优化
- 使用连接池:对于频繁的网络请求,使用连接池减少连接建立开销
- 异步编程:使用
asyncio和aiohttp进行异步操作 - 缓存机制:对频繁访问的数据使用缓存
- 批量操作:将多个小操作合并为批量操作
- 资源释放:使用上下文管理器确保资源正确释放
7.2 最佳实践
- 错误处理:完善的异常处理机制
- 日志记录:详细的日志记录便于排查问题
- 配置管理:使用配置文件管理环境差异
- 代码组织:模块化设计,便于维护
- 安全性:避免硬编码敏感信息,使用环境变量或密钥管理
- 测试:编写单元测试确保代码质量
- 文档:完善的代码注释和文档
7.3 安全注意事项
- 密钥管理:避免在代码中硬编码密码和密钥
- 权限控制:遵循最小权限原则
- 输入验证:对所有用户输入进行验证
- 加密传输:使用HTTPS和SSH等加密协议
- 依赖安全:定期更新依赖包以修复安全漏洞
8. 总结
本课程介绍了Python中常用的第三方库及其在运维开发中的应用:
- requests库:用于HTTP请求和API测试
- paramiko库:用于SSH远程连接和命令执行
- fabric库:用于自动化部署和配置管理
- 其他常用库:pandas、configparser、logging等
通过这些库的学习和应用,你可以:
- 开发自动化脚本提高工作效率
- 构建监控系统实时监控服务器状态
- 实现自动化部署流程
- 开发API接口测试工具
- 处理和分析系统数据
这些技能将为你成为一名优秀的运维开发工程师打下坚实的基础。