From 125964dbae9ac62f258584c41f4864416fee612a Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Mon, 23 Feb 2026 01:05:14 +0000 Subject: [PATCH] Fix 4 migration adapters, add 7 new ones, add safety features Notes/work/cal/vote adapters had wrong table names and missing fields. Now match actual PG schemas. Added books, cart, providers, files, trips, inbox, splat adapters. Engine gains idempotency, dry-run, disk backup, per-row error recovery, and progress logging. Co-Authored-By: Claude Opus 4.6 --- .../local-first/migration/pg-to-automerge.ts | 1426 +++++++++++++++++ 1 file changed, 1426 insertions(+) create mode 100644 server/local-first/migration/pg-to-automerge.ts diff --git a/server/local-first/migration/pg-to-automerge.ts b/server/local-first/migration/pg-to-automerge.ts new file mode 100644 index 0000000..c20f814 --- /dev/null +++ b/server/local-first/migration/pg-to-automerge.ts @@ -0,0 +1,1426 @@ +/** + * PG → Automerge Migration Tool + * + * One-time migration utility that reads data from PostgreSQL module schemas + * and converts them into Automerge documents. Run per-module during the + * local-first transition. + * + * Usage: + * import { migrateModule, notesMigration } from './migration/pg-to-automerge'; + * await migrateModule(notesMigration, pool, 'demo', syncServer); + * + * // Dry-run (no docs created): + * await migrateModule(notesMigration, pool, 'demo', syncServer, { dryRun: true }); + * + * // With disk backup: + * await migrateModule(notesMigration, pool, 'demo', syncServer, { backupDir: '/tmp/migration-backup' }); + * + * Each module migration: + * 1. Reads all rows from the PG schema + * 2. Creates Automerge documents per the granularity rules + * 3. Checks idempotency (skips already-migrated docs) + * 4. Registers them with the SyncServer so they're available to clients + * 5. Optionally writes the Automerge binaries to disk as backup + */ + +import * as Automerge from '@automerge/automerge'; +import { mkdirSync, writeFileSync } from 'node:fs'; +import type { SyncServer } from '../sync-server'; + +// ============================================================================ +// TYPES +// ============================================================================ + +export interface MigrationOptions { + dryRun?: boolean; + backupDir?: string; +} + +export interface MigrationResult { + module: string; + docsCreated: number; + docsSkipped: number; + rowsMigrated: number; + errors: string[]; + durationMs: number; + dryRun: boolean; +} + +/** Unified document metadata — present on all migrated docs. */ +export interface DocMeta { + module: string; + collection: string; + version: number; + spaceSlug: string; + createdAt: number; +} + +/** + * Module-specific migration adapter. Each module implements one of these + * to define how its PG data maps to Automerge documents. + */ +export interface ModuleMigration { + module: string; + collection: string; + /** + * Query PG and return rows grouped by target document. + * Returns: Map + * An empty-string suffix produces a doc without a trailing segment. + */ + extract(pool: any, space: string): Promise>; +} + +// ============================================================================ +// MIGRATION ENGINE +// ============================================================================ + +function buildMeta(migration: ModuleMigration, space: string): DocMeta { + return { + module: migration.module, + collection: migration.collection, + version: 1, + spaceSlug: space, + createdAt: Date.now(), + }; +} + +/** + * Run a module migration. + */ +export async function migrateModule( + migration: ModuleMigration, + pool: any, + space: string, + syncServer?: SyncServer, + options: MigrationOptions = {} +): Promise { + const start = Date.now(); + const { dryRun = false, backupDir } = options; + const result: MigrationResult = { + module: migration.module, + docsCreated: 0, + docsSkipped: 0, + rowsMigrated: 0, + errors: [], + durationMs: 0, + dryRun, + }; + + if (backupDir && !dryRun) { + mkdirSync(backupDir, { recursive: true }); + } + + try { + console.log( + `[Migration] Starting ${migration.module} for space "${space}"` + + (dryRun ? ' (DRY RUN)' : '') + + '...' + ); + + const dataMap = await migration.extract(pool, space); + const total = dataMap.size; + let n = 0; + + for (const [suffix, initialState] of dataMap) { + n++; + try { + const docId = suffix + ? `${space}:${migration.module}:${migration.collection}:${suffix}` + : `${space}:${migration.module}:${migration.collection}`; + + // Idempotency: skip if doc already exists + if (syncServer?.getDoc(docId)) { + console.log( + `[Migration] ${migration.module}: ${n}/${total} — skipped "${docId}" (already exists)` + ); + result.docsSkipped++; + continue; + } + + if (dryRun) { + const rowCount = countItems(initialState); + result.rowsMigrated += rowCount; + result.docsCreated++; + console.log( + `[Migration] ${migration.module}: ${n}/${total} — would create "${docId}" (${rowCount} items)` + ); + continue; + } + + let doc = Automerge.init(); + doc = Automerge.change(doc, `Migrate from PG: ${docId}`, (d: any) => { + Object.assign(d, initialState); + }); + + // Register with sync server + if (syncServer) { + syncServer.setDoc(docId, doc); + } + + // Disk backup + if (backupDir) { + const binary = Automerge.save(doc); + const safeName = docId.replace(/[:/]/g, '_'); + writeFileSync(`${backupDir}/${safeName}.automerge`, binary); + } + + result.docsCreated++; + const rowCount = countItems(initialState); + result.rowsMigrated += rowCount; + + console.log( + `[Migration] ${migration.module}: ${n}/${total} — created "${docId}" (${rowCount} items)` + ); + } catch (e) { + const msg = `Failed to create doc for suffix "${suffix}": ${e}`; + result.errors.push(msg); + console.error(`[Migration] ${msg}`); + } + } + } catch (e) { + const msg = `Migration failed for ${migration.module}: ${e}`; + result.errors.push(msg); + console.error(`[Migration] ${msg}`); + } + + result.durationMs = Date.now() - start; + console.log( + `[Migration] ${migration.module}: ${result.docsCreated} docs created, ` + + `${result.docsSkipped} skipped, ` + + `${result.rowsMigrated} rows, ${result.errors.length} errors, ` + + `${result.durationMs}ms` + + (dryRun ? ' (DRY RUN)' : '') + ); + + return result; +} + +/** + * Run multiple module migrations. + */ +export async function migrateAll( + migrations: ModuleMigration[], + pool: any, + space: string, + syncServer?: SyncServer, + options: MigrationOptions = {} +): Promise { + const results: MigrationResult[] = []; + for (const migration of migrations) { + const result = await migrateModule(migration, pool, space, syncServer, options); + results.push(result); + } + return results; +} + +// ============================================================================ +// HELPER: safe row access (per-row error recovery) +// ============================================================================ + +function safeTimestamp(val: any): number { + if (!val) return 0; + return new Date(val).getTime(); +} + +function safePoint(val: any): { x: number; y: number } | null { + if (!val) return null; + // PG POINT comes as "(x,y)" string or {x, y} object + if (typeof val === 'string') { + const m = val.match(/\(([^,]+),([^)]+)\)/); + if (m) return { x: parseFloat(m[1]), y: parseFloat(m[2]) }; + } + if (typeof val === 'object' && 'x' in val) { + return { x: Number(val.x), y: Number(val.y) }; + } + return null; +} + +// ============================================================================ +// NOTES MIGRATION +// ============================================================================ + +/** + * Notes: rnotes.notebooks + rnotes.notes + rnotes.note_tags/tags + * Granularity: 1 doc per notebook ({space}:notes:notebooks:{notebookId}) + */ +export const notesMigration: ModuleMigration = { + module: 'notes', + collection: 'notebooks', + async extract(pool, space) { + const result = new Map(); + + try { + // Get all notebooks (no space_slug — notebooks are global, we migrate all) + const { rows: notebooks } = await pool.query( + `SELECT * FROM rnotes.notebooks ORDER BY created_at` + ); + + for (const nb of notebooks) { + try { + // Get notes for this notebook with tags + const { rows: notes } = await pool.query( + `SELECT n.*, + COALESCE(array_agg(t.name) FILTER (WHERE t.name IS NOT NULL), '{}') AS tags + FROM rnotes.notes n + LEFT JOIN rnotes.note_tags nt ON nt.note_id = n.id + LEFT JOIN rnotes.tags t ON t.id = nt.tag_id + WHERE n.notebook_id = $1 + GROUP BY n.id + ORDER BY n.sort_order, n.created_at`, + [nb.id] + ); + + const items: Record = {}; + for (const row of notes) { + try { + items[row.id] = { + id: row.id, + notebookId: row.notebook_id, + authorId: row.author_id || null, + title: row.title || '', + content: row.content || '', + contentPlain: row.content_plain || '', + type: row.type || 'NOTE', + url: row.url || null, + language: row.language || null, + fileUrl: row.file_url || null, + mimeType: row.mime_type || null, + fileSize: row.file_size || null, + duration: row.duration || null, + isPinned: !!row.is_pinned, + sortOrder: row.sort_order ?? 0, + tags: row.tags || [], + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] notes: error on note ${row.id}: ${e}`); + } + } + + result.set(nb.id, { + meta: buildMeta(notesMigration, space), + notebook: { + id: nb.id, + title: nb.title || '', + slug: nb.slug || '', + description: nb.description || '', + coverColor: nb.cover_color || null, + isPublic: !!nb.is_public, + createdAt: safeTimestamp(nb.created_at), + updatedAt: safeTimestamp(nb.updated_at), + }, + items, + }); + } catch (e) { + console.error(`[Migration] notes: error on notebook ${nb.id}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] notes: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// WORK MIGRATION +// ============================================================================ + +/** + * Work: rwork.spaces + rwork.tasks + * Granularity: 1 doc per space/board ({space}:work:boards:{spaceId}) + */ +export const workMigration: ModuleMigration = { + module: 'work', + collection: 'boards', + async extract(pool, space) { + const result = new Map(); + + try { + const { rows: spaces } = await pool.query( + `SELECT * FROM rwork.spaces WHERE slug = $1`, + [space] + ); + + for (const ws of spaces) { + try { + const { rows: tasks } = await pool.query( + `SELECT * FROM rwork.tasks WHERE space_id = $1 ORDER BY sort_order, created_at`, + [ws.id] + ); + + const taskMap: Record = {}; + for (const task of tasks) { + try { + taskMap[task.id] = { + id: task.id, + spaceId: task.space_id, + title: task.title || '', + description: task.description || '', + status: task.status || 'todo', + priority: task.priority || null, + labels: task.labels || [], + assigneeId: task.assignee_id || null, + createdBy: task.created_by || null, + sortOrder: task.sort_order ?? 0, + createdAt: safeTimestamp(task.created_at), + updatedAt: safeTimestamp(task.updated_at), + }; + } catch (e) { + console.error(`[Migration] work: error on task ${task.id}: ${e}`); + } + } + + result.set(ws.id, { + meta: buildMeta(workMigration, space), + board: { + id: ws.id, + name: ws.name || '', + slug: ws.slug || '', + description: ws.description || '', + icon: ws.icon || null, + ownerDid: ws.owner_did || null, + statuses: ws.statuses || ['todo', 'in-progress', 'done'], + labels: ws.labels || [], + createdAt: safeTimestamp(ws.created_at), + updatedAt: safeTimestamp(ws.updated_at), + }, + tasks: taskMap, + }); + } catch (e) { + console.error(`[Migration] work: error on space ${ws.id}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] work: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// CALENDAR MIGRATION +// ============================================================================ + +/** + * Cal: rcal.events + rcal.calendar_sources + rcal.locations + * Granularity: 1 doc per space ({space}:cal:events) + */ +export const calMigration: ModuleMigration = { + module: 'cal', + collection: 'events', + async extract(pool, space) { + const result = new Map(); + + try { + // Events have no space_slug — linked via source_id to calendar_sources. + // We migrate all events and include source/location data inline. + const { rows } = await pool.query( + `SELECT e.*, + cs.name AS source_name, cs.source_type, cs.color AS source_color, + cs.url AS source_url, cs.is_active AS source_active, + l.name AS loc_name, l.granularity AS loc_granularity, + l.lat AS loc_lat, l.lng AS loc_lng + FROM rcal.events e + LEFT JOIN rcal.calendar_sources cs ON cs.id = e.source_id + LEFT JOIN rcal.locations l ON l.id = e.location_id + ORDER BY e.start_time` + ); + + if (rows.length === 0) return result; + + // Also get calendar sources as their own list + const { rows: sources } = await pool.query( + `SELECT * FROM rcal.calendar_sources ORDER BY created_at` + ); + const sourcesMap: Record = {}; + for (const s of sources) { + sourcesMap[s.id] = { + id: s.id, + name: s.name || '', + sourceType: s.source_type || '', + url: s.url || null, + color: s.color || null, + isActive: s.is_active ?? true, + isVisible: s.is_visible ?? true, + syncIntervalMinutes: s.sync_interval_minutes || null, + lastSyncedAt: safeTimestamp(s.last_synced_at), + ownerId: s.owner_id || null, + createdAt: safeTimestamp(s.created_at), + }; + } + + const events: Record = {}; + for (const row of rows) { + try { + events[row.id] = { + id: row.id, + title: row.title || '', + description: row.description || '', + startTime: safeTimestamp(row.start_time), + endTime: safeTimestamp(row.end_time), + allDay: !!row.all_day, + timezone: row.timezone || null, + rrule: row.rrule || null, + status: row.status || null, + visibility: row.visibility || null, + sourceId: row.source_id || null, + sourceName: row.source_name || null, + sourceType: row.source_type || null, + sourceColor: row.source_color || null, + locationId: row.location_id || null, + locationName: row.location_name || row.loc_name || null, + coordinates: safePoint(row.coordinates), + locationGranularity: row.location_granularity ?? row.loc_granularity ?? null, + locationLat: row.loc_lat ?? null, + locationLng: row.loc_lng ?? null, + isVirtual: !!row.is_virtual, + virtualUrl: row.virtual_url || null, + virtualPlatform: row.virtual_platform || null, + rToolSource: row.r_tool_source || null, + rToolEntityId: row.r_tool_entity_id || null, + attendees: row.attendees || [], + attendeeCount: row.attendee_count ?? 0, + metadata: row.metadata || null, + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] cal: error on event ${row.id}: ${e}`); + } + } + + result.set('', { + meta: buildMeta(calMigration, space), + sources: sourcesMap, + events, + }); + } catch (e) { + console.warn(`[Migration] cal: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// VOTE MIGRATION +// ============================================================================ + +/** + * Vote: rvote.proposals + rvote.votes + rvote.final_votes + rvote.spaces + * Granularity: 1 doc per proposal ({space}:vote:proposals:{proposalId}) + */ +export const voteMigration: ModuleMigration = { + module: 'vote', + collection: 'proposals', + async extract(pool, space) { + const result = new Map(); + + try { + // Get space config + let spaceConfig: any = null; + try { + const { rows } = await pool.query( + `SELECT * FROM rvote.spaces WHERE slug = $1`, + [space] + ); + if (rows.length > 0) { + const s = rows[0]; + spaceConfig = { + slug: s.slug, + name: s.name || '', + description: s.description || '', + ownerDid: s.owner_did || '', + visibility: s.visibility || 'public', + promotionThreshold: s.promotion_threshold ?? null, + votingPeriodDays: s.voting_period_days ?? null, + creditsPerDay: s.credits_per_day ?? null, + maxCredits: s.max_credits ?? null, + startingCredits: s.starting_credits ?? null, + createdAt: safeTimestamp(s.created_at), + updatedAt: safeTimestamp(s.updated_at), + }; + } + } catch (_) { + // spaces table may not have data + } + + const { rows: proposals } = await pool.query( + `SELECT * FROM rvote.proposals WHERE space_slug = $1 ORDER BY created_at`, + [space] + ); + + for (const prop of proposals) { + try { + // Conviction votes (quadratic) + const { rows: votes } = await pool.query( + `SELECT * FROM rvote.votes WHERE proposal_id = $1`, + [prop.id] + ); + + const voteMap: Record = {}; + for (const v of votes) { + try { + voteMap[v.id] = { + id: v.id, + userId: v.user_id, + proposalId: v.proposal_id, + weight: v.weight, + creditCost: v.credit_cost, + createdAt: safeTimestamp(v.created_at), + decaysAt: safeTimestamp(v.decays_at), + }; + } catch (e) { + console.error(`[Migration] vote: error on vote ${v.id}: ${e}`); + } + } + + // Final votes (YES/NO/ABSTAIN) + const { rows: finalVotes } = await pool.query( + `SELECT * FROM rvote.final_votes WHERE proposal_id = $1`, + [prop.id] + ); + + const finalVoteMap: Record = {}; + for (const fv of finalVotes) { + try { + finalVoteMap[fv.id] = { + id: fv.id, + userId: fv.user_id, + proposalId: fv.proposal_id, + vote: fv.vote, + createdAt: safeTimestamp(fv.created_at), + }; + } catch (e) { + console.error(`[Migration] vote: error on final_vote ${fv.id}: ${e}`); + } + } + + result.set(prop.id, { + meta: buildMeta(voteMigration, space), + spaceConfig, + proposal: { + id: prop.id, + spaceSlug: prop.space_slug, + authorId: prop.author_id, + title: prop.title || '', + description: prop.description || '', + status: prop.status || 'RANKING', + score: prop.score ?? 0, + votingEndsAt: safeTimestamp(prop.voting_ends_at), + finalYes: prop.final_yes ?? 0, + finalNo: prop.final_no ?? 0, + finalAbstain: prop.final_abstain ?? 0, + createdAt: safeTimestamp(prop.created_at), + updatedAt: safeTimestamp(prop.updated_at), + }, + votes: voteMap, + finalVotes: finalVoteMap, + }); + } catch (e) { + console.error(`[Migration] vote: error on proposal ${prop.id}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] vote: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// BOOKS MIGRATION +// ============================================================================ + +/** + * Books: rbooks.books (global — no space_slug) + * Granularity: 1 doc per space ({space}:books:catalog) + * All books in one catalog doc; PDF paths as string refs + */ +export const booksMigration: ModuleMigration = { + module: 'books', + collection: 'catalog', + async extract(pool, space) { + const result = new Map(); + + try { + const { rows } = await pool.query( + `SELECT * FROM rbooks.books ORDER BY created_at` + ); + + if (rows.length === 0) return result; + + const items: Record = {}; + for (const row of rows) { + try { + items[row.id] = { + id: row.id, + slug: row.slug || '', + title: row.title || '', + author: row.author || '', + description: row.description || '', + pdfPath: row.pdf_path || '', + pdfSizeBytes: row.pdf_size_bytes ? Number(row.pdf_size_bytes) : 0, + pageCount: row.page_count ?? 0, + tags: row.tags || [], + license: row.license || null, + coverColor: row.cover_color || null, + contributorId: row.contributor_id || null, + contributorName: row.contributor_name || null, + status: row.status || 'active', + featured: !!row.featured, + viewCount: row.view_count ?? 0, + downloadCount: row.download_count ?? 0, + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] books: error on book ${row.id}: ${e}`); + } + } + + result.set('', { + meta: buildMeta(booksMigration, space), + items, + }); + } catch (e) { + console.warn(`[Migration] books: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// CART MIGRATION +// ============================================================================ + +/** + * Cart: rcart.catalog_entries + rcart.orders + * Granularity: 1 doc for catalog ({space}:cart:catalog), + * 1 doc per order ({space}:cart:orders:{orderId}) + */ +export const cartMigration: ModuleMigration = { + module: 'cart', + collection: 'catalog', + async extract(pool, space) { + const result = new Map(); + + try { + // Catalog entries (global) + const { rows: entries } = await pool.query( + `SELECT * FROM rcart.catalog_entries ORDER BY created_at` + ); + + const catalogItems: Record = {}; + for (const row of entries) { + try { + catalogItems[row.id] = { + id: row.id, + artifactId: row.artifact_id, + artifact: row.artifact || {}, + title: row.title || '', + productType: row.product_type || null, + requiredCapabilities: row.required_capabilities || [], + substrates: row.substrates || [], + creatorId: row.creator_id || null, + sourceSpace: row.source_space || null, + tags: row.tags || [], + status: row.status || 'active', + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] cart: error on catalog entry ${row.id}: ${e}`); + } + } + + result.set('', { + meta: { + ...buildMeta(cartMigration, space), + collection: 'catalog', + }, + items: catalogItems, + }); + + // Orders — 1 doc per order + const { rows: orders } = await pool.query( + `SELECT * FROM rcart.orders ORDER BY created_at` + ); + + for (const row of orders) { + try { + const orderDoc = { + meta: { + ...buildMeta(cartMigration, space), + collection: 'orders', + }, + order: { + id: row.id, + catalogEntryId: row.catalog_entry_id, + artifactId: row.artifact_id, + buyerId: row.buyer_id || null, + buyerLocation: row.buyer_location || null, + buyerContact: row.buyer_contact || null, + providerId: row.provider_id || null, + providerName: row.provider_name || null, + providerDistanceKm: row.provider_distance_km ?? null, + quantity: row.quantity, + productionCost: row.production_cost ? Number(row.production_cost) : null, + creatorPayout: row.creator_payout ? Number(row.creator_payout) : null, + communityPayout: row.community_payout ? Number(row.community_payout) : null, + totalPrice: row.total_price ? Number(row.total_price) : null, + currency: row.currency || 'USD', + status: row.status || 'pending', + paymentMethod: row.payment_method || null, + paymentTx: row.payment_tx || null, + paymentNetwork: row.payment_network || null, + createdAt: safeTimestamp(row.created_at), + paidAt: safeTimestamp(row.paid_at), + acceptedAt: safeTimestamp(row.accepted_at), + completedAt: safeTimestamp(row.completed_at), + updatedAt: safeTimestamp(row.updated_at), + }, + }; + + // Orders use a different docId pattern: {space}:cart:orders:{orderId} + // We return them with a key that the engine will use as suffix. + // But since our collection is 'catalog', we need a custom key. + // Prefix with 'orders:' so the engine builds the right docId. + result.set(`orders:${row.id}`, orderDoc); + } catch (e) { + console.error(`[Migration] cart: error on order ${row.id}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] cart: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// PROVIDERS MIGRATION +// ============================================================================ + +/** + * Providers: providers.providers (global, not space-scoped) + * Granularity: 1 shared doc (global:providers:registry) + */ +export const providersMigration: ModuleMigration = { + module: 'providers', + collection: 'registry', + async extract(pool, _space) { + const result = new Map(); + + try { + const { rows } = await pool.query( + `SELECT * FROM providers.providers ORDER BY member_since, created_at` + ); + + if (rows.length === 0) return result; + + const items: Record = {}; + for (const row of rows) { + try { + items[row.id] = { + id: row.id, + name: row.name || '', + description: row.description || '', + lat: row.lat, + lng: row.lng, + address: row.address || null, + city: row.city || null, + region: row.region || null, + country: row.country ? row.country.trim() : null, + serviceRadiusKm: row.service_radius_km ?? null, + offersShipping: !!row.offers_shipping, + capabilities: row.capabilities || [], + substrates: row.substrates || [], + standardDays: row.standard_days ?? null, + rushDays: row.rush_days ?? null, + rushSurchargePct: row.rush_surcharge_pct ?? null, + pricing: row.pricing || {}, + communities: row.communities || [], + contactEmail: row.contact_email || null, + contactPhone: row.contact_phone || null, + contactWebsite: row.contact_website || null, + wallet: row.wallet || null, + jobsCompleted: row.jobs_completed ?? 0, + avgRating: row.avg_rating ?? null, + memberSince: safeTimestamp(row.member_since), + active: row.active ?? true, + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] providers: error on provider ${row.id}: ${e}`); + } + } + + // Use 'global' as the space for providers (overrides the space parameter) + result.set('', { + meta: { + module: 'providers', + collection: 'registry', + version: 1, + spaceSlug: 'global', + createdAt: Date.now(), + }, + items, + }); + } catch (e) { + console.warn(`[Migration] providers: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// FILES MIGRATION +// ============================================================================ + +/** + * Files: rfiles.media_files + rfiles.memory_cards + * Granularity: 1 doc per shared_space ({space}:files:cards) + * Skips: access_logs (ephemeral), public_shares (ephemeral tokens) + */ +export const filesMigration: ModuleMigration = { + module: 'files', + collection: 'cards', + async extract(pool, space) { + const result = new Map(); + + try { + // Get all unique shared_spaces + const { rows: spacesResult } = await pool.query( + `SELECT DISTINCT shared_space FROM rfiles.media_files WHERE shared_space IS NOT NULL + UNION + SELECT DISTINCT shared_space FROM rfiles.memory_cards` + ); + + for (const { shared_space: sharedSpace } of spacesResult) { + try { + // Media files for this shared space + const { rows: files } = await pool.query( + `SELECT * FROM rfiles.media_files WHERE shared_space = $1 ORDER BY created_at`, + [sharedSpace] + ); + + const fileMap: Record = {}; + for (const row of files) { + try { + fileMap[row.id] = { + id: row.id, + originalFilename: row.original_filename || '', + title: row.title || null, + description: row.description || '', + mimeType: row.mime_type || null, + fileSize: row.file_size ? Number(row.file_size) : 0, + fileHash: row.file_hash || null, + storagePath: row.storage_path || '', + tags: row.tags || [], + isProcessed: !!row.is_processed, + processingError: row.processing_error || null, + uploadedBy: row.uploaded_by || null, + sharedSpace: row.shared_space || null, + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] files: error on file ${row.id}: ${e}`); + } + } + + // Memory cards for this shared space + const { rows: cards } = await pool.query( + `SELECT * FROM rfiles.memory_cards WHERE shared_space = $1 ORDER BY position, created_at`, + [sharedSpace] + ); + + const cardMap: Record = {}; + for (const row of cards) { + try { + cardMap[row.id] = { + id: row.id, + sharedSpace: row.shared_space || '', + title: row.title || '', + body: row.body || '', + cardType: row.card_type || null, + tags: row.tags || [], + position: row.position ?? 0, + createdBy: row.created_by || null, + createdAt: safeTimestamp(row.created_at), + updatedAt: safeTimestamp(row.updated_at), + }; + } catch (e) { + console.error(`[Migration] files: error on card ${row.id}: ${e}`); + } + } + + // Use shared_space as suffix (not space param) + result.set(sharedSpace, { + meta: { + ...buildMeta(filesMigration, space), + sharedSpace, + }, + files: fileMap, + memoryCards: cardMap, + }); + } catch (e) { + console.error(`[Migration] files: error on shared_space ${sharedSpace}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] files: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// TRIPS MIGRATION +// ============================================================================ + +/** + * Trips: rtrips.trips + destinations + itinerary_items + bookings + expenses + packing_items + * Granularity: 1 doc per trip ({space}:trips:trips:{tripId}) + */ +export const tripsMigration: ModuleMigration = { + module: 'trips', + collection: 'trips', + async extract(pool, _space) { + const result = new Map(); + + try { + const { rows: trips } = await pool.query( + `SELECT * FROM rtrips.trips ORDER BY created_at` + ); + + for (const trip of trips) { + try { + // Destinations + const { rows: dests } = await pool.query( + `SELECT * FROM rtrips.destinations WHERE trip_id = $1 ORDER BY sort_order, arrival_date`, + [trip.id] + ); + const destinations: Record = {}; + for (const d of dests) { + destinations[d.id] = { + id: d.id, + tripId: d.trip_id, + name: d.name || '', + country: d.country || null, + lat: d.lat ?? null, + lng: d.lng ?? null, + arrivalDate: d.arrival_date?.toISOString?.() ?? d.arrival_date ?? null, + departureDate: d.departure_date?.toISOString?.() ?? d.departure_date ?? null, + notes: d.notes || '', + sortOrder: d.sort_order ?? 0, + createdAt: safeTimestamp(d.created_at), + }; + } + + // Itinerary items + const { rows: items } = await pool.query( + `SELECT * FROM rtrips.itinerary_items WHERE trip_id = $1 ORDER BY date, sort_order`, + [trip.id] + ); + const itinerary: Record = {}; + for (const it of items) { + itinerary[it.id] = { + id: it.id, + tripId: it.trip_id, + destinationId: it.destination_id || null, + title: it.title || '', + category: it.category || null, + date: it.date?.toISOString?.() ?? it.date ?? null, + startTime: it.start_time || null, + endTime: it.end_time || null, + notes: it.notes || '', + sortOrder: it.sort_order ?? 0, + createdAt: safeTimestamp(it.created_at), + }; + } + + // Bookings + const { rows: bkgs } = await pool.query( + `SELECT * FROM rtrips.bookings WHERE trip_id = $1 ORDER BY start_date`, + [trip.id] + ); + const bookings: Record = {}; + for (const b of bkgs) { + bookings[b.id] = { + id: b.id, + tripId: b.trip_id, + type: b.type || null, + provider: b.provider || null, + confirmationNumber: b.confirmation_number || null, + cost: b.cost ? Number(b.cost) : null, + currency: b.currency || null, + startDate: b.start_date?.toISOString?.() ?? b.start_date ?? null, + endDate: b.end_date?.toISOString?.() ?? b.end_date ?? null, + status: b.status || null, + notes: b.notes || '', + createdAt: safeTimestamp(b.created_at), + }; + } + + // Expenses + const { rows: exps } = await pool.query( + `SELECT * FROM rtrips.expenses WHERE trip_id = $1 ORDER BY date`, + [trip.id] + ); + const expenses: Record = {}; + for (const ex of exps) { + expenses[ex.id] = { + id: ex.id, + tripId: ex.trip_id, + paidBy: ex.paid_by || null, + description: ex.description || '', + amount: ex.amount ? Number(ex.amount) : 0, + currency: ex.currency || null, + category: ex.category || null, + date: ex.date?.toISOString?.() ?? ex.date ?? null, + splitType: ex.split_type || null, + createdAt: safeTimestamp(ex.created_at), + }; + } + + // Packing items + const { rows: packing } = await pool.query( + `SELECT * FROM rtrips.packing_items WHERE trip_id = $1 ORDER BY sort_order`, + [trip.id] + ); + const packingItems: Record = {}; + for (const p of packing) { + packingItems[p.id] = { + id: p.id, + tripId: p.trip_id, + addedBy: p.added_by || null, + name: p.name || '', + category: p.category || null, + packed: !!p.packed, + quantity: p.quantity ?? 1, + sortOrder: p.sort_order ?? 0, + createdAt: safeTimestamp(p.created_at), + }; + } + + result.set(trip.id, { + meta: buildMeta(tripsMigration, _space), + trip: { + id: trip.id, + title: trip.title || '', + slug: trip.slug || '', + description: trip.description || '', + startDate: trip.start_date?.toISOString?.() ?? trip.start_date ?? null, + endDate: trip.end_date?.toISOString?.() ?? trip.end_date ?? null, + budgetTotal: trip.budget_total ? Number(trip.budget_total) : null, + budgetCurrency: trip.budget_currency || null, + status: trip.status || 'planning', + createdBy: trip.created_by || null, + createdAt: safeTimestamp(trip.created_at), + updatedAt: safeTimestamp(trip.updated_at), + }, + destinations, + itinerary, + bookings, + expenses, + packingItems, + }); + } catch (e) { + console.error(`[Migration] trips: error on trip ${trip.id}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] trips: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// INBOX MIGRATION +// ============================================================================ + +/** + * Inbox: rinbox.mailboxes + threads + comments + approvals + approval_signatures + * Granularity: 1 doc per mailbox ({space}:inbox:mailboxes:{mailboxId}) + * Skips: sync_state (IMAP tracking, server-side only) + */ +export const inboxMigration: ModuleMigration = { + module: 'inbox', + collection: 'mailboxes', + async extract(pool, _space) { + const result = new Map(); + + try { + const { rows: mailboxes } = await pool.query( + `SELECT * FROM rinbox.mailboxes ORDER BY created_at` + ); + + for (const mb of mailboxes) { + try { + // Threads + const { rows: threads } = await pool.query( + `SELECT * FROM rinbox.threads WHERE mailbox_id = $1 ORDER BY received_at, created_at`, + [mb.id] + ); + + const threadMap: Record = {}; + for (const t of threads) { + try { + // Comments for this thread + const { rows: comments } = await pool.query( + `SELECT * FROM rinbox.comments WHERE thread_id = $1 ORDER BY created_at`, + [t.id] + ); + const commentList = comments.map((c: any) => ({ + id: c.id, + threadId: c.thread_id, + authorId: c.author_id, + body: c.body || '', + mentions: c.mentions || [], + createdAt: safeTimestamp(c.created_at), + })); + + threadMap[t.id] = { + id: t.id, + mailboxId: t.mailbox_id, + messageId: t.message_id || null, + subject: t.subject || '', + fromAddress: t.from_address || null, + fromName: t.from_name || null, + toAddresses: t.to_addresses || [], + ccAddresses: t.cc_addresses || [], + bodyText: t.body_text || '', + bodyHtml: t.body_html || '', + tags: t.tags || [], + status: t.status || 'open', + isRead: !!t.is_read, + isStarred: !!t.is_starred, + assignedTo: t.assigned_to || null, + hasAttachments: !!t.has_attachments, + receivedAt: safeTimestamp(t.received_at), + createdAt: safeTimestamp(t.created_at), + comments: commentList, + }; + } catch (e) { + console.error(`[Migration] inbox: error on thread ${t.id}: ${e}`); + } + } + + // Approvals for this mailbox + const { rows: approvals } = await pool.query( + `SELECT * FROM rinbox.approvals WHERE mailbox_id = $1 ORDER BY created_at`, + [mb.id] + ); + + const approvalMap: Record = {}; + for (const a of approvals) { + try { + // Signatures for this approval + const { rows: sigs } = await pool.query( + `SELECT * FROM rinbox.approval_signatures WHERE approval_id = $1 ORDER BY signed_at`, + [a.id] + ); + const signatureList = sigs.map((s: any) => ({ + id: s.id, + approvalId: s.approval_id, + signerId: s.signer_id, + vote: s.vote || '', + signedAt: safeTimestamp(s.signed_at), + })); + + approvalMap[a.id] = { + id: a.id, + mailboxId: a.mailbox_id, + threadId: a.thread_id || null, + authorId: a.author_id, + subject: a.subject || '', + bodyText: a.body_text || '', + bodyHtml: a.body_html || '', + toAddresses: a.to_addresses || [], + ccAddresses: a.cc_addresses || [], + status: a.status || 'pending', + requiredSignatures: a.required_signatures ?? 1, + safeTxHash: a.safe_tx_hash || null, + createdAt: safeTimestamp(a.created_at), + resolvedAt: safeTimestamp(a.resolved_at), + signatures: signatureList, + }; + } catch (e) { + console.error(`[Migration] inbox: error on approval ${a.id}: ${e}`); + } + } + + // Members + const { rows: members } = await pool.query( + `SELECT * FROM rinbox.mailbox_members WHERE mailbox_id = $1`, + [mb.id] + ); + const memberList = members.map((m: any) => ({ + id: m.id, + mailboxId: m.mailbox_id, + userId: m.user_id, + role: m.role || 'member', + joinedAt: safeTimestamp(m.joined_at), + })); + + result.set(mb.id, { + meta: buildMeta(inboxMigration, _space), + mailbox: { + id: mb.id, + workspaceId: mb.workspace_id || null, + slug: mb.slug || '', + name: mb.name || '', + email: mb.email || '', + description: mb.description || '', + visibility: mb.visibility || 'private', + ownerDid: mb.owner_did || '', + safeAddress: mb.safe_address || null, + safeChainId: mb.safe_chain_id ?? null, + approvalThreshold: mb.approval_threshold ?? 1, + createdAt: safeTimestamp(mb.created_at), + }, + members: memberList, + threads: threadMap, + approvals: approvalMap, + }); + } catch (e) { + console.error(`[Migration] inbox: error on mailbox ${mb.id}: ${e}`); + } + } + } catch (e) { + console.warn(`[Migration] inbox: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// SPLAT MIGRATION +// ============================================================================ + +/** + * Splat: rsplat.splats + rsplat.source_files + * Granularity: 1 doc per space ({space}:splat:scenes) + */ +export const splatMigration: ModuleMigration = { + module: 'splat', + collection: 'scenes', + async extract(pool, space) { + const result = new Map(); + + try { + const { rows: splats } = await pool.query( + `SELECT * FROM rsplat.splats WHERE space_slug = $1 ORDER BY created_at`, + [space] + ); + + if (splats.length === 0) return result; + + const items: Record = {}; + for (const row of splats) { + try { + // Source files for this splat + const { rows: sourceFiles } = await pool.query( + `SELECT * FROM rsplat.source_files WHERE splat_id = $1 ORDER BY created_at`, + [row.id] + ); + + const sources = sourceFiles.map((sf: any) => ({ + id: sf.id, + splatId: sf.splat_id, + filePath: sf.file_path || '', + fileName: sf.file_name || '', + mimeType: sf.mime_type || null, + fileSizeBytes: sf.file_size_bytes ? Number(sf.file_size_bytes) : 0, + createdAt: safeTimestamp(sf.created_at), + })); + + items[row.id] = { + id: row.id, + slug: row.slug || '', + title: row.title || '', + description: row.description || '', + filePath: row.file_path || '', + fileFormat: row.file_format || '', + fileSizeBytes: row.file_size_bytes ? Number(row.file_size_bytes) : 0, + tags: row.tags || [], + spaceSlug: row.space_slug || '', + contributorId: row.contributor_id || null, + contributorName: row.contributor_name || null, + source: row.source || null, + status: row.status || 'active', + viewCount: row.view_count ?? 0, + paymentTx: row.payment_tx || null, + paymentNetwork: row.payment_network || null, + createdAt: safeTimestamp(row.created_at), + processingStatus: row.processing_status || null, + processingError: row.processing_error || null, + sourceFileCount: row.source_file_count ?? 0, + sourceFiles: sources, + }; + } catch (e) { + console.error(`[Migration] splat: error on splat ${row.id}: ${e}`); + } + } + + result.set('', { + meta: buildMeta(splatMigration, space), + items, + }); + } catch (e) { + console.warn(`[Migration] splat: ${e}`); + } + + return result; + }, +}; + +// ============================================================================ +// ALL MIGRATIONS (convenience export) +// ============================================================================ + +export const allMigrations: ModuleMigration[] = [ + notesMigration, + workMigration, + calMigration, + voteMigration, + booksMigration, + cartMigration, + providersMigration, + filesMigration, + tripsMigration, + inboxMigration, + splatMigration, +]; + +// ============================================================================ +// HELPERS +// ============================================================================ + +function countItems(state: any): number { + if (!state || typeof state !== 'object') return 1; + + let count = 0; + for (const [key, value] of Object.entries(state)) { + if (key === 'meta') continue; + if (value && typeof value === 'object' && !Array.isArray(value)) { + count += Object.keys(value).length; + } + } + return Math.max(count, 1); +}