佛山市护送信息网

一文详解8个Python自动化脚本让你告别重复劳动

2026-04-03 07:19:02 浏览次数:0
详细信息

一文详解:8个Python自动化脚本让你告别重复劳动

本文将介绍8个实用Python自动化脚本,涵盖文件管理、数据处理、办公自动化等常见场景,帮助你提升工作效率。

1. 文件批量重命名脚本

import os
import re

def batch_rename(path, pattern, replacement, extension=None):
    """
    批量重命名文件
    :param path: 目录路径
    :param pattern: 要替换的模式(正则表达式)
    :param replacement: 替换为的内容
    :param extension: 文件扩展名过滤(可选)
    """
    for filename in os.listdir(path):
        # 按扩展名过滤
        if extension and not filename.endswith(extension):
            continue

        # 构建新文件名
        new_name = re.sub(pattern, replacement, filename)

        # 重命名文件
        old_path = os.path.join(path, filename)
        new_path = os.path.join(path, new_name)

        # 避免重名覆盖
        if old_path != new_path and not os.path.exists(new_path):
            os.rename(old_path, new_path)
            print(f"重命名: {filename} -> {new_name}")

# 使用示例:将所有.jpg文件中的"IMG_"替换为"photo_"
batch_rename("./photos", r"IMG_", "photo_", ".jpg")

2. 自动整理下载文件夹

import os
import shutil
from pathlib import Path

def organize_downloads(download_folder, organize_rules=None):
    """
    自动整理下载文件夹
    :param download_folder: 下载文件夹路径
    :param organize_rules: 整理规则字典 {扩展名: 目标文件夹}
    """
    if organize_rules is None:
        # 默认整理规则
        organize_rules = {
            # 文档类
            '.pdf': '文档',
            '.doc': '文档',
            '.docx': '文档',
            '.txt': '文档',
            # 图片类
            '.jpg': '图片',
            '.jpeg': '图片',
            '.png': '图片',
            '.gif': '图片',
            # 压缩包
            '.zip': '压缩包',
            '.rar': '压缩包',
            '.7z': '压缩包',
            # 程序文件
            '.exe': '程序',
            '.msi': '程序',
            '.py': '代码',
        }

    download_path = Path(download_folder)

    # 创建分类文件夹
    for folder in set(organize_rules.values()):
        (download_path / folder).mkdir(exist_ok=True)

    # 移动文件
    for item in download_path.iterdir():
        if item.is_file():
            ext = item.suffix.lower()
            if ext in organize_rules:
                target_folder = organize_rules[ext]
                target_path = download_path / target_folder / item.name

                # 处理重名文件
                counter = 1
                while target_path.exists():
                    name_parts = item.stem, item.suffix
                    new_name = f"{name_parts[0]}_{counter}{name_parts[1]}"
                    target_path = download_path / target_folder / new_name
                    counter += 1

                shutil.move(str(item), str(target_path))
                print(f"已移动: {item.name} -> {target_folder}/")

# 使用示例
organize_downloads(r"C:\Users\YourName\Downloads")

3. Excel数据批量处理脚本

import pandas as pd
import os
from datetime import datetime

def process_excel_files(folder_path, output_file="汇总结果.xlsx"):
    """
    批量处理Excel文件并汇总
    :param folder_path: Excel文件所在文件夹
    :param output_file: 输出文件名
    """
    all_data = []

    for file in os.listdir(folder_path):
        if file.endswith(('.xlsx', '.xls')):
            file_path = os.path.join(folder_path, file)

            try:
                # 读取Excel文件(可指定工作表)
                df = pd.read_excel(file_path, sheet_name=0)

                # 添加来源文件列
                df['来源文件'] = file
                df['处理时间'] = datetime.now()

                # 数据清洗示例:去除空行
                df = df.dropna(how='all')

                all_data.append(df)
                print(f"已处理: {file} (共{len(df)}行)")

            except Exception as e:
                print(f"处理文件 {file} 时出错: {e}")

    if all_data:
        # 合并所有数据
        combined_df = pd.concat(all_data, ignore_index=True)

        # 保存到新文件
        output_path = os.path.join(folder_path, output_file)
        combined_df.to_excel(output_path, index=False)
        print(f"数据已汇总保存至: {output_path} (共{len(combined_df)}行)")

        # 生成统计报告
        generate_report(combined_df, folder_path)
    else:
        print("未找到可处理的Excel文件")

