Initial commit from WSL migration
This commit is contained in:
815
scripts/sync_to_dingtalk.py
Normal file
815
scripts/sync_to_dingtalk.py
Normal file
@@ -0,0 +1,815 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
钉钉知识库同步脚本
|
||||
将本地教案和大纲同步到钉钉知识库「标准化教案手册」
|
||||
|
||||
用法:
|
||||
python sync_to_dingtalk.py auth # 首次授权(获取用户 Token)
|
||||
python sync_to_dingtalk.py <file.md> # 同步指定文件到知识库
|
||||
python sync_to_dingtalk.py all # 同步所有教案
|
||||
python sync_to_dingtalk.py aicode03 # 只同步 AICODE-03
|
||||
python sync_to_dingtalk.py aicode06 # 只同步 AICODE-06
|
||||
python sync_to_dingtalk.py outline # 只同步课程大纲
|
||||
python sync_to_dingtalk.py sales # 只同步销售材料
|
||||
python sync_to_dingtalk.py sales,outline # 同步多个目标(逗号分隔,只发一条通知)
|
||||
python sync_to_dingtalk.py outline --notify # 同步并发钉钉机器人通知(默认不发)
|
||||
"""
|
||||
|
||||
import re
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import hmac
|
||||
import hashlib
|
||||
import base64
|
||||
import urllib.parse
|
||||
import webbrowser
|
||||
from datetime import datetime
|
||||
import requests
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from pathlib import Path
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
ENV_PATH = PROJECT_ROOT / ".env"
|
||||
TOKEN_PATH = PROJECT_ROOT / "scripts" / ".dingtalk_token.json"
|
||||
|
||||
|
||||
# ============================================
|
||||
# 配置加载
|
||||
# ============================================
|
||||
|
||||
def load_env():
|
||||
"""从 .env 文件加载配置"""
|
||||
config = {}
|
||||
with open(ENV_PATH, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
if "=" in line:
|
||||
key, value = line.split("=", 1)
|
||||
config[key.strip()] = value.strip()
|
||||
return config
|
||||
|
||||
|
||||
# ============================================
|
||||
# OAuth 授权(获取用户级 Token)
|
||||
# ============================================
|
||||
|
||||
class OAuthCallbackHandler(BaseHTTPRequestHandler):
|
||||
"""处理 OAuth 回调的 HTTP handler"""
|
||||
auth_code = None
|
||||
|
||||
def do_GET(self):
|
||||
query = urllib.parse.urlparse(self.path).query
|
||||
params = urllib.parse.parse_qs(query)
|
||||
if "authCode" in params:
|
||||
OAuthCallbackHandler.auth_code = params["authCode"][0]
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||||
self.end_headers()
|
||||
self.wfile.write("授权成功!你可以关闭此页面。".encode("utf-8"))
|
||||
else:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write("授权失败,未收到 authCode".encode("utf-8"))
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass # 静默日志
|
||||
|
||||
|
||||
def do_oauth(config):
|
||||
"""执行 OAuth 授权流程,获取用户 Token"""
|
||||
client_id = config["DINGTALK_APP_KEY"]
|
||||
client_secret = config["DINGTALK_APP_SECRET"]
|
||||
redirect_uri = "http://127.0.0.1:18765/callback"
|
||||
|
||||
# 构建授权 URL
|
||||
auth_url = (
|
||||
"https://login.dingtalk.com/oauth2/auth?"
|
||||
f"redirect_uri={urllib.parse.quote(redirect_uri)}"
|
||||
f"&response_type=code"
|
||||
f"&client_id={client_id}"
|
||||
f"&scope=openid"
|
||||
f"&prompt=consent"
|
||||
)
|
||||
|
||||
print("正在启动本地服务器等待授权回调...")
|
||||
print(f"请在浏览器中完成钉钉登录授权。")
|
||||
print(f"如果浏览器没有自动打开,请手动访问:\n{auth_url}\n")
|
||||
|
||||
# 启动本地 HTTP 服务器
|
||||
server = HTTPServer(("127.0.0.1", 18765), OAuthCallbackHandler)
|
||||
webbrowser.open(auth_url)
|
||||
|
||||
# 等待回调
|
||||
while OAuthCallbackHandler.auth_code is None:
|
||||
server.handle_request()
|
||||
|
||||
auth_code = OAuthCallbackHandler.auth_code
|
||||
server.server_close()
|
||||
print(f"[auth] 收到授权码")
|
||||
|
||||
# 用 auth_code 换取 user access token
|
||||
resp = requests.post(
|
||||
"https://api.dingtalk.com/v1.0/oauth2/userAccessToken",
|
||||
json={
|
||||
"clientId": client_id,
|
||||
"clientSecret": client_secret,
|
||||
"code": auth_code,
|
||||
"grantType": "authorization_code",
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
# 保存 token
|
||||
token_data = {
|
||||
"accessToken": data["accessToken"],
|
||||
"refreshToken": data["refreshToken"],
|
||||
"expireTime": time.time() + data.get("expireIn", 7200) - 300,
|
||||
"clientId": client_id,
|
||||
"clientSecret": client_secret,
|
||||
}
|
||||
TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8")
|
||||
print(f"[auth] 用户 Token 获取成功,已保存到 {TOKEN_PATH.name}")
|
||||
print(f"[auth] Access Token 有效期 2 小时,Refresh Token 有效期 30 天(自动续期)")
|
||||
return token_data
|
||||
|
||||
|
||||
def load_user_token(config):
|
||||
"""加载已保存的用户 Token,自动刷新过期的 Token"""
|
||||
if not TOKEN_PATH.exists():
|
||||
return None
|
||||
|
||||
data = json.loads(TOKEN_PATH.read_text(encoding="utf-8"))
|
||||
|
||||
# 检查是否过期
|
||||
if time.time() < data.get("expireTime", 0):
|
||||
return data
|
||||
|
||||
# 尝试用 refresh_token 刷新
|
||||
refresh_token = data.get("refreshToken")
|
||||
if not refresh_token:
|
||||
return None
|
||||
|
||||
print("[token] Access Token 已过期,使用 Refresh Token 刷新...")
|
||||
resp = requests.post(
|
||||
"https://api.dingtalk.com/v1.0/oauth2/userAccessToken",
|
||||
json={
|
||||
"clientId": data.get("clientId", config["DINGTALK_APP_KEY"]),
|
||||
"clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]),
|
||||
"refreshToken": refresh_token,
|
||||
"grantType": "refresh_token",
|
||||
},
|
||||
)
|
||||
|
||||
if resp.status_code != 200:
|
||||
print("[token] Refresh Token 已失效,请重新授权: python sync_to_dingtalk.py auth")
|
||||
return None
|
||||
|
||||
new_data = resp.json()
|
||||
token_data = {
|
||||
"accessToken": new_data["accessToken"],
|
||||
"refreshToken": new_data["refreshToken"],
|
||||
"expireTime": time.time() + new_data.get("expireIn", 7200) - 300,
|
||||
"clientId": data.get("clientId", config["DINGTALK_APP_KEY"]),
|
||||
"clientSecret": data.get("clientSecret", config["DINGTALK_APP_SECRET"]),
|
||||
}
|
||||
TOKEN_PATH.write_text(json.dumps(token_data, indent=2), encoding="utf-8")
|
||||
print("[token] Token 刷新成功")
|
||||
return token_data
|
||||
|
||||
|
||||
# ============================================
|
||||
# 钉钉 API 客户端
|
||||
# ============================================
|
||||
|
||||
class DingTalkClient:
|
||||
"""钉钉 API 客户端(双 Token 模式)"""
|
||||
|
||||
BASE_URL = "https://api.dingtalk.com"
|
||||
|
||||
def __init__(self, app_key, app_secret, operator_id, workspace_id, user_token=None):
|
||||
self.app_key = app_key
|
||||
self.app_secret = app_secret
|
||||
self.operator_id = operator_id
|
||||
self.workspace_id = workspace_id
|
||||
self.user_token = user_token
|
||||
self.app_token = None
|
||||
self.app_token_expire_time = 0
|
||||
|
||||
def _ensure_app_token(self):
|
||||
"""确保应用级 access_token 有效"""
|
||||
if self.app_token and time.time() < self.app_token_expire_time:
|
||||
return
|
||||
resp = requests.post(
|
||||
f"{self.BASE_URL}/v1.0/oauth2/accessToken",
|
||||
json={"appKey": self.app_key, "appSecret": self.app_secret},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
self.app_token = data["accessToken"]
|
||||
self.app_token_expire_time = time.time() + data["expireIn"] - 300
|
||||
print(f"[token] 应用 Token 获取成功")
|
||||
|
||||
def _app_headers(self):
|
||||
"""应用级请求头(创建文件夹/文档、查询节点)"""
|
||||
self._ensure_app_token()
|
||||
return {
|
||||
"x-acs-dingtalk-access-token": self.app_token,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
def _user_headers(self):
|
||||
"""用户级请求头(写入文档内容)"""
|
||||
if not self.user_token:
|
||||
raise RuntimeError("需要用户 Token,请先运行: python sync_to_dingtalk.py auth")
|
||||
return {
|
||||
"x-acs-dingtalk-access-token": self.user_token,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
def list_nodes(self, parent_node_id):
|
||||
"""获取指定文件夹下的节点列表"""
|
||||
nodes = []
|
||||
next_token = None
|
||||
while True:
|
||||
params = {
|
||||
"parentNodeId": parent_node_id,
|
||||
"operatorId": self.operator_id,
|
||||
"maxResults": 50,
|
||||
}
|
||||
if next_token:
|
||||
params["nextToken"] = next_token
|
||||
resp = requests.get(
|
||||
f"{self.BASE_URL}/v2.0/wiki/nodes",
|
||||
headers=self._app_headers(),
|
||||
params=params,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
nodes.extend(data.get("nodes", []))
|
||||
next_token = data.get("nextToken")
|
||||
if not next_token:
|
||||
break
|
||||
return nodes
|
||||
|
||||
def find_node(self, parent_node_id, name, node_type=None):
|
||||
"""在指定文件夹下按名称查找节点"""
|
||||
nodes = self.list_nodes(parent_node_id)
|
||||
for node in nodes:
|
||||
if node["name"] == name:
|
||||
if node_type is None or node["type"] == node_type:
|
||||
return node
|
||||
return None
|
||||
|
||||
def find_node_by_prefix(self, parent_node_id, prefix):
|
||||
"""在指定文件夹下按名称前缀查找节点(用于按课次编号匹配)
|
||||
例如 prefix='AICODE03-02' 可匹配 'AICODE03-02 提问的艺术.adoc'
|
||||
"""
|
||||
nodes = self.list_nodes(parent_node_id)
|
||||
for node in nodes:
|
||||
node_name = node["name"]
|
||||
# 钉钉文档名称带 .adoc 后缀
|
||||
if node_name.endswith(".adoc"):
|
||||
node_name = node_name[:-5]
|
||||
if node_name.startswith(prefix):
|
||||
return node
|
||||
return None
|
||||
|
||||
def create_folder(self, parent_node_id, name):
|
||||
"""创建文件夹,如果已存在则返回现有的。返回 (nodeId, actualName)"""
|
||||
existing = self.find_node(parent_node_id, name, "FOLDER")
|
||||
if existing:
|
||||
print(f" [folder] 已存在: {existing['name']}")
|
||||
return existing["nodeId"], existing["name"]
|
||||
|
||||
resp = requests.post(
|
||||
f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs",
|
||||
headers=self._app_headers(),
|
||||
json={
|
||||
"name": name,
|
||||
"docType": "FOLDER",
|
||||
"operatorId": self.operator_id,
|
||||
"parentNodeId": parent_node_id,
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
print(f" [folder] 创建成功: {name}")
|
||||
return data["nodeId"], name
|
||||
|
||||
def resolve_node_name(self, parent_node_id, target_node_id):
|
||||
"""通过遍历父目录,获取指定 nodeId 的实际名称"""
|
||||
nodes = self.list_nodes(parent_node_id)
|
||||
for node in nodes:
|
||||
if node["nodeId"] == target_node_id:
|
||||
return node["name"]
|
||||
return None
|
||||
|
||||
def create_doc(self, parent_node_id, name):
|
||||
"""创建文档,返回 (nodeId, docKey, url)"""
|
||||
resp = requests.post(
|
||||
f"{self.BASE_URL}/v1.0/doc/workspaces/{self.workspace_id}/docs",
|
||||
headers=self._app_headers(),
|
||||
json={
|
||||
"name": name,
|
||||
"docType": "DOC",
|
||||
"operatorId": self.operator_id,
|
||||
"parentNodeId": parent_node_id,
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return data["nodeId"], data["docKey"], data.get("url", "")
|
||||
|
||||
def overwrite_content(self, doc_key, markdown_content):
|
||||
"""覆写文档内容(Markdown 格式,需要用户级 Token)"""
|
||||
resp = requests.post(
|
||||
f"{self.BASE_URL}/v2.0/doc/me/suites/documents/{doc_key}/overwriteContent",
|
||||
headers=self._user_headers(),
|
||||
json={
|
||||
"dataType": "markdown",
|
||||
"content": markdown_content,
|
||||
},
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
print(f" [error] 写入失败: {resp.status_code} {resp.text}")
|
||||
resp.raise_for_status()
|
||||
print(f" [content] 内容写入成功")
|
||||
|
||||
def upload_markdown(self, parent_node_id, name, markdown_content):
|
||||
"""上传 Markdown 文档:创建 + 写入内容。返回 (node_id, url)
|
||||
|
||||
匹配逻辑(按优先级):
|
||||
1. 从文件名提取课次编号(如 AICODE03-02),按编号前缀匹配已有文档
|
||||
2. 如果没有编号(如课程大纲),按完整文件名匹配
|
||||
3. 都没匹配到 → 新建文档
|
||||
"""
|
||||
lesson_id = extract_lesson_id(name)
|
||||
existing = None
|
||||
|
||||
# 优先按课次编号匹配
|
||||
if lesson_id:
|
||||
existing = self.find_node_by_prefix(parent_node_id, lesson_id)
|
||||
if existing:
|
||||
old_name = existing["name"].rstrip(".adoc") if existing["name"].endswith(".adoc") else existing["name"]
|
||||
if old_name != name:
|
||||
print(f" [update] 按编号匹配覆盖: {old_name} → {name}")
|
||||
else:
|
||||
print(f" [update] 覆盖已有文档: {name}")
|
||||
|
||||
# 没有编号或编号没匹配到,按完整文件名匹配
|
||||
if not existing:
|
||||
existing = self.find_node(parent_node_id, f"{name}.adoc")
|
||||
if existing:
|
||||
print(f" [update] 覆盖已有文档: {name}")
|
||||
|
||||
if existing:
|
||||
doc_key = existing["nodeId"]
|
||||
self.overwrite_content(doc_key, markdown_content)
|
||||
return existing["nodeId"], existing.get("url", "")
|
||||
else:
|
||||
node_id, doc_key, url = self.create_doc(parent_node_id, name)
|
||||
print(f" [create] 新建文档: {name}")
|
||||
time.sleep(2) # 等待文档初始化
|
||||
self.overwrite_content(doc_key, markdown_content)
|
||||
return node_id, url or ""
|
||||
|
||||
|
||||
# ============================================
|
||||
# Webhook 通知
|
||||
# ============================================
|
||||
|
||||
DINGTALK_DOC_BASE_URL = "https://alidocs.dingtalk.com/i/nodes"
|
||||
|
||||
|
||||
def _get_doc_url(result):
|
||||
"""获取文档链接:优先用 API 返回的 url,否则用 nodeId 构造"""
|
||||
url = result.get("url", "")
|
||||
if url:
|
||||
return url
|
||||
node_id = result.get("node_id", "")
|
||||
if node_id:
|
||||
return f"{DINGTALK_DOC_BASE_URL}/{node_id}"
|
||||
return ""
|
||||
|
||||
|
||||
def send_webhook(config, results):
|
||||
"""同步完成后发送钉钉机器人通知(仅报告成功项,按知识库目录分组)"""
|
||||
webhook_url = config.get("DINGTALK_WEBHOOK_URL")
|
||||
webhook_secret = config.get("DINGTALK_WEBHOOK_SECRET")
|
||||
if not webhook_url or not results:
|
||||
return
|
||||
|
||||
# HMAC-SHA256 签名
|
||||
timestamp = str(int(time.time() * 1000))
|
||||
string_to_sign = f"{timestamp}\n{webhook_secret}"
|
||||
hmac_code = hmac.new(
|
||||
webhook_secret.encode("utf-8"),
|
||||
string_to_sign.encode("utf-8"),
|
||||
digestmod=hashlib.sha256,
|
||||
).digest()
|
||||
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code).decode("utf-8"))
|
||||
signed_url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
|
||||
|
||||
# 按知识库目录分组
|
||||
from collections import OrderedDict
|
||||
groups = OrderedDict()
|
||||
for r in results:
|
||||
folder = r["kb_folder"]
|
||||
if folder not in groups:
|
||||
groups[folder] = []
|
||||
groups[folder].append(r)
|
||||
|
||||
# 构建 Markdown 消息(按目录分组)
|
||||
lines = ["### 知识库文档已更新\n"]
|
||||
for folder, items in groups.items():
|
||||
lines.append(f"**{folder}**\n")
|
||||
for r in items:
|
||||
doc_url = _get_doc_url(r)
|
||||
if doc_url:
|
||||
lines.append(f"- [{r['name']}]({doc_url})")
|
||||
else:
|
||||
lines.append(f"- {r['name']}")
|
||||
lines.append("")
|
||||
|
||||
body = {
|
||||
"msgtype": "markdown",
|
||||
"markdown": {
|
||||
"title": "知识库文档已更新",
|
||||
"text": "\n".join(lines),
|
||||
},
|
||||
}
|
||||
|
||||
try:
|
||||
resp = requests.post(signed_url, json=body, timeout=10)
|
||||
if resp.status_code == 200 and resp.json().get("errcode") == 0:
|
||||
print("[webhook] 通知发送成功")
|
||||
else:
|
||||
print(f"[webhook] 通知发送失败: {resp.text}")
|
||||
except Exception as e:
|
||||
print(f"[webhook] 通知发送异常: {e}")
|
||||
|
||||
|
||||
# ============================================
|
||||
# 辅助函数
|
||||
# ============================================
|
||||
|
||||
def extract_lesson_id(name):
|
||||
"""从文件名中提取课次编号。
|
||||
'AICODE03-02 AI的记忆之谜' → 'AICODE03-02'
|
||||
'AICODE06-04 代码审查入门' → 'AICODE06-04'
|
||||
'AICODE-03课程大纲' → None(大纲没有课次编号,走文件名匹配)
|
||||
"""
|
||||
match = re.match(r'(AICODE\d{2}-\d{2})', name)
|
||||
return match.group(1) if match else None
|
||||
|
||||
|
||||
def _lesson_sort_key(stem):
|
||||
"""排序键:课程大纲排第一,教案按课次编号排序。
|
||||
'AICODE-03课程大纲' → (0, 0, 'AICODE-03课程大纲')
|
||||
'AICODE03-02 AI的记忆之谜' → (1, 2, 'AICODE03-02 AI的记忆之谜')
|
||||
"""
|
||||
lesson_id = extract_lesson_id(stem)
|
||||
if lesson_id:
|
||||
# 提取课次数字(如 AICODE03-02 → 2)
|
||||
num = int(lesson_id.split("-")[1])
|
||||
return (1, num, stem)
|
||||
elif "大纲" in stem or "课程" in stem:
|
||||
return (0, 0, stem)
|
||||
else:
|
||||
return (2, 0, stem)
|
||||
|
||||
|
||||
# ============================================
|
||||
# 同步逻辑
|
||||
# ============================================
|
||||
|
||||
def sync_lessons(client, local_dir, parent_node_id, folder_name, kb_folder, results=None):
|
||||
"""同步一个课程目录下的所有教案(按课次编号排序)"""
|
||||
folder_id, _ = client.create_folder(parent_node_id, folder_name)
|
||||
|
||||
lesson_dir = Path(local_dir)
|
||||
# 只取当前目录的 .md 文件(排除子目录如 旧版本/)
|
||||
md_files = sorted(
|
||||
[f for f in lesson_dir.glob("*.md") if f.parent == lesson_dir],
|
||||
key=lambda f: _lesson_sort_key(f.stem),
|
||||
)
|
||||
|
||||
if not md_files:
|
||||
print(f" [warn] {local_dir} 下没有找到 .md 文件")
|
||||
return
|
||||
|
||||
print(f"\n{'='*50}")
|
||||
print(f"同步 {folder_name}: {len(md_files)} 篇教案")
|
||||
print(f"{'='*50}")
|
||||
|
||||
for i, md_file in enumerate(md_files, 1):
|
||||
name = md_file.stem
|
||||
content = md_file.read_text(encoding="utf-8")
|
||||
print(f"\n[{i}/{len(md_files)}] {name}")
|
||||
try:
|
||||
node_id, doc_url = client.upload_markdown(folder_id, name, content)
|
||||
if results is not None:
|
||||
results.append({
|
||||
"name": name,
|
||||
"kb_folder": kb_folder,
|
||||
"node_id": node_id,
|
||||
"url": doc_url,
|
||||
})
|
||||
except Exception as e:
|
||||
print(f" [error] 同步失败: {e}")
|
||||
time.sleep(1) # 避免频率限制
|
||||
|
||||
|
||||
def sync_single_file(client, file_path, parent_node_id, kb_folder, label="", results=None):
|
||||
"""同步单个文件"""
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
print(f" [error] 文件不存在: {path}")
|
||||
return
|
||||
name = path.stem
|
||||
content = path.read_text(encoding="utf-8")
|
||||
print(f"\n同步{label}: {name}")
|
||||
try:
|
||||
node_id, doc_url = client.upload_markdown(parent_node_id, name, content)
|
||||
if results is not None:
|
||||
results.append({
|
||||
"name": name,
|
||||
"kb_folder": kb_folder,
|
||||
"node_id": node_id,
|
||||
"url": doc_url,
|
||||
})
|
||||
except Exception as e:
|
||||
print(f" [error] 同步失败: {e}")
|
||||
|
||||
|
||||
def cmd_list(config, token_data, target_folder=None):
|
||||
"""列出钉钉知识库中的文档"""
|
||||
client = DingTalkClient(
|
||||
app_key=config["DINGTALK_APP_KEY"],
|
||||
app_secret=config["DINGTALK_APP_SECRET"],
|
||||
operator_id=config["DINGTALK_OPERATOR_ID"],
|
||||
workspace_id=config["DINGTALK_WORKSPACE_ID"],
|
||||
user_token=token_data["accessToken"],
|
||||
)
|
||||
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
|
||||
root_node = config["DINGTALK_ROOT_NODE_ID"]
|
||||
|
||||
print("正在读取知识库目录结构...")
|
||||
kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课"
|
||||
|
||||
# 列出指定文件夹或所有文件夹
|
||||
folders_to_list = []
|
||||
if target_folder:
|
||||
folder_name = target_folder.upper()
|
||||
if not folder_name.startswith("AICODE-"):
|
||||
folder_name = f"AICODE-{folder_name.replace('AICODE', '')}"
|
||||
folders_to_list = [folder_name]
|
||||
else:
|
||||
# 列出所有子文件夹
|
||||
top_nodes = client.list_nodes(innovation_node)
|
||||
folders_to_list = [n["name"] for n in top_nodes if n.get("type") == "FOLDER"]
|
||||
|
||||
for folder_name in folders_to_list:
|
||||
existing = client.find_node(innovation_node, folder_name, "FOLDER")
|
||||
if not existing:
|
||||
print(f"\n[warn] 文件夹 '{folder_name}' 不存在")
|
||||
continue
|
||||
folder_id = existing["nodeId"]
|
||||
nodes = client.list_nodes(folder_id)
|
||||
print(f"\n📁 {kb_innovation} / {folder_name} ({len(nodes)} 篇)")
|
||||
print("-" * 50)
|
||||
for i, node in enumerate(nodes, 1):
|
||||
name = node["name"]
|
||||
if name.endswith(".adoc"):
|
||||
name = name[:-5]
|
||||
ntype = "📄" if node.get("type") != "FOLDER" else "📁"
|
||||
print(f" {i}. {ntype} {name}")
|
||||
|
||||
|
||||
def cmd_delete(config, token_data, node_names):
|
||||
"""删除钉钉知识库中的指定文档"""
|
||||
client = DingTalkClient(
|
||||
app_key=config["DINGTALK_APP_KEY"],
|
||||
app_secret=config["DINGTALK_APP_SECRET"],
|
||||
operator_id=config["DINGTALK_OPERATOR_ID"],
|
||||
workspace_id=config["DINGTALK_WORKSPACE_ID"],
|
||||
user_token=token_data["accessToken"],
|
||||
)
|
||||
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
|
||||
|
||||
print("正在读取知识库目录结构...")
|
||||
|
||||
for target_name in node_names:
|
||||
# 从名称推断所在文件夹
|
||||
if "AICODE06" in target_name or "AICODE-06" in target_name:
|
||||
folder_name = "AICODE-06"
|
||||
elif "AICODE03" in target_name or "AICODE-03" in target_name:
|
||||
folder_name = "AICODE-03"
|
||||
else:
|
||||
print(f" [skip] 无法判断 '{target_name}' 所在文件夹")
|
||||
continue
|
||||
|
||||
folder = client.find_node(innovation_node, folder_name, "FOLDER")
|
||||
if not folder:
|
||||
print(f" [error] 文件夹 '{folder_name}' 不存在")
|
||||
continue
|
||||
|
||||
# 按前缀或完整名称查找
|
||||
lesson_id = extract_lesson_id(target_name)
|
||||
found = None
|
||||
if lesson_id:
|
||||
found = client.find_node_by_prefix(folder["nodeId"], lesson_id)
|
||||
if not found:
|
||||
# 尝试完整名称匹配(带.adoc)
|
||||
found = client.find_node(folder["nodeId"], f"{target_name}.adoc")
|
||||
if not found:
|
||||
found = client.find_node(folder["nodeId"], target_name)
|
||||
|
||||
if not found:
|
||||
print(f" [warn] 未找到: {target_name}")
|
||||
continue
|
||||
|
||||
actual_name = found["name"].rstrip(".adoc") if found["name"].endswith(".adoc") else found["name"]
|
||||
node_id = found["nodeId"]
|
||||
|
||||
# 调用删除 API
|
||||
try:
|
||||
resp = requests.delete(
|
||||
f"{client.BASE_URL}/v2.0/wiki/nodes/{node_id}",
|
||||
headers=client._user_headers(),
|
||||
params={"operatorId": client.operator_id},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
print(f" [deleted] {actual_name}")
|
||||
except Exception as e:
|
||||
print(f" [error] 删除 '{actual_name}' 失败: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
config = load_env()
|
||||
|
||||
# 处理 auth 命令
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "auth":
|
||||
do_oauth(config)
|
||||
return
|
||||
|
||||
# 处理 list 命令
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "list":
|
||||
token_data = load_user_token(config)
|
||||
if not token_data:
|
||||
print("未找到有效的用户 Token。请先授权。")
|
||||
sys.exit(1)
|
||||
target = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
cmd_list(config, token_data, target)
|
||||
return
|
||||
|
||||
# 处理 delete 命令
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "delete":
|
||||
token_data = load_user_token(config)
|
||||
if not token_data:
|
||||
print("未找到有效的用户 Token。请先授权。")
|
||||
sys.exit(1)
|
||||
if len(sys.argv) < 3:
|
||||
print("用法: python scripts/sync_to_dingtalk.py delete <文档名1> [文档名2] ...")
|
||||
sys.exit(1)
|
||||
cmd_delete(config, token_data, sys.argv[2:])
|
||||
return
|
||||
|
||||
# 加载用户 Token
|
||||
token_data = load_user_token(config)
|
||||
if not token_data:
|
||||
print("未找到有效的用户 Token。请先授权:")
|
||||
print(" python scripts/sync_to_dingtalk.py auth")
|
||||
sys.exit(1)
|
||||
|
||||
client = DingTalkClient(
|
||||
app_key=config["DINGTALK_APP_KEY"],
|
||||
app_secret=config["DINGTALK_APP_SECRET"],
|
||||
operator_id=config["DINGTALK_OPERATOR_ID"],
|
||||
workspace_id=config["DINGTALK_WORKSPACE_ID"],
|
||||
user_token=token_data["accessToken"],
|
||||
)
|
||||
|
||||
innovation_node = config["DINGTALK_AICODE_INNOVATION_NODE_ID"]
|
||||
ai_course_node = config["DINGTALK_AI_COURSE_NODE_ID"]
|
||||
root_node = config["DINGTALK_ROOT_NODE_ID"]
|
||||
|
||||
# ── 动态遍历知识库,获取真实目录名称 ──
|
||||
print("正在读取知识库目录结构...")
|
||||
kb_innovation = client.resolve_node_name(root_node, innovation_node) or "AICODE-AI编程创新课"
|
||||
kb_ai_course = client.resolve_node_name(root_node, ai_course_node) or "AI人工智能课"
|
||||
print(f" 一级目录: {kb_innovation}, {kb_ai_course}")
|
||||
|
||||
raw_target = sys.argv[1] if len(sys.argv) > 1 else "all"
|
||||
# 支持逗号分隔的多目标,如 "sales,outline"
|
||||
targets = [t.strip() for t in raw_target.split(",")]
|
||||
handled = False
|
||||
results = [] # 收集成功同步的文档信息
|
||||
|
||||
# 批量同步
|
||||
if any(t in ("all", "aicode03") for t in targets):
|
||||
_, sub_name = client.create_folder(innovation_node, "AICODE-03")
|
||||
sync_lessons(
|
||||
client, PROJECT_ROOT / "3-lessons" / "AICODE-03",
|
||||
innovation_node, "AICODE-03",
|
||||
kb_folder=f"{kb_innovation} / {sub_name}", results=results,
|
||||
)
|
||||
handled = True
|
||||
|
||||
if any(t in ("all", "aicode06") for t in targets):
|
||||
_, sub_name = client.create_folder(innovation_node, "AICODE-06")
|
||||
sync_lessons(
|
||||
client, PROJECT_ROOT / "3-lessons" / "AICODE-06",
|
||||
innovation_node, "AICODE-06",
|
||||
kb_folder=f"{kb_innovation} / {sub_name}", results=results,
|
||||
)
|
||||
handled = True
|
||||
|
||||
if any(t in ("all", "outline") for t in targets):
|
||||
aicode03_folder, sub03 = client.create_folder(innovation_node, "AICODE-03")
|
||||
sync_single_file(
|
||||
client, PROJECT_ROOT / "3-lessons" / "AICODE-03" / "AICODE-03课程大纲.md",
|
||||
aicode03_folder, kb_folder=f"{kb_innovation} / {sub03}", label="AICODE-03大纲", results=results,
|
||||
)
|
||||
aicode06_folder, sub06 = client.create_folder(innovation_node, "AICODE-06")
|
||||
sync_single_file(
|
||||
client, PROJECT_ROOT / "3-lessons" / "AICODE-06" / "AICODE-06课程大纲.md",
|
||||
aicode06_folder, kb_folder=f"{kb_innovation} / {sub06}", label="AICODE-06大纲", results=results,
|
||||
)
|
||||
handled = True
|
||||
|
||||
if any(t in ("all", "sales") for t in targets):
|
||||
sales_folder_id, sales_folder_name = client.create_folder(ai_course_node, "销售工具")
|
||||
sales_kb = f"{kb_ai_course} / {sales_folder_name}"
|
||||
sync_single_file(
|
||||
client, PROJECT_ROOT / "2-sales" / "课程介绍.md",
|
||||
sales_folder_id, kb_folder=sales_kb, label="课程介绍", results=results,
|
||||
)
|
||||
sync_single_file(
|
||||
client, PROJECT_ROOT / "2-sales" / "家长QA.md",
|
||||
sales_folder_id, kb_folder=sales_kb, label="家长QA", results=results,
|
||||
)
|
||||
handled = True
|
||||
|
||||
# 支持同步指定的 .md 文件路径(仅单目标模式)
|
||||
target = targets[0] if len(targets) == 1 else ""
|
||||
if target.endswith(".md"):
|
||||
file_path = Path(target)
|
||||
if not file_path.is_absolute():
|
||||
file_path = PROJECT_ROOT / file_path
|
||||
# 根据路径判断目标文件夹和知识库路径
|
||||
if "AICODE-03" in str(file_path):
|
||||
folder_id, sub_name = client.create_folder(innovation_node, "AICODE-03")
|
||||
kb_folder = f"{kb_innovation} / {sub_name}"
|
||||
elif "AICODE-06" in str(file_path):
|
||||
folder_id, sub_name = client.create_folder(innovation_node, "AICODE-06")
|
||||
kb_folder = f"{kb_innovation} / {sub_name}"
|
||||
elif "2-sales" in str(file_path):
|
||||
folder_id, sales_name = client.create_folder(ai_course_node, "销售工具")
|
||||
kb_folder = f"{kb_ai_course} / {sales_name}"
|
||||
else:
|
||||
folder_id = innovation_node
|
||||
kb_folder = kb_innovation
|
||||
sync_single_file(client, file_path, folder_id, kb_folder=kb_folder, label="指定文件", results=results)
|
||||
handled = True
|
||||
|
||||
# 支持按课次编号(如 AICODE03-05)查找文件
|
||||
if not handled and target.upper().startswith("AICODE"):
|
||||
lesson_id = target.upper()
|
||||
# 在两个课程目录中查找匹配的文件
|
||||
found = False
|
||||
for course_dir in ["AICODE-03", "AICODE-06"]:
|
||||
lesson_dir = PROJECT_ROOT / "3-lessons" / course_dir
|
||||
for md_file in lesson_dir.glob("*.md"):
|
||||
if lesson_id in md_file.stem.upper().replace(" ", ""):
|
||||
folder_id, sub_name = client.create_folder(innovation_node, course_dir)
|
||||
kb_folder = f"{kb_innovation} / {sub_name}"
|
||||
sync_single_file(
|
||||
client, md_file, folder_id,
|
||||
kb_folder=kb_folder, label=f"教案 {md_file.stem}", results=results,
|
||||
)
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
break
|
||||
if not found:
|
||||
print(f"[error] 未找到匹配 '{target}' 的教案文件")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"\n{'='*50}")
|
||||
print(f"同步完成!共 {len(results)} 篇文档")
|
||||
print(f"{'='*50}")
|
||||
|
||||
# 发送 Webhook 通知(仅当指定 --notify 时)
|
||||
if "--notify" in sys.argv:
|
||||
send_webhook(config, results)
|
||||
else:
|
||||
print("[webhook] 未发送通知(如需通知请加 --notify)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user