跳转到内容

Python常用库与实战应用

课程目标

通过本课程的学习,你将掌握Python中常用的第三方库及其在运维开发中的实战应用,包括网络请求、远程操作、自动化部署等场景。

1. 常用库概览

1.1 标准库与第三方库

Python拥有丰富的标准库和第三方库,为不同场景提供了强大的支持:

类型用途代表库
网络请求HTTP/HTTPS请求requests
远程操作SSH远程执行paramiko
自动化部署应用部署工具fabric
数据处理数据结构处理pandas
配置管理配置文件解析configparser
日志处理日志记录logging
时间处理时间日期操作datetime
命令行参数命令行解析argparse

1.2 库的安装方法

使用pip安装第三方库:

bash
# 安装指定库
pip install requests paramiko fabric

# 安装指定版本
pip install requests==2.31.0

# 从requirements.txt安装
pip install -r requirements.txt

2. requests库详解

2.1 基本用法

requests是Python中最流行的HTTP客户端库,用于发送HTTP请求:

python
import requests

# 发送GET请求
response = requests.get('https://api.github.com')
print(response.status_code)  # 状态码
print(response.json())  # JSON响应

# 发送POST请求
payload = {'key1': 'value1', 'key2': 'value2'}
response = requests.post('https://httpbin.org/post', data=payload)
print(response.text)  # 文本响应

2.2 高级特性

2.2.1 请求头设置

python
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get('https://api.github.com', headers=headers)

2.2.2 超时设置

python
# 设置超时时间
response = requests.get('https://api.github.com', timeout=5)

# 分别设置连接超时和读取超时
response = requests.get('https://api.github.com', timeout=(3, 5))

2.2.3 会话管理

python
# 使用会话保持连接
session = requests.Session()
session.auth = ('username', 'password')
session.headers.update({'x-test': 'true'})

# 发送多个请求
response1 = session.get('https://httpbin.org/cookies/set/sessioncookie/123456')
response2 = session.get('https://httpbin.org/cookies')
print(response2.text)

2.3 实战案例:API接口测试

python
import requests
import json

def test_api_endpoints():
    """测试API接口"""
    endpoints = [
        'https://api.github.com',
        'https://httpbin.org/get',
        'https://jsonplaceholder.typicode.com/posts/1'
    ]
    
    for endpoint in endpoints:
        try:
            response = requests.get(endpoint, timeout=5)
            print(f"\nTesting {endpoint}:")
            print(f"Status code: {response.status_code}")
            print(f"Response time: {response.elapsed.total_seconds():.2f}s")
            
            if response.status_code == 200:
                data = response.json()
                print(f"Response type: {type(data).__name__}")
                if isinstance(data, dict):
                    print(f"Keys: {list(data.keys())[:5]}...")
                elif isinstance(data, list):
                    print(f"Items count: {len(data)}")
                    if data:
                        print(f"First item: {data[0]}")
        except Exception as e:
            print(f"Error testing {endpoint}: {e}")

if __name__ == "__main__":
    test_api_endpoints()

3. paramiko库详解

3.1 基本用法

paramiko是Python中用于SSH远程连接的库:

python
import paramiko

# 创建SSH客户端
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 连接服务器
ssh.connect(
    hostname='192.168.1.100',
    port=22,
    username='root',
    password='your_password'
)

# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls -la')
print(stdout.read().decode('utf-8'))

# 关闭连接
ssh.close()

3.2 高级特性

3.2.1 密钥认证

python
import paramiko

# 使用密钥文件认证
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(
    hostname='192.168.1.100',
    username='root',
    key_filename='/path/to/private_key'
)

# 执行命令
stdin, stdout, stderr = ssh.exec_command('df -h')
print(stdout.read().decode('utf-8'))

ssh.close()

3.2.2 SFTP文件传输

python
import paramiko

# 创建SSH客户端
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('192.168.1.100', username='root', password='your_password')

# 创建SFTP客户端
sftp = ssh.open_sftp()

# 上传文件
sftp.put('/local/path/file.txt', '/remote/path/file.txt')

# 下载文件
sftp.get('/remote/path/file.txt', '/local/path/file.txt')

# 关闭连接
sftp.close()
ssh.close()

3.3 实战案例:批量执行命令

python
import paramiko
import time

