// rspace-online/modules/rtasks/lib/clickup-sync.ts
// 511 lines · 16 KiB · TypeScript

/**
* ClickUp sync service — import, outbound watcher, inbound webhook.
*
* Outbound: registerWatcher on ':tasks:boards:' docs, queue + batch push.
* Inbound: webhook handler validates HMAC, maps fields, applies via changeDoc.
*/
import type { SyncServer } from '../../../server/local-first/sync-server';
import { ClickUpClient, ClickUpApiError } from './clickup-client';
import {
mapClickUpTaskToRTasks,
mapRTasksToClickUpBody,
contentHash,
buildStatusMaps,
detectConflicts,
} from './clickup-mapping';
import {
boardDocId,
clickupConnectionDocId,
createTaskItem,
} from '../schemas';
import type {
BoardDoc,
TaskItem,
ClickUpConnectionDoc,
ClickUpBoardMeta,
} from '../schemas';
// ── Outbound push queue ──

/** One pending outbound push: a single task inside a single board document. */
interface PushItem {
  /** Id of the board document that holds the task. */
  docId: string;
  /** Key of the task inside the board document's `tasks` map. */
  taskId: string;
  /** Number of push attempts for this item that have failed so far. */
  retries: number;
}

/**
 * Fingerprint of a task's synced fields (title, description, status, priority) as of
 * the last push or the last "nothing to push" check, keyed by `${docId}:${taskId}`.
 * Consulted by the watcher so unchanged tasks are not re-enqueued.
 */
const lastPushedHash: Map<string, string> = new Map();

/** Tasks waiting to be pushed; appended at the end and drained from the front. */
const pushQueue: Array<PushItem> = [];

/** Handle of the queue-processor interval, or `null` while no processor is running. */
let pushTimerId: null | ReturnType<typeof setInterval> = null;
// ── Init: register watcher + start queue processor ──

/**
 * Wires up outbound ClickUp sync: registers a watcher for ':tasks:boards:' docs and
 * starts the interval that drains the push queue every 5 seconds.
 *
 * The watcher ignores boards whose `board.clickup.syncEnabled` is not truthy. On the
 * remaining boards it looks only at tasks that carry `clickup` metadata with
 * `syncStatus === 'synced'`; such a task is enqueued when the fingerprint of its
 * title/description/status/priority differs from the one stored in `lastPushedHash`
 * and the task is not already waiting in the queue.
 *
 * Calling this again registers another watcher but never starts a second timer.
 *
 * @param syncServer Sync server used to register the watcher; it is also handed to
 *   the queue processor on every tick.
 */
export function initClickUpSync(syncServer: SyncServer): void {
  syncServer.registerWatcher(':tasks:boards:', (docId, doc) => {
    const boardDoc = doc as BoardDoc;
    if (!boardDoc?.board?.clickup?.syncEnabled) return;

    for (const [taskId, task] of Object.entries(boardDoc.tasks)) {
      if (!task.clickup) continue;
      // Only tasks currently marked as synced are candidates for an outbound push.
      if (task.clickup.syncStatus !== 'synced') continue;

      // Cheap fingerprint of the synced fields; the real content hash is computed
      // later by the queue processor.
      const key = `${docId}:${taskId}`;
      const currentFields = `${task.title}\x00${task.description}\x00${task.status}\x00${task.priority ?? ''}`;
      if (lastPushedHash.get(key) === currentFields) continue;

      // Content may have changed — enqueue for push unless already queued.
      const alreadyQueued = pushQueue.some((p) => p.docId === docId && p.taskId === taskId);
      if (!alreadyQueued) {
        pushQueue.push({ docId, taskId, retries: 0 });
      }
    }
  });

  // Start queue processor (runs every 5s). The promise is handled explicitly so an
  // unexpected rejection is logged instead of surfacing as an unhandled rejection.
  if (!pushTimerId) {
    pushTimerId = setInterval(() => {
      processPushQueue(syncServer).catch((err: unknown) => {
        console.error('[ClickUp] Push queue processing failed:', err);
      });
    }, 5_000);
  }

  console.log('[ClickUp] Sync watcher registered');
}
/**
 * Drains up to 10 items from the push queue and pushes each changed task to ClickUp.
 *
 * An item is dropped without a push when its board doc or task no longer exists, the
 * task has no `clickup` metadata, sync is disabled on the board, or the space has no
 * connection doc with an access token. When the task's content hash equals the stored
 * `clickup.contentHash`, only the in-memory fingerprint is refreshed.
 *
 * A failing item is re-enqueued until it has failed 3 times; after that the task is
 * marked `push-failed` and the error is logged.
 *
 * @param syncServer Sync server used to read board/connection docs and to write the
 *   sync status back.
 */
