#!/usr/bin/env python3
"""
Course student performance analysis script.

Logs in to the OJ, scrapes the course structure and per-homework submission
records, computes per-student completion statistics and renders a text
report (plus a raw JSON dump) for one target student.

Usage:
    python scripts/analyze_course_student.py \\
        --course-id 6975983771e15346c9e8fdc0 \\
        --uid 248 --name 王梓骏 --lessons 8
"""

import argparse
import io
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
from html import unescape
from pathlib import Path

import httpx

# Force UTF-8 output so the report prints on consoles whose default encoding
# cannot represent every character; unencodable characters are replaced.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")

PROJECT_ROOT = Path(__file__).parent.parent


def load_env(env_path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` env file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are whitespace-stripped; values are split on the
    first ``=`` only.

    Args:
        env_path: Path of the env file. A missing file yields an empty dict.

    Returns:
        Mapping of key to value.
    """
    values: dict[str, str] = {}
    if not env_path.exists():
        return values
    for raw_line in env_path.read_text(encoding="utf-8").split("\n"):
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip()
    return values


env = load_env(PROJECT_ROOT / ".env")

# Process environment variables take precedence over the .env file.
OJ_BASE_URL = os.environ.get("OJ_BASE_URL") or env.get(
    "OJ_BASE_URL", "https://oj.qonnwolf.com"
)
USERNAME = os.environ.get("OJ_USERNAME") or env.get("OJ_USERNAME", "")
PASSWORD = os.environ.get("OJ_PASSWORD") or env.get("OJ_PASSWORD", "")

# Status text as it appears in a record row -> short status code.
STATUS_MAP = {
    "Accepted": "AC",
    "Wrong Answer": "WA",
    "Compile Error": "CE",
    "Time Limit Exceeded": "TLE",
    "Memory Limit Exceeded": "MLE",
    "Runtime Error": "RE",
    "Presentation Error": "PE",
    "Output Limit Exceeded": "OLE",
    "Runtime Error on Test": "RE",
    "Partial Accepted": "PA",
}

# Longest keywords first: detection is substring based, so "Accepted" would
# otherwise shadow "Partial Accepted" and report a partial result as AC.
_STATUS_KEYWORDS = sorted(STATUS_MAP, key=len, reverse=True)

# Homework name keyword -> homework type code.
HW_TYPE = {
    "课堂练习": "A",
    "课后作业": "B",
    "拓展练习": "C",
}


# ──────────────────────────────────────────
# Basic utilities
# ──────────────────────────────────────────


def strip_tags(html: str) -> str:
    """Remove HTML tags, unescape entities and trim surrounding whitespace.

    Non-breaking spaces are converted to regular spaces before trimming.
    """
    return unescape(re.sub(r"<[^>]+>", "", html)).replace("\xa0", " ").strip()


def detect_status(row_html: str) -> str:
    """Return the short status code found in a record row.

    Args:
        row_html: Inner HTML of one record table row.

    Returns:
        The ``STATUS_MAP`` code of the longest status keyword contained in
        ``row_html``, or ``"UNKNOWN"`` when no keyword is present.
    """
    for keyword in _STATUS_KEYWORDS:
        if keyword in row_html:
            return STATUS_MAP[keyword]
    return "UNKNOWN"


def normalize_time(raw: str) -> str:
    """Extract the first ``Y-M-D H:MM:SS`` timestamp and zero-pad it.

    Returns:
        ``YYYY-MM-DDTHH:MM:SS``, or an empty string when no timestamp is
        found in ``raw``.
    """
    m = re.search(r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})", raw)
    if not m:
        return ""
    y, mo, d, h, mi, s = m.groups()
    return f"{y}-{int(mo):02d}-{int(d):02d}T{int(h):02d}:{mi}:{s}"


# ──────────────────────────────────────────
# Login
# ──────────────────────────────────────────


def login(client: httpx.Client) -> bool:
    """Log in with the configured credentials.

    Posts ``USERNAME``/``PASSWORD`` to ``/login`` and treats the presence of
    a ``sid`` cookie on the client as success. The outcome is printed.

    Returns:
        True when a ``sid`` cookie is present after the request, False
        otherwise (including when the request raises).
    """
    try:
        r = client.post("/login", json={"uname": USERNAME, "password": PASSWORD})
        r.raise_for_status()
        ok = any(c.name == "sid" for c in client.cookies.jar)
        print(f" {'[OK] 登录成功' if ok else '[X] 登录失败'}: {USERNAME}")
        return ok
    except Exception as e:  # best-effort boundary: report and signal failure
        print(f" [X] 登录异常: {e}")
        return False


# ──────────────────────────────────────────
# Course structure scraping (parses sectionHWMap)
# ──────────────────────────────────────────


def fetch_course_structure(client: httpx.Client, course_id: str) -> list[dict]:
    """Parse all sections and their homework IDs from the course page.

    Section titles come from ``openGlobalPanel('<id>', '<title>')`` calls in
    the page; homework lists come from the embedded ``sectionHWMap`` object.

    Returns:
        A list, in page order, of
        ``{"section_id", "title", "homeworks": [{"hw_id", "type", "name"}]}``
        where ``type`` is ``"A"``/``"B"``/``"C"`` (see ``HW_TYPE``) or
        ``"other"``. Sections without homework are skipped. An empty list is
        returned (and a message printed) when ``sectionHWMap`` is not found.
    """
    r = client.get(f"/course/{course_id}")
    html = r.text

    # 1) Section titles, in HTML order; the first occurrence of an id wins.
    panel_calls = re.findall(
        r"openGlobalPanel\('([A-Fa-f0-9]{24})',\s*'([^']+)'\)", html
    )
    section_titles: dict[str, str] = {}  # section id -> title
    section_order: list[str] = []  # section ids in page order
    for sid, title in panel_calls:
        if sid not in section_titles:
            section_titles[sid] = title
            section_order.append(sid)

    # 2) Extract sectionHWMap.
    m = re.search(r"sectionHWMap:\s*(\{.*?\}),\s*\n\s*open", html, re.DOTALL)
    if not m:
        print(" [X] 未找到 sectionHWMap")
        return []

    raw_map = m.group(1)
    section_hws: dict[str, list[dict]] = {}  # section id -> [{hw_id, name}]
    for sid_m in re.finditer(r'"([A-Fa-f0-9]{24})":\s*\[(.*?)\]', raw_map, re.DOTALL):
        entries = re.findall(
            r'\{\s*id:\s*"([^"]+)",\s*name:\s*"([^"]+)"', sid_m.group(2)
        )
        section_hws[sid_m.group(1)] = [
            {"hw_id": hw_id, "name": name} for hw_id, name in entries
        ]

    # 3) Combine in page order.
    lessons = []
    for sid in section_order:
        hws_raw = section_hws.get(sid, [])
        if not hws_raw:
            continue  # section has no homework
        homeworks = []
        for hw in hws_raw:
            # The first HW_TYPE keyword contained in the name decides the type.
            hw_type = next(
                (code for key, code in HW_TYPE.items() if key in hw["name"]),
                "other",
            )
            homeworks.append(
                {"hw_id": hw["hw_id"], "type": hw_type, "name": hw["name"]}
            )
        lessons.append(
            {
                "section_id": sid,
                "title": section_titles.get(sid, sid),
                "homeworks": homeworks,
            }
        )
    return lessons


# ──────────────────────────────────────────
# Submission record scraping
# ──────────────────────────────────────────


def parse_record_rows(html: str) -> list[dict]:
    """Parse submission records out of the ``<tr>`` rows of a record page.

    Only rows containing both a ``/user/`` and a ``/p/`` link, and a numeric
    user link, are kept.

    Returns:
        A list of ``{"id", "uid", "uname", "pid", "status", "time"}`` dicts.
        ``id`` falls back to the 1-based row index when no ``/record/<id>``
        link is found; ``pid`` and ``time`` fall back to ``""``.
    """
    rows = re.findall(r"<tr[^>]*>(.*?)</tr>", html, re.IGNORECASE | re.DOTALL)
    records = []
    for i, row in enumerate(rows, 1):
        if "/user/" not in row or "/p/" not in row:
            continue
        user_m = re.search(r'href="/user/(\d+)"[^>]*>(.*?)</a>', row, re.DOTALL)
        if not user_m:
            continue
        prob_m = re.search(
            r'href="/p/([^"?/]+)(?:\?[^"]*)?"[^>]*>(.*?)</a>', row, re.DOTALL
        )
        rec_id_m = re.search(r"/record/([A-Za-z0-9]+)", row)
        time_m = re.search(r"(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})", row)
        records.append(
            {
                "id": rec_id_m.group(1) if rec_id_m else str(i),
                "uid": int(user_m.group(1)),
                "uname": strip_tags(user_m.group(2)),
                "pid": strip_tags(prob_m.group(1)) if prob_m else "",
                "status": detect_status(row),
                "time": normalize_time(time_m.group(1)) if time_m else "",
            }
        )
    return records


def fetch_hw_official_pids(client: httpx.Client, hw_id: str) -> list[str]:
    """Read the problem list from the homework page.

    This list is used as the denominator so that problem counts are not
    inferred from submission records.

    Returns:
        Problem ids linked from ``/homework/<hw_id>``, de-duplicated with
        first-seen order preserved.
    """
    r = client.get(f"/homework/{hw_id}")
    pids = re.findall(r'href="/p/([A-Za-z0-9]+)"', r.text)
    return list(dict.fromkeys(pids))  # de-duplicate, keep order


def fetch_hw_records(
    client: httpx.Client, hw_id: str, max_pages: int = 30
) -> list[dict]:
    """Fetch all submission records of a homework, following pagination.

    Paging stops at ``max_pages``, on a non-200 response, on a page with no
    rows or no unseen record ids, or when the page contains no reference to
    the next page number.

    Returns:
        Records (see ``parse_record_rows``) de-duplicated by ``id`` and
        sorted by their ``time`` string.
    """
    records: list[dict] = []
    seen: set[str] = set()
    for page in range(1, max_pages + 1):
        r = client.get(f"/record?tid={hw_id}&page={page}")
        if r.status_code != 200:
            break
        rows = parse_record_rows(r.text)
        if not rows:
            break
        new = 0
        for rec in rows:
            if rec["id"] not in seen:
                seen.add(rec["id"])
                records.append(rec)
                new += 1
        if new == 0:
            break
        if f"page={page + 1}" not in r.text:
            break
    records.sort(key=lambda x: x.get("time", ""))
    return records


# ──────────────────────────────────────────
# Analysis logic
# ──────────────────────────────────────────


def pattern_label(attempts: int, solved: bool) -> str:
    """Map an attempt count and solved flag to a descriptive label."""
    if attempts == 0:
        return "未提交"
    if not solved:
        return "尝试未通过"
    if attempts == 1:
        return "一气呵成 ⚡"
    if attempts <= 3:
        return "调试改进"
    if attempts <= 6:
        return "多次尝试"
    return "耐心调试"


def analyze_student(uid: int, uname: str, lessons: list[dict]) -> dict:
    """Compute per-problem completion statistics for one student.

    Each homework in ``lessons`` must carry ``"records"`` and may carry
    ``"official_pids"``; when the latter is empty the problem set falls back
    to the sorted distinct pids seen in the records.

    Returns:
        A dict with:
        - ``total[type]["total"]``: number of problems over all homework of
          that type (A/B/C);
        - ``total[type]["done"]``: number of those problems the student got
          AC on;
        - ``total[type]["attempts"]``: the student's submission count for
          that type;
        - ``lessons``: per-lesson ``{"title", "problems", "present"}``, where
          ``present`` is True when any homework has a submission;
        - ``all_errors``: non-AC status code -> count.
    """
    result = {
        "uid": uid,
        "uname": uname,
        "lessons": [],
        "total": {
            "A": {"done": 0, "total": 0, "attempts": 0},
            "B": {"done": 0, "total": 0, "attempts": 0},
            "C": {"done": 0, "total": 0, "attempts": 0},
        },
        "all_errors": defaultdict(int),
    }

    for lesson in lessons:
        ls = {"title": lesson["title"], "problems": [], "present": False}
        for hw in lesson["homeworks"]:
            recs = hw["records"]

            # Official problem list (from the homework page), with a fallback
            # to the pids observed in the records.
            official_pids = hw.get("official_pids", [])
            all_pids = (
                official_pids
                if official_pids
                else sorted({r["pid"] for r in recs if r["pid"]})
            )
            n_total_pids = len(all_pids)

            # This student's submissions.
            stu_recs = [r for r in recs if r["uid"] == uid]
            attempts = len(stu_recs)

            # Problems the student got AC on; only problems in the list count.
            official_pids_set = set(all_pids)
            stu_ac_pids = {
                r["pid"]
                for r in stu_recs
                if r["status"] == "AC" and r["pid"] and r["pid"] in official_pids_set
            }
            n_solved = len(stu_ac_pids)

            # Error types (non-AC submissions).
            err_counts = defaultdict(int)
            for r in stu_recs:
                if r["status"] != "AC":
                    err_counts[r["status"]] += 1
                    result["all_errors"][r["status"]] += 1

            t = hw["type"]
            if t in ("A", "B", "C"):
                result["total"][t]["total"] += n_total_pids
                result["total"][t]["attempts"] += attempts
                result["total"][t]["done"] += n_solved

            # Completion summary text.
            if attempts == 0:
                pat = "未提交"
            elif n_solved == n_total_pids:
                pat = f"全部完成 ({n_solved}/{n_total_pids})"
            else:
                pat = f"部分完成 ({n_solved}/{n_total_pids})"

            ls["problems"].append(
                {
                    "name": hw["name"],
                    "type": t,
                    "attempts": attempts,
                    "n_solved": n_solved,
                    "n_total": n_total_pids,
                    "solved": n_solved == n_total_pids and n_total_pids > 0,
                    "pattern": pat,
                    "errors": dict(err_counts),
                }
            )
            if attempts > 0:
                ls["present"] = True
        result["lessons"].append(ls)

    result["all_errors"] = dict(result["all_errors"])
    return result


# ──────────────────────────────────────────
# Report generation
# ──────────────────────────────────────────

# 0-based rank -> label for the top three places.
MEDAL = {0: "🥇 第1名", 1: "🥈 第2名", 2: "🥉 第3名"}


def pct(done, total) -> float:
    """Return ``done / total`` as a percentage, or 0.0 when ``total`` is 0."""
    return 0.0 if total == 0 else done / total * 100


def rank_scores(all_analyses: list[dict], key: str) -> list[tuple]:
    """Rank all students by completion rate for homework type ``key``.

    Returns:
        ``(uid, uname, percentage)`` tuples sorted by percentage, highest
        first; ties keep the order of ``all_analyses``.
    """
    scores = []
    for a in all_analyses:
        t = a["total"][key]
        scores.append((a["uid"], a["uname"], pct(t["done"], t["total"])))
    scores.sort(key=lambda x: -x[2])
    return scores


def _rank_of(scores: list[tuple], uid: int) -> int:
    """Return the 0-based position of ``uid`` in a ``rank_scores`` result."""
    return next(i for i, (u, _, _) in enumerate(scores) if u == uid)


def build_report(
    target_uid: int,
    target_name: str,
    all_analyses: list[dict],
    lessons: list[dict],
) -> str:
    """Render the text report for the target student.

    Args:
        target_uid: uid of the student the report is about.
        target_name: Display name used in the report.
        all_analyses: ``analyze_student`` results for every student; used for
            the class comparison.
        lessons: The analysed lessons; only their count is used here.

    Returns:
        The multi-line report, or a one-line message when ``target_uid`` has
        no entry in ``all_analyses``.
    """
    target = next((a for a in all_analyses if a["uid"] == target_uid), None)
    if not target:
        return f"❌ 未找到 uid={target_uid} 的提交记录(该学生可能还未提交过任何题目)"

    lines = []
    W = 62
    lines.append("=" * W)
    lines.append(f" {target_name} · 2026春季班 CSP04 学习报告")
    lines.append(
        f" 统计课次:前 {len(lessons)} 节 | 生成: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
    )
    lines.append("=" * W)

    t = target["total"]
    a_rate = pct(t["A"]["done"], t["A"]["total"])
    b_rate = pct(t["B"]["done"], t["B"]["total"])
    c_rate = pct(t["C"]["done"], t["C"]["total"])
    attend = sum(1 for ls in target["lessons"] if ls["present"])

    # Rankings per homework type, computed once and shared by sections 3 and 4.
    rankings = {key: rank_scores(all_analyses, key) for key in ("A", "B", "C")}
    ranks = {key: _rank_of(scores, target_uid) for key, scores in rankings.items()}

    # ── 1. Overview ──
    lines.append("")
    lines.append("【一、总体学习概览】")
    lines.append(f" 出勤情况:{attend}/{len(lessons)} 节有提交记录")
    lines.append(
        f" 课堂练习(A):{t['A']['done']}/{t['A']['total']} 题 完成率 {a_rate:.0f}% 累计 {t['A']['attempts']} 次提交"
    )
    lines.append(
        f" 课后作业(B):{t['B']['done']}/{t['B']['total']} 题 完成率 {b_rate:.0f}% 累计 {t['B']['attempts']} 次提交"
    )
    lines.append(
        f" 拓展练习(C):{t['C']['done']}/{t['C']['total']} 题 完成率 {c_rate:.0f}% 累计 {t['C']['attempts']} 次提交"
    )

    # ── 2. Per-lesson detail ──
    lines.append("")
    lines.append("【二、逐节课做题明细】")
    type_labels = {"A": "课堂练习", "B": "课后作业", "C": "拓展练习"}
    for i, ls in enumerate(target["lessons"], 1):
        tag = "✅" if ls["present"] else "⬜"
        lines.append(f"\n {tag} 第{i}节 {ls['title']}")
        for p in ls["problems"]:
            if p["attempts"] == 0:
                icon = "—"
            elif p["solved"]:
                icon = "✓"
            else:
                icon = "○"
            type_label = type_labels.get(p["type"], "其他")
            err_str = ""
            if p["errors"]:
                err_str = (
                    " [" + ", ".join(f"{k}×{v}" for k, v in p["errors"].items()) + "]"
                )
            lines.append(
                f" {icon} {type_label} {p['attempts']}次提交 {p['pattern']}{err_str}"
            )

    # ── 3. Class comparison ──
    lines.append("")
    lines.append("【三、在班级中的位置】")
    n = len(all_analyses)
    for key, label in [("A", "课堂练习"), ("B", "课后作业"), ("C", "拓展练习")]:
        scores = rankings[key]
        avg = sum(s for _, _, s in scores) / len(scores) if scores else 0
        rank = ranks[key]
        my_sc = scores[rank][2]
        medal_str = MEDAL.get(rank, f"第{rank + 1}名")
        lines.append(
            f"\n {label} {medal_str} {target_name}: {my_sc:.0f}% (均值: {avg:.0f}%,共{n}人)"
        )
        lines.append(" ┌" + "─" * 42)
        for j, (uid, uname, sc) in enumerate(scores):
            bar = "█" * int(sc / 5)  # one bar cell per 5 percentage points
            me = " ← 本人" if uid == target_uid else ""
            lines.append(f" │ {j + 1:2d}. {uname[:5]:<5s} {sc:5.1f}% {bar}{me}")
        lines.append(" └" + "─" * 42)

    # ── 4. Strengths ──
    lines.append("")
    lines.append("【四、做得好的地方 ✨】")
    strengths = []

    if attend == len(lessons):
        strengths.append(f"全勤出勤:{len(lessons)} 节课均有提交记录,学习连贯性强")
    elif attend >= len(lessons) * 0.75:
        strengths.append(f"出勤率高达 {attend}/{len(lessons)} 节,学习节奏稳定")

    rank_a, rank_b, rank_c = ranks["A"], ranks["B"], ranks["C"]

    if rank_a == 0:
        strengths.append(
            f"课堂练习完成率全班第一({a_rate:.0f}%),课堂专注度和吸收能力突出"
        )
    elif rank_a <= 2:
        strengths.append(f"课堂练习完成率班级前三({a_rate:.0f}%),课堂表现优秀")
    elif a_rate >= 80:
        strengths.append(f"课堂练习完成率 {a_rate:.0f}%,基础掌握较扎实")

    if b_rate >= 80:
        strengths.append(
            f"课后作业完成率 {b_rate:.0f}%,课后巩固习惯{'很好' if b_rate >= 90 else '较好'}"
        )
    if rank_b == 0 and b_rate > 0:
        strengths.append("课后作业完成率全班最高,自律性强、学习投入度高")

    if c_rate > 0:
        strengths.append(
            f"有完成拓展练习({c_rate:.0f}%),主动挑战高难度题,很有进取心"
        )
    if rank_c == 0 and c_rate > 0:
        strengths.append("拓展练习全班第一,学有余力且善于钻研")

    total_solved = t["A"]["done"] + t["B"]["done"] + t["C"]["done"]
    total_attempts = t["A"]["attempts"] + t["B"]["attempts"] + t["C"]["attempts"]
    # Average submissions per solved problem; undefined when nothing is solved.
    avg_tries = total_attempts / total_solved if total_solved > 0 else None
    if avg_tries is not None and avg_tries <= 2:
        strengths.append(
            f"平均仅需 {avg_tries:.1f} 次提交即可通过,思路清晰、代码质量高"
        )

    if not strengths:
        strengths.append("坚持参与学习,持续积累是进步的基础")
    for s in strengths:
        lines.append(f" ✅ {s}")

    # ── 5. Areas to improve ──
    lines.append("")
    lines.append("【五、需要加强的地方 📌】")
    gaps = []

    if a_rate < 60:
        gaps.append(
            f"课堂练习完成率偏低({a_rate:.0f}%),课堂时间利用率有待提升,"
            "遇到卡点建议及时向老师请教,不要在一个问题上停留太久"
        )
    elif a_rate < 80:
        gaps.append(
            f"课堂练习完成率 {a_rate:.0f}%,还有提升空间,建议下笔前先把题意和思路整理清楚"
        )

    if b_rate < 30:
        gaps.append(
            f"课后作业完成率仅 {b_rate:.0f}%,课后练习严重不足——知识点在课堂上理解了,"
            "但不做题就很容易遗忘,建议每次课后至少完成必做作业"
        )
    elif b_rate < 60:
        gaps.append(f"课后作业完成率 {b_rate:.0f}%,课后练习不够充分,建议提高完成频率")

    if c_rate == 0 and t["C"]["total"] > 0:
        gaps.append("拓展练习暂无提交——不要求全部做完,但尝试一下对思维提升很有帮助")

    if avg_tries is not None:
        if avg_tries > 8:
            gaps.append(
                f"平均每题需要 {avg_tries:.1f} 次才通过,建议做题前先在草稿纸上梳理逻辑,"
                "把样例手动跑一遍,再提交,大幅减少无效试错"
            )
        elif avg_tries > 5:
            gaps.append(
                f"平均每题约 {avg_tries:.1f} 次提交通过,养成'提交前自检'习惯可以进一步减少次数"
            )

    errs = target["all_errors"]
    if errs.get("WA", 0) >= 10:
        gaps.append(
            f"答案错误(WA)累计 {errs['WA']} 次,建议重点练习边界条件判断和分类讨论,"
            "每次WA后认真分析是哪种情况遗漏了"
        )
    if errs.get("CE", 0) >= 5:
        gaps.append(f"编译错误(CE)累计 {errs['CE']} 次,提交前先确认代码没有语法错误")
    if errs.get("RE", 0) >= 5:
        gaps.append(f"运行错误(RE){errs['RE']} 次,注意数组下标越界和递归终止条件")
    if errs.get("TLE", 0) >= 3:
        gaps.append(
            f"超时(TLE){errs['TLE']} 次,需要关注算法时间复杂度,避免暴力解法"
        )

    if not gaps:
        gaps.append("整体表现均衡,建议进一步挑战拓展练习,向更高水平迈进")
    for g in gaps:
        lines.append(f" 📌 {g}")

    # ── 6. Advice for parents ──
    lines.append("")
    lines.append("【六、给家长的话】")
    if b_rate < 50:
        lines.append(" • 课后作业完成率不高,建议每次上完课后家长提醒孩子完成练习,")
        lines.append(' 最好当天做,养成"当天课当天练"的好习惯,效果事半功倍。')
    else:
        lines.append(" • 课后练习完成情况不错!鼓励孩子继续保持,")
        lines.append(" 也可以问问孩子今天学了什么,帮助他用语言组织、加深理解。")

    if c_rate == 0 and t["C"]["total"] > 0:
        lines.append(" • 拓展练习还没开始尝试,如果孩子平时有余力,")
        lines.append(" 可以鼓励他试试看,不要求做完,重要的是训练思维的过程。")

    lines.append(" • 信奥学习贵在坚持,遇到难题时多鼓励孩子,")
    lines.append(" 告诉他“卡住了很正常,调试出来才是成长”。")
    lines.append(" 有任何疑问随时联系老师~")

    lines.append("")
    lines.append("=" * W)
    return "\n".join(lines)


# ──────────────────────────────────────────
# Main flow
# ──────────────────────────────────────────


def main() -> None:
    """Run the full pipeline: log in, scrape, analyse, print and save."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--course-id", required=True)
    parser.add_argument("--uid", required=True, type=int)
    parser.add_argument("--name", default="")
    parser.add_argument("--lessons", default=8, type=int, help="分析前 N 节课")
    args = parser.parse_args()

    # The context manager closes the client on every exit path, including
    # sys.exit() after a failed login and exceptions raised while scraping.
    with httpx.Client(
        base_url=OJ_BASE_URL, timeout=60.0, follow_redirects=True
    ) as client:
        # 1. Log in.
        print("\n[1] 登录 OJ...")
        if not login(client):
            sys.exit(1)

        # 2. Parse the course structure.
        print(f"\n[2] 解析课程结构: {args.course_id}")
        all_lessons = fetch_course_structure(client, args.course_id)
        print(f" 共找到 {len(all_lessons)} 个有作业的节次")
        for i, lesson in enumerate(all_lessons, 1):
            hw_summary = " | ".join(h["name"] for h in lesson["homeworks"])
            print(f" 第{i}节: {lesson['title']} [{hw_summary}]")

        lessons = all_lessons[: args.lessons]
        print(f"\n 分析前 {len(lessons)} 节")

        # 3. Fetch each homework's problem list and submission records.
        print("\n[3] 抓取作业题目列表与提交记录...")
        for i, lesson in enumerate(lessons, 1):
            print(f" 第{i}节: {lesson['title']}")
            for hw in lesson["homeworks"]:
                # Official problem list first, then the submission records.
                official_pids = fetch_hw_official_pids(client, hw["hw_id"])
                hw["official_pids"] = official_pids
                recs = fetch_hw_records(client, hw["hw_id"])
                hw["records"] = recs
                n_students = len({r["uid"] for r in recs})
                print(
                    f" {hw['name']} (tid={hw['hw_id'][:8]}...): "
                    f"{len(official_pids)} 道题 / {len(recs)} 条记录 / {n_students} 名学生"
                )

    # 4. Collect every student seen in the records.
    print("\n[4] 汇总学生列表...")
    uid_uname: dict[int, str] = {}
    for lesson in lessons:
        for hw in lesson["homeworks"]:
            for rec in hw["records"]:
                uid_uname[rec["uid"]] = rec["uname"]

    # The target is always included; an explicit --name overrides the
    # scraped user name.
    if args.uid not in uid_uname:
        uid_uname[args.uid] = args.name or f"uid{args.uid}"
    elif args.name:
        uid_uname[args.uid] = args.name

    print(f" 共 {len(uid_uname)} 名学生有记录")
    for uid, uname in sorted(uid_uname.items(), key=lambda x: x[1]):
        tag = " ← 目标" if uid == args.uid else ""
        print(f" uid={uid} {uname}{tag}")

    # 5. Analyse every student.
    print("\n[5] 分析各学生数据...")
    all_analyses = []
    for uid, uname in sorted(uid_uname.items()):
        a = analyze_student(uid, uname, lessons)
        all_analyses.append(a)
        t = a["total"]
        print(
            f" {uname}(uid={uid}): "
            f"A={t['A']['done']}/{t['A']['total']} "
            f"B={t['B']['done']}/{t['B']['total']} "
            f"C={t['C']['done']}/{t['C']['total']}"
        )

    # 6. Build the report.
    target_name = uid_uname.get(args.uid, args.name or f"uid{args.uid}")
    print(f"\n[6] 生成报告: {target_name} (uid={args.uid})")
    report = build_report(args.uid, target_name, all_analyses, lessons)
    print("\n" + report)

    # 7. Save the report and the raw data.
    out_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
    out_dir.mkdir(parents=True, exist_ok=True)

    out_txt = out_dir / f"course_{args.course_id[:8]}_{args.uid}_report.txt"
    out_txt.write_text(report, encoding="utf-8")
    print(f"\n📄 报告已保存: {out_txt}")

    out_json = out_dir / f"course_{args.course_id[:8]}_{args.uid}_raw.json"
    payload = {
        "course_id": args.course_id,
        "target_uid": args.uid,
        "target_name": target_name,
        "generated_at": datetime.now().isoformat(),
        "lessons_analyzed": len(lessons),
        "lesson_titles": [lesson["title"] for lesson in lessons],
        # Per-lesson detail is omitted from the raw dump.
        "all_analyses": [
            {k: v for k, v in a.items() if k != "lessons"} for a in all_analyses
        ],
    }
    out_json.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"📊 数据已保存: {out_json}")


if __name__ == "__main__":
    main()