一文详解:8个Python自动化脚本让你告别重复劳动
本文将介绍8个实用Python自动化脚本,涵盖文件管理、数据处理、办公自动化等常见场景,帮助你提升工作效率。
1. 文件批量重命名脚本
import os
import re
def batch_rename(path, pattern, replacement, extension=None):
    """Batch-rename files in *path* by applying a regex substitution.

    :param path: directory containing the files to rename
    :param pattern: regular expression matched against each filename
    :param replacement: replacement text for every match
    :param extension: optional extension filter, e.g. ".jpg" (None = all files)
    """
    for filename in os.listdir(path):
        # Skip files that do not carry the requested extension.
        if extension and not filename.endswith(extension):
            continue
        # Build the new name via regex substitution.
        new_name = re.sub(pattern, replacement, filename)
        old_path = os.path.join(path, filename)
        new_path = os.path.join(path, new_name)
        # Rename only when the name actually changed and the target does
        # not exist yet, so nothing is silently overwritten.
        if old_path != new_path and not os.path.exists(new_path):
            os.rename(old_path, new_path)
            # Fixed: the original f-string placeholder was garbled.
            print(f"重命名: {filename} -> {new_name}")
# Usage example: replace the "IMG_" prefix with "photo_" in every .jpg file.
batch_rename("./photos", r"IMG_", "photo_", ".jpg")
2. 自动整理下载文件夹
import os
import shutil
from pathlib import Path
def organize_downloads(download_folder, organize_rules=None):
    """Sort loose files in a downloads folder into category sub-folders.

    :param download_folder: path of the folder to organize
    :param organize_rules: optional mapping of file extension -> folder name;
        a built-in default mapping is used when omitted
    """
    if organize_rules is None:
        # Default extension -> category mapping.
        organize_rules = {
            # documents
            '.pdf': '文档', '.doc': '文档', '.docx': '文档', '.txt': '文档',
            # images
            '.jpg': '图片', '.jpeg': '图片', '.png': '图片', '.gif': '图片',
            # archives
            '.zip': '压缩包', '.rar': '压缩包', '.7z': '压缩包',
            # programs / code
            '.exe': '程序', '.msi': '程序',
            '.py': '代码',
        }
    root = Path(download_folder)
    # Make sure every category folder exists up front.
    for category in set(organize_rules.values()):
        (root / category).mkdir(exist_ok=True)
    # Move each matching file into its category folder.
    for entry in root.iterdir():
        if not entry.is_file():
            continue
        category = organize_rules.get(entry.suffix.lower())
        if category is None:
            continue
        destination = root / category / entry.name
        # On a name clash, append _1, _2, ... until the name is free.
        attempt = 1
        while destination.exists():
            destination = root / category / f"{entry.stem}_{attempt}{entry.suffix}"
            attempt += 1
        shutil.move(str(entry), str(destination))
        print(f"已移动: {entry.name} -> {category}/")
# Usage example
organize_downloads(r"C:\Users\YourName\Downloads")
3. Excel数据批量处理脚本
import pandas as pd
import os
from datetime import datetime
def process_excel_files(folder_path, output_file="汇总结果.xlsx"):
    """Read every Excel workbook in *folder_path*, merge the rows and write
    a combined workbook plus a statistics report.

    :param folder_path: folder containing the .xlsx / .xls files
    :param output_file: filename of the merged workbook
    """
    frames = []
    for name in os.listdir(folder_path):
        if not name.endswith(('.xlsx', '.xls')):
            continue
        full_path = os.path.join(folder_path, name)
        try:
            # First worksheet only; pass a different sheet_name to change that.
            frame = pd.read_excel(full_path, sheet_name=0)
            # Track provenance and processing time on every row.
            frame['来源文件'] = name
            frame['处理时间'] = datetime.now()
            # Basic cleaning: drop rows that are entirely empty.
            frame = frame.dropna(how='all')
            frames.append(frame)
            print(f"已处理: {name} (共{len(frame)}行)")
        except Exception as e:
            print(f"处理文件 {name} 时出错: {e}")
    if not frames:
        print("未找到可处理的Excel文件")
        return
    merged = pd.concat(frames, ignore_index=True)
    target = os.path.join(folder_path, output_file)
    merged.to_excel(target, index=False)
    print(f"数据已汇总保存至: {target} (共{len(merged)}行)")
    # Companion statistics report next to the merged workbook.
    generate_report(merged, folder_path)
