I'm using MongoDB with Payload CMS, and I have a tree-like structure.
We have a collection called nodes. Each node has an array of children and a parent ID.
Now I want to change node statuses, e.g. marking nodes as relevant, irrelevant, etc. If I mark a top-level node as irrelevant, then I want all of its children to become irrelevant as well, and vice versa.
How can I do this efficiently?
On the frontend, I keep a map of node IDs to their statuses so that the user can mark up the tree freely and finally hit the save button. I then send that map to the backend to process the changes. Since this can touch a large portion of the documents - around 400,000 - how can I process this efficiently?
On top of it I'm using the update-many api to batch update the nodes based on the status.
All nodes with same status updates together so in total I have 4 update-many queries.
I'm already using a Payload job for this, but is there anything else I can do here?
This is the code for the background job that runs when the user hits the "apply changes" button:
import { PaginatedDocs, PayloadRequest, TaskConfig } from 'payload';
import { Node } from '../payload-types';
import { NodeStatusFailureEmail } from '../../emails/NodeStatusFailureEmail';
import { env } from '../env';
/**
 * Input/output contract of the node status change task.
 */
type MarkNodeInput = {
  /** Data supplied to the task. */
  input: {
    /** Requested status changes, each pairing a node ID with a status name. */
    changes: { nodeId: string; status: string }[];
  };
  /** Result produced by the task. */
  output: {
    success: boolean;
    updatedCount: number;
    message: string;
    /** Only present when at least one error was collected. */
    errors?: string[];
  };
};
/**
 * Maximum number of node IDs placed in a single `in` clause. Keeps every
 * query and every bulk-update result set bounded, however many nodes change.
 */
const NODE_ID_BATCH_SIZE = 1000;

/** Maps status names sent by the client to the values stored on the node. */
const STATUS_ALIASES = new Map<string, string>([
  ['decision', 'decision_node'],
  ['relevant', 'relevant'],
  ['irrelevant', 'irrelevant'],
  ['archived', 'archived'],
]);

/** Splits `items` into consecutive slices of at most `size` elements. */
function chunk<T>(items: readonly T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}

/** Extracts a non-empty `message` from a thrown value, or returns `fallback`. */
function errorMessage(error: unknown, fallback = 'Unknown error'): string {
  if (typeof error === 'object' && error !== null && 'message' in error) {
    const message = (error as { message?: unknown }).message;
    if (typeof message === 'string' && message) {
      return message;
    }
  }
  return fallback;
}

/** Sends the node status failure report to the admin address. */
async function sendFailureEmail(
  req: PayloadRequest,
  report: { updatedCount: number; totalNodes: number; errors: string[]; timestamp: string },
  fromName: string,
): Promise<void> {
  const adminEmail = env.SMTP_ADMIN_EMAIL || '[email protected]';
  const fromEmail = '[email protected]';
  await NodeStatusFailureEmail(report).send(req.payload.sendEmail, {
    to: adminEmail,
    from: {
      name: fromName,
      address: fromEmail,
    },
  });
}

/**
 * Background task that applies a batch of node status changes.
 *
 * Steps:
 *  1. Load the referenced nodes and drop linked/wrapper nodes (`linkedWithID` set).
 *  2. For `relevant` / `irrelevant` changes, propagate the status to all descendants.
 *     A submitted change is never overwritten by a cascade, and a descendant
 *     inherits from its nearest changed ancestor, so the outcome does not depend
 *     on the order in which the database returns the nodes.
 *  3. Group nodes by target status and update them in bounded batches.
 *  4. E-mail the admin if any error was collected.
 *
 * The handler never rethrows: failures are reported through `output.success`
 * and `output.errors`.
 */