async function processPushQueue(syncServer: SyncServer): Promise<void> {
  if (pushQueue.length === 0) return;

  const batch = pushQueue.splice(0, 10); // drain ≤10 items
  for (const item of batch) {
    try {
      const doc = syncServer.getDoc<BoardDoc>(item.docId);
      if (!doc) continue;
      const task = doc.tasks[item.taskId];
      if (!task?.clickup) continue;
      const cu = doc.board.clickup;
      if (!cu?.syncEnabled) continue;

      // Get connection token
      const space = doc.meta.spaceSlug;
      const connDoc = syncServer.getDoc<ClickUpConnectionDoc>(clickupConnectionDocId(space));
      if (!connDoc?.clickup?.accessToken) continue;
      const client = new ClickUpClient(connDoc.clickup.accessToken);

      // Same fingerprint format the watcher compares against.
      const key = `${item.docId}:${item.taskId}`;
      const fingerprint = `${task.title}\x00${task.description}\x00${task.status}\x00${task.priority ?? ''}`;

      // Compute current hash
      const hash = await contentHash({
        title: task.title,
        description: task.description,
        status: task.status,
        priority: task.priority,
      });

      // Skip if hash matches last synced
      if (hash === task.clickup.contentHash) {
        lastPushedHash.set(key, fingerprint);
        continue;
      }

      // Push to ClickUp
      const body = mapRTasksToClickUpBody(
        { title: task.title, description: task.description, status: task.status, priority: task.priority, labels: [...task.labels] },
        cu.statusMap,
      );
      await client.updateTask(task.clickup.taskId, body);

      // Record the fingerprint BEFORE writing the sync status back: if the board
      // watcher runs as part of changeDoc (presumably it does — confirm against
      // SyncServer), it then sees an up-to-date fingerprint and does not re-enqueue
      // the task that was just pushed.
      lastPushedHash.set(key, fingerprint);

      // Update sync status
      syncServer.changeDoc<BoardDoc>(item.docId, `Sync task ${item.taskId} to ClickUp`, (d) => {
        const t = d.tasks[item.taskId];
        if (!t?.clickup) return;
        t.clickup.syncStatus = 'synced';
        t.clickup.contentHash = hash;
        t.clickup.lastSyncedAt = Date.now();
      });
    } catch (err) {
      item.retries++;
      if (item.retries < 3) {
        pushQueue.push(item); // re-enqueue
      } else {
        // Mark as failed. This stays best-effort, but a failure to record the state
        // is logged rather than silently discarded.
        try {
          syncServer.changeDoc<BoardDoc>(item.docId, `Mark push failed ${item.taskId}`, (d) => {
            const t = d.tasks[item.taskId];
            if (t?.clickup) t.clickup.syncStatus = 'push-failed';
          });
        } catch (markErr) {
          console.error(`[ClickUp] Could not mark task ${item.taskId} as push-failed:`, markErr);
        }
        console.error(`[ClickUp] Push failed after 3 retries for task ${item.taskId}:`, err);
      }
    }
  }
}
// ── Import a ClickUp list into an rTasks board ──

/**
 * Imports every task of a ClickUp list into the board `boardSlug` of `space`.
 *
 * A new board doc is created (replacing whatever is stored under the same doc id) when
 * none exists or `opts.createNew` is set; otherwise tasks are added to the existing
 * board. With `opts.enableSync`, the board gets ClickUp metadata (list id, status maps,
 * `syncEnabled: true`).
 *
 * Tasks whose ClickUp id is already linked to a task on the board are skipped, so
 * running the import again does not create duplicates. New tasks are appended after
 * the tasks already on the board (`sortOrder` = current task count × 1000, the same
 * rule the webhook import uses).
 *
 * @param syncServer  Sync server holding the board docs.
 * @param space       Space slug the board belongs to.
 * @param boardSlug   Slug (and id) of the target board.
 * @param listId      ClickUp list to import.
 * @param accessToken ClickUp token passed to the API client.
 * @param opts        `enableSync` stores ClickUp metadata on the board; `createNew`
 *                    forces creation of a fresh board doc.
 * @returns The board id and the number of tasks actually imported.
 * @throws Whatever the ClickUp client or the sync server throws; nothing is caught here.
 */