def generate_report(df, folder_path):
    """Write a plain-text statistics report for the merged DataFrame.

    :param df: merged DataFrame; must contain a '来源文件' column
    :param folder_path: folder where 数据处理报告.txt is written
    """
    lines = ["=" * 50, "数据处理统计报告", "=" * 50]
    lines.append(f"总行数: {len(df)}")
    lines.append(f"总列数: {len(df.columns)}")
    # One line per column with its dtype.
    lines.append("\n列信息:")
    lines.extend(f" - {col}: {df[col].dtype}" for col in df.columns)
    # Row counts grouped by source workbook.
    lines.append("\n文件来源统计:")
    for source, count in df['来源文件'].value_counts().items():
        lines.append(f" - {source}: {count}行")
    report_path = os.path.join(folder_path, "数据处理报告.txt")
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines))
    print(f"统计报告已生成: {report_path}")
# Usage example
process_excel_files("./data_files")
4. 网页内容自动抓取脚本
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from urllib.parse import urljoin
class WebScraper:
    """Small helper for paginated link scraping with polite delays."""

    def __init__(self, base_url, headers=None):
        """
        :param base_url: URL of the first page; pages 2+ are fetched as ?page=N
        :param headers: optional request headers (a desktop UA by default)
        """
        self.base_url = base_url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        # One session so connections (and cookies) are reused across pages.
        self.session = requests.Session()

    def fetch_page(self, url):
        """Return the page HTML, or None when the request fails."""
        try:
            response = self.session.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"请求失败 {url}: {e}")
            return None

    def scrape_links(self, selector, max_pages=5):
        """Collect {'标题', '链接'} dicts matching a CSS *selector* from up
        to *max_pages* pages; stops early when a page fails to load."""
        all_links = []
        for page in range(1, max_pages + 1):
            url = self.base_url if page == 1 else f"{self.base_url}?page={page}"
            html = self.fetch_page(url)
            if not html:
                break
            soup = BeautifulSoup(html, 'html.parser')
            links = soup.select(selector)
            for link in links:
                href = link.get('href')
                if href:
                    # Resolve relative hrefs against the base URL.
                    all_links.append({
                        '标题': link.get_text(strip=True),
                        '链接': urljoin(self.base_url, href)
                    })
            print(f"第{page}页完成,找到{len(links)}个链接")
            time.sleep(1)  # polite delay between page requests
        return all_links

    def save_to_csv(self, data, filename):
        """Dump the scraped rows to CSV (UTF-8 with BOM for Excel)."""
        if data:
            df = pd.DataFrame(data)
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            # Fixed: the original f-string placeholder was garbled.
            print(f"数据已保存至 {filename}")
# Usage example
if __name__ == "__main__":
    # Example: scrape news titles and links from a paginated listing.
    scraper = WebScraper("https://news.example.com")
    links = scraper.scrape_links("h2.news-title a", max_pages=3)
    scraper.save_to_csv(links, "news_links.csv")
