Files
ClassFeedback/scripts/update_feedback_with_oj.py

706 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Fetch the OJ homework data of the CSP05-03 lesson and update existing student feedback.

Usage:
    python scripts/update_feedback_with_oj.py

What it does:
1. Log in to the OJ system
2. Read the homework ID list of CSP05-03 from hw_dict.json
3. Scrape the submission records of every homework
4. Analyse each student's work (thinking pattern, error distribution)
5. Append the OJ analysis to the existing feedback files
"""
import io
import json
import os
import re
import sys
from collections import defaultdict
# Work around the Windows console's GBK encoding, which cannot output emoji:
# re-wrap both standard streams as UTF-8, replacing unencodable characters.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
from datetime import datetime
from html import unescape
from pathlib import Path
from typing import Any
import httpx
def load_env_from_file(env_path: Path) -> dict:
    """Parse a simple ``KEY=VALUE`` .env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Whitespace around keys and values is removed and one pair of
    matching quotes around a value is stripped, so ``KEY = "value"`` yields
    ``{"KEY": "value"}``.

    Args:
        env_path: Location of the .env file; it may be missing.

    Returns:
        Mapping of variable name to value; empty when the file does not exist.
    """
    env_vars = {}
    if not env_path.exists():
        return env_vars
    # utf-8-sig also accepts plain UTF-8 but drops a leading BOM, which would
    # otherwise become part of the first key.
    for raw_line in env_path.read_text(encoding='utf-8-sig').splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, value = line.split('=', 1)
        key = key.strip()
        value = value.strip()
        if not key:
            continue
        # Drop one pair of surrounding quotes: KEY="a b" -> a b
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        env_vars[key] = value
    return env_vars
# Load the .env configuration from the project root (two levels above this file)
PROJECT_ROOT = Path(__file__).parent.parent
env_vars = load_env_from_file(PROJECT_ROOT / '.env')
# ========== Configuration (priority: environment variable > .env file > default) ==========
OJ_BASE_URL = os.environ.get('OJ_BASE_URL') or env_vars.get('OJ_BASE_URL', 'https://oj.qonnwolf.com')
USERNAME = os.environ.get('OJ_USERNAME') or env_vars.get('OJ_USERNAME', '')
PASSWORD = os.environ.get('OJ_PASSWORD') or env_vars.get('OJ_PASSWORD', '')
# Default values (when run by Claude Code they are overridden via command-line arguments)
COURSE_CODE = "CSP05-03"
COURSE_TITLE = ""
CLASS_NAME = "CSP05克力周六1600"
FEEDBACK_DATE = ""
# Names of the attending students (passed comma-separated via the --students argument)
ATTENDING_STUDENTS = []
HW_DICT_PATH = PROJECT_ROOT / "config" / "hw_dict.json"
CLASS_DIR = None # set at run time
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Every option is a plain string option with a default; the defaults of
    --username / --password come from the module-level USERNAME and PASSWORD.
    """
    import argparse

    # (flag, default, help text) for every supported option, in display order.
    option_table = (
        ("--course", "CSP05-03", "课程代码,如 CSP05-03"),
        ("--title", "", "课程标题,如 递归应用"),
        ("--class-name", "CSP05克力周六1600", "班级名称"),
        ("--date", "", "上课日期 YYYYMMDD默认今天"),
        ("--students", "", "出勤学生名单,逗号分隔"),
        ("--username", USERNAME or "", "OJ用户名默认从 .env 读取)"),
        ("--password", PASSWORD or "", "OJ密码默认从 .env 读取)"),
        ("--get-student-oj", "", "只获取单个学生的OJ数据并输出Markdown传入学生姓名"),
    )
    cli = argparse.ArgumentParser(description="获取OJ作业数据并更新反馈")
    for flag, fallback, help_text in option_table:
        cli.add_argument(flag, default=fallback, help=help_text)
    return cli.parse_args()