def generate_report(df, folder_path):
    """生成数据统计报告"""
    report = []
    report.append("="*50)
    report.append("数据处理统计报告")
    report.append("="*50)
    report.append(f"总行数: {len(df)}")
    report.append(f"总列数: {len(df.columns)}")
    report.append("\n列信息:")
    for col in df.columns:
        report.append(f"  - {col}: {df[col].dtype}")

    report.append("\n文件来源统计:")
    file_counts = df['来源文件'].value_counts()
    for file, count in file_counts.items():
        report.append(f"  - {file}: {count}行")

    # 保存报告
    report_path = os.path.join(folder_path, "数据处理报告.txt")
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(report))

    print(f"统计报告已生成: {report_path}")

# 使用示例
process_excel_files("./data_files")

4. 网页内容自动抓取脚本

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from urllib.parse import urljoin

class WebScraper:
    def __init__(self, base_url, headers=None):
        self.base_url = base_url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.session = requests.Session()

    def fetch_page(self, url):
        """获取网页内容"""
        try:
            response = self.session.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"请求失败 {url}: {e}")
            return None

    def scrape_links(self, selector, max_pages=5):
        """抓取链接"""
        all_links = []

        for page in range(1, max_pages + 1):
            if page == 1:
                url = self.base_url
            else:
                url = f"{self.base_url}?page={page}"

            html = self.fetch_page(url)
            if not html:
                break

            soup = BeautifulSoup(html, 'html.parser')
            links = soup.select(selector)

            for link in links:
                href = link.get('href')
                if href:
                    full_url = urljoin(self.base_url, href)
                    all_links.append({
                        '标题': link.get_text(strip=True),
                        '链接': full_url
                    })

            print(f"第{page}页完成,找到{len(links)}个链接")
            time.sleep(1)  # 礼貌性延迟

        return all_links

    def save_to_csv(self, data, filename):
        """保存到CSV"""
        if data:
            df = pd.DataFrame(data)
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            print(f"数据已保存至 {filename}")

# 使用示例
if __name__ == "__main__":
    # 示例:抓取新闻标题和链接
    scraper = WebScraper("https://news.example.com")
    links = scraper.scrape_links("h2.news-title a", max_pages=3)
    scraper.save_to_csv(links, "news_links.csv")

5. 自动发送邮件脚本

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import pandas as pd
import os

class EmailAutomation:
    def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.sender_email = sender_email
        self.sender_password = sender_password

    def send_email(self, recipient, subject, body, attachments=None):
        """发送单封邮件"""
        msg = MIMEMultipart()
        msg['From'] = self.sender_email
        msg['To'] = recipient
        msg['Subject'] = subject

        # 添加正文
        msg.attach(MIMEText(body, 'html' if '<' in body else 'plain'))

        # 添加附件
        if attachments:
            for attachment in attachments:
                if os.path.exists(attachment):
                    with open(attachment, 'rb') as f:
                        part = MIMEBase('application', 'octet-stream')
                        part.set_payload(f.read())
                        encoders.encode_base64(part)
                        part.add_header(
                            'Content-Disposition',
                            f'attachment; filename="{os.path.basename(attachment)}"'
                        )
                        msg.attach(part)

        # 发送邮件
        try:
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
            print(f"邮件发送成功: {recipient}")
            return True
        except Exception as e:
            print(f"发送失败 {recipient}: {e}")
            return False

    def batch_send(self, recipients_df, subject_template, body_template):
        """
        批量发送邮件
        :param recipients_df: 包含收件人信息的DataFrame
        :param subject_template: 主题模板,使用{字段名}占位
        :param body_template: 正文模板,使用{字段名}占位
        """
        results = []

        for _, row in recipients_df.iterrows():
            # 个性化替换
            subject = subject_template
            body = body_template

            for col in recipients_df.columns:
                placeholder = '{' + col + '}'
                if placeholder in subject:
                    subject = subject.replace(placeholder, str(row[col]))
                if placeholder in body:
                    body = body.replace(placeholder, str(row[col]))

            # 发送邮件
            success = self.send_email(
                recipient=row['邮箱'],
                subject=subject,
                body=body
            )

            results.append({
                '姓名': row.get('姓名', ''),
                '邮箱': row['邮箱'],
                '状态': '成功' if success else '失败'
            })

        # 保存发送结果
        pd.DataFrame(results).to_csv('邮件发送结果.csv', index=False)
        return results

