rspace-online/server/local-first/migration/pg-to-automerge.ts

1427 lines
48 KiB
TypeScript

/**
* PG → Automerge Migration Tool
*
* One-time migration utility that reads data from PostgreSQL module schemas
* and converts them into Automerge documents. Run per-module during the
* local-first transition.
*
* Usage:
* import { migrateModule, notesMigration } from './migration/pg-to-automerge';
* await migrateModule(notesMigration, pool, 'demo', syncServer);
*
* // Dry-run (no docs created):
* await migrateModule(notesMigration, pool, 'demo', syncServer, { dryRun: true });
*
* // With disk backup:
* await migrateModule(notesMigration, pool, 'demo', syncServer, { backupDir: '/tmp/migration-backup' });
*
* Each module migration:
* 1. Reads all rows from the PG schema
* 2. Creates Automerge documents per the granularity rules
* 3. Checks idempotency (skips already-migrated docs)
* 4. Registers them with the SyncServer so they're available to clients
* 5. Optionally writes the Automerge binaries to disk as backup
*/
import * as Automerge from '@automerge/automerge';
import { mkdirSync, writeFileSync } from 'node:fs';
import type { SyncServer } from '../sync-server';
// ============================================================================
// TYPES
// ============================================================================
/** Options controlling how a migration run behaves. */
export interface MigrationOptions {
  // When true, count and log what would be created without writing any docs.
  dryRun?: boolean;
  // If set (and not a dry run), each created doc's Automerge binary is also
  // written to this directory as `<docId-with-_-separators>.automerge`.
  backupDir?: string;
}
/** Summary of a single module migration run (returned by migrateModule). */
export interface MigrationResult {
  // Module name the run was for.
  module: string;
  // Docs created — or, in dry-run mode, docs that WOULD have been created.
  docsCreated: number;
  // Docs skipped because the SyncServer already had them (idempotency).
  docsSkipped: number;
  // Total item rows folded into created docs (counted via countItems).
  rowsMigrated: number;
  // Human-readable error strings; per-doc failures are recorded here and do
  // not abort the run.
  errors: string[];
  // Wall-clock duration of the run in milliseconds.
  durationMs: number;
  // Echo of MigrationOptions.dryRun for this run.
  dryRun: boolean;
}
/** Unified document metadata — present on all migrated docs (as `meta`). */
export interface DocMeta {
  // Source module, e.g. 'notes', 'work', 'cal'.
  module: string;
  // Collection segment within the module, e.g. 'notebooks', 'boards'.
  collection: string;
  // Doc-layout schema version; 1 for this initial PG → Automerge migration.
  version: number;
  // Space the doc belongs to ('global' for space-agnostic registries).
  spaceSlug: string;
  // Epoch-ms timestamp of when the doc was migrated (Date.now() at build).
  createdAt: number;
}
/**
 * Module-specific migration adapter. Each module implements one of these
 * to define how its PG data maps to Automerge documents.
 */
export interface ModuleMigration<T = any> {
  // Module name used in docIds and DocMeta.
  module: string;
  // Default collection segment used in docIds (adapters may embed a
  // different collection in the suffix, e.g. cart's 'orders:{id}' keys).
  collection: string;
  /**
   * Query PG and return rows grouped by target document.
   * Returns: Map<docIdSuffix, initialDocState>
   * An empty-string suffix produces a doc without a trailing segment.
   */
  extract(pool: any, space: string): Promise<Map<string, T>>;
}
// ============================================================================
// MIGRATION ENGINE
// ============================================================================
/**
 * Assemble the standard DocMeta stamped onto every migrated document.
 * Version is fixed at 1 for this initial PG → Automerge migration; the
 * timestamp records when the doc was built, not when the source row was.
 */
function buildMeta(migration: ModuleMigration, space: string): DocMeta {
  const { module, collection } = migration;
  return {
    module,
    collection,
    version: 1,
    spaceSlug: space,
    createdAt: Date.now(),
  };
}
/**
 * Run a single module migration end-to-end.
 *
 * Flow, per target document produced by `migration.extract`:
 *   1. Build the docId: {space}:{module}:{collection}[:{suffix}] — an
 *      empty-string suffix omits the trailing segment.
 *   2. Idempotency: skip if the SyncServer already holds the doc.
 *   3. Dry run: count what would be created and continue (nothing written).
 *   4. Otherwise create the Automerge doc, register it with the SyncServer
 *      (if provided), then optionally write its binary to `backupDir`.
 *
 * Per-doc failures are appended to `result.errors` and do not abort the
 * run; a failure in extract() itself ends the module with whatever was
 * already processed.
 *
 * @param migration  Module adapter defining extract() and naming.
 * @param pool       PG pool (or compatible) handed through to the adapter.
 * @param space      Space slug used in docIds and passed to the adapter.
 * @param syncServer Optional; when omitted, docs are built (and optionally
 *                   backed up) but not registered anywhere.
 * @param options    dryRun / backupDir flags, see MigrationOptions.
 * @returns Counts, errors and duration for this module run.
 */
