#!/usr/bin/env python3
"""
Fetch the OJ homework data of one lesson and update existing student feedback.

Usage:
    python scripts/update_feedback_with_oj.py

What the script does:
    1. Log in to the OJ system.
    2. Read the homework IDs of the lesson from hw_dict.json.
    3. Scrape the submission records of every homework.
    4. Analyse each student's work (thinking pattern, error distribution).
    5. Append the analysis to the existing feedback files.
"""

import argparse
import io
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
from html import unescape
from pathlib import Path
from typing import Any

import httpx


def _force_utf8(stream):
    """Return *stream* switched to UTF-8 output with replacement on errors.

    The Windows console defaults to GBK, which cannot encode the emoji this
    script prints.  ``reconfigure`` changes the live stream in place; streams
    without it are re-wrapped around their binary buffer, and streams with
    neither are returned untouched instead of crashing at import time.
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8", errors="replace")
        return stream
    buffer = getattr(stream, "buffer", None)
    if buffer is not None:
        return io.TextIOWrapper(buffer, encoding="utf-8", errors="replace")
    return stream


sys.stdout = _force_utf8(sys.stdout)
sys.stderr = _force_utf8(sys.stderr)


def load_env_from_file(env_path: Path) -> dict[str, str]:
    """Load ``KEY=VALUE`` pairs from a .env file.

    Blank lines, ``#`` comments and lines without ``=`` are ignored.
    Whitespace around the key and the value is removed, and one pair of
    matching surrounding quotes is stripped from the value.

    Args:
        env_path: Path of the .env file; a missing file yields an empty dict.

    Returns:
        Mapping of variable name to value.
    """
    env_vars: dict[str, str] = {}
    if not env_path.exists():
        return env_vars
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        if key:
            env_vars[key] = value
    return env_vars


# Load the .env configuration.
PROJECT_ROOT = Path(__file__).parent.parent
env_vars = load_env_from_file(PROJECT_ROOT / '.env')

# ========== Configuration (priority: environment > .env file > default) ==========
OJ_BASE_URL = os.environ.get('OJ_BASE_URL') or env_vars.get('OJ_BASE_URL', 'https://oj.qonnwolf.com')
USERNAME = os.environ.get('OJ_USERNAME') or env_vars.get('OJ_USERNAME', '')
PASSWORD = os.environ.get('OJ_PASSWORD') or env_vars.get('OJ_PASSWORD', '')

# Defaults; main() overwrites them from the command-line arguments.
COURSE_CODE = "CSP05-03"
COURSE_TITLE = ""
CLASS_NAME = "CSP05克力周六1600"
FEEDBACK_DATE = ""

# Attending students (passed via --students, comma separated).
ATTENDING_STUDENTS: list[str] = []

HW_DICT_PATH = PROJECT_ROOT / "config" / "hw_dict.json"
CLASS_DIR = None  # set at runtime by main()


def parse_args():
    """Parse the command-line arguments."""
    parser = argparse.ArgumentParser(description="获取OJ作业数据并更新反馈")
    parser.add_argument("--course", default="CSP05-03", help="课程代码,如 CSP05-03")
    parser.add_argument("--title", default="", help="课程标题,如 递归应用")
    parser.add_argument("--class-name", default="CSP05克力周六1600", help="班级名称")
    parser.add_argument("--date", default="", help="上课日期 YYYYMMDD,默认今天")
    parser.add_argument("--students", default="", help="出勤学生名单,逗号分隔")
    parser.add_argument("--username", default=USERNAME or "", help="OJ用户名(默认从 .env 读取)")
    parser.add_argument("--password", default=PASSWORD or "", help="OJ密码(默认从 .env 读取)")
    parser.add_argument("--get-student-oj", default="",
                        help="只获取单个学生的OJ数据并输出Markdown,传入学生姓名")
    return parser.parse_args()


# ========== OJ data retrieval ==========

# Verdict text as shown on the record page -> short status code.
STATUS_MAP = {
    "Accepted": "AC",
    "Wrong Answer": "WA",
    "Compile Error": "CE",
    "Time Limit Exceeded": "TLE",
    "Time Limit": "TLE",
    "Memory Limit Exceeded": "MLE",
    "Memory Limit": "MLE",
    "Runtime Error": "RE",
    "Presentation Error": "PE",
    "Output Limit Exceeded": "OLE",
}

# One table row of the record list.
_ROW_RE = re.compile(r"<tr[^>]*>(.*?)</tr>", re.IGNORECASE | re.DOTALL)
# Link to the submitting user: numeric id in the URL, display name as link text.
_USER_LINK_RE = re.compile(
    r'href="/user/(\d+)"[^>]*>(.*?)</a>',
    re.IGNORECASE | re.DOTALL,
)
# Link to the problem: id in the URL (optional query string), title as link text.
_PROBLEM_LINK_RE = re.compile(
    r'href="/p/([^"?/]+)(?:\?[^"]*)??"[^>]*>(.*?)</a>',
    re.IGNORECASE | re.DOTALL,
)
_RECORD_ID_RE = re.compile(r'/record/([A-Za-z0-9]+)', re.IGNORECASE)
_SUBMIT_TIME_RE = re.compile(
    r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})"
)


def strip_tags(raw_html: str) -> str:
    """Remove HTML tags, decode entities and collapse whitespace."""
    text = re.sub(r"<[^>]+>", "", raw_html)
    text = unescape(text).replace("\xa0", " ")
    return " ".join(text.split())


def detect_status(row_html: str) -> str:
    """Return the status code of the first STATUS_MAP keyword found in the row.

    Returns ``"UNKNOWN"`` when no keyword occurs in *row_html*.
    """
    for keyword, status in STATUS_MAP.items():
        if keyword in row_html:
            return status
    return "UNKNOWN"


def normalize_submit_time(raw_text: str) -> str:
    """Normalize the first ``Y-M-D H:M:S`` timestamp in *raw_text*.

    Returns:
        ``YYYY-MM-DDTHH:MM:SS`` with zero-padded fields, or ``""`` when the
        text contains no timestamp.
    """
    match = _SUBMIT_TIME_RE.search(raw_text)
    if not match:
        return ""
    year, month, day, hour, minute, second = match.groups()
    return f"{year}-{int(month):02d}-{int(day):02d}T{int(hour):02d}:{minute}:{second}"


def parse_record_rows(html: str) -> list[dict[str, Any]]:
    """Parse the submission rows of a record-list HTML page.

    Rows that do not contain both a user link and a problem link (headers,
    layout rows) are skipped.

    Returns:
        One dict per row with the keys ``id``, ``student_id``,
        ``student_name``, ``problem_id``, ``problem_title``, ``status`` and
        ``submit_time``.  ``id`` falls back to the 1-based row index when the
        row has no ``/record/<id>`` link.
    """
    records: list[dict[str, Any]] = []
    for index, row_html in enumerate(_ROW_RE.findall(html), start=1):
        if "/user/" not in row_html or "/p/" not in row_html:
            continue

        user_match = _USER_LINK_RE.search(row_html)
        if not user_match:
            continue

        problem_match = _PROBLEM_LINK_RE.search(row_html)
        if not problem_match:
            continue

        record_id_match = _RECORD_ID_RE.search(row_html)

        records.append({
            "id": record_id_match.group(1) if record_id_match else str(index),
            "student_id": int(user_match.group(1)),
            "student_name": strip_tags(user_match.group(2)),
            "problem_id": strip_tags(problem_match.group(1)),
            "problem_title": strip_tags(problem_match.group(2)),
            "status": detect_status(row_html),
            # normalize_submit_time() returns "" when the row has no timestamp.
            "submit_time": normalize_submit_time(row_html),
        })

    return records


def login(client: httpx.Client) -> bool:
    """Log in to the OJ with the module-level USERNAME / PASSWORD.

    Returns:
        True when the response succeeded and a ``sid`` cookie was set,
        False otherwise (HTTP/transport errors are reported, not raised).
    """
    try:
        response = client.post("/login", json={"uname": USERNAME, "password": PASSWORD})
        response.raise_for_status()
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        print(f" [X] 登录失败: {e}")
        return False

    if any(c.name == "sid" for c in client.cookies.jar):
        print(f" [OK] 登录成功: {USERNAME}")
        return True

    print(f" [X] 登录失败: 未获取到session")
    return False


def fetch_homework_records(
    client: httpx.Client,
    homework_id: str,
    student_names: set[str],
    max_pages: int = 20,
) -> list[dict[str, Any]]:
    """Scrape the submission records of a homework, keeping target students only.

    Pages are requested until a non-200 response, an empty page, a page that
    does not mention the next page number, or *max_pages* is reached.

    Returns:
        De-duplicated records (by record id) sorted by submit time.
    """
    records: list[dict[str, Any]] = []
    seen_ids: set[str] = set()

    for page in range(1, max_pages + 1):
        resp = client.get(f"/record?tid={homework_id}&page={page}")
        if resp.status_code != 200:
            break

        page_records = parse_record_rows(resp.text)
        if not page_records:
            break

        for record in page_records:
            if record["student_name"] not in student_names:
                continue
            record_id = str(record["id"])
            if record_id in seen_ids:
                continue
            seen_ids.add(record_id)
            records.append(record)

        # Stop when the page does not link to a following page.
        if f"page={page + 1}" not in resp.text:
            break

    records.sort(key=lambda x: x.get("submit_time", ""))
    return records


def fetch_homework_problems(
    client: httpx.Client,
    homework_id: str,
) -> list[dict[str, str]]:
    """Fetch the concrete problem list of a homework package.

    Returns:
        Dicts with ``id`` (from the link URL), ``code`` (link text) and
        ``name`` (title; falls back to the code when no title is found).
        An empty list when the homework page cannot be fetched.
    """
    resp = client.get(f"/homework/{homework_id}")
    if resp.status_code != 200:
        print(f" [X] 获取作业详情失败: {resp.status_code}")
        return []

    # Problem entries look like "CSP0309A1 <nbsp> 银行叫号模拟": the code is the
    # (optionally bold) link text and the title follows the closing tag,
    # separated by &nbsp;/whitespace.
    # NOTE(review): exact markup of the homework page is inferred - confirm
    # against a live page.
    pattern = (
        r'href="/p/([^"?]+)\?tid='
        + re.escape(homework_id)
        + r'"[^>]*>\s*(?:<b>\s*)?([^<]+)</(?:a|b)>(?:&nbsp;|\s)*([^<]*)'
    )

    problems = []
    for pid, code, name in re.findall(pattern, resp.text):
        code_clean = strip_tags(code)
        problems.append({
            "id": pid,
            "code": code_clean,
            "name": strip_tags(name) or code_clean,
        })
    return problems


def load_homework_ids(course_code: str) -> list[dict[str, str]]:
    """Load the homework IDs of *course_code* from hw_dict.json.

    Only packages whose title ends with ``"A"`` (in-class practice) are kept;
    when there is none, the first configured package is used instead.

    Returns:
        Dicts with ``id`` and ``title``; empty when the dictionary file or
        the course entry is missing.
    """
    if not HW_DICT_PATH.exists():
        print(f" [X] 找不到作业字典: {HW_DICT_PATH}")
        return []

    data = json.loads(HW_DICT_PATH.read_text(encoding="utf-8"))
    items = data.get(course_code, [])
    if not items:
        print(f" [X] 作业字典中没有 {course_code} 的配置")
        return []

    a_items = [item for item in items if item["title"].endswith("A")]
    if not a_items:
        a_items = [items[0]]

    return [{"id": item["id"], "title": item["title"]} for item in a_items]


# ========== Analysis ==========

def _classify_pattern(attempts: int, solved: bool) -> str:
    """Map attempt count and solved flag to the thinking-pattern label."""
    if attempts == 0:
        return "未提交"
    if solved:
        if attempts == 1:
            return "一气呵成"
        if attempts <= 3:
            return "调试改进"
        if attempts <= 6:
            return "多次尝试后通过"
        return "耐心调试"
    return "遇到困难" if attempts >= 3 else "尝试中"


def _summarize_problem(
    label: str,
    records: list[dict[str, Any]],
    raw_label: str | None = None,
) -> dict[str, Any]:
    """Build the per-problem summary dict from one student's records."""
    attempts = len(records)
    solved = any(r["status"] == "AC" for r in records)

    error_counts: dict[str, int] = defaultdict(int)
    for r in records:
        if r["status"] != "AC":
            error_counts[r["status"]] += 1

    info: dict[str, Any] = {"label": label}
    if raw_label is not None:
        info["raw_label"] = raw_label
    info.update({
        "attempts": attempts,
        "solved": solved,
        "pattern": _classify_pattern(attempts, solved),
        "errors": dict(error_counts),
        "submit_times": [r.get("submit_time", "") for r in records],
    })
    return info