# ========== OJ数据获取 ==========
# Verdict phrase -> short status code. detect_status() searches a record row's
# HTML for these phrases in this order; the shortened "Time Limit" /
# "Memory Limit" entries map to the same codes as the full "... Exceeded" phrases.
STATUS_MAP = {
    "Accepted": "AC",
    "Wrong Answer": "WA",
    "Compile Error": "CE",
    "Time Limit Exceeded": "TLE",
    "Time Limit": "TLE",
    "Memory Limit Exceeded": "MLE",
    "Memory Limit": "MLE",
    "Runtime Error": "RE",
    "Presentation Error": "PE",
    "Output Limit Exceeded": "OLE",
}
def strip_tags(raw_html: str) -> str:
    """Drop HTML tags, decode entities and collapse whitespace to single spaces."""
    without_tags = re.sub(r"<[^>]+>", "", raw_html)
    decoded = unescape(without_tags)
    words = decoded.replace("\xa0", " ").split()
    return " ".join(words)
def detect_status(row_html: str) -> str:
    """Return the code of the first STATUS_MAP phrase found in the row, else "UNKNOWN"."""
    return next(
        (code for phrase, code in STATUS_MAP.items() if phrase in row_html),
        "UNKNOWN",
    )
def normalize_submit_time(raw_text: str) -> str:
    """Find a ``Y-M-D H:MM:SS`` timestamp in the text and return it zero-padded.

    The result has the form ``YYYY-MM-DDTHH:MM:SS``; an empty string is
    returned when the text contains no timestamp.
    """
    found = re.search(
        r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})",
        raw_text,
    )
    if found is None:
        return ""
    yyyy, mm, dd, hh, mi, ss = found.groups()
    date_part = f"{yyyy}-{int(mm):02d}-{int(dd):02d}"
    time_part = f"{int(hh):02d}:{mi}:{ss}"
    return f"{date_part}T{time_part}"
def parse_record_rows(html: str) -> list[dict[str, Any]]:
    """Extract submission records from the ``<tr>`` rows of a record-list page.

    A row is kept only when it contains both a ``/user/<digits>`` link and a
    ``/p/<problem>`` link. Each record holds the record id (falling back to
    the 1-based row position when the row has no ``/record/...`` link), the
    student id and name, the problem id and title, the detected status and
    the normalised submit time (empty when the row has no timestamp).
    """
    link_flags = re.IGNORECASE | re.DOTALL
    parsed: list[dict[str, Any]] = []
    row_bodies = re.findall(r"<tr[^>]*>(.*?)</tr>", html, link_flags)
    for position, row in enumerate(row_bodies, start=1):
        # Cheap substring pre-filter before running the link regexes.
        if not ("/user/" in row and "/p/" in row):
            continue
        # User id and display name
        user_link = re.search(r'href="/user/(\d+)"[^>]*>(.*?)</a>', row, link_flags)
        if user_link is None:
            continue
        # Problem id (query string excluded) and title
        problem_link = re.search(
            r'href="/p/([^"?/]+)(?:\?[^"]*)??"[^>]*>(.*?)</a>',
            row,
            link_flags,
        )
        if problem_link is None:
            continue
        record_link = re.search(r'/record/([A-Za-z0-9]+)', row, re.IGNORECASE)
        stamp = re.search(r"(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})", row)
        parsed.append(dict(
            id=record_link.group(1) if record_link else str(position),
            student_id=int(user_link.group(1)),
            student_name=strip_tags(user_link.group(2)),
            problem_id=strip_tags(problem_link.group(1)),
            problem_title=strip_tags(problem_link.group(2)),
            status=detect_status(row),
            submit_time=normalize_submit_time(stamp.group(1)) if stamp else "",
        ))
    return parsed
