跳转到内容

自定义Ansible模块开发

课程目标

  • 了解Ansible模块的工作原理
  • 掌握自定义Ansible模块的开发方法
  • 学会编写不同类型的Ansible模块
  • 理解Ansible模块的测试和调试方法

1. Ansible模块概述

1.1 什么是Ansible模块

Ansible模块是执行具体任务的最小单元,是Ansible自动化的核心组件。每个模块负责执行特定的任务,如文件操作、服务管理、包安装等。

1.2 Ansible模块的工作原理

  1. 执行流程

    • Ansible控制节点将模块代码传输到目标主机
    • 目标主机执行模块代码
    • 执行结果以JSON格式返回给控制节点
  2. 模块类型

    • 核心模块:Ansible内置的模块
    • 自定义模块:用户根据需求开发的模块
    • 集合模块:由社区或厂商提供的模块

2. 自定义Ansible模块的开发准备

2.1 开发环境搭建

bash
# 安装Ansible
pip install ansible

# 验证安装
ansible --version

2.2 模块开发目录结构

ansible_modules/
├── library/          # 自定义模块目录
│   └── my_module.py  # 自定义模块文件
├── tests/            # 测试目录
│   └── test_my_module.py  # 模块测试文件
└── README.md         # 模块说明文档

3. 编写第一个Ansible模块

3.1 模块基本结构

python
#!/usr/bin/env python3

"""
Ansible模块示例

:Author: Your Name
:Date: 2026-01-27
:Version: 1.0
:Description: 这是一个自定义Ansible模块示例
"""

import json
import sys

# 模块参数定义
DOCUMENTATION = '''
---
module: my_module
short_description: 一个简单的Ansible模块示例
description:
    - 这是一个自定义Ansible模块的示例
    - 用于演示模块开发的基本结构
options:
    name:
        description:
            - 模块参数示例
        required: true
    state:
        description:
            - 状态参数示例
        required: false
        default: present
        choices: ["present", "absent"]
'''

# 示例用法
EXAMPLES = '''
- name: 使用自定义模块
  my_module:
    name: test
    state: present
'''

# 模块返回结构
RETURN = '''
changed:
    description: 模块是否导致了变更
    type: bool
    returned: always
message:
    description: 模块执行结果的消息
    type: str
    returned: always
'''

# 主函数
def main():
    # 导入Ansible模块库
    from ansible.module_utils.basic import AnsibleModule
    
    # 定义模块参数
    module_args = {
        'name': {
            'type': 'str',
            'required': True
        },
        'state': {
            'type': 'str',
            'default': 'present',
            'choices': ['present', 'absent']
        }
    }
    
    # 创建模块实例
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )
    
    # 获取参数
    name = module.params['name']
    state = module.params['state']
    
    # 模拟模块执行
    changed = False
    message = ''
    
    if state == 'present':
        message = f"资源 {name} 已创建"
        changed = True
    else:
        message = f"资源 {name} 已删除"
        changed = True
    
    # 返回结果
    module.exit_json(
        changed=changed,
        message=message
    )

if __name__ == '__main__':
    main()

3.2 模块执行原理

  1. 参数解析:Ansible模块通过AnsibleModule类解析传入的参数
  2. 任务执行:根据参数执行具体的任务逻辑
  3. 结果返回:使用exit_jsonfail_json返回执行结果

4. 不同类型的Ansible模块开发

4.1 文件操作模块

python
#!/usr/bin/env python3

"""
文件操作模块
"""

import os
import json
from ansible.module_utils.basic import AnsibleModule

