更新K4课程README,清理旧的班级记忆文件,调整目录结构
This commit is contained in:
710
scripts/analyze_course_student.py
Normal file
710
scripts/analyze_course_student.py
Normal file
@@ -0,0 +1,710 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
课程学生表现分析脚本
|
||||
用法:
|
||||
python scripts/analyze_course_student.py \\
|
||||
--course-id 6975983771e15346c9e8fdc0 \\
|
||||
--uid 248 --name 王梓骏 --lessons 8
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from html import unescape
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
|
||||
def load_env(env_path: Path) -> dict:
|
||||
d = {}
|
||||
if env_path.exists():
|
||||
for line in env_path.read_text(encoding="utf-8").split("\n"):
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#") and "=" in line:
|
||||
k, v = line.split("=", 1)
|
||||
d[k.strip()] = v.strip()
|
||||
return d
|
||||
|
||||
|
||||
env = load_env(PROJECT_ROOT / ".env")
|
||||
OJ_BASE_URL = os.environ.get("OJ_BASE_URL") or env.get(
|
||||
"OJ_BASE_URL", "https://oj.qonnwolf.com"
|
||||
)
|
||||
USERNAME = os.environ.get("OJ_USERNAME") or env.get("OJ_USERNAME", "")
|
||||
PASSWORD = os.environ.get("OJ_PASSWORD") or env.get("OJ_PASSWORD", "")
|
||||
|
||||
STATUS_MAP = {
|
||||
"Accepted": "AC",
|
||||
"Wrong Answer": "WA",
|
||||
"Compile Error": "CE",
|
||||
"Time Limit Exceeded": "TLE",
|
||||
"Memory Limit Exceeded": "MLE",
|
||||
"Runtime Error": "RE",
|
||||
"Presentation Error": "PE",
|
||||
"Output Limit Exceeded": "OLE",
|
||||
"Runtime Error on Test": "RE",
|
||||
"Partial Accepted": "PA",
|
||||
}
|
||||
|
||||
# 作业名称 → 类型
|
||||
HW_TYPE = {
|
||||
"课堂练习": "A",
|
||||
"课后作业": "B",
|
||||
"拓展练习": "C",
|
||||
}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 基础工具
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
|
||||
def strip_tags(html: str) -> str:
|
||||
return unescape(re.sub(r"<[^>]+>", "", html)).replace("\xa0", " ").strip()
|
||||
|
||||
|
||||
def detect_status(row_html: str) -> str:
|
||||
for kw, st in STATUS_MAP.items():
|
||||
if kw in row_html:
|
||||
return st
|
||||
return "UNKNOWN"
|
||||
|
||||
|
||||
def normalize_time(raw: str) -> str:
|
||||
m = re.search(r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})", raw)
|
||||
if not m:
|
||||
return ""
|
||||
y, mo, d, h, mi, s = m.groups()
|
||||
return f"{y}-{int(mo):02d}-{int(d):02d}T{int(h):02d}:{mi}:{s}"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 登录
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
|
||||
def login(client: httpx.Client) -> bool:
|
||||
try:
|
||||
r = client.post("/login", json={"uname": USERNAME, "password": PASSWORD})
|
||||
r.raise_for_status()
|
||||
ok = any(c.name == "sid" for c in client.cookies.jar)
|
||||
print(f" {'[OK] 登录成功' if ok else '[X] 登录失败'}: {USERNAME}")
|
||||
return ok
|
||||
except Exception as e:
|
||||
print(f" [X] 登录异常: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 课程结构抓取(解析 sectionHWMap)
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
|
||||
def fetch_course_structure(client: httpx.Client, course_id: str) -> list[dict]:
|
||||
"""
|
||||
从课程页面解析所有节次和作业 ID。
|
||||
返回: [{"title": "知识回顾", "homeworks": [{"hw_id": "...", "type": "A/B/C", "name": "..."}]}, ...]
|
||||
按页面顺序排列,跳过 homeworks 为空的节次。
|
||||
"""
|
||||
r = client.get(f"/course/{course_id}")
|
||||
html = r.text
|
||||
|
||||
# 1) 提取节次标题(按 HTML 顺序)
|
||||
# openGlobalPanel('<section_id>', '<title>')
|
||||
panel_calls = re.findall(
|
||||
r"openGlobalPanel\('([A-Fa-f0-9]{24})',\s*'([^']+)'\)", html
|
||||
)
|
||||
section_titles: dict[str, str] = {} # id → title
|
||||
section_order: list[str] = [] # ordered section ids
|
||||
for sid, title in panel_calls:
|
||||
if sid not in section_titles:
|
||||
section_titles[sid] = title
|
||||
section_order.append(sid)
|
||||
|
||||
# 2) 提取 sectionHWMap
|
||||
m = re.search(r"sectionHWMap:\s*(\{.*?\}),\s*\n\s*open", html, re.DOTALL)
|
||||
if not m:
|
||||
print(" [X] 未找到 sectionHWMap")
|
||||
return []
|
||||
|
||||
raw_map = m.group(1)
|
||||
section_hws: dict[str, list[dict]] = {} # section_id → [{hw_id, name}]
|
||||
for sid_m in re.finditer(r'"([A-Fa-f0-9]{24})":\s*\[(.*?)\]', raw_map, re.DOTALL):
|
||||
sid = sid_m.group(1)
|
||||
entries = re.findall(
|
||||
r'\{\s*id:\s*"([^"]+)",\s*name:\s*"([^"]+)"', sid_m.group(2)
|
||||
)
|
||||
section_hws[sid] = [{"hw_id": hw_id, "name": name} for hw_id, name in entries]
|
||||
|
||||
# 3) 按顺序组合
|
||||
lessons = []
|
||||
for sid in section_order:
|
||||
hws_raw = section_hws.get(sid, [])
|
||||
if not hws_raw:
|
||||
continue # 该节次无作业,跳过
|
||||
homeworks = []
|
||||
for hw in hws_raw:
|
||||
# 去掉 emoji,识别类型
|
||||
name_clean = re.sub(r"[^\w\s]", "", hw["name"]).strip()
|
||||
hw_type = "other"
|
||||
for key, t in HW_TYPE.items():
|
||||
if key in hw["name"]:
|
||||
hw_type = t
|
||||
break
|
||||
homeworks.append(
|
||||
{"hw_id": hw["hw_id"], "type": hw_type, "name": hw["name"]}
|
||||
)
|
||||
lessons.append(
|
||||
{
|
||||
"section_id": sid,
|
||||
"title": section_titles.get(sid, sid),
|
||||
"homeworks": homeworks,
|
||||
}
|
||||
)
|
||||
|
||||
return lessons
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 提交记录抓取
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_record_rows(html: str) -> list[dict]:
|
||||
rows = re.findall(r"<tr[^>]*>(.*?)</tr>", html, re.IGNORECASE | re.DOTALL)
|
||||
records = []
|
||||
for i, row in enumerate(rows, 1):
|
||||
if "/user/" not in row or "/p/" not in row:
|
||||
continue
|
||||
user_m = re.search(r'href="/user/(\d+)"[^>]*>(.*?)</a>', row, re.DOTALL)
|
||||
if not user_m:
|
||||
continue
|
||||
prob_m = re.search(
|
||||
r'href="/p/([^"?/]+)(?:\?[^"]*)?"[^>]*>(.*?)</a>', row, re.DOTALL
|
||||
)
|
||||
rec_id_m = re.search(r"/record/([A-Za-z0-9]+)", row)
|
||||
time_m = re.search(r"(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})", row)
|
||||
records.append(
|
||||
{
|
||||
"id": rec_id_m.group(1) if rec_id_m else str(i),
|
||||
"uid": int(user_m.group(1)),
|
||||
"uname": strip_tags(user_m.group(2)),
|
||||
"pid": strip_tags(prob_m.group(1)) if prob_m else "",
|
||||
"status": detect_status(row),
|
||||
"time": normalize_time(time_m.group(1)) if time_m else "",
|
||||
}
|
||||
)
|
||||
return records
|
||||
|
||||
|
||||
def fetch_hw_official_pids(client: httpx.Client, hw_id: str) -> list[str]:
|
||||
"""
|
||||
从作业页面读取官方题目列表,作为分母。
|
||||
避免用提交记录反推题目导致计数偏差。
|
||||
"""
|
||||
r = client.get(f"/homework/{hw_id}")
|
||||
pids = re.findall(r'href="/p/([A-Za-z0-9]+)"', r.text)
|
||||
return list(dict.fromkeys(pids)) # 去重保序
|
||||
|
||||
|
||||
def fetch_hw_records(
|
||||
client: httpx.Client, hw_id: str, max_pages: int = 30
|
||||
) -> list[dict]:
|
||||
records, seen = [], set()
|
||||
for page in range(1, max_pages + 1):
|
||||
r = client.get(f"/record?tid={hw_id}&page={page}")
|
||||
if r.status_code != 200:
|
||||
break
|
||||
rows = parse_record_rows(r.text)
|
||||
if not rows:
|
||||
break
|
||||
new = 0
|
||||
for rec in rows:
|
||||
if rec["id"] not in seen:
|
||||
seen.add(rec["id"])
|
||||
records.append(rec)
|
||||
new += 1
|
||||
if new == 0:
|
||||
break
|
||||
if f"page={page + 1}" not in r.text:
|
||||
break
|
||||
records.sort(key=lambda x: x.get("time", ""))
|
||||
return records
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 分析逻辑
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
|
||||
def pattern_label(attempts: int, solved: bool) -> str:
|
||||
if attempts == 0:
|
||||
return "未提交"
|
||||
if not solved:
|
||||
return "尝试未通过"
|
||||
if attempts == 1:
|
||||
return "一气呵成 ⚡"
|
||||
if attempts <= 3:
|
||||
return "调试改进"
|
||||
if attempts <= 6:
|
||||
return "多次尝试"
|
||||
return "耐心调试"
|
||||
|
||||
|
||||
def analyze_student(uid: int, uname: str, lessons: list[dict]) -> dict:
|
||||
"""
|
||||
以题目为粒度统计完成情况:
|
||||
- total[type]["total"] = 该类型所有作业的总题目数
|
||||
- total[type]["done"] = 学生 AC 的题目数
|
||||
- total[type]["attempts"] = 学生该类型总提交次数
|
||||
"""
|
||||
result = {
|
||||
"uid": uid,
|
||||
"uname": uname,
|
||||
"lessons": [],
|
||||
"total": {
|
||||
"A": {"done": 0, "total": 0, "attempts": 0},
|
||||
"B": {"done": 0, "total": 0, "attempts": 0},
|
||||
"C": {"done": 0, "total": 0, "attempts": 0},
|
||||
},
|
||||
"all_errors": defaultdict(int),
|
||||
}
|
||||
|
||||
for lesson in lessons:
|
||||
ls = {"title": lesson["title"], "problems": [], "present": False}
|
||||
for hw in lesson["homeworks"]:
|
||||
recs = hw["records"]
|
||||
|
||||
# ── 官方题目列表(从作业页面读取,不从提交记录推断)──
|
||||
official_pids = hw.get("official_pids", [])
|
||||
all_pids = (
|
||||
official_pids
|
||||
if official_pids
|
||||
else sorted(set(r["pid"] for r in recs if r["pid"]))
|
||||
)
|
||||
n_total_pids = len(all_pids)
|
||||
|
||||
# ── 该学生的提交 ──
|
||||
stu_recs = [r for r in recs if r["uid"] == uid]
|
||||
attempts = len(stu_recs)
|
||||
|
||||
# 该学生 AC 的题目(只统计在官方题目列表内的)
|
||||
official_pids_set = set(all_pids)
|
||||
stu_ac_pids = set(
|
||||
r["pid"]
|
||||
for r in stu_recs
|
||||
if r["status"] == "AC" and r["pid"] and r["pid"] in official_pids_set
|
||||
)
|
||||
n_solved = len(stu_ac_pids)
|
||||
|
||||
# 错误类型(非 AC 提交)
|
||||
errors = [r["status"] for r in stu_recs if r["status"] != "AC"]
|
||||
err_counts = defaultdict(int)
|
||||
for e in errors:
|
||||
err_counts[e] += 1
|
||||
result["all_errors"][e] += 1
|
||||
|
||||
t = hw["type"]
|
||||
if t in ("A", "B", "C"):
|
||||
result["total"][t]["total"] += n_total_pids
|
||||
result["total"][t]["attempts"] += attempts
|
||||
result["total"][t]["done"] += n_solved
|
||||
|
||||
# 完成情况文字
|
||||
if attempts == 0:
|
||||
pat = "未提交"
|
||||
elif n_solved == n_total_pids:
|
||||
pat = f"全部完成 ({n_solved}/{n_total_pids})"
|
||||
else:
|
||||
pat = f"部分完成 ({n_solved}/{n_total_pids})"
|
||||
|
||||
ls["problems"].append(
|
||||
{
|
||||
"name": hw["name"],
|
||||
"type": t,
|
||||
"attempts": attempts,
|
||||
"n_solved": n_solved,
|
||||
"n_total": n_total_pids,
|
||||
"solved": n_solved == n_total_pids and n_total_pids > 0,
|
||||
"pattern": pat,
|
||||
"errors": dict(err_counts),
|
||||
}
|
||||
)
|
||||
if attempts > 0:
|
||||
ls["present"] = True
|
||||
result["lessons"].append(ls)
|
||||
|
||||
result["all_errors"] = dict(result["all_errors"])
|
||||
return result
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 报告生成
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
MEDAL = {0: "🥇 第1名", 1: "🥈 第2名", 2: "🥉 第3名"}
|
||||
|
||||
|
||||
def pct(done, total):
|
||||
return 0.0 if total == 0 else done / total * 100
|
||||
|
||||
|
||||
def rank_scores(all_analyses: list[dict], key: str):
|
||||
scores = []
|
||||
for a in all_analyses:
|
||||
t = a["total"][key]
|
||||
scores.append((a["uid"], a["uname"], pct(t["done"], t["total"])))
|
||||
scores.sort(key=lambda x: -x[2])
|
||||
return scores
|
||||
|
||||
|
||||
def build_report(
|
||||
target_uid: int,
|
||||
target_name: str,
|
||||
all_analyses: list[dict],
|
||||
lessons: list[dict],
|
||||
) -> str:
|
||||
target = next((a for a in all_analyses if a["uid"] == target_uid), None)
|
||||
if not target:
|
||||
return f"❌ 未找到 uid={target_uid} 的提交记录(该学生可能还未提交过任何题目)"
|
||||
|
||||
lines = []
|
||||
W = 62
|
||||
lines.append("=" * W)
|
||||
lines.append(f" {target_name} · 2026春季班 CSP04 学习报告")
|
||||
lines.append(
|
||||
f" 统计课次:前 {len(lessons)} 节 | 生成: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
|
||||
)
|
||||
lines.append("=" * W)
|
||||
|
||||
t = target["total"]
|
||||
a_rate = pct(t["A"]["done"], t["A"]["total"])
|
||||
b_rate = pct(t["B"]["done"], t["B"]["total"])
|
||||
c_rate = pct(t["C"]["done"], t["C"]["total"])
|
||||
attend = sum(1 for l in target["lessons"] if l["present"])
|
||||
|
||||
# ── 一、总体概览 ──
|
||||
lines.append("")
|
||||
lines.append("【一、总体学习概览】")
|
||||
lines.append(f" 出勤情况:{attend}/{len(lessons)} 节有提交记录")
|
||||
lines.append(
|
||||
f" 课堂练习(A):{t['A']['done']}/{t['A']['total']} 题 完成率 {a_rate:.0f}% 累计 {t['A']['attempts']} 次提交"
|
||||
)
|
||||
lines.append(
|
||||
f" 课后作业(B):{t['B']['done']}/{t['B']['total']} 题 完成率 {b_rate:.0f}% 累计 {t['B']['attempts']} 次提交"
|
||||
)
|
||||
lines.append(
|
||||
f" 拓展练习(C):{t['C']['done']}/{t['C']['total']} 题 完成率 {c_rate:.0f}% 累计 {t['C']['attempts']} 次提交"
|
||||
)
|
||||
|
||||
# ── 二、逐节课 ──
|
||||
lines.append("")
|
||||
lines.append("【二、逐节课做题明细】")
|
||||
for i, ls in enumerate(target["lessons"], 1):
|
||||
tag = "✅" if ls["present"] else "⬜"
|
||||
lines.append(f"\n {tag} 第{i}节 {ls['title']}")
|
||||
for p in ls["problems"]:
|
||||
if p["attempts"] == 0:
|
||||
icon = "—"
|
||||
elif p["solved"]:
|
||||
icon = "✓"
|
||||
else:
|
||||
icon = "○"
|
||||
type_label = {"A": "课堂练习", "B": "课后作业", "C": "拓展练习"}.get(
|
||||
p["type"], "其他"
|
||||
)
|
||||
err_str = ""
|
||||
if p["errors"]:
|
||||
err_str = (
|
||||
" [" + ", ".join(f"{k}×{v}" for k, v in p["errors"].items()) + "]"
|
||||
)
|
||||
lines.append(
|
||||
f" {icon} {type_label} {p['attempts']}次提交 {p['pattern']}{err_str}"
|
||||
)
|
||||
|
||||
# ── 三、班级对比 ──
|
||||
lines.append("")
|
||||
lines.append("【三、在班级中的位置】")
|
||||
n = len(all_analyses)
|
||||
for key, label in [("A", "课堂练习"), ("B", "课后作业"), ("C", "拓展练习")]:
|
||||
scores = rank_scores(all_analyses, key)
|
||||
avg = sum(s for _, _, s in scores) / len(scores) if scores else 0
|
||||
rank = next(i for i, (uid, _, _) in enumerate(scores) if uid == target_uid)
|
||||
my_sc = next(s for uid, _, s in scores if uid == target_uid)
|
||||
medal_str = MEDAL.get(rank, f"第{rank + 1}名")
|
||||
lines.append(
|
||||
f"\n {label} {medal_str} {target_name}: {my_sc:.0f}% (均值: {avg:.0f}%,共{n}人)"
|
||||
)
|
||||
lines.append(" ┌" + "─" * 42)
|
||||
for j, (uid, uname, sc) in enumerate(scores):
|
||||
bar = "█" * int(sc / 5)
|
||||
me = " ← 本人" if uid == target_uid else ""
|
||||
lines.append(f" │ {j + 1:2d}. {uname[:5]:<5s} {sc:5.1f}% {bar}{me}")
|
||||
lines.append(" └" + "─" * 42)
|
||||
|
||||
# ── 四、亮点 ──
|
||||
lines.append("")
|
||||
lines.append("【四、做得好的地方 ✨】")
|
||||
strengths = []
|
||||
|
||||
if attend == len(lessons):
|
||||
strengths.append(f"全勤出勤:{len(lessons)} 节课均有提交记录,学习连贯性强")
|
||||
elif attend >= len(lessons) * 0.75:
|
||||
strengths.append(f"出勤率高达 {attend}/{len(lessons)} 节,学习节奏稳定")
|
||||
|
||||
scores_a = rank_scores(all_analyses, "A")
|
||||
rank_a = next(i for i, (u, _, _) in enumerate(scores_a) if u == target_uid)
|
||||
scores_b = rank_scores(all_analyses, "B")
|
||||
rank_b = next(i for i, (u, _, _) in enumerate(scores_b) if u == target_uid)
|
||||
scores_c = rank_scores(all_analyses, "C")
|
||||
rank_c = next(i for i, (u, _, _) in enumerate(scores_c) if u == target_uid)
|
||||
|
||||
if rank_a == 0:
|
||||
strengths.append(
|
||||
f"课堂练习完成率全班第一({a_rate:.0f}%),课堂专注度和吸收能力突出"
|
||||
)
|
||||
elif rank_a <= 2:
|
||||
strengths.append(f"课堂练习完成率班级前三({a_rate:.0f}%),课堂表现优秀")
|
||||
elif a_rate >= 80:
|
||||
strengths.append(f"课堂练习完成率 {a_rate:.0f}%,基础掌握较扎实")
|
||||
|
||||
if b_rate >= 80:
|
||||
strengths.append(
|
||||
f"课后作业完成率 {b_rate:.0f}%,课后巩固习惯{'很好' if b_rate >= 90 else '较好'}"
|
||||
)
|
||||
if rank_b == 0 and b_rate > 0:
|
||||
strengths.append("课后作业完成率全班最高,自律性强、学习投入度高")
|
||||
|
||||
if c_rate > 0:
|
||||
strengths.append(
|
||||
f"有完成拓展练习({c_rate:.0f}%),主动挑战高难度题,很有进取心"
|
||||
)
|
||||
if rank_c == 0 and c_rate > 0:
|
||||
strengths.append("拓展练习全班第一,学有余力且善于钻研")
|
||||
|
||||
total_solved = t["A"]["done"] + t["B"]["done"] + t["C"]["done"]
|
||||
total_attempts = t["A"]["attempts"] + t["B"]["attempts"] + t["C"]["attempts"]
|
||||
if total_solved > 0:
|
||||
avg_tries = total_attempts / total_solved
|
||||
if avg_tries <= 2:
|
||||
strengths.append(
|
||||
f"平均仅需 {avg_tries:.1f} 次提交即可通过,思路清晰、代码质量高"
|
||||
)
|
||||
|
||||
if not strengths:
|
||||
strengths.append("坚持参与学习,持续积累是进步的基础")
|
||||
|
||||
for s in strengths:
|
||||
lines.append(f" ✅ {s}")
|
||||
|
||||
# ── 五、待改进 ──
|
||||
lines.append("")
|
||||
lines.append("【五、需要加强的地方 📌】")
|
||||
gaps = []
|
||||
|
||||
if a_rate < 60:
|
||||
gaps.append(
|
||||
f"课堂练习完成率偏低({a_rate:.0f}%),课堂时间利用率有待提升,"
|
||||
"遇到卡点建议及时向老师请教,不要在一个问题上停留太久"
|
||||
)
|
||||
elif a_rate < 80:
|
||||
gaps.append(
|
||||
f"课堂练习完成率 {a_rate:.0f}%,还有提升空间,建议下笔前先把题意和思路整理清楚"
|
||||
)
|
||||
|
||||
if b_rate < 30:
|
||||
gaps.append(
|
||||
f"课后作业完成率仅 {b_rate:.0f}%,课后练习严重不足——知识点在课堂上理解了,"
|
||||
"但不做题就很容易遗忘,建议每次课后至少完成必做作业"
|
||||
)
|
||||
elif b_rate < 60:
|
||||
gaps.append(f"课后作业完成率 {b_rate:.0f}%,课后练习不够充分,建议提高完成频率")
|
||||
|
||||
if c_rate == 0 and t["C"]["total"] > 0:
|
||||
gaps.append("拓展练习暂无提交——不要求全部做完,但尝试一下对思维提升很有帮助")
|
||||
|
||||
if total_solved > 0:
|
||||
avg_tries = total_attempts / total_solved
|
||||
if avg_tries > 8:
|
||||
gaps.append(
|
||||
f"平均每题需要 {avg_tries:.1f} 次才通过,建议做题前先在草稿纸上梳理逻辑,"
|
||||
"把样例手动跑一遍,再提交,大幅减少无效试错"
|
||||
)
|
||||
elif avg_tries > 5:
|
||||
gaps.append(
|
||||
f"平均每题约 {avg_tries:.1f} 次提交通过,养成'提交前自检'习惯可以进一步减少次数"
|
||||
)
|
||||
|
||||
errs = target["all_errors"]
|
||||
if errs.get("WA", 0) >= 10:
|
||||
gaps.append(
|
||||
f"答案错误(WA)累计 {errs['WA']} 次,建议重点练习边界条件判断和分类讨论,"
|
||||
"每次WA后认真分析是哪种情况遗漏了"
|
||||
)
|
||||
if errs.get("CE", 0) >= 5:
|
||||
gaps.append(f"编译错误(CE)累计 {errs['CE']} 次,提交前先确认代码没有语法错误")
|
||||
if errs.get("RE", 0) >= 5:
|
||||
gaps.append(f"运行错误(RE){errs['RE']} 次,注意数组下标越界和递归终止条件")
|
||||
if errs.get("TLE", 0) >= 3:
|
||||
gaps.append(
|
||||
f"超时(TLE){errs['TLE']} 次,需要关注算法时间复杂度,避免暴力解法"
|
||||
)
|
||||
|
||||
if not gaps:
|
||||
gaps.append("整体表现均衡,建议进一步挑战拓展练习,向更高水平迈进")
|
||||
|
||||
for g in gaps:
|
||||
lines.append(f" 📌 {g}")
|
||||
|
||||
# ── 六、家长建议 ──
|
||||
lines.append("")
|
||||
lines.append("【六、给家长的话】")
|
||||
if b_rate < 50:
|
||||
lines.append(" • 课后作业完成率不高,建议每次上完课后家长提醒孩子完成练习,")
|
||||
lines.append(' 最好当天做,养成"当天课当天练"的好习惯,效果事半功倍。')
|
||||
else:
|
||||
lines.append(" • 课后练习完成情况不错!鼓励孩子继续保持,")
|
||||
lines.append(" 也可以问问孩子今天学了什么,帮助他用语言组织、加深理解。")
|
||||
|
||||
if c_rate == 0 and t["C"]["total"] > 0:
|
||||
lines.append(" • 拓展练习还没开始尝试,如果孩子平时有余力,")
|
||||
lines.append(" 可以鼓励他试试看,不要求做完,重要的是训练思维的过程。")
|
||||
|
||||
lines.append(" • 信奥学习贵在坚持,遇到难题时多鼓励孩子,")
|
||||
lines.append(" 告诉他“卡住了很正常,调试出来才是成长”。")
|
||||
lines.append(" 有任何疑问随时联系老师~")
|
||||
|
||||
lines.append("")
|
||||
lines.append("=" * W)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────
|
||||
# 主流程
|
||||
# ──────────────────────────────────────────
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--course-id", required=True)
|
||||
parser.add_argument("--uid", required=True, type=int)
|
||||
parser.add_argument("--name", default="")
|
||||
parser.add_argument("--lessons", default=8, type=int, help="分析前 N 节课")
|
||||
args = parser.parse_args()
|
||||
|
||||
client = httpx.Client(base_url=OJ_BASE_URL, timeout=60.0, follow_redirects=True)
|
||||
|
||||
# 1. 登录
|
||||
print("\n[1] 登录 OJ...")
|
||||
if not login(client):
|
||||
sys.exit(1)
|
||||
|
||||
# 2. 解析课程结构
|
||||
print(f"\n[2] 解析课程结构: {args.course_id}")
|
||||
all_lessons = fetch_course_structure(client, args.course_id)
|
||||
print(f" 共找到 {len(all_lessons)} 个有作业的节次")
|
||||
for i, l in enumerate(all_lessons, 1):
|
||||
hw_summary = " | ".join(f"{h['name']}" for h in l["homeworks"])
|
||||
print(f" 第{i}节: {l['title']} [{hw_summary}]")
|
||||
|
||||
lessons = all_lessons[: args.lessons]
|
||||
print(f"\n 分析前 {len(lessons)} 节")
|
||||
|
||||
# 3. 抓取每节课的题目列表 + 提交记录
|
||||
print(f"\n[3] 抓取作业题目列表与提交记录...")
|
||||
for i, lesson in enumerate(lessons, 1):
|
||||
print(f" 第{i}节: {lesson['title']}")
|
||||
for hw in lesson["homeworks"]:
|
||||
# 先拉官方题目列表
|
||||
official_pids = fetch_hw_official_pids(client, hw["hw_id"])
|
||||
hw["official_pids"] = official_pids
|
||||
# 再拉提交记录
|
||||
recs = fetch_hw_records(client, hw["hw_id"])
|
||||
hw["records"] = recs
|
||||
n_students = len({r["uid"] for r in recs})
|
||||
print(
|
||||
f" {hw['name']} (tid={hw['hw_id'][:8]}...): "
|
||||
f"{len(official_pids)} 道题 / {len(recs)} 条记录 / {n_students} 名学生"
|
||||
)
|
||||
|
||||
client.close()
|
||||
|
||||
# 4. 汇总所有学生
|
||||
print(f"\n[4] 汇总学生列表...")
|
||||
uid_uname: dict[int, str] = {}
|
||||
for lesson in lessons:
|
||||
for hw in lesson["homeworks"]:
|
||||
for rec in hw["records"]:
|
||||
uid_uname[rec["uid"]] = rec["uname"]
|
||||
|
||||
if args.uid not in uid_uname:
|
||||
uid_uname[args.uid] = args.name or f"uid{args.uid}"
|
||||
elif args.name:
|
||||
uid_uname[args.uid] = args.name
|
||||
|
||||
print(f" 共 {len(uid_uname)} 名学生有记录")
|
||||
for uid, uname in sorted(uid_uname.items(), key=lambda x: x[1]):
|
||||
tag = " ← 目标" if uid == args.uid else ""
|
||||
print(f" uid={uid} {uname}{tag}")
|
||||
|
||||
# 5. 分析所有学生
|
||||
print(f"\n[5] 分析各学生数据...")
|
||||
all_analyses = []
|
||||
for uid, uname in sorted(uid_uname.items()):
|
||||
a = analyze_student(uid, uname, lessons)
|
||||
all_analyses.append(a)
|
||||
t = a["total"]
|
||||
print(
|
||||
f" {uname}(uid={uid}): "
|
||||
f"A={t['A']['done']}/{t['A']['total']} "
|
||||
f"B={t['B']['done']}/{t['B']['total']} "
|
||||
f"C={t['C']['done']}/{t['C']['total']}"
|
||||
)
|
||||
|
||||
# 6. 生成报告
|
||||
target_name = uid_uname.get(args.uid, args.name or f"uid{args.uid}")
|
||||
print(f"\n[6] 生成报告: {target_name} (uid={args.uid})")
|
||||
report = build_report(args.uid, target_name, all_analyses, lessons)
|
||||
|
||||
print("\n" + report)
|
||||
|
||||
# 7. 保存
|
||||
out_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
out_txt = out_dir / f"course_{args.course_id[:8]}_{args.uid}_report.txt"
|
||||
out_txt.write_text(report, encoding="utf-8")
|
||||
print(f"\n📄 报告已保存: {out_txt}")
|
||||
|
||||
out_json = out_dir / f"course_{args.course_id[:8]}_{args.uid}_raw.json"
|
||||
payload = {
|
||||
"course_id": args.course_id,
|
||||
"target_uid": args.uid,
|
||||
"target_name": target_name,
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"lessons_analyzed": len(lessons),
|
||||
"lesson_titles": [l["title"] for l in lessons],
|
||||
"all_analyses": [
|
||||
{k: v for k, v in a.items() if k != "lessons"} for a in all_analyses
|
||||
],
|
||||
}
|
||||
out_json.write_text(
|
||||
json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
|
||||
)
|
||||
print(f"📊 数据已保存: {out_json}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
19
scripts/backup.bat
Normal file
19
scripts/backup.bat
Normal file
@@ -0,0 +1,19 @@
|
||||
@echo off
|
||||
REM 本地Git备份脚本 (Windows版本)
|
||||
|
||||
echo 开始执行本地备份...
|
||||
|
||||
REM 切换到备份分支
|
||||
git checkout local-backup
|
||||
|
||||
REM 从主分支合并最新更改
|
||||
echo 从main分支合并最新更改...
|
||||
git merge main
|
||||
|
||||
REM 切换回主分支
|
||||
git checkout main
|
||||
|
||||
echo 本地备份完成!
|
||||
echo 备份分支: local-backup
|
||||
echo 可以使用 "git log local-backup" 查看备份历史
|
||||
pause
|
||||
117
scripts/query-attendance.js
Normal file
117
scripts/query-attendance.js
Normal file
@@ -0,0 +1,117 @@
|
||||
const axios = require('axios');
|
||||
require('dotenv').config();
|
||||
|
||||
const API_BASE_URL = process.env.API_BASE_URL;
|
||||
const AUTHORIZATION = process.env.AUTHORIZATION;
|
||||
|
||||
// 查询橙子老师上周整体出勤率
|
||||
async function getTeacherAttendanceRate() {
|
||||
try {
|
||||
console.log('🔍 正在查询橙子老师上周整体出勤率...');
|
||||
const response = await axios.get(`${API_BASE_URL}/reports/teacher-attendance-rates`, {
|
||||
headers: {
|
||||
'Authorization': AUTHORIZATION
|
||||
},
|
||||
params: {
|
||||
teacher_name: '橙子(程城)',
|
||||
begin_date: '2026-04-21',
|
||||
end_date: '2026-04-27'
|
||||
}
|
||||
});
|
||||
|
||||
console.log('✅ 整体出勤率数据获取成功:');
|
||||
console.log(JSON.stringify(response.data, null, 2));
|
||||
return response.data;
|
||||
} catch (error) {
|
||||
console.error('❌ 查询失败:', error.response ? error.response.data : error.message);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// 查询上周每天的详细课程和学生出勤明细
|
||||
async function getDailyTeachingSchedule() {
|
||||
const dates = [
|
||||
'2026-04-21', '2026-04-22', '2026-04-23', '2026-04-24',
|
||||
'2026-04-25', '2026-04-26', '2026-04-27'
|
||||
];
|
||||
|
||||
const allData = [];
|
||||
|
||||
for (const date of dates) {
|
||||
try {
|
||||
console.log(`\n🔍 正在查询 ${date} 的课程明细...`);
|
||||
const response = await axios.get(`${API_BASE_URL}/reports/teaching-schedule`, {
|
||||
headers: {
|
||||
'Authorization': AUTHORIZATION
|
||||
},
|
||||
params: {
|
||||
teacher_name: '橙子(程城)',
|
||||
teaching_date: date
|
||||
}
|
||||
});
|
||||
|
||||
if (response.data.code === 0 && response.data.data.items.length > 0) {
|
||||
allData.push({
|
||||
date: date,
|
||||
courses: response.data.data.items
|
||||
});
|
||||
console.log(`✅ ${date} 找到 ${response.data.data.items.length} 节课`);
|
||||
} else {
|
||||
console.log(`ℹ️ ${date} 没有课程安排`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`❌ 查询 ${date} 失败:`, error.response ? error.response.data : error.message);
|
||||
}
|
||||
}
|
||||
|
||||
return allData;
|
||||
}
|
||||
|
||||
// 主函数
|
||||
async function main() {
|
||||
// 1. 获取整体出勤率
|
||||
const overallRate = await getTeacherAttendanceRate();
|
||||
|
||||
// 2. 获取每日明细
|
||||
const dailyDetails = await getDailyTeachingSchedule();
|
||||
|
||||
// 3. 汇总数据并保存
|
||||
const result = {
|
||||
query_range: '2026-04-21 至 2026-04-27',
|
||||
teacher_name: '橙子(程城)',
|
||||
overall_attendance_rate: overallRate,
|
||||
daily_course_details: dailyDetails
|
||||
};
|
||||
|
||||
// 保存到文件
|
||||
const fs = require('fs');
|
||||
const outputPath = '上周学生出勤原始数据.json';
|
||||
fs.writeFileSync(outputPath, JSON.stringify(result, null, 2));
|
||||
|
||||
console.log(`\n🎉 所有数据查询完成,已保存到:${outputPath}`);
|
||||
console.log('\n📊 数据摘要:');
|
||||
|
||||
// 统计总出勤情况
|
||||
let totalStudents = 0;
|
||||
let attendedStudents = 0;
|
||||
let leaveStudents = 0;
|
||||
let absentStudents = 0;
|
||||
|
||||
dailyDetails.forEach(day => {
|
||||
day.courses.forEach(course => {
|
||||
course.students.forEach(student => {
|
||||
totalStudents++;
|
||||
if (student.attendance_status === '✅ 出勤') attendedStudents++;
|
||||
else if (student.attendance_status === '⏸ 请假') leaveStudents++;
|
||||
else if (student.attendance_status === '❌ 缺勤') absentStudents++;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
console.log(`- 总学生人次:${totalStudents}`);
|
||||
console.log(`- 正常出勤:${attendedStudents} 人次(${totalStudents > 0 ? ((attendedStudents / totalStudents) * 100).toFixed(2) : 0}%)`);
|
||||
console.log(`- 请假:${leaveStudents} 人次(${totalStudents > 0 ? ((leaveStudents / totalStudents) * 100).toFixed(2) : 0}%)`);
|
||||
console.log(`- 缺勤:${absentStudents} 人次(${totalStudents > 0 ? ((absentStudents / totalStudents) * 100).toFixed(2) : 0}%)`);
|
||||
}
|
||||
|
||||
main();
|
||||
705
scripts/update_feedback_with_oj.py
Normal file
705
scripts/update_feedback_with_oj.py
Normal file
@@ -0,0 +1,705 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
获取CSP05-03课次的OJ作业数据,并更新已有学生反馈。
|
||||
|
||||
使用方式:
|
||||
python scripts/update_feedback_with_oj.py
|
||||
|
||||
功能:
|
||||
1. 登录OJ系统
|
||||
2. 从hw_dict.json获取CSP05-03的作业ID列表
|
||||
3. 抓取每个作业的提交记录
|
||||
4. 按学生分析做题情况(思考模式、错误分布)
|
||||
5. 将OJ数据分析结果追加到已有的反馈文件中
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
|
||||
# 修复Windows控制台编码(GBK无法输出emoji)
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
|
||||
from datetime import datetime
|
||||
from html import unescape
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
def load_env_from_file(env_path: Path) -> dict:
|
||||
"""从.env文件加载环境变量"""
|
||||
env_vars = {}
|
||||
if env_path.exists():
|
||||
for line in env_path.read_text(encoding='utf-8').split('\n'):
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#') and '=' in line:
|
||||
key, value = line.split('=', 1)
|
||||
env_vars[key] = value
|
||||
return env_vars
|
||||
|
||||
|
||||
# 加载.env配置
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
env_vars = load_env_from_file(PROJECT_ROOT / '.env')
|
||||
|
||||
# ========== 配置(优先级:环境变量 > .env文件 > 默认值) ==========
|
||||
OJ_BASE_URL = os.environ.get('OJ_BASE_URL') or env_vars.get('OJ_BASE_URL', 'https://oj.qonnwolf.com')
|
||||
USERNAME = os.environ.get('OJ_USERNAME') or env_vars.get('OJ_USERNAME', '')
|
||||
PASSWORD = os.environ.get('OJ_PASSWORD') or env_vars.get('OJ_PASSWORD', '')
|
||||
|
||||
# 默认值(Claude Code运行时会通过命令行参数覆盖)
|
||||
COURSE_CODE = "CSP05-03"
|
||||
COURSE_TITLE = ""
|
||||
CLASS_NAME = "CSP05克力周六1600"
|
||||
FEEDBACK_DATE = ""
|
||||
|
||||
# 出勤学生名单(通过--students参数传入,逗号分隔)
|
||||
ATTENDING_STUDENTS = []
|
||||
|
||||
HW_DICT_PATH = PROJECT_ROOT / "config" / "hw_dict.json"
|
||||
CLASS_DIR = None # 运行时设置
|
||||
|
||||
|
||||
def parse_args():
|
||||
"""解析命令行参数"""
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="获取OJ作业数据并更新反馈")
|
||||
parser.add_argument("--course", default="CSP05-03", help="课程代码,如 CSP05-03")
|
||||
parser.add_argument("--title", default="", help="课程标题,如 递归应用")
|
||||
parser.add_argument("--class-name", default="CSP05克力周六1600", help="班级名称")
|
||||
parser.add_argument("--date", default="", help="上课日期 YYYYMMDD,默认今天")
|
||||
parser.add_argument("--students", default="", help="出勤学生名单,逗号分隔")
|
||||
parser.add_argument("--username", default=USERNAME or "", help="OJ用户名(默认从 .env 读取)")
|
||||
parser.add_argument("--password", default=PASSWORD or "", help="OJ密码(默认从 .env 读取)")
|
||||
parser.add_argument("--get-student-oj", default="", help="只获取单个学生的OJ数据并输出Markdown,传入学生姓名")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
# ========== OJ数据获取 ==========
|
||||
|
||||
STATUS_MAP = {
|
||||
"Accepted": "AC",
|
||||
"Wrong Answer": "WA",
|
||||
"Compile Error": "CE",
|
||||
"Time Limit Exceeded": "TLE",
|
||||
"Time Limit": "TLE",
|
||||
"Memory Limit Exceeded": "MLE",
|
||||
"Memory Limit": "MLE",
|
||||
"Runtime Error": "RE",
|
||||
"Presentation Error": "PE",
|
||||
"Output Limit Exceeded": "OLE",
|
||||
}
|
||||
|
||||
|
||||
def strip_tags(raw_html: str) -> str:
|
||||
"""移除HTML标签并清理空白"""
|
||||
text = re.sub(r"<[^>]+>", "", raw_html)
|
||||
text = unescape(text).replace("\xa0", " ")
|
||||
return " ".join(text.split())
|
||||
|
||||
|
||||
def detect_status(row_html: str) -> str:
|
||||
"""识别判题状态"""
|
||||
for keyword, status in STATUS_MAP.items():
|
||||
if keyword in row_html:
|
||||
return status
|
||||
return "UNKNOWN"
|
||||
|
||||
|
||||
def normalize_submit_time(raw_text: str) -> str:
|
||||
"""标准化提交时间"""
|
||||
match = re.search(
|
||||
r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})",
|
||||
raw_text,
|
||||
)
|
||||
if not match:
|
||||
return ""
|
||||
year, month, day, hour, minute, second = match.groups()
|
||||
return f"{year}-{int(month):02d}-{int(day):02d}T{int(hour):02d}:{minute}:{second}"
|
||||
|
||||
|
||||
def parse_record_rows(html: str) -> list[dict[str, Any]]:
|
||||
"""从HTML页面解析提交记录行"""
|
||||
rows = re.findall(r"<tr[^>]*>(.*?)</tr>", html, re.IGNORECASE | re.DOTALL)
|
||||
records: list[dict[str, Any]] = []
|
||||
|
||||
for index, row_html in enumerate(rows, start=1):
|
||||
if "/user/" not in row_html or "/p/" not in row_html:
|
||||
continue
|
||||
|
||||
# 提取用户ID和姓名
|
||||
user_match = re.search(
|
||||
r'href="/user/(\d+)"[^>]*>(.*?)</a>',
|
||||
row_html,
|
||||
re.IGNORECASE | re.DOTALL,
|
||||
)
|
||||
if not user_match:
|
||||
continue
|
||||
|
||||
# 提取题目
|
||||
problem_match = re.search(
|
||||
r'href="/p/([^"?/]+)(?:\?[^"]*)??"[^>]*>(.*?)</a>',
|
||||
row_html,
|
||||
re.IGNORECASE | re.DOTALL,
|
||||
)
|
||||
if not problem_match:
|
||||
continue
|
||||
|
||||
# 提取记录ID
|
||||
record_id_match = re.search(r'/record/([A-Za-z0-9]+)', row_html, re.IGNORECASE)
|
||||
|
||||
# 提交时间
|
||||
submit_time_match = re.search(
|
||||
r"(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})",
|
||||
row_html,
|
||||
)
|
||||
|
||||
student_name = strip_tags(user_match.group(2))
|
||||
|
||||
records.append({
|
||||
"id": record_id_match.group(1) if record_id_match else str(index),
|
||||
"student_id": int(user_match.group(1)),
|
||||
"student_name": student_name,
|
||||
"problem_id": strip_tags(problem_match.group(1)),
|
||||
"problem_title": strip_tags(problem_match.group(2)),
|
||||
"status": detect_status(row_html),
|
||||
"submit_time": (
|
||||
normalize_submit_time(submit_time_match.group(1))
|
||||
if submit_time_match else ""
|
||||
),
|
||||
})
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def login(client: httpx.Client) -> bool:
|
||||
"""登录OJ系统"""
|
||||
try:
|
||||
response = client.post("/login", json={"uname": USERNAME, "password": PASSWORD})
|
||||
response.raise_for_status()
|
||||
has_sid = any(c.name == "sid" for c in client.cookies.jar)
|
||||
if has_sid:
|
||||
print(f" [OK] 登录成功: {USERNAME}")
|
||||
return True
|
||||
else:
|
||||
print(f" [X] 登录失败: 未获取到session")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f" [X] 登录失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def fetch_homework_records(
|
||||
client: httpx.Client,
|
||||
homework_id: str,
|
||||
student_names: set[str],
|
||||
max_pages: int = 20,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""抓取作业的提交记录,只保留目标学生"""
|
||||
records: list[dict[str, Any]] = []
|
||||
seen_ids: set[str] = set()
|
||||
|
||||
for page in range(1, max_pages + 1):
|
||||
resp = client.get(f"/record?tid={homework_id}&page={page}")
|
||||
if resp.status_code != 200:
|
||||
break
|
||||
|
||||
page_records = parse_record_rows(resp.text)
|
||||
if not page_records:
|
||||
break
|
||||
|
||||
for record in page_records:
|
||||
if record["student_name"] not in student_names:
|
||||
continue
|
||||
record_id = str(record["id"])
|
||||
if record_id in seen_ids:
|
||||
continue
|
||||
seen_ids.add(record_id)
|
||||
records.append(record)
|
||||
|
||||
# 检查是否有下一页
|
||||
if f"page={page + 1}" not in resp.text:
|
||||
break
|
||||
|
||||
# 按提交时间排序
|
||||
records.sort(key=lambda x: x.get("submit_time", ""))
|
||||
return records
|
||||
|
||||
|
||||
def fetch_homework_problems(
|
||||
client: httpx.Client,
|
||||
homework_id: str,
|
||||
) -> list[dict[str, str]]:
|
||||
"""获取作业包中的具体题目列表(只取A包)"""
|
||||
resp = client.get(f"/homework/{homework_id}")
|
||||
if resp.status_code != 200:
|
||||
print(f" [X] 获取作业详情失败: {resp.status_code}")
|
||||
return []
|
||||
|
||||
problems = []
|
||||
# 匹配HTML中的题目行:
|
||||
# <a href="/p/CSP0309A1?tid=..."><b>CSP0309A1</b> 银行叫号模拟</a>
|
||||
pattern = (
|
||||
r'href="/p/([^"?]+)\?tid=' + re.escape(homework_id) +
|
||||
r'"[^>]*><b>([^<]+)</b>(?: |\s)*([^<]*)</a>'
|
||||
)
|
||||
matches = re.findall(pattern, resp.text)
|
||||
for pid, code, name in matches:
|
||||
name_clean = strip_tags(name).strip()
|
||||
if not name_clean:
|
||||
name_clean = code
|
||||
problems.append({
|
||||
"id": pid,
|
||||
"code": strip_tags(code),
|
||||
"name": name_clean,
|
||||
})
|
||||
|
||||
return problems
|
||||
|
||||
|
||||
def load_homework_ids(course_code: str) -> list[dict[str, str]]:
|
||||
"""从hw_dict.json加载作业ID,只保留A包"""
|
||||
if not HW_DICT_PATH.exists():
|
||||
print(f" [X] 找不到作业字典: {HW_DICT_PATH}")
|
||||
return []
|
||||
|
||||
data = json.loads(HW_DICT_PATH.read_text(encoding="utf-8"))
|
||||
items = data.get(course_code, [])
|
||||
|
||||
if not items:
|
||||
print(f" [X] 作业字典中没有 {course_code} 的配置")
|
||||
return []
|
||||
|
||||
# 只保留A包(课堂练习)
|
||||
a_items = [item for item in items if item["title"].endswith("A")]
|
||||
if not a_items:
|
||||
# 如果没有A包,回退到第一个
|
||||
a_items = [items[0]]
|
||||
|
||||
return [{"id": item["id"], "title": item["title"]} for item in a_items]
|
||||
|
||||
|
||||
# ========== 分析逻辑 ==========
|
||||
|
||||
def analyze_student_performance(
|
||||
student_name: str,
|
||||
all_records: dict[str, list[dict[str, Any]]],
|
||||
homework_labels: list[str],
|
||||
problem_list: list[dict[str, str]] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""分析单个学生的OJ表现
|
||||
|
||||
如果提供了 problem_list(A包的具体题目列表),则按具体题目分析;
|
||||
否则按作业包标签分析(兼容旧逻辑)。
|
||||
"""
|
||||
|
||||
analysis = {
|
||||
"name": student_name,
|
||||
"problems": {},
|
||||
"total_solved": 0,
|
||||
"total_attempts": 0,
|
||||
"total_errors": defaultdict(int),
|
||||
"patterns": [],
|
||||
}
|
||||
|
||||
# 按具体题目分析(A包模式)
|
||||
if problem_list:
|
||||
# 合并所有作业包的记录(通常只有A包)
|
||||
all_student_records: list[dict[str, Any]] = []
|
||||
for label in homework_labels:
|
||||
records = all_records.get(label, [])
|
||||
all_student_records.extend([r for r in records if r["student_name"] == student_name])
|
||||
|
||||
total_attempts_all = len(all_student_records)
|
||||
total_solved_all = 0
|
||||
|
||||
for prob in problem_list:
|
||||
prob_id = prob["id"] # 如 CSP0309A1
|
||||
prob_code = prob["code"]
|
||||
prob_name = prob["name"]
|
||||
|
||||
# 筛选该题目的提交记录(匹配 problem_id)
|
||||
student_records = [r for r in all_student_records if r["problem_id"] == prob_id]
|
||||
|
||||
attempts = len(student_records)
|
||||
solved = any(r["status"] == "AC" for r in student_records)
|
||||
errors = [r["status"] for r in student_records if r["status"] != "AC"]
|
||||
|
||||
# 判定思考模式
|
||||
if attempts == 0:
|
||||
pattern = "未提交"
|
||||
elif attempts == 1 and solved:
|
||||
pattern = "一气呵成"
|
||||
elif solved and attempts <= 3:
|
||||
pattern = "调试改进"
|
||||
elif solved and attempts <= 6:
|
||||
pattern = "多次尝试后通过"
|
||||
elif not solved and attempts >= 3:
|
||||
pattern = "遇到困难"
|
||||
elif solved:
|
||||
pattern = "耐心调试"
|
||||
else:
|
||||
pattern = "尝试中"
|
||||
|
||||
error_counts = defaultdict(int)
|
||||
for e in errors:
|
||||
error_counts[e] += 1
|
||||
|
||||
# 用题目名称作为展示标签,同时保留原始ID
|
||||
display_label = f"{prob_code} {prob_name}" if prob_name else prob_code
|
||||
|
||||
problem_info = {
|
||||
"label": display_label,
|
||||
"raw_label": prob_code,
|
||||
"attempts": attempts,
|
||||
"solved": solved,
|
||||
"pattern": pattern,
|
||||
"errors": dict(error_counts),
|
||||
"submit_times": [r.get("submit_time", "") for r in student_records],
|
||||
}
|
||||
|
||||
analysis["problems"][display_label] = problem_info
|
||||
|
||||
if solved:
|
||||
total_solved_all += 1
|
||||
for e in errors:
|
||||
analysis["total_errors"][e] += 1
|
||||
|
||||
analysis["total_attempts"] = total_attempts_all
|
||||
analysis["total_solved"] = total_solved_all
|
||||
analysis["completion"] = f"{total_solved_all}/{len(problem_list)}"
|
||||
|
||||
else:
|
||||
# 兼容旧逻辑:按作业包分析
|
||||
for label in homework_labels:
|
||||
records = all_records.get(label, [])
|
||||
student_records = [r for r in records if r["student_name"] == student_name]
|
||||
|
||||
attempts = len(student_records)
|
||||
solved = any(r["status"] == "AC" for r in student_records)
|
||||
errors = [r["status"] for r in student_records if r["status"] != "AC"]
|
||||
|
||||
if attempts == 0:
|
||||
pattern = "未提交"
|
||||
elif attempts == 1 and solved:
|
||||
pattern = "一气呵成"
|
||||
elif solved and attempts <= 3:
|
||||
pattern = "调试改进"
|
||||
elif solved and attempts <= 6:
|
||||
pattern = "多次尝试后通过"
|
||||
elif not solved and attempts >= 3:
|
||||
pattern = "遇到困难"
|
||||
elif solved:
|
||||
pattern = "耐心调试"
|
||||
else:
|
||||
pattern = "尝试中"
|
||||
|
||||
error_counts = defaultdict(int)
|
||||
for e in errors:
|
||||
error_counts[e] += 1
|
||||
|
||||
problem_info = {
|
||||
"label": label,
|
||||
"attempts": attempts,
|
||||
"solved": solved,
|
||||
"pattern": pattern,
|
||||
"errors": dict(error_counts),
|
||||
"submit_times": [r.get("submit_time", "") for r in student_records],
|
||||
}
|
||||
|
||||
analysis["problems"][label] = problem_info
|
||||
analysis["total_attempts"] += attempts
|
||||
|
||||
if solved:
|
||||
analysis["total_solved"] += 1
|
||||
|
||||
for e in errors:
|
||||
analysis["total_errors"][e] += 1
|
||||
|
||||
analysis["completion"] = f"{analysis['total_solved']}/{len(homework_labels)}"
|
||||
|
||||
analysis["total_errors"] = dict(analysis["total_errors"])
|
||||
return analysis
|
||||
|
||||
|
||||
def format_oj_section(analysis: dict[str, Any]) -> str:
|
||||
"""格式化OJ数据为Markdown反馈段落(支持A包具体题目模式)"""
|
||||
|
||||
total_solved = analysis["total_solved"]
|
||||
total_attempts = analysis["total_attempts"]
|
||||
problems = analysis["problems"]
|
||||
total_count = len(problems)
|
||||
|
||||
lines = []
|
||||
lines.append("")
|
||||
lines.append("## 【OJ做题数据】")
|
||||
lines.append("")
|
||||
lines.append(f"**完成情况**: {analysis['completion']} (共{total_attempts}次提交)")
|
||||
lines.append("")
|
||||
|
||||
# 单题详情表
|
||||
lines.append("| 题目 | 状态 | 提交次数 | 思考模式 | 错误类型 |")
|
||||
lines.append("|------|------|----------|----------|----------|")
|
||||
|
||||
for label, info in problems.items():
|
||||
icon = "✅" if info["solved"] else "❌"
|
||||
status = "通过" if info["solved"] else "未通过"
|
||||
attempts = info["attempts"]
|
||||
pattern = info["pattern"]
|
||||
|
||||
error_str = ""
|
||||
if info["errors"]:
|
||||
error_parts = [f"{k}×{v}" for k, v in info["errors"].items()]
|
||||
error_str = ", ".join(error_parts)
|
||||
else:
|
||||
error_str = "—"
|
||||
|
||||
if attempts == 0:
|
||||
lines.append(f"| {label} | ⬜ 未提交 | 0 | — | — |")
|
||||
else:
|
||||
lines.append(f"| {label} | {icon} {status} | {attempts} | {pattern} | {error_str} |")
|
||||
|
||||
lines.append("")
|
||||
|
||||
# 总结性评价(适配具体题目数量)
|
||||
if total_solved == total_count and total_count > 0:
|
||||
if total_attempts <= total_count + 2:
|
||||
lines.append(f"**📊 分析**: {total_count}题全部完成,且提交效率高,思路清晰,代码质量优秀。")
|
||||
elif total_attempts <= total_count * 2:
|
||||
lines.append(f"**📊 分析**: {total_count}题全部完成,经过适度调试后通过,展现了良好的调试能力。")
|
||||
else:
|
||||
lines.append(f"**📊 分析**: {total_count}题全部完成,共经过{total_attempts}次提交,展现了不错的耐心和坚持。")
|
||||
elif total_solved > 0:
|
||||
unsolved = [l for l, i in problems.items() if not i["solved"] and i["attempts"] > 0]
|
||||
untried = [l for l, i in problems.items() if i["attempts"] == 0]
|
||||
parts = []
|
||||
if unsolved:
|
||||
parts.append(f"{'、'.join(unsolved)}有尝试但尚未通过")
|
||||
if untried:
|
||||
parts.append(f"{'、'.join(untried)}未提交")
|
||||
lines.append(f"**📊 分析**: 完成{total_solved}题,{','.join(parts)},建议课后继续完成。")
|
||||
else:
|
||||
if total_attempts > 0:
|
||||
lines.append("**📊 分析**: 有提交但尚未通过任何题目,建议课后重点跟进。")
|
||||
else:
|
||||
lines.append("**📊 分析**: 本节课OJ作业暂无提交记录。")
|
||||
|
||||
# 错误分布
|
||||
if analysis["total_errors"]:
|
||||
errors = analysis["total_errors"]
|
||||
error_summary = []
|
||||
if "WA" in errors:
|
||||
error_summary.append(f"答案错误{errors['WA']}次")
|
||||
if "CE" in errors:
|
||||
error_summary.append(f"编译错误{errors['CE']}次")
|
||||
if "RE" in errors:
|
||||
error_summary.append(f"运行错误{errors['RE']}次")
|
||||
if "TLE" in errors:
|
||||
error_summary.append(f"超时{errors['TLE']}次")
|
||||
if error_summary:
|
||||
lines.append(f"**错误分布**: {','.join(error_summary)}。")
|
||||
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ========== 反馈更新 ==========
|
||||
|
||||
def update_feedback_file(student_name: str, oj_section: str) -> bool:
|
||||
"""在已有反馈文件末尾追加OJ数据段"""
|
||||
feedback_path = CLASS_DIR / student_name / "feedback" / f"{FEEDBACK_DATE}_{COURSE_CODE}.md"
|
||||
|
||||
if not feedback_path.exists():
|
||||
print(f" [跳过] 反馈文件不存在: {feedback_path}")
|
||||
return False
|
||||
|
||||
content = feedback_path.read_text(encoding="utf-8")
|
||||
|
||||
# 检查是否已有OJ数据段
|
||||
if "【OJ做题数据】" in content:
|
||||
# 替换已有的OJ数据段
|
||||
pattern = r"\n## 【OJ做题数据】.*?(?=\n## |---\n|\Z)"
|
||||
content = re.sub(pattern, oj_section.rstrip(), content, flags=re.DOTALL)
|
||||
print(f" [更新] 替换已有OJ数据段")
|
||||
else:
|
||||
# 在 --- 分隔线前插入
|
||||
if "\n---\n" in content:
|
||||
content = content.replace("\n---\n", f"{oj_section}\n---\n", 1)
|
||||
else:
|
||||
content = content.rstrip() + "\n" + oj_section
|
||||
print(f" [新增] 追加OJ数据段")
|
||||
|
||||
feedback_path.write_text(content, encoding="utf-8")
|
||||
return True
|
||||
|
||||
|
||||
# ========== 主流程 ==========
|
||||
|
||||
def main():
|
||||
global COURSE_CODE, COURSE_TITLE, CLASS_NAME, FEEDBACK_DATE, ATTENDING_STUDENTS
|
||||
global USERNAME, PASSWORD, CLASS_DIR
|
||||
|
||||
# 解析命令行参数
|
||||
args = parse_args()
|
||||
COURSE_CODE = args.course
|
||||
COURSE_TITLE = args.title
|
||||
CLASS_NAME = args.class_name
|
||||
USERNAME = args.username
|
||||
PASSWORD = args.password
|
||||
GET_SINGLE_STUDENT = args.get_student_oj.strip()
|
||||
|
||||
# 日期处理
|
||||
if args.date:
|
||||
FEEDBACK_DATE = args.date
|
||||
else:
|
||||
FEEDBACK_DATE = datetime.now().strftime("%Y%m%d")
|
||||
|
||||
# 学生名单
|
||||
if args.students:
|
||||
ATTENDING_STUDENTS = [s.strip() for s in args.students.split(",") if s.strip()]
|
||||
elif GET_SINGLE_STUDENT:
|
||||
# 只获取单个学生的数据
|
||||
ATTENDING_STUDENTS = [GET_SINGLE_STUDENT]
|
||||
|
||||
# 设置班级目录
|
||||
CLASS_DIR = PROJECT_ROOT / ".claude" / "memory" / "class" / CLASS_NAME
|
||||
|
||||
# 如果没有学生名单,尝试从班级目录读取所有学生
|
||||
if not ATTENDING_STUDENTS:
|
||||
if CLASS_DIR.exists():
|
||||
ATTENDING_STUDENTS = [
|
||||
d.name for d in sorted(CLASS_DIR.iterdir())
|
||||
if d.is_dir() and d.name != "summaries" and (d / "profile.md").exists()
|
||||
]
|
||||
print(f" [自动] 从班级目录读取到 {len(ATTENDING_STUDENTS)} 名学生")
|
||||
if not ATTENDING_STUDENTS:
|
||||
print(" [X] 未指定出勤学生且班级目录中无学生,退出")
|
||||
return 1
|
||||
|
||||
# 如果是获取单个学生的OJ数据,简化输出
|
||||
if not GET_SINGLE_STUDENT:
|
||||
print("=" * 60)
|
||||
print(f"获取 {COURSE_CODE} {COURSE_TITLE} OJ数据并更新反馈")
|
||||
print("=" * 60)
|
||||
|
||||
# 1. 加载作业ID
|
||||
print(f"\n[1] 从 hw_dict.json 加载 {COURSE_CODE} 作业ID...")
|
||||
homeworks = load_homework_ids(COURSE_CODE)
|
||||
if not homeworks:
|
||||
print(" [X] 没有找到作业配置,退出")
|
||||
return 1
|
||||
|
||||
for hw in homeworks:
|
||||
print(f" - {hw['title']}: {hw['id']}")
|
||||
|
||||
# 2. 登录OJ
|
||||
print(f"\n[2] 登录OJ系统...")
|
||||
client = httpx.Client(base_url=OJ_BASE_URL, timeout=30.0, follow_redirects=True)
|
||||
|
||||
if not login(client):
|
||||
print(" [X] 登录失败,退出")
|
||||
return 1
|
||||
|
||||
# 3. 获取A包的具体题目列表 & 抓取提交记录
|
||||
print(f"\n[3] 获取A包题目列表并抓取提交记录...")
|
||||
student_names = set(ATTENDING_STUDENTS)
|
||||
all_records: dict[str, list[dict[str, Any]]] = {}
|
||||
homework_labels: list[str] = []
|
||||
problem_list: list[dict[str, str]] | None = None
|
||||
|
||||
for hw in homeworks:
|
||||
label = hw["title"]
|
||||
homework_labels.append(label)
|
||||
|
||||
# 获取该作业包中的具体题目(A包才有)
|
||||
problems = fetch_homework_problems(client, hw["id"])
|
||||
if problems:
|
||||
problem_list = problems
|
||||
print(f" {label} 包含 {len(problems)} 道题目:")
|
||||
for p in problems:
|
||||
print(f" - {p['code']}: {p['name']}")
|
||||
|
||||
print(f" 正在获取 {label} 提交记录...")
|
||||
records = fetch_homework_records(client, hw["id"], student_names)
|
||||
all_records[label] = records
|
||||
|
||||
student_count = len({r["student_name"] for r in records})
|
||||
print(f" 找到 {len(records)} 条记录,涉及 {student_count} 名出勤学生")
|
||||
|
||||
client.close()
|
||||
|
||||
# 4. 分析每个学生(按A包具体题目分析)
|
||||
if not GET_SINGLE_STUDENT:
|
||||
print(f"\n[4] 分析学生OJ表现...")
|
||||
analyses: list[dict[str, Any]] = []
|
||||
|
||||
for name in ATTENDING_STUDENTS:
|
||||
analysis = analyze_student_performance(name, all_records, homework_labels, problem_list)
|
||||
analyses.append(analysis)
|
||||
|
||||
if not GET_SINGLE_STUDENT:
|
||||
total_count = len(analysis["problems"])
|
||||
icon = "★" if analysis["total_solved"] == total_count and total_count > 0 else \
|
||||
"○" if analysis["total_solved"] > 0 else "✗"
|
||||
print(f" {icon} {name}: {analysis['completion']} ({analysis['total_attempts']}次提交)")
|
||||
|
||||
for label, info in analysis["problems"].items():
|
||||
p_icon = "✓" if info["solved"] else "○" if info["attempts"] > 0 else "—"
|
||||
print(f" {p_icon} {label}: {info['attempts']}次 ({info['pattern']})")
|
||||
|
||||
# 如果是获取单个学生的OJ数据,直接输出Markdown并退出
|
||||
if GET_SINGLE_STUDENT and analyses:
|
||||
oj_section = format_oj_section(analyses[0])
|
||||
print(oj_section)
|
||||
return 0
|
||||
|
||||
# 5. 更新反馈文件
|
||||
print(f"\n[5] 更新反馈文件...")
|
||||
updated = 0
|
||||
|
||||
for analysis in analyses:
|
||||
name = analysis["name"]
|
||||
print(f" 处理 {name}...")
|
||||
|
||||
oj_section = format_oj_section(analysis)
|
||||
if update_feedback_file(name, oj_section):
|
||||
updated += 1
|
||||
|
||||
# 6. 保存原始分析数据
|
||||
print(f"\n[6] 保存分析JSON数据...")
|
||||
analysis_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
|
||||
analysis_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
output_file = analysis_dir / f"{COURSE_CODE}_student_analysis.json"
|
||||
with open(output_file, "w", encoding="utf-8") as f:
|
||||
json.dump({
|
||||
"course": COURSE_CODE,
|
||||
"title": COURSE_TITLE,
|
||||
"date": f"{FEEDBACK_DATE[:4]}-{FEEDBACK_DATE[4:6]}-{FEEDBACK_DATE[6:8]}",
|
||||
"class": CLASS_NAME,
|
||||
"students": analyses,
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
}, f, ensure_ascii=False, indent=2)
|
||||
print(f" 保存到: {output_file}")
|
||||
|
||||
# 7. 汇总
|
||||
print(f"\n" + "=" * 60)
|
||||
print(f"✅ 完成!")
|
||||
print(f" - 出勤学生: {len(ATTENDING_STUDENTS)}")
|
||||
print(f" - 更新反馈: {updated} 份")
|
||||
|
||||
total_solved = sum(a["total_solved"] for a in analyses)
|
||||
total_possible = len(ATTENDING_STUDENTS) * len(homework_labels)
|
||||
print(f" - 整体完成率: {total_solved}/{total_possible}")
|
||||
print("=" * 60)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user