def login(client: httpx.Client) -> bool:
    """Log in to the OJ with the module-level USERNAME / PASSWORD.

    Posts the credentials to ``/login`` and treats the login as successful
    only when the client's cookie jar then holds a cookie named ``sid``.
    Any exception is reported on stdout and turned into ``False``.
    """
    try:
        reply = client.post("/login", json={"uname": USERNAME, "password": PASSWORD})
        reply.raise_for_status()
        cookie_names = [cookie.name for cookie in client.cookies.jar]
        if "sid" not in cookie_names:
            print(" [X] 登录失败: 未获取到session")
            return False
        print(f" [OK] 登录成功: {USERNAME}")
        return True
    except Exception as e:
        print(f" [X] 登录失败: {e}")
        return False
def fetch_homework_records(
    client: httpx.Client,
    homework_id: str,
    student_names: set[str],
    max_pages: int = 20,
) -> list[dict[str, Any]]:
    """Collect the submission records of one homework for the given students.

    Pages ``/record?tid=<homework_id>&page=<n>`` are fetched starting at 1
    until a non-200 response, a page without records, a page that does not
    mention the next page number, or ``max_pages`` is reached. Records of
    other students and repeated record ids are dropped; the result is sorted
    by submit time.
    """
    collected: list[dict[str, Any]] = []
    known_ids: set[str] = set()
    page = 1
    while page <= max_pages:
        reply = client.get(f"/record?tid={homework_id}&page={page}")
        if reply.status_code != 200:
            break
        rows = parse_record_rows(reply.text)
        if not rows:
            break
        for row in rows:
            if row["student_name"] in student_names:
                row_id = str(row["id"])
                if row_id not in known_ids:
                    known_ids.add(row_id)
                    collected.append(row)
        # Stop when the page does not reference a following page
        if f"page={page + 1}" not in reply.text:
            break
        page += 1
    # Order by submit time
    return sorted(collected, key=lambda rec: rec.get("submit_time", ""))
def fetch_homework_problems(
    client: httpx.Client,
    homework_id: str,
) -> list[dict[str, str]]:
    """Return the problems linked on the ``/homework/<homework_id>`` page.

    Each entry has the problem ``id`` (from the link), its ``code`` (the bold
    text) and its ``name`` (the text after the code, or the code itself when
    that text is empty). An empty list is returned on a non-200 response.
    """
    reply = client.get(f"/homework/{homework_id}")
    if reply.status_code != 200:
        print(f" [X] 获取作业详情失败: {reply.status_code}")
        return []
    # Matches problem links of this homework in the page HTML, e.g.
    # <a href="/p/CSP0309A1?tid=..."><b>CSP0309A1</b>&nbsp;&nbsp;银行叫号模拟</a>
    link_pattern = "".join([
        r'href="/p/([^"?]+)\?tid=',
        re.escape(homework_id),
        r'"[^>]*><b>([^<]+)</b>(?:&nbsp;|\s)*([^<]*)</a>',
    ])
    found = []
    for pid, code, raw_name in re.findall(link_pattern, reply.text):
        shown_name = strip_tags(raw_name).strip() or code
        found.append({
            "id": pid,
            "code": strip_tags(code),
            "name": shown_name,
        })
    return found
def load_homework_ids(course_code: str) -> list[dict[str, str]]:
    """Load the homework entries of a course from hw_dict.json.

    Only entries whose title ends with "A" are returned; when there is none,
    the first entry of the course is returned instead. A missing file or a
    course without entries yields an empty list (with a message on stdout).
    """
    if not HW_DICT_PATH.exists():
        print(f" [X] 找不到作业字典: {HW_DICT_PATH}")
        return []
    hw_dict = json.loads(HW_DICT_PATH.read_text(encoding="utf-8"))
    entries = hw_dict.get(course_code, [])
    if not entries:
        print(f" [X] 作业字典中没有 {course_code} 的配置")
        return []
    # Keep the "A" packages; fall back to the first entry when none exists
    chosen = [entry for entry in entries if entry["title"].endswith("A")] or [entries[0]]
    return [{"id": entry["id"], "title": entry["title"]} for entry in chosen]
