Files
ClassFeedback/scripts/analyze_course_student.py

711 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
课程学生表现分析脚本
用法:
python scripts/analyze_course_student.py \\
--course-id 6975983771e15346c9e8fdc0 \\
--uid 248 --name 王梓骏 --lessons 8
"""
import io
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
from html import unescape
from pathlib import Path
import httpx
# Force UTF-8 output (with replacement of unencodable characters) so the
# Chinese text and emoji printed by this script do not crash on consoles
# using a legacy code page.  ``reconfigure`` changes the encoding in place,
# which keeps the stream's existing line-buffering behaviour; streams that
# do not offer it (e.g. a replaced or missing stdout) are left untouched.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8", errors="replace")
del _stream

# Directory two levels above this file (the parent of the script's folder).
PROJECT_ROOT = Path(__file__).parent.parent
def load_env(env_path: Path) -> dict:
    """Parse a simple ``KEY=VALUE`` file into a dict of strings.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored.  Keys and values are stripped of surrounding whitespace, and a
    value wrapped in a matching pair of single or double quotes has the
    quotes removed.  Only the first ``=`` separates key from value.

    Args:
        env_path: Path of the file to read.

    Returns:
        The parsed mapping; empty when the file does not exist.
    """
    values = {}
    if not env_path.exists():
        return values
    # "utf-8-sig" reads plain UTF-8 and also drops a leading BOM, which would
    # otherwise become part of the first key.
    for raw_line in env_path.read_text(encoding="utf-8-sig").split("\n"):
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        # KEY="value" / KEY='value' should yield the bare value.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        if key:
            values[key] = value
    return values
# Settings read from the project-level .env file.  In the lookups below a
# non-empty process environment variable takes precedence over the file.
env = load_env(PROJECT_ROOT / ".env")
OJ_BASE_URL = os.environ.get("OJ_BASE_URL") or env.get(
    "OJ_BASE_URL", "https://oj.qonnwolf.com"
)
USERNAME = os.environ.get("OJ_USERNAME") or env.get("OJ_USERNAME", "")
PASSWORD = os.environ.get("OJ_PASSWORD") or env.get("OJ_PASSWORD", "")
# Verdict text searched for in a record row -> short status code.
# Note that some keys are substrings of others ("Accepted" is contained in
# "Partial Accepted", "Runtime Error" in "Runtime Error on Test").
STATUS_MAP = {
    "Accepted": "AC",
    "Wrong Answer": "WA",
    "Compile Error": "CE",
    "Time Limit Exceeded": "TLE",
    "Memory Limit Exceeded": "MLE",
    "Runtime Error": "RE",
    "Presentation Error": "PE",
    "Output Limit Exceeded": "OLE",
    "Runtime Error on Test": "RE",
    "Partial Accepted": "PA",
}
# Homework name keyword -> homework type code
HW_TYPE = {
    "课堂练习": "A",
    "课后作业": "B",
    "拓展练习": "C",
}
# ──────────────────────────────────────────
# 基础工具
# ──────────────────────────────────────────
def strip_tags(html: str) -> str:
    """Return the text of *html* with tags removed and entities decoded.

    Non-breaking spaces are turned into ordinary spaces and the result is
    trimmed of leading and trailing whitespace.
    """
    without_tags = re.sub(r"<[^>]+>", "", html)
    decoded = unescape(without_tags)
    return decoded.replace("\xa0", " ").strip()
def detect_status(row_html: str, status_map=None) -> str:
    """Return the short status code for the verdict text found in a row.

    Keywords are tried longest first.  Some verdict names contain others
    ("Partial Accepted" contains "Accepted"), so testing in plain insertion
    order would report a partial accept as a full "AC".

    Args:
        row_html: Raw HTML of one record table row.
        status_map: Optional mapping of verdict text to status code; the
            module-level ``STATUS_MAP`` is used when omitted.

    Returns:
        The code of the longest keyword contained in ``row_html``, or
        ``"UNKNOWN"`` when no keyword matches.
    """
    mapping = STATUS_MAP if status_map is None else status_map
    # sorted() is stable, so keywords of equal length keep their map order.
    for keyword in sorted(mapping, key=len, reverse=True):
        if keyword in row_html:
            return mapping[keyword]
    return "UNKNOWN"
def normalize_time(raw: str) -> str:
    """Extract the first ``Y-M-D H:MM:SS`` timestamp from *raw*.

    The result has the form ``YYYY-MM-DDTHH:MM:SS`` with month, day and hour
    zero-padded to two digits.  An empty string is returned when *raw*
    contains no such timestamp.
    """
    match = re.search(r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})", raw)
    if match is None:
        return ""
    year, month, day, hour, minute, second = match.groups()
    date_part = f"{year}-{int(month):02d}-{int(day):02d}"
    time_part = f"{int(hour):02d}:{minute}:{second}"
    return f"{date_part}T{time_part}"
# ──────────────────────────────────────────
# 登录
# ──────────────────────────────────────────
def login(client: httpx.Client) -> bool:
    """Log in with the configured credentials and report the outcome.

    Posts ``USERNAME``/``PASSWORD`` as JSON to ``/login`` and treats the
    presence of a ``sid`` cookie in the client's jar as success.  Any
    exception is printed and reported as a failed login.

    Returns:
        True when a ``sid`` cookie is present after the request, else False.
    """
    credentials = {"uname": USERNAME, "password": PASSWORD}
    try:
        response = client.post("/login", json=credentials)
        response.raise_for_status()
        logged_in = False
        for cookie in client.cookies.jar:
            if cookie.name == "sid":
                logged_in = True
                break
        print(f" {'[OK] 登录成功' if logged_in else '[X] 登录失败'}: {USERNAME}")
        return logged_in
    except Exception as exc:
        print(f" [X] 登录异常: {exc}")
        return False
# ──────────────────────────────────────────
# 课程结构抓取(解析 sectionHWMap
# ──────────────────────────────────────────
def fetch_course_structure(client: httpx.Client, course_id: str) -> list[dict]:
    """Parse the sections and homework ids of a course from its page.

    Section titles come from ``openGlobalPanel('<section_id>', '<title>')``
    calls in the page; homework lists come from the ``sectionHWMap`` object
    literal embedded in the page.

    Args:
        client: HTTP client used to request ``/course/<course_id>``.
        course_id: Identifier of the course.

    Returns:
        One dict per section, in page order, of the form
        ``{"section_id": ..., "title": ..., "homeworks": [{"hw_id": ...,
        "type": "A"/"B"/"C"/"other", "name": ...}, ...]}``.  Sections without
        homework are skipped.  An empty list is returned (after printing a
        message) when ``sectionHWMap`` is not found.
    """
    r = client.get(f"/course/{course_id}")
    html = r.text

    # 1) Section titles in HTML order; only the first title seen for a
    #    section id is kept.
    panel_calls = re.findall(
        r"openGlobalPanel\('([A-Fa-f0-9]{24})',\s*'([^']+)'\)", html
    )
    section_titles: dict[str, str] = {}  # id -> title
    section_order: list[str] = []  # section ids in page order
    for sid, title in panel_calls:
        if sid not in section_titles:
            section_titles[sid] = title
            section_order.append(sid)

    # 2) Extract sectionHWMap.
    m = re.search(r"sectionHWMap:\s*(\{.*?\}),\s*\n\s*open", html, re.DOTALL)
    if not m:
        print(" [X] 未找到 sectionHWMap")
        return []
    raw_map = m.group(1)
    section_hws: dict[str, list[dict]] = {}  # section_id -> [{hw_id, name}]
    for sid_m in re.finditer(r'"([A-Fa-f0-9]{24})":\s*\[(.*?)\]', raw_map, re.DOTALL):
        entries = re.findall(
            r'\{\s*id:\s*"([^"]+)",\s*name:\s*"([^"]+)"', sid_m.group(2)
        )
        section_hws[sid_m.group(1)] = [
            {"hw_id": hw_id, "name": name} for hw_id, name in entries
        ]

    # 3) Combine both in page order.
    lessons = []
    for sid in section_order:
        hws_raw = section_hws.get(sid, [])
        if not hws_raw:
            continue  # section has no homework
        homeworks = []
        for hw in hws_raw:
            # The type is that of the first HW_TYPE keyword contained in the
            # homework name; names without a known keyword are "other".
            hw_type = next(
                (t for key, t in HW_TYPE.items() if key in hw["name"]), "other"
            )
            homeworks.append(
                {"hw_id": hw["hw_id"], "type": hw_type, "name": hw["name"]}
            )
        lessons.append(
            {
                "section_id": sid,
                "title": section_titles.get(sid, sid),
                "homeworks": homeworks,
            }
        )
    return lessons
# ──────────────────────────────────────────
# 提交记录抓取
# ──────────────────────────────────────────
def parse_record_rows(html: str) -> list[dict]:
    """Extract submission records from the table rows of a record page.

    A ``<tr>`` is treated as a record only when it links to both a user
    (``/user/<digits>``) and a problem (``/p/...``).

    Returns:
        One dict per record with the keys ``id``, ``uid``, ``uname``,
        ``pid``, ``status`` and ``time``.  ``id`` falls back to the row's
        1-based position on the page when the row has no ``/record/`` link;
        ``pid`` and ``time`` are empty strings when not found.
    """
    parsed = []
    row_bodies = re.findall(r"<tr[^>]*>(.*?)</tr>", html, re.IGNORECASE | re.DOTALL)
    for position, row in enumerate(row_bodies, 1):
        if not ("/user/" in row and "/p/" in row):
            continue
        user_match = re.search(r'href="/user/(\d+)"[^>]*>(.*?)</a>', row, re.DOTALL)
        if user_match is None:
            continue
        problem_match = re.search(
            r'href="/p/([^"?/]+)(?:\?[^"]*)?"[^>]*>(.*?)</a>', row, re.DOTALL
        )
        record_match = re.search(r"/record/([A-Za-z0-9]+)", row)
        time_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})", row)

        record_id = record_match.group(1) if record_match else str(position)
        problem_id = strip_tags(problem_match.group(1)) if problem_match else ""
        submitted_at = normalize_time(time_match.group(1)) if time_match else ""

        parsed.append(
            {
                "id": record_id,
                "uid": int(user_match.group(1)),
                "uname": strip_tags(user_match.group(2)),
                "pid": problem_id,
                "status": detect_status(row),
                "time": submitted_at,
            }
        )
    return parsed
def fetch_hw_official_pids(client: httpx.Client, hw_id: str) -> list[str]:
    """Read the problem ids linked from a homework page.

    The list serves as the denominator for completion statistics, so that
    problem counts are not inferred from whichever problems happen to have
    submissions.

    Args:
        client: HTTP client used to request ``/homework/<hw_id>``.
        hw_id: Identifier of the homework.

    Returns:
        Problem ids in order of first appearance, without duplicates.
    """
    r = client.get(f"/homework/{hw_id}")
    # Accept an optional query string after the problem id (for example
    # "/p/P1001?tid=..."), as the record-row parser in this file does;
    # requiring the closing quote right after the id would skip such links.
    pids = re.findall(r'href="/p/([A-Za-z0-9]+)(?:\?[^"]*)?"', r.text)
    return list(dict.fromkeys(pids))  # de-duplicate, keeping order
def fetch_hw_records(
    client: httpx.Client, hw_id: str, max_pages: int = 30
) -> list[dict]:
    """Collect the submission records of a homework across result pages.

    Pages ``/record?tid=<hw_id>&page=<n>`` are requested from 1 upwards.
    Paging stops at ``max_pages``, on a non-200 response, on a page without
    records, on a page that adds no unseen record id, or when the page text
    does not mention the next page number.

    Returns:
        Unique records (by ``id``) sorted by their ``time`` string.
    """
    collected = []
    known_ids = set()
    page = 1
    while page <= max_pages:
        response = client.get(f"/record?tid={hw_id}&page={page}")
        if response.status_code != 200:
            break
        page_rows = parse_record_rows(response.text)
        if not page_rows:
            break
        added = 0
        for row in page_rows:
            if row["id"] in known_ids:
                continue
            known_ids.add(row["id"])
            collected.append(row)
            added += 1
        has_next_link = f"page={page + 1}" in response.text
        if added == 0 or not has_next_link:
            break
        page += 1
    collected.sort(key=lambda rec: rec.get("time", ""))
    return collected
# ──────────────────────────────────────────
# 分析逻辑
# ──────────────────────────────────────────
def pattern_label(attempts: int, solved: bool) -> str:
    """Describe a solving pattern from the attempt count and the outcome.

    Zero attempts and unsolved attempts each have a fixed label; solved
    problems are labelled by how many submissions were needed
    (1, up to 3, up to 6, more).
    """
    if attempts == 0:
        return "未提交"
    if not solved:
        return "尝试未通过"
    if attempts == 1:
        return "一气呵成 ⚡"
    # Upper bound on attempts -> label, checked in ascending order.
    for ceiling, label in ((3, "调试改进"), (6, "多次尝试")):
        if attempts <= ceiling:
            return label
    return "耐心调试"
def analyze_student(uid: int, uname: str, lessons: list[dict]) -> dict:
    """Summarise one student's results per homework, counted by problem.

    For every homework the problem list is ``hw["official_pids"]`` when that
    is non-empty, otherwise the sorted set of problem ids seen in
    ``hw["records"]``.  A problem counts as solved when the student has an
    "AC" record for it and it belongs to that list.

    Returns:
        A dict with ``uid``, ``uname``, ``lessons`` (per-lesson homework
        entries plus a ``present`` flag that is True when the student
        submitted anything in the lesson), ``total`` (per type A/B/C: number
        of problems, solved problems and submissions) and ``all_errors``
        (count of non-AC submissions per status).
    """
    tracked_types = ("A", "B", "C")
    totals = {kind: {"done": 0, "total": 0, "attempts": 0} for kind in tracked_types}
    error_totals = {}
    lesson_summaries = []

    for lesson in lessons:
        entries = []
        present = False
        for hw in lesson["homeworks"]:
            records = hw["records"]

            # Problem list: taken from the homework page when available,
            # otherwise derived from the submissions.
            pid_list = hw.get("official_pids", [])
            if not pid_list:
                pid_list = sorted({rec["pid"] for rec in records if rec["pid"]})
            pid_total = len(pid_list)
            pid_lookup = set(pid_list)

            own = [rec for rec in records if rec["uid"] == uid]
            submit_count = len(own)

            solved_pids = set()
            error_tally = {}
            for rec in own:
                status = rec["status"]
                if status == "AC":
                    # Only problems on the homework's list count as solved.
                    if rec["pid"] and rec["pid"] in pid_lookup:
                        solved_pids.add(rec["pid"])
                else:
                    error_tally[status] = error_tally.get(status, 0) + 1
                    error_totals[status] = error_totals.get(status, 0) + 1
            solved_count = len(solved_pids)

            kind = hw["type"]
            if kind in tracked_types:
                bucket = totals[kind]
                bucket["total"] += pid_total
                bucket["attempts"] += submit_count
                bucket["done"] += solved_count

            if submit_count == 0:
                summary = "未提交"
            elif solved_count == pid_total:
                summary = f"全部完成 ({solved_count}/{pid_total})"
            else:
                summary = f"部分完成 ({solved_count}/{pid_total})"

            entries.append(
                {
                    "name": hw["name"],
                    "type": kind,
                    "attempts": submit_count,
                    "n_solved": solved_count,
                    "n_total": pid_total,
                    "solved": pid_total > 0 and solved_count == pid_total,
                    "pattern": summary,
                    "errors": error_tally,
                }
            )
            if submit_count > 0:
                present = True
        lesson_summaries.append(
            {"title": lesson["title"], "problems": entries, "present": present}
        )

    return {
        "uid": uid,
        "uname": uname,
        "lessons": lesson_summaries,
        "total": totals,
        "all_errors": error_totals,
    }
# ──────────────────────────────────────────
# 报告生成
# ──────────────────────────────────────────
# Labels for the first three places, keyed by 0-based position in a ranking.
MEDAL = {0: "🥇 第1名", 1: "🥈 第2名", 2: "🥉 第3名"}
def pct(done, total):
    """Return ``done / total`` as a percentage, or 0.0 when ``total`` is 0."""
    if total == 0:
        return 0.0
    return done / total * 100
def rank_scores(all_analyses: list[dict], key: str):
    """Return ``(uid, uname, completion %)`` tuples, highest percentage first.

    ``key`` selects the homework type ("A", "B" or "C") inside each
    analysis's ``total`` mapping.  Entries with equal percentages keep their
    input order.
    """
    ranked = []
    for analysis in all_analyses:
        bucket = analysis["total"][key]
        completion = pct(bucket["done"], bucket["total"])
        ranked.append((analysis["uid"], analysis["uname"], completion))
    ranked.sort(key=lambda row: -row[2])
    return ranked
def _tie_aware_rank(scores: list, uid: int) -> int:
    """Return the 0-based rank of ``uid`` in ``scores``, sharing ranks on ties.

    ``scores`` is a list of ``(uid, uname, score)`` tuples.  The rank is the
    number of entries with a strictly higher score, so students with equal
    scores get the same place regardless of their position in the list.
    """
    own_score = next(s for u, _, s in scores if u == uid)
    return sum(1 for _, _, s in scores if s > own_score)


def build_report(
    target_uid: int,
    target_name: str,
    all_analyses: list[dict],
    lessons: list[dict],
) -> str:
    """Build the plain-text learning report for one student.

    The report has six sections: overview, per-lesson detail, position in
    the class, strengths, points to improve, and a note to parents.

    Args:
        target_uid: User id of the student the report is about.
        target_name: Display name used in the report.
        all_analyses: Analyses of every student (as produced by
            ``analyze_student``); used for class averages and rankings.
        lessons: The lessons that were analysed; only their count is used.

    Returns:
        The report text, or a one-line message when ``target_uid`` has no
        entry in ``all_analyses``.
    """
    target = next((a for a in all_analyses if a["uid"] == target_uid), None)
    if not target:
        return f"❌ 未找到 uid={target_uid} 的提交记录(该学生可能还未提交过任何题目)"
    lines = []
    W = 62
    lines.append("=" * W)
    lines.append(f" {target_name} · 2026春季班 CSP04 学习报告")
    lines.append(
        f" 统计课次:前 {len(lessons)} 节 | 生成: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
    )
    lines.append("=" * W)
    t = target["total"]
    a_rate = pct(t["A"]["done"], t["A"]["total"])
    b_rate = pct(t["B"]["done"], t["B"]["total"])
    c_rate = pct(t["C"]["done"], t["C"]["total"])
    attend = sum(1 for l in target["lessons"] if l["present"])

    # Rankings are computed once per homework type and reused below.  Ranks
    # are tie-aware: students with the same completion rate share a place
    # instead of being ordered by their position in the input.
    rankings = {key: rank_scores(all_analyses, key) for key in ("A", "B", "C")}
    ranks = {
        key: _tie_aware_rank(scores, target_uid) for key, scores in rankings.items()
    }

    # ── 1. Overview ──
    lines.append("")
    lines.append("【一、总体学习概览】")
    lines.append(f" 出勤情况:{attend}/{len(lessons)} 节有提交记录")
    lines.append(
        f" 课堂练习A{t['A']['done']}/{t['A']['total']} 题 完成率 {a_rate:.0f}% 累计 {t['A']['attempts']} 次提交"
    )
    lines.append(
        f" 课后作业B{t['B']['done']}/{t['B']['total']} 题 完成率 {b_rate:.0f}% 累计 {t['B']['attempts']} 次提交"
    )
    lines.append(
        f" 拓展练习C{t['C']['done']}/{t['C']['total']} 题 完成率 {c_rate:.0f}% 累计 {t['C']['attempts']} 次提交"
    )

    # ── 2. Per-lesson detail ──
    lines.append("")
    lines.append("【二、逐节课做题明细】")
    for i, ls in enumerate(target["lessons"], 1):
        # NOTE(review): both branches of this marker (and all three icons
        # below) are empty strings here; if distinct glyphs were intended,
        # confirm them against the original file.
        tag = "" if ls["present"] else ""
        lines.append(f"\n {tag}{i}{ls['title']}")
        for p in ls["problems"]:
            if p["attempts"] == 0:
                icon = ""
            elif p["solved"]:
                icon = ""
            else:
                icon = ""
            type_label = {"A": "课堂练习", "B": "课后作业", "C": "拓展练习"}.get(
                p["type"], "其他"
            )
            err_str = ""
            if p["errors"]:
                err_str = (
                    " [" + ", ".join(f"{k}×{v}" for k, v in p["errors"].items()) + "]"
                )
            lines.append(
                f" {icon} {type_label} {p['attempts']}次提交 {p['pattern']}{err_str}"
            )

    # ── 3. Position in the class ──
    lines.append("")
    lines.append("【三、在班级中的位置】")
    n = len(all_analyses)
    for key, label in [("A", "课堂练习"), ("B", "课后作业"), ("C", "拓展练习")]:
        scores = rankings[key]
        avg = sum(s for _, _, s in scores) / len(scores) if scores else 0
        rank = ranks[key]
        my_sc = next(s for uid, _, s in scores if uid == target_uid)
        medal_str = MEDAL.get(rank, f"{rank + 1}")
        lines.append(
            f"\n {label} {medal_str} {target_name}: {my_sc:.0f}% (均值: {avg:.0f}%,共{n}人)"
        )
        lines.append("" + "" * 42)
        for j, (uid, uname, sc) in enumerate(scores):
            bar = "" * int(sc / 5)
            me = " ← 本人" if uid == target_uid else ""
            lines.append(f"{j + 1:2d}. {uname[:5]:<5s} {sc:5.1f}% {bar}{me}")
        lines.append("" + "" * 42)

    # ── 4. Strengths ──
    lines.append("")
    lines.append("【四、做得好的地方 ✨】")
    strengths = []
    if attend == len(lessons):
        strengths.append(f"全勤出勤:{len(lessons)} 节课均有提交记录,学习连贯性强")
    elif attend >= len(lessons) * 0.75:
        strengths.append(f"出勤率高达 {attend}/{len(lessons)} 节,学习节奏稳定")
    rank_a = ranks["A"]
    rank_b = ranks["B"]
    rank_c = ranks["C"]
    # Rank-based praise requires a non-zero rate, as for types B and C
    # below: when everyone is at 0% the top rank carries no achievement.
    if rank_a == 0 and a_rate > 0:
        strengths.append(
            f"课堂练习完成率全班第一({a_rate:.0f}%),课堂专注度和吸收能力突出"
        )
    elif rank_a <= 2 and a_rate > 0:
        strengths.append(f"课堂练习完成率班级前三({a_rate:.0f}%),课堂表现优秀")
    elif a_rate >= 80:
        strengths.append(f"课堂练习完成率 {a_rate:.0f}%,基础掌握较扎实")
    if b_rate >= 80:
        strengths.append(
            f"课后作业完成率 {b_rate:.0f}%,课后巩固习惯{'很好' if b_rate >= 90 else '较好'}"
        )
    if rank_b == 0 and b_rate > 0:
        strengths.append("课后作业完成率全班最高,自律性强、学习投入度高")
    if c_rate > 0:
        strengths.append(
            f"有完成拓展练习({c_rate:.0f}%),主动挑战高难度题,很有进取心"
        )
    if rank_c == 0 and c_rate > 0:
        strengths.append("拓展练习全班第一,学有余力且善于钻研")
    total_solved = t["A"]["done"] + t["B"]["done"] + t["C"]["done"]
    total_attempts = t["A"]["attempts"] + t["B"]["attempts"] + t["C"]["attempts"]
    if total_solved > 0:
        avg_tries = total_attempts / total_solved
        if avg_tries <= 2:
            strengths.append(
                f"平均仅需 {avg_tries:.1f} 次提交即可通过,思路清晰、代码质量高"
            )
    if not strengths:
        strengths.append("坚持参与学习,持续积累是进步的基础")
    for s in strengths:
        lines.append(f"{s}")

    # ── 5. Points to improve ──
    lines.append("")
    lines.append("【五、需要加强的地方 📌】")
    gaps = []
    if a_rate < 60:
        gaps.append(
            f"课堂练习完成率偏低({a_rate:.0f}%),课堂时间利用率有待提升,"
            "遇到卡点建议及时向老师请教,不要在一个问题上停留太久"
        )
    elif a_rate < 80:
        gaps.append(
            f"课堂练习完成率 {a_rate:.0f}%,还有提升空间,建议下笔前先把题意和思路整理清楚"
        )
    if b_rate < 30:
        gaps.append(
            f"课后作业完成率仅 {b_rate:.0f}%,课后练习严重不足——知识点在课堂上理解了,"
            "但不做题就很容易遗忘,建议每次课后至少完成必做作业"
        )
    elif b_rate < 60:
        gaps.append(f"课后作业完成率 {b_rate:.0f}%,课后练习不够充分,建议提高完成频率")
    if c_rate == 0 and t["C"]["total"] > 0:
        gaps.append("拓展练习暂无提交——不要求全部做完,但尝试一下对思维提升很有帮助")
    if total_solved > 0:
        avg_tries = total_attempts / total_solved
        if avg_tries > 8:
            gaps.append(
                f"平均每题需要 {avg_tries:.1f} 次才通过,建议做题前先在草稿纸上梳理逻辑,"
                "把样例手动跑一遍,再提交,大幅减少无效试错"
            )
        elif avg_tries > 5:
            gaps.append(
                f"平均每题约 {avg_tries:.1f} 次提交通过,养成'提交前自检'习惯可以进一步减少次数"
            )
    errs = target["all_errors"]
    if errs.get("WA", 0) >= 10:
        gaps.append(
            f"答案错误WA累计 {errs['WA']} 次,建议重点练习边界条件判断和分类讨论,"
            "每次WA后认真分析是哪种情况遗漏了"
        )
    if errs.get("CE", 0) >= 5:
        gaps.append(f"编译错误CE累计 {errs['CE']} 次,提交前先确认代码没有语法错误")
    if errs.get("RE", 0) >= 5:
        gaps.append(f"运行错误RE{errs['RE']} 次,注意数组下标越界和递归终止条件")
    if errs.get("TLE", 0) >= 3:
        gaps.append(
            f"超时TLE{errs['TLE']} 次,需要关注算法时间复杂度,避免暴力解法"
        )
    if not gaps:
        gaps.append("整体表现均衡,建议进一步挑战拓展练习,向更高水平迈进")
    for g in gaps:
        lines.append(f" 📌 {g}")

    # ── 6. Note to parents ──
    lines.append("")
    lines.append("【六、给家长的话】")
    if b_rate < 50:
        lines.append(" • 课后作业完成率不高,建议每次上完课后家长提醒孩子完成练习,")
        lines.append(' 最好当天做,养成"当天课当天练"的好习惯,效果事半功倍。')
    else:
        lines.append(" • 课后练习完成情况不错!鼓励孩子继续保持,")
        lines.append(" 也可以问问孩子今天学了什么,帮助他用语言组织、加深理解。")
    if c_rate == 0 and t["C"]["total"] > 0:
        lines.append(" • 拓展练习还没开始尝试,如果孩子平时有余力,")
        lines.append(" 可以鼓励他试试看,不要求做完,重要的是训练思维的过程。")
    lines.append(" • 信奥学习贵在坚持,遇到难题时多鼓励孩子,")
    lines.append(" 告诉他“卡住了很正常,调试出来才是成长”。")
    lines.append(" 有任何疑问随时联系老师~")
    lines.append("")
    lines.append("=" * W)
    return "\n".join(lines)
# ──────────────────────────────────────────
# 主流程
# ──────────────────────────────────────────
def main():
    """Command-line entry point: scrape, analyse, print and save a report.

    Steps: log in, parse the course structure, download problem lists and
    submission records for the first ``--lessons`` lessons, analyse every
    student seen in the records, build the target student's report, then
    write the report text and a JSON summary under
    ``PROJECT_ROOT/.claude/memory/oj/analysis``.

    Exits with status 1 when the login fails.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--course-id", required=True)
    parser.add_argument("--uid", required=True, type=int)
    parser.add_argument("--name", default="")
    parser.add_argument("--lessons", default=8, type=int, help="分析前 N 节课")
    args = parser.parse_args()

    # The context manager closes the HTTP client on every exit path,
    # including a failed login (sys.exit) or an error while scraping.
    with httpx.Client(
        base_url=OJ_BASE_URL, timeout=60.0, follow_redirects=True
    ) as client:
        # 1. Log in
        print("\n[1] 登录 OJ...")
        if not login(client):
            sys.exit(1)

        # 2. Parse the course structure
        print(f"\n[2] 解析课程结构: {args.course_id}")
        all_lessons = fetch_course_structure(client, args.course_id)
        print(f" 共找到 {len(all_lessons)} 个有作业的节次")
        for i, lesson in enumerate(all_lessons, 1):
            hw_summary = " | ".join(f"{h['name']}" for h in lesson["homeworks"])
            print(f"{i}节: {lesson['title']} [{hw_summary}]")
        lessons = all_lessons[: args.lessons]
        print(f"\n 分析前 {len(lessons)}")

        # 3. Fetch each homework's problem list and submission records
        print(f"\n[3] 抓取作业题目列表与提交记录...")
        for i, lesson in enumerate(lessons, 1):
            print(f"{i}节: {lesson['title']}")
            for hw in lesson["homeworks"]:
                # Problem list first, then the records.
                official_pids = fetch_hw_official_pids(client, hw["hw_id"])
                hw["official_pids"] = official_pids
                recs = fetch_hw_records(client, hw["hw_id"])
                hw["records"] = recs
                n_students = len({r["uid"] for r in recs})
                print(
                    f" {hw['name']} (tid={hw['hw_id'][:8]}...): "
                    f"{len(official_pids)} 道题 / {len(recs)} 条记录 / {n_students} 名学生"
                )

    # 4. Collect every student that appears in the records
    print(f"\n[4] 汇总学生列表...")
    uid_uname: dict[int, str] = {}
    for lesson in lessons:
        for hw in lesson["homeworks"]:
            for rec in hw["records"]:
                uid_uname[rec["uid"]] = rec["uname"]
    # The target is always included; --name, when given, overrides the
    # scraped user name.
    if args.uid not in uid_uname:
        uid_uname[args.uid] = args.name or f"uid{args.uid}"
    elif args.name:
        uid_uname[args.uid] = args.name
    print(f"{len(uid_uname)} 名学生有记录")
    for uid, uname in sorted(uid_uname.items(), key=lambda x: x[1]):
        tag = " ← 目标" if uid == args.uid else ""
        print(f" uid={uid} {uname}{tag}")

    # 5. Analyse every student
    print(f"\n[5] 分析各学生数据...")
    all_analyses = []
    for uid, uname in sorted(uid_uname.items()):
        a = analyze_student(uid, uname, lessons)
        all_analyses.append(a)
        t = a["total"]
        print(
            f" {uname}(uid={uid}): "
            f"A={t['A']['done']}/{t['A']['total']} "
            f"B={t['B']['done']}/{t['B']['total']} "
            f"C={t['C']['done']}/{t['C']['total']}"
        )

    # 6. Build the report
    target_name = uid_uname.get(args.uid, args.name or f"uid{args.uid}")
    print(f"\n[6] 生成报告: {target_name} (uid={args.uid})")
    report = build_report(args.uid, target_name, all_analyses, lessons)
    print("\n" + report)

    # 7. Save the report text and the JSON summary
    out_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_txt = out_dir / f"course_{args.course_id[:8]}_{args.uid}_report.txt"
    out_txt.write_text(report, encoding="utf-8")
    print(f"\n📄 报告已保存: {out_txt}")
    out_json = out_dir / f"course_{args.course_id[:8]}_{args.uid}_raw.json"
    payload = {
        "course_id": args.course_id,
        "target_uid": args.uid,
        "target_name": target_name,
        "generated_at": datetime.now().isoformat(),
        "lessons_analyzed": len(lessons),
        "lesson_titles": [lesson["title"] for lesson in lessons],
        # Per-lesson detail is left out of the JSON summary.
        "all_analyses": [
            {k: v for k, v in a.items() if k != "lessons"} for a in all_analyses
        ],
    }
    out_json.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"📊 数据已保存: {out_json}")
# Run the analysis only when this file is executed as a script.
if __name__ == "__main__":
    main()