关联主题:: dida2cflow
同级:: 2025-09-18_星期四
下一级::
滴答清单任务重复同步到cflow
问题:dida2cflow的作用是将滴答清单带有标签为cflow的内容同步到cflow。但是滴答清单同步到cflow的内容总是有很多重复的内容。所以我就在淘宝买了cursor高级版的套餐,对cursor这个容器进行了一次新的升级。
根目录项目文件夹:【完整】基础版v1.0_副本4
AI诊断的问题及修改建议
📋 滴答清单同步到cflow的完整流程
1. 同步循环启动 (第1899-2014行)
启动同步服务 → 每300秒执行一次 → 调用get_tasks() → 处理任务列表
2. 获取任务阶段 (第289-515行)
调用滴答清单API → 获取所有任务 → 过滤包含"cflow"标签的任务 → 返回需要同步的任务列表
3. 任务同步判断 (第264-287行 + 第376-502行)
检查任务是否删除 → 检查是否有"cflow"标签 → 检查完成状态 → 检查是否已同步过 → 决定是否同步
4. 创建或更新卡片 (第747-1296行)
提取任务内容 → 处理图片 → 检查是否更新还是创建 → 调用cflow API → 保存同步记录
5. 反馈到滴答清单 (第2015-2121行)
将cflow卡片ID写回滴答清单任务内容 → 更新任务
🔍 容易出问题的关键环节
问题环节1:同步状态判断混乱 (第376-502行)
现状代码逻辑:
# 有3个地方可能包含memo信息: # 1. 任务内容中的cflow ID (第368-373行) cflow_id_match = re.search(r'cflow ID:#(\d+)', content) # 2. synced_tasks记录中的memo_id (第384-449行) if task_id in self.synced_tasks: last_sync_time = self.synced_tasks[task_id].get('modified_time', '') # 3. 检查cflow ID是否存在 (第431-437行) if self.check_cflow_memo_exists(cflow_id):
问题: 这3个信息源可能不一致,导致判断错误
问题环节2:创建/更新决策逻辑复杂 (第777-791行)
现状代码:
# 多重判断导致逻辑混乱 if cflow_id and self.check_cflow_memo_exists(cflow_id): memo_id = cflow_id is_update = True elif task_id in self.synced_tasks and self.synced_tasks[task_id].get('memo_id'): memo_id = self.synced_tasks[task_id]['memo_id'] if self.check_cflow_memo_exists(memo_id): is_update = True else: # 这里可能导致重复创建 logger.info(f"记录的memo_id {memo_id} 不存在,将创建新卡片")
问题: 当memo不存在时,没有清理同步记录就直接创建新卡片
问题环节3:时间戳比较复杂 (第389-449行)
现状代码:
# 复杂的时间处理逻辑,容易出错 if isinstance(modified_time, str) and 'T' in modified_time: dt = parser.parse(modified_time) # 复杂的时区转换... current_timestamp = int(dt.timestamp()) else: # 时间戳处理... if timestamp > 1000000000000: timestamp = timestamp // 1000
问题: 时间比较失败时可能误判需要同步
问题环节4:并发处理缺乏保护 (第1948-1980行)
现状代码:
# 没有并发保护的循环处理 for task in new_tasks: if self.create_or_update_cflow_card(task): # 可能同时处理同一个任务
问题: 同一任务可能被并发处理多次
🎯 具体解决方案
解决方案1:统一同步状态管理
新增统一的状态检查方法:
def get_unified_sync_status(self, task): """统一获取任务的同步状态""" task_id = task.get('id') content = task.get('content', '') # 步骤1:从任务内容提取cflow ID content_cflow_id = None cflow_id_match = re.search(r'cflow ID:#(\d+)', content) if cflow_id_match: content_cflow_id = cflow_id_match.group(1) # 步骤2:从同步记录获取memo ID record_memo_id = None if task_id in self.synced_tasks: record_memo_id = self.synced_tasks[task_id].get('memo_id') # 步骤3:验证ID的有效性 valid_memo_id = None if content_cflow_id and self.check_cflow_memo_exists(content_cflow_id): valid_memo_id = content_cflow_id logger.info(f"任务 {task_id} 使用内容中的有效cflow ID: {content_cflow_id}") elif record_memo_id and self.check_cflow_memo_exists(record_memo_id): valid_memo_id = record_memo_id logger.info(f"任务 {task_id} 使用记录中的有效memo ID: {record_memo_id}") # 步骤4:清理无效记录 if content_cflow_id and content_cflow_id != valid_memo_id: logger.warning(f"任务内容中的cflow ID {content_cflow_id} 无效,需要清理") if record_memo_id and record_memo_id != valid_memo_id: logger.warning(f"同步记录中的memo ID {record_memo_id} 无效,需要清理") if task_id in self.synced_tasks: del self.synced_tasks[task_id] self.save_synced_tasks() return { 'task_id': task_id, 'valid_memo_id': valid_memo_id, 'needs_sync': valid_memo_id is None, 'needs_update': False # 后续判断 }
解决方案2:简化同步判断逻辑
替换复杂的should_sync_task方法:
def should_sync_task_v2(self, task): """简化的任务同步判断""" # 基础过滤 if task.get('deleted', 0) != 0: return False if "cflow" not in task.get('tags', []): return False if task.get('kind', '') != 'NOTE' and task.get('status', -1) != 0: return False # 获取统一状态 status = self.get_unified_sync_status(task) if status['needs_sync']: logger.info(f"任务需要同步: {task.get('title', '')}") return True # 检查是否需要更新 if status['valid_memo_id']: needs_update = self.check_needs_update(task, status['valid_memo_id']) if needs_update: logger.info(f"任务需要更新: {task.get('title', '')}") return True return False def check_needs_update(self, task, memo_id): """检查是否需要更新""" task_id = task.get('id') if task_id not in self.synced_tasks: return True # 没有同步记录,需要更新 # 简化的时间比较 current_time = self.normalize_timestamp(task.get('modifiedTime')) last_sync_time = self.synced_tasks[task_id].get('modified_time', '0') return int(current_time) > int(last_sync_time) def normalize_timestamp(self, time_value): """标准化时间戳""" try: if isinstance(time_value, str) and 'T' in time_value: dt = parser.parse(time_value) return str(int(dt.timestamp())) else: timestamp = int(time_value) if timestamp > 1000000000000: timestamp = timestamp // 1000 return str(timestamp) except: return str(int(time.time()))
解决方案3:添加任务锁机制
防止并发重复处理:
def process_task_with_lock(self, task): """带锁的任务处理""" task_id = task.get('id') lock_file = f"data/locks/task_{task_id}.lock" # 确保锁目录存在 os.makedirs("data/locks", exist_ok=True) try: # 检查锁文件 if os.path.exists(lock_file): # 检查锁文件时间,超过5分钟自动清理 lock_time = os.path.getmtime(lock_file) if time.time() - lock_time > 300: os.remove(lock_file) logger.warning(f"清理过期锁文件: {lock_file}") else: logger.info(f"任务 {task_id} 正在处理中,跳过") return False # 创建锁文件 with open(lock_file, 'w') as f: f.write(str(int(time.time()))) # 双重检查 if not self.should_sync_task_v2(task): return False # 执行同步 return self.create_or_update_cflow_card_v2(task) finally: # 清理锁文件 if os.path.exists(lock_file): os.remove(lock_file)
解决方案4:重构创建/更新逻辑
简化create_or_update_cflow_card方法:
def create_or_update_cflow_card_v2(self, task): """简化的创建或更新cflow卡片""" # 获取统一状态 status = self.get_unified_sync_status(task) task_id = task.get('id') if status['valid_memo_id']: # 更新现有卡片 success = self.update_existing_memo(task, status['valid_memo_id']) else: # 创建新卡片 success = self.create_new_memo(task) if success: logger.info(f"任务同步成功: {task.get('title', '')}") else: logger.error(f"任务同步失败: {task.get('title', '')}") return success
📊 修改优先级
- 高优先级:实现
get_unified_sync_status
方法,统一状态管理- 中优先级:添加任务锁机制,防止并发重复
- 低优先级:简化时间戳处理和重构创建逻辑
这样的改进将显著减少重复内容的创建,让同步更加可靠。
- AI修改之前我先将dida2cflow的项目推送到了Github,这样一旦出问题后可以回退版本。
修改完以后如何测试及加载?
1、本地安装docker应用;
2、cd /Users/bairimengyushi/Downloads/【完整】基础版v1.0_副本4/dida2cflow
3、构建容器
docker build -t dida2cflow .
4、本地启动测试
docker run -d \
--name dida2cflow \
--restart unless-stopped \
-p 5238:5238 \
-e CFLOW_API_URL="http://192.168.10.2:5236" \
-e SYNC_INTERVAL="30" \
-e SYNC_TAGS="cflow" \
-e LOG_LEVEL="INFO" \
dida2cflow
5、测试没有问题后导出镜像
docker save -o dida2cflow20250918.tar dida2cflow
cm:这里dida2cflow20250918.tar我特别加了日期用来区分版本。
6、手动将镜像复制到nas指定文件夹里
路径:/volume2/300-实用软件/【AI软件开发】/dida2cflow
7、FinalShell软件使用sudo -i,进入管理员模式ssh连接
8、进入存放镜像的路径
cd /volume2/300-实用软件/【AI软件开发】/dida2cflow
9、加载镜像
docker load -i dida2cflow20250918.tar
10、启动容器
docker run -d \
--name dida2cflow \
--restart unless-stopped \
-p 5238:5238 \
-e CFLOW_API_URL="http://192.168.10.2:5236" \
-e SYNC_INTERVAL="30" \
-e SYNC_TAGS="cflow" \
-e LOG_LEVEL="INFO" \
dida2cflow
搞定!