feat(rsocials): auto-push scheduled posts to Postiz + reconcile published state

Per-post "Schedule to Postiz" button on Post nodes (rSpace → Postiz), a 60s
server sweep that auto-pushes Scheduled posts within a 10min lead window,
and a Postiz → rSpace reconcile poll that flips postizStatus to
'published' (or 'failed') and records publishedAt + releaseURL.

- schemas: PostNodeData gains postizPostId, postizIntegrationId,
  postizStatus, postizError, postizSentAt, postizCheckedAt,
  postizReleaseURL, publishedAt.
- postiz-client: listPosts(startDate, endDate) for reconciliation.
- mod.ts: sendCampaignNodeToPostiz() helper, POST /api/campaign/flows/
  :flowId/nodes/:nodeId/send-postiz, postizSweep() + postizReconcile()
  wired into onInit via startPostizScheduler.
- folk-campaign-planner: button + status badge in Post inspector,
  timeline/table dot colors reflect postizStatus (queued=purple,
  published=green, failed=red). Timeline bucket stays by scheduledAt.
- campaign-planner.css: postiz state pill styles.
- Bump campaign-planner JS/CSS to ?v=2 to bust CF cache.

Mock-Postiz smoke against the client lib passes 20/20 assertions.
Live Postiz round-trip pending deploy to Netcup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-17 10:28:26 -04:00
parent 1471d1d578
commit 5c32119946
5 changed files with 376 additions and 7 deletions

View File