export async function importClickUpList(
  syncServer: SyncServer,
  space: string,
  boardSlug: string,
  listId: string,
  accessToken: string,
  opts: { enableSync?: boolean; createNew?: boolean } = {},
): Promise<{ boardId: string; taskCount: number }> {
  const client = new ClickUpClient(accessToken);

  // Fetch list details for status mapping
  const listInfo = await client.getList(listId);
  const listStatuses = listInfo.statuses || [];
  const { statusMap, reverseStatusMap } = buildStatusMaps(listStatuses);

  // Fetch all tasks (paginate). A page shorter than PAGE_SIZE is treated as the last
  // one — presumably 100 is the page size of client.getTasks; confirm in clickup-client.
  const PAGE_SIZE = 100;
  const allTasks: any[] = [];
  for (let page = 0; ; page++) {
    const batch = await client.getTasks(listId, page);
    allTasks.push(...batch);
    if (batch.length < PAGE_SIZE) break;
  }

  // Board-level ClickUp metadata, identical for new and existing boards.
  const buildClickUpMeta = () => ({
    listId,
    listName: listInfo.name || '',
    workspaceId: listInfo.folder?.space?.id || '',
    syncEnabled: true,
    statusMap,
    reverseStatusMap,
  });

  const docId = boardDocId(space, boardSlug);
  const existing = syncServer.getDoc<BoardDoc>(docId);
  if (!existing || opts.createNew) {
    // Create new board
    const Automerge = await import('@automerge/automerge');
    const now = Date.now();
    const created = Automerge.change(Automerge.init<BoardDoc>(), 'import ClickUp list', (d: BoardDoc) => {
      d.meta = { module: 'tasks', collection: 'boards', version: 1, spaceSlug: space, createdAt: now };
      d.board = {
        id: boardSlug,
        name: listInfo.name || 'Imported Board',
        slug: boardSlug,
        description: `Imported from ClickUp list: ${listInfo.name || listId}`,
        icon: null,
        ownerDid: null,
        statuses: ['TODO', 'IN_PROGRESS', 'REVIEW', 'DONE'],
        labels: [],
        createdAt: now,
        updatedAt: now,
      };
      d.tasks = {};
      // Set ClickUp board meta
      if (opts.enableSync) {
        d.board.clickup = buildClickUpMeta();
      }
    });
    syncServer.setDoc(docId, created);
  } else if (opts.enableSync) {
    // Add ClickUp meta to existing board
    syncServer.changeDoc<BoardDoc>(docId, 'Enable ClickUp sync', (d) => {
      d.board.clickup = buildClickUpMeta();
    });
  }

  // ClickUp ids already linked on the board, so a repeated import skips them.
  const linkedIds = new Set<string>();
  const currentTasks = syncServer.getDoc<BoardDoc>(docId)?.tasks;
  if (currentTasks) {
    for (const t of Object.values(currentTasks)) {
      if (t.clickup?.taskId) linkedIds.add(t.clickup.taskId);
    }
  }

  // Import tasks
  let taskCount = 0;
  for (const cuTask of allTasks) {
    const mapped = mapClickUpTaskToRTasks(cuTask, reverseStatusMap);
    const remoteId = mapped.clickUpMeta.taskId;
    if (linkedIds.has(remoteId)) continue;

    const taskId = crypto.randomUUID();
    const hash = await contentHash({
      title: mapped.title,
      description: mapped.description,
      status: mapped.status,
      priority: mapped.priority,
    });

    syncServer.changeDoc<BoardDoc>(docId, `Import task ${cuTask.id}`, (d) => {
      d.tasks[taskId] = createTaskItem(taskId, space, mapped.title, {
        description: mapped.description,
        status: mapped.status,
        priority: mapped.priority,
        labels: mapped.labels,
        // Count is taken before the insert, so a fresh board still starts at 0.
        sortOrder: Object.keys(d.tasks).length * 1000,
        createdBy: 'clickup-import',
      });
      d.tasks[taskId].clickup = {
        taskId: mapped.clickUpMeta.taskId,
        listId: mapped.clickUpMeta.listId,
        url: mapped.clickUpMeta.url,
        lastSyncedAt: Date.now(),
        syncStatus: 'synced',
        contentHash: hash,
      };
    });

    linkedIds.add(remoteId);
    taskCount++;
  }

  console.log(`[ClickUp] Imported ${taskCount} tasks from list ${listId} into ${boardSlug}`);
  return { boardId: boardSlug, taskCount };
}
// ── Export rTasks board → ClickUp list ──

