// File: rspace-online/server/local-first/migration/run-migration.ts
// (215 lines, 6.4 KiB, TypeScript)

/**
* Run PG → Automerge migrations.
*
* Usage (inside rspace container):
* bun run server/local-first/migration/run-migration.ts [space] [flags]
*
* Flags:
* --dry-run Preview without creating docs
* --module=notes Run only a specific module migration
* --verify After migrating, compare Automerge docs against PG
*
* Default space: "demo". Creates disk backups in /data/docs-backup/.
* Idempotent: skips docs that already exist in the SyncServer.
*/
import postgres from 'postgres';
import * as Automerge from '@automerge/automerge';
import { mkdirSync, writeFileSync } from 'node:fs';
import { dirname } from 'node:path';
import {
migrateModule,
allMigrations,
type MigrationResult,
} from './pg-to-automerge';
import { syncServer } from '../../sync-instance';
import { loadAllDocs, docIdToPath } from '../doc-persistence';
// Connection string is mandatory — fail fast before any work starts.
const DATABASE_URL = process.env.DATABASE_URL;
if (!DATABASE_URL) {
  throw new Error('DATABASE_URL environment variable is required');
}
// Small pool: this is a one-shot CLI script, not a long-lived server.
const sql = postgres(DATABASE_URL, { max: 5, idle_timeout: 10 });
// Wrap postgres.js in a pg-compatible pool.query() interface
// (callers in this file destructure `{ rows }` from the result).
const pool = {
  // Run raw SQL with optional $1-style positional parameters.
  async query(text: string, params?: any[]) {
    const result = params
      ? await sql.unsafe(text, params)
      : await sql.unsafe(text);
    // postgres.js returns an array-like result set; normalize to { rows: [...] }.
    return { rows: Array.from(result) };
  },
};
// ── CLI args ──
// Anything prefixed with "--" is a flag; everything else is positional.
const cliArgs = process.argv.slice(2);
const isFlag = (arg: string) => arg.startsWith('--');
const flags = cliArgs.filter(isFlag);
const positional = cliArgs.filter((arg) => !isFlag(arg));
// First positional argument selects the space; defaults to "demo".
const space = positional[0] || 'demo';
const dryRun = flags.includes('--dry-run');
const verify = flags.includes('--verify');
// "--module=<name>" restricts the run to a single module's migration.
const moduleFlag = flags
  .find((f) => f.startsWith('--module='))
  ?.split('=')[1];
const BACKUP_DIR = '/data/docs-backup';
/**
 * Orchestrates one migration run:
 *  1. Loads existing Automerge docs so already-migrated docs are skipped.
 *  2. Runs each (optionally --module-filtered) migration against PG.
 *  3. Persists resulting docs to /data/docs/ (skipped on --dry-run).
 *  4. Prints a summary table and, with --verify, cross-checks note counts.
 * Exits the process with code 1 when --module names an unknown module.
 */
async function main() {
  const mode = dryRun ? 'DRY-RUN' : 'MIGRATE';
  console.log(`\n=== PG → AUTOMERGE ${mode} (space: "${space}") ===\n`);
  // Load any existing docs so idempotency checks work
  await loadAllDocs(syncServer);
  // Filter migrations if --module flag provided
  const migrations = moduleFlag
    ? allMigrations.filter((m) => m.module === moduleFlag)
    : allMigrations;
  if (moduleFlag && migrations.length === 0) {
    console.error(`No migration found for module "${moduleFlag}"`);
    console.error(`Available: ${allMigrations.map((m) => m.module).join(', ')}`);
    process.exit(1);
  }
  const results: MigrationResult[] = [];
  // Run migrations sequentially so per-module console output stays grouped.
  for (const migration of migrations) {
    const result = await migrateModule(migration, pool, space, syncServer, {
      dryRun,
      // No backups during a dry run — no docs are created either.
      backupDir: dryRun ? undefined : BACKUP_DIR,
    });
    results.push(result);
    console.log('');
  }
  // Save docs to disk (skip in dry-run)
  if (!dryRun) {
    console.log('[Migration] Saving all docs to /data/docs/...');
    let saved = 0;
    for (const docId of syncServer.getDocIds()) {
      const doc = syncServer.getDoc(docId);
      if (!doc) continue;
      try {
        const filePath = docIdToPath(docId);
        mkdirSync(dirname(filePath), { recursive: true });
        const binary = Automerge.save(doc);
        writeFileSync(filePath, binary);
        saved++;
      } catch (e) {
        // A single failed save should not abort the rest of the run.
        console.error(`[Migration] Failed to save ${docId}:`, e);
      }
    }
    console.log(`[Migration] Saved ${saved} docs to disk.`);
  }
  // ── Summary ──
  printSummary(results);
  // ── Verification ──
  if (verify) {
    console.log('\n=== VERIFICATION ===\n');
    await verifyNotes(space);
  }
  console.log(`\nBackups: ${BACKUP_DIR}/`);
  console.log(`Persistent: /data/docs/`);
  console.log(`Total docs in SyncServer: ${syncServer.getDocIds().length}`);
  // Close the PG pool so the process can exit cleanly.
  await sql.end();
}
// ── Verification: compare PG vs Automerge for notes ──
/**
 * Post-migration check: compare notebook/note counts in Postgres against the
 * migrated Automerge docs for the given space. Logs warnings on mismatches;
 * never throws (missing rnotes tables downgrade to a skip message).
 *
 * @param space Space prefix used when doc IDs were minted
 *              (`${space}:notes:notebooks:<notebookId>`).
 */
