rspace-online/modules/rcred/graph-collector.ts

480 lines
17 KiB
TypeScript

/**
* rCred Graph Collector — builds a contribution graph from all module activity.
*
* Reads Automerge docs via syncServer.getDoc()/listDocs(). Pure function:
* no side effects, no writes, no mutations.
*
* Collects from 8 rApp modules: rTasks, rDocs, rChats, rCal, rVote, rFlows, rTime, rWallet.
*/
import type { SyncServer } from '../../server/local-first/sync-server';
import type { CredNode, CredEdge, CredConfigDoc, ContributionTypeWeight } from './schemas';
// ── Module doc types (imported for type safety) ──
import type { BoardDoc, TaskItem } from '../rtasks/schemas';
import { boardDocId } from '../rtasks/schemas';
import type { NotebookDoc, NoteItem } from '../rdocs/schemas';
import type { ChatChannelDoc, ChatMessage } from '../rchats/schemas';
import type { CalendarDoc, CalendarEvent } from '../rcal/schemas';
import type { ProposalDoc } from '../rvote/schemas';
import type { FlowsDoc } from '../rflows/schemas';
import type { CommitmentsDoc, Commitment } from '../rtime/schemas';
import { commitmentsDocId } from '../rtime/schemas';
import type { IntentsDoc, SolverResultsDoc } from '../rtime/schemas-intent';
import type { WalletDoc } from '../rwallet/schemas';
import { walletDocId } from '../rwallet/schemas';
interface CollectResult {
nodes: CredNode[];
edges: CredEdge[];
}
/** Maximum chat messages processed per channel to bound graph size. */
const MAX_MESSAGES_PER_CHANNEL = 500;
/** Cutoff timestamp based on config.lookbackDays. */
function cutoff(config: CredConfigDoc): number {
return Date.now() - config.lookbackDays * 86_400_000;
}
/** Resolve weight for a contribution type from config. */
function w(config: CredConfigDoc, type: string): number {
return config.weights[type]?.weight ?? 1.0;
}
/** Make a contributor node ID from a DID. */
function contributorId(did: string): string {
return `contributor:${did}`;
}
/** Make a contribution node ID. */
function contribId(module: string, type: string, id: string): string {
return `contribution:${module}:${type}:${id}`;
}
/** Make a contributor node. */
function makeContributor(did: string, label: string): CredNode {
return {
id: contributorId(did),
type: 'contributor',
did,
label,
sourceModule: '',
contributionType: '',
timestamp: 0,
weight: 0,
};
}
/** Make a contribution node. */
function makeContribution(
module: string, type: string, id: string, label: string,
timestamp: number, config: CredConfigDoc,
): CredNode {
return {
id: contribId(module, type, id),
type: 'contribution',
label,
sourceModule: module,
contributionType: type,
timestamp,
weight: w(config, type),
};
}
/** Make an edge. */
function makeEdge(
from: string, to: string, edgeType: CredEdge['type'], weight: number,
): CredEdge {
const id = `edge:${from}${to}`;
return { id, from, to, type: edgeType, weight };
}
// ── Collectors ──
function collectTasks(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const docIds = syncServer.listDocs().filter(id => id.startsWith(`${space}:tasks:boards:`));
for (const docId of docIds) {
const doc = syncServer.getDoc<BoardDoc>(docId);
if (!doc?.tasks) continue;
for (const task of Object.values(doc.tasks)) {
if (task.createdAt < cut) continue;
const creator = task.createdBy;
if (!creator) continue;
// Task created
const cNode = makeContribution('rtasks', 'task-created', task.id, task.title, task.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(creator, ''));
edges.push(makeEdge(contributorId(creator), cNode.id, 'authored', w(config, 'task-created')));
// Task completed (different from creation — higher weight)
if (task.status === 'DONE' && task.assigneeId) {
const doneNode = makeContribution('rtasks', 'task-completed', task.id, `Completed: ${task.title}`, task.updatedAt, config);
nodes.push(doneNode);
nodes.push(makeContributor(task.assigneeId, ''));
edges.push(makeEdge(contributorId(task.assigneeId), doneNode.id, 'completed', w(config, 'task-completed')));
}
}
}
return { nodes, edges };
}
function collectDocs(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const docIds = syncServer.listDocs().filter(id => id.startsWith(`${space}:notes:notebooks:`));
for (const docId of docIds) {
const doc = syncServer.getDoc<NotebookDoc>(docId);
if (!doc?.items) continue;
for (const item of Object.values(doc.items)) {
if (item.createdAt < cut) continue;
const author = item.authorId;
if (!author) continue;
// Doc authored
const cNode = makeContribution('rdocs', 'doc-authored', item.id, item.title || 'Untitled', item.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(author, ''));
edges.push(makeEdge(contributorId(author), cNode.id, 'authored', w(config, 'doc-authored')));
// Comments on this doc
if (item.comments) {
for (const thread of Object.values(item.comments)) {
if (!thread.messages) continue;
for (const msg of Object.values(thread.messages)) {
const commentAuthor = (msg as any).authorId;
if (!commentAuthor || (msg as any).createdAt < cut) continue;
const commentNode = makeContribution('rdocs', 'comment-authored', (msg as any).id || `${item.id}-comment`, 'Comment', (msg as any).createdAt, config);
nodes.push(commentNode);
nodes.push(makeContributor(commentAuthor, ''));
edges.push(makeEdge(contributorId(commentAuthor), commentNode.id, 'authored', w(config, 'comment-authored')));
// Comment → doc edge
edges.push(makeEdge(commentNode.id, cNode.id, 'commented-on', 0.5));
}
}
}
}
}
return { nodes, edges };
}
function collectChats(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const docIds = syncServer.listDocs().filter(id => id.startsWith(`${space}:chats:channel:`));
for (const docId of docIds) {
const doc = syncServer.getDoc<ChatChannelDoc>(docId);
if (!doc?.messages) continue;
const messages = Object.values(doc.messages);
// Sort by time descending, cap at MAX_MESSAGES_PER_CHANNEL
const sorted = messages
.filter(m => m.createdAt >= cut)
.sort((a, b) => b.createdAt - a.createdAt)
.slice(0, MAX_MESSAGES_PER_CHANNEL);
for (const msg of sorted) {
if (!msg.authorId) continue;
// Message sent
const cNode = makeContribution('rchats', 'message-sent', msg.id, msg.content?.slice(0, 60) || 'Message', msg.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(msg.authorId, msg.authorName || ''));
edges.push(makeEdge(contributorId(msg.authorId), cNode.id, 'authored', w(config, 'message-sent')));
// Reactions on this message
if (msg.reactions) {
for (const [_emoji, reactors] of Object.entries(msg.reactions)) {
if (!Array.isArray(reactors)) continue;
for (const reactorDid of reactors) {
const rNode = makeContribution('rchats', 'reaction-given', `${msg.id}-react-${reactorDid}`, 'Reaction', msg.createdAt, config);
nodes.push(rNode);
nodes.push(makeContributor(reactorDid, ''));
edges.push(makeEdge(contributorId(reactorDid), rNode.id, 'authored', w(config, 'reaction-given')));
edges.push(makeEdge(rNode.id, cNode.id, 'reacted-to', 0.2));
}
}
}
}
}
return { nodes, edges };
}
function collectCal(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const docId = `${space}:cal:events`;
const doc = syncServer.getDoc<CalendarDoc>(docId);
if (!doc?.events) return { nodes, edges };
for (const event of Object.values(doc.events)) {
if (event.createdAt < cut) continue;
// Event scheduled — use sourceId as creator hint or skip
// CalendarEvent has no explicit 'createdBy' field, but sourceType/sourceId gives provenance
// For manually created events, metadata may contain the DID
const scheduledBy = (event as any).scheduledBy || (event.metadata as any)?.createdBy;
if (scheduledBy) {
const cNode = makeContribution('rcal', 'event-scheduled', event.id, event.title, event.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(scheduledBy, ''));
edges.push(makeEdge(contributorId(scheduledBy), cNode.id, 'authored', w(config, 'event-scheduled')));
}
// Event attended
if (event.attendees) {
const eventNode = makeContribution('rcal', 'event-scheduled', event.id, event.title, event.createdAt, config);
// Only add if not already added above
if (!scheduledBy) nodes.push(eventNode);
for (const attendee of event.attendees) {
if (attendee.status !== 'yes') continue;
// Attendees have email not DID — use hashed email as contributor
const attendeeDid = attendee.email
? `email:${simpleHash(attendee.email)}`
: `anon:${simpleHash(attendee.name)}`;
const attendLabel = attendee.name || attendee.email || 'Attendee';
const aNode = makeContribution('rcal', 'event-attended', `${event.id}-${attendeeDid}`, `Attended: ${event.title}`, attendee.respondedAt || event.startTime, config);
nodes.push(aNode);
nodes.push(makeContributor(attendeeDid, attendLabel));
edges.push(makeEdge(contributorId(attendeeDid), aNode.id, 'attended', w(config, 'event-attended')));
edges.push(makeEdge(aNode.id, eventNode.id, 'attended', 0.5));
}
}
}
return { nodes, edges };
}
function collectVotes(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const docIds = syncServer.listDocs().filter(id => id.startsWith(`${space}:vote:proposals:`));
for (const docId of docIds) {
const doc = syncServer.getDoc<ProposalDoc>(docId);
if (!doc?.proposal) continue;
const prop = doc.proposal;
if (prop.createdAt < cut) continue;
// Proposal authored
if (prop.authorId) {
const cNode = makeContribution('rvote', 'proposal-authored', prop.id, prop.title, prop.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(prop.authorId, ''));
edges.push(makeEdge(contributorId(prop.authorId), cNode.id, 'authored', w(config, 'proposal-authored')));
// Votes on this proposal
if (doc.votes) {
for (const vote of Object.values(doc.votes)) {
if (!vote.userId || vote.createdAt < cut) continue;
const vNode = makeContribution('rvote', 'vote-cast', vote.id, `Vote on: ${prop.title}`, vote.createdAt, config);
nodes.push(vNode);
nodes.push(makeContributor(vote.userId, ''));
edges.push(makeEdge(contributorId(vote.userId), vNode.id, 'authored', w(config, 'vote-cast')));
edges.push(makeEdge(vNode.id, cNode.id, 'voted-on', 0.5));
}
}
// Final votes
if (doc.finalVotes) {
for (const fv of Object.values(doc.finalVotes)) {
if (!(fv as any).userId || (fv as any).createdAt < cut) continue;
const fvId = `fv-${prop.id}-${(fv as any).userId}`;
const fvNode = makeContribution('rvote', 'vote-cast', fvId, `Final vote: ${prop.title}`, (fv as any).createdAt, config);
nodes.push(fvNode);
nodes.push(makeContributor((fv as any).userId, ''));
edges.push(makeEdge(contributorId((fv as any).userId), fvNode.id, 'authored', w(config, 'vote-cast')));
edges.push(makeEdge(fvNode.id, cNode.id, 'voted-on', 0.5));
}
}
}
}
return { nodes, edges };
}
function collectFlows(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const docId = `${space}:flows:data`;
const doc = syncServer.getDoc<FlowsDoc>(docId);
if (!doc) return { nodes, edges };
// Canvas flows
if (doc.canvasFlows) {
for (const flow of Object.values(doc.canvasFlows)) {
if (flow.createdAt < cut || !flow.createdBy) continue;
const cNode = makeContribution('rflows', 'flow-created', flow.id, flow.name, flow.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(flow.createdBy, ''));
edges.push(makeEdge(contributorId(flow.createdBy), cNode.id, 'authored', w(config, 'flow-created')));
}
}
// Budget allocations
if (doc.budgetAllocations) {
for (const [allocId, alloc] of Object.entries(doc.budgetAllocations)) {
if (!alloc.participantDid || alloc.updatedAt < cut) continue;
const cNode = makeContribution('rflows', 'budget-allocated', allocId, 'Budget allocation', alloc.updatedAt, config);
nodes.push(cNode);
nodes.push(makeContributor(alloc.participantDid, ''));
edges.push(makeEdge(contributorId(alloc.participantDid), cNode.id, 'authored', w(config, 'budget-allocated')));
}
}
return { nodes, edges };
}
function collectTime(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
// Commitments
const cDoc = syncServer.getDoc<CommitmentsDoc>(commitmentsDocId(space));
if (cDoc?.items) {
for (const commitment of Object.values(cDoc.items)) {
if (commitment.createdAt < cut) continue;
const did = commitment.ownerDid;
if (!did) continue;
const cNode = makeContribution('rtime', 'commitment-created', commitment.id, `${commitment.hours}h ${commitment.skill}`, commitment.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(did, commitment.memberName));
edges.push(makeEdge(contributorId(did), cNode.id, 'authored', w(config, 'commitment-created')));
// Settled commitments get extra credit
if (commitment.status === 'settled') {
const sNode = makeContribution('rtime', 'settlement-completed', `settle-${commitment.id}`, `Settled: ${commitment.hours}h ${commitment.skill}`, commitment.createdAt, config);
nodes.push(sNode);
edges.push(makeEdge(contributorId(did), sNode.id, 'completed', w(config, 'settlement-completed')));
}
}
}
// Intents
const iDocId = `${space}:rtime:intents`;
const iDoc = syncServer.getDoc<IntentsDoc>(iDocId);
if (iDoc && (iDoc as any).intents) {
for (const intent of Object.values((iDoc as any).intents as Record<string, any>)) {
if (intent.createdAt < cut || !intent.memberId) continue;
const cNode = makeContribution('rtime', 'commitment-created', intent.id, `Intent: ${intent.skill} ${intent.hours}h`, intent.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(intent.memberId, intent.memberName || ''));
edges.push(makeEdge(contributorId(intent.memberId), cNode.id, 'authored', w(config, 'commitment-created')));
}
}
return { nodes, edges };
}
function collectWallet(space: string, syncServer: SyncServer, config: CredConfigDoc, cut: number): CollectResult {
const nodes: CredNode[] = [];
const edges: CredEdge[] = [];
const doc = syncServer.getDoc<WalletDoc>(walletDocId(space));
if (!doc) return { nodes, edges };
// Watched addresses
if (doc.watchedAddresses) {
for (const [addr, watched] of Object.entries(doc.watchedAddresses)) {
if (!watched.addedBy || watched.addedAt < cut) continue;
const cNode = makeContribution('rwallet', 'address-added', addr, `${watched.label || addr.slice(0, 10)}`, watched.addedAt, config);
nodes.push(cNode);
nodes.push(makeContributor(watched.addedBy, ''));
edges.push(makeEdge(contributorId(watched.addedBy), cNode.id, 'authored', w(config, 'address-added')));
}
}
// TX annotations
if (doc.annotations) {
for (const [txHash, annotation] of Object.entries(doc.annotations)) {
if (!annotation.authorDid || annotation.createdAt < cut) continue;
const cNode = makeContribution('rwallet', 'tx-annotated', txHash, annotation.note?.slice(0, 60) || 'Annotation', annotation.createdAt, config);
nodes.push(cNode);
nodes.push(makeContributor(annotation.authorDid, ''));
edges.push(makeEdge(contributorId(annotation.authorDid), cNode.id, 'authored', w(config, 'tx-annotated')));
}
}
return { nodes, edges };
}
// ── Main collector ──
/**
* Build the full contribution graph for a space.
* Pure function — reads only, no writes.
*/
export function collectContribGraph(
space: string,
syncServer: SyncServer,
config: CredConfigDoc,
): { nodes: CredNode[]; edges: CredEdge[] } {
const cut = cutoff(config);
const allNodes: CredNode[] = [];
const allEdges: CredEdge[] = [];
const collectors = [
collectTasks, collectDocs, collectChats, collectCal,
collectVotes, collectFlows, collectTime, collectWallet,
];
for (const collect of collectors) {
const { nodes, edges } = collect(space, syncServer, config, cut);
allNodes.push(...nodes);
allEdges.push(...edges);
}
// Deduplicate contributor nodes (same DID from multiple modules)
const nodeMap = new Map<string, CredNode>();
for (const node of allNodes) {
const existing = nodeMap.get(node.id);
if (existing) {
// Merge: keep the label if we got a better one
if (node.label && !existing.label) existing.label = node.label;
// Accumulate weight for contribution nodes
if (node.type === 'contribution') existing.weight = Math.max(existing.weight, node.weight);
} else {
nodeMap.set(node.id, node);
}
}
// Deduplicate edges (same source→target)
const edgeMap = new Map<string, CredEdge>();
for (const edge of allEdges) {
if (!edgeMap.has(edge.id)) {
edgeMap.set(edge.id, edge);
}
}
return {
nodes: Array.from(nodeMap.values()),
edges: Array.from(edgeMap.values()),
};
}
// ── Util ──
function simpleHash(str: string): string {
let hash = 0;
for (let i = 0; i < str.length; i++) {
hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0;
}
return Math.abs(hash).toString(36);
}