/**
 * Creates a ClickUp task in `listId` for every board task that is not linked yet, then
 * stores the link (ClickUp id, url, content hash, `syncStatus: 'synced'`) on the task.
 *
 * Tasks are processed one after another; an error from the ClickUp client aborts the
 * run and propagates to the caller, leaving already-linked tasks as they are.
 *
 * @param syncServer  Sync server holding the board doc.
 * @param space       Space slug the board belongs to.
 * @param boardSlug   Slug of the board to export.
 * @param listId      Destination ClickUp list.
 * @param accessToken ClickUp token passed to the API client.
 * @param statusMap   Optional status mapping forwarded to `mapRTasksToClickUpBody`.
 * @returns How many tasks were created in ClickUp.
 * @throws Error('Board not found') when the board doc does not exist.
 */
export async function pushBoardToClickUp(
  syncServer: SyncServer,
  space: string,
  boardSlug: string,
  listId: string,
  accessToken: string,
  statusMap?: Record<string, string>,
): Promise<{ pushed: number }> {
  const api = new ClickUpClient(accessToken);
  const docId = boardDocId(space, boardSlug);
  const board = syncServer.getDoc<BoardDoc>(docId);
  if (!board) {
    throw new Error('Board not found');
  }

  let pushed = 0;
  for (const [localId, local] of Object.entries(board.tasks)) {
    // Already linked to a ClickUp task — nothing to create.
    if (local.clickup?.taskId) continue;

    const { title, description, status, priority } = local;

    const payload = mapRTasksToClickUpBody(
      { title, description, status, priority, labels: [...local.labels] },
      statusMap,
    );
    const remote = await api.createTask(listId, payload);
    const digest = await contentHash({ title, description, status, priority });

    syncServer.changeDoc<BoardDoc>(docId, `Link task ${localId} to ClickUp`, (draft) => {
      const target = draft.tasks[localId];
      if (target) {
        target.clickup = {
          taskId: remote.id,
          listId,
          // Fall back to the canonical task URL when the API response carries none.
          url: remote.url || `https://app.clickup.com/t/${remote.id}`,
          lastSyncedAt: Date.now(),
          syncStatus: 'synced',
          contentHash: digest,
        };
      }
    });

    pushed += 1;
  }

  console.log(`[ClickUp] Pushed ${pushed} tasks from ${boardSlug} to ClickUp list ${listId}`);
  return { pushed };
}
// ── Inbound webhook handler ──

/**
 * Checks a hex-encoded HMAC-SHA256 signature of `payload` against `secret`.
 *
 * The signature is decoded to bytes and handed to `crypto.subtle.verify`, so the
 * comparison is performed by the crypto implementation rather than by an early-exit
 * string comparison. An empty, odd-length or non-hex signature is rejected.
 */
async function isValidClickUpSignature(
  secret: string,
  payload: string,
  signatureHex: string,
): Promise<boolean> {
  if (signatureHex.length === 0 || signatureHex.length % 2 !== 0 || !/^[0-9a-f]+$/i.test(signatureHex)) {
    return false;
  }
  const provided = new Uint8Array(signatureHex.length / 2);
  for (let i = 0; i < provided.length; i++) {
    provided[i] = Number.parseInt(signatureHex.slice(i * 2, i * 2 + 2), 16);
  }

  const encoder = new TextEncoder();
  const key = await crypto.subtle.importKey(
    'raw',
    encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' },
    false,
    ['verify'],
  );
  return crypto.subtle.verify('HMAC', key, provided, encoder.encode(payload));
}