async function verifyNotes(space: string) {
  try {
    // Count notebooks in PG.
    // NOTE(review): this query is not scoped by space — it assumes a single
    // space per database; confirm if multi-space databases exist.
    const { rows: pgNotebooks } = await pool.query(
      'SELECT id, title FROM rnotes.notebooks ORDER BY created_at'
    );
    // Count notebook docs in Automerge. Match the full space-qualified
    // prefix (fix: the previous bare `includes(':notes:notebooks:')` also
    // counted docs belonging to other spaces, skewing the comparison).
    const docPrefix = `${space}:notes:notebooks:`;
    const notesDocs = syncServer
      .getDocIds()
      .filter((id) => id.startsWith(docPrefix));
    console.log(` PG notebooks: ${pgNotebooks.length}`);
    console.log(` Automerge notebooks: ${notesDocs.length}`);
    if (pgNotebooks.length !== notesDocs.length) {
      console.warn(' ⚠ Count mismatch!');
    }
    // Per-notebook: compare note counts
    let allMatch = true;
    for (const nb of pgNotebooks) {
      const { rows: pgNotes } = await pool.query(
        'SELECT COUNT(*) as count FROM rnotes.notes WHERE notebook_id = $1',
        [nb.id]
      );
      // COUNT(*) comes back as a string from the driver; coerce to number.
      const pgCount = parseInt(pgNotes[0]?.count ?? '0', 10);
      const docId = `${docPrefix}${nb.id}`;
      const doc = syncServer.getDoc<{ items: Record<string, unknown> }>(docId);
      // Notes live under doc.items, keyed by note id; a missing doc counts 0.
      const amCount = doc ? Object.keys(doc.items ?? {}).length : 0;
      if (pgCount !== amCount) {
        console.warn(` ⚠ "${nb.title}" (${nb.id}): PG=${pgCount} AM=${amCount}`);
        allMatch = false;
      }
    }
    if (allMatch && pgNotebooks.length > 0) {
      console.log(' ✓ All note counts match between PG and Automerge');
    }
  } catch (e) {
    // Verification is best-effort; the notes schema may not be installed.
    console.warn(` Verification skipped (notes tables may not exist): ${e}`);
  }
}
// ── Print summary table ──
function printSummary(results: MigrationResult[]) {
console.log('\n=== SUMMARY ===\n');
console.log(
`${'Module'.padEnd(12)} ${'Created'.padStart(8)} ${'Skipped'.padStart(8)} ${'Rows'.padStart(6)} ${'Errors'.padStart(7)} ${'Time'.padStart(8)}`
);
console.log('-'.repeat(52));
let totalCreated = 0;
let totalSkipped = 0;
let totalRows = 0;
let totalErrors = 0;
for (const r of results) {
console.log(
`${r.module.padEnd(12)} ${String(r.docsCreated).padStart(8)} ${String(r.docsSkipped).padStart(8)} ${String(r.rowsMigrated).padStart(6)} ${String(r.errors.length).padStart(7)} ${(r.durationMs + 'ms').padStart(8)}`
);
totalCreated += r.docsCreated;
totalSkipped += r.docsSkipped;
totalRows += r.rowsMigrated;
totalErrors += r.errors.length;
}
console.log('-'.repeat(52));
console.log(
`${'TOTAL'.padEnd(12)} ${String(totalCreated).padStart(8)} ${String(totalSkipped).padStart(8)} ${String(totalRows).padStart(6)} ${String(totalErrors).padStart(7)}`
);
if (totalErrors > 0) {
console.log('\n=== ERRORS ===\n');
for (const r of results) {
for (const e of r.errors) {
console.error(`[${r.module}] ${e}`);
}
}
}
}
// Entry point: any unhandled error is fatal and sets a non-zero exit code.
main().catch((e) => {
  console.error('Fatal:', e);
  process.exit(1);
});