Process Monitoring and Critical Path Analysis
1. Introduction
In the fast-paced world of manufacturing, monitoring production processes is essential for ensuring efficiency, quality control, and timely delivery. This use case demonstrates how to use Neo4j for monitoring manufacturing workflows, visualizing task dependencies, and performing critical path analysis (CPA) to optimize operations and mitigate delays. By leveraging graph databases, manufacturers can gain a comprehensive view of production lines, predict task durations, and identify bottlenecks to improve throughput and resource utilization.
2. Scenario
To understand the value of manufacturing process monitoring and critical path analysis, consider real-world challenges in production environments where inefficiencies can lead to costly downtime and missed deadlines. The following three key areas highlight these issues:
- Workflow Dependency Management:
  - Complex assembly lines involve interdependent tasks that, if not properly managed, can cause cascading delays.
  - Without clear visibility into dependencies, unexpected bottlenecks disrupt just-in-time production.
  - Overlooked interconnections between tasks lead to inefficient resource allocation across machines.
- Resource and Queue Optimization:
  - Machines often handle queued tasks, but overloads or poor scheduling result in idle time or backlogs.
  - Traditional systems fail to dynamically assess workloads, complicating predictions of completion times.
  - Inadequate monitoring increases the risk of equipment failures or quality issues going unnoticed.
- Risk Mitigation and Compliance:
  - Regulations require traceable production processes for quality and safety standards.
  - Manual tracking is error-prone, making it hard to demonstrate compliance or optimize for efficiency.
  - Manufacturers risk penalties and reputational damage without tools to proactively identify critical paths and delays.
These scenarios underscore the need for an advanced solution like Neo4j’s manufacturing process monitoring with Cypher®, which uses graph technology to model, analyze, and visualize workflows, providing critical insights for business and technical users in production planning and optimization.
3. Solution
Advanced graph databases like Neo4j are vital for handling the intricacies of interconnected production data in manufacturing. They excel at managing dynamic relationships, making it straightforward to model task dependencies, queues, and machine workloads. By representing data as graphs, organizations can uncover critical paths, simulate scenarios, and derive actionable insights—enhancing decision-making, operational efficiency, and production resilience.
3.1. How Graph Databases Can Help
Graph databases provide a powerful solution to the challenges of manufacturing process monitoring and critical path analysis. Here are five key reasons why a graph database is indispensable:
- Dependency Modeling: Graphs naturally handle complex task interconnections and machine assignments. They store relationships directly rather than reconstructing them through repeated joins, as relational databases must.
- Real-Time Queue and Workload Analysis: They enable dynamic views of machine queues and pending work, allowing for instant identification of bottlenecks.
- Comprehensive Process Visualization: Graphs offer a full overview of production workflows, exposing hidden inefficiencies and risks.
- Critical Path Computation: With variable-length path matching and aggregation along paths, graphs support calculating ETAs and critical paths for proactive adjustments.
- Scalable Optimization: Integration with Graph Data Science (GDS) allows for advanced analytics at scale, such as longest-path algorithms on directed acyclic graphs.
These capabilities make graph databases central to deriving insights and solving the multifaceted issues in manufacturing process monitoring.
4. Modelling
This section demonstrates Cypher queries on an example graph. The goal is to show query structures and guide data modeling in production. We’ll use a small graph with several nodes, based on the data model below:
4.1. Data Model
4.1.1. Required Data Fields
Below are the fields required to get started:
- Machine node (:Machine):
  - processor_id: Unique identifier (e.g., "M1")
  - name: Name of the machine (e.g., "AssemblyMachine1")
  - load: Current load level
- Process node (:Process):
  - process_id: Unique identifier (e.g., "Prod1")
  - name: Name of the production process (e.g., "WidgetProduction_Q1")
- Job node (:Job, representing tasks):
  - job_id: Unique identifier (e.g., "T0")
  - name: Name of the task (e.g., "Shared_MaterialPrep")
  - status: Current status (e.g., "Completed", "Running", "Pending")
  - duration: Expected or actual duration
  - quality_score: Quality or risk score (adapted from risk_score)
  - completion_progress: Progress as a fraction from 0.0 to 1.0
- DEPENDS_ON relationship: Task dependencies, from the dependent task to the task it depends on
- WAITS relationship: Queue order on machines, from a task to the task ahead of it in the queue
- RUNS_ON relationship: Task assignment to machines
- IS_INSTANCE_OF relationship: Task association to processes
- QUEUE_HEAD and QUEUE_TAIL relationships: Machine queue boundaries (the first and last task in a machine's queue)
For the refactored model used in scalable CPA:
- Additional nodes:
  - :Start (one per job, with job_id)
  - :End (one per job, with job_id)
  - :KickOff (a single node representing the start of the entire process)
- Relationships:
  - [:STARTS] (from :Job to :Start)
  - [:ENDS] (from :Job to :End)
  - [:TIME] (weighted edges with a duration property):
    - From :Start to :End for each job (duration = job execution time)
    - From :End of a prerequisite job to :Start of the job that depends on or waits for it (duration = 3-second wait time)
    - From :KickOff to :Start of each initial job (duration = minimal job duration in the graph)
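The refactoring rules above can be checked by hand with a small in-memory model. The following Python sketch is an illustration only, not part of the Neo4j workflow. It builds the TIME edges from a job list under these assumptions:
- Durations are given as a job_id → duration dict.
- Prerequisites are given as a job_id → list of job_ids that the job DEPENDS_ON or WAITS for.
- The node keys (such as "Start:T1") and the function name build_time_edges are hypothetical.

A set deduplicates repeated edges, similar to how MERGE behaves when a job both depends on and waits for the same job.

```python
from typing import Dict, List, Tuple

WAIT_SECONDS = 3  # fixed wait time between dependent jobs (section 5.6.3)


def build_time_edges(
    durations: Dict[str, int],
    prerequisites: Dict[str, List[str]],
) -> List[Tuple[str, str, int]]:
    """Return sorted (source, target, duration) TIME edges of the refactored model.

    durations maps job_id -> duration; prerequisites maps job_id -> the job_ids
    it DEPENDS_ON or WAITS for. Node keys like "Start:T1" are illustrative.
    """
    edges = set()  # a set collapses duplicate edges, similar to MERGE
    for job, duration in durations.items():
        # Start -> End carries the job's own execution time
        edges.add((f"Start:{job}", f"End:{job}", duration))
        # End of each prerequisite -> Start of this job carries the wait time
        for prerequisite in prerequisites.get(job, []):
            edges.add((f"End:{prerequisite}", f"Start:{job}", WAIT_SECONDS))
    # Initial jobs (no prerequisites) hang off the single KickOff node
    kickoff_duration = min(durations.values())
    for job in durations:
        if not prerequisites.get(job):
            edges.add(("KickOff", f"Start:{job}", kickoff_duration))
    return sorted(edges)


# Excerpt of the demo data: the diamond T0 -> (T1, T2) -> T3.
# T1 both DEPENDS_ON and WAITS for T0, so "T0" is listed twice.
durations = {"T0": 5, "T1": 15, "T2": 12, "T3": 6}
prerequisites = {"T1": ["T0", "T0"], "T2": ["T0"], "T3": ["T1", "T2"]}
for edge in build_time_edges(durations, prerequisites):
    print(edge)
```

In this excerpt, only T0 has no prerequisites, so it is the only initial job. Its KickOff edge gets duration 5, the smallest duration among the four jobs. The duplicate T0 entry for T1 produces a single wait edge.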
4.2. Demo Data
The following Cypher statement will create the example graph in the Neo4j database (adapted for manufacturing context):
// Machines
CREATE (m1:Machine {processor_id: 'M1', name: 'AssemblyMachine1', load: 3})
CREATE (m2:Machine {processor_id: 'M2', name: 'AssemblyMachine2', load: 2})
CREATE (m3:Machine {processor_id: 'M3', name: 'AssemblyMachine3', load: 1})
// Production Processes
CREATE (prod1:Process {process_id: 'Prod1', name: 'WidgetProduction_Q1'})
CREATE (prod2:Process {process_id: 'Prod2', name: 'GadgetProduction_Q1'})
CREATE (prod3:Process {process_id: 'Prod3', name: 'ComponentProduction_Q1'})
// Shared Task (part of WidgetProduction_Q1 and GadgetProduction_Q1)
CREATE (t0:Job {job_id: 'T0', name: 'Shared_MaterialPrep', status: 'Completed', duration: 5, quality_score: 0.2, completion_progress: 1.0})
CREATE (t0)-[:RUNS_ON]->(m1)
// Tasks for WidgetProduction_Q1 (Diamond-shaped DAG)
CREATE (t1:Job {job_id: 'T1', name: 'Widget_Assembly1', status: 'Completed', duration: 15, quality_score: 0.7, completion_progress: 1.0})
CREATE (t2:Job {job_id: 'T2', name: 'Widget_Assembly2', status: 'Completed', duration: 12, quality_score: 0.6, completion_progress: 1.0})
CREATE (t3:Job {job_id: 'T3', name: 'Widget_QualityCheck', status: 'Completed', duration: 6, quality_score: 0.4, completion_progress: 1.0})
CREATE (t4:Job {job_id: 'T4', name: 'Widget_Packaging', status: 'Running', duration: 3, quality_score: 0.2, completion_progress: 0.5})
CREATE (t1)-[:RUNS_ON]->(m1), (t2)-[:RUNS_ON]->(m1), (t3)-[:RUNS_ON]->(m2), (t4)-[:RUNS_ON]->(m3)
CREATE (t1)-[:DEPENDS_ON]->(t0), (t2)-[:DEPENDS_ON]->(t0), (t3)-[:DEPENDS_ON]->(t1), (t3)-[:DEPENDS_ON]->(t2), (t4)-[:DEPENDS_ON]->(t3)
CREATE (t4)-[:IS_INSTANCE_OF]->(prod1)
// Tasks for GadgetProduction_Q1 (Parallel Paths DAG)
CREATE (t5:Job {job_id: 'T5', name: 'Gadget_Assembly1', status: 'Completed', duration: 14, quality_score: 0.6, completion_progress: 1.0})
CREATE (t6:Job {job_id: 'T6', name: 'Gadget_Assembly2', status: 'Completed', duration: 11, quality_score: 0.5, completion_progress: 1.0})
CREATE (t7:Job {job_id: 'T7', name: 'Gadget_QualityCheck1', status: 'Completed', duration: 5, quality_score: 0.3, completion_progress: 1.0})
CREATE (t8:Job {job_id: 'T8', name: 'Gadget_QualityCheck2', status: 'Completed', duration: 4, quality_score: 0.2, completion_progress: 1.0})
CREATE (t9:Job {job_id: 'T9', name: 'Gadget_Packaging', status: 'Pending', duration: 2, quality_score: 0.1, completion_progress: 0.0})
CREATE (t5)-[:RUNS_ON]->(m1), (t6)-[:RUNS_ON]->(m1), (t7)-[:RUNS_ON]->(m2), (t8)-[:RUNS_ON]->(m2), (t9)-[:RUNS_ON]->(m3)
CREATE (t5)-[:DEPENDS_ON]->(t0), (t6)-[:DEPENDS_ON]->(t0), (t7)-[:DEPENDS_ON]->(t5), (t8)-[:DEPENDS_ON]->(t6), (t9)-[:DEPENDS_ON]->(t7), (t9)-[:DEPENDS_ON]->(t8)
CREATE (t9)-[:IS_INSTANCE_OF]->(prod2)
// Shared Task (part of GadgetProduction_Q1 and ComponentProduction_Q1)
CREATE (t10:Job {job_id: 'T10', name: 'Shared_ComponentAssembly', status: 'Running', duration: 10, quality_score: 0.5, completion_progress: 0.5})
CREATE (t10)-[:RUNS_ON]->(m2)
// Tasks for ComponentProduction_Q1 (Single Chain DAG)
CREATE (t11:Job {job_id: 'T11', name: 'Component_MaterialPrep', status: 'Completed', duration: 12, quality_score: 0.5, completion_progress: 1.0})
CREATE (t12:Job {job_id: 'T12', name: 'Component_QualityCheck', status: 'Pending', duration: 5, quality_score: 0.3, completion_progress: 0.0})
CREATE (t13:Job {job_id: 'T13', name: 'Component_Inspection', status: 'Pending', duration: 6, quality_score: 0.4, completion_progress: 0.0})
CREATE (t14:Job {job_id: 'T14', name: 'Component_Packaging', status: 'Pending', duration: 4, quality_score: 0.2, completion_progress: 0.0})
CREATE (t11)-[:RUNS_ON]->(m1), (t12)-[:RUNS_ON]->(m2), (t13)-[:RUNS_ON]->(m3), (t14)-[:RUNS_ON]->(m3)
CREATE (t12)-[:DEPENDS_ON]->(t10), (t10)-[:DEPENDS_ON]->(t11), (t13)-[:DEPENDS_ON]->(t12), (t14)-[:DEPENDS_ON]->(t13)
CREATE (t14)-[:IS_INSTANCE_OF]->(prod3)
// Queue for AssemblyMachine1 (t0 -> t1 -> t5 -> t2 -> t6 -> t11)
CREATE (m1)-[:QUEUE_HEAD]->(t0)
CREATE (m1)-[:QUEUE_TAIL]->(t11)
CREATE (t1)-[:WAITS]->(t0), (t5)-[:WAITS]->(t1), (t2)-[:WAITS]->(t5), (t6)-[:WAITS]->(t2), (t11)-[:WAITS]->(t6)
// Queue for AssemblyMachine2 (t3 -> t7 -> t8 -> t10 -> t12)
CREATE (m2)-[:QUEUE_HEAD]->(t3)
CREATE (m2)-[:QUEUE_TAIL]->(t12)
CREATE (t7)-[:WAITS]->(t3), (t8)-[:WAITS]->(t7), (t10)-[:WAITS]->(t8), (t12)-[:WAITS]->(t10)
// Queue for AssemblyMachine3 (t4 -> t9 -> t13 -> t14)
CREATE (m3)-[:QUEUE_HEAD]->(t4)
CREATE (m3)-[:QUEUE_TAIL]->(t14)
CREATE (t9)-[:WAITS]->(t4), (t13)-[:WAITS]->(t9), (t14)-[:WAITS]->(t13);
5. Cypher Queries
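These Cypher queries are compatible with Neo4j Version 5.9+ and Cypher 5 or 25. The batch queries in section 5.6 use the variable scope clause of CALL subqueries (as in CALL (j) {...}) and IN CONCURRENT TRANSACTIONS, which require a later Neo4j 5 release. They also require the Graph Data Science library. In Neo4j Browser, prefix queries that use CALL ... IN TRANSACTIONS with :auto so that they run in an implicit transaction.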
5.1. Show the Graph Model
This query visualizes the overall schema:
CALL db.schema.visualization()
5.2. Show a Manufacturing Process
This query displays a specific production process and its dependencies:
MATCH (n:Process {process_id:"Prod1"})<-[i:IS_INSTANCE_OF]-(j:Job)
OPTIONAL MATCH path = (j)-[:DEPENDS_ON]->*()
RETURN path, n, i
5.3. Show a Machine Queue
This query shows the queue of tasks waiting on a specific machine:
MATCH path = (n:Machine {processor_id: "M3"} )-[:QUEUE_HEAD]->()
(()<-[:WAITS]-())*
()<-[:QUEUE_TAIL]-(n)
RETURN path
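The following Python sketch makes the direction of WAITS explicit. It performs the same traversal as the query: starting at the QUEUE_HEAD job, it repeatedly steps to the job that WAITS for the current one until it reaches the QUEUE_TAIL. Both the waits dict (later job → job it waits for) and the function machine_queue are illustrative assumptions, not a Neo4j API.

```python
from typing import Dict, List, Optional


def machine_queue(head: str, waits: Dict[str, str], tail: Optional[str] = None) -> List[str]:
    """List a machine's queue in processing order.

    waits maps each job to the job it WAITS for, i.e. (later)-[:WAITS]->(earlier),
    so the queue is read by following WAITS relationships backwards from the head.
    """
    behind = {earlier: later for later, earlier in waits.items()}
    queue = [head]
    while queue[-1] in behind:
        queue.append(behind[queue[-1]])
        if len(queue) > len(waits) + 1:  # more jobs than the edges allow: a cycle
            raise ValueError("WAITS relationships form a cycle")
    if tail is not None and queue[-1] != tail:
        raise ValueError(f"queue ends at {queue[-1]}, but QUEUE_TAIL is {tail}")
    return queue


# Queue of AssemblyMachine3 (M3) from the demo data
m3_waits = {"T9": "T4", "T13": "T9", "T14": "T13"}
print(machine_queue("T4", m3_waits, tail="T14"))  # ['T4', 'T9', 'T13', 'T14']
```

Checking the last job against QUEUE_TAIL catches a broken chain, such as a missing WAITS relationship in the middle of a queue.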
5.4. Show Work Still to Do for a Process
This query follows dependencies and machine queues from the process's final task to every task that is not yet completed, showing the work still to do:
MATCH (n:Process {process_id:"Prod3"})<-[i:IS_INSTANCE_OF]-(j:Job)
OPTIONAL MATCH path = (j)-[:DEPENDS_ON|WAITS]->*(x WHERE x.status <> "Completed")
RETURN path, n, i
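The logic of this query can be sketched in plain Python as a reachability search. The sketch collects every job reachable from the process's final job over DEPENDS_ON or WAITS edges, including the job itself, as the zero-length quantified pattern allows. It then keeps only the jobs that are not completed. The adjacency dict below is transcribed from the demo data as the union of both relationship types. The function name pending_work is hypothetical.

```python
from typing import Dict, List, Set

# Union of DEPENDS_ON and WAITS from the demo data: job -> jobs it points to
edges: Dict[str, List[str]] = {
    "T1": ["T0"], "T2": ["T0", "T5"], "T3": ["T1", "T2"], "T4": ["T3"],
    "T5": ["T0", "T1"], "T6": ["T0", "T2"], "T7": ["T5", "T3"],
    "T8": ["T6", "T7"], "T9": ["T7", "T8", "T4"], "T10": ["T11", "T8"],
    "T11": ["T6"], "T12": ["T10"], "T13": ["T12", "T9"], "T14": ["T13"],
}
status = {f"T{i}": "Completed" for i in range(15)}
status.update({"T4": "Running", "T9": "Pending", "T10": "Running",
               "T12": "Pending", "T13": "Pending", "T14": "Pending"})


def pending_work(start: str) -> Set[str]:
    """Jobs reachable from start (start included) that are not Completed."""
    seen: Set[str] = set()
    stack = [start]
    while stack:
        job = stack.pop()
        if job not in seen:
            seen.add(job)
            stack.extend(edges.get(job, []))
    return {job for job in seen if status[job] != "Completed"}


# T14 is the job linked to Prod3 via IS_INSTANCE_OF
print(sorted(pending_work("T14")))
```

For Prod3, this yields T4, T9, T10, T12, T13, and T14. T9 and T4 appear only because T13 WAITS for T9 on AssemblyMachine3, which shows why the query follows queues as well as dependencies.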
5.5. Critical Path Analysis of a Process
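This query computes the critical path and the estimated remaining time for a production process. For every path that runs from an unfinished job through dependencies and machine queues, it sums each job's remaining duration, duration * (1 - completion_progress), and keeps the path with the largest total: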
MATCH (n:Process {process_id:"Prod3"})<-[i:IS_INSTANCE_OF]-(j:Job WHERE j.status <> "Completed")
OPTIONAL MATCH path = (j)(()-[:DEPENDS_ON|WAITS]->(jobs))*(x WHERE x.status <> "Completed")
// duration here means the expected duration, because these tasks are not completed yet
WITH n, i, path, reduce(duration=0, job IN [j]+jobs |
duration + job.duration * (1.0-job.completion_progress)) AS total_duration
ORDER BY total_duration DESC LIMIT 1
RETURN n, i, path, total_duration
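The Cypher query enumerates every path and then sorts the results. The Python sketch below computes the same quantity in a different way, as a cross-check. It uses memoized depth-first search, a longest-path dynamic program over the DAG, instead of enumerating paths. Each job contributes duration * (1 - completion_progress), and a path is only extended when the total grows strictly. The data is transcribed from the demo graph, and the function names are illustrative.

```python
from functools import lru_cache
from typing import Dict, List, Tuple

# Demo data: job -> (duration, completion_progress)
jobs: Dict[str, Tuple[int, float]] = {
    "T0": (5, 1.0), "T1": (15, 1.0), "T2": (12, 1.0), "T3": (6, 1.0),
    "T4": (3, 0.5), "T5": (14, 1.0), "T6": (11, 1.0), "T7": (5, 1.0),
    "T8": (4, 1.0), "T9": (2, 0.0), "T10": (10, 0.5), "T11": (12, 1.0),
    "T12": (5, 0.0), "T13": (6, 0.0), "T14": (4, 0.0),
}
# Union of DEPENDS_ON and WAITS: job -> jobs it points to
edges: Dict[str, List[str]] = {
    "T1": ["T0"], "T2": ["T0", "T5"], "T3": ["T1", "T2"], "T4": ["T3"],
    "T5": ["T0", "T1"], "T6": ["T0", "T2"], "T7": ["T5", "T3"],
    "T8": ["T6", "T7"], "T9": ["T7", "T8", "T4"], "T10": ["T11", "T8"],
    "T11": ["T6"], "T12": ["T10"], "T13": ["T12", "T9"], "T14": ["T13"],
}


def remaining(job: str) -> float:
    duration, progress = jobs[job]
    return duration * (1.0 - progress)


@lru_cache(maxsize=None)
def critical_remaining(job: str) -> Tuple[float, Tuple[str, ...]]:
    """Largest sum of remaining durations along a path starting at job."""
    best_total, best_path = 0.0, ()
    for nxt in edges.get(job, []):
        total, path = critical_remaining(nxt)
        # strictly larger totals only, so a tail of completed jobs (0.0) is not appended
        if total > best_total:
            best_total, best_path = total, path
    return remaining(job) + best_total, (job,) + best_path


print(critical_remaining("T14"))  # (20.0, ('T14', 'T13', 'T12', 'T10'))
```

For Prod3, the sketch reports 20.0 along T14 → T13 → T12 → T10. The completed jobs behind T10 add nothing, so the path stops there, which mirrors the query's requirement that the last node be unfinished. Memoization visits each job once, whereas enumerating paths can grow combinatorially on larger graphs.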
5.6. Scalable Critical Path Analysis with GDS
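For larger graphs, refactor the model to treat time as relationships and use Neo4j’s Graph Data Science (GDS) library for longest path computation. The query in section 5.5 enumerates every path, and the number of paths can grow combinatorially with the number of dependencies. A DAG longest-path algorithm instead processes each node and relationship a bounded number of times. This approach therefore scales to thousands of jobs, identifying critical sequences to prevent disruptions.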
This refactoring is inspired by the Neo4j blog post Unlocking DAGs in Neo4j: From Basics to Critical Path Analysis.
5.6.1. Create Indexes for Merge
This query creates indexes for efficient merging:
CREATE INDEX start_job_id IF NOT EXISTS FOR (s:Start) ON (s.job_id);
CREATE INDEX end_job_id IF NOT EXISTS FOR (e:End) ON (e.job_id);
5.6.2. Time as Relationships
This query creates Start and End nodes with TIME relationships for job durations:
MATCH (j:Job)
CALL (j) {
MERGE (s:Start {job_id: j.job_id})
MERGE (e:End {job_id: j.job_id})
MERGE (j)-[:STARTS]->(s)
MERGE (j)-[:ENDS]->(e)
MERGE (s)-[:TIME {duration: j.duration}]->(e)
} IN CONCURRENT TRANSACTIONS OF 1000 ROWS;
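5.6.3. Dependency 3-Second Wait Time
This query adds TIME relationships with a 3-second duration for dependencies and waits. Each relationship runs from the End node of a prerequisite job to the Start node of the job that depends on or waits for it:
MATCH (j1:Job)-[:DEPENDS_ON|WAITS]->(j0:Job)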
CALL (j0, j1) {
MERGE (s:Start {job_id: j1.job_id})
MERGE (e:End {job_id: j0.job_id})
MERGE (e)-[:TIME {duration: 3}]->(s)
} IN CONCURRENT TRANSACTIONS OF 1000 ROWS;
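gds.dag.longestPath is designed for directed acyclic graphs. A machine queue that contradicts a dependency, such as a job queued ahead of a task it depends on, would close a cycle once DEPENDS_ON and WAITS are both turned into TIME relationships. The Python sketch below illustrates a pre-flight check using Kahn's topological sort. The function name and the dict shape (job → jobs it DEPENDS_ON or WAITS for) are hypothetical.

```python
from collections import deque
from typing import Dict, List, Optional


def topological_order(prerequisites: Dict[str, List[str]]) -> Optional[List[str]]:
    """Kahn's algorithm over DEPENDS_ON/WAITS edges.

    prerequisites maps a job to the jobs it DEPENDS_ON or WAITS for. Returns an
    order in which every job comes after its prerequisites, or None if the
    relationships contain a cycle (so the TIME graph would not be a DAG).
    """
    jobs = set(prerequisites) | {p for preds in prerequisites.values() for p in preds}
    indegree = {job: 0 for job in jobs}
    successors: Dict[str, List[str]] = {job: [] for job in jobs}
    for job, preds in prerequisites.items():
        for pred in set(preds):  # duplicate edges collapse, as with MERGE
            indegree[job] += 1
            successors[pred].append(job)
    ready = deque(sorted(job for job in jobs if indegree[job] == 0))
    order: List[str] = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for succ in successors[job]:
            indegree[succ] -= 1
            if indegree[succ] == 0:
                ready.append(succ)
    return order if len(order) == len(jobs) else None


# Excerpt of the AssemblyMachine1 data: queue and dependencies agree
print(topological_order({"T1": ["T0"], "T2": ["T0", "T5"], "T5": ["T0", "T1"]}))
# Hypothetical conflict: T5 DEPENDS_ON T2, but the queue makes T2 WAIT for T5
print(topological_order({"T2": ["T5"], "T5": ["T2"]}))
```

The first call prints ['T0', 'T1', 'T5', 'T2']. The second prints None, signalling that the schedule must be fixed before running the longest-path step.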
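5.6.4. Create the KickOff Node
This query creates the single KickOff node that represents the start of the entire process:
MERGE (:KickOff);
5.6.5. KickOff to Initial Jobs
This query connects KickOff to the initial jobs, which are jobs that neither depend on nor wait for another job. The duration of these TIME relationships is the smallest job duration in the graph:
MATCH (j:Job)
WITH min(j.duration) AS duration
MATCH (ko:KickOff)
MATCH (j:Job)-[:STARTS]->(s)
WHERE NOT EXISTS {(j)-[:DEPENDS_ON|WAITS]->()}
CALL (ko, s, duration) {
MERGE (ko)-[:TIME {duration: duration}]->(s)
} IN TRANSACTIONS OF 1000 ROWS;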
5.6.6. Project In-Memory Graph
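This query projects the Start, End, and KickOff nodes and their TIME relationships, including the duration property, into an in-memory GDS graph named g: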
MATCH (source:Start|KickOff|End)
OPTIONAL MATCH (source)-[r:TIME]->(target)
RETURN gds.graph.project("g", source, target, {relationshipProperties: r {.duration}})
5.6.7. Stream Critical Paths
This query streams the longest path ending at each node, with the node returned instead of its internal ID, ordered by critical time:
CALL gds.dag.longestPath.stream("g", {relationshipWeightProperty: "duration"})
YIELD targetNode AS target, totalCost, path, costs
WITH gds.util.asNode(target) AS last_activity, totalCost, path, costs
ORDER BY totalCost DESC
WITH last_activity, collect({totalCost: totalCost, path: path, costs: costs})[0] AS longest
RETURN last_activity, longest.totalCost AS critical_time, longest.path AS path, longest.costs AS costs
ORDER BY critical_time DESC
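A textbook dynamic program shows what a longest-path stream computes. The Python sketch below processes the TIME graph in topological order and, for each node, keeps the heaviest incoming path. The (total_cost, path, costs) triple per node loosely mirrors the totalCost, path, and costs columns yielded above. It is an illustrative sketch, not necessarily how GDS is implemented, and it assumes a DAG with non-negative durations.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Edge = Tuple[str, str, float]


def longest_paths(edges: List[Edge]) -> Dict[str, Tuple[float, List[str], List[float]]]:
    """Heaviest path ending at each node of a DAG with non-negative weights.

    Returns node -> (total_cost, path, costs), where costs[i] is the running
    total at path[i]. Nodes are processed in topological order (Kahn).
    """
    successors: Dict[str, List[Tuple[str, float]]] = defaultdict(list)
    indegree: Dict[str, int] = defaultdict(int)
    nodes = set()
    for source, target, weight in edges:
        successors[source].append((target, weight))
        indegree[target] += 1
        nodes.update((source, target))
    best = {node: 0.0 for node in nodes}  # every node starts at cost 0
    parent: Dict[str, str] = {}
    ready = [node for node in nodes if indegree[node] == 0]
    while ready:
        node = ready.pop()
        for target, weight in successors[node]:
            if best[node] + weight > best[target]:  # relax: keep the heavier path
                best[target] = best[node] + weight
                parent[target] = node
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)
    result = {}
    for node in nodes:
        path = [node]
        while path[-1] in parent:  # walk back along the heaviest predecessors
            path.append(parent[path[-1]])
        path.reverse()
        result[node] = (best[node], path, [best[step] for step in path])
    return result


# TIME graph for the diamond T0 -> (T1, T2) -> T3, with KickOff duration 5
edges = [
    ("KickOff", "Start:T0", 5), ("Start:T0", "End:T0", 5),
    ("End:T0", "Start:T1", 3), ("End:T0", "Start:T2", 3),
    ("Start:T1", "End:T1", 15), ("Start:T2", "End:T2", 12),
    ("End:T1", "Start:T3", 3), ("End:T2", "Start:T3", 3),
    ("Start:T3", "End:T3", 6),
]
total, path, costs = longest_paths(edges)["End:T3"]
print(total)  # 37.0
print(path)   # ['KickOff', 'Start:T0', 'End:T0', 'Start:T1', 'End:T1', 'Start:T3', 'End:T3']
print(costs)  # [0.0, 5.0, 10.0, 13.0, 28.0, 31.0, 37.0]
```

The critical path runs through T1 rather than T2 because T1 takes 15 against T2's 12. The costs list shows where each step of the path begins on the timeline.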
5.6.8. Stream Critical Times for Specific Jobs
This query streams critical times for specific jobs (requires $job_id_list parameter):
:params {
job_id_list: ["T11", "T12", "T14"]
}
CALL gds.dag.longestPath.stream("g", {relationshipWeightProperty: "duration"})
YIELD targetNode as target, totalCost, path, costs
WITH gds.util.asNode(target).job_id AS last_activity, totalCost, path, costs
ORDER BY totalCost DESC
WITH last_activity, collect ({totalCost:totalCost, path:path, costs:costs})[0] AS longest
WHERE last_activity IN $job_id_list
WITH last_activity, longest.totalCost AS critical_time, longest.path AS path, longest.costs AS costs
ORDER BY size(last_activity), last_activity
RETURN last_activity, critical_time
This returns the following for those parameters:
[
{
"last_activity": "T11",
"critical_time": 86.0
},
{
"last_activity": "T12",
"critical_time": 107.0
},
{
"last_activity": "T14",
"critical_time": 123.0
}
]
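Critical times like these can be reproduced by hand with a job-level form of the longest-path computation. A job starts at the latest end time of the jobs it DEPENDS_ON or WAITS for, plus the 3-second wait. If it has no such jobs, it starts at the KickOff duration instead. It ends one duration later, which equals the longest KickOff → End(job) distance in the TIME graph. The Python sketch below uses data transcribed from the demo graph and is an illustrative check rather than part of the GDS workflow. The function name critical_time is hypothetical.

```python
from functools import lru_cache
from typing import Dict, List

WAIT = 3  # wait time on every DEPENDS_ON/WAITS edge (section 5.6.3)
durations: Dict[str, int] = {
    "T0": 5, "T1": 15, "T2": 12, "T3": 6, "T4": 3, "T5": 14, "T6": 11, "T7": 5,
    "T8": 4, "T9": 2, "T10": 10, "T11": 12, "T12": 5, "T13": 6, "T14": 4,
}
KICKOFF = min(durations.values())  # smallest job duration: 2 (T9)
# Union of DEPENDS_ON and WAITS from the demo data: job -> its prerequisites
prerequisites: Dict[str, List[str]] = {
    "T1": ["T0"], "T2": ["T0", "T5"], "T3": ["T1", "T2"], "T4": ["T3"],
    "T5": ["T0", "T1"], "T6": ["T0", "T2"], "T7": ["T5", "T3"],
    "T8": ["T6", "T7"], "T9": ["T7", "T8", "T4"], "T10": ["T11", "T8"],
    "T11": ["T6"], "T12": ["T10"], "T13": ["T12", "T9"], "T14": ["T13"],
}


@lru_cache(maxsize=None)
def critical_time(job: str) -> int:
    """Longest KickOff -> End(job) distance in the TIME graph."""
    starts = [critical_time(p) + WAIT for p in prerequisites.get(job, [])]
    start = max(starts) if starts else KICKOFF  # initial jobs hang off KickOff
    return start + durations[job]


for job in ["T11", "T12", "T14"]:
    print(job, critical_time(job))
```

The sketch prints 86, 107, and 123. The T12 and T14 values are driven by the AssemblyMachine1 queue: T10 cannot start until T11, the last job in that queue, has finished at 86.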