def execute_commands(host, username, password, commands):
    """在远程服务器执行命令"""
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, username=username, password=password, timeout=10)
        
        results = []
        for command in commands:
            stdin, stdout, stderr = ssh.exec_command(command)
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            results.append({
                'command': command,
                'output': output,
                'error': error
            })
        
        ssh.close()
        return results
    except Exception as e:
        return [{'error': str(e)}]

def batch_execute():
    """批量执行命令"""
    servers = [
        {'host': '192.168.1.100', 'username': 'root', 'password': 'pass123'},
        {'host': '192.168.1.101', 'username': 'root', 'password': 'pass123'}
    ]
    
    commands = [
        'uname -a',
        'uptime',
        'df -h',
        'free -m'
    ]
    
    for server in servers:
        print(f"\n=== Executing on {server['host']} ===")
        results = execute_commands(
            server['host'],
            server['username'],
            server['password'],
            commands
        )
        
        for result in results:
            if 'error' in result and result['error']:
                print(f"Error: {result['error']}")
            else:
                print(f"Command: {result['command']}")
                print(f"Output: {result['output']}")
                if result['error']:
                    print(f"Error: {result['error']}")

if __name__ == "__main__":
    batch_execute()

4. fabric库详解

4.1 基本用法

fabric是一个高级Python库,用于简化SSH命令执行和应用部署:

python
from fabric import Connection

# 创建连接
c = Connection(
    host='192.168.1.100',
    user='root',
    connect_kwargs={'password': 'your_password'}
)

# 执行命令
result = c.run('uname -a', hide=True)
print(f"System: {result.stdout.strip()}")

# 上传文件
c.put('/local/path/file.txt', '/remote/path/file.txt')

# 下载文件
c.get('/remote/path/file.txt', '/local/path/file.txt')

# 执行本地命令
local_result = c.local('ls -la', capture=True)
print(f"Local files: {local_result.stdout.strip()}")

4.2 高级特性

4.2.1 上下文管理器

python
from fabric import Connection

# 使用上下文管理器
with Connection('192.168.1.100', user='root', connect_kwargs={'password': 'your_password'}) as c:
    # 执行多个命令
    c.run('echo "Starting deployment..."')
    c.run('git pull origin master')
    c.run('echo "Deployment completed!"')

4.2.2 并行执行

python
from fabric import Connection
from fabric.group import ThreadingGroup

def deploy_to_server(c):
    """部署到服务器"""
    with c.cd('/app'):
        c.run('git pull')
        c.run('docker-compose up -d --build')

# 创建服务器组
group = ThreadingGroup(
    '192.168.1.100', '192.168.1.101',
    user='root',
    connect_kwargs={'password': 'your_password'}
)

# 并行执行
results = group.run('uname -a')
for connection, result in results.items():
    print(f"{connection.host}: {result.stdout.strip()}")

# 执行部署函数
group.run(deploy_to_server)

4.3 实战案例:自动化部署脚本

python
from fabric import Connection
from fabric.group import ThreadingGroup
import time

def deploy_application():
    """自动化部署应用"""
    servers = ['192.168.1.100', '192.168.1.101']
    
    # 创建服务器组
    group = ThreadingGroup(
        *servers,
        user='root',
        connect_kwargs={'password': 'your_password'}
    )
    
    print("=== 开始部署应用 ===")
    print(f"目标服务器: {', '.join(servers)}")
    
    # 1. 检查服务器状态
    print("\n1. 检查服务器状态...")
    results = group.run('uptime', hide=True)
    for conn, result in results.items():
        print(f"  {conn.host}: {result.stdout.strip()}")
    
    # 2. 更新代码
    print("\n2. 更新代码...")
    with group.cd('/app/my-project'):
        group.run('git pull origin master')
    
    # 3. 安装依赖
    print("\n3. 安装依赖...")
    with group.cd('/app/my-project'):
        group.run('pip install -r requirements.txt')
    
    # 4. 重启服务
    print("\n4. 重启服务...")
    group.run('systemctl restart my-service')
    
    # 5. 验证服务状态
    print("\n5. 验证服务状态...")
    time.sleep(3)  # 等待服务启动
    results = group.run('systemctl status my-service --no-pager', hide=True)
    for conn, result in results.items():
        if 'active (running)' in result.stdout:
            print(f"  ✅ {conn.host}: 服务运行正常")
        else:
            print(f"  ❌ {conn.host}: 服务启动失败")
            print(f"     错误: {result.stdout[:200]}...")
    
    print("\n=== 部署完成 ===")

if __name__ == "__main__":
    deploy_application()

