关联主题:: dida2cflow
同级:: 2025-09-18_星期四
下一级::

滴答清单任务重复同步到cflow

问题:dida2cflow的作用是将滴答清单带有标签为cflow的内容同步到cflow。但是滴答清单同步到cflow的内容总是有很多重复的内容。所以我就在淘宝买了cursor高级版的套餐,对cursor这个容器进行了一次新的升级。

根目录项目文件夹:【完整】基础版v1.0_副本4

AI诊断的问题及修改建议

📋 滴答清单同步到cflow的完整流程

1. 同步循环启动 (第1899-2014行)

启动同步服务 → 每300秒执行一次 → 调用get_tasks() → 处理任务列表

2. 获取任务阶段 (第289-515行)

调用滴答清单API → 获取所有任务 → 过滤包含"cflow"标签的任务 → 返回需要同步的任务列表

3. 任务同步判断 (第264-287行 + 第376-502行)

检查任务是否删除 → 检查是否有"cflow"标签 → 检查完成状态 → 检查是否已同步过 → 决定是否同步

4. 创建或更新卡片 (第747-1296行)

提取任务内容 → 处理图片 → 检查是否更新还是创建 → 调用cflow API → 保存同步记录

5. 反馈到滴答清单 (第2015-2121行)

将cflow卡片ID写回滴答清单任务内容 → 更新任务

🔍 容易出问题的关键环节

问题环节1:同步状态判断混乱 (第376-502行)

现状代码逻辑:

# 有3个地方可能包含memo信息:
# 1. 任务内容中的cflow ID (第368-373行)
cflow_id_match = re.search(r'cflow ID:#(\d+)', content)
 
# 2. synced_tasks记录中的memo_id (第384-449行)  
if task_id in self.synced_tasks:
    last_sync_time = self.synced_tasks[task_id].get('modified_time', '')
 
# 3. 检查cflow ID是否存在 (第431-437行)
if self.check_cflow_memo_exists(cflow_id):

问题: 这3个信息源可能不一致,导致判断错误

问题环节2:创建/更新决策逻辑复杂 (第777-791行)

现状代码:

# 多重判断导致逻辑混乱
if cflow_id and self.check_cflow_memo_exists(cflow_id):
    memo_id = cflow_id
    is_update = True
elif task_id in self.synced_tasks and self.synced_tasks[task_id].get('memo_id'):
    memo_id = self.synced_tasks[task_id]['memo_id']
    if self.check_cflow_memo_exists(memo_id):
        is_update = True
    else:
        # 这里可能导致重复创建
        logger.info(f"记录的memo_id {memo_id} 不存在,将创建新卡片")

问题: 当memo不存在时,没有清理同步记录就直接创建新卡片

问题环节3:时间戳比较复杂 (第389-449行)

现状代码:

# 复杂的时间处理逻辑,容易出错
if isinstance(modified_time, str) and 'T' in modified_time:
    dt = parser.parse(modified_time)
    # 复杂的时区转换...
    current_timestamp = int(dt.timestamp())
else:
    # 时间戳处理...
    if timestamp > 1000000000000:
        timestamp = timestamp // 1000

问题: 时间比较失败时可能误判需要同步

问题环节4:并发处理缺乏保护 (第1948-1980行)

现状代码:

# 没有并发保护的循环处理
for task in new_tasks:
    if self.create_or_update_cflow_card(task):
        # 可能同时处理同一个任务

问题: 同一任务可能被并发处理多次

🎯 具体解决方案

解决方案1:统一同步状态管理

新增统一的状态检查方法:

def get_unified_sync_status(self, task):
    """统一获取任务的同步状态"""
    task_id = task.get('id')
    content = task.get('content', '')
    
    # 步骤1:从任务内容提取cflow ID
    content_cflow_id = None
    cflow_id_match = re.search(r'cflow ID:#(\d+)', content)
    if cflow_id_match:
        content_cflow_id = cflow_id_match.group(1)
    
    # 步骤2:从同步记录获取memo ID
    record_memo_id = None
    if task_id in self.synced_tasks:
        record_memo_id = self.synced_tasks[task_id].get('memo_id')
    
    # 步骤3:验证ID的有效性
    valid_memo_id = None
    if content_cflow_id and self.check_cflow_memo_exists(content_cflow_id):
        valid_memo_id = content_cflow_id
        logger.info(f"任务 {task_id} 使用内容中的有效cflow ID: {content_cflow_id}")
    elif record_memo_id and self.check_cflow_memo_exists(record_memo_id):
        valid_memo_id = record_memo_id
        logger.info(f"任务 {task_id} 使用记录中的有效memo ID: {record_memo_id}")
    
    # 步骤4:清理无效记录
    if content_cflow_id and content_cflow_id != valid_memo_id:
        logger.warning(f"任务内容中的cflow ID {content_cflow_id} 无效,需要清理")
    
    if record_memo_id and record_memo_id != valid_memo_id:
        logger.warning(f"同步记录中的memo ID {record_memo_id} 无效,需要清理")
        if task_id in self.synced_tasks:
            del self.synced_tasks[task_id]
            self.save_synced_tasks()
    
    return {
        'task_id': task_id,
        'valid_memo_id': valid_memo_id,
        'needs_sync': valid_memo_id is None,
        'needs_update': False  # 后续判断
    }

解决方案2:简化同步判断逻辑