5. 自动发送邮件脚本
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import pandas as pd
import os
class EmailAutomation:
    """Thin SMTP wrapper for single and templated batch e-mail sending."""

    def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
        # NOTE(review): the password is held in plain text; prefer an
        # environment variable or provider-issued authorization code.
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.sender_email = sender_email
        self.sender_password = sender_password

    def send_email(self, recipient, subject, body, attachments=None):
        """Send one message; returns True on success, False on failure.

        :param recipient: destination address
        :param subject: message subject
        :param body: message body; treated as HTML when it contains '<'
        :param attachments: optional list of file paths to attach
        """
        msg = MIMEMultipart()
        msg['From'] = self.sender_email
        msg['To'] = recipient
        msg['Subject'] = subject
        # Heuristic: any '<' in the body switches the MIME subtype to HTML.
        msg.attach(MIMEText(body, 'html' if '<' in body else 'plain'))
        # Attachments: missing paths are silently skipped.
        if attachments:
            for attachment in attachments:
                if os.path.exists(attachment):
                    with open(attachment, 'rb') as f:
                        part = MIMEBase('application', 'octet-stream')
                        part.set_payload(f.read())
                        encoders.encode_base64(part)
                        part.add_header(
                            'Content-Disposition',
                            f'attachment; filename="{os.path.basename(attachment)}"'
                        )
                        msg.attach(part)
        # Deliver via STARTTLS.
        try:
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
                print(f"邮件发送成功: {recipient}")
                return True
        except Exception as e:
            print(f"发送失败 {recipient}: {e}")
            return False

    def batch_send(self, recipients_df, subject_template, body_template):
        """
        Send a personalized message to every row of *recipients_df*.

        :param recipients_df: DataFrame with at least an '邮箱' column
        :param subject_template: subject template with {column} placeholders
        :param body_template: body template with {column} placeholders
        :return: list of per-recipient result dicts (also written to CSV)
        """
        results = []
        for _, row in recipients_df.iterrows():
            # Fill every {column} placeholder from the recipient's row.
            subject = subject_template
            body = body_template
            for col in recipients_df.columns:
                placeholder = '{' + col + '}'
                if placeholder in subject:
                    subject = subject.replace(placeholder, str(row[col]))
                if placeholder in body:
                    body = body.replace(placeholder, str(row[col]))
            # Send and record the outcome.
            success = self.send_email(
                recipient=row['邮箱'],
                subject=subject,
                body=body
            )
            results.append({
                '姓名': row.get('姓名', ''),
                '邮箱': row['邮箱'],
                '状态': '成功' if success else '失败'
            })
        # Persist a per-recipient delivery log.
        pd.DataFrame(results).to_csv('邮件发送结果.csv', index=False)
        return results
# Usage example
def main():
    """Demo: send a templated monthly-report mail to every CSV recipient."""
    # Configure the SMTP client (Gmail shown; adjust host/port per provider).
    email_client = EmailAutomation(
        smtp_server="smtp.gmail.com",
        smtp_port=587,
        sender_email="your_email@gmail.com",
        sender_password="your_password"
    )
    # Recipient CSV must provide the columns referenced by the templates.
    recipients = pd.read_csv('收件人列表.csv')
    # Templates: {列名} placeholders are filled per recipient row.
    subject_template = "尊敬的{姓名},您的月度报告已生成"
    body_template = """
<html>
<body>
<h2>尊敬的{姓名},您好!</h2>
<p>您的{部门}月度工作报告已经生成。</p>
<p>请查收附件,如有问题请及时联系。</p>
<br>
<p>此致<br>公司人力资源部</p>
</body>
</html>
"""
    # Batch send
    email_client.batch_send(recipients, subject_template, body_template)