const changeNodeStatusTask: TaskConfig<MarkNodeInput> = {
  slug: 'changeNodeStatusTask',
  handler: async ({ input, req }) => {
    const startTime = new Date().toISOString();
    try {
      const changes = input.changes;
      // Log only the size; the full change list can hold hundreds of thousands of entries.
      console.log('\n\nStarting changeNodeStatusTask with input:', {
        changeCount: changes.length,
      });

      // Index changes by node ID for O(1) lookup. The first entry for an ID wins,
      // which matches the previous `changes.find(...)` behaviour.
      const changeByNodeId = new Map<string, { nodeId: string; status: string }>();
      for (const change of changes) {
        if (!changeByNodeId.has(change.nodeId)) {
          changeByNodeId.set(change.nodeId, change);
        }
      }

      const validChanges: Array<{ nodeId: string; status: string }> = [];
      for (const idBatch of chunk(Array.from(changeByNodeId.keys()), NODE_ID_BATCH_SIZE)) {
        const found: PaginatedDocs<Node> = await req.payload.find({
          collection: 'nodes',
          where: {
            id: {
              in: idBatch,
            },
          },
          // Only `id` and `linkedWithID` are read, so skip relationship population.
          depth: 0,
          pagination: false,
        });
        for (const node of found.docs) {
          if (node.linkedWithID) {
            console.warn(`Skipping linked/wrapper node: ${node.id}`);
            continue;
          }
          const change = changeByNodeId.get(node.id);
          if (change) {
            validChanges.push(change);
          }
        }
      }

      if (validChanges.length === 0) {
        throw new Error('No valid nodes to update (all were linked/wrapper nodes or invalid)');
      }

      const nodesToUpdate = new Map<string, string>();
      const cascadeNodes = new Set<string>();
      const errors: string[] = [];
      for (const change of validChanges) {
        // Unknown status names are passed through unchanged.
        const dbStatus = STATUS_ALIASES.get(change.status) ?? change.status;
        nodesToUpdate.set(change.nodeId, dbStatus);
        if (change.status === 'relevant' || change.status === 'irrelevant') {
          cascadeNodes.add(change.nodeId);
        }
      }

      // IDs the caller submitted directly; cascades must not override them.
      const explicitIds = new Set(nodesToUpdate.keys());
      // For each inherited status, the subtree size of the root that supplied it.
      // Ancestors of a node form a chain, so the smallest subtree is the nearest ancestor.
      const inheritedFromSize = new Map<string, number>();

      for (const nodeId of cascadeNodes) {
        try {
          const status = nodesToUpdate.get(nodeId);
          if (status === undefined) {
            continue;
          }
          const descendants = await getAllDescendants(req, nodeId);
          const subtreeSize = descendants.length;
          for (const descendantId of descendants) {
            if (explicitIds.has(descendantId)) {
              continue;
            }
            const previousSize = inheritedFromSize.get(descendantId);
            if (previousSize === undefined || subtreeSize < previousSize) {
              nodesToUpdate.set(descendantId, status);
              inheritedFromSize.set(descendantId, subtreeSize);
            }
          }
        } catch (error: unknown) {
          console.error(`Failed to get descendants for node ${nodeId}:`, error);
          errors.push(`Node ${nodeId} descendants: ${errorMessage(error)}`);
        }
      }

      // Group nodes by status for batch updates
      const nodesByStatus = new Map<string, string[]>();
      for (const [nodeId, status] of nodesToUpdate) {
        const group = nodesByStatus.get(status);
        if (group) {
          group.push(nodeId);
        } else {
          nodesByStatus.set(status, [nodeId]);
        }
      }

      let updatedCount = 0;
      // Status groups run concurrently; within a group the batches run one after
      // another so a failed batch does not abort the remaining ones.
      const updatePromises = Array.from(nodesByStatus.entries()).map(async ([status, nodeIds]) => {
        console.log(`Updating ${nodeIds.length} nodes to status: ${status}`);
        for (const idBatch of chunk(nodeIds, NODE_ID_BATCH_SIZE)) {
          try {
            const result = await req.payload.update({
              collection: 'nodes',
              where: {
                id: {
                  in: idBatch,
                },
              },
              data: {
                status: status as 'relevant' | 'irrelevant' | 'archived' | 'decision_node',
              },
              // Only the number of returned docs is used.
              depth: 0,
            });
            updatedCount += result.docs.length;
            // Track any errors returned by the batch update
            if (result.errors && result.errors.length > 0) {
              for (const error of result.errors) {
                console.error(`Failed to update node ${error.id}:`, error);
                errors.push(`Node ${error.id}: ${error.message || 'Unknown error'}`);
              }
            }
          } catch (error: unknown) {
            console.error(`Failed to batch update nodes with status ${status}:`, error);
            errors.push(`Batch update for status ${status}: ${errorMessage(error)}`);
          }
        }
      });
      await Promise.all(updatePromises);

      if (errors.length > 0) {
        try {
          console.log('Sending node status failure email to admin...');
          await sendFailureEmail(
            req,
            {
              updatedCount,
              totalNodes: nodesToUpdate.size,
              errors,
              timestamp: startTime,
            },
            'OncoproAI System',
          );
          console.log('Node status failure email sent successfully');
        } catch (emailError) {
          console.error('Failed to send node status failure email:', emailError);
        }
      }

      // Return output property to match TaskHandlerResult<MarkNodeInput> type
      return {
        output: {
          success: errors.length === 0,
          updatedCount,
          message: `Updated ${updatedCount} nodes.`,
          ...(errors.length > 0 && { errors }),
        },
      };
    } catch (error: unknown) {
      console.error('Critical error in changeNodeStatusTask:', error);
      // Send failure email for critical errors
      try {
        await sendFailureEmail(
          req,
          {
            updatedCount: 0,
            totalNodes: input.changes.length,
            errors: [errorMessage(error, 'Unknown critical error occurred')],
            timestamp: startTime,
          },
          'OncoproAI',
        );
        console.log('Critical error email sent successfully');
      } catch (emailError) {
        console.error('Failed to send critical error email:', emailError);
      }
      // Report the failure through the task output instead of rethrowing.
      return {
        output: {
          success: false,
          updatedCount: 0,
          message: 'Task failed with critical error',
          errors: [errorMessage(error)],
        },
      };
    }
  },
};
export default changeNodeStatusTask;
/**
 * Collects the IDs of all descendants of `nodeId` using a level-by-level BFS.
 * Similar to StatusTracker.getAllDescendantIds but works with database queries.
 *
 * Each tree level is fetched with batched `parentID in [...]` queries instead of
 * one query per node, and every query is paginated so that parents with more
 * children than one page holds are not truncated.
 *
 * Children that have `linkedWithID` set are excluded and are not traversed.
 *
 * @param req - Payload request whose local API is used for the queries.
 * @param nodeId - ID of the root node; it is not part of the result.
 * @returns IDs of all descendants, each exactly once, in no guaranteed order.
 */
