# -*- coding: utf-8 -*- """ 钉钉知识库同步脚本 将本地教案和大纲同步到钉钉知识库「标准化教案手册」 用法: python sync_to_dingtalk.py auth # 首次授权(获取用户 Token) python sync_to_dingtalk.py # 同步指定文件到知识库 python sync_to_dingtalk.py all # 同步所有教案 python sync_to_dingtalk.py aicode03 # 只同步 AICODE-03 python sync_to_dingtalk.py aicode06 # 只同步 AICODE-06 python sync_to_dingtalk.py outline # 只同步课程大纲 python sync_to_dingtalk.py sales # 只同步销售材料 python sync_to_dingtalk.py sales,outline # 同步多个目标(逗号分隔,只发一条通知) python sync_to_dingtalk.py outline --notify # 同步并发钉钉机器人通知(默认不发) """ import re import sys import json import time import hmac import hashlib import base64 import urllib.parse import webbrowser from datetime import datetime import requests from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path PROJECT_ROOT = Path(__file__).parent.parent ENV_PATH = PROJECT_ROOT / ".env" TOKEN_PATH = PROJECT_ROOT / "scripts" / ".dingtalk_token.json" # ============================================ # 配置加载 # ============================================ def load_env(): """从 .env 文件加载配置""" config = {} with open(ENV_PATH, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: key, value = line.split("=", 1) config[key.strip()] = value.strip() return config # ============================================ # OAuth 授权(获取用户级 Token) # ============================================ class OAuthCallbackHandler(BaseHTTPRequestHandler): """处理 OAuth 回调的 HTTP handler""" auth_code = None def do_GET(self): query = urllib.parse.urlparse(self.path).query params = urllib.parse.parse_qs(query) if "authCode" in params: OAuthCallbackHandler.auth_code = params["authCode"][0] self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.end_headers() self.wfile.write("授权成功!你可以关闭此页面。".encode("utf-8")) else: self.send_response(400) self.end_headers() self.wfile.write("授权失败,未收到 authCode".encode("utf-8")) def log_message(self, format, *args): pass # 静默日志 def do_oauth(config): """执行 OAuth 授权流程,获取用户 Token""" client_id = config["DINGTALK_APP_KEY"] client_secret = config["DINGTALK_APP_SECRET"] redirect_uri = "http://127.0.0.1:18765/callback" # 构建授权 URL auth_url = ( "https://login.dingtalk.com/oauth2/auth?" f"redirect_uri={urllib.parse.quote(redirect_uri)}" f"&response_type=code" f"&client_id={client_id}" f"&scope=openid" f"&prompt=consent" ) print("正在启动本地服务器等待授权回调...") print(f"请在浏览器中完成钉钉登录授权。") print(f"如果浏览器没有自动打开,请手动访问:\n{auth_url}\n") # 启动本地 HTTP 服务器 server = HTTPServer(("127.0.0.1", 18765), OAuthCallbackHandler) webbrowser.open(auth_url) # 等待回调 while OAuthCallbackHandler.auth_code is None: server.handle_request() auth_code = OAuthCallbackHandler.auth_code server.server_close() print(f"[auth] 收到授权码") # 用 auth_code 换取 user access token resp = requests.post( "https://api.dingtalk.com/v1.0/oauth2/userAccessToken", json={ "clientId": client_id, "clientSecret": client_secret, "code": auth_code, "grantType": "authorization_code", }, ) resp.raise_for_status() data = resp.json() # 保存 token token_data = { "accessToken": data["accessToken"], "refreshToken": data["refreshToken"], "expireTime": time.time() + data.get("expireIn", 7200) - 300, "clientId": client_id, "clientSecret": client_secret, } TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8") print(f"[auth] 用户 Token 获取成功,已保存到 {TOKEN_PATH.name}") print(f"[auth] Access Token 有效期 2 小时,Refresh Token 有效期 30 天(自动续期)") return token_data def load_user_token(config): """加载已保存的用户 Token,自动刷新过期的 Token""" if not TOKEN_PATH.exists(): return None data = json.loads(TOKEN_PATH.read_text(encoding="utf-8")) # 检查是否过期 if time.time() < data.get("expireTime", 0): return data # 尝试用 refresh_token 刷新 refresh_token = data.get("refreshToken") if not refresh_token: return None print("[token] Access Token 已过期,使用 Refresh Token 刷新...") resp = requests.post( "https://api.dingtalk.com/v1.0/oauth2/userAccessToken", json={ "clientId": data.get("clientId", config["DINGTALK_APP_KEY"]), "clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]), "refreshToken": refresh_token, "grantType": "refresh_token", }, ) if resp.status_code != 200: print("[token] Refresh Token 已失效,请重新授权: python sync_to_dingtalk.py auth") return None new_data = resp.json() token_data = { "accessToken": new_data["accessToken"], "refreshToken": new_data["refreshToken"], "expireTime": time.time() + new_data.get("expireIn", 7200) - 300, "clientId": data.get("clientId", config["DINGTALK_APP_KEY"]), "clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]), } TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8") print("[token] Token 刷新成功") return token_data # ============================================ # 钉钉 API 客户端 # ============================================ class DingTalkClient: """钉钉 API 客户端(双 Token 模式)""" BASE_URL = "https://api.dingtalk.com" def __init__(self, app_key, app_secret, operator_id, workspace_id, user_token=None): self.app_key = app_key self.app_secret = app_secret self.operator_id = operator_id self.workspace_id = workspace_id self.user_token = user_token self.app_token = None self.app_token_expire_time = 0 def _ensure_app_token(self): """确保应用级 access_token 有效""" if self.app_token and time.time() < self.app_token_expire_time: return resp = requests.post( f"{self.BASE_URL}/v1.0/oauth2/accessToken", json={"appKey": self.app_key, "appSecret": self.app_secret}, ) resp.raise_for_status() data = resp.json() self.app_token = data["accessToken"] self.app_token_expire_time = time.time() + data["expireIn"] - 300 print(f"[token] 应用 Token 获取成功") def _app_headers(self): """应用级请求头(创建文件夹/文档、查询节点)""" self._ensure_app_token() return { "x-acs-dingtalk-access-token": self.app_token, "Content-Type": "application/json", } def _user_headers(self): """用户级请求头(写入文档内容)""" if not self.user_token: raise RuntimeError("需要用户 Token,请先运行: python sync_to_dingtalk.py auth") return { "x-acs-dingtalk-access-token": self.user_token, "Content-Type": "application/json", } def list_nodes(self, parent_node_id): """获取指定文件夹下的节点列表""" nodes = [] next_token = None while True: params = { "parentNodeId": parent_node_id, "operatorId": self.operator_id, "maxResults": 50, } if next_token: params["nextToken"] = next_token resp = requests.get( f"{self.BASE_URL}/v2.0/wiki/nodes", headers=self._app_headers(), params=params, ) resp.raise_for_status() data = resp.json() nodes.extend(data.get("nodes", [])) next_token = data.get("nextToken") if not next_token: break return nodes def find_node(self, parent_node_id, name, node_type=None): """在指定文件夹下按名称查找节点""" nodes = self.list_nodes(parent_node_id) for node in nodes: if node["name"] == name: if node_type is None or node["type"] == node_type: return node return None def find_node_by_prefix(self, parent_node_id, prefix): """在指定文件夹下按名称前缀查找节点(用于按课次编号匹配) 例如 prefix='AICODE03-02' 可匹配 'AICODE03-02 提问的艺术.adoc' """ nodes = self.list_nodes(parent_node_id) for node in nodes: node_name = node["name"] # 钉钉文档名称带 .adoc 后缀 if node_name.endswith(".adoc"): node_name = node_name[:-5] if node_name.startswith(prefix): return node return None def create_folder(self, parent_node_id, name): """创建文件夹,如果已存在则返回现有的。返回 (nodeId, actualName)""" existing = self.find_node(parent_node_id, name, "FOLDER") if existing: print(f" [folder] 已存在: {existing['name']}") return existing["nodeId"], existing["name"] resp = requests.post( f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs", headers=self._app_headers(), json={ "name": name, "docType": "FOLDER", "operatorId": self.operator_id, "parentNodeId": parent_node_id, }, ) resp.raise_for_status() data = resp.json() print(f" [folder] 创建成功: {name}") return data["nodeId"], name def resolve_node_name(self, parent_node_id, target_node_id): """通过遍历父目录,获取指定 nodeId 的实际名称""" nodes = self.list_nodes(parent_node_id) for node in nodes: if node["nodeId"] == target_node_id: return node["name"] return None def create_doc(self, parent_node_id, name): """创建文档,返回 (nodeId, docKey, url)""" resp = requests.post( f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs", headers=self._app_headers(), json={ "name": name, "docType": "DOC", "operatorId": self.operator_id, "parentNodeId": parent_node_id, }, ) resp.raise_for_status() data = resp.json() return data["nodeId"], data["docKey"], data.get("url", "") def overwrite_content(self, doc_key, markdown_content): """覆写文档内容(Markdown 格式,需要用户级 Token)""" resp = requests.post( f"{self.BASE_URL}/v2.0/doc/me/suites/documents/{doc_key}/overwriteContent", headers=self._user_headers(), json={ "dataType": "markdown", "content": markdown_content, }, ) if resp.status_code != 200: print(f" [error] 写入失败: {resp.status_code} {resp.text}") resp.raise_for_status() print(f" [content] 内容写入成功") def upload_markdown(self, parent_node_id, name, markdown_content): """上传 Markdown 文档:创建 + 写入内容。返回 (node_id, url) 匹配逻辑(按优先级): 1. 从文件名提取课次编号(如 AICODE03-02),按编号前缀匹配已有文档 2. 如果没有编号(如课程大纲),按完整文件名匹配 3. 都没匹配到 → 新建文档 """ lesson_id = extract_lesson_id(name) existing = None # 优先按课次编号匹配 if lesson_id: existing = self.find_node_by_prefix(parent_node_id, lesson_id) if existing: old_name = existing["name"].rstrip(".adoc") if existing["name"].endswith(".adoc") else existing["name"] if old_name != name: print(f" [update] 按编号匹配覆盖: {old_name} → {name}") else: print(f" [update] 覆盖已有文档: {name}") # 没有编号或编号没匹配到,按完整文件名匹配 if not existing: existing = self.find_node(parent_node_id, f"{name}.adoc") if existing: print(f" [update] 覆盖已有文档: {name}") if existing: doc_key = existing["nodeId"] self.overwrite_content(doc_key, markdown_content) return existing["nodeId"], existing.get("url", "") else: node_id, doc_key, url = self.create_doc(parent_node_id, name) print(f" [create] 新建文档: {name}") time.sleep(2) # 等待文档初始化 self.overwrite_content(doc_key, markdown_content) return node_id, url or "" # ============================================ # Webhook 通知 # ============================================ DINGTALK_DOC_BASE_URL = "https://alidocs.dingtalk.com/i/nodes" def _get_doc_url(result): """获取文档链接:优先用 API 返回的 url,否则用 nodeId 构造""" url = result.get("url", "") if url: return url node_id = result.get("node_id", "") if node_id: return f"{DINGTALK_DOC_BASE_URL}/{node_id}" return "" def send_webhook(config, results): """同步完成后发送钉钉机器人通知(仅报告成功项,按知识库目录分组)""" webhook_url = config.get("DINGTALK_WEBHOOK_URL") webhook_secret = config.get("DINGTALK_WEBHOOK_SECRET") if not webhook_url or not results: return # HMAC-SHA256 签名 timestamp = str(int(time.time() * 1000)) string_to_sign = f"{timestamp}\n{webhook_secret}" hmac_code = hmac.new( webhook_secret.encode("utf-8"), string_to_sign.encode("utf-8"), digestmod=hashlib.sha256, ).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code).decode("utf-8")) signed_url = f"{webhook_url}×tamp={timestamp}&sign={sign}" # 按知识库目录分组 from collections import OrderedDict groups = OrderedDict() for r in results: folder = r["kb_folder"] if folder not in groups: groups[folder] = [] groups[folder].append(r) # 构建 Markdown 消息(按目录分组) lines = ["### 知识库文档已更新\n"] for folder, items in groups.items(): lines.append(f"**{folder}**\n") for r in items: doc_url = _get_doc_url(r) if doc_url: lines.append(f"- [{r['name']}]({doc_url})") else: lines.append(f"- {r['name']}") lines.append("") body = { "msgtype": "markdown", "markdown": { "title": "知识库文档已更新", "text": "\n".join(lines), }, } try: resp = requests.post(signed_url, json=body, timeout=10) if resp.status_code == 200 and resp.json().get("errcode") == 0: print("[webhook] 通知发送成功") else: print(f"[webhook] 通知发送失败: {resp.text}") except Exception as e: print(f"[webhook] 通知发送异常: {e}") # ============================================ # 辅助函数 # ============================================ def extract_lesson_id(name): """从文件名中提取课次编号。 'AICODE03-02 AI的记忆之谜' → 'AICODE03-02' 'AICODE06-04 代码审查入门' → 'AICODE06-04' 'AICODE-03课程大纲' → None(大纲没有课次编号,走文件名匹配) """ match = re.match(r'(AICODE\d{2}-\d{2})', name) return match.group(1) if match else None def _lesson_sort_key(stem): """排序键:课程大纲排第一,教案按课次编号排序。 'AICODE-03课程大纲' → (0, 0, 'AICODE-03课程大纲') 'AICODE03-02 AI的记忆之谜' → (1, 2, 'AICODE03-02 AI的记忆之谜') """ lesson_id = extract_lesson_id(stem) if lesson_id: # 提取课次数字(如 AICODE03-02 → 2) num = int(lesson_id.split("-")[1]) return (1, num, stem) elif "大纲" in stem or "课程" in stem: return (0, 0, stem) else: return (2, 0, stem) # ============================================ # 同步逻辑 # ============================================ def sync_lessons(client, local_dir, parent_node_id, folder_name, kb_folder, results=None): """同步一个课程目录下的所有教案(按课次编号排序)""" folder_id, _ = client.create_folder(parent_node_id, folder_name) lesson_dir = Path(local_dir) # 只取当前目录的 .md 文件(排除子目录如 旧版本/) md_files = sorted( [f for f in lesson_dir.glob("*.md") if f.parent == lesson_dir], key=lambda f: _lesson_sort_key(f.stem), ) if not md_files: print(f" [warn] {local_dir} 下没有找到 .md 文件") return print(f"\n{'='*50}") print(f"同步 {folder_name}: {len(md_files)} 篇教案") print(f"{'='*50}") for i, md_file in enumerate(md_files, 1): name = md_file.stem content = md_file.read_text(encoding="utf-8") print(f"\n[{i}/{len(md_files)}] {name}") try: node_id, doc_url = client.upload_markdown(folder_id, name, content) if results is not None: results.append({ "name": name, "kb_folder": kb_folder, "node_id": node_id, "url": doc_url, }) except Exception as e: print(f" [error] 同步失败: {e}") time.sleep(1) # 避免频率限制 def sync_single_file(client, file_path, parent_node_id, kb_folder, label="", results=None): """同步单个文件""" path = Path(file_path) if not path.exists(): print(f" [error] 文件不存在: {path}") return name = path.stem content = path.read_text(encoding="utf-8") print(f"\n同步{label}: {name}") try: node_id, doc_url = client.upload_markdown(parent_node_id, name, content) if results is not None: results.append({ "name": name, "kb_folder": kb_folder, "node_id": node_id, "url": doc_url, }) except Exception as e: print(f" [error] 同步失败: {e}") def cmd_list(config, token_data, target_folder=None): """列出钉钉知识库中的文档""" client = DingTalkClient( app_key=config["DINGTALK_APP_KEY"], app_secret=config["DINGTALK_APP_SECRET"], operator_id=config["DINGTALK_OPERATOR_ID"], workspace_id=config["DINGTALK_WORKSPACE_ID"], user_token=token_data["accessToken"], ) innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"] root_node = config["DINGTALK_ROOT_NODE_ID"] print("正在读取知识库目录结构...") kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课" # 列出指定文件夹或所有文件夹 folders_to_list = [] if target_folder: folder_name = target_folder.upper() if not folder_name.startswith("AICODE-"): folder_name = f"AICODE-{folder_name.replace('AICODE', '')}" folders_to_list = [folder_name] else: # 列出所有子文件夹 top_nodes = client.list_nodes(innovation_node) folders_to_list = [n["name"] for n in top_nodes if n.get("type") == "FOLDER"] for folder_name in folders_to_list: existing = client.find_node(innovation_node, folder_name, "FOLDER") if not existing: print(f"\n[warn] 文件夹 '{folder_name}' 不存在") continue folder_id = existing["nodeId"] nodes = client.list_nodes(folder_id) print(f"\n📁 {kb_innovation} / {folder_name} ({len(nodes)} 篇)") print("-" * 50) for i, node in enumerate(nodes, 1): name = node["name"] if name.endswith(".adoc"): name = name[:-5] ntype = "📄" if node.get("type") != "FOLDER" else "📁" print(f" {i}. {ntype} {name}") def cmd_delete(config, token_data, node_names): """删除钉钉知识库中的指定文档""" client = DingTalkClient( app_key=config["DINGTALK_APP_KEY"], app_secret=config["DINGTALK_APP_SECRET"], operator_id=config["DINGTALK_OPERATOR_ID"], workspace_id=config["DINGTALK_WORKSPACE_ID"], user_token=token_data["accessToken"], ) innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"] print("正在读取知识库目录结构...") for target_name in node_names: # 从名称推断所在文件夹 if "AICODE06" in target_name or "AICODE-06" in target_name: folder_name = "AICODE-06" elif "AICODE03" in target_name or "AICODE-03" in target_name: folder_name = "AICODE-03" else: print(f" [skip] 无法判断 '{target_name}' 所在文件夹") continue folder = client.find_node(innovation_node, folder_name, "FOLDER") if not folder: print(f" [error] 文件夹 '{folder_name}' 不存在") continue # 按前缀或完整名称查找 lesson_id = extract_lesson_id(target_name) found = None if lesson_id: found = client.find_node_by_prefix(folder["nodeId"], lesson_id) if not found: # 尝试完整名称匹配(带.adoc) found = client.find_node(folder["nodeId"], f"{target_name}.adoc") if not found: found = client.find_node(folder["nodeId"], target_name) if not found: print(f" [warn] 未找到: {target_name}") continue actual_name = found["name"].rstrip(".adoc") if found["name"].endswith(".adoc") else found["name"] node_id = found["nodeId"] # 调用删除 API try: resp = requests.delete( f"{client.BASE_URL}/v2.0/wiki/nodes/{node_id}", headers=client._user_headers(), params={"operatorId": client.operator_id}, ) resp.raise_for_status() print(f" [deleted] {actual_name}") except Exception as e: print(f" [error] 删除 '{actual_name}' 失败: {e}") def main(): config = load_env() # 处理 auth 命令 if len(sys.argv) > 1 and sys.argv[1] == "auth": do_oauth(config) return # 处理 list 命令 if len(sys.argv) > 1 and sys.argv[1] == "list": token_data = load_user_token(config) if not token_data: print("未找到有效的用户 Token。请先授权。") sys.exit(1) target = sys.argv[2] if len(sys.argv) > 2 else None cmd_list(config, token_data, target) return # 处理 delete 命令 if len(sys.argv) > 1 and sys.argv[1] == "delete": token_data = load_user_token(config) if not token_data: print("未找到有效的用户 Token。请先授权。") sys.exit(1) if len(sys.argv) < 3: print("用法: python scripts/sync_to_dingtalk.py delete <文档名1> [文档名2] ...") sys.exit(1) cmd_delete(config, token_data, sys.argv[2:]) return # 加载用户 Token token_data = load_user_token(config) if not token_data: print("未找到有效的用户 Token。请先授权:") print(" python scripts/sync_to_dingtalk.py auth") sys.exit(1) client = DingTalkClient( app_key=config["DINGTALK_APP_KEY"], app_secret=config["DINGTALK_APP_SECRET"], operator_id=config["DINGTALK_OPERATOR_ID"], workspace_id=config["DINGTALK_WORKSPACE_ID"], user_token=token_data["accessToken"], ) innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"] ai_course_node = config["DINGTALK_AI_COURSE_NODE_ID"] root_node = config["DINGTALK_ROOT_NODE_ID"] # ── 动态遍历知识库,获取真实目录名称 ── print("正在读取知识库目录结构...") kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课" kb_ai_course = client.resolve_node_name(root_node, ai_course_node) or "AI人工智能课" print(f" 一级目录: {kb_innovation}, {kb_ai_course}") raw_target = sys.argv[1] if len(sys.argv) > 1 else "all" # 支持逗号分隔的多目标,如 "sales,outline" targets = [t.strip() for t in raw_target.split(",")] handled = False results = [] # 收集成功同步的文档信息 # 批量同步 if any(t in ("all", "aicode03") for t in targets): _, sub_name = client.create_folder(innovation_node, "AICODE-03") sync_lessons( client, PROJECT_ROOT / "3-lessons" / "AICODE-03", innovation_node, "AICODE-03", kb_folder=f"{kb_innovation} / {sub_name}", results=results, ) handled = True if any(t in ("all", "aicode06") for t in targets): _, sub_name = client.create_folder(innovation_node, "AICODE-06") sync_lessons( client, PROJECT_ROOT / "3-lessons" / "AICODE-06", innovation_node, "AICODE-06", kb_folder=f"{kb_innovation} / {sub_name}", results=results, ) handled = True if any(t in ("all", "outline") for t in targets): aicode03_folder, sub03 = client.create_folder(innovation_node, "AICODE-03") sync_single_file( client, PROJECT_ROOT / "3-lessons" / "AICODE-03" / "AICODE-03课程大纲.md", aicode03_folder, kb_folder=f"{kb_innovation} / {sub03}", label="AICODE-03大纲", results=results, ) aicode06_folder, sub06 = client.create_folder(innovation_node, "AICODE-06") sync_single_file( client, PROJECT_ROOT / "3-lessons" / "AICODE-06" / "AICODE-06课程大纲.md", aicode06_folder, kb_folder=f"{kb_innovation} / {sub06}", label="AICODE-06大纲", results=results, ) handled = True if any(t in ("all", "sales") for t in targets): sales_folder_id, sales_folder_name = client.create_folder(ai_course_node, "销售工具") sales_kb = f"{kb_ai_course} / {sales_folder_name}" sync_single_file( client, PROJECT_ROOT / "2-sales" / "课程介绍.md", sales_folder_id, kb_folder=sales_kb, label="课程介绍", results=results, ) sync_single_file( client, PROJECT_ROOT / "2-sales" / "家长QA.md", sales_folder_id, kb_folder=sales_kb, label="家长QA", results=results, ) handled = True # 支持同步指定的 .md 文件路径(仅单目标模式) target = targets[0] if len(targets) == 1 else "" if target.endswith(".md"): file_path = Path(target) if not file_path.is_absolute(): file_path = PROJECT_ROOT / file_path # 根据路径判断目标文件夹和知识库路径 if "AICODE-03" in str(file_path): folder_id, sub_name = client.create_folder(innovation_node, "AICODE-03") kb_folder = f"{kb_innovation} / {sub_name}" elif "AICODE-06" in str(file_path): folder_id, sub_name = client.create_folder(innovation_node, "AICODE-06") kb_folder = f"{kb_innovation} / {sub_name}" elif "2-sales" in str(file_path): folder_id, sales_name = client.create_folder(ai_course_node, "销售工具") kb_folder = f"{kb_ai_course} / {sales_name}" else: folder_id = innovation_node kb_folder = kb_innovation sync_single_file(client, file_path, folder_id, kb_folder=kb_folder, label="指定文件", results=results) handled = True # 支持按课次编号(如 AICODE03-05)查找文件 if not handled and target.upper().startswith("AICODE"): lesson_id = target.upper() # 在两个课程目录中查找匹配的文件 found = False for course_dir in ["AICODE-03", "AICODE-06"]: lesson_dir = PROJECT_ROOT / "3-lessons" / course_dir for md_file in lesson_dir.glob("*.md"): if lesson_id in md_file.stem.upper().replace(" ", ""): folder_id, sub_name = client.create_folder(innovation_node, course_dir) kb_folder = f"{kb_innovation} / {sub_name}" sync_single_file( client, md_file, folder_id, kb_folder=kb_folder, label=f"教案 {md_file.stem}", results=results, ) found = True break if found: break if not found: print(f"[error] 未找到匹配 '{target}' 的教案文件") sys.exit(1) print(f"\n{'='*50}") print(f"同步完成!共 {len(results)} 篇文档") print(f"{'='*50}") # 发送 Webhook 通知(仅当指定 --notify 时) if "--notify" in sys.argv: send_webhook(config, results) else: print("[webhook] 未发送通知(如需通知请加 --notify)") if __name__ == "__main__": main()