5. 其他常用库

5.1 pandas库

pandas是Python中用于数据处理和分析的强大库:

python
import pandas as pd

# 读取CSV文件
df = pd.read_csv('data.csv')

# 基本操作
print(df.head())  # 查看前5行
print(df.info())  # 查看数据信息
print(df.describe())  # 统计描述

# 数据过滤
filtered_df = df[df['age'] > 30]

# 数据分组
grouped = df.groupby('department').mean()

# 保存结果
filtered_df.to_csv('filtered_data.csv', index=False)

5.2 configparser库

configparser用于解析配置文件:

python
import configparser

# 创建配置解析器
config = configparser.ConfigParser()

# 读取配置文件
config.read('config.ini')

# 获取配置值
database_host = config['database']['host']
database_port = config['database']['port']

# 写入配置
config['new_section'] = {
    'key1': 'value1',
    'key2': 'value2'
}

with open('config.ini', 'w') as f:
    config.write(f)

5.3 logging库

logging用于日志记录:

python
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='app.log',
    filemode='a'
)

# 创建日志记录器
logger = logging.getLogger('myapp')

# 记录不同级别的日志
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')

6. 综合实战:系统监控平台

6.1 项目结构

system-monitor/
├── config.ini          # 配置文件
├── monitor.py          # 主监控脚本
├── utils/              # 工具函数
│   ├── __init__.py
│   ├── ssh_utils.py    # SSH工具
│   ├── api_utils.py    # API工具
│   └── data_utils.py   # 数据处理工具
└── requirements.txt    # 依赖文件

6.2 配置文件

ini
[general]
interval = 60  # 监控间隔(秒)
log_file = monitor.log

[database]
host = localhost
port = 3306
user = monitor
password = password
database = monitoring

[servers]
server1 = 192.168.1.100
server2 = 192.168.1.101
server3 = 192.168.1.102

[ssh]
username = root
password = your_password

6.3 主监控脚本

python
import configparser
import logging
import time
from utils.ssh_utils import SSHConnection
from utils.data_utils import collect_system_metrics
from utils.api_utils import send_metrics_to_api