# NOTE: real use requires enabling SMTP on the mailbox and an app auth code.
6. 定时任务自动化脚本
import schedule
import time
import datetime
import logging
from typing import Callable
class TaskScheduler:
    """Wrapper around the third-party `schedule` library: registers daily,
    hourly and weekly jobs with logging and exception-safe execution."""

    def __init__(self):
        # Human-readable registry for display; `schedule` owns the real jobs.
        self.tasks = []
        self.running = False
        # Log to both a file and the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('task_scheduler.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def add_daily_task(self, task_func: Callable, time_str: str, *args, **kwargs):
        """Register *task_func* to run every day at *time_str* ("HH:MM")."""
        schedule.every().day.at(time_str).do(
            self._wrapper, task_func, *args, **kwargs
        )
        self.tasks.append({
            'function': task_func.__name__,
            'time': time_str,
            'type': 'daily'
        })
        self.logger.info(f"添加每日任务: {task_func.__name__} 在 {time_str}")

    def add_hourly_task(self, task_func: Callable, minute: int = 0, *args, **kwargs):
        """Register *task_func* to run at the given *minute* of every hour."""
        schedule.every().hour.at(f":{minute:02d}").do(
            self._wrapper, task_func, *args, **kwargs
        )
        self.tasks.append({
            'function': task_func.__name__,
            'minute': minute,
            'type': 'hourly'
        })
        self.logger.info(f"添加每小时任务: {task_func.__name__} 在第 {minute} 分钟")

    def add_weekly_task(self, task_func: Callable, day: str, time_str: str, *args, **kwargs):
        """Register a weekly task; *day* is a `schedule` weekday attribute
        name such as "monday"."""
        getattr(schedule.every(), day).at(time_str).do(
            self._wrapper, task_func, *args, **kwargs
        )
        self.tasks.append({
            'function': task_func.__name__,
            'day': day,
            'time': time_str,
            'type': 'weekly'
        })
        self.logger.info(f"添加每周任务: {task_func.__name__} 在 {day} {time_str}")

    def _wrapper(self, task_func, *args, **kwargs):
        """Run a task with logging; exceptions are logged and swallowed so a
        failing job cannot kill the scheduler loop. Returns the task's
        result, or None on failure."""
        try:
            self.logger.info(f"开始执行任务: {task_func.__name__}")
            result = task_func(*args, **kwargs)
            self.logger.info(f"任务完成: {task_func.__name__}")
            return result
        except Exception as e:
            self.logger.error(f"任务失败 {task_func.__name__}: {e}")
            return None

    def start(self, run_once=False):
        """Start the scheduler.

        :param run_once: True runs every registered job immediately and
            returns; False blocks, polling pending jobs once per second
            until stop() clears self.running.
        """
        self.running = True
        self.logger.info("任务调度器启动")
        if run_once:
            schedule.run_all()
            self.logger.info("所有任务已执行一次")
        else:
            while self.running:
                schedule.run_pending()
                time.sleep(1)

    def stop(self):
        """Ask the polling loop in start() to exit."""
        self.running = False
        self.logger.info("任务调度器停止")
# 示例任务函数
def backup_database():
    """Example task: copy the app database into backups/ with a timestamp."""
    import shutil
    import datetime
    # Timestamped destination name, e.g. backups/app_20240101_020000.db
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    target = f"backups/app_{stamp}.db"
    shutil.copy2("data/app.db", target)
    print(f"数据库已备份至: {target}")
def generate_daily_report():
    """Example task: write a mock daily KPI report to reports/日报_<date>.csv."""
    import os
    import pandas as pd
    import numpy as np
    # Mock figures; replace with real queries in production.
    data = {
        '日期': [datetime.date.today()],
        '销售额': [np.random.randint(1000, 5000)],
        '订单数': [np.random.randint(50, 200)],
        '用户访问': [np.random.randint(1000, 5000)]
    }
    df = pd.DataFrame(data)
    # Make sure the target folder exists (the original crashed without it).
    os.makedirs("reports", exist_ok=True)
    filename = f"reports/日报_{datetime.date.today()}.csv"
    df.to_csv(filename, index=False)
    # Fixed: the original f-string placeholder was garbled.
    print(f"日报已生成: {filename}")
def cleanup_temp_files():
    """Example task: delete *.tmp files under temp/, best effort."""
    import os
    import glob
    for tmp_path in glob.glob("temp/*.tmp"):
        try:
            os.remove(tmp_path)
            print(f"已删除: {tmp_path}")
        except OSError:
            # Best effort: a locked or already-removed file is not fatal.
            # Fixed: the original bare `except:` also swallowed
            # KeyboardInterrupt/SystemExit.
            pass
# Usage example
if __name__ == "__main__":
    scheduler = TaskScheduler()
    # Register the scheduled jobs.
    scheduler.add_daily_task(backup_database, "02:00")
    scheduler.add_daily_task(generate_daily_report, "08:30")
    scheduler.add_hourly_task(cleanup_temp_files, minute=30)
    # Show what was configured.
    print("已配置的任务:")
    for task in scheduler.tasks:
        print(f"- {task['function']}: {task}")
    # Run every task once as a demonstration (instead of looping forever).
    print("\n开始执行任务...")
    scheduler.start(run_once=True)
