706 lines
24 KiB
Python
706 lines
24 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
获取CSP05-03课次的OJ作业数据,并更新已有学生反馈。
|
||
|
||
使用方式:
|
||
python scripts/update_feedback_with_oj.py
|
||
|
||
功能:
|
||
1. 登录OJ系统
|
||
2. 从hw_dict.json获取CSP05-03的作业ID列表
|
||
3. 抓取每个作业的提交记录
|
||
4. 按学生分析做题情况(思考模式、错误分布)
|
||
5. 将OJ数据分析结果追加到已有的反馈文件中
|
||
"""
|
||
|
||
import io
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
from collections import defaultdict
|
||
|
||
# 修复Windows控制台编码(GBK无法输出emoji)
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
|
||
from datetime import datetime
|
||
from html import unescape
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import httpx
|
||
|
||
|
||
def load_env_from_file(env_path: Path) -> dict:
|
||
"""从.env文件加载环境变量"""
|
||
env_vars = {}
|
||
if env_path.exists():
|
||
for line in env_path.read_text(encoding='utf-8').split('\n'):
|
||
line = line.strip()
|
||
if line and not line.startswith('#') and '=' in line:
|
||
key, value = line.split('=', 1)
|
||
env_vars[key] = value
|
||
return env_vars
|
||
|
||
|
||
# 加载.env配置
|
||
PROJECT_ROOT = Path(__file__).parent.parent
|
||
env_vars = load_env_from_file(PROJECT_ROOT / '.env')
|
||
|
||
# ========== 配置(优先级:环境变量 > .env文件 > 默认值) ==========
|
||
OJ_BASE_URL = os.environ.get('OJ_BASE_URL') or env_vars.get('OJ_BASE_URL', 'https://oj.qonnwolf.com')
|
||
USERNAME = os.environ.get('OJ_USERNAME') or env_vars.get('OJ_USERNAME', '')
|
||
PASSWORD = os.environ.get('OJ_PASSWORD') or env_vars.get('OJ_PASSWORD', '')
|
||
|
||
# 默认值(Claude Code运行时会通过命令行参数覆盖)
|
||
COURSE_CODE = "CSP05-03"
|
||
COURSE_TITLE = ""
|
||
CLASS_NAME = "CSP05克力周六1600"
|
||
FEEDBACK_DATE = ""
|
||
|
||
# 出勤学生名单(通过--students参数传入,逗号分隔)
|
||
ATTENDING_STUDENTS = []
|
||
|
||
HW_DICT_PATH = PROJECT_ROOT / "config" / "hw_dict.json"
|
||
CLASS_DIR = None # 运行时设置
|
||
|
||
|
||
def parse_args():
|
||
"""解析命令行参数"""
|
||
import argparse
|
||
parser = argparse.ArgumentParser(description="获取OJ作业数据并更新反馈")
|
||
parser.add_argument("--course", default="CSP05-03", help="课程代码,如 CSP05-03")
|
||
parser.add_argument("--title", default="", help="课程标题,如 递归应用")
|
||
parser.add_argument("--class-name", default="CSP05克力周六1600", help="班级名称")
|
||
parser.add_argument("--date", default="", help="上课日期 YYYYMMDD,默认今天")
|
||
parser.add_argument("--students", default="", help="出勤学生名单,逗号分隔")
|
||
parser.add_argument("--username", default=USERNAME or "", help="OJ用户名(默认从 .env 读取)")
|
||
parser.add_argument("--password", default=PASSWORD or "", help="OJ密码(默认从 .env 读取)")
|
||
parser.add_argument("--get-student-oj", default="", help="只获取单个学生的OJ数据并输出Markdown,传入学生姓名")
|
||
return parser.parse_args()
|
||
|
||
|
||
# ========== OJ数据获取 ==========
|
||
|
||
STATUS_MAP = {
|
||
"Accepted": "AC",
|
||
"Wrong Answer": "WA",
|
||
"Compile Error": "CE",
|
||
"Time Limit Exceeded": "TLE",
|
||
"Time Limit": "TLE",
|
||
"Memory Limit Exceeded": "MLE",
|
||
"Memory Limit": "MLE",
|
||
"Runtime Error": "RE",
|
||
"Presentation Error": "PE",
|
||
"Output Limit Exceeded": "OLE",
|
||
}
|
||
|
||
|
||
def strip_tags(raw_html: str) -> str:
|
||
"""移除HTML标签并清理空白"""
|
||
text = re.sub(r"<[^>]+>", "", raw_html)
|
||
text = unescape(text).replace("\xa0", " ")
|
||
return " ".join(text.split())
|
||
|
||
|
||
def detect_status(row_html: str) -> str:
|
||
"""识别判题状态"""
|
||
for keyword, status in STATUS_MAP.items():
|
||
if keyword in row_html:
|
||
return status
|
||
return "UNKNOWN"
|
||
|
||
|
||
def normalize_submit_time(raw_text: str) -> str:
|
||
"""标准化提交时间"""
|
||
match = re.search(
|
||
r"(\d{4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{2}):(\d{2})",
|
||
raw_text,
|
||
)
|
||
if not match:
|
||
return ""
|
||
year, month, day, hour, minute, second = match.groups()
|
||
return f"{year}-{int(month):02d}-{int(day):02d}T{int(hour):02d}:{minute}:{second}"
|
||
|
||
|
||
def parse_record_rows(html: str) -> list[dict[str, Any]]:
|
||
"""从HTML页面解析提交记录行"""
|
||
rows = re.findall(r"<tr[^>]*>(.*?)</tr>", html, re.IGNORECASE | re.DOTALL)
|
||
records: list[dict[str, Any]] = []
|
||
|
||
for index, row_html in enumerate(rows, start=1):
|
||
if "/user/" not in row_html or "/p/" not in row_html:
|
||
continue
|
||
|
||
# 提取用户ID和姓名
|
||
user_match = re.search(
|
||
r'href="/user/(\d+)"[^>]*>(.*?)</a>',
|
||
row_html,
|
||
re.IGNORECASE | re.DOTALL,
|
||
)
|
||
if not user_match:
|
||
continue
|
||
|
||
# 提取题目
|
||
problem_match = re.search(
|
||
r'href="/p/([^"?/]+)(?:\?[^"]*)??"[^>]*>(.*?)</a>',
|
||
row_html,
|
||
re.IGNORECASE | re.DOTALL,
|
||
)
|
||
if not problem_match:
|
||
continue
|
||
|
||
# 提取记录ID
|
||
record_id_match = re.search(r'/record/([A-Za-z0-9]+)', row_html, re.IGNORECASE)
|
||
|
||
# 提交时间
|
||
submit_time_match = re.search(
|
||
r"(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{2}:\d{2})",
|
||
row_html,
|
||
)
|
||
|
||
student_name = strip_tags(user_match.group(2))
|
||
|
||
records.append({
|
||
"id": record_id_match.group(1) if record_id_match else str(index),
|
||
"student_id": int(user_match.group(1)),
|
||
"student_name": student_name,
|
||
"problem_id": strip_tags(problem_match.group(1)),
|
||
"problem_title": strip_tags(problem_match.group(2)),
|
||
"status": detect_status(row_html),
|
||
"submit_time": (
|
||
normalize_submit_time(submit_time_match.group(1))
|
||
if submit_time_match else ""
|
||
),
|
||
})
|
||
|
||
return records
|
||
|
||
|
||
def login(client: httpx.Client) -> bool:
|
||
"""登录OJ系统"""
|
||
try:
|
||
response = client.post("/login", json={"uname": USERNAME, "password": PASSWORD})
|
||
response.raise_for_status()
|
||
has_sid = any(c.name == "sid" for c in client.cookies.jar)
|
||
if has_sid:
|
||
print(f" [OK] 登录成功: {USERNAME}")
|
||
return True
|
||
else:
|
||
print(f" [X] 登录失败: 未获取到session")
|
||
return False
|
||
except Exception as e:
|
||
print(f" [X] 登录失败: {e}")
|
||
return False
|
||
|
||
|
||
def fetch_homework_records(
|
||
client: httpx.Client,
|
||
homework_id: str,
|
||
student_names: set[str],
|
||
max_pages: int = 20,
|
||
) -> list[dict[str, Any]]:
|
||
"""抓取作业的提交记录,只保留目标学生"""
|
||
records: list[dict[str, Any]] = []
|
||
seen_ids: set[str] = set()
|
||
|
||
for page in range(1, max_pages + 1):
|
||
resp = client.get(f"/record?tid={homework_id}&page={page}")
|
||
if resp.status_code != 200:
|
||
break
|
||
|
||
page_records = parse_record_rows(resp.text)
|
||
if not page_records:
|
||
break
|
||
|
||
for record in page_records:
|
||
if record["student_name"] not in student_names:
|
||
continue
|
||
record_id = str(record["id"])
|
||
if record_id in seen_ids:
|
||
continue
|
||
seen_ids.add(record_id)
|
||
records.append(record)
|
||
|
||
# 检查是否有下一页
|
||
if f"page={page + 1}" not in resp.text:
|
||
break
|
||
|
||
# 按提交时间排序
|
||
records.sort(key=lambda x: x.get("submit_time", ""))
|
||
return records
|
||
|
||
|
||
def fetch_homework_problems(
|
||
client: httpx.Client,
|
||
homework_id: str,
|
||
) -> list[dict[str, str]]:
|
||
"""获取作业包中的具体题目列表(只取A包)"""
|
||
resp = client.get(f"/homework/{homework_id}")
|
||
if resp.status_code != 200:
|
||
print(f" [X] 获取作业详情失败: {resp.status_code}")
|
||
return []
|
||
|
||
problems = []
|
||
# 匹配HTML中的题目行:
|
||
# <a href="/p/CSP0309A1?tid=..."><b>CSP0309A1</b> 银行叫号模拟</a>
|
||
pattern = (
|
||
r'href="/p/([^"?]+)\?tid=' + re.escape(homework_id) +
|
||
r'"[^>]*><b>([^<]+)</b>(?: |\s)*([^<]*)</a>'
|
||
)
|
||
matches = re.findall(pattern, resp.text)
|
||
for pid, code, name in matches:
|
||
name_clean = strip_tags(name).strip()
|
||
if not name_clean:
|
||
name_clean = code
|
||
problems.append({
|
||
"id": pid,
|
||
"code": strip_tags(code),
|
||
"name": name_clean,
|
||
})
|
||
|
||
return problems
|
||
|
||
|
||
def load_homework_ids(course_code: str) -> list[dict[str, str]]:
|
||
"""从hw_dict.json加载作业ID,只保留A包"""
|
||
if not HW_DICT_PATH.exists():
|
||
print(f" [X] 找不到作业字典: {HW_DICT_PATH}")
|
||
return []
|
||
|
||
data = json.loads(HW_DICT_PATH.read_text(encoding="utf-8"))
|
||
items = data.get(course_code, [])
|
||
|
||
if not items:
|
||
print(f" [X] 作业字典中没有 {course_code} 的配置")
|
||
return []
|
||
|
||
# 只保留A包(课堂练习)
|
||
a_items = [item for item in items if item["title"].endswith("A")]
|
||
if not a_items:
|
||
# 如果没有A包,回退到第一个
|
||
a_items = [items[0]]
|
||
|
||
return [{"id": item["id"], "title": item["title"]} for item in a_items]
|
||
|
||
|
||
# ========== 分析逻辑 ==========
|
||
|
||
def analyze_student_performance(
|
||
student_name: str,
|
||
all_records: dict[str, list[dict[str, Any]]],
|
||
homework_labels: list[str],
|
||
problem_list: list[dict[str, str]] | None = None,
|
||
) -> dict[str, Any]:
|
||
"""分析单个学生的OJ表现
|
||
|
||
如果提供了 problem_list(A包的具体题目列表),则按具体题目分析;
|
||
否则按作业包标签分析(兼容旧逻辑)。
|
||
"""
|
||
|
||
analysis = {
|
||
"name": student_name,
|
||
"problems": {},
|
||
"total_solved": 0,
|
||
"total_attempts": 0,
|
||
"total_errors": defaultdict(int),
|
||
"patterns": [],
|
||
}
|
||
|
||
# 按具体题目分析(A包模式)
|
||
if problem_list:
|
||
# 合并所有作业包的记录(通常只有A包)
|
||
all_student_records: list[dict[str, Any]] = []
|
||
for label in homework_labels:
|
||
records = all_records.get(label, [])
|
||
all_student_records.extend([r for r in records if r["student_name"] == student_name])
|
||
|
||
total_attempts_all = len(all_student_records)
|
||
total_solved_all = 0
|
||
|
||
for prob in problem_list:
|
||
prob_id = prob["id"] # 如 CSP0309A1
|
||
prob_code = prob["code"]
|
||
prob_name = prob["name"]
|
||
|
||
# 筛选该题目的提交记录(匹配 problem_id)
|
||
student_records = [r for r in all_student_records if r["problem_id"] == prob_id]
|
||
|
||
attempts = len(student_records)
|
||
solved = any(r["status"] == "AC" for r in student_records)
|
||
errors = [r["status"] for r in student_records if r["status"] != "AC"]
|
||
|
||
# 判定思考模式
|
||
if attempts == 0:
|
||
pattern = "未提交"
|
||
elif attempts == 1 and solved:
|
||
pattern = "一气呵成"
|
||
elif solved and attempts <= 3:
|
||
pattern = "调试改进"
|
||
elif solved and attempts <= 6:
|
||
pattern = "多次尝试后通过"
|
||
elif not solved and attempts >= 3:
|
||
pattern = "遇到困难"
|
||
elif solved:
|
||
pattern = "耐心调试"
|
||
else:
|
||
pattern = "尝试中"
|
||
|
||
error_counts = defaultdict(int)
|
||
for e in errors:
|
||
error_counts[e] += 1
|
||
|
||
# 用题目名称作为展示标签,同时保留原始ID
|
||
display_label = f"{prob_code} {prob_name}" if prob_name else prob_code
|
||
|
||
problem_info = {
|
||
"label": display_label,
|
||
"raw_label": prob_code,
|
||
"attempts": attempts,
|
||
"solved": solved,
|
||
"pattern": pattern,
|
||
"errors": dict(error_counts),
|
||
"submit_times": [r.get("submit_time", "") for r in student_records],
|
||
}
|
||
|
||
analysis["problems"][display_label] = problem_info
|
||
|
||
if solved:
|
||
total_solved_all += 1
|
||
for e in errors:
|
||
analysis["total_errors"][e] += 1
|
||
|
||
analysis["total_attempts"] = total_attempts_all
|
||
analysis["total_solved"] = total_solved_all
|
||
analysis["completion"] = f"{total_solved_all}/{len(problem_list)}"
|
||
|
||
else:
|
||
# 兼容旧逻辑:按作业包分析
|
||
for label in homework_labels:
|
||
records = all_records.get(label, [])
|
||
student_records = [r for r in records if r["student_name"] == student_name]
|
||
|
||
attempts = len(student_records)
|
||
solved = any(r["status"] == "AC" for r in student_records)
|
||
errors = [r["status"] for r in student_records if r["status"] != "AC"]
|
||
|
||
if attempts == 0:
|
||
pattern = "未提交"
|
||
elif attempts == 1 and solved:
|
||
pattern = "一气呵成"
|
||
elif solved and attempts <= 3:
|
||
pattern = "调试改进"
|
||
elif solved and attempts <= 6:
|
||
pattern = "多次尝试后通过"
|
||
elif not solved and attempts >= 3:
|
||
pattern = "遇到困难"
|
||
elif solved:
|
||
pattern = "耐心调试"
|
||
else:
|
||
pattern = "尝试中"
|
||
|
||
error_counts = defaultdict(int)
|
||
for e in errors:
|
||
error_counts[e] += 1
|
||
|
||
problem_info = {
|
||
"label": label,
|
||
"attempts": attempts,
|
||
"solved": solved,
|
||
"pattern": pattern,
|
||
"errors": dict(error_counts),
|
||
"submit_times": [r.get("submit_time", "") for r in student_records],
|
||
}
|
||
|
||
analysis["problems"][label] = problem_info
|
||
analysis["total_attempts"] += attempts
|
||
|
||
if solved:
|
||
analysis["total_solved"] += 1
|
||
|
||
for e in errors:
|
||
analysis["total_errors"][e] += 1
|
||
|
||
analysis["completion"] = f"{analysis['total_solved']}/{len(homework_labels)}"
|
||
|
||
analysis["total_errors"] = dict(analysis["total_errors"])
|
||
return analysis
|
||
|
||
|
||
def format_oj_section(analysis: dict[str, Any]) -> str:
|
||
"""格式化OJ数据为Markdown反馈段落(支持A包具体题目模式)"""
|
||
|
||
total_solved = analysis["total_solved"]
|
||
total_attempts = analysis["total_attempts"]
|
||
problems = analysis["problems"]
|
||
total_count = len(problems)
|
||
|
||
lines = []
|
||
lines.append("")
|
||
lines.append("## 【OJ做题数据】")
|
||
lines.append("")
|
||
lines.append(f"**完成情况**: {analysis['completion']} (共{total_attempts}次提交)")
|
||
lines.append("")
|
||
|
||
# 单题详情表
|
||
lines.append("| 题目 | 状态 | 提交次数 | 思考模式 | 错误类型 |")
|
||
lines.append("|------|------|----------|----------|----------|")
|
||
|
||
for label, info in problems.items():
|
||
icon = "✅" if info["solved"] else "❌"
|
||
status = "通过" if info["solved"] else "未通过"
|
||
attempts = info["attempts"]
|
||
pattern = info["pattern"]
|
||
|
||
error_str = ""
|
||
if info["errors"]:
|
||
error_parts = [f"{k}×{v}" for k, v in info["errors"].items()]
|
||
error_str = ", ".join(error_parts)
|
||
else:
|
||
error_str = "—"
|
||
|
||
if attempts == 0:
|
||
lines.append(f"| {label} | ⬜ 未提交 | 0 | — | — |")
|
||
else:
|
||
lines.append(f"| {label} | {icon} {status} | {attempts} | {pattern} | {error_str} |")
|
||
|
||
lines.append("")
|
||
|
||
# 总结性评价(适配具体题目数量)
|
||
if total_solved == total_count and total_count > 0:
|
||
if total_attempts <= total_count + 2:
|
||
lines.append(f"**📊 分析**: {total_count}题全部完成,且提交效率高,思路清晰,代码质量优秀。")
|
||
elif total_attempts <= total_count * 2:
|
||
lines.append(f"**📊 分析**: {total_count}题全部完成,经过适度调试后通过,展现了良好的调试能力。")
|
||
else:
|
||
lines.append(f"**📊 分析**: {total_count}题全部完成,共经过{total_attempts}次提交,展现了不错的耐心和坚持。")
|
||
elif total_solved > 0:
|
||
unsolved = [l for l, i in problems.items() if not i["solved"] and i["attempts"] > 0]
|
||
untried = [l for l, i in problems.items() if i["attempts"] == 0]
|
||
parts = []
|
||
if unsolved:
|
||
parts.append(f"{'、'.join(unsolved)}有尝试但尚未通过")
|
||
if untried:
|
||
parts.append(f"{'、'.join(untried)}未提交")
|
||
lines.append(f"**📊 分析**: 完成{total_solved}题,{','.join(parts)},建议课后继续完成。")
|
||
else:
|
||
if total_attempts > 0:
|
||
lines.append("**📊 分析**: 有提交但尚未通过任何题目,建议课后重点跟进。")
|
||
else:
|
||
lines.append("**📊 分析**: 本节课OJ作业暂无提交记录。")
|
||
|
||
# 错误分布
|
||
if analysis["total_errors"]:
|
||
errors = analysis["total_errors"]
|
||
error_summary = []
|
||
if "WA" in errors:
|
||
error_summary.append(f"答案错误{errors['WA']}次")
|
||
if "CE" in errors:
|
||
error_summary.append(f"编译错误{errors['CE']}次")
|
||
if "RE" in errors:
|
||
error_summary.append(f"运行错误{errors['RE']}次")
|
||
if "TLE" in errors:
|
||
error_summary.append(f"超时{errors['TLE']}次")
|
||
if error_summary:
|
||
lines.append(f"**错误分布**: {','.join(error_summary)}。")
|
||
|
||
lines.append("")
|
||
return "\n".join(lines)
|
||
|
||
|
||
# ========== 反馈更新 ==========
|
||
|
||
def update_feedback_file(student_name: str, oj_section: str) -> bool:
|
||
"""在已有反馈文件末尾追加OJ数据段"""
|
||
feedback_path = CLASS_DIR / student_name / "feedback" / f"{FEEDBACK_DATE}_{COURSE_CODE}.md"
|
||
|
||
if not feedback_path.exists():
|
||
print(f" [跳过] 反馈文件不存在: {feedback_path}")
|
||
return False
|
||
|
||
content = feedback_path.read_text(encoding="utf-8")
|
||
|
||
# 检查是否已有OJ数据段
|
||
if "【OJ做题数据】" in content:
|
||
# 替换已有的OJ数据段
|
||
pattern = r"\n## 【OJ做题数据】.*?(?=\n## |---\n|\Z)"
|
||
content = re.sub(pattern, oj_section.rstrip(), content, flags=re.DOTALL)
|
||
print(f" [更新] 替换已有OJ数据段")
|
||
else:
|
||
# 在 --- 分隔线前插入
|
||
if "\n---\n" in content:
|
||
content = content.replace("\n---\n", f"{oj_section}\n---\n", 1)
|
||
else:
|
||
content = content.rstrip() + "\n" + oj_section
|
||
print(f" [新增] 追加OJ数据段")
|
||
|
||
feedback_path.write_text(content, encoding="utf-8")
|
||
return True
|
||
|
||
|
||
# ========== 主流程 ==========
|
||
|
||
def main():
|
||
global COURSE_CODE, COURSE_TITLE, CLASS_NAME, FEEDBACK_DATE, ATTENDING_STUDENTS
|
||
global USERNAME, PASSWORD, CLASS_DIR
|
||
|
||
# 解析命令行参数
|
||
args = parse_args()
|
||
COURSE_CODE = args.course
|
||
COURSE_TITLE = args.title
|
||
CLASS_NAME = args.class_name
|
||
USERNAME = args.username
|
||
PASSWORD = args.password
|
||
GET_SINGLE_STUDENT = args.get_student_oj.strip()
|
||
|
||
# 日期处理
|
||
if args.date:
|
||
FEEDBACK_DATE = args.date
|
||
else:
|
||
FEEDBACK_DATE = datetime.now().strftime("%Y%m%d")
|
||
|
||
# 学生名单
|
||
if args.students:
|
||
ATTENDING_STUDENTS = [s.strip() for s in args.students.split(",") if s.strip()]
|
||
elif GET_SINGLE_STUDENT:
|
||
# 只获取单个学生的数据
|
||
ATTENDING_STUDENTS = [GET_SINGLE_STUDENT]
|
||
|
||
# 设置班级目录
|
||
CLASS_DIR = PROJECT_ROOT / ".claude" / "memory" / "class" / CLASS_NAME
|
||
|
||
# 如果没有学生名单,尝试从班级目录读取所有学生
|
||
if not ATTENDING_STUDENTS:
|
||
if CLASS_DIR.exists():
|
||
ATTENDING_STUDENTS = [
|
||
d.name for d in sorted(CLASS_DIR.iterdir())
|
||
if d.is_dir() and d.name != "summaries" and (d / "profile.md").exists()
|
||
]
|
||
print(f" [自动] 从班级目录读取到 {len(ATTENDING_STUDENTS)} 名学生")
|
||
if not ATTENDING_STUDENTS:
|
||
print(" [X] 未指定出勤学生且班级目录中无学生,退出")
|
||
return 1
|
||
|
||
# 如果是获取单个学生的OJ数据,简化输出
|
||
if not GET_SINGLE_STUDENT:
|
||
print("=" * 60)
|
||
print(f"获取 {COURSE_CODE} {COURSE_TITLE} OJ数据并更新反馈")
|
||
print("=" * 60)
|
||
|
||
# 1. 加载作业ID
|
||
print(f"\n[1] 从 hw_dict.json 加载 {COURSE_CODE} 作业ID...")
|
||
homeworks = load_homework_ids(COURSE_CODE)
|
||
if not homeworks:
|
||
print(" [X] 没有找到作业配置,退出")
|
||
return 1
|
||
|
||
for hw in homeworks:
|
||
print(f" - {hw['title']}: {hw['id']}")
|
||
|
||
# 2. 登录OJ
|
||
print(f"\n[2] 登录OJ系统...")
|
||
client = httpx.Client(base_url=OJ_BASE_URL, timeout=30.0, follow_redirects=True)
|
||
|
||
if not login(client):
|
||
print(" [X] 登录失败,退出")
|
||
return 1
|
||
|
||
# 3. 获取A包的具体题目列表 & 抓取提交记录
|
||
print(f"\n[3] 获取A包题目列表并抓取提交记录...")
|
||
student_names = set(ATTENDING_STUDENTS)
|
||
all_records: dict[str, list[dict[str, Any]]] = {}
|
||
homework_labels: list[str] = []
|
||
problem_list: list[dict[str, str]] | None = None
|
||
|
||
for hw in homeworks:
|
||
label = hw["title"]
|
||
homework_labels.append(label)
|
||
|
||
# 获取该作业包中的具体题目(A包才有)
|
||
problems = fetch_homework_problems(client, hw["id"])
|
||
if problems:
|
||
problem_list = problems
|
||
print(f" {label} 包含 {len(problems)} 道题目:")
|
||
for p in problems:
|
||
print(f" - {p['code']}: {p['name']}")
|
||
|
||
print(f" 正在获取 {label} 提交记录...")
|
||
records = fetch_homework_records(client, hw["id"], student_names)
|
||
all_records[label] = records
|
||
|
||
student_count = len({r["student_name"] for r in records})
|
||
print(f" 找到 {len(records)} 条记录,涉及 {student_count} 名出勤学生")
|
||
|
||
client.close()
|
||
|
||
# 4. 分析每个学生(按A包具体题目分析)
|
||
if not GET_SINGLE_STUDENT:
|
||
print(f"\n[4] 分析学生OJ表现...")
|
||
analyses: list[dict[str, Any]] = []
|
||
|
||
for name in ATTENDING_STUDENTS:
|
||
analysis = analyze_student_performance(name, all_records, homework_labels, problem_list)
|
||
analyses.append(analysis)
|
||
|
||
if not GET_SINGLE_STUDENT:
|
||
total_count = len(analysis["problems"])
|
||
icon = "★" if analysis["total_solved"] == total_count and total_count > 0 else \
|
||
"○" if analysis["total_solved"] > 0 else "✗"
|
||
print(f" {icon} {name}: {analysis['completion']} ({analysis['total_attempts']}次提交)")
|
||
|
||
for label, info in analysis["problems"].items():
|
||
p_icon = "✓" if info["solved"] else "○" if info["attempts"] > 0 else "—"
|
||
print(f" {p_icon} {label}: {info['attempts']}次 ({info['pattern']})")
|
||
|
||
# 如果是获取单个学生的OJ数据,直接输出Markdown并退出
|
||
if GET_SINGLE_STUDENT and analyses:
|
||
oj_section = format_oj_section(analyses[0])
|
||
print(oj_section)
|
||
return 0
|
||
|
||
# 5. 更新反馈文件
|
||
print(f"\n[5] 更新反馈文件...")
|
||
updated = 0
|
||
|
||
for analysis in analyses:
|
||
name = analysis["name"]
|
||
print(f" 处理 {name}...")
|
||
|
||
oj_section = format_oj_section(analysis)
|
||
if update_feedback_file(name, oj_section):
|
||
updated += 1
|
||
|
||
# 6. 保存原始分析数据
|
||
print(f"\n[6] 保存分析JSON数据...")
|
||
analysis_dir = PROJECT_ROOT / ".claude" / "memory" / "oj" / "analysis"
|
||
analysis_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
output_file = analysis_dir / f"{COURSE_CODE}_student_analysis.json"
|
||
with open(output_file, "w", encoding="utf-8") as f:
|
||
json.dump({
|
||
"course": COURSE_CODE,
|
||
"title": COURSE_TITLE,
|
||
"date": f"{FEEDBACK_DATE[:4]}-{FEEDBACK_DATE[4:6]}-{FEEDBACK_DATE[6:8]}",
|
||
"class": CLASS_NAME,
|
||
"students": analyses,
|
||
"generated_at": datetime.now().isoformat(),
|
||
}, f, ensure_ascii=False, indent=2)
|
||
print(f" 保存到: {output_file}")
|
||
|
||
# 7. 汇总
|
||
print(f"\n" + "=" * 60)
|
||
print(f"✅ 完成!")
|
||
print(f" - 出勤学生: {len(ATTENDING_STUDENTS)}")
|
||
print(f" - 更新反馈: {updated} 份")
|
||
|
||
total_solved = sum(a["total_solved"] for a in analyses)
|
||
total_possible = len(ATTENDING_STUDENTS) * len(homework_labels)
|
||
print(f" - 整体完成率: {total_solved}/{total_possible}")
|
||
print("=" * 60)
|
||
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|