# 使用示例
def main():
    # 初始化邮件客户端
    email_client = EmailAutomation(
        smtp_server="smtp.gmail.com",
        smtp_port=587,
        sender_email="your_email@gmail.com",
        sender_password="your_password"
    )

    # 读取收件人列表
    recipients = pd.read_csv('收件人列表.csv')

    # 邮件模板
    subject_template = "尊敬的{姓名},您的月度报告已生成"
    body_template = """
    <html>
    <body>
        <h2>尊敬的{姓名},您好!</h2>
        <p>您的{部门}月度工作报告已经生成。</p>
        <p>请查收附件,如有问题请及时联系。</p>
        <br>
        <p>此致<br>公司人力资源部</p>
    </body>
    </html>
    """

    # 批量发送
    email_client.batch_send(recipients, subject_template, body_template)

# 注意:实际使用需要配置邮箱的SMTP服务和授权码

6. 定时任务自动化脚本

import schedule
import time
import datetime
import logging
from typing import Callable

class TaskScheduler:
    def __init__(self):
        self.tasks = []
        self.running = False

        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('task_scheduler.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def add_daily_task(self, task_func: Callable, time_str: str, *args, **kwargs):
        """添加每日定时任务"""
        schedule.every().day.at(time_str).do(
            self._wrapper, task_func, *args, **kwargs
        )
        self.tasks.append({
            'function': task_func.__name__,
            'time': time_str,
            'type': 'daily'
        })
        self.logger.info(f"添加每日任务: {task_func.__name__} 在 {time_str}")

    def add_hourly_task(self, task_func: Callable, minute: int = 0, *args, **kwargs):
        """添加每小时定时任务"""
        schedule.every().hour.at(f":{minute:02d}").do(
            self._wrapper, task_func, *args, **kwargs
        )
        self.tasks.append({
            'function': task_func.__name__,
            'minute': minute,
            'type': 'hourly'
        })
        self.logger.info(f"添加每小时任务: {task_func.__name__} 在第 {minute} 分钟")

    def add_weekly_task(self, task_func: Callable, day: str, time_str: str, *args, **kwargs):
        """添加每周定时任务"""
        getattr(schedule.every(), day).at(time_str).do(
            self._wrapper, task_func, *args, **kwargs
        )
        self.tasks.append({
            'function': task_func.__name__,
            'day': day,
            'time': time_str,
            'type': 'weekly'
        })
        self.logger.info(f"添加每周任务: {task_func.__name__} 在 {day} {time_str}")

    def _wrapper(self, task_func, *args, **kwargs):
        """任务执行包装器,添加异常处理"""
        try:
            self.logger.info(f"开始执行任务: {task_func.__name__}")
            result = task_func(*args, **kwargs)
            self.logger.info(f"任务完成: {task_func.__name__}")
            return result
        except Exception as e:
            self.logger.error(f"任务失败 {task_func.__name__}: {e}")
            return None

    def start(self, run_once=False):
        """启动调度器"""
        self.running = True
        self.logger.info("任务调度器启动")

        if run_once:
            schedule.run_all()
            self.logger.info("所有任务已执行一次")
        else:
            while self.running:
                schedule.run_pending()
                time.sleep(1)

    def stop(self):
        """停止调度器"""
        self.running = False
        self.logger.info("任务调度器停止")

# 示例任务函数
def backup_database():
    """示例:数据库备份任务"""
    import shutil
    import datetime

    source = "data/app.db"
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    destination = f"backups/app_{timestamp}.db"

    shutil.copy2(source, destination)
    print(f"数据库已备份至: {destination}")

def generate_daily_report():
    """示例:生成日报"""
    import pandas as pd
    import numpy as np

    # 模拟生成报告
    data = {
        '日期': [datetime.date.today()],
        '销售额': [np.random.randint(1000, 5000)],
        '订单数': [np.random.randint(50, 200)],
        '用户访问': [np.random.randint(1000, 5000)]
    }

    df = pd.DataFrame(data)
    filename = f"reports/日报_{datetime.date.today()}.csv"
    df.to_csv(filename, index=False)
    print(f"日报已生成: {filename}")

def cleanup_temp_files():
    """示例:清理临时文件"""
    import os
    import glob

    temp_files = glob.glob("temp/*.tmp")
    for file in temp_files:
        try:
            os.remove(file)
            print(f"已删除: {file}")
        except:
            pass

# 使用示例
if __name__ == "__main__":
    scheduler = TaskScheduler()

    # 添加定时任务
    scheduler.add_daily_task(backup_database, "02:00")
    scheduler.add_daily_task(generate_daily_report, "08:30")
    scheduler.add_hourly_task(cleanup_temp_files, minute=30)

    # 打印任务列表
    print("已配置的任务:")
    for task in scheduler.tasks:
        print(f"- {task['function']}: {task}")

    # 运行调度器(这里只运行一次作为演示)
    print("\n开始执行任务...")
    scheduler.start(run_once=True)

7. PDF文档批量处理脚本

import os
from PyPDF2 import PdfReader, PdfWriter
import pandas as pd
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import io

class PDFAutomation:
    @staticmethod
    def merge_pdfs(input_folder, output_file):
        """合并多个PDF文件"""
        pdf_writer = PdfWriter()

        # 按文件名排序
        pdf_files = sorted(
            [f for f in os.listdir(input_folder) if f.lower().endswith('.pdf')]
        )

        for pdf_file in pdf_files:
            pdf_path = os.path.join(input_folder, pdf_file)
            pdf_reader = PdfReader(pdf_path)

            for page_num in range(len(pdf_reader.pages)):
                page = pdf_reader.pages[page_num]
                pdf_writer.add_page(page)

            print(f"已添加: {pdf_file}")

        # 保存合并后的PDF
        with open(output_file, 'wb') as out:
            pdf_writer.write(out)

        print(f"PDF合并完成: {output_file}")
        return output_file

    @staticmethod
    def split_pdf(input_file, output_folder, pages_per_file=10):
        """拆分PDF文件"""
        pdf_reader = PdfReader(input_file)
        total_pages = len(pdf_reader.pages)

        os.makedirs(output_folder, exist_ok=True)

        file_count = 0
        for start_page in range(0, total_pages, pages_per_file):
            pdf_writer = PdfWriter()
            end_page = min(start_page + pages_per_file, total_pages)

            for page_num in range(start_page, end_page):
                page = pdf_reader.pages[page_num]
                pdf_writer.add_page(page)

            output_file = os.path.join(
                output_folder, 
                f"{os.path.splitext(os.path.basename(input_file))[0]}_part{file_count+1}.pdf"
            )

            with open(output_file, 'wb') as out:
                pdf_writer.write(out)

            print(f"已创建: {output_file} (页 {start_page+1}-{end_page})")
            file_count += 1

        return file_count

    @staticmethod
    def extract_pages(input_file, output_file, page_numbers):
        """提取指定页面"""
        pdf_reader = PdfReader(input_file)
        pdf_writer = PdfWriter()

        for page_num in page_numbers:
            if 1 <= page_num <= len(pdf_reader.pages):
                page = pdf_reader.pages[page_num-1]
                pdf_writer.add_page(page)
            else:
                print(f"警告: 页面 {page_num} 不存在,已跳过")

        with open(output_file, 'wb') as out:
            pdf_writer.write(out)

        print(f"页面提取完成: {output_file}")
        return output_file

    @staticmethod
    def add_watermark(input_file, output_file, watermark_text="机密"):
        """添加水印"""
        # 创建水印PDF
        packet = io.BytesIO()
        can = canvas.Canvas(packet, pagesize=letter)

        # 设置水印样式
        can.setFont("Helvetica", 40)
        can.setFillColorRGB(0.8, 0.8, 0.8, alpha=0.3)  # 浅灰色,半透明
        can.rotate(45)  # 旋转45度
        can.drawString(200, 100, watermark_text)

        can.save()
        packet.seek(0)

        # 读取水印PDF
        watermark_reader = PdfReader(packet)
        watermark_page = watermark_reader.pages[0]

        # 读取原始PDF并添加水印
        pdf_reader = PdfReader(input_file)
        pdf_writer = PdfWriter()

        for page_num in range(len(pdf_reader.pages)):
            page = pdf_reader.pages[page_num]
            page.merge_page(watermark_page)
            pdf_writer.add_page(page)

        # 保存带水印的PDF
        with open(output_file, 'wb') as out:
            pdf_writer.write(out)

        print(f"水印添加完成: {output_file}")
        return output_file

# 使用示例
def main():
    pdf_tool = PDFAutomation()

    # 1. 合并PDF
    pdf_tool.merge_pdfs("./pdf_documents", "合并文档.pdf")

    # 2. 拆分PDF
    pdf_tool.split_pdf("大型文档.pdf", "./拆分结果", pages_per_file=5)

    # 3. 提取页面
    pdf_tool.extract_pages("报告.pdf", "摘要.pdf", [1, 3, 5])

    # 4. 添加水印
    pdf_tool.add_watermark("合同.pdf", "合同_水印.pdf", "公司机密")

if __name__ == "__main__":
    # 注意:需要安装 reportlab 和 PyPDF2
    # pip install reportlab PyPDF2
    main()

8. 系统监控与自动化报告脚本

import psutil
import platform
import os
import datetime
import json
import logging
from typing import Dict, Any

class SystemMonitor:
    def __init__(self, log_file="system_monitor.log"):
        self.log_file = log_file
        self.setup_logging()

    def setup_logging(self):
        """配置日志系统"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def get_system_info(self) -> Dict[str, Any]:
        """获取系统信息"""
        info = {
            'timestamp': datetime.datetime.now().isoformat(),
            'system': {
                'platform': platform.system(),
                'platform_version': platform.version(),
                'architecture': platform.architecture()[0],
                'processor': platform.processor(),
            },
            'cpu': {
                'usage_percent': psutil.cpu_percent(interval=1),
                'cores': psutil.cpu_count(logical=False),
                'threads': psutil.cpu_count(logical=True),
                'frequency': psutil.cpu_freq().current if psutil.cpu_freq() else None,
            },
            'memory': {
                'total_gb': round(psutil.virtual_memory().total / (1024**3), 2),
                'available_gb': round(psutil.virtual_memory().available / (1024**3), 2),
                'used_percent': psutil.virtual_memory().percent,
            },
            'disk': [],
            'network': {
                'bytes_sent': psutil.net_io_counters().bytes_sent,
                'bytes_received': psutil.net_io_counters().bytes_recv,
            },
            'processes': len(psutil.pids()),
        }

        # 磁盘信息
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                info['disk'].append({
                    'device': partition.device,
                    'mountpoint': partition.mountpoint,
                    'total_gb': round(usage.total / (1024**3), 2),
                    'used_gb': round(usage.used / (1024**3), 2),
                    'free_gb': round(usage.free / (1024**3), 2),
                    'used_percent': usage.percent,
                })
            except:
                continue

        return info

    def monitor_resources(self, duration_seconds=60, interval=5):
        """监控系统资源"""
        start_time = datetime.datetime.now()
        end_time = start_time + datetime.timedelta(seconds=duration_seconds)

        monitoring_data = []

        print(f"开始监控,将持续 {duration_seconds} 秒...")

        while datetime.datetime.now() < end_time:
            data = self.get_system_info()
            monitoring_data.append(data)

            # 记录到日志
            self.logger.info(
                f"CPU使用率: {data['cpu']['usage_percent']}% | "
                f"内存使用率: {data['memory']['used_percent']}% | "
                f"进程数: {data['processes']}"
            )

            # 检查警报条件
            self.check_alerts(data)

            # 等待下一个间隔
            import time
            time.sleep(interval)

        return monitoring_data

    def check_alerts(self, data):
        """检查警报条件"""
        alerts = []

        # CPU使用率警报
        if data['cpu']['usage_percent'] > 90:
            alerts.append(f"CPU使用率过高: {data['cpu']['usage_percent']}%")

        # 内存使用率警报
        if data['memory']['used_percent'] > 90:
            alerts.append(f"内存使用率过高: {data['memory']['used_percent']}%")

        # 磁盘空间警报
        for disk in data['disk']:
            if disk['used_percent'] > 90:
                alerts.append(f"磁盘 {disk['device']} 空间不足: {disk['used_percent']}%")

        # 如果有警报,发送通知
        if alerts:
            self.send_alert(alerts, data)

    def send_alert(self, alerts, system_data):
        """发送警报(示例:记录到文件)"""
        alert_data = {
            'timestamp': datetime.datetime.now().isoformat(),
            'alerts': alerts,
            'system_state': system_data
        }

        alert_file = "system_alerts.json"

        # 读取现有警报
        existing_alerts = []
        if os.path.exists(alert_file):
            with open(alert_file, 'r') as f:
                try:
                    existing_alerts = json.load(f)
                except:
                    existing_alerts = []

        # 添加新警报
        existing_alerts.append(alert_data)

        # 保存警报
        with open(alert_file, 'w') as f:
            json.dump(existing_alerts, f, indent=2)

        print(f"警报已记录: {', '.join(alerts)}")

    def generate_report(self, monitoring_data, output_format='html'):
        """生成监控报告"""
        if not monitoring_data:
            print("没有监控数据可生成报告")
            return

        if output_format == 'html':
            self.generate_html_report(monitoring_data)
        elif output_format == 'json':
            self.generate_json_report(monitoring_data)
        else:
            self.generate_text_report(monitoring_data)

    def generate_html_report(self, data):
        """生成HTML报告"""
        report_file = "system_report.html"

        html_content = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>系统监控报告</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; }
                .header { background: #2c3e50; color: white; padding: 20px; }
                .metric { margin: 20px 0; padding: 15px; border: 1px solid #ddd; }
                .alert { color: #e74c3c; font-weight: bold; }
                .ok { color: #27ae60; }
            </style>
        </head>
        <body>
            <div class="header">
                <h1>系统监控报告</h1>
                <p>生成时间: {timestamp}</p>
            </div>
        """.format(timestamp=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        # 使用最新数据
        latest = data[-1] if data else {}

        # CPU信息
        cpu_status = "alert" if latest.get('cpu', {}).get('usage_percent', 0) > 90 else "ok"
        html_content += f"""
            <div class="metric">
                <h2>CPU使用情况</h2>
                <p class="{cpu_status}">使用率: {latest.get('cpu', {}).get('usage_percent', 'N/A')}%</p>
                <p>核心数: {latest.get('cpu', {}).get('cores', 'N/A')}</p>
                <p>线程数: {latest.get('cpu', {}).get('threads', 'N/A')}</p>
            </div>
        """

        # 内存信息
        mem_status = "alert" if latest.get('memory', {}).get('used_percent', 0) > 90 else "ok"
        html_content += f"""
            <div class="metric">
                <h2>内存使用情况</h2>
                <p class="{mem_status}">使用率: {latest.get('memory', {}).get('used_percent', 'N/A')}%</p>
                <p>总量: {latest.get('memory', {}).get('total_gb', 'N/A')} GB</p>
                <p>可用: {latest.get('memory', {}).get('available_gb', 'N/A')} GB</p>
            </div>
        """

        # 磁盘信息
        html_content += """
            <div class="metric">
                <h2>磁盘使用情况</h2>
        """
        for disk in latest.get('disk', []):
            disk_status = "alert" if disk.get('used_percent', 0) > 90 else "ok"
            html_content += f"""
                <div>
                    <h3>{disk.get('device', 'N/A')} ({disk.get('mountpoint', 'N/A')})</h3>
                    <p class="{disk_status}">使用率: {disk.get('used_percent', 'N/A')}%</p>
                    <p>总量: {disk.get('total_gb', 'N/A')} GB</p>
                    <p>已用: {disk.get('used_gb', 'N/A')} GB</p>
                    <p>可用: {disk.get('free_gb', 'N/A')} GB</p>
                </div>
            """

        html_content += """
            </div>
            <div class="metric">
                <h2>其他信息</h2>
                <p>系统: {system_info}</p>
                <p>进程数: {process_count}</p>
            </div>
        </body>
        </html>
        """.format(
            system_info=latest.get('system', {}).get('platform', 'N/A'),
            process_count=latest.get('processes', 'N/A')
        )

        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(html_content)

        print(f"HTML报告已生成: {report_file}")
        return report_file

# 使用示例
if __name__ == "__main__":
    # 注意:需要安装 psutil
    # pip install psutil

    monitor = SystemMonitor()

    # 监控系统60秒,每5秒记录一次
    data = monitor.monitor_resources(duration_seconds=60, interval=5)

    # 生成HTML报告
    monitor.generate_report(data, output_format='html')

    # 打印摘要信息
    if data:
        latest = data[-1]
        print("\n系统状态摘要:")
        print(f"CPU使用率: {latest['cpu']['usage_percent']}%")
        print(f"内存使用率: {latest['memory']['used_percent']}%")
        print(f"运行进程数: {latest['processes']}")

总结与建议

1. 安装所需库

# 基础库
pip install pandas openpyxl requests beautifulsoup4

# 高级功能
pip install schedule psutil PyPDF2 reportlab

# 邮件相关
pip install secure-smtplib

2. 使用技巧

3. 扩展建议

4. 注意事项

这些脚本覆盖了日常工作中的常见自动化需求,你可以根据实际需求进行调整和组合使用,大大提高工作效率!

相关推荐