Files
AICODE2026/scripts/sync_to_dingtalk.py
Rocky 1c5e72676b feat: AICODE-06 春季后半 7 课大纲 + 第 12 课教案 + prototype 工程产物
## 主要变更

### 课程设计
- 大纲扩展到 18 课(新增第 12-18 课:单词塔防 3D 大项目)
- 引入 AI 三角色协作工作流(Planner / Reviewer / Tester)作为整学期框架
- 每课详化:核心概念 + 误概念预设 + 教学锚点 + 学生产出 + 老师课前要准备

### 第 12 课教案(完整逐字稿)
- 主题:Skills 入门 - 用 game-studio 做跳一跳
- 90 分钟 4C 结构 + 诊断点 + 分支策略
- 5 个误概念预设 + AI 助教提示词模板 + 教师备课指南

### prototype 工程产物(可玩 demo)
- 跳一跳-3d/index.html:Three.js 3D 跳一跳(蓄力 + Web Audio 音效 + PERFECT 命中)
- 单词塔防/game-3d.html:完整 3D 塔防(三阶段 + 商店 + 卡片 + 战斗循环,15 击杀完美胜利)
- 单词塔防/level-editor-3d.html:3D 关卡设计器(Kenney GLB 模型 + localStorage 保存)
- 单词塔防/level-editor.html:2D 关卡设计器(原型保留)
- 单词塔防/index.html:2D 塔防原型(原型保留)

### 工程加固
- .gitignore 加强:排除 token、Kenney 大素材、调试截图、第三方插件、Playwright 临时
- 从 git tracking 移除 scripts/.dingtalk_token.json(本地保留)
- scripts/sync_to_dingtalk.py:OAuth 流程改为手动 authCode 粘贴(避免本地 server 受限)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 23:04:54 +02:00