/**
 * Handles an inbound ClickUp webhook for `space`.
 *
 * Flow:
 *  1. Load the space's ClickUp connection; without one the call is rejected.
 *  2. If the connection has a `webhookSecret`, a valid HMAC-SHA256 signature is
 *     REQUIRED — a missing signature is rejected instead of skipping validation.
 *  3. Only `taskUpdated`, `taskStatusUpdated` and `taskCreated` events that carry a
 *     `task_id` are processed; everything else is acknowledged as ignored.
 *  4. The task is looked up by ClickUp id across the space's sync-enabled boards.
 *     - Not found + `taskCreated` + a board syncing `body.list_id`: the task is fetched
 *       and imported into that board.
 *     - Found: the remote task is fetched and applied. If the local task changed since
 *       its stored `contentHash`, the result of `detectConflicts` decides which fields
 *       are taken; otherwise all remote fields overwrite the local ones.
 *
 * @param syncServer Sync server holding connection and board docs.
 * @param space      Space slug the webhook belongs to.
 * @param body       Parsed webhook payload (`event`, `task_id`, optionally `list_id`).
 * @param signature  Hex HMAC signature sent with the request, or null when absent.
 * @returns `ok: false` for a missing connection, a missing/invalid signature or a
 *   failed import/update; `ok: true` otherwise, with a message describing the outcome.
 */