def main():
    module_args = {
        'path': {
            'type': 'path',
            'required': True
        },
        'content': {
            'type': 'str',
            'required': False
        },
        'state': {
            'type': 'str',
            'default': 'file',
            'choices': ['file', 'directory', 'absent']
        }
    }
    
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )
    
    path = module.params['path']
    content = module.params['content']
    state = module.params['state']
    
    changed = False
    
    try:
        if state == 'absent':
            if os.path.exists(path):
                if module.check_mode:
                    changed = True
                else:
                    if os.path.isdir(path):
                        import shutil
                        shutil.rmtree(path)
                    else:
                        os.remove(path)
                    changed = True
        elif state == 'directory':
            if not os.path.isdir(path):
                if module.check_mode:
                    changed = True
                else:
                    os.makedirs(path, exist_ok=True)
                    changed = True
        elif state == 'file':
            if not os.path.exists(path) or (content and open(path).read() != content):
                if module.check_mode:
                    changed = True
                else:
                    os.makedirs(os.path.dirname(path), exist_ok=True)
                    with open(path, 'w') as f:
                        f.write(content)
                    changed = True
        
        module.exit_json(
            changed=changed,
            path=path,
            state=state
        )
    except Exception as e:
        module.fail_json(
            msg=f"操作失败: {str(e)}"
        )

if __name__ == '__main__':
    main()

4.2 服务管理模块

python
#!/usr/bin/env python3

"""
服务管理模块
"""

import subprocess
from ansible.module_utils.basic import AnsibleModule

def main():
    module_args = {
        'name': {
            'type': 'str',
            'required': True
        },
        'state': {
            'type': 'str',
            'default': 'started',
            'choices': ['started', 'stopped', 'restarted', 'reloaded', 'enabled', 'disabled']
        }
    }
    
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )
    
    name = module.params['name']
    state = module.params['state']
    
    changed = False
    
    try:
        # 检查服务是否存在
        check_cmd = ['systemctl', 'status', name]
        check_result = subprocess.run(check_cmd, capture_output=True, text=True)
        
        if check_result.returncode != 0:
            module.fail_json(msg=f"服务 {name} 不存在")
        
        # 执行服务操作
        if state in ['started', 'stopped', 'restarted', 'reloaded']:
            if module.check_mode:
                changed = True
            else:
                cmd = ['systemctl', state, name]
                subprocess.run(cmd, check=True)
                changed = True
        elif state in ['enabled', 'disabled']:
            # 检查当前状态
            is_enabled = subprocess.run(
                ['systemctl', 'is-enabled', name],
                capture_output=True,
                text=True
            ).stdout.strip() == 'enabled'
            
            if (state == 'enabled' and not is_enabled) or (state == 'disabled' and is_enabled):
                if module.check_mode:
                    changed = True
                else:
                    cmd = ['systemctl', state, name]
                    subprocess.run(cmd, check=True)
                    changed = True
        
        module.exit_json(
            changed=changed,
            service=name,
            state=state
        )
    except Exception as e:
        module.fail_json(
            msg=f"操作失败: {str(e)}"
        )

if __name__ == '__main__':
    main()

4.3 API调用模块

python
#!/usr/bin/env python3

"""
API调用模块
"""

import requests
from ansible.module_utils.basic import AnsibleModule

def main():
    module_args = {
        'url': {
            'type': 'str',
            'required': True
        },
        'method': {
            'type': 'str',
            'default': 'GET',
            'choices': ['GET', 'POST', 'PUT', 'DELETE']
        },
        'data': {
            'type': 'dict',
            'required': False
        },
        'headers': {
            'type': 'dict',
            'required': False
        },
        'timeout': {
            'type': 'int',
            'default': 30
        }
    }
    
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )
    
    url = module.params['url']
    method = module.params['method']
    data = module.params['data']
    headers = module.params['headers']
    timeout = module.params['timeout']
    
    if module.check_mode:
        module.exit_json(
            changed=False,
            msg="Check mode: would make API call",
            url=url,
            method=method
        )
    
    try:
        response = requests.request(
            method=method,
            url=url,
            json=data,
            headers=headers,
            timeout=timeout
        )
        
        response.raise_for_status()
        
        module.exit_json(
            changed=True,
            status_code=response.status_code,
            response=response.json() if response.headers.get('Content-Type') == 'application/json' else response.text,
            url=url
        )
    except requests.RequestException as e:
        module.fail_json(
            msg=f"API调用失败: {str(e)}"
        )