7. PDF文档批量处理脚本
import os
from PyPDF2 import PdfReader, PdfWriter
import pandas as pd
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import io
class PDFAutomation:
    """Static helpers for merging, splitting, extracting pages from and
    watermarking PDFs (PyPDF2 for page handling, reportlab for drawing)."""

    @staticmethod
    def merge_pdfs(input_folder, output_file):
        """Merge every PDF in *input_folder* into *output_file*; files are
        concatenated in filename order. Returns the output path."""
        pdf_writer = PdfWriter()
        # Sort by name so the merge order is deterministic.
        pdf_files = sorted(
            [f for f in os.listdir(input_folder) if f.lower().endswith('.pdf')]
        )
        for pdf_file in pdf_files:
            pdf_path = os.path.join(input_folder, pdf_file)
            pdf_reader = PdfReader(pdf_path)
            for page_num in range(len(pdf_reader.pages)):
                page = pdf_reader.pages[page_num]
                pdf_writer.add_page(page)
            print(f"已添加: {pdf_file}")
        # Write the combined document.
        with open(output_file, 'wb') as out:
            pdf_writer.write(out)
        print(f"PDF合并完成: {output_file}")
        return output_file

    @staticmethod
    def split_pdf(input_file, output_folder, pages_per_file=10):
        """Split *input_file* into chunks of *pages_per_file* pages each;
        returns the number of files created."""
        pdf_reader = PdfReader(input_file)
        total_pages = len(pdf_reader.pages)
        os.makedirs(output_folder, exist_ok=True)
        file_count = 0
        for start_page in range(0, total_pages, pages_per_file):
            pdf_writer = PdfWriter()
            # Last chunk may be shorter than pages_per_file.
            end_page = min(start_page + pages_per_file, total_pages)
            for page_num in range(start_page, end_page):
                page = pdf_reader.pages[page_num]
                pdf_writer.add_page(page)
            # Output name: <original-stem>_partN.pdf
            output_file = os.path.join(
                output_folder,
                f"{os.path.splitext(os.path.basename(input_file))[0]}_part{file_count+1}.pdf"
            )
            with open(output_file, 'wb') as out:
                pdf_writer.write(out)
            print(f"已创建: {output_file} (页 {start_page+1}-{end_page})")
            file_count += 1
        return file_count

    @staticmethod
    def extract_pages(input_file, output_file, page_numbers):
        """Copy the 1-based *page_numbers* from *input_file* into
        *output_file*; out-of-range pages are skipped with a warning."""
        pdf_reader = PdfReader(input_file)
        pdf_writer = PdfWriter()
        for page_num in page_numbers:
            if 1 <= page_num <= len(pdf_reader.pages):
                # Convert the 1-based page number to a 0-based index.
                page = pdf_reader.pages[page_num-1]
                pdf_writer.add_page(page)
            else:
                print(f"警告: 页面 {page_num} 不存在,已跳过")
        with open(output_file, 'wb') as out:
            pdf_writer.write(out)
        print(f"页面提取完成: {output_file}")
        return output_file

    @staticmethod
    def add_watermark(input_file, output_file, watermark_text="机密"):
        """Stamp *watermark_text* diagonally on every page of *input_file*."""
        # Draw the watermark onto an in-memory single-page PDF.
        packet = io.BytesIO()
        can = canvas.Canvas(packet, pagesize=letter)
        # Light grey, semi-transparent, rotated 45 degrees.
        # NOTE(review): Helvetica cannot render CJK glyphs — register a CJK
        # font if the watermark text is Chinese; confirm output visually.
        can.setFont("Helvetica", 40)
        can.setFillColorRGB(0.8, 0.8, 0.8, alpha=0.3)
        can.rotate(45)
        can.drawString(200, 100, watermark_text)
        can.save()
        packet.seek(0)
        # Use the drawn page as the overlay.
        watermark_reader = PdfReader(packet)
        watermark_page = watermark_reader.pages[0]
        # Merge the overlay onto every page of the source document.
        pdf_reader = PdfReader(input_file)
        pdf_writer = PdfWriter()
        for page_num in range(len(pdf_reader.pages)):
            page = pdf_reader.pages[page_num]
            page.merge_page(watermark_page)
            pdf_writer.add_page(page)
        # Save the watermarked copy.
        with open(output_file, 'wb') as out:
            pdf_writer.write(out)
        print(f"水印添加完成: {output_file}")
        return output_file