export async function handleClickUpWebhook(
  syncServer: SyncServer,
  space: string,
  body: any,
  signature: string | null,
): Promise<{ ok: boolean; message: string }> {
  // Get connection for HMAC validation
  const connDoc = syncServer.getDoc<ClickUpConnectionDoc>(clickupConnectionDocId(space));
  if (!connDoc?.clickup) {
    return { ok: false, message: 'No ClickUp connection for this space' };
  }
  const connection = connDoc.clickup;

  // Validate HMAC-SHA256 signature. When a secret is configured the signature is
  // mandatory; otherwise anyone could bypass validation by omitting the header.
  // NOTE(review): with no webhookSecret stored, requests are still accepted
  // unauthenticated (unchanged behavior) — consider rejecting those as well.
  // NOTE(review): the HMAC covers JSON.stringify(body), i.e. a re-serialization of the
  // parsed payload. If the sender signs the raw request bytes, any formatting
  // difference fails verification — confirm against the route that calls this.
  if (connection.webhookSecret) {
    if (!signature) {
      return { ok: false, message: 'Missing webhook signature' };
    }
    const signatureOk = await isValidClickUpSignature(
      connection.webhookSecret,
      JSON.stringify(body),
      signature,
    );
    if (!signatureOk) {
      return { ok: false, message: 'Invalid webhook signature' };
    }
  }

  // Optional chaining: a null/undefined payload is ignored instead of throwing.
  const event = body?.event;
  const taskId = body?.task_id;
  if (!taskId || !['taskUpdated', 'taskStatusUpdated', 'taskCreated'].includes(event)) {
    return { ok: true, message: 'Event ignored' };
  }

  // Find which board doc has this ClickUp task
  const boardDocIds = syncServer.getDocIds().filter((id) => id.startsWith(`${space}:tasks:boards:`));
  let targetDocId: string | null = null;
  let targetTaskId: string | null = null;
  for (const docId of boardDocIds) {
    const doc = syncServer.getDoc<BoardDoc>(docId);
    if (!doc?.board?.clickup?.syncEnabled) continue;
    for (const [tid, task] of Object.entries(doc.tasks)) {
      if (task.clickup?.taskId === taskId) {
        targetDocId = docId;
        targetTaskId = tid;
        break;
      }
    }
    if (targetDocId) break;
  }

  if (!targetDocId || !targetTaskId) {
    // New task from ClickUp — only import if a board is watching this list
    if (event === 'taskCreated' && body.list_id) {
      for (const docId of boardDocIds) {
        const doc = syncServer.getDoc<BoardDoc>(docId);
        if (!doc?.board?.clickup?.syncEnabled || doc.board.clickup.listId !== body.list_id) continue;

        // Fetch full task and import
        const client = new ClickUpClient(connection.accessToken);
        try {
          const cuTask = await client.getTask(taskId);
          const mapped = mapClickUpTaskToRTasks(cuTask, doc.board.clickup.reverseStatusMap);
          const newTaskId = crypto.randomUUID();
          const hash = await contentHash({
            title: mapped.title,
            description: mapped.description,
            status: mapped.status,
            priority: mapped.priority,
          });
          syncServer.changeDoc<BoardDoc>(docId, `Webhook: import new task ${taskId}`, (d) => {
            d.tasks[newTaskId] = createTaskItem(newTaskId, space, mapped.title, {
              description: mapped.description,
              status: mapped.status,
              priority: mapped.priority,
              labels: mapped.labels,
              sortOrder: Object.keys(d.tasks).length * 1000,
              createdBy: 'clickup-webhook',
            });
            d.tasks[newTaskId].clickup = {
              taskId: mapped.clickUpMeta.taskId,
              listId: mapped.clickUpMeta.listId,
              url: mapped.clickUpMeta.url,
              lastSyncedAt: Date.now(),
              syncStatus: 'synced',
              contentHash: hash,
            };
          });
          return { ok: true, message: `Imported new task ${taskId}` };
        } catch (err) {
          console.error(`[ClickUp] Webhook import failed for ${taskId}:`, err);
          return { ok: false, message: 'Failed to import task' };
        }
      }
    }
    return { ok: true, message: 'Task not tracked' };
  }

  // Narrowed copies so the changeDoc callbacks need no non-null assertions.
  const boardId: string = targetDocId;
  const localId: string = targetTaskId;

  // Fetch latest task from ClickUp
  const client = new ClickUpClient(connection.accessToken);
  try {
    const cuTask = await client.getTask(taskId);

    // Read the board AFTER the network round-trip so the comparison below uses the
    // local state as of now, not as of before the fetch.
    const doc = syncServer.getDoc<BoardDoc>(boardId);
    const localTask = doc?.tasks[localId];
    if (!doc || !localTask) return { ok: true, message: 'Task not found locally' };

    const mapped = mapClickUpTaskToRTasks(cuTask, doc.board.clickup?.reverseStatusMap);
    const remoteFields = {
      title: mapped.title,
      description: mapped.description,
      status: mapped.status,
      priority: mapped.priority,
    };
    const remoteHash = await contentHash(remoteFields);

    // Skip if remote hasn't changed
    if (localTask.clickup?.remoteHash === remoteHash) {
      return { ok: true, message: 'No changes detected' };
    }

    // Check for conflicts. The remote side is known to have changed at this point
    // (see the early return above), so only the local side needs checking.
    const localFields = {
      title: localTask.title,
      description: localTask.description,
      status: localTask.status,
      priority: localTask.priority,
    };
    const localHash = await contentHash(localFields);
    const localChanged = localTask.clickup?.contentHash !== localHash;

    if (localChanged) {
      // Both sides changed — detect field-level conflicts
      // rTasks wins on conflicts, apply remote-only changes
      // NOTE(review): the "base" handed to detectConflicts is the current local state,
      // because no last-synced snapshot of the fields is available here. If
      // detectConflicts is a three-way comparison, local never differs from base and
      // every differing remote field presumably comes back as remote-only — verify
      // against clickup-mapping and consider persisting the last-synced fields.
      const { remoteOnly, conflicts } = detectConflicts(localFields, remoteFields, { ...localFields });

      syncServer.changeDoc<BoardDoc>(boardId, `Webhook: merge task ${taskId}`, (d) => {
        const t = d.tasks[localId];
        if (!t) return;
        // Apply remote-only field changes
        for (const field of remoteOnly) {
          if (field === 'title') t.title = mapped.title;
          if (field === 'description') t.description = mapped.description;
          if (field === 'status') t.status = mapped.status;
          if (field === 'priority') t.priority = mapped.priority;
        }
        if (t.clickup) {
          // contentHash is deliberately left untouched so the pending local changes
          // still differ from it.
          t.clickup.remoteHash = remoteHash;
          t.clickup.lastSyncedAt = Date.now();
          t.clickup.syncStatus = conflicts.length > 0 ? 'conflict' : 'synced';
        }
        t.updatedAt = Date.now();
      });
    } else {
      // Only remote changed — apply all remote fields
      syncServer.changeDoc<BoardDoc>(boardId, `Webhook: update task ${taskId}`, (d) => {
        const t = d.tasks[localId];
        if (!t) return;
        t.title = mapped.title;
        t.description = mapped.description;
        t.status = mapped.status;
        t.priority = mapped.priority;
        // An empty remote label list leaves the local labels as they are.
        if (mapped.labels.length > 0) t.labels = mapped.labels;
        if (t.clickup) {
          // Local now equals remote, so both hashes are the remote hash.
          t.clickup.remoteHash = remoteHash;
          t.clickup.contentHash = remoteHash;
          t.clickup.lastSyncedAt = Date.now();
          t.clickup.syncStatus = 'synced';
        }
        t.updatedAt = Date.now();
      });
    }

    return { ok: true, message: `Updated task ${taskId}` };
  } catch (err) {
    console.error(`[ClickUp] Webhook update failed for ${taskId}:`, err);
    return { ok: false, message: 'Failed to update task' };
  }
}