@ -308,6 +308,23 @@ folk-campaign-planner {
min-height: 60px;
}
.cp-postiz-state {
padding: 6px 8px;
border-radius: 6px;
font-size: 11px;
font-weight: 600;
margin-top: 4px;
line-height: 1.3;
}
.cp-postiz-state--queued { background: rgba(249, 115, 22, 0.12); color: #f97316; border: 1px solid rgba(249, 115, 22, 0.3); }
.cp-postiz-state--published { background: rgba(34, 197, 94, 0.12); color: #22c55e; border: 1px solid rgba(34, 197, 94, 0.3); }
.cp-postiz-state--failed { background: rgba(239, 68, 68, 0.12); color: #ef4444; border: 1px solid rgba(239, 68, 68, 0.3); }
.cp-btn--postiz[disabled] {
opacity: 0.5;
cursor: not-allowed;
}
.cp-icp-toolbar {
padding: 8px 12px;
border-top: 1px solid var(--rs-border, #2d2d44);

View File

@ -804,6 +804,17 @@ class FolkCampaignPlanner extends HTMLElement {
switch (node.type) {
case 'post': {
const d = node.data as PostNodeData;
const hasPostiz = !!d.postizPostId;
const postizBadge = hasPostiz
? `<div class="cp-postiz-state cp-postiz-state--${d.postizStatus || 'queued'}">
${d.postizStatus === 'published'
? `\u2713 Published${d.publishedAt ? ' ' + new Date(d.publishedAt).toLocaleString('en-US', { month: 'short', day: 'numeric', hour: 'numeric', minute: '2-digit' }) : ''}${d.postizReleaseURL ? ` — <a href="${esc(d.postizReleaseURL)}" target="_blank" rel="noopener" style="color:inherit;text-decoration:underline">open</a>` : ''}`
: d.postizStatus === 'failed'
? `\u26a0 Postiz error: ${esc(d.postizError || 'unknown')}`
: `\u23f3 Queued in Postiz (${esc(d.postizPostId || '')})`}
</div>`
: '';
const canSend = !hasPostiz && !!d.content?.trim();
body = `
<div class="cp-icp-body">
<label>Content</label>
@ -826,6 +837,10 @@ class FolkCampaignPlanner extends HTMLElement {
</select>
<label>Hashtags (comma-separated)</label>
<input data-field="hashtags" value="${esc(d.hashtags.join(', '))}"/>
${postizBadge}
<button class="cp-btn cp-btn--postiz" data-action="send-postiz" ${canSend ? '' : 'disabled'} style="margin-top:8px;width:100%">
${hasPostiz ? 'Sent to Postiz' : (d.scheduledAt ? '\u{1f4c5} Schedule to Postiz' : '\u{1f680} Send to Postiz now')}
</button>
</div>`;
break;
}
@ -1070,11 +1085,50 @@ class FolkCampaignPlanner extends HTMLElement {
this.aiFillNode(node.id);
} else if (action === 'generate-brief') {
this.generateFromBriefNode(node.id);
} else if (action === 'send-postiz') {
this.sendNodeToPostiz(node.id, el as HTMLButtonElement);
}
});
});
}
private async sendNodeToPostiz(nodeId: string, btn: HTMLButtonElement) {
if (!this.currentFlowId) return;
// Flush any pending save first so the server has the latest content/scheduledAt.
if (this.saveTimer) {
clearTimeout(this.saveTimer);
this.saveTimer = null;
this.executeSave();
// Give Automerge sync a beat to propagate before we read on the server.
await new Promise(r => setTimeout(r, 400));
}
const originalLabel = btn.textContent;
btn.disabled = true;
btn.textContent = 'Sending…';
try {
const res = await fetch(
`${this.basePath}api/campaign/flows/${encodeURIComponent(this.currentFlowId)}/nodes/${encodeURIComponent(nodeId)}/send-postiz`,
{ method: 'POST' },
);
const data = await res.json().catch(() => ({}));
if (!res.ok) throw new Error(data.error || `HTTP ${res.status}`);
// Reflect server-side update locally so the inspector re-renders without waiting for sync.
const node = this.nodes.find(n => n.id === nodeId);
if (node && node.type === 'post') {
const d = node.data as PostNodeData;
d.postizPostId = data.postizPostId;
d.postizStatus = 'queued';
d.postizSentAt = Date.now();
if (d.status === 'draft') d.status = 'scheduled';
}
btn.textContent = 'Sent!';
setTimeout(() => { this.enterInlineEdit(nodeId); this.drawCanvasContent(); }, 600);
} catch (err: any) {
btn.textContent = (err.message || 'Error').slice(0, 40);
setTimeout(() => { btn.textContent = originalLabel; btn.disabled = false; }, 3000);
}
}
private nodeIcon(type: CampaignNodeType): string {
switch (type) {
case 'post': return '<span style="font-size:14px">&#x1f4dd;</span>';
@ -1258,7 +1312,11 @@ class FolkCampaignPlanner extends HTMLElement {
const platform = d.platform || 'x';
const color = PLATFORM_COLORS[platform] || '#888';
const icon = PLATFORM_ICONS[platform] || platform.charAt(0);
const statusColor = d.status === 'published' ? '#22c55e' : d.status === 'scheduled' ? '#3b82f6' : '#f59e0b';
const statusColor = d.postizStatus === 'published' || d.status === 'published' ? '#22c55e'
: d.postizStatus === 'failed' ? '#ef4444'
: d.postizStatus === 'queued' ? '#a855f7'
: d.status === 'scheduled' ? '#3b82f6'
: '#f59e0b';
const time = item.date ? item.date.toLocaleTimeString('en-US', { hour: 'numeric', minute: '2-digit' }) : '';
return `<div class="cp-tl-card" data-nav-node="${item.node.id}">
<span class="cp-tl-card__icon" style="color:${color}">${icon}</span>
@ -1339,7 +1397,11 @@ class FolkCampaignPlanner extends HTMLElement {
const cards = items.map(node => {
const d = node.data as PostNodeData;
const statusColor = d.status === 'published' ? '#22c55e' : d.status === 'scheduled' ? '#3b82f6' : '#f59e0b';
const statusColor = d.postizStatus === 'published' || d.status === 'published' ? '#22c55e'
: d.postizStatus === 'failed' ? '#ef4444'
: d.postizStatus === 'queued' ? '#a855f7'
: d.status === 'scheduled' ? '#3b82f6'
: '#f59e0b';
const dateStr = d.scheduledAt ? new Date(d.scheduledAt).toLocaleDateString('en-US', { month: 'short', day: 'numeric' }) : 'Unscheduled';
const preview = (d.content || '').split('\n')[0].substring(0, 60);
const tags = d.hashtags.slice(0, 3).map(h =>
@ -1396,7 +1458,11 @@ class FolkCampaignPlanner extends HTMLElement {
const platform = d.platform || 'x';
const color = PLATFORM_COLORS[platform] || '#888';
const icon = PLATFORM_ICONS[platform] || platform.charAt(0);
const statusColor = d.status === 'published' ? '#22c55e' : d.status === 'scheduled' ? '#3b82f6' : '#f59e0b';
const statusColor = d.postizStatus === 'published' || d.status === 'published' ? '#22c55e'
: d.postizStatus === 'failed' ? '#ef4444'
: d.postizStatus === 'queued' ? '#a855f7'
: d.status === 'scheduled' ? '#3b82f6'
: '#f59e0b';
const statusLabel = d.status ? d.status.charAt(0).toUpperCase() + d.status.slice(1) : 'Draft';
const dateStr = d.scheduledAt ? new Date(d.scheduledAt).toLocaleDateString('en-US', { month: 'short', day: 'numeric', hour: 'numeric', minute: '2-digit' }) : '—';
const preview = (d.content || '').split('\n')[0].substring(0, 50);
@ -1627,7 +1693,11 @@ class FolkCampaignPlanner extends HTMLElement {
const platform = d.platform || 'x';
const color = PLATFORM_COLORS[platform] || '#888';
const icon = PLATFORM_ICONS[platform] || platform.charAt(0);
const statusColor = d.status === 'published' ? '#22c55e' : d.status === 'scheduled' ? '#3b82f6' : '#f59e0b';
const statusColor = d.postizStatus === 'published' || d.status === 'published' ? '#22c55e'
: d.postizStatus === 'failed' ? '#ef4444'
: d.postizStatus === 'queued' ? '#a855f7'
: d.status === 'scheduled' ? '#3b82f6'
: '#f59e0b';
const preview = (d.content || '').split('\n')[0].substring(0, 50);
const charCount = (d.content || '').length;
const charMax = platform === 'x' ? 280 : 2200;

View File

@ -72,6 +72,34 @@ export async function createPost(
return res.json();
}
/** GET /public/v1/posts — list posts in a date range (used for status reconciliation). */
export interface PostizListedPost {
id: string;
content?: string;
publishDate?: string;
releaseURL?: string;
state: 'QUEUE' | 'PUBLISHED' | 'ERROR' | 'DRAFT';
integration?: { id: string; providerIdentifier?: string; name?: string };
}
export async function listPosts(
config: PostizConfig,
startDate: Date,
endDate: Date,
): Promise<PostizListedPost[]> {
const qs = new URLSearchParams({
startDate: startDate.toISOString(),
endDate: endDate.toISOString(),
});
const res = await postizFetch(config, `/public/v1/posts?${qs}`);
if (!res.ok) throw new Error(`Postiz listPosts error: ${res.status}`);
const data = await res.json();
// Postiz wraps list as { posts: [...] }; be defensive against raw array.
if (Array.isArray(data)) return data as PostizListedPost[];
if (Array.isArray(data?.posts)) return data.posts as PostizListedPost[];
return [];
}
/** Create a thread — sends multiple grouped posts sharing a group ID. */
export async function createThread(
config: PostizConfig,

View File

@ -31,16 +31,21 @@ import {
} from "./lib/image-gen";
import { DEMO_FEED } from "./lib/types";
import { getListmonkConfig, listmonkFetch } from "./lib/listmonk-proxy";
import { getPostizConfig, getIntegrations, createPost, createThread } from "./lib/postiz-client";
import { getPostizConfig, getIntegrations, createPost, createThread, listPosts } from "./lib/postiz-client";
import { verifyToken, extractToken } from "../../server/auth";
import type { EncryptIDClaims } from "../../server/auth";
import { resolveCallerRole, roleAtLeast } from "../../server/spaces";
import type { SpaceRoleString } from "../../server/spaces";
let _syncServer: SyncServer | null = null;
let _postizSweepTimer: ReturnType<typeof setInterval> | null = null;
const routes = new Hono();
// Sweep the postiz lead window: anything with scheduledAt within this many ms
// and not yet pushed to Postiz will be sent. Past-due posts push as type:'now'.
const POSTIZ_SWEEP_LEAD_MS = 10 * 60 * 1000; // 10 minutes
// ── Automerge doc management ──
function ensureDoc(space: string): SocialsDoc {
@ -797,6 +802,246 @@ routes.post("/api/postiz/posts", async (c) => {
}
});
// Resolve an integration id from a platform name against a live Postiz integrations list.
function resolveIntegrationId(integrations: any[], platform: string): string | null {
if (!platform) return integrations[0]?.id ?? null;
const needle = platform.toLowerCase();
const match = integrations.find((i: any) =>
i.name?.toLowerCase().includes(needle) ||
i.providerIdentifier?.toLowerCase().includes(needle)
);
return match?.id ?? integrations[0]?.id ?? null;
}
// Push a single campaign-flow Post node to Postiz. Writes postizPostId + status
// back onto the node via Automerge so every client sees the update.
async function sendCampaignNodeToPostiz(
space: string,
flowId: string,
nodeId: string,
): Promise<{ ok: true; postizPostId: string } | { ok: false; error: string; code: number }> {
const config = await getPostizConfig(space);
if (!config) return { ok: false, error: "Postiz not configured for this space", code: 404 };
const docId = socialsDocId(space);
const doc = _syncServer!.getDoc<SocialsDoc>(docId);
const flow = doc?.campaignFlows?.[flowId];
if (!flow) return { ok: false, error: "Flow not found", code: 404 };
const node = flow.nodes.find(n => n.id === nodeId);
if (!node || node.type !== 'post') return { ok: false, error: "Post node not found", code: 404 };
const data = node.data as PostNodeData;
if (data.postizPostId) return { ok: false, error: "Already sent to Postiz", code: 409 };
const content = (data.content || '').trim();
if (!content) return { ok: false, error: "Post has no content", code: 400 };
let integrationIds: string[];
try {
const integrations = await getIntegrations(config);
if (!Array.isArray(integrations) || integrations.length === 0) {
return { ok: false, error: "No Postiz integrations configured", code: 400 };
}
const id = data.postizIntegrationId || resolveIntegrationId(integrations, data.platform);
if (!id) return { ok: false, error: `No integration matches platform "${data.platform}"`, code: 400 };
integrationIds = [id];
} catch (err: any) {
return { ok: false, error: `Postiz integrations fetch failed: ${err.message}`, code: 502 };
}
const scheduledAt = data.scheduledAt ? new Date(data.scheduledAt) : null;
const now = new Date();
const type: 'schedule' | 'now' = !scheduledAt || scheduledAt.getTime() <= now.getTime() ? 'now' : 'schedule';
const hashtagLine = data.hashtags?.length ? '\n\n' + data.hashtags.map(h => h.startsWith('#') ? h : `#${h}`).join(' ') : '';
const payload = {
content: content + hashtagLine,
integrationIds,
type,
scheduledAt: type === 'schedule' ? scheduledAt!.toISOString() : undefined,
};
try {
const result = await createPost(config, payload);
// Postiz returns either { id } or an array — be defensive.
const postizPostId = (result as any)?.id
|| (Array.isArray(result) && (result[0] as any)?.id)
|| (result as any)?.posts?.[0]?.id
|| '';
_syncServer!.changeDoc<SocialsDoc>(docId, `postiz send ${nodeId}`, (d) => {
const f = d.campaignFlows?.[flowId];
if (!f) return;
const n = f.nodes.find(x => x.id === nodeId);
if (!n || n.type !== 'post') return;
const nd = n.data as PostNodeData;
nd.postizPostId = postizPostId || `postiz-${Date.now()}`;
nd.postizIntegrationId = integrationIds[0];
nd.postizStatus = 'queued';
nd.postizSentAt = Date.now();
nd.postizError = '';
if (nd.status === 'draft') nd.status = 'scheduled';
f.updatedAt = Date.now();
});
return { ok: true, postizPostId };
} catch (err: any) {
_syncServer!.changeDoc<SocialsDoc>(docId, `postiz send failed ${nodeId}`, (d) => {
const f = d.campaignFlows?.[flowId];
const n = f?.nodes.find(x => x.id === nodeId);
if (!n || n.type !== 'post') return;
const nd = n.data as PostNodeData;
nd.postizStatus = 'failed';
nd.postizError = err.message || 'Unknown error';
});
return { ok: false, error: err.message || 'Postiz createPost failed', code: 502 };
}
}
// Periodic sweep: for every rsocials doc, find post nodes with scheduledAt
// approaching (within lead window) or already past, status === 'scheduled',
// and no postizPostId yet — push them to Postiz.
async function postizSweep() {
if (!_syncServer) return;
const allDocIds = _syncServer.listDocs?.() || [];
const now = Date.now();
for (const docId of allDocIds) {
if (!docId.endsWith(':socials:data')) continue;
const space = docId.split(':')[0];
if (!space) continue;
let config;
try { config = await getPostizConfig(space); } catch { continue; }
if (!config) continue; // Skip spaces without Postiz wired up
const doc = _syncServer.getDoc<SocialsDoc>(docId);
if (!doc?.campaignFlows) continue;
for (const flow of Object.values(doc.campaignFlows)) {
for (const node of flow.nodes) {
if (node.type !== 'post') continue;
const d = node.data as PostNodeData;
if (d.postizPostId) continue;
if (d.status !== 'scheduled') continue;
if (!d.scheduledAt) continue;
const ts = new Date(d.scheduledAt).getTime();
if (Number.isNaN(ts)) continue;
if (ts - now > POSTIZ_SWEEP_LEAD_MS) continue; // Too far out — wait
try {
const res = await sendCampaignNodeToPostiz(space, flow.id, node.id);
if (res.ok) {
console.log(`[rsocials:postiz-sweep] sent node=${node.id} space=${space} postiz=${res.postizPostId}`);
} else {
console.warn(`[rsocials:postiz-sweep] skip node=${node.id} space=${space} reason=${res.error}`);
}
} catch (err: any) {
console.error(`[rsocials:postiz-sweep] error node=${node.id}: ${err.message}`);
}
}
}
}
}
// Reconcile local queued posts against Postiz. For each space with Postiz
// wired up, query posts in the window covering the oldest queued node, then
// flip statuses to 'published' or 'failed' based on Postiz state.
async function postizReconcile() {
if (!_syncServer) return;
const allDocIds = _syncServer.listDocs?.() || [];
const now = Date.now();
for (const docId of allDocIds) {
if (!docId.endsWith(':socials:data')) continue;
const space = docId.split(':')[0];
if (!space) continue;
// Collect queued nodes first — skip the API call if nothing to reconcile.
const doc = _syncServer.getDoc<SocialsDoc>(docId);
if (!doc?.campaignFlows) continue;
const queuedRefs: { flowId: string; nodeId: string; postizPostId: string; sentAt: number }[] = [];
for (const flow of Object.values(doc.campaignFlows)) {
for (const node of flow.nodes) {
if (node.type !== 'post') continue;
const d = node.data as PostNodeData;
if (!d.postizPostId) continue;
if (d.postizStatus !== 'queued') continue;
queuedRefs.push({
flowId: flow.id,
nodeId: node.id,
postizPostId: d.postizPostId,
sentAt: d.postizSentAt || now,
});
}
}
if (queuedRefs.length === 0) continue;
let config;
try { config = await getPostizConfig(space); } catch { continue; }
if (!config) continue;
// Query window: oldest queued send minus 1h, through 30 days out.
const oldest = Math.min(...queuedRefs.map(q => q.sentAt));
const startDate = new Date(oldest - 60 * 60 * 1000);
const endDate = new Date(now + 30 * 24 * 60 * 60 * 1000);
let listed: Awaited<ReturnType<typeof listPosts>>;
try {
listed = await listPosts(config, startDate, endDate);
} catch (err: any) {
console.warn(`[rsocials:postiz-reconcile] list failed space=${space}: ${err.message}`);
continue;
}
const byId = new Map(listed.map(p => [p.id, p]));
_syncServer.changeDoc<SocialsDoc>(docId, `postiz reconcile ${space}`, (d) => {
for (const ref of queuedRefs) {
const remote = byId.get(ref.postizPostId);
const flow = d.campaignFlows?.[ref.flowId];
const node = flow?.nodes.find(n => n.id === ref.nodeId);
if (!node || node.type !== 'post') continue;
const nd = node.data as PostNodeData;
nd.postizCheckedAt = now;
if (!remote) continue; // Postiz doesn't know the ID — maybe out of window
if (remote.state === 'PUBLISHED') {
nd.postizStatus = 'published';
nd.status = 'published';
nd.publishedAt = remote.publishDate ? Date.parse(remote.publishDate) : now;
if (remote.releaseURL) nd.postizReleaseURL = remote.releaseURL;
} else if (remote.state === 'ERROR') {
nd.postizStatus = 'failed';
nd.postizError = 'Postiz reported publish error';
}
// QUEUE / DRAFT: leave as-is.
}
});
}
}
function startPostizScheduler() {
if (_postizSweepTimer) return;
_postizSweepTimer = setInterval(() => {
postizSweep().catch(() => {});
postizReconcile().catch(() => {});
}, 60_000);
setTimeout(() => {
postizSweep().catch(() => {});
postizReconcile().catch(() => {});
}, 30_000);
console.log('[rsocials] Postiz scheduler sweep started (60s interval, 10min lead, status reconcile)');
}
routes.post("/api/campaign/flows/:flowId/nodes/:nodeId/send-postiz", async (c) => {
const space = c.req.param("space") || "demo";
const dataSpace = c.get("effectiveSpace") || space;
const flowId = c.req.param("flowId");
const nodeId = c.req.param("nodeId");
const result = await sendCampaignNodeToPostiz(dataSpace, flowId, nodeId);
if (!result.ok) return c.json({ error: result.error }, result.code as any);
return c.json({ ok: true, postizPostId: result.postizPostId });
});
routes.post("/api/postiz/threads", async (c) => {
const space = c.req.param("space") || "demo";
const config = await getPostizConfig(space);
@ -2514,8 +2759,8 @@ routes.get("/campaign-flow", (c) => {
modules: getModuleInfoList(),
theme: "dark",
body: `<folk-campaign-planner space="${escapeHtml(space)}"${idAttr}></folk-campaign-planner>`,
styles: `<link rel="stylesheet" href="/modules/rsocials/campaign-planner.css">`,
scripts: `<script type="module" src="/modules/rsocials/folk-campaign-planner.js"></script>`,
styles: `<link rel="stylesheet" href="/modules/rsocials/campaign-planner.css?v=2">`,
scripts: `<script type="module" src="/modules/rsocials/folk-campaign-planner.js?v=2"></script>`,
}));
});
@ -2879,6 +3124,7 @@ export const socialsModule: RSpaceModule = {
_syncServer = ctx.syncServer;
// Run migration for any existing file-based threads
try { await migrateFileThreadsToAutomerge("demo"); } catch { /* ignore */ }
startPostizScheduler();
},
externalApp: { url: POSTIZ_URL, name: "Postiz" },
feeds: [

View File

@ -66,6 +66,14 @@ export interface PostNodeData {
scheduledAt: string;
status: 'draft' | 'scheduled' | 'published';
hashtags: string[];
postizPostId?: string;
postizIntegrationId?: string;
postizStatus?: 'queued' | 'published' | 'failed';
postizError?: string;
postizSentAt?: number;
postizCheckedAt?: number;
postizReleaseURL?: string;
publishedAt?: number;
}
export interface ThreadNodeData {