if __name__ == '__main__':
    main()

5. Ansible模块的测试和调试

5.1 模块测试方法

  1. 直接执行测试
bash
# 创建测试参数文件
cat > test_args.json << 'EOF'
{
    "ANSIBLE_MODULE_ARGS": {
        "name": "test",
        "state": "present"
    }
}
EOF

# 执行模块
python3 my_module.py < test_args.json
  1. 使用Ansible测试
bash
# 创建测试playbook
cat > test_playbook.yml << 'EOF'
---
- hosts: localhost
  gather_facts: no
  tasks:
    - name: 测试自定义模块
      my_module:
        name: test
        state: present
      register: result
    
    - name: 显示结果
      debug:
        var: result
EOF

# 执行测试
ansible-playbook -M ./library test_playbook.yml

5.2 模块调试技巧

  1. 添加调试输出

    • 使用module.log()记录调试信息
    • 使用print()语句输出调试信息
  2. 错误处理

    • 使用try-except捕获异常
    • 使用module.fail_json()返回错误信息
  3. 检查模式支持

    • 使用module.check_mode判断是否为检查模式
    • 在检查模式下模拟执行,不实际修改系统

6. Ansible模块的最佳实践

6.1 代码规范

  1. 命名规范

    • 模块名称应简洁明了,使用小写字母和下划线
    • 文件名应与模块名一致
  2. 文档规范

    • 提供详细的DOCUMENTATION文档
    • 提供EXAMPLES示例
    • 提供RETURN返回值说明
  3. 代码结构

    • 使用函数封装逻辑
    • 遵循Python代码规范
    • 添加适当的注释

6.2 性能优化

  1. 减少网络传输

    • 模块代码应简洁,减少传输时间
    • 避免在模块中执行长时间运行的任务
  2. 错误处理

    • 及时捕获和处理错误
    • 提供清晰的错误信息
  3. 幂等性

    • 模块应具有幂等性,多次执行结果一致
    • 在执行前检查当前状态,避免不必要的操作

7. 实战案例:服务器状态检查模块

7.1 模块功能

  • 检查服务器的CPU、内存、磁盘使用情况
  • 检查指定服务的运行状态
  • 检查指定端口的开放状态

7.2 模块代码

python
#!/usr/bin/env python3

"""
服务器状态检查模块
"""

import os
import subprocess
import json
from ansible.module_utils.basic import AnsibleModule

def check_cpu():
    """检查CPU使用情况"""
    try:
        result = subprocess.run(
            ['top', '-bn1'],
            capture_output=True,
            text=True
        )
        for line in result.stdout.split('\n'):
            if '%Cpu(s):' in line:
                cpu_line = line.split(':')[1].strip()
                cpu_usage = 100 - float(cpu_line.split(',')[3].strip().split()[0])
                return {'usage': round(cpu_usage, 2)}
    except Exception as e:
        return {'error': str(e)}
    return {'error': '无法获取CPU信息'}

def check_memory():
    """检查内存使用情况"""
    try:
        with open('/proc/meminfo', 'r') as f:
            meminfo = {}
            for line in f:
                key, value = line.split(':', 1)
                meminfo[key] = int(value.strip().split()[0])
        
        total = meminfo.get('MemTotal', 0)
        free = meminfo.get('MemFree', 0)
        buffers = meminfo.get('Buffers', 0)
        cached = meminfo.get('Cached', 0)
        
        used = total - free - buffers - cached
        usage = (used / total) * 100 if total > 0 else 0
        
        return {
            'total': total,
            'used': used,
            'free': free + buffers + cached,
            'usage': round(usage, 2)
        }
    except Exception as e:
        return {'error': str(e)}