# ========== 分析逻辑 ==========
def analyze_student_performance(
student_name: str,
all_records: dict[str, list[dict[str, Any]]],
homework_labels: list[str],
problem_list: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
"""分析单个学生的OJ表现
如果提供了 problem_listA包的具体题目列表则按具体题目分析
否则按作业包标签分析(兼容旧逻辑)。
"""
analysis = {
"name": student_name,
"problems": {},
"total_solved": 0,
"total_attempts": 0,
"total_errors": defaultdict(int),
"patterns": [],
}
# 按具体题目分析A包模式
if problem_list:
# 合并所有作业包的记录通常只有A包
all_student_records: list[dict[str, Any]] = []
for label in homework_labels:
records = all_records.get(label, [])
all_student_records.extend([r for r in records if r["student_name"] == student_name])
total_attempts_all = len(all_student_records)
total_solved_all = 0
for prob in problem_list:
prob_id = prob["id"] # 如 CSP0309A1
prob_code = prob["code"]
prob_name = prob["name"]
# 筛选该题目的提交记录(匹配 problem_id
student_records = [r for r in all_student_records if r["problem_id"] == prob_id]
attempts = len(student_records)
solved = any(r["status"] == "AC" for r in student_records)
errors = [r["status"] for r in student_records if r["status"] != "AC"]
# 判定思考模式
if attempts == 0:
pattern = "未提交"
elif attempts == 1 and solved:
pattern = "一气呵成"
elif solved and attempts <= 3:
pattern = "调试改进"
elif solved and attempts <= 6:
pattern = "多次尝试后通过"
elif not solved and attempts >= 3:
pattern = "遇到困难"
elif solved:
pattern = "耐心调试"
else:
pattern = "尝试中"
error_counts = defaultdict(int)
for e in errors:
error_counts[e] += 1
# 用题目名称作为展示标签同时保留原始ID
display_label = f"{prob_code} {prob_name}" if prob_name else prob_code
problem_info = {
"label": display_label,
"raw_label": prob_code,
"attempts": attempts,
"solved": solved,
"pattern": pattern,
"errors": dict(error_counts),
"submit_times": [r.get("submit_time", "") for r in student_records],
}
analysis["problems"][display_label] = problem_info
if solved:
total_solved_all += 1
for e in errors:
analysis["total_errors"][e] += 1
analysis["total_attempts"] = total_attempts_all
analysis["total_solved"] = total_solved_all
analysis["completion"] = f"{total_solved_all}/{len(problem_list)}"
else:
# 兼容旧逻辑:按作业包分析
for label in homework_labels:
records = all_records.get(label, [])
student_records = [r for r in records if r["student_name"] == student_name]
attempts = len(student_records)
solved = any(r["status"] == "AC" for r in student_records)
errors = [r["status"] for r in student_records if r["status"] != "AC"]
if attempts == 0:
pattern = "未提交"
elif attempts == 1 and solved:
pattern = "一气呵成"
elif solved and attempts <= 3:
pattern = "调试改进"
elif solved and attempts <= 6:
pattern = "多次尝试后通过"
elif not solved and attempts >= 3:
pattern = "遇到困难"
elif solved:
pattern = "耐心调试"
else:
pattern = "尝试中"
error_counts = defaultdict(int)
for e in errors:
error_counts[e] += 1
problem_info = {
"label": label,
"attempts": attempts,
"solved": solved,
"pattern": pattern,
"errors": dict(error_counts),
"submit_times": [r.get("submit_time", "") for r in student_records],
}
analysis["problems"][label] = problem_info
analysis["total_attempts"] += attempts
if solved:
analysis["total_solved"] += 1
for e in errors:
analysis["total_errors"][e] += 1
analysis["completion"] = f"{analysis['total_solved']}/{len(homework_labels)}"
analysis["total_errors"] = dict(analysis["total_errors"])
return analysis
def format_oj_section(analysis: dict[str, Any]) -> str:
    """Render one student's OJ analysis as a Markdown feedback section.

    The section consists of a heading, the completion line, one table row per
    problem, a one-sentence assessment and, when WA/CE/RE/TLE errors were
    recorded, an error-distribution line.

    Args:
        analysis: Dict with ``completion``, ``total_solved``,
            ``total_attempts``, ``total_errors`` and ``problems`` (label ->
            dict with ``solved``, ``attempts``, ``pattern``, ``errors``).

    Returns:
        The Markdown text; it starts with a blank line and ends with a newline.
    """
    total_solved = analysis["total_solved"]
    total_attempts = analysis["total_attempts"]
    problems = analysis["problems"]
    total_count = len(problems)

    lines = [
        "",
        "## 【OJ做题数据】",
        "",
        f"**完成情况**: {analysis['completion']} (共{total_attempts}次提交)",
        "",
        # Per-problem table
        "| 题目 | 状态 | 提交次数 | 思考模式 | 错误类型 |",
        "|------|------|----------|----------|----------|",
    ]
    for label, info in problems.items():
        attempts = info["attempts"]
        if attempts == 0:
            lines.append(f"| {label} | ⬜ 未提交 | 0 | — | — |")
            continue
        # Status icons mirror the ⬜ used for unsubmitted problems.
        verdict = "✅ 通过" if info["solved"] else "❌ 未通过"
        error_str = ", ".join(f"{code}×{count}" for code, count in info["errors"].items())
        lines.append(f"| {label} | {verdict} | {attempts} | {info['pattern']} | {error_str} |")
    lines.append("")

    # Overall assessment, adapted to the number of problems
    if total_solved == total_count and total_count > 0:
        if total_attempts <= total_count + 2:
            lines.append(f"**📊 分析**: {total_count}题全部完成,且提交效率高,思路清晰,代码质量优秀。")
        elif total_attempts <= total_count * 2:
            lines.append(f"**📊 分析**: {total_count}题全部完成,经过适度调试后通过,展现了良好的调试能力。")
        else:
            lines.append(f"**📊 分析**: {total_count}题全部完成,共经过{total_attempts}次提交,展现了不错的耐心和坚持。")
    elif total_solved > 0:
        unsolved = [lbl for lbl, item in problems.items() if not item["solved"] and item["attempts"] > 0]
        untried = [lbl for lbl, item in problems.items() if item["attempts"] == 0]
        parts = []
        # Labels are separated explicitly so that several names do not run together.
        if unsolved:
            parts.append(f"{'、'.join(unsolved)}有尝试但尚未通过")
        if untried:
            parts.append(f"{'、'.join(untried)}未提交")
        lines.append(f"**📊 分析**: 完成{total_solved}题,{','.join(parts)},建议课后继续完成。")
    elif total_attempts > 0:
        lines.append("**📊 分析**: 有提交但尚未通过任何题目,建议课后重点跟进。")
    else:
        lines.append("**📊 分析**: 本节课OJ作业暂无提交记录。")

    # Error distribution (only these four error kinds are reported)
    errors = analysis["total_errors"]
    if errors:
        reported = (("WA", "答案错误"), ("CE", "编译错误"), ("RE", "运行错误"), ("TLE", "超时"))
        error_summary = [f"{text}{errors[code]}" for code, text in reported if code in errors]
        if error_summary:
            lines.append(f"**错误分布**: {'、'.join(error_summary)}")
    lines.append("")
    return "\n".join(lines)
# ========== 反馈更新 ==========
def update_feedback_file(student_name: str, oj_section: str) -> bool:
    """Insert or replace the OJ data section in a student's feedback file.

    The file is ``CLASS_DIR/<student>/feedback/<FEEDBACK_DATE>_<COURSE_CODE>.md``.
    An existing "## 【OJ做题数据】" section is replaced; otherwise the section
    is inserted before the first ``---`` separator line, or appended at the
    end when the file has no separator.

    Args:
        student_name: Name of the student's directory below CLASS_DIR.
        oj_section: Markdown section to write.

    Returns:
        True when the file was written, False when it does not exist.
    """
    feedback_path = CLASS_DIR / student_name / "feedback" / f"{FEEDBACK_DATE}_{COURSE_CODE}.md"
    if not feedback_path.exists():
        print(f" [跳过] 反馈文件不存在: {feedback_path}")
        return False
    content = feedback_path.read_text(encoding="utf-8")

    replaced = 0
    if "【OJ做题数据】" in content:
        # The section ends before the next heading, before a "---" separator
        # (the newline in front of the separator is left in place) or at the
        # end of the file.
        pattern = r"\n## 【OJ做题数据】.*?(?=\n## |\n?---\n|\Z)"
        # Terminate the section with a newline so the following separator or
        # heading always starts on its own line.
        replacement = oj_section.rstrip() + "\n"
        # A callable keeps backslashes in the section text literal; a plain
        # string would be interpreted as a regex replacement template.
        content, replaced = re.subn(pattern, lambda _match: replacement, content, flags=re.DOTALL)

    if replaced:
        print(" [更新] 替换已有OJ数据段")
    else:
        # Also reached when the marker text occurs without a matching heading.
        if "\n---\n" in content:
            # Insert in front of the first "---" separator line
            content = content.replace("\n---\n", f"{oj_section}\n---\n", 1)
        else:
            content = content.rstrip() + "\n" + oj_section
        print(" [新增] 追加OJ数据段")
    feedback_path.write_text(content, encoding="utf-8")
    return True
# ========== 主流程 ==========
def main():
    """Fetch the OJ records, analyse every student and update the feedback files.

    The command-line arguments overwrite the module-level configuration
    globals. With ``--get-student-oj`` only that student's Markdown section is
    printed and no file is written; otherwise the feedback files are updated
    and the analysis is saved as JSON.

    Returns:
        0 on success; 1 when there are no students, no homework configuration
        is found or the OJ login fails.
    """
    global COURSE_CODE, COURSE_TITLE, CLASS_NAME, FEEDBACK_DATE, ATTENDING_STUDENTS
    global USERNAME, PASSWORD, CLASS_DIR

    # Parse the command-line arguments
    args = parse_args()
    COURSE_CODE = args.course
    COURSE_TITLE = args.title
    CLASS_NAME = args.class_name
    USERNAME = args.username
    PASSWORD = args.password
    single_student = args.get_student_oj.strip()

    # Lesson date: defaults to today
    FEEDBACK_DATE = args.date or datetime.now().strftime("%Y%m%d")

    # Student list. Single-student mode takes precedence over --students, so
    # the Markdown printed below always belongs to the requested student.
    if single_student:
        ATTENDING_STUDENTS = [single_student]
    elif args.students:
        ATTENDING_STUDENTS = [s.strip() for s in args.students.split(",") if s.strip()]

    # Class directory
    CLASS_DIR = PROJECT_ROOT / ".claude" / "memory" / "class" / CLASS_NAME

    # Without an explicit list, use every student directory that has a profile.md
    if not ATTENDING_STUDENTS and CLASS_DIR.exists():
        ATTENDING_STUDENTS = [
            d.name for d in sorted(CLASS_DIR.iterdir())
            if d.is_dir() and d.name != "summaries" and (d / "profile.md").exists()
        ]
        print(f" [自动] 从班级目录读取到 {len(ATTENDING_STUDENTS)} 名学生")
    if not ATTENDING_STUDENTS:
        print(" [X] 未指定出勤学生且班级目录中无学生,退出")
        return 1

    # The banner is skipped in single-student mode
    if not single_student:
        print("=" * 60)
        print(f"获取 {COURSE_CODE} {COURSE_TITLE} OJ数据并更新反馈")
        print("=" * 60)

    # 1. Load the homework IDs
    print(f"\n[1] 从 hw_dict.json 加载 {COURSE_CODE} 作业ID...")
    homeworks = load_homework_ids(COURSE_CODE)
    if not homeworks:
        print(" [X] 没有找到作业配置,退出")
        return 1
    for hw in homeworks:
        print(f" - {hw['title']}: {hw['id']}")

    # 2. Log in to the OJ. The context manager closes the client on every
    #    exit path, including the early return after a failed login.
    print(f"\n[2] 登录OJ系统...")
    student_names = set(ATTENDING_STUDENTS)
    all_records: dict[str, list[dict[str, Any]]] = {}
    homework_labels: list[str] = []
    problem_list: list[dict[str, str]] | None = None
    with httpx.Client(base_url=OJ_BASE_URL, timeout=30.0, follow_redirects=True) as client:
        if not login(client):
            print(" [X] 登录失败,退出")
            return 1

        # 3. Fetch the problem list and the submission records of each homework
        print(f"\n[3] 获取A包题目列表并抓取提交记录...")
        for hw in homeworks:
            label = hw["title"]
            homework_labels.append(label)
            problems = fetch_homework_problems(client, hw["id"])
            if problems:
                # NOTE(review): only the problem list of the last homework that
                # returned problems is kept — confirm this is intended when a
                # course has several packages.
                problem_list = problems
                print(f" {label} 包含 {len(problems)} 道题目:")
                for p in problems:
                    print(f" - {p['code']}: {p['name']}")
            print(f" 正在获取 {label} 提交记录...")
            records = fetch_homework_records(client, hw["id"], student_names)
            all_records[label] = records
            student_count = len({r["student_name"] for r in records})
            print(f" 找到 {len(records)} 条记录,涉及 {student_count} 名出勤学生")

    # 4. Analyse every student
    if not single_student:
        print(f"\n[4] 分析学生OJ表现...")
    analyses: list[dict[str, Any]] = []
    for name in ATTENDING_STUDENTS:
        analysis = analyze_student_performance(name, all_records, homework_labels, problem_list)
        analyses.append(analysis)
        if not single_student:
            total_count = len(analysis["problems"])
            # NOTE(review): every branch of both icon expressions is an empty
            # string; the icons appear to have been removed.
            icon = "" if analysis["total_solved"] == total_count and total_count > 0 else \
                "" if analysis["total_solved"] > 0 else ""
            print(f" {icon} {name}: {analysis['completion']} ({analysis['total_attempts']}次提交)")
            for label, info in analysis["problems"].items():
                p_icon = "" if info["solved"] else "" if info["attempts"] > 0 else ""
                print(f" {p_icon} {label}: {info['attempts']}次 ({info['pattern']})")

    # Single-student mode: print the Markdown section and stop
    if single_student and analyses:
        oj_section = format_oj_section(analyses[0])
        print(oj_section)
        return 0

    # 5. Update the feedback files
    print(f"\n[5] 更新反馈文件...")
    updated = 0
    for analysis in analyses:
        name = analysis["name"]
        print(f" 处理 {name}...")
        oj_section = format_oj_section(analysis)
        if update_feedback_file(name, oj_section):
            updated += 1

    # 6. Save the raw analysis data
    print(f"\n[6] 保存分析JSON数据...")
    analysis_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
    analysis_dir.mkdir(parents=True, exist_ok=True)
    output_file = analysis_dir / f"{COURSE_CODE}_student_analysis.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "course": COURSE_CODE,
            "title": COURSE_TITLE,
            "date": f"{FEEDBACK_DATE[:4]}-{FEEDBACK_DATE[4:6]}-{FEEDBACK_DATE[6:8]}",
            "class": CLASS_NAME,
            "students": analyses,
            "generated_at": datetime.now().isoformat(),
        }, f, ensure_ascii=False, indent=2)
    print(f" 保存到: {output_file}")

    # 7. Summary
    print(f"\n" + "=" * 60)
    print(f"✅ 完成!")
    print(f" - 出勤学生: {len(ATTENDING_STUDENTS)}")
    print(f" - 更新反馈: {updated}")
    total_solved = sum(a["total_solved"] for a in analyses)
    # The denominator counts the analysed items (problems or homework packages)
    # of every student, so it uses the same unit as total_solved.
    total_possible = sum(len(a["problems"]) for a in analyses)
    print(f" - 整体完成率: {total_solved}/{total_possible}")
    print("=" * 60)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())