def setup_logging(log_file):
    """设置日志"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filename=log_file,
        filemode='a'
    )
    return logging.getLogger('system-monitor')

def load_config(config_file):
    """加载配置"""
    config = configparser.ConfigParser()
    config.read(config_file)
    return config

def monitor_servers(config, logger):
    """监控服务器"""
    # 获取配置
    interval = int(config['general']['interval'])
    servers = list(config['servers'].values())
    ssh_username = config['ssh']['username']
    ssh_password = config['ssh']['password']
    
    logger.info(f"Starting system monitor for servers: {servers}")
    logger.info(f"Monitoring interval: {interval} seconds")
    
    while True:
        try:
            for server in servers:
                logger.info(f"Collecting metrics from {server}")
                
                # 创建SSH连接
                with SSHConnection(server, ssh_username, ssh_password) as conn:
                    # 收集系统指标
                    metrics = collect_system_metrics(conn)
                    
                    if metrics:
                        # 添加服务器信息
                        metrics['server'] = server
                        metrics['timestamp'] = time.time()
                        
                        # 打印指标
                        logger.info(f"Metrics for {server}: {metrics}")
                        
                        # 发送指标到API
                        # send_metrics_to_api(metrics)
                        
                        # 检查阈值
                        check_thresholds(metrics, logger)
                    else:
                        logger.error(f"Failed to collect metrics from {server}")
                        
        except Exception as e:
            logger.error(f"Error during monitoring: {e}")
        
        logger.info(f"Sleeping for {interval} seconds...")
        time.sleep(interval)

def check_thresholds(metrics, logger):
    """检查阈值"""
    thresholds = {
        'cpu_usage': 80.0,
        'memory_usage': 85.0,
        'disk_usage': 90.0
    }
    
    for metric, value in metrics.items():
        if metric in thresholds and isinstance(value, (int, float)):
            if value > thresholds[metric]:
                logger.warning(f"{metric} threshold exceeded: {value}% (threshold: {thresholds[metric]}%)")

if __name__ == "__main__":
    # 加载配置
    config = load_config('config.ini')
    
    # 设置日志
    logger = setup_logging(config['general']['log_file'])
    
    # 启动监控
    try:
        monitor_servers(config, logger)
    except KeyboardInterrupt:
        logger.info("Monitoring stopped by user")
    except Exception as e:
        logger.critical(f"Critical error: {e}")

6.4 工具函数

python
# utils/ssh_utils.py
import paramiko

class SSHConnection:
    """SSH连接上下文管理器"""
    
    def __init__(self, host, username, password, port=22):
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.ssh = None
    
    def __enter__(self):
        """建立连接"""
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(
            hostname=self.host,
            port=self.port,
            username=self.username,
            password=self.password
        )
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """关闭连接"""
        if self.ssh:
            self.ssh.close()
    
    def execute(self, command):
        """执行命令"""
        stdin, stdout, stderr = self.ssh.exec_command(command)
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        return output, error

# utils/data_utils.py
def collect_system_metrics(ssh_conn):
    """收集系统指标"""
    metrics = {}
    
    try:
        # CPU使用率
        output, _ = ssh_conn.execute('top -bn1 | grep "Cpu(s)" | sed "s/.*, *\([0-9.]*\)%* id.*/\1/" | awk "{print 100 - $1}"')
        metrics['cpu_usage'] = float(output.strip()) if output.strip() else 0.0
        
        # 内存使用率
        output, _ = ssh_conn.execute('free | grep Mem | awk "{print $3/$2 * 100.0}"')
        metrics['memory_usage'] = float(output.strip()) if output.strip() else 0.0
        
        # 磁盘使用率
        output, _ = ssh_conn.execute('df -h / | tail -n1 | awk "{print $5}" | sed "s/%//"')
        metrics['disk_usage'] = float(output.strip()) if output.strip() else 0.0
        
        # 负载平均值
        output, _ = ssh_conn.execute('uptime | awk "{print $10 $11 $12}"')
        metrics['load_average'] = output.strip() if output.strip() else "0.00 0.00 0.00"
        
        # 进程数量
        output, _ = ssh_conn.execute('ps aux | wc -l')
        metrics['process_count'] = int(output.strip()) if output.strip() else 0
        
        # 网络连接数
        output, _ = ssh_conn.execute('netstat -tuln | wc -l')
        metrics['network_connections'] = int(output.strip()) if output.strip() else 0
        
    except Exception as e:
        print(f"Error collecting metrics: {e}")
        return None
    
    return metrics

# utils/api_utils.py
import requests

def send_metrics_to_api(metrics):
    """发送指标到API"""
    try:
        url = 'http://localhost:5000/api/metrics'
        response = requests.post(url, json=metrics, timeout=10)
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Error sending metrics: {e}")
        return False

6.5 依赖文件

paramiko==3.4.0
requests==2.31.0
fabric==2.7.1
pandas==2.1.4

7. 性能优化和最佳实践

7.1 性能优化

  1. 使用连接池:对于频繁的网络请求,使用连接池减少连接建立开销
  2. 异步编程:使用asyncioaiohttp进行异步操作
  3. 缓存机制:对频繁访问的数据使用缓存
  4. 批量操作:将多个小操作合并为批量操作
  5. 资源释放:使用上下文管理器确保资源正确释放

7.2 最佳实践

  1. 错误处理:完善的异常处理机制
  2. 日志记录:详细的日志记录便于排查问题
  3. 配置管理:使用配置文件管理环境差异
  4. 代码组织:模块化设计,便于维护
  5. 安全性:避免硬编码敏感信息,使用环境变量或密钥管理
  6. 测试:编写单元测试确保代码质量
  7. 文档:完善的代码注释和文档

7.3 安全注意事项

  1. 密钥管理:避免在代码中硬编码密码和密钥
  2. 权限控制:遵循最小权限原则
  3. 输入验证:对所有用户输入进行验证
  4. 加密传输:使用HTTPS和SSH等加密协议
  5. 依赖安全:定期更新依赖包以修复安全漏洞

8. 总结

本课程介绍了Python中常用的第三方库及其在运维开发中的应用:

  1. requests库:用于HTTP请求和API测试
  2. paramiko库:用于SSH远程连接和命令执行
  3. fabric库:用于自动化部署和配置管理
  4. 其他常用库:pandas、configparser、logging等

通过这些库的学习和应用,你可以:

  • 开发自动化脚本提高工作效率
  • 构建监控系统实时监控服务器状态
  • 实现自动化部署流程
  • 开发API接口测试工具
  • 处理和分析系统数据

这些技能将为你成为一名优秀的运维开发工程师打下坚实的基础。

评论区

专业的Linux技术学习平台,从入门到精通的完整学习路径