816 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
钉钉知识库同步脚本
将本地教案和大纲同步到钉钉知识库「标准化教案手册」
用法:
python sync_to_dingtalk.py auth # 首次授权(获取用户 Token
python sync_to_dingtalk.py <file.md> # 同步指定文件到知识库
python sync_to_dingtalk.py all # 同步所有教案
python sync_to_dingtalk.py aicode03 # 只同步 AICODE-03
python sync_to_dingtalk.py aicode06 # 只同步 AICODE-06
python sync_to_dingtalk.py outline # 只同步课程大纲
python sync_to_dingtalk.py sales # 只同步销售材料
python sync_to_dingtalk.py sales,outline # 同步多个目标(逗号分隔,只发一条通知)
python sync_to_dingtalk.py outline --notify # 同步并发钉钉机器人通知(默认不发)
"""
import re
import sys
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
import webbrowser
from datetime import datetime
import requests
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent
ENV_PATH = PROJECT_ROOT / ".env"
TOKEN_PATH = PROJECT_ROOT / "scripts" / ".dingtalk_token.json"
# ============================================
# 配置加载
# ============================================
def load_env():
"""从 .env 文件加载配置"""
config = {}
with open(ENV_PATH, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
config[key.strip()] = value.strip()
return config
# ============================================
# OAuth 授权(获取用户级 Token
# ============================================
class OAuthCallbackHandler(BaseHTTPRequestHandler):
"""处理 OAuth 回调的 HTTP handler"""
auth_code = None
def do_GET(self):
query = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(query)
if "authCode" in params:
OAuthCallbackHandler.auth_code = params["authCode"][0]
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write("授权成功!你可以关闭此页面。".encode("utf-8"))
else:
self.send_response(400)
self.end_headers()
self.wfile.write("授权失败,未收到 authCode".encode("utf-8"))
def log_message(self, format, *args):
pass # 静默日志
def do_oauth(config):
"""执行 OAuth 授权流程,获取用户 Token"""
client_id = config["DINGTALK_APP_KEY"]
client_secret = config["DINGTALK_APP_SECRET"]
redirect_uri = "http://127.0.0.1:18765/callback"
# 构建授权 URL
auth_url = (
"https://login.dingtalk.com/oauth2/auth?"
f"redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&response_type=code"
f"&client_id={client_id}"
f"&scope=openid"
f"&prompt=consent"
)
# 直接手动模式:打印链接,让用户粘贴 authCode
print("=" * 60)
print("请用浏览器打开以下链接,完成钉钉登录授权:")
print(f"\n{auth_url}\n")
print("授权后浏览器地址栏会出现类似:")
print(" http://127.0.0.1:18765/callback?authCode=xxxxxx&...")
print("\n从地址栏复制 authCode= 后面的值粘贴到这里:")
print("=" * 60)
webbrowser.open(auth_url)
auth_code = input("authCode: ").strip()
if not auth_code:
print("[auth] 未输入 authCode授权取消")
return None
print(f"[auth] 收到授权码")
# 用 auth_code 换取 user access token
resp = requests.post(
"https://api.dingtalk.com/v1.0/oauth2/userAccessToken",
json={
"clientId": client_id,
"clientSecret": client_secret,
"code": auth_code,
"grantType": "authorization_code",
},
)
resp.raise_for_status()
data = resp.json()
# 保存 token
token_data = {
"accessToken": data["accessToken"],
"refreshToken": data["refreshToken"],
"expireTime": time.time() + data.get("expireIn", 7200) - 300,
"clientId": client_id,
"clientSecret": client_secret,
}
TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8")
print(f"[auth] 用户 Token 获取成功,已保存到 {TOKEN_PATH.name}")
print(f"[auth] Access Token 有效期 2 小时Refresh Token 有效期 30 天(自动续期)")
return token_data
def load_user_token(config):
"""加载已保存的用户 Token自动刷新过期的 Token"""
if not TOKEN_PATH.exists():
return None
data = json.loads(TOKEN_PATH.read_text(encoding="utf-8"))
# 检查是否过期
if time.time() < data.get("expireTime", 0):
return data
# 尝试用 refresh_token 刷新
refresh_token = data.get("refreshToken")
if not refresh_token:
return None
print("[token] Access Token 已过期,使用 Refresh Token 刷新...")
resp = requests.post(
"https://api.dingtalk.com/v1.0/oauth2/userAccessToken",
json={
"clientId": data.get("clientId", config["DINGTALK_APP_KEY"]),
"clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]),
"refreshToken": refresh_token,
"grantType": "refresh_token",
},
)
if resp.status_code != 200:
print("[token] Refresh Token 已失效,请重新授权: python sync_to_dingtalk.py auth")
return None
new_data = resp.json()
token_data = {
"accessToken": new_data["accessToken"],
"refreshToken": new_data["refreshToken"],
"expireTime": time.time() + new_data.get("expireIn", 7200) - 300,
"clientId": data.get("clientId", config["DINGTALK_APP_KEY"]),
"clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]),
}
TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8")
print("[token] Token 刷新成功")
return token_data
# ============================================
# 钉钉 API 客户端
# ============================================
class DingTalkClient:
"""钉钉 API 客户端(双 Token 模式)"""
BASE_URL = "https://api.dingtalk.com"
def __init__(self, app_key, app_secret, operator_id, workspace_id, user_token=None):
self.app_key = app_key
self.app_secret = app_secret
self.operator_id = operator_id
self.workspace_id = workspace_id
self.user_token = user_token
self.app_token = None
self.app_token_expire_time = 0
def _ensure_app_token(self):
"""确保应用级 access_token 有效"""
if self.app_token and time.time() < self.app_token_expire_time:
return
resp = requests.post(
f"{self.BASE_URL}/v1.0/oauth2/accessToken",
json={"appKey": self.app_key, "appSecret": self.app_secret},
)
resp.raise_for_status()
data = resp.json()
self.app_token = data["accessToken"]
self.app_token_expire_time = time.time() + data["expireIn"] - 300
print(f"[token] 应用 Token 获取成功")
def _app_headers(self):
"""应用级请求头(创建文件夹/文档、查询节点)"""
self._ensure_app_token()
return {
"x-acs-dingtalk-access-token": self.app_token,
"Content-Type": "application/json",
}
def _user_headers(self):
"""用户级请求头(写入文档内容)"""
if not self.user_token:
raise RuntimeError("需要用户 Token请先运行: python sync_to_dingtalk.py auth")
return {
"x-acs-dingtalk-access-token": self.user_token,
"Content-Type": "application/json",
}
def list_nodes(self, parent_node_id):
"""获取指定文件夹下的节点列表"""
nodes = []
next_token = None
while True:
params = {
"parentNodeId": parent_node_id,
"operatorId": self.operator_id,
"maxResults": 50,
}
if next_token:
params["nextToken"] = next_token
resp = requests.get(
f"{self.BASE_URL}/v2.0/wiki/nodes",
headers=self._app_headers(),
params=params,
)
resp.raise_for_status()
data = resp.json()
nodes.extend(data.get("nodes", []))
next_token = data.get("nextToken")
if not next_token:
break
return nodes
def find_node(self, parent_node_id, name, node_type=None):
"""在指定文件夹下按名称查找节点"""
nodes = self.list_nodes(parent_node_id)
for node in nodes:
if node["name"] == name:
if node_type is None or node["type"] == node_type:
return node
return None
def find_node_by_prefix(self, parent_node_id, prefix):
"""在指定文件夹下按名称前缀查找节点(用于按课次编号匹配)
例如 prefix='AICODE03-02' 可匹配 'AICODE03-02 提问的艺术.adoc'
"""
nodes = self.list_nodes(parent_node_id)
for node in nodes:
node_name = node["name"]
# 钉钉文档名称带 .adoc 后缀
if node_name.endswith(".adoc"):
node_name = node_name[:-5]
if node_name.startswith(prefix):
return node
return None
def create_folder(self, parent_node_id, name):
"""创建文件夹,如果已存在则返回现有的。返回 (nodeId, actualName)"""
existing = self.find_node(parent_node_id, name, "FOLDER")
if existing:
print(f" [folder] 已存在: {existing['name']}")
return existing["nodeId"], existing["name"]
resp = requests.post(
f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs",
headers=self._app_headers(),
json={
"name": name,
"docType": "FOLDER",
"operatorId": self.operator_id,
"parentNodeId": parent_node_id,
},
)
resp.raise_for_status()
data = resp.json()
print(f" [folder] 创建成功: {name}")
return data["nodeId"], name
def resolve_node_name(self, parent_node_id, target_node_id):
"""通过遍历父目录,获取指定 nodeId 的实际名称"""
nodes = self.list_nodes(parent_node_id)
for node in nodes:
if node["nodeId"] == target_node_id:
return node["name"]
return None
def create_doc(self, parent_node_id, name):
"""创建文档,返回 (nodeId, docKey, url)"""
resp = requests.post(
f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs",
headers=self._app_headers(),
json={
"name": name,
"docType": "DOC",
"operatorId": self.operator_id,
"parentNodeId": parent_node_id,
},
)
resp.raise_for_status()
data = resp.json()
return data["nodeId"], data["docKey"], data.get("url", "")
def overwrite_content(self, doc_key, markdown_content):
"""覆写文档内容Markdown 格式,需要用户级 Token"""
resp = requests.post(
f"{self.BASE_URL}/v2.0/doc/me/suites/documents/{doc_key}/overwriteContent",
headers=self._user_headers(),
json={
"dataType": "markdown",
"content": markdown_content,
},
)
if resp.status_code != 200:
print(f" [error] 写入失败: {resp.status_code} {resp.text}")
resp.raise_for_status()
print(f" [content] 内容写入成功")
def upload_markdown(self, parent_node_id, name, markdown_content):
"""上传 Markdown 文档:创建 + 写入内容。返回 (node_id, url)
匹配逻辑(按优先级):
1. 从文件名提取课次编号(如 AICODE03-02按编号前缀匹配已有文档
2. 如果没有编号(如课程大纲),按完整文件名匹配
3. 都没匹配到 → 新建文档
"""
lesson_id = extract_lesson_id(name)
existing = None
# 优先按课次编号匹配
if lesson_id:
existing = self.find_node_by_prefix(parent_node_id, lesson_id)
if existing:
old_name = existing["name"].rstrip(".adoc") if existing["name"].endswith(".adoc") else existing["name"]
if old_name != name:
print(f" [update] 按编号匹配覆盖: {old_name}{name}")
else:
print(f" [update] 覆盖已有文档: {name}")
# 没有编号或编号没匹配到,按完整文件名匹配
if not existing:
existing = self.find_node(parent_node_id, f"{name}.adoc")
if existing:
print(f" [update] 覆盖已有文档: {name}")
if existing:
doc_key = existing["nodeId"]
self.overwrite_content(doc_key, markdown_content)
return existing["nodeId"], existing.get("url", "")
else:
node_id, doc_key, url = self.create_doc(parent_node_id, name)
print(f" [create] 新建文档: {name}")
time.sleep(2) # 等待文档初始化
self.overwrite_content(doc_key, markdown_content)
return node_id, url or ""
# ============================================
# Webhook 通知
# ============================================
DINGTALK_DOC_BASE_URL = "https://alidocs.dingtalk.com/i/nodes"
def _get_doc_url(result):
"""获取文档链接:优先用 API 返回的 url否则用 nodeId 构造"""
url = result.get("url", "")
if url:
return url
node_id = result.get("node_id", "")
if node_id:
return f"{DINGTALK_DOC_BASE_URL}/{node_id}"
return ""
def send_webhook(config, results):
"""同步完成后发送钉钉机器人通知(仅报告成功项,按知识库目录分组)"""
webhook_url = config.get("DINGTALK_WEBHOOK_URL")
webhook_secret = config.get("DINGTALK_WEBHOOK_SECRET")
if not webhook_url or not results:
return
# HMAC-SHA256 签名
timestamp = str(int(time.time() * 1000))
string_to_sign = f"{timestamp}\n{webhook_secret}"
hmac_code = hmac.new(
webhook_secret.encode("utf-8"),
string_to_sign.encode("utf-8"),
digestmod=hashlib.sha256,
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code).decode("utf-8"))
signed_url = f"{webhook_url}&timestamp={timestamp}&sign={sign}"
# 按知识库目录分组
from collections import OrderedDict
groups = OrderedDict()
for r in results:
folder = r["kb_folder"]
if folder not in groups:
groups[folder] = []
groups[folder].append(r)
# 构建 Markdown 消息(按目录分组)
lines = ["### 知识库文档已更新\n"]
for folder, items in groups.items():
lines.append(f"**{folder}**\n")
for r in items:
doc_url = _get_doc_url(r)
if doc_url:
lines.append(f"- [{r['name']}]({doc_url})")
else:
lines.append(f"- {r['name']}")
lines.append("")
body = {
"msgtype": "markdown",
"markdown": {
"title": "知识库文档已更新",
"text": "\n".join(lines),
},
}
try:
resp = requests.post(signed_url, json=body, timeout=10)
if resp.status_code == 200 and resp.json().get("errcode") == 0:
print("[webhook] 通知发送成功")
else:
print(f"[webhook] 通知发送失败: {resp.text}")
except Exception as e:
print(f"[webhook] 通知发送异常: {e}")
# ============================================
# 辅助函数
# ============================================
def extract_lesson_id(name):
"""从文件名中提取课次编号。
'AICODE03-02 AI的记忆之谜''AICODE03-02'
'AICODE06-04 代码审查入门''AICODE06-04'
'AICODE-03课程大纲' → None大纲没有课次编号走文件名匹配
"""
match = re.match(r'(AICODE\d{2}-\d{2})', name)
return match.group(1) if match else None
def _lesson_sort_key(stem):
"""排序键:课程大纲排第一,教案按课次编号排序。
'AICODE-03课程大纲' → (0, 0, 'AICODE-03课程大纲')
'AICODE03-02 AI的记忆之谜' → (1, 2, 'AICODE03-02 AI的记忆之谜')
"""
lesson_id = extract_lesson_id(stem)
if lesson_id:
# 提取课次数字(如 AICODE03-02 → 2
num = int(lesson_id.split("-")[1])
return (1, num, stem)
elif "大纲" in stem or "课程" in stem:
return (0, 0, stem)
else:
return (2, 0, stem)
# ============================================
# 同步逻辑
# ============================================
def sync_lessons(client, local_dir, parent_node_id, folder_name, kb_folder, results=None):
"""同步一个课程目录下的所有教案(按课次编号排序)"""
folder_id, _ = client.create_folder(parent_node_id, folder_name)
lesson_dir = Path(local_dir)
# 只取当前目录的 .md 文件(排除子目录如 旧版本/
md_files = sorted(
[f for f in lesson_dir.glob("*.md") if f.parent == lesson_dir],
key=lambda f: _lesson_sort_key(f.stem),
)
if not md_files:
print(f" [warn] {local_dir} 下没有找到 .md 文件")
return
print(f"\n{'='*50}")
print(f"同步 {folder_name}: {len(md_files)} 篇教案")
print(f"{'='*50}")
for i, md_file in enumerate(md_files, 1):
name = md_file.stem
content = md_file.read_text(encoding="utf-8")
print(f"\n[{i}/{len(md_files)}] {name}")
try:
node_id, doc_url = client.upload_markdown(folder_id, name, content)
if results is not None:
results.append({
"name": name,
"kb_folder": kb_folder,
"node_id": node_id,
"url": doc_url,
})
except Exception as e:
print(f" [error] 同步失败: {e}")
time.sleep(1) # 避免频率限制
def sync_single_file(client, file_path, parent_node_id, kb_folder, label="", results=None):
"""同步单个文件"""
path = Path(file_path)
if not path.exists():
print(f" [error] 文件不存在: {path}")
return
name = path.stem
content = path.read_text(encoding="utf-8")
print(f"\n同步{label}: {name}")
try:
node_id, doc_url = client.upload_markdown(parent_node_id, name, content)
if results is not None:
results.append({
"name": name,
"kb_folder": kb_folder,
"node_id": node_id,
"url": doc_url,
})
except Exception as e:
print(f" [error] 同步失败: {e}")
def cmd_list(config, token_data, target_folder=None):
"""列出钉钉知识库中的文档"""
client = DingTalkClient(
app_key=config["DINGTALK_APP_KEY"],
app_secret=config["DINGTALK_APP_SECRET"],
operator_id=config["DINGTALK_OPERATOR_ID"],
workspace_id=config["DINGTALK_WORKSPACE_ID"],
user_token=token_data["accessToken"],
)
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
root_node = config["DINGTALK_ROOT_NODE_ID"]
print("正在读取知识库目录结构...")
kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课"
# 列出指定文件夹或所有文件夹
folders_to_list = []
if target_folder:
folder_name = target_folder.upper()
if not folder_name.startswith("AICODE-"):
folder_name = f"AICODE-{folder_name.replace('AICODE', '')}"
folders_to_list = [folder_name]
else:
# 列出所有子文件夹
top_nodes = client.list_nodes(innovation_node)
folders_to_list = [n["name"] for n in top_nodes if n.get("type") == "FOLDER"]
for folder_name in folders_to_list:
existing = client.find_node(innovation_node, folder_name, "FOLDER")
if not existing:
print(f"\n[warn] 文件夹 '{folder_name}' 不存在")
continue
folder_id = existing["nodeId"]
nodes = client.list_nodes(folder_id)
print(f"\n📁 {kb_innovation} / {folder_name} ({len(nodes)} 篇)")
print("-" * 50)
for i, node in enumerate(nodes, 1):
name = node["name"]
if name.endswith(".adoc"):
name = name[:-5]
ntype = "📄" if node.get("type") != "FOLDER" else "📁"
print(f" {i}. {ntype} {name}")
def cmd_delete(config, token_data, node_names):
"""删除钉钉知识库中的指定文档"""
client = DingTalkClient(
app_key=config["DINGTALK_APP_KEY"],
app_secret=config["DINGTALK_APP_SECRET"],
operator_id=config["DINGTALK_OPERATOR_ID"],
workspace_id=config["DINGTALK_WORKSPACE_ID"],
user_token=token_data["accessToken"],
)
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
print("正在读取知识库目录结构...")
for target_name in node_names:
# 从名称推断所在文件夹
if "AICODE06" in target_name or "AICODE-06" in target_name:
folder_name = "AICODE-06"
elif "AICODE03" in target_name or "AICODE-03" in target_name:
folder_name = "AICODE-03"
else:
print(f" [skip] 无法判断 '{target_name}' 所在文件夹")
continue
folder = client.find_node(innovation_node, folder_name, "FOLDER")
if not folder:
print(f" [error] 文件夹 '{folder_name}' 不存在")
continue
# 按前缀或完整名称查找
lesson_id = extract_lesson_id(target_name)
found = None
if lesson_id:
found = client.find_node_by_prefix(folder["nodeId"], lesson_id)
if not found:
# 尝试完整名称匹配(带.adoc
found = client.find_node(folder["nodeId"], f"{target_name}.adoc")
if not found:
found = client.find_node(folder["nodeId"], target_name)
if not found:
print(f" [warn] 未找到: {target_name}")
continue
actual_name = found["name"].rstrip(".adoc") if found["name"].endswith(".adoc") else found["name"]
node_id = found["nodeId"]
# 调用删除 API
try:
resp = requests.delete(
f"{client.BASE_URL}/v2.0/wiki/nodes/{node_id}",
headers=client._user_headers(),
params={"operatorId": client.operator_id},
)
resp.raise_for_status()
print(f" [deleted] {actual_name}")
except Exception as e:
print(f" [error] 删除 '{actual_name}' 失败: {e}")
def main():
config = load_env()
# 处理 auth 命令
if len(sys.argv) > 1 and sys.argv[1] == "auth":
do_oauth(config)
return
# 处理 list 命令
if len(sys.argv) > 1 and sys.argv[1] == "list":
token_data = load_user_token(config)
if not token_data:
print("未找到有效的用户 Token。请先授权。")
sys.exit(1)
target = sys.argv[2] if len(sys.argv) > 2 else None
cmd_list(config, token_data, target)
return
# 处理 delete 命令
if len(sys.argv) > 1 and sys.argv[1] == "delete":
token_data = load_user_token(config)
if not token_data:
print("未找到有效的用户 Token。请先授权。")
sys.exit(1)
if len(sys.argv) < 3:
print("用法: python scripts/sync_to_dingtalk.py delete <文档名1> [文档名2] ...")
sys.exit(1)
cmd_delete(config, token_data, sys.argv[2:])
return
# 加载用户 Token
token_data = load_user_token(config)
if not token_data:
print("未找到有效的用户 Token。请先授权")
print(" python scripts/sync_to_dingtalk.py auth")
sys.exit(1)
client = DingTalkClient(
app_key=config["DINGTALK_APP_KEY"],
app_secret=config["DINGTALK_APP_SECRET"],
operator_id=config["DINGTALK_OPERATOR_ID"],
workspace_id=config["DINGTALK_WORKSPACE_ID"],
user_token=token_data["accessToken"],
)
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
ai_course_node = config["DINGTALK_AI_COURSE_NODE_ID"]
root_node = config["DINGTALK_ROOT_NODE_ID"]
# ── 动态遍历知识库,获取真实目录名称 ──
print("正在读取知识库目录结构...")
kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课"
kb_ai_course = client.resolve_node_name(root_node, ai_course_node) or "AI人工智能课"
print(f" 一级目录: {kb_innovation}, {kb_ai_course}")
raw_target = sys.argv[1] if len(sys.argv) > 1 else "all"
# 支持逗号分隔的多目标,如 "sales,outline"
targets = [t.strip() for t in raw_target.split(",")]
handled = False
results = [] # 收集成功同步的文档信息
# 批量同步
if any(t in ("all", "aicode03") for t in targets):
_, sub_name = client.create_folder(innovation_node, "AICODE-03")
sync_lessons(
client, PROJECT_ROOT / "3-lessons" / "AICODE-03",
innovation_node, "AICODE-03",
kb_folder=f"{kb_innovation} / {sub_name}", results=results,
)
handled = True
if any(t in ("all", "aicode06") for t in targets):
_, sub_name = client.create_folder(innovation_node, "AICODE-06")
sync_lessons(
client, PROJECT_ROOT / "3-lessons" / "AICODE-06",
innovation_node, "AICODE-06",
kb_folder=f"{kb_innovation} / {sub_name}", results=results,
)
handled = True
if any(t in ("all", "outline") for t in targets):
aicode03_folder, sub03 = client.create_folder(innovation_node, "AICODE-03")
sync_single_file(
client, PROJECT_ROOT / "3-lessons" / "AICODE-03" / "AICODE-03课程大纲.md",
aicode03_folder, kb_folder=f"{kb_innovation} / {sub03}", label="AICODE-03大纲", results=results,
)
aicode06_folder, sub06 = client.create_folder(innovation_node, "AICODE-06")
sync_single_file(
client, PROJECT_ROOT / "3-lessons" / "AICODE-06" / "AICODE-06课程大纲.md",
aicode06_folder, kb_folder=f"{kb_innovation} / {sub06}", label="AICODE-06大纲", results=results,
)
handled = True
if any(t in ("all", "sales") for t in targets):
sales_folder_id, sales_folder_name = client.create_folder(ai_course_node, "销售工具")
sales_kb = f"{kb_ai_course} / {sales_folder_name}"
sync_single_file(
client, PROJECT_ROOT / "2-sales" / "课程介绍.md",
sales_folder_id, kb_folder=sales_kb, label="课程介绍", results=results,
)
sync_single_file(
client, PROJECT_ROOT / "2-sales" / "家长QA.md",
sales_folder_id, kb_folder=sales_kb, label="家长QA", results=results,
)
handled = True
# 支持同步指定的 .md 文件路径(仅单目标模式)
target = targets[0] if len(targets) == 1 else ""
if target.endswith(".md"):
file_path = Path(target)
if not file_path.is_absolute():
file_path = PROJECT_ROOT / file_path
# 根据路径判断目标文件夹和知识库路径
if "AICODE-03" in str(file_path):
folder_id, sub_name = client.create_folder(innovation_node, "AICODE-03")
kb_folder = f"{kb_innovation} / {sub_name}"
elif "AICODE-06" in str(file_path):
folder_id, sub_name = client.create_folder(innovation_node, "AICODE-06")
kb_folder = f"{kb_innovation} / {sub_name}"
elif "2-sales" in str(file_path):
folder_id, sales_name = client.create_folder(ai_course_node, "销售工具")
kb_folder = f"{kb_ai_course} / {sales_name}"
else:
folder_id = innovation_node
kb_folder = kb_innovation
sync_single_file(client, file_path, folder_id, kb_folder=kb_folder, label="指定文件", results=results)
handled = True
# 支持按课次编号(如 AICODE03-05查找文件
if not handled and target.upper().startswith("AICODE"):
lesson_id = target.upper()
# 在两个课程目录中查找匹配的文件
found = False
for course_dir in ["AICODE-03", "AICODE-06"]:
lesson_dir = PROJECT_ROOT / "3-lessons" / course_dir
for md_file in lesson_dir.glob("*.md"):
if lesson_id in md_file.stem.upper().replace(" ", ""):
folder_id, sub_name = client.create_folder(innovation_node, course_dir)
kb_folder = f"{kb_innovation} / {sub_name}"
sync_single_file(
client, md_file, folder_id,
kb_folder=kb_folder, label=f"教案 {md_file.stem}", results=results,
)
found = True
break
if found:
break
if not found:
print(f"[error] 未找到匹配 '{target}' 的教案文件")
sys.exit(1)
print(f"\n{'='*50}")
print(f"同步完成!共 {len(results)} 篇文档")
print(f"{'='*50}")
# 发送 Webhook 通知(仅当指定 --notify 时)
if "--notify" in sys.argv:
send_webhook(config, results)
else:
print("[webhook] 未发送通知(如需通知请加 --notify")
if __name__ == "__main__":
main()