def analyze_student_performance(
    student_name: str,
    all_records: dict[str, list[dict[str, Any]]],
    homework_labels: list[str],
    problem_list: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
    """Analyse one student's OJ performance.

    With *problem_list* (the concrete problems of the A package) the analysis
    is per problem: records of all packages are merged and matched on
    ``problem_id``.  Without it the analysis is per homework package
    (legacy behaviour).

    Returns:
        Dict with ``name``, ``problems`` (label -> summary), ``total_solved``,
        ``total_attempts``, ``total_errors`` (status -> count), ``patterns``
        and ``completion`` (``"solved/total"``).
    """
    analysis: dict[str, Any] = {
        "name": student_name,
        "problems": {},
        "total_solved": 0,
        "total_attempts": 0,
        "total_errors": defaultdict(int),
        "patterns": [],
    }

    def _register(key: str, info: dict[str, Any]) -> None:
        """Store a problem summary and fold it into the solved/error totals."""
        analysis["problems"][key] = info
        if info["solved"]:
            analysis["total_solved"] += 1
        for status, count in info["errors"].items():
            analysis["total_errors"][status] += count

    if problem_list:
        # Per-problem mode: merge the student's records of every package.
        own_records = [
            r
            for label in homework_labels
            for r in all_records.get(label, [])
            if r["student_name"] == student_name
        ]
        for prob in problem_list:
            code, name = prob["code"], prob["name"]
            # Show the title next to the code while keeping the raw code.
            display_label = f"{code} {name}" if name else code
            matching = [r for r in own_records if r["problem_id"] == prob["id"]]
            _register(display_label, _summarize_problem(display_label, matching, raw_label=code))

        # Every submission counts, even for problems outside problem_list.
        analysis["total_attempts"] = len(own_records)
        analysis["completion"] = f"{analysis['total_solved']}/{len(problem_list)}"
    else:
        # Legacy mode: one entry per homework package.
        for label in homework_labels:
            own_records = [
                r for r in all_records.get(label, [])
                if r["student_name"] == student_name
            ]
            info = _summarize_problem(label, own_records)
            _register(label, info)
            analysis["total_attempts"] += info["attempts"]

        analysis["completion"] = f"{analysis['total_solved']}/{len(homework_labels)}"

    analysis["total_errors"] = dict(analysis["total_errors"])
    return analysis


# Status codes reported in the error-distribution line, with their wording.
_ERROR_WORDING = (
    ("WA", "答案错误"),
    ("CE", "编译错误"),
    ("RE", "运行错误"),
    ("TLE", "超时"),
)


def format_oj_section(analysis: dict[str, Any]) -> str:
    """Render an analysis dict as the Markdown "OJ data" feedback section.

    The section holds a completion line, a per-problem table, a one-line
    assessment and, when present, the distribution of WA/CE/RE/TLE errors.
    """
    total_solved = analysis["total_solved"]
    total_attempts = analysis["total_attempts"]
    problems = analysis["problems"]
    total_count = len(problems)

    lines = [
        "",
        "## 【OJ做题数据】",
        "",
        f"**完成情况**: {analysis['completion']} (共{total_attempts}次提交)",
        "",
        # Per-problem detail table.
        "| 题目 | 状态 | 提交次数 | 思考模式 | 错误类型 |",
        "|------|------|----------|----------|----------|",
    ]

    for label, info in problems.items():
        attempts = info["attempts"]
        if attempts == 0:
            lines.append(f"| {label} | ⬜ 未提交 | 0 | — | — |")
            continue
        icon = "✅" if info["solved"] else "❌"
        status = "通过" if info["solved"] else "未通过"
        error_str = ", ".join(f"{k}×{v}" for k, v in info["errors"].items()) or "—"
        lines.append(f"| {label} | {icon} {status} | {attempts} | {info['pattern']} | {error_str} |")

    lines.append("")

    # Overall assessment, scaled to the number of problems.
    if total_solved == total_count and total_count > 0:
        if total_attempts <= total_count + 2:
            lines.append(f"**📊 分析**: {total_count}题全部完成,且提交效率高,思路清晰,代码质量优秀。")
        elif total_attempts <= total_count * 2:
            lines.append(f"**📊 分析**: {total_count}题全部完成,经过适度调试后通过,展现了良好的调试能力。")
        else:
            lines.append(f"**📊 分析**: {total_count}题全部完成,共经过{total_attempts}次提交,展现了不错的耐心和坚持。")
    elif total_solved > 0:
        unsolved = [
            name for name, info in problems.items()
            if not info["solved"] and info["attempts"] > 0
        ]
        untried = [name for name, info in problems.items() if info["attempts"] == 0]
        parts = []
        if unsolved:
            parts.append(f"{'、'.join(unsolved)}有尝试但尚未通过")
        if untried:
            parts.append(f"{'、'.join(untried)}未提交")
        lines.append(f"**📊 分析**: 完成{total_solved}题,{','.join(parts)},建议课后继续完成。")
    elif total_attempts > 0:
        lines.append("**📊 分析**: 有提交但尚未通过任何题目,建议课后重点跟进。")
    else:
        lines.append("**📊 分析**: 本节课OJ作业暂无提交记录。")

    # Error distribution.
    errors = analysis["total_errors"]
    if errors:
        error_summary = [
            f"{wording}{errors[code]}次"
            for code, wording in _ERROR_WORDING
            if code in errors
        ]
        if error_summary:
            lines.append(f"**错误分布**: {','.join(error_summary)}。")

    lines.append("")
    return "\n".join(lines)


# ========== Feedback update ==========

# An existing OJ section: from its heading up to the next "## " heading, a
# "---" separator, or the end of the file.
_OJ_SECTION_RE = re.compile(r"\n## 【OJ做题数据】.*?(?=\n## |\n?---\n|\Z)", re.DOTALL)


def update_feedback_file(student_name: str, oj_section: str) -> bool:
    """Insert or replace the OJ section in a student's feedback file.

    The file is ``CLASS_DIR/<student>/feedback/<FEEDBACK_DATE>_<COURSE_CODE>.md``.
    An existing OJ section is replaced; otherwise the section is inserted
    before the first ``---`` separator line, or appended when there is none.

    Returns:
        True when the file was written, False when it does not exist.
    """
    feedback_path = CLASS_DIR / student_name / "feedback" / f"{FEEDBACK_DATE}_{COURSE_CODE}.md"

    if not feedback_path.exists():
        print(f" [跳过] 反馈文件不存在: {feedback_path}")
        return False

    content = feedback_path.read_text(encoding="utf-8")

    if "【OJ做题数据】" in content:
        new_section = oj_section.rstrip()
        original = content

        def _replacement(match: re.Match) -> str:
            # Keep whatever follows (e.g. the "---" separator) on its own line.
            following = original[match.end():match.end() + 1]
            return new_section if following in ("", "\n") else new_section + "\n"

        # A function replacement keeps backslashes in problem titles literal.
        content = _OJ_SECTION_RE.sub(_replacement, content)
        print(f" [更新] 替换已有OJ数据段")
    else:
        # Insert in front of the first "---" separator line, if any.
        if "\n---\n" in content:
            content = content.replace("\n---\n", f"{oj_section}\n---\n", 1)
        else:
            content = content.rstrip() + "\n" + oj_section
        print(f" [新增] 追加OJ数据段")

    feedback_path.write_text(content, encoding="utf-8")
    return True


# ========== Main flow ==========

def main():
    """Run the whole pipeline; returns the process exit code (0 = success)."""
    global COURSE_CODE, COURSE_TITLE, CLASS_NAME, FEEDBACK_DATE, ATTENDING_STUDENTS
    global USERNAME, PASSWORD, CLASS_DIR

    args = parse_args()
    COURSE_CODE = args.course
    COURSE_TITLE = args.title
    CLASS_NAME = args.class_name
    USERNAME = args.username
    PASSWORD = args.password
    single_student = args.get_student_oj.strip()

    FEEDBACK_DATE = args.date or datetime.now().strftime("%Y%m%d")

    # Student list.  Single-student mode takes precedence so the Markdown
    # printed below always belongs to the requested student.
    if single_student:
        ATTENDING_STUDENTS = [single_student]
    elif args.students:
        ATTENDING_STUDENTS = [s.strip() for s in args.students.split(",") if s.strip()]

    CLASS_DIR = PROJECT_ROOT / ".claude" / "memory" / "class" / CLASS_NAME

    # Without an explicit list, take every student directory of the class.
    if not ATTENDING_STUDENTS:
        if CLASS_DIR.exists():
            ATTENDING_STUDENTS = [
                d.name for d in sorted(CLASS_DIR.iterdir())
                if d.is_dir() and d.name != "summaries" and (d / "profile.md").exists()
            ]
            print(f" [自动] 从班级目录读取到 {len(ATTENDING_STUDENTS)} 名学生")

    if not ATTENDING_STUDENTS:
        print(" [X] 未指定出勤学生且班级目录中无学生,退出")
        return 1

    # Single-student mode skips the banner.
    if not single_student:
        print("=" * 60)
        print(f"获取 {COURSE_CODE} {COURSE_TITLE} OJ数据并更新反馈")
        print("=" * 60)

    # 1. Load the homework IDs.
    print(f"\n[1] 从 hw_dict.json 加载 {COURSE_CODE} 作业ID...")
    homeworks = load_homework_ids(COURSE_CODE)
    if not homeworks:
        print(" [X] 没有找到作业配置,退出")
        return 1

    for hw in homeworks:
        print(f" - {hw['title']}: {hw['id']}")

    student_names = set(ATTENDING_STUDENTS)
    all_records: dict[str, list[dict[str, Any]]] = {}
    homework_labels: list[str] = []
    problem_list: list[dict[str, str]] = []
    seen_problem_ids: set[str] = set()

    # 2. Log in.  The context manager closes the client on every exit path.
    print(f"\n[2] 登录OJ系统...")
    with httpx.Client(base_url=OJ_BASE_URL, timeout=30.0, follow_redirects=True) as client:
        if not login(client):
            print(" [X] 登录失败,退出")
            return 1

        # 3. Fetch the problem list of each package and its submission records.
        print(f"\n[3] 获取A包题目列表并抓取提交记录...")
        for hw in homeworks:
            label = hw["title"]
            homework_labels.append(label)

            problems = fetch_homework_problems(client, hw["id"])
            if problems:
                # Accumulate across packages, skipping already-known problems.
                for p in problems:
                    if p["id"] not in seen_problem_ids:
                        seen_problem_ids.add(p["id"])
                        problem_list.append(p)
                print(f" {label} 包含 {len(problems)} 道题目:")
                for p in problems:
                    print(f" - {p['code']}: {p['name']}")

            print(f" 正在获取 {label} 提交记录...")
            records = fetch_homework_records(client, hw["id"], student_names)
            all_records[label] = records
            student_count = len({r["student_name"] for r in records})
            print(f" 找到 {len(records)} 条记录,涉及 {student_count} 名出勤学生")

    # 4. Analyse every student (per problem when a problem list is known).
    if not single_student:
        print(f"\n[4] 分析学生OJ表现...")
    analyses: list[dict[str, Any]] = []
    for name in ATTENDING_STUDENTS:
        analysis = analyze_student_performance(
            name, all_records, homework_labels, problem_list or None
        )
        analyses.append(analysis)

        if not single_student:
            total_count = len(analysis["problems"])
            if analysis["total_solved"] == total_count and total_count > 0:
                icon = "★"
            elif analysis["total_solved"] > 0:
                icon = "○"
            else:
                icon = "✗"
            print(f" {icon} {name}: {analysis['completion']} ({analysis['total_attempts']}次提交)")
            for label, info in analysis["problems"].items():
                p_icon = "✓" if info["solved"] else "○" if info["attempts"] > 0 else "—"
                print(f" {p_icon} {label}: {info['attempts']}次 ({info['pattern']})")

    # Single-student mode: print the Markdown section and stop.
    if single_student and analyses:
        print(format_oj_section(analyses[0]))
        return 0

    # 5. Update the feedback files.
    print(f"\n[5] 更新反馈文件...")
    updated = 0
    for analysis in analyses:
        name = analysis["name"]
        print(f" 处理 {name}...")
        if update_feedback_file(name, format_oj_section(analysis)):
            updated += 1

    # 6. Save the raw analysis data.
    print(f"\n[6] 保存分析JSON数据...")
    analysis_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
    analysis_dir.mkdir(parents=True, exist_ok=True)
    output_file = analysis_dir / f"{COURSE_CODE}_student_analysis.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "course": COURSE_CODE,
            "title": COURSE_TITLE,
            "date": f"{FEEDBACK_DATE[:4]}-{FEEDBACK_DATE[4:6]}-{FEEDBACK_DATE[6:8]}",
            "class": CLASS_NAME,
            "students": analyses,
            "generated_at": datetime.now().isoformat(),
        }, f, ensure_ascii=False, indent=2)
    print(f" 保存到: {output_file}")

    # 7. Summary.
    print(f"\n" + "=" * 60)
    print(f"✅ 完成!")
    print(f" - 出勤学生: {len(ATTENDING_STUDENTS)}")
    print(f" - 更新反馈: {updated} 份")
    total_solved = sum(a["total_solved"] for a in analyses)
    # Count what was actually analysed (problems or packages) per student so
    # the denominator uses the same unit as total_solved.
    total_possible = sum(len(a["problems"]) for a in analyses)
    print(f" - 整体完成率: {total_solved}/{total_possible}")
    print("=" * 60)

    return 0


if __name__ == "__main__":
    sys.exit(main())