# Usage example
def main():
    """Demonstrate the four PDF operations."""
    pdf_tool = PDFAutomation()
    # 1. Merge PDFs
    pdf_tool.merge_pdfs("./pdf_documents", "合并文档.pdf")
    # 2. Split a PDF
    pdf_tool.split_pdf("大型文档.pdf", "./拆分结果", pages_per_file=5)
    # 3. Extract pages
    pdf_tool.extract_pages("报告.pdf", "摘要.pdf", [1, 3, 5])
    # 4. Add a watermark
    pdf_tool.add_watermark("合同.pdf", "合同_水印.pdf", "公司机密")
if __name__ == "__main__":
    # Requires reportlab and PyPDF2:
    # pip install reportlab PyPDF2
    main()
8. 系统监控与自动化报告脚本
import psutil
import platform
import os
import datetime
import json
import logging
from typing import Dict, Any
class SystemMonitor:
    """Collect system metrics via psutil, log them, raise threshold alerts
    and render monitoring reports (HTML / JSON / text)."""

    def __init__(self, log_file="system_monitor.log"):
        """
        :param log_file: path of the monitoring log file
        """
        self.log_file = log_file
        self.setup_logging()

    def setup_logging(self):
        """Configure logging to both the log file and the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def get_system_info(self) -> Dict[str, Any]:
        """Return one snapshot of platform, CPU, memory, disk, network and
        process metrics as a nested dict."""
        info = {
            'timestamp': datetime.datetime.now().isoformat(),
            'system': {
                'platform': platform.system(),
                'platform_version': platform.version(),
                'architecture': platform.architecture()[0],
                'processor': platform.processor(),
            },
            'cpu': {
                # interval=1 samples over a full second for a real reading.
                'usage_percent': psutil.cpu_percent(interval=1),
                'cores': psutil.cpu_count(logical=False),
                'threads': psutil.cpu_count(logical=True),
                'frequency': psutil.cpu_freq().current if psutil.cpu_freq() else None,
            },
            'memory': {
                'total_gb': round(psutil.virtual_memory().total / (1024**3), 2),
                'available_gb': round(psutil.virtual_memory().available / (1024**3), 2),
                'used_percent': psutil.virtual_memory().percent,
            },
            'disk': [],
            'network': {
                'bytes_sent': psutil.net_io_counters().bytes_sent,
                'bytes_received': psutil.net_io_counters().bytes_recv,
            },
            'processes': len(psutil.pids()),
        }
        # Per-partition disk usage; unreadable mounts (e.g. empty CD drives)
        # are skipped. Fixed: was a bare `except:` — now targeted at OSError.
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError:
                continue
            info['disk'].append({
                'device': partition.device,
                'mountpoint': partition.mountpoint,
                'total_gb': round(usage.total / (1024**3), 2),
                'used_gb': round(usage.used / (1024**3), 2),
                'free_gb': round(usage.free / (1024**3), 2),
                'used_percent': usage.percent,
            })
        return info

    def monitor_resources(self, duration_seconds=60, interval=5):
        """Sample metrics every *interval* seconds for *duration_seconds*
        seconds; returns the list of snapshots."""
        import time  # hoisted out of the loop (was re-imported every pass)
        start_time = datetime.datetime.now()
        end_time = start_time + datetime.timedelta(seconds=duration_seconds)
        monitoring_data = []
        print(f"开始监控,将持续 {duration_seconds} 秒...")
        while datetime.datetime.now() < end_time:
            data = self.get_system_info()
            monitoring_data.append(data)
            # One summary log line per sample.
            self.logger.info(
                f"CPU使用率: {data['cpu']['usage_percent']}% | "
                f"内存使用率: {data['memory']['used_percent']}% | "
                f"进程数: {data['processes']}"
            )
            # Fire alerts for this sample when thresholds are crossed.
            self.check_alerts(data)
            time.sleep(interval)
        return monitoring_data

    def check_alerts(self, data):
        """Collect 90%-threshold violations and forward them to send_alert."""
        alerts = []
        # CPU usage above 90%
        if data['cpu']['usage_percent'] > 90:
            alerts.append(f"CPU使用率过高: {data['cpu']['usage_percent']}%")
        # Memory usage above 90%
        if data['memory']['used_percent'] > 90:
            alerts.append(f"内存使用率过高: {data['memory']['used_percent']}%")
        # Any disk above 90% used
        for disk in data['disk']:
            if disk['used_percent'] > 90:
                alerts.append(f"磁盘 {disk['device']} 空间不足: {disk['used_percent']}%")
        # Notify only when something actually crossed a threshold.
        if alerts:
            self.send_alert(alerts, data)

    def send_alert(self, alerts, system_data):
        """Append the alerts (plus the full system snapshot) to
        system_alerts.json."""
        alert_data = {
            'timestamp': datetime.datetime.now().isoformat(),
            'alerts': alerts,
            'system_state': system_data
        }
        alert_file = "system_alerts.json"
        # Load previous alerts; a missing or corrupt file starts a fresh
        # list. Fixed: was a bare `except:` — now only JSON decode errors.
        existing_alerts = []
        if os.path.exists(alert_file):
            with open(alert_file, 'r') as f:
                try:
                    existing_alerts = json.load(f)
                except json.JSONDecodeError:
                    existing_alerts = []
        existing_alerts.append(alert_data)
        with open(alert_file, 'w') as f:
            json.dump(existing_alerts, f, indent=2)
        print(f"警报已记录: {', '.join(alerts)}")

    def generate_report(self, monitoring_data, output_format='html'):
        """Dispatch report generation: 'html', 'json', anything else -> text."""
        if not monitoring_data:
            print("没有监控数据可生成报告")
            return
        if output_format == 'html':
            self.generate_html_report(monitoring_data)
        elif output_format == 'json':
            self.generate_json_report(monitoring_data)
        else:
            self.generate_text_report(monitoring_data)

    def generate_json_report(self, data):
        """Dump the full monitoring history to system_report.json.
        Fixed: was dispatched to by generate_report but never defined."""
        report_file = "system_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"JSON报告已生成: {report_file}")
        return report_file

    def generate_text_report(self, data):
        """Write a short plain-text summary of the latest sample.
        Fixed: was dispatched to by generate_report but never defined."""
        report_file = "system_report.txt"
        latest = data[-1] if data else {}
        lines = [
            "系统监控报告",
            f"生成时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"CPU使用率: {latest.get('cpu', {}).get('usage_percent', 'N/A')}%",
            f"内存使用率: {latest.get('memory', {}).get('used_percent', 'N/A')}%",
            f"进程数: {latest.get('processes', 'N/A')}",
        ]
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))
        print(f"文本报告已生成: {report_file}")
        return report_file

    def generate_html_report(self, data):
        """Render the latest sample as a styled HTML page
        (system_report.html); returns the file path."""
        report_file = "system_report.html"
        # Fixed: CSS braces are doubled ({{ }}) because this template goes
        # through str.format() — the original single braces made .format()
        # raise ValueError at runtime.
        html_content = """