export async function migrateModule(
  migration: ModuleMigration,
  pool: any,
  space: string,
  syncServer?: SyncServer,
  options: MigrationOptions = {}
): Promise<MigrationResult> {
  const start = Date.now();
  const { dryRun = false, backupDir } = options;
  const result: MigrationResult = {
    module: migration.module,
    docsCreated: 0,
    docsSkipped: 0,
    rowsMigrated: 0,
    errors: [],
    durationMs: 0,
    dryRun,
  };
  // Ensure the backup directory exists up front — but never touch disk in
  // a dry run.
  if (backupDir && !dryRun) {
    mkdirSync(backupDir, { recursive: true });
  }
  try {
    console.log(
      `[Migration] Starting ${migration.module} for space "${space}"` +
      (dryRun ? ' (DRY RUN)' : '') +
      '...'
    );
    const dataMap = await migration.extract(pool, space);
    const total = dataMap.size;
    let n = 0; // progress counter for the per-doc log lines
    for (const [suffix, initialState] of dataMap) {
      n++;
      try {
        const docId = suffix
          ? `${space}:${migration.module}:${migration.collection}:${suffix}`
          : `${space}:${migration.module}:${migration.collection}`;
        // Idempotency: skip if doc already exists
        if (syncServer?.getDoc(docId)) {
          console.log(
            `[Migration] ${migration.module}: ${n}/${total} — skipped "${docId}" (already exists)`
          );
          result.docsSkipped++;
          continue;
        }
        if (dryRun) {
          // countItems is defined elsewhere in this file (not visible in
          // this chunk); presumably counts the item rows in the doc state.
          const rowCount = countItems(initialState);
          result.rowsMigrated += rowCount;
          result.docsCreated++;
          console.log(
            `[Migration] ${migration.module}: ${n}/${total} — would create "${docId}" (${rowCount} items)`
          );
          continue;
        }
        // Build the doc in a single change so the migration is one commit
        // in the Automerge history, labeled with the docId.
        let doc = Automerge.init<any>();
        doc = Automerge.change(doc, `Migrate from PG: ${docId}`, (d: any) => {
          Object.assign(d, initialState);
        });
        // Register with sync server
        if (syncServer) {
          syncServer.setDoc(docId, doc);
        }
        // Disk backup
        if (backupDir) {
          const binary = Automerge.save(doc);
          // ':' and '/' are not filesystem-safe — flatten them to '_'.
          const safeName = docId.replace(/[:/]/g, '_');
          writeFileSync(`${backupDir}/${safeName}.automerge`, binary);
        }
        result.docsCreated++;
        const rowCount = countItems(initialState);
        result.rowsMigrated += rowCount;
        console.log(
          `[Migration] ${migration.module}: ${n}/${total} — created "${docId}" (${rowCount} items)`
        );
      } catch (e) {
        // Per-doc recovery: record and keep going with the next doc.
        const msg = `Failed to create doc for suffix "${suffix}": ${e}`;
        result.errors.push(msg);
        console.error(`[Migration] ${msg}`);
      }
    }
  } catch (e) {
    // extract() itself failed — the module run ends here with partial results.
    const msg = `Migration failed for ${migration.module}: ${e}`;
    result.errors.push(msg);
    console.error(`[Migration] ${msg}`);
  }
  result.durationMs = Date.now() - start;
  console.log(
    `[Migration] ${migration.module}: ${result.docsCreated} docs created, ` +
    `${result.docsSkipped} skipped, ` +
    `${result.rowsMigrated} rows, ${result.errors.length} errors, ` +
    `${result.durationMs}ms` +
    (dryRun ? ' (DRY RUN)' : '')
  );
  return result;
}
/**
 * Run several module migrations against the same pool/space, collecting one
 * MigrationResult per module in input order.
 *
 * Modules are processed strictly one after another (not in parallel), so
 * log output stays readable and a failing module never interleaves with
 * the next one.
 */
export async function migrateAll(
  migrations: ModuleMigration[],
  pool: any,
  space: string,
  syncServer?: SyncServer,
  options: MigrationOptions = {}
): Promise<MigrationResult[]> {
  const outcomes: MigrationResult[] = [];
  for (const m of migrations) {
    outcomes.push(await migrateModule(m, pool, space, syncServer, options));
  }
  return outcomes;
}
// ============================================================================
// HELPER: safe row access (per-row error recovery)
// ============================================================================
/**
 * Convert a PG timestamp value (Date, ISO string, or epoch number) to epoch
 * milliseconds.
 *
 * Returns 0 for null/undefined/empty values AND for unparseable input:
 * `new Date('garbage').getTime()` yields NaN, which would otherwise be
 * written straight into the Automerge doc.
 */
function safeTimestamp(val: any): number {
  if (!val) return 0;
  const ms = new Date(val).getTime();
  // Invalid Date → NaN; normalize to 0 like a missing value.
  return Number.isNaN(ms) ? 0 : ms;
}
/**
 * Parse a PG POINT column into `{x, y}`.
 *
 * Accepts the textual "(x,y)" form or a driver-decoded `{x, y}` object.
 * Returns null for missing or malformed values — including non-numeric
 * coordinates, where the old code emitted `{x: NaN, y: NaN}` (or a missing
 * `y` became NaN) straight into the Automerge doc.
 */
function safePoint(val: any): { x: number; y: number } | null {
  if (!val) return null;
  let x: number | undefined;
  let y: number | undefined;
  // PG POINT comes as "(x,y)" string or {x, y} object
  if (typeof val === 'string') {
    const m = val.match(/\(([^,]+),([^)]+)\)/);
    if (m) {
      x = parseFloat(m[1]);
      y = parseFloat(m[2]);
    }
  } else if (typeof val === 'object' && 'x' in val) {
    x = Number(val.x);
    y = Number(val.y);
  }
  // Fail closed: no parse, or NaN/Infinity from malformed input → null.
  if (x === undefined || y === undefined) return null;
  if (!Number.isFinite(x) || !Number.isFinite(y)) return null;
  return { x, y };
}
// ============================================================================
// NOTES MIGRATION
// ============================================================================
/**
 * Notes: rnotes.notebooks + rnotes.notes + rnotes.note_tags/tags
 * Granularity: 1 doc per notebook ({space}:notes:notebooks:{notebookId})
 *
 * Notebooks carry no space_slug column, so every notebook is migrated
 * regardless of the `space` argument; `space` only ends up in doc metadata.
 */