替换复杂的should_sync_task方法:

def should_sync_task_v2(self, task):
    """简化的任务同步判断"""
    # 基础过滤
    if task.get('deleted', 0) != 0:
        return False
    if "cflow" not in task.get('tags', []):
        return False
    if task.get('kind', '') != 'NOTE' and task.get('status', -1) != 0:
        return False
    
    # 获取统一状态
    status = self.get_unified_sync_status(task)
    
    if status['needs_sync']:
        logger.info(f"任务需要同步: {task.get('title', '')}")
        return True
    
    # 检查是否需要更新
    if status['valid_memo_id']:
        needs_update = self.check_needs_update(task, status['valid_memo_id'])
        if needs_update:
            logger.info(f"任务需要更新: {task.get('title', '')}")
            return True
    
    return False
 
def check_needs_update(self, task, memo_id):
    """检查是否需要更新"""
    task_id = task.get('id')
    if task_id not in self.synced_tasks:
        return True  # 没有同步记录,需要更新
    
    # 简化的时间比较
    current_time = self.normalize_timestamp(task.get('modifiedTime'))
    last_sync_time = self.synced_tasks[task_id].get('modified_time', '0')
    
    return int(current_time) > int(last_sync_time)
 
def normalize_timestamp(self, time_value):
    """标准化时间戳"""
    try:
        if isinstance(time_value, str) and 'T' in time_value:
            dt = parser.parse(time_value)
            return str(int(dt.timestamp()))
        else:
            timestamp = int(time_value)
            if timestamp > 1000000000000:
                timestamp = timestamp // 1000
            return str(timestamp)
    except:
        return str(int(time.time()))

解决方案3:添加任务锁机制

防止并发重复处理:

def process_task_with_lock(self, task):
    """带锁的任务处理"""
    task_id = task.get('id')
    lock_file = f"data/locks/task_{task_id}.lock"
    
    # 确保锁目录存在
    os.makedirs("data/locks", exist_ok=True)
    
    try:
        # 检查锁文件
        if os.path.exists(lock_file):
            # 检查锁文件时间,超过5分钟自动清理
            lock_time = os.path.getmtime(lock_file)
            if time.time() - lock_time > 300:
                os.remove(lock_file)
                logger.warning(f"清理过期锁文件: {lock_file}")
            else:
                logger.info(f"任务 {task_id} 正在处理中,跳过")
                return False
        
        # 创建锁文件
        with open(lock_file, 'w') as f:
            f.write(str(int(time.time())))
        
        # 双重检查
        if not self.should_sync_task_v2(task):
            return False
        
        # 执行同步
        return self.create_or_update_cflow_card_v2(task)
        
    finally:
        # 清理锁文件
        if os.path.exists(lock_file):
            os.remove(lock_file)

解决方案4:重构创建/更新逻辑

简化create_or_update_cflow_card方法:

def create_or_update_cflow_card_v2(self, task):
    """简化的创建或更新cflow卡片"""
    # 获取统一状态
    status = self.get_unified_sync_status(task)
    task_id = task.get('id')
    
    if status['valid_memo_id']:
        # 更新现有卡片
        success = self.update_existing_memo(task, status['valid_memo_id'])
    else:
        # 创建新卡片
        success = self.create_new_memo(task)
    
    if success:
        logger.info(f"任务同步成功: {task.get('title', '')}")
    else:
        logger.error(f"任务同步失败: {task.get('title', '')}")
    
    return success

📊 修改优先级

  1. 高优先级:实现get_unified_sync_status方法,统一状态管理
  2. 中优先级:添加任务锁机制,防止并发重复
  3. 低优先级:简化时间戳处理和重构创建逻辑

这样的改进将显著减少重复内容的创建,让同步更加可靠。

  • AI修改之前我先将dida2cflow的项目推送到了Github,这样一旦出问题后可以回退版本。

修改完以后如何测试及加载?
1、本地安装docker应用;
2、cd /Users/bairimengyushi/Downloads/【完整】基础版v1.0_副本4/dida2cflow

3、构建容器

docker build -t dida2cflow .  

4、本地启动测试

  docker run -d \
     --name dida2cflow \
     --restart unless-stopped \
     -p 5238:5238 \
     -e CFLOW_API_URL="http://192.168.10.2:5236" \
     -e SYNC_INTERVAL="30" \
     -e SYNC_TAGS="cflow" \
     -e LOG_LEVEL="INFO" \
     dida2cflow

5、测试没有问题后导出镜像

docker save -o dida2cflow20250918.tar dida2cflow

cm:这里dida2cflow20250918.tar我特别加了日期用来区分版本。

6、手动将镜像复制到nas指定文件夹里

路径:/volume2/300-实用软件/【AI软件开发】/dida2cflow

7、FinalShell软件使用sudo -i,进入管理员模式ssh连接
8、进入存放镜像的路径
cd /volume2/300-实用软件/【AI软件开发】/dida2cflow

9、加载镜像

docker load -i dida2cflow20250918.tar

10、启动容器

  docker run -d \
     --name dida2cflow \
     --restart unless-stopped \
     -p 5238:5238 \
     -e CFLOW_API_URL="http://192.168.10.2:5236" \
     -e SYNC_INTERVAL="30" \
     -e SYNC_TAGS="cflow" \
     -e LOG_LEVEL="INFO" \
     dida2cflow

搞定!