## 主要变更 ### 课程设计 - 大纲扩展到 18 课(新增第 12-18 课:单词塔防 3D 大项目) - 引入 AI 三角色协作工作流(Planner / Reviewer / Tester)作为整学期框架 - 每课详化:核心概念 + 误概念预设 + 教学锚点 + 学生产出 + 老师课前要准备 ### 第 12 课教案(完整逐字稿) - 主题:Skills 入门 - 用 game-studio 做跳一跳 - 90 分钟 4C 结构 + 诊断点 + 分支策略 - 5 个误概念预设 + AI 助教提示词模板 + 教师备课指南 ### prototype 工程产物(可玩 demo) - 跳一跳-3d/index.html:Three.js 3D 跳一跳(蓄力 + Web Audio 音效 + PERFECT 命中) - 单词塔防/game-3d.html:完整 3D 塔防(三阶段 + 商店 + 卡片 + 战斗循环,15 击杀完美胜利) - 单词塔防/level-editor-3d.html:3D 关卡设计器(Kenney GLB 模型 + localStorage 保存) - 单词塔防/level-editor.html:2D 关卡设计器(原型保留) - 单词塔防/index.html:2D 塔防原型(原型保留) ### 工程加固 - .gitignore 加强:排除 token、Kenney 大素材、调试截图、第三方插件、Playwright 临时 - 从 git tracking 移除 scripts/.dingtalk_token.json(本地保留) - scripts/sync_to_dingtalk.py:OAuth 流程改为手动 authCode 粘贴(避免本地 server 受限) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
816 lines
31 KiB
Python
816 lines
31 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
钉钉知识库同步脚本
|
||
将本地教案和大纲同步到钉钉知识库「标准化教案手册」
|
||
|
||
用法:
|
||
python sync_to_dingtalk.py auth # 首次授权(获取用户 Token)
|
||
python sync_to_dingtalk.py <file.md> # 同步指定文件到知识库
|
||
python sync_to_dingtalk.py all # 同步所有教案
|
||
python sync_to_dingtalk.py aicode03 # 只同步 AICODE-03
|
||
python sync_to_dingtalk.py aicode06 # 只同步 AICODE-06
|
||
python sync_to_dingtalk.py outline # 只同步课程大纲
|
||
python sync_to_dingtalk.py sales # 只同步销售材料
|
||
python sync_to_dingtalk.py sales,outline # 同步多个目标(逗号分隔,只发一条通知)
|
||
python sync_to_dingtalk.py outline --notify # 同步并发钉钉机器人通知(默认不发)
|
||
"""
|
||
|
||
import re
|
||
import sys
|
||
import json
|
||
import time
|
||
import hmac
|
||
import hashlib
|
||
import base64
|
||
import urllib.parse
|
||
import webbrowser
|
||
from datetime import datetime
|
||
import requests
|
||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
from pathlib import Path
|
||
|
||
PROJECT_ROOT = Path(__file__).parent.parent
|
||
ENV_PATH = PROJECT_ROOT / ".env"
|
||
TOKEN_PATH = PROJECT_ROOT / "scripts" / ".dingtalk_token.json"
|
||
|
||
|
||
# ============================================
|
||
# 配置加载
|
||
# ============================================
|
||
|
||
def load_env():
|
||
"""从 .env 文件加载配置"""
|
||
config = {}
|
||
with open(ENV_PATH, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
if "=" in line:
|
||
key, value = line.split("=", 1)
|
||
config[key.strip()] = value.strip()
|
||
return config
|
||
|
||
|
||
# ============================================
|
||
# OAuth 授权(获取用户级 Token)
|
||
# ============================================
|
||
|
||
class OAuthCallbackHandler(BaseHTTPRequestHandler):
|
||
"""处理 OAuth 回调的 HTTP handler"""
|
||
auth_code = None
|
||
|
||
def do_GET(self):
|
||
query = urllib.parse.urlparse(self.path).query
|
||
params = urllib.parse.parse_qs(query)
|
||
if "authCode" in params:
|
||
OAuthCallbackHandler.auth_code = params["authCode"][0]
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write("授权成功!你可以关闭此页面。".encode("utf-8"))
|
||
else:
|
||
self.send_response(400)
|
||
self.end_headers()
|
||
self.wfile.write("授权失败,未收到 authCode".encode("utf-8"))
|
||
|
||
def log_message(self, format, *args):
|
||
pass # 静默日志
|
||
|
||
|
||
def do_oauth(config):
|
||
"""执行 OAuth 授权流程,获取用户 Token"""
|
||
client_id = config["DINGTALK_APP_KEY"]
|
||
client_secret = config["DINGTALK_APP_SECRET"]
|
||
redirect_uri = "http://127.0.0.1:18765/callback"
|
||
|
||
# 构建授权 URL
|
||
auth_url = (
|
||
"https://login.dingtalk.com/oauth2/auth?"
|
||
f"redirect_uri={urllib.parse.quote(redirect_uri)}"
|
||
f"&response_type=code"
|
||
f"&client_id={client_id}"
|
||
f"&scope=openid"
|
||
f"&prompt=consent"
|
||
)
|
||
|
||
# 直接手动模式:打印链接,让用户粘贴 authCode
|
||
print("=" * 60)
|
||
print("请用浏览器打开以下链接,完成钉钉登录授权:")
|
||
print(f"\n{auth_url}\n")
|
||
print("授权后浏览器地址栏会出现类似:")
|
||
print(" http://127.0.0.1:18765/callback?authCode=xxxxxx&...")
|
||
print("\n从地址栏复制 authCode= 后面的值粘贴到这里:")
|
||
print("=" * 60)
|
||
webbrowser.open(auth_url)
|
||
auth_code = input("authCode: ").strip()
|
||
if not auth_code:
|
||
print("[auth] 未输入 authCode,授权取消")
|
||
return None
|
||
|
||
print(f"[auth] 收到授权码")
|
||
|
||
# 用 auth_code 换取 user access token
|
||
resp = requests.post(
|
||
"https://api.dingtalk.com/v1.0/oauth2/userAccessToken",
|
||
json={
|
||
"clientId": client_id,
|
||
"clientSecret": client_secret,
|
||
"code": auth_code,
|
||
"grantType": "authorization_code",
|
||
},
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
|
||
# 保存 token
|
||
token_data = {
|
||
"accessToken": data["accessToken"],
|
||
"refreshToken": data["refreshToken"],
|
||
"expireTime": time.time() + data.get("expireIn", 7200) - 300,
|
||
"clientId": client_id,
|
||
"clientSecret": client_secret,
|
||
}
|
||
TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8")
|
||
print(f"[auth] 用户 Token 获取成功,已保存到 {TOKEN_PATH.name}")
|
||
print(f"[auth] Access Token 有效期 2 小时,Refresh Token 有效期 30 天(自动续期)")
|
||
return token_data
|
||
|
||
|
||
def load_user_token(config):
|
||
"""加载已保存的用户 Token,自动刷新过期的 Token"""
|
||
if not TOKEN_PATH.exists():
|
||
return None
|
||
|
||
data = json.loads(TOKEN_PATH.read_text(encoding="utf-8"))
|
||
|
||
# 检查是否过期
|
||
if time.time() < data.get("expireTime", 0):
|
||
return data
|
||
|
||
# 尝试用 refresh_token 刷新
|
||
refresh_token = data.get("refreshToken")
|
||
if not refresh_token:
|
||
return None
|
||
|
||
print("[token] Access Token 已过期,使用 Refresh Token 刷新...")
|
||
resp = requests.post(
|
||
"https://api.dingtalk.com/v1.0/oauth2/userAccessToken",
|
||
json={
|
||
"clientId": data.get("clientId", config["DINGTALK_APP_KEY"]),
|
||
"clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]),
|
||
"refreshToken": refresh_token,
|
||
"grantType": "refresh_token",
|
||
},
|
||
)
|
||
|
||
if resp.status_code != 200:
|
||
print("[token] Refresh Token 已失效,请重新授权: python sync_to_dingtalk.py auth")
|
||
return None
|
||
|
||
new_data = resp.json()
|
||
token_data = {
|
||
"accessToken": new_data["accessToken"],
|
||
"refreshToken": new_data["refreshToken"],
|
||
"expireTime": time.time() + new_data.get("expireIn", 7200) - 300,
|
||
"clientId": data.get("clientId", config["DINGTALK_APP_KEY"]),
|
||
"clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]),
|
||
}
|
||
TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8")
|
||
print("[token] Token 刷新成功")
|
||
return token_data
|
||
|
||
|
||
# ============================================
|
||
# 钉钉 API 客户端
|
||
# ============================================
|
||
|
||
class DingTalkClient:
|
||
"""钉钉 API 客户端(双 Token 模式)"""
|
||
|
||
BASE_URL = "https://api.dingtalk.com"
|
||
|
||
def __init__(self, app_key, app_secret, operator_id, workspace_id, user_token=None):
|
||
self.app_key = app_key
|
||
self.app_secret = app_secret
|
||
self.operator_id = operator_id
|
||
self.workspace_id = workspace_id
|
||
self.user_token = user_token
|
||
self.app_token = None
|
||
self.app_token_expire_time = 0
|
||
|
||
def _ensure_app_token(self):
|
||
"""确保应用级 access_token 有效"""
|
||
if self.app_token and time.time() < self.app_token_expire_time:
|
||
return
|
||
resp = requests.post(
|
||
f"{self.BASE_URL}/v1.0/oauth2/accessToken",
|
||
json={"appKey": self.app_key, "appSecret": self.app_secret},
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
self.app_token = data["accessToken"]
|
||
self.app_token_expire_time = time.time() + data["expireIn"] - 300
|
||
print(f"[token] 应用 Token 获取成功")
|
||
|
||
def _app_headers(self):
|
||
"""应用级请求头(创建文件夹/文档、查询节点)"""
|
||
self._ensure_app_token()
|
||
return {
|
||
"x-acs-dingtalk-access-token": self.app_token,
|
||
"Content-Type": "application/json",
|
||
}
|
||
|
||
def _user_headers(self):
|
||
"""用户级请求头(写入文档内容)"""
|
||
if not self.user_token:
|
||
raise RuntimeError("需要用户 Token,请先运行: python sync_to_dingtalk.py auth")
|
||
return {
|
||
"x-acs-dingtalk-access-token": self.user_token,
|
||
"Content-Type": "application/json",
|
||
}
|
||
|
||
def list_nodes(self, parent_node_id):
|
||
"""获取指定文件夹下的节点列表"""
|
||
nodes = []
|
||
next_token = None
|
||
while True:
|
||
params = {
|
||
"parentNodeId": parent_node_id,
|
||
"operatorId": self.operator_id,
|
||
"maxResults": 50,
|
||
}
|
||
if next_token:
|
||
params["nextToken"] = next_token
|
||
resp = requests.get(
|
||
f"{self.BASE_URL}/v2.0/wiki/nodes",
|
||
headers=self._app_headers(),
|
||
params=params,
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
nodes.extend(data.get("nodes", []))
|
||
next_token = data.get("nextToken")
|
||
if not next_token:
|
||
break
|
||
return nodes
|
||
|
||
def find_node(self, parent_node_id, name, node_type=None):
|
||
"""在指定文件夹下按名称查找节点"""
|
||
nodes = self.list_nodes(parent_node_id)
|
||
for node in nodes:
|
||
if node["name"] == name:
|
||
if node_type is None or node["type"] == node_type:
|
||
return node
|
||
return None
|
||
|
||
def find_node_by_prefix(self, parent_node_id, prefix):
|
||
"""在指定文件夹下按名称前缀查找节点(用于按课次编号匹配)
|
||
例如 prefix='AICODE03-02' 可匹配 'AICODE03-02 提问的艺术.adoc'
|
||
"""
|
||
nodes = self.list_nodes(parent_node_id)
|
||
for node in nodes:
|
||
node_name = node["name"]
|
||
# 钉钉文档名称带 .adoc 后缀
|
||
if node_name.endswith(".adoc"):
|
||
node_name = node_name[:-5]
|
||
if node_name.startswith(prefix):
|
||
return node
|
||
return None
|
||
|
||
def create_folder(self, parent_node_id, name):
|
||
"""创建文件夹,如果已存在则返回现有的。返回 (nodeId, actualName)"""
|
||
existing = self.find_node(parent_node_id, name, "FOLDER")
|
||
if existing:
|
||
print(f" [folder] 已存在: {existing['name']}")
|
||
return existing["nodeId"], existing["name"]
|
||
|
||
resp = requests.post(
|
||
f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs",
|
||
headers=self._app_headers(),
|
||
json={
|
||
"name": name,
|
||
"docType": "FOLDER",
|
||
"operatorId": self.operator_id,
|
||
"parentNodeId": parent_node_id,
|
||
},
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
print(f" [folder] 创建成功: {name}")
|
||
return data["nodeId"], name
|
||
|
||
def resolve_node_name(self, parent_node_id, target_node_id):
|
||
"""通过遍历父目录,获取指定 nodeId 的实际名称"""
|
||
nodes = self.list_nodes(parent_node_id)
|
||
for node in nodes:
|
||
if node["nodeId"] == target_node_id:
|
||
return node["name"]
|
||
return None
|
||
|
||
def create_doc(self, parent_node_id, name):
|
||
"""创建文档,返回 (nodeId, docKey, url)"""
|
||
resp = requests.post(
|
||
f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs",
|
||
headers=self._app_headers(),
|
||
json={
|
||
"name": name,
|
||
"docType": "DOC",
|
||
"operatorId": self.operator_id,
|
||
"parentNodeId": parent_node_id,
|
||
},
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
return data["nodeId"], data["docKey"], data.get("url", "")
|
||
|
||
def overwrite_content(self, doc_key, markdown_content):
|
||
"""覆写文档内容(Markdown 格式,需要用户级 Token)"""
|
||
resp = requests.post(
|
||
f"{self.BASE_URL}/v2.0/doc/me/suites/documents/{doc_key}/overwriteContent",
|
||
headers=self._user_headers(),
|
||
json={
|
||
"dataType": "markdown",
|
||
"content": markdown_content,
|
||
},
|
||
)
|
||
if resp.status_code != 200:
|
||
print(f" [error] 写入失败: {resp.status_code} {resp.text}")
|
||
resp.raise_for_status()
|
||
print(f" [content] 内容写入成功")
|
||
|
||
def upload_markdown(self, parent_node_id, name, markdown_content):
|
||
"""上传 Markdown 文档:创建 + 写入内容。返回 (node_id, url)
|
||
|
||
匹配逻辑(按优先级):
|
||
1. 从文件名提取课次编号(如 AICODE03-02),按编号前缀匹配已有文档
|
||
2. 如果没有编号(如课程大纲),按完整文件名匹配
|
||
3. 都没匹配到 → 新建文档
|
||
"""
|
||
lesson_id = extract_lesson_id(name)
|
||
existing = None
|
||
|
||
# 优先按课次编号匹配
|
||
if lesson_id:
|
||
existing = self.find_node_by_prefix(parent_node_id, lesson_id)
|
||
if existing:
|
||
old_name = existing["name"].rstrip(".adoc") if existing["name"].endswith(".adoc") else existing["name"]
|
||
if old_name != name:
|
||
print(f" [update] 按编号匹配覆盖: {old_name} → {name}")
|
||
else:
|
||
print(f" [update] 覆盖已有文档: {name}")
|
||
|
||
# 没有编号或编号没匹配到,按完整文件名匹配
|
||
if not existing:
|
||
existing = self.find_node(parent_node_id, f"{name}.adoc")
|
||
if existing:
|
||
print(f" [update] 覆盖已有文档: {name}")
|
||
|
||
if existing:
|
||
doc_key = existing["nodeId"]
|
||
self.overwrite_content(doc_key, markdown_content)
|
||
return existing["nodeId"], existing.get("url", "")
|
||
else:
|
||
node_id, doc_key, url = self.create_doc(parent_node_id, name)
|
||
print(f" [create] 新建文档: {name}")
|
||
time.sleep(2) # 等待文档初始化
|
||
self.overwrite_content(doc_key, markdown_content)
|
||
return node_id, url or ""
|
||
|
||
|
||
# ============================================
|
||
# Webhook 通知
|
||
# ============================================
|
||
|
||
DINGTALK_DOC_BASE_URL = "https://alidocs.dingtalk.com/i/nodes"
|
||
|
||
|
||
def _get_doc_url(result):
|
||
"""获取文档链接:优先用 API 返回的 url,否则用 nodeId 构造"""
|
||
url = result.get("url", "")
|
||
if url:
|
||
return url
|
||
node_id = result.get("node_id", "")
|
||
if node_id:
|
||
return f"{DINGTALK_DOC_BASE_URL}/{node_id}"
|
||
return ""
|
||
|
||
|
||
def send_webhook(config, results):
|
||
"""同步完成后发送钉钉机器人通知(仅报告成功项,按知识库目录分组)"""
|
||
webhook_url = config.get("DINGTALK_WEBHOOK_URL")
|
||
webhook_secret = config.get("DINGTALK_WEBHOOK_SECRET")
|
||
if not webhook_url or not results:
|
||
return
|
||
|
||
# HMAC-SHA256 签名
|
||
timestamp = str(int(time.time() * 1000))
|
||
string_to_sign = f"{timestamp}\n{webhook_secret}"
|
||
hmac_code = hmac.new(
|
||
webhook_secret.encode("utf-8"),
|
||
string_to_sign.encode("utf-8"),
|
||
digestmod=hashlib.sha256,
|
||
).digest()
|
||
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code).decode("utf-8"))
|
||
signed_url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
|
||
|
||
# 按知识库目录分组
|
||
from collections import OrderedDict
|
||
groups = OrderedDict()
|
||
for r in results:
|
||
folder = r["kb_folder"]
|
||
if folder not in groups:
|
||
groups[folder] = []
|
||
groups[folder].append(r)
|
||
|
||
# 构建 Markdown 消息(按目录分组)
|
||
lines = ["### 知识库文档已更新\n"]
|
||
for folder, items in groups.items():
|
||
lines.append(f"**{folder}**\n")
|
||
for r in items:
|
||
doc_url = _get_doc_url(r)
|
||
if doc_url:
|
||
lines.append(f"- [{r['name']}]({doc_url})")
|
||
else:
|
||
lines.append(f"- {r['name']}")
|
||
lines.append("")
|
||
|
||
body = {
|
||
"msgtype": "markdown",
|
||
"markdown": {
|
||
"title": "知识库文档已更新",
|
||
"text": "\n".join(lines),
|
||
},
|
||
}
|
||
|
||
try:
|
||
resp = requests.post(signed_url, json=body, timeout=10)
|
||
if resp.status_code == 200 and resp.json().get("errcode") == 0:
|
||
print("[webhook] 通知发送成功")
|
||
else:
|
||
print(f"[webhook] 通知发送失败: {resp.text}")
|
||
except Exception as e:
|
||
print(f"[webhook] 通知发送异常: {e}")
|
||
|
||
|
||
# ============================================
|
||
# 辅助函数
|
||
# ============================================
|
||
|
||
def extract_lesson_id(name):
|
||
"""从文件名中提取课次编号。
|
||
'AICODE03-02 AI的记忆之谜' → 'AICODE03-02'
|
||
'AICODE06-04 代码审查入门' → 'AICODE06-04'
|
||
'AICODE-03课程大纲' → None(大纲没有课次编号,走文件名匹配)
|
||
"""
|
||
match = re.match(r'(AICODE\d{2}-\d{2})', name)
|
||
return match.group(1) if match else None
|
||
|
||
|
||
def _lesson_sort_key(stem):
|
||
"""排序键:课程大纲排第一,教案按课次编号排序。
|
||
'AICODE-03课程大纲' → (0, 0, 'AICODE-03课程大纲')
|
||
'AICODE03-02 AI的记忆之谜' → (1, 2, 'AICODE03-02 AI的记忆之谜')
|
||
"""
|
||
lesson_id = extract_lesson_id(stem)
|
||
if lesson_id:
|
||
# 提取课次数字(如 AICODE03-02 → 2)
|
||
num = int(lesson_id.split("-")[1])
|
||
return (1, num, stem)
|
||
elif "大纲" in stem or "课程" in stem:
|
||
return (0, 0, stem)
|
||
else:
|
||
return (2, 0, stem)
|
||
|
||
|
||
# ============================================
|
||
# 同步逻辑
|
||
# ============================================
|
||
|
||
def sync_lessons(client, local_dir, parent_node_id, folder_name, kb_folder, results=None):
|
||
"""同步一个课程目录下的所有教案(按课次编号排序)"""
|
||
folder_id, _ = client.create_folder(parent_node_id, folder_name)
|
||
|
||
lesson_dir = Path(local_dir)
|
||
# 只取当前目录的 .md 文件(排除子目录如 旧版本/)
|
||
md_files = sorted(
|
||
[f for f in lesson_dir.glob("*.md") if f.parent == lesson_dir],
|
||
key=lambda f: _lesson_sort_key(f.stem),
|
||
)
|
||
|
||
if not md_files:
|
||
print(f" [warn] {local_dir} 下没有找到 .md 文件")
|
||
return
|
||
|
||
print(f"\n{'='*50}")
|
||
print(f"同步 {folder_name}: {len(md_files)} 篇教案")
|
||
print(f"{'='*50}")
|
||
|
||
for i, md_file in enumerate(md_files, 1):
|
||
name = md_file.stem
|
||
content = md_file.read_text(encoding="utf-8")
|
||
print(f"\n[{i}/{len(md_files)}] {name}")
|
||
try:
|
||
node_id, doc_url = client.upload_markdown(folder_id, name, content)
|
||
if results is not None:
|
||
results.append({
|
||
"name": name,
|
||
"kb_folder": kb_folder,
|
||
"node_id": node_id,
|
||
"url": doc_url,
|
||
})
|
||
except Exception as e:
|
||
print(f" [error] 同步失败: {e}")
|
||
time.sleep(1) # 避免频率限制
|
||
|
||
|
||
def sync_single_file(client, file_path, parent_node_id, kb_folder, label="", results=None):
|
||
"""同步单个文件"""
|
||
path = Path(file_path)
|
||
if not path.exists():
|
||
print(f" [error] 文件不存在: {path}")
|
||
return
|
||
name = path.stem
|
||
content = path.read_text(encoding="utf-8")
|
||
print(f"\n同步{label}: {name}")
|
||
try:
|
||
node_id, doc_url = client.upload_markdown(parent_node_id, name, content)
|
||
if results is not None:
|
||
results.append({
|
||
"name": name,
|
||
"kb_folder": kb_folder,
|
||
"node_id": node_id,
|
||
"url": doc_url,
|
||
})
|
||
except Exception as e:
|
||
print(f" [error] 同步失败: {e}")
|
||
|
||
|
||
def cmd_list(config, token_data, target_folder=None):
|
||
"""列出钉钉知识库中的文档"""
|
||
client = DingTalkClient(
|
||
app_key=config["DINGTALK_APP_KEY"],
|
||
app_secret=config["DINGTALK_APP_SECRET"],
|
||
operator_id=config["DINGTALK_OPERATOR_ID"],
|
||
workspace_id=config["DINGTALK_WORKSPACE_ID"],
|
||
user_token=token_data["accessToken"],
|
||
)
|
||
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
|
||
root_node = config["DINGTALK_ROOT_NODE_ID"]
|
||
|
||
print("正在读取知识库目录结构...")
|
||
kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课"
|
||
|
||
# 列出指定文件夹或所有文件夹
|
||
folders_to_list = []
|
||
if target_folder:
|
||
folder_name = target_folder.upper()
|
||
if not folder_name.startswith("AICODE-"):
|
||
folder_name = f"AICODE-{folder_name.replace('AICODE', '')}"
|
||
folders_to_list = [folder_name]
|
||
else:
|
||
# 列出所有子文件夹
|
||
top_nodes = client.list_nodes(innovation_node)
|
||
folders_to_list = [n["name"] for n in top_nodes if n.get("type") == "FOLDER"]
|
||
|
||
for folder_name in folders_to_list:
|
||
existing = client.find_node(innovation_node, folder_name, "FOLDER")
|
||
if not existing:
|
||
print(f"\n[warn] 文件夹 '{folder_name}' 不存在")
|
||
continue
|
||
folder_id = existing["nodeId"]
|
||
nodes = client.list_nodes(folder_id)
|
||
print(f"\n📁 {kb_innovation} / {folder_name} ({len(nodes)} 篇)")
|
||
print("-" * 50)
|
||
for i, node in enumerate(nodes, 1):
|
||
name = node["name"]
|
||
if name.endswith(".adoc"):
|
||
name = name[:-5]
|
||
ntype = "📄" if node.get("type") != "FOLDER" else "📁"
|
||
print(f" {i}. {ntype} {name}")
|
||
|
||
|
||
def cmd_delete(config, token_data, node_names):
|
||
"""删除钉钉知识库中的指定文档"""
|
||
client = DingTalkClient(
|
||
app_key=config["DINGTALK_APP_KEY"],
|
||
app_secret=config["DINGTALK_APP_SECRET"],
|
||
operator_id=config["DINGTALK_OPERATOR_ID"],
|
||
workspace_id=config["DINGTALK_WORKSPACE_ID"],
|
||
user_token=token_data["accessToken"],
|
||
)
|
||
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
|
||
|
||
print("正在读取知识库目录结构...")
|
||
|
||
for target_name in node_names:
|
||
# 从名称推断所在文件夹
|
||
if "AICODE06" in target_name or "AICODE-06" in target_name:
|
||
folder_name = "AICODE-06"
|
||
elif "AICODE03" in target_name or "AICODE-03" in target_name:
|
||
folder_name = "AICODE-03"
|
||
else:
|
||
print(f" [skip] 无法判断 '{target_name}' 所在文件夹")
|
||
continue
|
||
|
||
folder = client.find_node(innovation_node, folder_name, "FOLDER")
|
||
if not folder:
|
||
print(f" [error] 文件夹 '{folder_name}' 不存在")
|
||
continue
|
||
|
||
# 按前缀或完整名称查找
|
||
lesson_id = extract_lesson_id(target_name)
|
||
found = None
|
||
if lesson_id:
|
||
found = client.find_node_by_prefix(folder["nodeId"], lesson_id)
|
||
if not found:
|
||
# 尝试完整名称匹配(带.adoc)
|
||
found = client.find_node(folder["nodeId"], f"{target_name}.adoc")
|
||
if not found:
|
||
found = client.find_node(folder["nodeId"], target_name)
|
||
|
||
if not found:
|
||
print(f" [warn] 未找到: {target_name}")
|
||
continue
|
||
|
||
actual_name = found["name"].rstrip(".adoc") if found["name"].endswith(".adoc") else found["name"]
|
||
node_id = found["nodeId"]
|
||
|
||
# 调用删除 API
|
||
try:
|
||
resp = requests.delete(
|
||
f"{client.BASE_URL}/v2.0/wiki/nodes/{node_id}",
|
||
headers=client._user_headers(),
|
||
params={"operatorId": client.operator_id},
|
||
)
|
||
resp.raise_for_status()
|
||
print(f" [deleted] {actual_name}")
|
||
except Exception as e:
|
||
print(f" [error] 删除 '{actual_name}' 失败: {e}")
|
||
|
||
|
||
def main():
|
||
config = load_env()
|
||
|
||
# 处理 auth 命令
|
||
if len(sys.argv) > 1 and sys.argv[1] == "auth":
|
||
do_oauth(config)
|
||
return
|
||
|
||
# 处理 list 命令
|
||
if len(sys.argv) > 1 and sys.argv[1] == "list":
|
||
token_data = load_user_token(config)
|
||
if not token_data:
|
||
print("未找到有效的用户 Token。请先授权。")
|
||
sys.exit(1)
|
||
target = sys.argv[2] if len(sys.argv) > 2 else None
|
||
cmd_list(config, token_data, target)
|
||
return
|
||
|
||
# 处理 delete 命令
|
||
if len(sys.argv) > 1 and sys.argv[1] == "delete":
|
||
token_data = load_user_token(config)
|
||
if not token_data:
|
||
print("未找到有效的用户 Token。请先授权。")
|
||
sys.exit(1)
|
||
if len(sys.argv) < 3:
|
||
print("用法: python scripts/sync_to_dingtalk.py delete <文档名1> [文档名2] ...")
|
||
sys.exit(1)
|
||
cmd_delete(config, token_data, sys.argv[2:])
|
||
return
|
||
|
||
# 加载用户 Token
|
||
token_data = load_user_token(config)
|
||
if not token_data:
|
||
print("未找到有效的用户 Token。请先授权:")
|
||
print(" python scripts/sync_to_dingtalk.py auth")
|
||
sys.exit(1)
|
||
|
||
client = DingTalkClient(
|
||
app_key=config["DINGTALK_APP_KEY"],
|
||
app_secret=config["DINGTALK_APP_SECRET"],
|
||
operator_id=config["DINGTALK_OPERATOR_ID"],
|
||
workspace_id=config["DINGTALK_WORKSPACE_ID"],
|
||
user_token=token_data["accessToken"],
|
||
)
|
||
|
||
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
|
||
ai_course_node = config["DINGTALK_AI_COURSE_NODE_ID"]
|
||
root_node = config["DINGTALK_ROOT_NODE_ID"]
|
||
|
||
# ── 动态遍历知识库,获取真实目录名称 ──
|
||
print("正在读取知识库目录结构...")
|
||
kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课"
|
||
kb_ai_course = client.resolve_node_name(root_node, ai_course_node) or "AI人工智能课"
|
||
print(f" 一级目录: {kb_innovation}, {kb_ai_course}")
|
||
|
||
raw_target = sys.argv[1] if len(sys.argv) > 1 else "all"
|
||
# 支持逗号分隔的多目标,如 "sales,outline"
|
||
targets = [t.strip() for t in raw_target.split(",")]
|
||
handled = False
|
||
results = [] # 收集成功同步的文档信息
|
||
|
||
# 批量同步
|
||
if any(t in ("all", "aicode03") for t in targets):
|
||
_, sub_name = client.create_folder(innovation_node, "AICODE-03")
|
||
sync_lessons(
|
||
client, PROJECT_ROOT / "3-lessons" / "AICODE-03",
|
||
innovation_node, "AICODE-03",
|
||
kb_folder=f"{kb_innovation} / {sub_name}", results=results,
|
||
)
|
||
handled = True
|
||
|
||
if any(t in ("all", "aicode06") for t in targets):
|
||
_, sub_name = client.create_folder(innovation_node, "AICODE-06")
|
||
sync_lessons(
|
||
client, PROJECT_ROOT / "3-lessons" / "AICODE-06",
|
||
innovation_node, "AICODE-06",
|
||
kb_folder=f"{kb_innovation} / {sub_name}", results=results,
|
||
)
|
||
handled = True
|
||
|
||
if any(t in ("all", "outline") for t in targets):
|
||
aicode03_folder, sub03 = client.create_folder(innovation_node, "AICODE-03")
|
||
sync_single_file(
|
||
client, PROJECT_ROOT / "3-lessons" / "AICODE-03" / "AICODE-03课程大纲.md",
|
||
aicode03_folder, kb_folder=f"{kb_innovation} / {sub03}", label="AICODE-03大纲", results=results,
|
||
)
|
||
aicode06_folder, sub06 = client.create_folder(innovation_node, "AICODE-06")
|
||
sync_single_file(
|
||
client, PROJECT_ROOT / "3-lessons" / "AICODE-06" / "AICODE-06课程大纲.md",
|
||
aicode06_folder, kb_folder=f"{kb_innovation} / {sub06}", label="AICODE-06大纲", results=results,
|
||
)
|
||
handled = True
|
||
|
||
if any(t in ("all", "sales") for t in targets):
|
||
sales_folder_id, sales_folder_name = client.create_folder(ai_course_node, "销售工具")
|
||
sales_kb = f"{kb_ai_course} / {sales_folder_name}"
|
||
sync_single_file(
|
||
client, PROJECT_ROOT / "2-sales" / "课程介绍.md",
|
||
sales_folder_id, kb_folder=sales_kb, label="课程介绍", results=results,
|
||
)
|
||
sync_single_file(
|
||
client, PROJECT_ROOT / "2-sales" / "家长QA.md",
|
||
sales_folder_id, kb_folder=sales_kb, label="家长QA", results=results,
|
||
)
|
||
handled = True
|
||
|
||
# 支持同步指定的 .md 文件路径(仅单目标模式)
|
||
target = targets[0] if len(targets) == 1 else ""
|
||
if target.endswith(".md"):
|
||
file_path = Path(target)
|
||
if not file_path.is_absolute():
|
||
file_path = PROJECT_ROOT / file_path
|
||
# 根据路径判断目标文件夹和知识库路径
|
||
if "AICODE-03" in str(file_path):
|
||
folder_id, sub_name = client.create_folder(innovation_node, "AICODE-03")
|
||
kb_folder = f"{kb_innovation} / {sub_name}"
|
||
elif "AICODE-06" in str(file_path):
|
||
folder_id, sub_name = client.create_folder(innovation_node, "AICODE-06")
|
||
kb_folder = f"{kb_innovation} / {sub_name}"
|
||
elif "2-sales" in str(file_path):
|
||
folder_id, sales_name = client.create_folder(ai_course_node, "销售工具")
|
||
kb_folder = f"{kb_ai_course} / {sales_name}"
|
||
else:
|
||
folder_id = innovation_node
|
||
kb_folder = kb_innovation
|
||
sync_single_file(client, file_path, folder_id, kb_folder=kb_folder, label="指定文件", results=results)
|
||
handled = True
|
||
|
||
# 支持按课次编号(如 AICODE03-05)查找文件
|
||
if not handled and target.upper().startswith("AICODE"):
|
||
lesson_id = target.upper()
|
||
# 在两个课程目录中查找匹配的文件
|
||
found = False
|
||
for course_dir in ["AICODE-03", "AICODE-06"]:
|
||
lesson_dir = PROJECT_ROOT / "3-lessons" / course_dir
|
||
for md_file in lesson_dir.glob("*.md"):
|
||
if lesson_id in md_file.stem.upper().replace(" ", ""):
|
||
folder_id, sub_name = client.create_folder(innovation_node, course_dir)
|
||
kb_folder = f"{kb_innovation} / {sub_name}"
|
||
sync_single_file(
|
||
client, md_file, folder_id,
|
||
kb_folder=kb_folder, label=f"教案 {md_file.stem}", results=results,
|
||
)
|
||
found = True
|
||
break
|
||
if found:
|
||
break
|
||
if not found:
|
||
print(f"[error] 未找到匹配 '{target}' 的教案文件")
|
||
sys.exit(1)
|
||
|
||
print(f"\n{'='*50}")
|
||
print(f"同步完成!共 {len(results)} 篇文档")
|
||
print(f"{'='*50}")
|
||
|
||
# 发送 Webhook 通知(仅当指定 --notify 时)
|
||
if "--notify" in sys.argv:
|
||
send_webhook(config, results)
|
||
else:
|
||
print("[webhook] 未发送通知(如需通知请加 --notify)")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|