<!DOCTYPE html>
<html>
<head>
<title>系统监控报告</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background: #2c3e50; color: white; padding: 20px; }}
.metric {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; }}
.alert {{ color: #e74c3c; font-weight: bold; }}
.ok {{ color: #27ae60; }}
</style>
</head>
<body>
<div class="header">
<h1>系统监控报告</h1>
<p>生成时间: {timestamp}</p>
</div>
""".format(timestamp=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # The report shows the most recent sample only.
        latest = data[-1] if data else {}
        # CPU section
        cpu_status = "alert" if latest.get('cpu', {}).get('usage_percent', 0) > 90 else "ok"
        html_content += f"""
<div class="metric">
<h2>CPU使用情况</h2>
<p class="{cpu_status}">使用率: {latest.get('cpu', {}).get('usage_percent', 'N/A')}%</p>
<p>核心数: {latest.get('cpu', {}).get('cores', 'N/A')}</p>
<p>线程数: {latest.get('cpu', {}).get('threads', 'N/A')}</p>
</div>
"""
        # Memory section
        mem_status = "alert" if latest.get('memory', {}).get('used_percent', 0) > 90 else "ok"
        html_content += f"""
<div class="metric">
<h2>内存使用情况</h2>
<p class="{mem_status}">使用率: {latest.get('memory', {}).get('used_percent', 'N/A')}%</p>
<p>总量: {latest.get('memory', {}).get('total_gb', 'N/A')} GB</p>
<p>可用: {latest.get('memory', {}).get('available_gb', 'N/A')} GB</p>
</div>
"""
        # Disk section: one sub-block per partition.
        html_content += """
<div class="metric">
<h2>磁盘使用情况</h2>
"""
        for disk in latest.get('disk', []):
            disk_status = "alert" if disk.get('used_percent', 0) > 90 else "ok"
            html_content += f"""
<div>
<h3>{disk.get('device', 'N/A')} ({disk.get('mountpoint', 'N/A')})</h3>
<p class="{disk_status}">使用率: {disk.get('used_percent', 'N/A')}%</p>
<p>总量: {disk.get('total_gb', 'N/A')} GB</p>
<p>已用: {disk.get('used_gb', 'N/A')} GB</p>
<p>可用: {disk.get('free_gb', 'N/A')} GB</p>
</div>
"""
        html_content += """
</div>
<div class="metric">
<h2>其他信息</h2>
<p>系统: {system_info}</p>
<p>进程数: {process_count}</p>
</div>
</body>
</html>
""".format(
            system_info=latest.get('system', {}).get('platform', 'N/A'),
            process_count=latest.get('processes', 'N/A')
        )
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"HTML报告已生成: {report_file}")
        return report_file
# Usage example
if __name__ == "__main__":
    # Requires psutil:
    # pip install psutil
    monitor = SystemMonitor()
    # Monitor for 60 seconds, sampling every 5 seconds.
    data = monitor.monitor_resources(duration_seconds=60, interval=5)
    # Render the HTML report.
    monitor.generate_report(data, output_format='html')
    # Print a short summary of the last sample.
    if data:
        latest = data[-1]
        print("\n系统状态摘要:")
        print(f"CPU使用率: {latest['cpu']['usage_percent']}%")
        print(f"内存使用率: {latest['memory']['used_percent']}%")
        print(f"运行进程数: {latest['processes']}")
总结与建议
1. 安装所需库
# 基础库
pip install pandas openpyxl requests beautifulsoup4
# 高级功能(注:PyPDF2 项目已演进为 pypdf,新项目建议直接使用 pypdf)
pip install schedule psutil PyPDF2 reportlab
# 邮件相关:smtplib 与 email 均为 Python 标准库,无需额外安装任何第三方包
2. 使用技巧
- 逐步测试:先在小规模数据上测试脚本
- 添加日志:使用logging模块记录执行过程
- 异常处理:使用try-except处理可能出现的错误
- 定期备份:重要文件操作前先备份
3. 扩展建议
- 为脚本添加GUI界面
- 集成到系统任务计划
- 添加邮件/消息通知功能
- 记录脚本执行历史
4. 注意事项
- 文件操作前确认路径正确
- 网络请求添加超时和重试机制
- 敏感信息(如密码)使用环境变量存储
- 遵循目标网站的robots.txt协议
这些脚本覆盖了日常工作中的常见自动化需求,你可以根据实际需求进行调整和组合使用,大大提高工作效率!