async function getAllDescendants(req: PayloadRequest, nodeId: string): Promise<string[]> {
  // Documents fetched per query page.
  const pageSize = 1000;
  // Parent IDs per `in` clause; keeps each query bounded on very wide levels.
  const parentBatchSize = 500;

  const descendants: string[] = [];
  // Guards against cycles and duplicate parent links.
  const visited = new Set<string>([nodeId]);
  let frontier: string[] = [nodeId];

  while (frontier.length > 0) {
    const nextFrontier: string[] = [];

    for (let start = 0; start < frontier.length; start += parentBatchSize) {
      const parentIds = frontier.slice(start, start + parentBatchSize);
      let page = 1;
      let hasMore = true;

      while (hasMore) {
        const children = await req.payload.find({
          collection: 'nodes',
          where: {
            parentID: {
              in: parentIds,
            },
            linkedWithID: { exists: false },
          },
          // Only the child IDs are needed, so skip relationship population.
          depth: 0,
          limit: pageSize,
          page,
          // A fixed sort keeps page boundaries stable between requests.
          sort: 'id',
        });

        for (const child of children.docs) {
          if (!visited.has(child.id)) {
            visited.add(child.id);
            descendants.push(child.id);
            nextFrontier.push(child.id);
          }
        }

        hasMore = children.hasNextPage;
        page += 1;
      }
    }

    frontier = nextFrontier;
  }

  return descendants;
}