export const notesMigration: ModuleMigration = {
  module: 'notes',
  collection: 'notebooks',
  async extract(pool, space) {
    const result = new Map<string, any>();
    try {
      // Get all notebooks (no space_slug — notebooks are global, we migrate all)
      const { rows: notebooks } = await pool.query(
        `SELECT * FROM rnotes.notebooks ORDER BY created_at`
      );
      for (const nb of notebooks) {
        try {
          // Notes for this notebook, with tag names aggregated into an array.
          // FILTER drops the NULLs the LEFT JOINs produce for untagged notes,
          // so they get '{}' (empty array) instead of [null].
          const { rows: notes } = await pool.query(
            `SELECT n.*,
            COALESCE(array_agg(t.name) FILTER (WHERE t.name IS NOT NULL), '{}') AS tags
            FROM rnotes.notes n
            LEFT JOIN rnotes.note_tags nt ON nt.note_id = n.id
            LEFT JOIN rnotes.tags t ON t.id = nt.tag_id
            WHERE n.notebook_id = $1
            GROUP BY n.id
            ORDER BY n.sort_order, n.created_at`,
            [nb.id]
          );
          // Notes keyed by id — snake_case PG columns → camelCase doc fields,
          // with defensive defaults so no field is undefined in the doc.
          const items: Record<string, any> = {};
          for (const row of notes) {
            try {
              items[row.id] = {
                id: row.id,
                notebookId: row.notebook_id,
                authorId: row.author_id || null,
                title: row.title || '',
                content: row.content || '',
                contentPlain: row.content_plain || '',
                type: row.type || 'NOTE',
                url: row.url || null,
                language: row.language || null,
                fileUrl: row.file_url || null,
                mimeType: row.mime_type || null,
                fileSize: row.file_size || null,
                duration: row.duration || null,
                isPinned: !!row.is_pinned,
                sortOrder: row.sort_order ?? 0,
                tags: row.tags || [],
                createdAt: safeTimestamp(row.created_at),
                updatedAt: safeTimestamp(row.updated_at),
              };
            } catch (e) {
              // Per-note recovery: log and skip the bad row, keep the rest of
              // the notebook.
              console.error(`[Migration] notes: error on note ${row.id}: ${e}`);
            }
          }
          result.set(nb.id, {
            meta: buildMeta(notesMigration, space),
            notebook: {
              id: nb.id,
              title: nb.title || '',
              slug: nb.slug || '',
              description: nb.description || '',
              coverColor: nb.cover_color || null,
              isPublic: !!nb.is_public,
              createdAt: safeTimestamp(nb.created_at),
              updatedAt: safeTimestamp(nb.updated_at),
            },
            items,
          });
        } catch (e) {
          // Per-notebook recovery: skip this notebook, keep migrating others.
          console.error(`[Migration] notes: error on notebook ${nb.id}: ${e}`);
        }
      }
    } catch (e) {
      // Top-level failure (e.g. schema/table missing) — warn and return what
      // we have, possibly an empty map.
      console.warn(`[Migration] notes: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// WORK MIGRATION
// ============================================================================
/**
 * Work: rwork.spaces + rwork.tasks
 * Granularity: 1 doc per space/board ({space}:work:boards:{spaceId})
 *
 * Only workspaces whose slug matches the `space` argument are migrated;
 * each board doc embeds its full task list keyed by task id.
 */
export const workMigration: ModuleMigration = {
  module: 'work',
  collection: 'boards',
  async extract(pool, space) {
    const docs = new Map<string, any>();
    try {
      const spacesRes = await pool.query(
        `SELECT * FROM rwork.spaces WHERE slug = $1`,
        [space]
      );
      for (const workspace of spacesRes.rows) {
        try {
          const tasksRes = await pool.query(
            `SELECT * FROM rwork.tasks WHERE space_id = $1 ORDER BY sort_order, created_at`,
            [workspace.id]
          );
          // Tasks keyed by id — snake_case columns → camelCase fields with
          // defensive defaults so nothing lands in the doc as undefined.
          const tasksById: Record<string, any> = {};
          for (const t of tasksRes.rows) {
            try {
              tasksById[t.id] = {
                id: t.id,
                spaceId: t.space_id,
                title: t.title || '',
                description: t.description || '',
                status: t.status || 'todo',
                priority: t.priority || null,
                labels: t.labels || [],
                assigneeId: t.assignee_id || null,
                createdBy: t.created_by || null,
                sortOrder: t.sort_order ?? 0,
                createdAt: safeTimestamp(t.created_at),
                updatedAt: safeTimestamp(t.updated_at),
              };
            } catch (e) {
              // Per-task recovery: skip the bad row, keep the board.
              console.error(`[Migration] work: error on task ${t.id}: ${e}`);
            }
          }
          docs.set(workspace.id, {
            meta: buildMeta(workMigration, space),
            board: {
              id: workspace.id,
              name: workspace.name || '',
              slug: workspace.slug || '',
              description: workspace.description || '',
              icon: workspace.icon || null,
              ownerDid: workspace.owner_did || null,
              statuses: workspace.statuses || ['todo', 'in-progress', 'done'],
              labels: workspace.labels || [],
              createdAt: safeTimestamp(workspace.created_at),
              updatedAt: safeTimestamp(workspace.updated_at),
            },
            tasks: tasksById,
          });
        } catch (e) {
          // Per-workspace recovery: skip this board, keep migrating others.
          console.error(`[Migration] work: error on space ${workspace.id}: ${e}`);
        }
      }
    } catch (e) {
      // Top-level failure — warn and return whatever was collected.
      console.warn(`[Migration] work: ${e}`);
    }
    return docs;
  },
};
// ============================================================================
// CALENDAR MIGRATION
// ============================================================================
/**
 * Cal: rcal.events + rcal.calendar_sources + rcal.locations
 * Granularity: 1 doc per space ({space}:cal:events)
 *
 * NOTE(review): if there are zero events, the early return below also skips
 * migrating calendar sources — confirm that is intended.
 */
export const calMigration: ModuleMigration = {
  module: 'cal',
  collection: 'events',
  async extract(pool, space) {
    const result = new Map<string, any>();
    try {
      // Events have no space_slug — linked via source_id to calendar_sources.
      // We migrate all events and include source/location data inline.
      const { rows } = await pool.query(
        `SELECT e.*,
        cs.name AS source_name, cs.source_type, cs.color AS source_color,
        cs.url AS source_url, cs.is_active AS source_active,
        l.name AS loc_name, l.granularity AS loc_granularity,
        l.lat AS loc_lat, l.lng AS loc_lng
        FROM rcal.events e
        LEFT JOIN rcal.calendar_sources cs ON cs.id = e.source_id
        LEFT JOIN rcal.locations l ON l.id = e.location_id
        ORDER BY e.start_time`
      );
      if (rows.length === 0) return result;
      // Also get calendar sources as their own list
      const { rows: sources } = await pool.query(
        `SELECT * FROM rcal.calendar_sources ORDER BY created_at`
      );
      const sourcesMap: Record<string, any> = {};
      for (const s of sources) {
        sourcesMap[s.id] = {
          id: s.id,
          name: s.name || '',
          sourceType: s.source_type || '',
          url: s.url || null,
          color: s.color || null,
          isActive: s.is_active ?? true,
          isVisible: s.is_visible ?? true,
          syncIntervalMinutes: s.sync_interval_minutes || null,
          lastSyncedAt: safeTimestamp(s.last_synced_at),
          ownerId: s.owner_id || null,
          createdAt: safeTimestamp(s.created_at),
        };
      }
      // Events keyed by id, with joined source/location fields flattened in.
      const events: Record<string, any> = {};
      for (const row of rows) {
        try {
          events[row.id] = {
            id: row.id,
            title: row.title || '',
            description: row.description || '',
            startTime: safeTimestamp(row.start_time),
            endTime: safeTimestamp(row.end_time),
            allDay: !!row.all_day,
            timezone: row.timezone || null,
            rrule: row.rrule || null,
            status: row.status || null,
            visibility: row.visibility || null,
            sourceId: row.source_id || null,
            sourceName: row.source_name || null,
            sourceType: row.source_type || null,
            sourceColor: row.source_color || null,
            locationId: row.location_id || null,
            // NOTE(review): location_name / location_granularity are not in
            // the SELECT list above — presumably they are columns of e.* on
            // some schema versions, with the loc_* join aliases as fallback.
            // Verify against the rcal.events schema.
            locationName: row.location_name || row.loc_name || null,
            coordinates: safePoint(row.coordinates),
            locationGranularity: row.location_granularity ?? row.loc_granularity ?? null,
            locationLat: row.loc_lat ?? null,
            locationLng: row.loc_lng ?? null,
            isVirtual: !!row.is_virtual,
            virtualUrl: row.virtual_url || null,
            virtualPlatform: row.virtual_platform || null,
            rToolSource: row.r_tool_source || null,
            rToolEntityId: row.r_tool_entity_id || null,
            attendees: row.attendees || [],
            attendeeCount: row.attendee_count ?? 0,
            metadata: row.metadata || null,
            createdAt: safeTimestamp(row.created_at),
            updatedAt: safeTimestamp(row.updated_at),
          };
        } catch (e) {
          // Per-event recovery: log and skip the bad row.
          console.error(`[Migration] cal: error on event ${row.id}: ${e}`);
        }
      }
      // Single doc for the whole space — empty suffix, so the docId carries
      // no trailing segment: {space}:cal:events.
      result.set('', {
        meta: buildMeta(calMigration, space),
        sources: sourcesMap,
        events,
      });
    } catch (e) {
      // Top-level failure — warn and return what we have (possibly empty).
      console.warn(`[Migration] cal: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// VOTE MIGRATION
// ============================================================================
/**
 * Vote: rvote.proposals + rvote.votes + rvote.final_votes + rvote.spaces
 * Granularity: 1 doc per proposal ({space}:vote:proposals:{proposalId})
 *
 * The space-level voting config is duplicated into every proposal doc so
 * each doc is self-contained for clients.
 */
export const voteMigration: ModuleMigration = {
  module: 'vote',
  collection: 'proposals',
  async extract(pool, space) {
    const result = new Map<string, any>();
    try {
      // Get space config
      let spaceConfig: any = null;
      try {
        const { rows } = await pool.query(
          `SELECT * FROM rvote.spaces WHERE slug = $1`,
          [space]
        );
        if (rows.length > 0) {
          const s = rows[0];
          spaceConfig = {
            slug: s.slug,
            name: s.name || '',
            description: s.description || '',
            ownerDid: s.owner_did || '',
            visibility: s.visibility || 'public',
            promotionThreshold: s.promotion_threshold ?? null,
            votingPeriodDays: s.voting_period_days ?? null,
            creditsPerDay: s.credits_per_day ?? null,
            maxCredits: s.max_credits ?? null,
            startingCredits: s.starting_credits ?? null,
            createdAt: safeTimestamp(s.created_at),
            updatedAt: safeTimestamp(s.updated_at),
          };
        }
      } catch (_) {
        // spaces table may not have data — proposals still migrate with a
        // null spaceConfig.
      }
      const { rows: proposals } = await pool.query(
        `SELECT * FROM rvote.proposals WHERE space_slug = $1 ORDER BY created_at`,
        [space]
      );
      for (const prop of proposals) {
        try {
          // Conviction votes (quadratic)
          const { rows: votes } = await pool.query(
            `SELECT * FROM rvote.votes WHERE proposal_id = $1`,
            [prop.id]
          );
          const voteMap: Record<string, any> = {};
          for (const v of votes) {
            try {
              voteMap[v.id] = {
                id: v.id,
                userId: v.user_id,
                proposalId: v.proposal_id,
                weight: v.weight,
                creditCost: v.credit_cost,
                createdAt: safeTimestamp(v.created_at),
                decaysAt: safeTimestamp(v.decays_at),
              };
            } catch (e) {
              // Per-vote recovery: skip the bad row.
              console.error(`[Migration] vote: error on vote ${v.id}: ${e}`);
            }
          }
          // Final votes (YES/NO/ABSTAIN)
          const { rows: finalVotes } = await pool.query(
            `SELECT * FROM rvote.final_votes WHERE proposal_id = $1`,
            [prop.id]
          );
          const finalVoteMap: Record<string, any> = {};
          for (const fv of finalVotes) {
            try {
              finalVoteMap[fv.id] = {
                id: fv.id,
                userId: fv.user_id,
                proposalId: fv.proposal_id,
                vote: fv.vote,
                createdAt: safeTimestamp(fv.created_at),
              };
            } catch (e) {
              // Per-final-vote recovery: skip the bad row.
              console.error(`[Migration] vote: error on final_vote ${fv.id}: ${e}`);
            }
          }
          result.set(prop.id, {
            meta: buildMeta(voteMigration, space),
            spaceConfig,
            proposal: {
              id: prop.id,
              spaceSlug: prop.space_slug,
              authorId: prop.author_id,
              title: prop.title || '',
              description: prop.description || '',
              status: prop.status || 'RANKING',
              score: prop.score ?? 0,
              votingEndsAt: safeTimestamp(prop.voting_ends_at),
              finalYes: prop.final_yes ?? 0,
              finalNo: prop.final_no ?? 0,
              finalAbstain: prop.final_abstain ?? 0,
              createdAt: safeTimestamp(prop.created_at),
              updatedAt: safeTimestamp(prop.updated_at),
            },
            votes: voteMap,
            finalVotes: finalVoteMap,
          });
        } catch (e) {
          // Per-proposal recovery: skip this proposal, keep migrating others.
          console.error(`[Migration] vote: error on proposal ${prop.id}: ${e}`);
        }
      }
    } catch (e) {
      // Top-level failure — warn and return what we have (possibly empty).
      console.warn(`[Migration] vote: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// BOOKS MIGRATION
// ============================================================================
/**
 * Books: rbooks.books (global — no space_slug)
 * Granularity: 1 doc per space ({space}:books:catalog)
 * All books in one catalog doc; PDF paths as string refs
 */
export const booksMigration: ModuleMigration = {
  module: 'books',
  collection: 'catalog',
  async extract(pool, space) {
    const docs = new Map<string, any>();
    try {
      const booksRes = await pool.query(
        `SELECT * FROM rbooks.books ORDER BY created_at`
      );
      const bookRows = booksRes.rows;
      // No books → no catalog doc at all.
      if (bookRows.length === 0) return docs;
      // Catalog keyed by book id — snake_case columns → camelCase fields
      // with defensive defaults.
      const catalog: Record<string, any> = {};
      for (const b of bookRows) {
        try {
          catalog[b.id] = {
            id: b.id,
            slug: b.slug || '',
            title: b.title || '',
            author: b.author || '',
            description: b.description || '',
            pdfPath: b.pdf_path || '',
            // pdf_size_bytes may arrive as a bigint string — coerce.
            pdfSizeBytes: b.pdf_size_bytes ? Number(b.pdf_size_bytes) : 0,
            pageCount: b.page_count ?? 0,
            tags: b.tags || [],
            license: b.license || null,
            coverColor: b.cover_color || null,
            contributorId: b.contributor_id || null,
            contributorName: b.contributor_name || null,
            status: b.status || 'active',
            featured: !!b.featured,
            viewCount: b.view_count ?? 0,
            downloadCount: b.download_count ?? 0,
            createdAt: safeTimestamp(b.created_at),
            updatedAt: safeTimestamp(b.updated_at),
          };
        } catch (e) {
          // Per-book recovery: skip the bad row, keep the catalog.
          console.error(`[Migration] books: error on book ${b.id}: ${e}`);
        }
      }
      // Single catalog doc — empty suffix yields docId {space}:books:catalog.
      docs.set('', {
        meta: buildMeta(booksMigration, space),
        items: catalog,
      });
    } catch (e) {
      // Top-level failure — warn and return whatever was collected.
      console.warn(`[Migration] books: ${e}`);
    }
    return docs;
  },
};
// ============================================================================
// CART MIGRATION
// ============================================================================
/**
 * Cart: rcart.catalog_entries + rcart.orders
 * Granularity: 1 doc for catalog ({space}:cart:catalog),
 * plus 1 doc per order (see NOTE(review) on the order docId below).
 */
export const cartMigration: ModuleMigration = {
  module: 'cart',
  collection: 'catalog',
  async extract(pool, space) {
    const result = new Map<string, any>();
    try {
      // Catalog entries (global)
      const { rows: entries } = await pool.query(
        `SELECT * FROM rcart.catalog_entries ORDER BY created_at`
      );
      const catalogItems: Record<string, any> = {};
      for (const row of entries) {
        try {
          catalogItems[row.id] = {
            id: row.id,
            artifactId: row.artifact_id,
            artifact: row.artifact || {},
            title: row.title || '',
            productType: row.product_type || null,
            requiredCapabilities: row.required_capabilities || [],
            substrates: row.substrates || [],
            creatorId: row.creator_id || null,
            sourceSpace: row.source_space || null,
            tags: row.tags || [],
            status: row.status || 'active',
            createdAt: safeTimestamp(row.created_at),
            updatedAt: safeTimestamp(row.updated_at),
          };
        } catch (e) {
          // Per-entry recovery: skip the bad row, keep the catalog.
          console.error(`[Migration] cart: error on catalog entry ${row.id}: ${e}`);
        }
      }
      // Catalog doc — empty suffix yields docId {space}:cart:catalog.
      result.set('', {
        meta: {
          ...buildMeta(cartMigration, space),
          collection: 'catalog',
        },
        items: catalogItems,
      });
      // Orders — 1 doc per order
      const { rows: orders } = await pool.query(
        `SELECT * FROM rcart.orders ORDER BY created_at`
      );
      for (const row of orders) {
        try {
          const orderDoc = {
            meta: {
              ...buildMeta(cartMigration, space),
              // Meta says 'orders' even though this adapter's collection
              // field is 'catalog' — the suffix carries the real grouping.
              collection: 'orders',
            },
            order: {
              id: row.id,
              catalogEntryId: row.catalog_entry_id,
              artifactId: row.artifact_id,
              buyerId: row.buyer_id || null,
              buyerLocation: row.buyer_location || null,
              buyerContact: row.buyer_contact || null,
              providerId: row.provider_id || null,
              providerName: row.provider_name || null,
              providerDistanceKm: row.provider_distance_km ?? null,
              quantity: row.quantity,
              // Money columns may arrive as numeric strings — coerce.
              productionCost: row.production_cost ? Number(row.production_cost) : null,
              creatorPayout: row.creator_payout ? Number(row.creator_payout) : null,
              communityPayout: row.community_payout ? Number(row.community_payout) : null,
              totalPrice: row.total_price ? Number(row.total_price) : null,
              currency: row.currency || 'USD',
              status: row.status || 'pending',
              paymentMethod: row.payment_method || null,
              paymentTx: row.payment_tx || null,
              paymentNetwork: row.payment_network || null,
              createdAt: safeTimestamp(row.created_at),
              paidAt: safeTimestamp(row.paid_at),
              acceptedAt: safeTimestamp(row.accepted_at),
              completedAt: safeTimestamp(row.completed_at),
              updatedAt: safeTimestamp(row.updated_at),
            },
          };
          // NOTE(review): because this adapter's collection is 'catalog',
          // the engine builds {space}:cart:catalog:orders:{orderId} from this
          // suffix — NOT the {space}:cart:orders:{orderId} the original
          // comment claimed. Confirm which docId clients expect.
          result.set(`orders:${row.id}`, orderDoc);
        } catch (e) {
          // Per-order recovery: skip the bad row, keep migrating others.
          console.error(`[Migration] cart: error on order ${row.id}: ${e}`);
        }
      }
    } catch (e) {
      // Top-level failure — warn and return what we have (possibly empty).
      console.warn(`[Migration] cart: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// PROVIDERS MIGRATION
// ============================================================================
/**
 * Providers: providers.providers (global, not space-scoped)
 * Granularity: 1 shared doc.
 *
 * NOTE(review): the header originally claimed docId global:providers:registry,
 * but the engine builds the docId from the `space` argument passed to
 * migrateModule ({space}:providers:registry); only meta.spaceSlug below is
 * forced to 'global'. Confirm whether callers pass space='global' here.
 */
export const providersMigration: ModuleMigration = {
  module: 'providers',
  collection: 'registry',
  async extract(pool, _space) {
    const result = new Map<string, any>();
    try {
      const { rows } = await pool.query(
        `SELECT * FROM providers.providers ORDER BY member_since, created_at`
      );
      // No providers → no registry doc at all.
      if (rows.length === 0) return result;
      // Providers keyed by id — snake_case columns → camelCase fields.
      const items: Record<string, any> = {};
      for (const row of rows) {
        try {
          items[row.id] = {
            id: row.id,
            name: row.name || '',
            description: row.description || '',
            lat: row.lat,
            lng: row.lng,
            address: row.address || null,
            city: row.city || null,
            region: row.region || null,
            // country is presumably a CHAR(n) column, hence the trim —
            // TODO confirm against the providers schema.
            country: row.country ? row.country.trim() : null,
            serviceRadiusKm: row.service_radius_km ?? null,
            offersShipping: !!row.offers_shipping,
            capabilities: row.capabilities || [],
            substrates: row.substrates || [],
            standardDays: row.standard_days ?? null,
            rushDays: row.rush_days ?? null,
            rushSurchargePct: row.rush_surcharge_pct ?? null,
            pricing: row.pricing || {},
            communities: row.communities || [],
            contactEmail: row.contact_email || null,
            contactPhone: row.contact_phone || null,
            contactWebsite: row.contact_website || null,
            wallet: row.wallet || null,
            jobsCompleted: row.jobs_completed ?? 0,
            avgRating: row.avg_rating ?? null,
            memberSince: safeTimestamp(row.member_since),
            active: row.active ?? true,
            createdAt: safeTimestamp(row.created_at),
            updatedAt: safeTimestamp(row.updated_at),
          };
        } catch (e) {
          // Per-provider recovery: skip the bad row, keep the registry.
          console.error(`[Migration] providers: error on provider ${row.id}: ${e}`);
        }
      }
      // Use 'global' as the space for providers (overrides the space parameter)
      // in the doc metadata; buildMeta is bypassed for that reason.
      result.set('', {
        meta: {
          module: 'providers',
          collection: 'registry',
          version: 1,
          spaceSlug: 'global',
          createdAt: Date.now(),
        },
        items,
      });
    } catch (e) {
      // Top-level failure — warn and return what we have (possibly empty).
      console.warn(`[Migration] providers: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// FILES MIGRATION
// ============================================================================
/**
 * Files: rfiles.media_files + rfiles.memory_cards
 * Granularity: 1 doc per shared_space ({space}:files:cards:{sharedSpace})
 * Skips: access_logs (ephemeral), public_shares (ephemeral tokens)
 */
export const filesMigration: ModuleMigration = {
  module: 'files',
  collection: 'cards',
  async extract(pool, space) {
    const result = new Map<string, any>();
    try {
      // All distinct shared_spaces across both tables. BOTH branches filter
      // out NULL: previously only the media_files branch did, so a NULL
      // shared_space in memory_cards produced a null map key — which the
      // migration engine treats as a falsy suffix, collapsing it into the
      // suffix-less docId {space}:files:cards.
      const { rows: spacesResult } = await pool.query(
        `SELECT DISTINCT shared_space FROM rfiles.media_files WHERE shared_space IS NOT NULL
        UNION
        SELECT DISTINCT shared_space FROM rfiles.memory_cards WHERE shared_space IS NOT NULL`
      );
      for (const { shared_space: sharedSpace } of spacesResult) {
        try {
          // Media files for this shared space
          const { rows: files } = await pool.query(
            `SELECT * FROM rfiles.media_files WHERE shared_space = $1 ORDER BY created_at`,
            [sharedSpace]
          );
          // Files keyed by id — snake_case columns → camelCase fields.
          const fileMap: Record<string, any> = {};
          for (const row of files) {
            try {
              fileMap[row.id] = {
                id: row.id,
                originalFilename: row.original_filename || '',
                title: row.title || null,
                description: row.description || '',
                mimeType: row.mime_type || null,
                // file_size may arrive as a bigint string — coerce.
                fileSize: row.file_size ? Number(row.file_size) : 0,
                fileHash: row.file_hash || null,
                storagePath: row.storage_path || '',
                tags: row.tags || [],
                isProcessed: !!row.is_processed,
                processingError: row.processing_error || null,
                uploadedBy: row.uploaded_by || null,
                sharedSpace: row.shared_space || null,
                createdAt: safeTimestamp(row.created_at),
                updatedAt: safeTimestamp(row.updated_at),
              };
            } catch (e) {
              // Per-file recovery: skip the bad row.
              console.error(`[Migration] files: error on file ${row.id}: ${e}`);
            }
          }
          // Memory cards for this shared space
          const { rows: cards } = await pool.query(
            `SELECT * FROM rfiles.memory_cards WHERE shared_space = $1 ORDER BY position, created_at`,
            [sharedSpace]
          );
          const cardMap: Record<string, any> = {};
          for (const row of cards) {
            try {
              cardMap[row.id] = {
                id: row.id,
                sharedSpace: row.shared_space || '',
                title: row.title || '',
                body: row.body || '',
                cardType: row.card_type || null,
                tags: row.tags || [],
                position: row.position ?? 0,
                createdBy: row.created_by || null,
                createdAt: safeTimestamp(row.created_at),
                updatedAt: safeTimestamp(row.updated_at),
              };
            } catch (e) {
              // Per-card recovery: skip the bad row.
              console.error(`[Migration] files: error on card ${row.id}: ${e}`);
            }
          }
          // Use shared_space as suffix (not space param)
          result.set(sharedSpace, {
            meta: {
              ...buildMeta(filesMigration, space),
              sharedSpace,
            },
            files: fileMap,
            memoryCards: cardMap,
          });
        } catch (e) {
          // Per-shared-space recovery: skip this doc, keep migrating others.
          console.error(`[Migration] files: error on shared_space ${sharedSpace}: ${e}`);
        }
      }
    } catch (e) {
      // Top-level failure — warn and return what we have (possibly empty).
      console.warn(`[Migration] files: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// TRIPS MIGRATION
// ============================================================================
/**
 * Trips: rtrips.trips + destinations + itinerary_items + bookings + expenses + packing_items
 * Granularity: 1 doc per trip ({space}:trips:trips:{tripId})
 *
 * Trips carry no space_slug, so all trips are migrated regardless of the
 * space argument (it only lands in doc metadata).
 *
 * NOTE(review): unlike the other adapters, the inner row-mapping loops here
 * have no per-row try/catch — one bad destination/item/booking/expense row
 * aborts the whole trip (caught at the trip level below). Confirm whether
 * per-row recovery is wanted for consistency.
 *
 * Date-only columns are kept as ISO strings (via ?.toISOString?.()) rather
 * than epoch ms, falling back to the raw value when the driver returns a
 * string instead of a Date.
 */
export const tripsMigration: ModuleMigration = {
  module: 'trips',
  collection: 'trips',
  async extract(pool, _space) {
    const result = new Map<string, any>();
    try {
      const { rows: trips } = await pool.query(
        `SELECT * FROM rtrips.trips ORDER BY created_at`
      );
      for (const trip of trips) {
        try {
          // Destinations
          const { rows: dests } = await pool.query(
            `SELECT * FROM rtrips.destinations WHERE trip_id = $1 ORDER BY sort_order, arrival_date`,
            [trip.id]
          );
          const destinations: Record<string, any> = {};
          for (const d of dests) {
            destinations[d.id] = {
              id: d.id,
              tripId: d.trip_id,
              name: d.name || '',
              country: d.country || null,
              lat: d.lat ?? null,
              lng: d.lng ?? null,
              arrivalDate: d.arrival_date?.toISOString?.() ?? d.arrival_date ?? null,
              departureDate: d.departure_date?.toISOString?.() ?? d.departure_date ?? null,
              notes: d.notes || '',
              sortOrder: d.sort_order ?? 0,
              createdAt: safeTimestamp(d.created_at),
            };
          }
          // Itinerary items
          const { rows: items } = await pool.query(
            `SELECT * FROM rtrips.itinerary_items WHERE trip_id = $1 ORDER BY date, sort_order`,
            [trip.id]
          );
          const itinerary: Record<string, any> = {};
          for (const it of items) {
            itinerary[it.id] = {
              id: it.id,
              tripId: it.trip_id,
              destinationId: it.destination_id || null,
              title: it.title || '',
              category: it.category || null,
              date: it.date?.toISOString?.() ?? it.date ?? null,
              startTime: it.start_time || null,
              endTime: it.end_time || null,
              notes: it.notes || '',
              sortOrder: it.sort_order ?? 0,
              createdAt: safeTimestamp(it.created_at),
            };
          }
          // Bookings
          const { rows: bkgs } = await pool.query(
            `SELECT * FROM rtrips.bookings WHERE trip_id = $1 ORDER BY start_date`,
            [trip.id]
          );
          const bookings: Record<string, any> = {};
          for (const b of bkgs) {
            bookings[b.id] = {
              id: b.id,
              tripId: b.trip_id,
              type: b.type || null,
              provider: b.provider || null,
              confirmationNumber: b.confirmation_number || null,
              // Money columns may arrive as numeric strings — coerce.
              cost: b.cost ? Number(b.cost) : null,
              currency: b.currency || null,
              startDate: b.start_date?.toISOString?.() ?? b.start_date ?? null,
              endDate: b.end_date?.toISOString?.() ?? b.end_date ?? null,
              status: b.status || null,
              notes: b.notes || '',
              createdAt: safeTimestamp(b.created_at),
            };
          }
          // Expenses
          const { rows: exps } = await pool.query(
            `SELECT * FROM rtrips.expenses WHERE trip_id = $1 ORDER BY date`,
            [trip.id]
          );
          const expenses: Record<string, any> = {};
          for (const ex of exps) {
            expenses[ex.id] = {
              id: ex.id,
              tripId: ex.trip_id,
              paidBy: ex.paid_by || null,
              description: ex.description || '',
              amount: ex.amount ? Number(ex.amount) : 0,
              currency: ex.currency || null,
              category: ex.category || null,
              date: ex.date?.toISOString?.() ?? ex.date ?? null,
              splitType: ex.split_type || null,
              createdAt: safeTimestamp(ex.created_at),
            };
          }
          // Packing items
          const { rows: packing } = await pool.query(
            `SELECT * FROM rtrips.packing_items WHERE trip_id = $1 ORDER BY sort_order`,
            [trip.id]
          );
          const packingItems: Record<string, any> = {};
          for (const p of packing) {
            packingItems[p.id] = {
              id: p.id,
              tripId: p.trip_id,
              addedBy: p.added_by || null,
              name: p.name || '',
              category: p.category || null,
              packed: !!p.packed,
              quantity: p.quantity ?? 1,
              sortOrder: p.sort_order ?? 0,
              createdAt: safeTimestamp(p.created_at),
            };
          }
          result.set(trip.id, {
            meta: buildMeta(tripsMigration, _space),
            trip: {
              id: trip.id,
              title: trip.title || '',
              slug: trip.slug || '',
              description: trip.description || '',
              startDate: trip.start_date?.toISOString?.() ?? trip.start_date ?? null,
              endDate: trip.end_date?.toISOString?.() ?? trip.end_date ?? null,
              budgetTotal: trip.budget_total ? Number(trip.budget_total) : null,
              budgetCurrency: trip.budget_currency || null,
              status: trip.status || 'planning',
              createdBy: trip.created_by || null,
              createdAt: safeTimestamp(trip.created_at),
              updatedAt: safeTimestamp(trip.updated_at),
            },
            destinations,
            itinerary,
            bookings,
            expenses,
            packingItems,
          });
        } catch (e) {
          // Per-trip recovery: skip this trip, keep migrating others.
          console.error(`[Migration] trips: error on trip ${trip.id}: ${e}`);
        }
      }
    } catch (e) {
      // Top-level failure — warn and return what we have (possibly empty).
      console.warn(`[Migration] trips: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// INBOX MIGRATION
// ============================================================================
/**
 * Inbox: rinbox.mailboxes + threads + comments + approvals + approval_signatures
 * Granularity: 1 doc per mailbox ({space}:inbox:mailboxes:{mailboxId})
 * Skips: sync_state (IMAP tracking, server-side only)
 *
 * Each doc bundles the mailbox row with its member roster, its threads
 * (each carrying its comments inline) and its approvals (each carrying
 * its signatures inline). Per-record try/catch means one bad thread,
 * approval, or mailbox is logged and skipped without aborting the run.
 *
 * NOTE(review): unlike the other adapters, the mailbox query is NOT
 * filtered by space — presumably rinbox.mailboxes has no space column,
 * so every mailbox lands in every space's migration. Confirm intended.
 */
export const inboxMigration: ModuleMigration = {
  module: 'inbox',
  collection: 'mailboxes',
  // `space` is used to stamp doc metadata below (it was previously named
  // `_space`, which misleadingly marks a parameter as unused).
  async extract(pool, space) {
    const result = new Map<string, any>();
    try {
      const { rows: mailboxes } = await pool.query(
        `SELECT * FROM rinbox.mailboxes ORDER BY created_at`
      );
      for (const mb of mailboxes) {
        try {
          // Threads belonging to this mailbox, in receive/create order.
          const { rows: threads } = await pool.query(
            `SELECT * FROM rinbox.threads WHERE mailbox_id = $1 ORDER BY received_at, created_at`,
            [mb.id]
          );
          const threadMap: Record<string, any> = {};
          for (const t of threads) {
            try {
              // Comments for this thread (oldest first).
              const { rows: comments } = await pool.query(
                `SELECT * FROM rinbox.comments WHERE thread_id = $1 ORDER BY created_at`,
                [t.id]
              );
              const commentList = comments.map((c: any) => ({
                id: c.id,
                threadId: c.thread_id,
                authorId: c.author_id,
                body: c.body || '',
                mentions: c.mentions || [],
                createdAt: safeTimestamp(c.created_at),
              }));
              threadMap[t.id] = {
                id: t.id,
                mailboxId: t.mailbox_id,
                messageId: t.message_id || null,
                subject: t.subject || '',
                fromAddress: t.from_address || null,
                fromName: t.from_name || null,
                toAddresses: t.to_addresses || [],
                ccAddresses: t.cc_addresses || [],
                bodyText: t.body_text || '',
                bodyHtml: t.body_html || '',
                tags: t.tags || [],
                status: t.status || 'open',
                isRead: !!t.is_read,
                isStarred: !!t.is_starred,
                assignedTo: t.assigned_to || null,
                hasAttachments: !!t.has_attachments,
                receivedAt: safeTimestamp(t.received_at),
                createdAt: safeTimestamp(t.created_at),
                comments: commentList,
              };
            } catch (e) {
              console.error(`[Migration] inbox: error on thread ${t.id}: ${e}`);
            }
          }
          // Approvals for this mailbox.
          const { rows: approvals } = await pool.query(
            `SELECT * FROM rinbox.approvals WHERE mailbox_id = $1 ORDER BY created_at`,
            [mb.id]
          );
          const approvalMap: Record<string, any> = {};
          for (const a of approvals) {
            try {
              // Signatures for this approval, in signing order.
              const { rows: sigs } = await pool.query(
                `SELECT * FROM rinbox.approval_signatures WHERE approval_id = $1 ORDER BY signed_at`,
                [a.id]
              );
              const signatureList = sigs.map((s: any) => ({
                id: s.id,
                approvalId: s.approval_id,
                signerId: s.signer_id,
                vote: s.vote || '',
                signedAt: safeTimestamp(s.signed_at),
              }));
              approvalMap[a.id] = {
                id: a.id,
                mailboxId: a.mailbox_id,
                threadId: a.thread_id || null,
                authorId: a.author_id,
                subject: a.subject || '',
                bodyText: a.body_text || '',
                bodyHtml: a.body_html || '',
                toAddresses: a.to_addresses || [],
                ccAddresses: a.cc_addresses || [],
                status: a.status || 'pending',
                // `??` (not `||`) so an explicit 0 survives; default is 1.
                requiredSignatures: a.required_signatures ?? 1,
                safeTxHash: a.safe_tx_hash || null,
                createdAt: safeTimestamp(a.created_at),
                resolvedAt: safeTimestamp(a.resolved_at),
                signatures: signatureList,
              };
            } catch (e) {
              console.error(`[Migration] inbox: error on approval ${a.id}: ${e}`);
            }
          }
          // Membership roster for this mailbox.
          const { rows: members } = await pool.query(
            `SELECT * FROM rinbox.mailbox_members WHERE mailbox_id = $1`,
            [mb.id]
          );
          const memberList = members.map((m: any) => ({
            id: m.id,
            mailboxId: m.mailbox_id,
            userId: m.user_id,
            role: m.role || 'member',
            joinedAt: safeTimestamp(m.joined_at),
          }));
          result.set(mb.id, {
            meta: buildMeta(inboxMigration, space),
            mailbox: {
              id: mb.id,
              workspaceId: mb.workspace_id || null,
              slug: mb.slug || '',
              name: mb.name || '',
              email: mb.email || '',
              description: mb.description || '',
              visibility: mb.visibility || 'private',
              ownerDid: mb.owner_did || '',
              safeAddress: mb.safe_address || null,
              safeChainId: mb.safe_chain_id ?? null,
              approvalThreshold: mb.approval_threshold ?? 1,
              createdAt: safeTimestamp(mb.created_at),
            },
            members: memberList,
            threads: threadMap,
            approvals: approvalMap,
          });
        } catch (e) {
          console.error(`[Migration] inbox: error on mailbox ${mb.id}: ${e}`);
        }
      }
    } catch (e) {
      console.warn(`[Migration] inbox: ${e}`);
    }
    return result;
  },
};
// ============================================================================
// SPLAT MIGRATION
// ============================================================================
/**
 * Splat: rsplat.splats + rsplat.source_files
 * Granularity: 1 doc per space ({space}:splat:scenes)
 *
 * All splats for the space are folded into a single doc (stored under the
 * empty-string key), each splat carrying its source files inline. A bad
 * splat row is logged and skipped without aborting the migration.
 */
export const splatMigration: ModuleMigration = {
  module: 'splat',
  collection: 'scenes',
  async extract(pool, space) {
    const docs = new Map<string, any>();
    try {
      const splatRes = await pool.query(
        `SELECT * FROM rsplat.splats WHERE space_slug = $1 ORDER BY created_at`,
        [space]
      );
      const splatRows = splatRes.rows;
      // No splats for this space — emit no doc at all.
      if (splatRows.length === 0) return docs;
      const items: Record<string, any> = {};
      for (const splat of splatRows) {
        try {
          // Original uploaded files attached to this splat, oldest first.
          const fileRes = await pool.query(
            `SELECT * FROM rsplat.source_files WHERE splat_id = $1 ORDER BY created_at`,
            [splat.id]
          );
          const sources = fileRes.rows.map((file: any) => ({
            id: file.id,
            splatId: file.splat_id,
            filePath: file.file_path || '',
            fileName: file.file_name || '',
            mimeType: file.mime_type || null,
            fileSizeBytes: file.file_size_bytes ? Number(file.file_size_bytes) : 0,
            createdAt: safeTimestamp(file.created_at),
          }));
          items[splat.id] = {
            id: splat.id,
            slug: splat.slug || '',
            title: splat.title || '',
            description: splat.description || '',
            filePath: splat.file_path || '',
            fileFormat: splat.file_format || '',
            fileSizeBytes: splat.file_size_bytes ? Number(splat.file_size_bytes) : 0,
            tags: splat.tags || [],
            spaceSlug: splat.space_slug || '',
            contributorId: splat.contributor_id || null,
            contributorName: splat.contributor_name || null,
            source: splat.source || null,
            status: splat.status || 'active',
            viewCount: splat.view_count ?? 0,
            paymentTx: splat.payment_tx || null,
            paymentNetwork: splat.payment_network || null,
            createdAt: safeTimestamp(splat.created_at),
            processingStatus: splat.processing_status || null,
            processingError: splat.processing_error || null,
            sourceFileCount: splat.source_file_count ?? 0,
            sourceFiles: sources,
          };
        } catch (e) {
          console.error(`[Migration] splat: error on splat ${splat.id}: ${e}`);
        }
      }
      docs.set('', {
        meta: buildMeta(splatMigration, space),
        items,
      });
    } catch (e) {
      console.warn(`[Migration] splat: ${e}`);
    }
    return docs;
  },
};
// ============================================================================
// ALL MIGRATIONS (convenience export)
// ============================================================================
/**
 * Every module migration adapter defined in this file, in declaration
 * order. Iterate this list to migrate all modules for a space in one
 * pass, or pass individual entries to `migrateModule`.
 */
export const allMigrations: ModuleMigration[] = [
  notesMigration,
  workMigration,
  calMigration,
  voteMigration,
  booksMigration,
  cartMigration,
  providersMigration,
  filesMigration,
  tripsMigration,
  inboxMigration,
  splatMigration,
];
// ============================================================================
// HELPERS
// ============================================================================
/**
 * Rough per-doc item count used for migration reporting.
 *
 * Sums the number of keys in every top-level plain-object field of a doc
 * state (e.g. `threads`, `approvals`, `items`), skipping the `meta` field
 * and any array or scalar fields, and returns at least 1 so every doc
 * counts for one migrated row.
 *
 * @param state - candidate doc state; non-object values count as 1
 * @returns number of contained items (minimum 1)
 */
function countItems(state: unknown): number {
  // Typed `unknown` rather than `any` so callers get narrowing safety;
  // the guard below narrows it to a non-null object.
  if (!state || typeof state !== 'object') return 1;
  let count = 0;
  for (const [key, value] of Object.entries(state)) {
    if (key === 'meta') continue;
    // Only nested maps (id -> record) contribute; arrays and scalars don't.
    if (value && typeof value === 'object' && !Array.isArray(value)) {
      count += Object.keys(value).length;
    }
  }
  return Math.max(count, 1);
}