def check_disk(path='/'):
    """检查磁盘使用情况"""
    try:
        stat = os.statvfs(path)
        total = stat.f_frsize * stat.f_blocks
        free = stat.f_frsize * stat.f_bavail
        used = total - free
        usage = (used / total) * 100 if total > 0 else 0
        
        return {
            'total': total,
            'used': used,
            'free': free,
            'usage': round(usage, 2),
            'path': path
        }
    except Exception as e:
        return {'error': str(e)}

def check_service(name):
    """检查服务状态"""
    try:
        result = subprocess.run(
            ['systemctl', 'status', name],
            capture_output=True,
            text=True
        )
        if result.returncode == 0:
            return {'status': 'running'}
        else:
            return {'status': 'stopped'}
    except Exception as e:
        return {'error': str(e)}

def check_port(port):
    """检查端口状态"""
    try:
        import socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            result = s.connect_ex(('localhost', port))
            if result == 0:
                return {'status': 'open'}
            else:
                return {'status': 'closed'}
    except Exception as e:
        return {'error': str(e)}

def main():
    module_args = {
        'check_cpu': {
            'type': 'bool',
            'default': False
        },
        'check_memory': {
            'type': 'bool',
            'default': False
        },
        'check_disk': {
            'type': 'bool',
            'default': False
        },
        'disk_path': {
            'type': 'path',
            'default': '/'
        },
        'check_service': {
            'type': 'bool',
            'default': False
        },
        'service_name': {
            'type': 'str',
            'required': False
        },
        'check_port': {
            'type': 'bool',
            'default': False
        },
        'port': {
            'type': 'int',
            'required': False
        }
    }
    
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )
    
    results = {}
    
    if module.params['check_cpu']:
        results['cpu'] = check_cpu()
    
    if module.params['check_memory']:
        results['memory'] = check_memory()
    
    if module.params['check_disk']:
        results['disk'] = check_disk(module.params['disk_path'])
    
    if module.params['check_service']:
        if not module.params['service_name']:
            module.fail_json(msg="服务名称必须指定")
        results['service'] = check_service(module.params['service_name'])
    
    if module.params['check_port']:
        if not module.params['port']:
            module.fail_json(msg="端口号必须指定")
        results['port'] = check_port(module.params['port'])
    
    module.exit_json(
        changed=False,
        results=results
    )

if __name__ == '__main__':
    main()

7.3 模块使用示例

yaml
---
- hosts: all
  gather_facts: no
  tasks:
    - name: 检查服务器状态
      server_status:
        check_cpu: true
        check_memory: true
        check_disk: true
        check_service: true
        service_name: sshd
        check_port: true
        port: 22
      register: status
    
    - name: 显示状态
      debug:
        var: status.results
    
    - name: 检查磁盘使用率
      server_status:
        check_disk: true
        disk_path: /var
      register: disk_status
    
    - name: 磁盘使用率告警
      debug:
        msg: "磁盘使用率超过80%"
      when: disk_status.results.disk.usage > 80

7. 课程总结

7.1 重点回顾

  • Ansible模块结构:掌握模块的基本结构和执行流程
  • 模块开发方法:学会编写不同类型的Ansible模块
  • 测试和调试:掌握模块的测试和调试方法
  • 最佳实践:遵循Ansible模块开发的最佳实践

7.2 实践建议

  1. 从简单开始:先开发简单的模块,逐步增加复杂度
  2. 参考现有模块:学习Ansible核心模块的实现方法
  3. 编写文档:为模块提供详细的文档和示例
  4. 测试充分:在不同环境中测试模块的可靠性

7.3 进阶学习

  • Ansible集合开发:将相关模块组织成集合
  • Ansible插件开发:开发其他类型的Ansible插件
  • Ansible角色开发:结合模块开发完整的角色
  • Ansible塔/自动化控制器:学习使用Ansible的企业级解决方案

通过本课程的学习,你已经掌握了自定义Ansible模块的开发方法,可以根据实际需求开发各种类型的Ansible模块,扩展Ansible的功能,提高自动化运维的效率。

评论区

专业的Linux技术学习平台,从入门到精通的完整学习路径