Process Monitoring and Critical Path Analysis

1. Introduction

Monitoring production processes is essential in manufacturing for ensuring efficiency, quality control, and timely delivery. This use case demonstrates how to use Neo4j for monitoring manufacturing workflows, visualizing task dependencies, and performing critical path analysis (CPA) to optimize operations and mitigate delays. By leveraging graph databases, manufacturers can gain a comprehensive view of production lines, predict task durations, and identify bottlenecks to improve throughput and resource utilization.

2. Scenario

To understand the value of manufacturing process monitoring and critical path analysis, consider real-world challenges in production environments where inefficiencies can lead to costly downtime and missed deadlines. The following three key areas highlight these issues:

  1. Workflow Dependency Management:

    • Complex assembly lines involve interdependent tasks that, if not properly managed, can cause cascading delays.

    • Without clear visibility into dependencies, unexpected bottlenecks disrupt just-in-time production.

    • Overlooked interconnections between tasks lead to inefficient resource allocation across machines.

  2. Resource and Queue Optimization:

    • Machines often handle queued tasks, but overloads or poor scheduling result in idle time or backlogs.

    • Traditional systems fail to dynamically assess workloads, complicating predictions of completion times.

    • Inadequate monitoring increases the risk of equipment failures or quality issues going unnoticed.

  3. Risk Mitigation and Compliance:

    • Regulations require traceable production processes for quality and safety standards.

    • Manual tracking is error-prone, making it hard to demonstrate compliance or optimize for efficiency.

    • Manufacturers risk penalties and reputational damage without tools to proactively identify critical paths and delays.

These scenarios underscore the need for an advanced solution like Neo4j’s manufacturing process monitoring with Cypher®, which uses graph technology to model, analyze, and visualize workflows, providing critical insights for business and technical users in production planning and optimization.

3. Solution

Advanced graph databases like Neo4j are vital for handling the intricacies of interconnected production data in manufacturing. They excel at managing dynamic relationships, making it straightforward to model task dependencies, queues, and machine workloads. By representing data as graphs, organizations can uncover critical paths, simulate scenarios, and derive actionable insights—enhancing decision-making, operational efficiency, and production resilience.

3.1. How Graph Databases Can Help

Graph databases provide a powerful solution to the challenges of manufacturing process monitoring and critical path analysis. Here are five key reasons why a graph database is indispensable:

  1. Dependency Modeling: Graphs naturally handle complex task interconnections and machine assignments, capturing relationships that are cumbersome to represent and query with relational joins.

  2. Real-Time Queue and Workload Analysis: They enable dynamic views of machine queues and pending work, allowing for instant identification of bottlenecks.

  3. Comprehensive Process Visualization: Graphs offer a full overview of production workflows, exposing hidden inefficiencies and risks.

  4. Critical Path Computation: With features like path aggregation, graphs support calculating ETAs and critical paths for proactive adjustments.

  5. Scalable Optimization: Integration with Graph Data Science (GDS) allows for advanced analytics like longest path algorithms at scale.

These capabilities make graph databases central to deriving insights and solving the multifaceted issues in manufacturing process monitoring.

4. Modeling

This section demonstrates Cypher queries on an example graph. The goal is to show query structures and guide data modeling in production. We’ll use a small graph with several nodes, based on the data model below:

4.1. Data Model

Figure 1. Data Model for Process Monitoring

4.1.1 Required Data Fields

Below are the fields required to get started:

  • Machine Node:

    • processor_id: Unique identifier (e.g., "M1")

    • name: Name of the machine (e.g., "AssemblyMachine1")

    • load: Current load level

  • Process Node:

    • process_id: Unique identifier (e.g., "Prod1")

    • name: Name of the production process (e.g., "WidgetProduction_Q1")

  • Job Node (representing tasks):

    • job_id: Unique identifier (e.g., "T0")

    • name: Name of the task (e.g., "Shared_MaterialPrep")

    • status: Current status (e.g., "Completed", "Running", "Pending")

    • duration: Expected or actual duration

    • quality_score: Quality or risk score (adapted from risk_score)

    • completion_progress: Progress percentage (0.0 to 1.0)

  • DEPENDS_ON Relationship: Task dependencies

  • WAITS Relationship: Queue order on machines

  • RUNS_ON Relationship: Task assignment to machines

  • IS_INSTANCE_OF Relationship: Task association to processes

  • QUEUE_HEAD and QUEUE_TAIL Relationships: Machine queue boundaries

For the refactored model used in scalable CPA:

  • Additional Nodes:

    • :Start (per job, with job_id)

    • :End (per job, with job_id)

    • :KickOff (single node representing the start of the entire process)

  • Relationships:

    • [:STARTS] (from :Job to :Start)

    • [:ENDS] (from :Job to :End)

    • [:TIME] (weighted edges with duration property):

      • From :Start to :End for each job (duration = job execution time)

      • From :End of a dependency job to :Start of the dependent job (duration = 3 seconds wait time)

      • From :KickOff to :Start of initial jobs (duration = minimal job duration in the graph)
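
The refactored model can be illustrated outside the database with a minimal plain-Python sketch. It is not part of the Neo4j workflow: the function name build_time_edges, the tuple-based node keys, and the three-job chain are assumptions made for illustration, and a job counts as "initial" here simply when nothing precedes it.

```python
WAIT_SECONDS = 3  # wait time between the end of a job and the start of its dependent

def build_time_edges(durations, precedes):
    """Return weighted TIME edges as (source, target, duration) tuples.

    durations: {job_id: duration}
    precedes:  list of (earlier_job, later_job) pairs
    """
    edges = []
    # One Start -> End edge per job, weighted with the job's own duration.
    for job_id, duration in durations.items():
        edges.append((("Start", job_id), ("End", job_id), duration))
    # One End -> Start edge per dependency, weighted with the fixed wait time.
    for earlier, later in sorted(set(precedes)):
        edges.append((("End", earlier), ("Start", later), WAIT_SECONDS))
    # KickOff -> Start for jobs that nothing precedes, weighted with the minimal duration.
    has_predecessor = {later for _, later in precedes}
    kickoff_weight = min(durations.values())
    for job_id in durations:
        if job_id not in has_predecessor:
            edges.append((("KickOff", None), ("Start", job_id), kickoff_weight))
    return edges

# Small chain using three ComponentProduction_Q1 tasks: T12 -> T13 -> T14
demo_durations = {"T12": 5, "T13": 6, "T14": 4}
demo_precedes = [("T12", "T13"), ("T13", "T14")]
time_edges = build_time_edges(demo_durations, demo_precedes)
for edge in time_edges:
    print(edge)
```

Because every duration ends up on an edge, the critical path becomes a longest weighted path problem, which is the form a longest-path algorithm for DAGs expects.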

4.1.2 Required Parameters

Most queries in this use case need no parameters, because they filter on literal property values such as process_id or processor_id. The exception is the query in section 5.6.8, which expects a $job_id_list parameter.

4.2. Demo Data

The following Cypher statement will create the example graph in the Neo4j database (adapted for manufacturing context):

// Machines
CREATE (m1:Machine {processor_id: 'M1', name: 'AssemblyMachine1', load: 3})
CREATE (m2:Machine {processor_id: 'M2', name: 'AssemblyMachine2', load: 2})
CREATE (m3:Machine {processor_id: 'M3', name: 'AssemblyMachine3', load: 1})
// Production Processes
CREATE (prod1:Process {process_id: 'Prod1', name: 'WidgetProduction_Q1'})
CREATE (prod2:Process {process_id: 'Prod2', name: 'GadgetProduction_Q1'})
CREATE (prod3:Process {process_id: 'Prod3', name: 'ComponentProduction_Q1'})
// Shared Task (part of WidgetProduction_Q1 and GadgetProduction_Q1)
CREATE (t0:Job {job_id: 'T0', name: 'Shared_MaterialPrep', status: 'Completed', duration: 5, quality_score: 0.2, completion_progress: 1.0})
CREATE (t0)-[:RUNS_ON]->(m1)
// Tasks for WidgetProduction_Q1 (Diamond-shaped DAG)
CREATE (t1:Job {job_id: 'T1', name: 'Widget_Assembly1', status: 'Completed', duration: 15, quality_score: 0.7, completion_progress: 1.0})
CREATE (t2:Job {job_id: 'T2', name: 'Widget_Assembly2', status: 'Completed', duration: 12, quality_score: 0.6, completion_progress: 1.0})
CREATE (t3:Job {job_id: 'T3', name: 'Widget_QualityCheck', status: 'Completed', duration: 6, quality_score: 0.4, completion_progress: 1.0})
CREATE (t4:Job {job_id: 'T4', name: 'Widget_Packaging', status: 'Running', duration: 3, quality_score: 0.2, completion_progress: 0.5})
CREATE (t1)-[:RUNS_ON]->(m1), (t2)-[:RUNS_ON]->(m1), (t3)-[:RUNS_ON]->(m2), (t4)-[:RUNS_ON]->(m3)
CREATE (t1)-[:DEPENDS_ON]->(t0), (t2)-[:DEPENDS_ON]->(t0), (t3)-[:DEPENDS_ON]->(t1), (t3)-[:DEPENDS_ON]->(t2), (t4)-[:DEPENDS_ON]->(t3)
CREATE (t4)-[:IS_INSTANCE_OF]->(prod1)
// Tasks for GadgetProduction_Q1 (Parallel Paths DAG)
CREATE (t5:Job {job_id: 'T5', name: 'Gadget_Assembly1', status: 'Completed', duration: 14, quality_score: 0.6, completion_progress: 1.0})
CREATE (t6:Job {job_id: 'T6', name: 'Gadget_Assembly2', status: 'Completed', duration: 11, quality_score: 0.5, completion_progress: 1.0})
CREATE (t7:Job {job_id: 'T7', name: 'Gadget_QualityCheck1', status: 'Completed', duration: 5, quality_score: 0.3, completion_progress: 1.0})
CREATE (t8:Job {job_id: 'T8', name: 'Gadget_QualityCheck2', status: 'Completed', duration: 4, quality_score: 0.2, completion_progress: 1.0})
CREATE (t9:Job {job_id: 'T9', name: 'Gadget_Packaging', status: 'Pending', duration: 2, quality_score: 0.1, completion_progress: 0.0})
CREATE (t5)-[:RUNS_ON]->(m1), (t6)-[:RUNS_ON]->(m1), (t7)-[:RUNS_ON]->(m2), (t8)-[:RUNS_ON]->(m2), (t9)-[:RUNS_ON]->(m3)
CREATE (t5)-[:DEPENDS_ON]->(t0), (t6)-[:DEPENDS_ON]->(t0), (t7)-[:DEPENDS_ON]->(t5), (t8)-[:DEPENDS_ON]->(t6), (t9)-[:DEPENDS_ON]->(t7), (t9)-[:DEPENDS_ON]->(t8)
CREATE (t9)-[:IS_INSTANCE_OF]->(prod2)
// Shared Task (part of GadgetProduction_Q1 and ComponentProduction_Q1)
CREATE (t10:Job {job_id: 'T10', name: 'Shared_ComponentAssembly', status: 'Running', duration: 10, quality_score: 0.5, completion_progress: 0.5})
CREATE (t10)-[:RUNS_ON]->(m2)
// Tasks for ComponentProduction_Q1 (Single Chain DAG)
CREATE (t11:Job {job_id: 'T11', name: 'Component_MaterialPrep', status: 'Completed', duration: 12, quality_score: 0.5, completion_progress: 1.0})
CREATE (t12:Job {job_id: 'T12', name: 'Component_QualityCheck', status: 'Pending', duration: 5, quality_score: 0.3, completion_progress: 0.0})
CREATE (t13:Job {job_id: 'T13', name: 'Component_Inspection', status: 'Pending', duration: 6, quality_score: 0.4, completion_progress: 0.0})
CREATE (t14:Job {job_id: 'T14', name: 'Component_Packaging', status: 'Pending', duration: 4, quality_score: 0.2, completion_progress: 0.0})
CREATE (t11)-[:RUNS_ON]->(m1), (t12)-[:RUNS_ON]->(m2), (t13)-[:RUNS_ON]->(m3), (t14)-[:RUNS_ON]->(m3)
CREATE (t12)-[:DEPENDS_ON]->(t10), (t10)-[:DEPENDS_ON]->(t11), (t13)-[:DEPENDS_ON]->(t12), (t14)-[:DEPENDS_ON]->(t13)
CREATE (t14)-[:IS_INSTANCE_OF]->(prod3)
// Queue for AssemblyMachine1 (t0 -> t1 -> t5 -> t2 -> t6 -> t11)
CREATE (m1)-[:QUEUE_HEAD]->(t0)
CREATE (m1)-[:QUEUE_TAIL]->(t11)
CREATE (t1)-[:WAITS]->(t0), (t5)-[:WAITS]->(t1), (t2)-[:WAITS]->(t5), (t6)-[:WAITS]->(t2), (t11)-[:WAITS]->(t6)
// Queue for AssemblyMachine2 (t3 -> t7 -> t8 -> t10 -> t12)
CREATE (m2)-[:QUEUE_HEAD]->(t3)
CREATE (m2)-[:QUEUE_TAIL]->(t12)
CREATE (t7)-[:WAITS]->(t3), (t8)-[:WAITS]->(t7), (t10)-[:WAITS]->(t8), (t12)-[:WAITS]->(t10)
// Queue for AssemblyMachine3 (t4 -> t9 -> t13 -> t14)
CREATE (m3)-[:QUEUE_HEAD]->(t4)
CREATE (m3)-[:QUEUE_TAIL]->(t14)
CREATE (t9)-[:WAITS]->(t4), (t13)-[:WAITS]->(t9), (t14)-[:WAITS]->(t13);

5. Cypher Queries

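These Cypher queries target Neo4j 5.9+ (Cypher 5 or 25), the first release with quantified path patterns. The CALL (…) { … } scope clause and IN CONCURRENT TRANSACTIONS used in section 5.6 require a more recent 5.x release.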

5.1. Show the Graph Model

This query visualizes the overall schema:

CALL db.schema.visualization()
Figure 2. Data Model

5.2. Show a Manufacturing Process

This query displays a specific production process and its dependencies:

MATCH (n:Process {process_id:"Prod1"})<-[i:IS_INSTANCE_OF]-(j:Job)
OPTIONAL MATCH path = (j)-[:DEPENDS_ON]->*()
RETURN path, n, i
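
What the variable-length DEPENDS_ON traversal collects can be sketched in plain Python. This is an illustration only, not how Neo4j evaluates the pattern: the dictionary copies the DEPENDS_ON edges of WidgetProduction_Q1 from the demo data, and all_prerequisites is a hypothetical helper name.

```python
# DEPENDS_ON edges of WidgetProduction_Q1 from the demo data: job -> prerequisites
depends_on = {
    "T4": ["T3"],
    "T3": ["T1", "T2"],
    "T1": ["T0"],
    "T2": ["T0"],
    "T0": [],
}

def all_prerequisites(job_id):
    """Collect every job reachable over one or more DEPENDS_ON hops from job_id."""
    seen = set()
    stack = list(depends_on[job_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(depends_on[current])
    return seen

# T4 is the job linked to Prod1 through IS_INSTANCE_OF
print(sorted(all_prerequisites("T4")))  # ['T0', 'T1', 'T2', 'T3']
```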

5.3. Show a Machine Queue

This query shows the queue of tasks waiting on a specific machine:

MATCH path = (n:Machine {processor_id: "M3"} )-[:QUEUE_HEAD]->()
(()<-[:WAITS]-())*
()<-[:QUEUE_TAIL]-(n)
RETURN path
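
The queue layout behind this query is a linked list: QUEUE_HEAD points at the first job, QUEUE_TAIL at the last, and each job WAITS for the one directly ahead of it. A minimal Python sketch of the same walk, using the AssemblyMachine3 queue from the demo data (queue_order is a hypothetical helper, not a Neo4j function):

```python
def queue_order(head, tail, waits_on):
    """Walk a machine queue from head to tail.

    waits_on maps a job to the job directly ahead of it (the target of WAITS).
    """
    # Invert WAITS so each job points to the job queued right behind it.
    next_in_line = {ahead: job for job, ahead in waits_on.items()}
    order = [head]
    while order[-1] != tail:
        order.append(next_in_line[order[-1]])
    return order

# AssemblyMachine3 in the demo data: QUEUE_HEAD -> T4, QUEUE_TAIL -> T14
m3_waits = {"T9": "T4", "T13": "T9", "T14": "T13"}
m3_queue = queue_order("T4", "T14", m3_waits)
print(m3_queue)  # ['T4', 'T9', 'T13', 'T14']
```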

5.4. Show Work Still to Do for a Process

This query identifies pending tasks for a production process:

MATCH (n:Process {process_id:"Prod3"})<-[i:IS_INSTANCE_OF]-(j:Job)
OPTIONAL MATCH path = (j)-[:DEPENDS_ON|WAITS]->*(x WHERE x.status <> "Completed")
RETURN path, n, i

5.5. Critical Path Analysis of a Process

This query computes the critical path and estimated time for a production process:

MATCH (n:Process {process_id:"Prod3"})<-[i:IS_INSTANCE_OF]-(j:Job WHERE j.status <> "Completed")
OPTIONAL MATCH path = (j)(()-[:DEPENDS_ON|WAITS]->(jobs))*(x WHERE x.status <> "Completed")
// the *duration* property in this context means *expected_duration* because tasks are not completed yet
WITH n, i, path, reduce(duration=0, job IN [j]+jobs |
    duration + job.duration * (1.0-job.completion_progress)) AS total_duration
ORDER BY total_duration DESC LIMIT 1
RETURN n, i, path, total_duration
Figure 3. CPA result
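
The weighting in the query above, duration multiplied by the unfinished share (1.0 - completion_progress), can be worked through by hand for Prod3. The following Python sketch is a simplified stand-in for the Cypher path search, under two assumptions: only jobs that are not completed are listed, since completed jobs in the demo data have completion_progress 1.0 and add nothing, and DEPENDS_ON and WAITS targets are merged into one upstream map.

```python
# job_id -> (status, duration, completion_progress), copied from the demo data
jobs = {
    "T4": ("Running", 3, 0.5),
    "T9": ("Pending", 2, 0.0),
    "T10": ("Running", 10, 0.5),
    "T12": ("Pending", 5, 0.0),
    "T13": ("Pending", 6, 0.0),
    "T14": ("Pending", 4, 0.0),
}
# Outgoing DEPENDS_ON / WAITS targets, restricted to jobs that are not completed
upstream = {
    "T14": ["T13"],
    "T13": ["T12", "T9"],
    "T12": ["T10"],
    "T10": [],
    "T9": ["T4"],
    "T4": [],
}

def remaining(job_id):
    """Expected time still needed for one job."""
    _, duration, progress = jobs[job_id]
    return duration * (1.0 - progress)

def critical_path(job_id):
    """Return (total remaining time, path) of the costliest chain starting at job_id."""
    best_total, best_path = 0.0, []
    for nxt in upstream[job_id]:
        total, path = critical_path(nxt)
        if total > best_total:
            best_total, best_path = total, path
    return remaining(job_id) + best_total, [job_id] + best_path

# T14 is the job linked to Prod3 through IS_INSTANCE_OF
total_duration, path = critical_path("T14")
print(total_duration, path)  # 20.0 ['T14', 'T13', 'T12', 'T10']
```

With these inputs the costliest chain is T14, T13, T12, T10: 4 + 6 + 5 + 10 × 0.5 = 20.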

5.6. Scalable Critical Path Analysis with GDS

For larger graphs, refactor the model to treat time as relationships and use Neo4j’s Graph Data Science (GDS) library for longest path computation. This approach scales to thousands of jobs, identifying critical sequences to prevent disruptions.

Figure 4. Refactored Data Model for Scalable CPA

This refactoring is inspired by the Neo4j blog post Unlocking DAGs in Neo4j: From Basics to Critical Path Analysis.

5.6.1. Create Indexes for Merge

This query creates indexes for efficient merging:

CREATE INDEX start_job_id IF NOT EXISTS FOR (s:Start) ON (s.job_id);
CREATE INDEX end_job_id IF NOT EXISTS FOR (e:End) ON (e.job_id);

5.6.2. Time as Relationships

This query creates Start and End nodes with TIME relationships for job durations:

MATCH (j:Job)
CALL (j) {
    MERGE (s:Start {job_id: j.job_id})
    MERGE (e:End {job_id: j.job_id})
    MERGE (j)-[:STARTS]->(s)
    MERGE (j)-[:ENDS]->(e)
    MERGE (s)-[:TIME {duration: j.duration}]->(e)
} IN CONCURRENT TRANSACTIONS OF 1000 ROWS;

5.6.3. 3-Second Dependency Wait Time

This query adds 3-second-duration TIME relationships for dependencies and waits:

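// Note: the data model names the dependency relationship DEPENDS_ON. As written,
// DEPENDS matches nothing in the demo data, so only WAITS edges are converted;
// use DEPENDS_ON|WAITS to include task dependencies as well.
MATCH (j1)-[:DEPENDS|WAITS]->(j0)
CALL (j0, j1) {
    MERGE (s:Start {job_id: j1.job_id})
    MERGE (e:End {job_id: j0.job_id})
    MERGE (e)-[:TIME {duration: 3}]->(s)
} IN CONCURRENT TRANSACTIONS OF 1000 ROWS;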

5.6.4. Kickoff Node

This query creates the KickOff node:

MERGE (:KickOff);

5.6.5. KickOff to Initial Jobs

This query connects KickOff to initial jobs:

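// Use the smallest job duration in the graph as the weight of the KickOff edges
MATCH (j:Job)
WITH j.duration AS duration
ORDER BY duration ASC LIMIT 1
MATCH (ko:KickOff)
WITH ko, duration
// Initial jobs are those with no outgoing dependency or queue relationship.
// The type DEPENDS mirrors section 5.6.3; the data model calls it DEPENDS_ON.
MATCH (j:Job)-[:STARTS]->(s)
WHERE NOT EXISTS {(j)-[:DEPENDS|WAITS]->()}
CALL (ko, s, duration) {
    MERGE (ko)-[:TIME {duration: duration}]->(s)
} IN TRANSACTIONS OF 1000 ROWS;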

5.6.6. Project In-Memory Graph

This query projects the graph for GDS:

MATCH (source:Start|KickOff|End)
OPTIONAL MATCH (source)-[r:TIME]->(target)
RETURN gds.graph.project("g", source, target, {relationshipProperties: r {.duration}})

5.6.7. Stream Critical Paths

This query streams the longest paths:

CALL gds.dag.longestPath.stream("g", {relationshipWeightProperty: "duration"})
YIELD targetNode as target, totalCost, path, costs
WITH target AS last_activity, totalCost, path, costs
ORDER BY totalCost DESC
WITH last_activity, collect ({totalCost:totalCost, path:path, costs:costs})[0] AS longest
RETURN last_activity, longest.totalCost AS critical_time, longest.path AS path, longest.costs AS costs
Figure 5. Critical Paths to Each Job Time Bound

5.6.8. Stream Critical Times for Specific Jobs

This query streams critical times for specific jobs (requires $job_id_list parameter):

:params {
  job_id_list: ["T11", "T12", "T14"]
}
CALL gds.dag.longestPath.stream("g", {relationshipWeightProperty: "duration"})
YIELD targetNode as target, totalCost, path, costs
WITH gds.util.asNode(target).job_id AS last_activity, totalCost, path, costs
ORDER BY totalCost DESC
WITH last_activity, collect ({totalCost:totalCost, path:path, costs:costs})[0] AS longest
WHERE last_activity IN $job_id_list
WITH last_activity, longest.totalCost AS critical_time, longest.path AS path, longest.costs AS costs
ORDER BY size(last_activity)
RETURN last_activity, critical_time

For those parameters, the query returns:

[
    {
        "last_activity": "T11",
        "critical_time": 86.0
    },
    {
        "last_activity": "T12",
        "critical_time": 44.0
    },
    {
        "last_activity": "T14",
        "critical_time": 26.0
    }
]
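
The critical times above can be cross-checked without GDS by running a longest-path pass over the same TIME structure in plain Python. The sketch below is an independent re-computation, not the GDS implementation; critical_times is a hypothetical helper and the edge lists are copied from the demo data. It rests on one assumption: the pattern DEPENDS|WAITS in sections 5.6.3 and 5.6.5 matches only WAITS relationships, because the data model names the dependency type DEPENDS_ON. Under that assumption the sketch reproduces the listed values, and it also shows what the same pass gives once DEPENDS_ON edges are included.

```python
from collections import defaultdict
from graphlib import TopologicalSorter

DURATION = {"T0": 5, "T1": 15, "T2": 12, "T3": 6, "T4": 3, "T5": 14, "T6": 11,
            "T7": 5, "T8": 4, "T9": 2, "T10": 10, "T11": 12, "T12": 5,
            "T13": 6, "T14": 4}
# (later, earlier) pairs: the later job WAITS for / DEPENDS_ON the earlier one
WAITS = [("T1", "T0"), ("T5", "T1"), ("T2", "T5"), ("T6", "T2"), ("T11", "T6"),
         ("T7", "T3"), ("T8", "T7"), ("T10", "T8"), ("T12", "T10"),
         ("T9", "T4"), ("T13", "T9"), ("T14", "T13")]
DEPENDS_ON = [("T1", "T0"), ("T2", "T0"), ("T3", "T1"), ("T3", "T2"),
              ("T4", "T3"), ("T5", "T0"), ("T6", "T0"), ("T7", "T5"),
              ("T8", "T6"), ("T9", "T7"), ("T9", "T8"), ("T12", "T10"),
              ("T10", "T11"), ("T13", "T12"), ("T14", "T13")]

def critical_times(pairs, wait=3):
    """Longest weighted path from KickOff to the End node of every job."""
    preds = defaultdict(set)
    for later, earlier in pairs:
        preds[later].add(earlier)
    kickoff = min(DURATION.values())  # KickOff edge weight: minimal job duration
    finish = {}
    # static_order() yields each job only after all of its predecessors
    order = TopologicalSorter({job: preds[job] for job in DURATION}).static_order()
    for job in order:
        if preds[job]:
            start = max(finish[p] for p in preds[job]) + wait
        else:
            start = kickoff
        finish[job] = start + DURATION[job]
    return finish

waits_only = critical_times(WAITS)
with_dependencies = critical_times(WAITS + DEPENDS_ON)
print([waits_only[j] for j in ("T11", "T12", "T14")])         # [86, 44, 26]
print([with_dependencies[j] for j in ("T11", "T12", "T14")])  # [86, 107, 123]
```

The second line shows how much longer the paths to T12 and T14 become when task dependencies, and not only machine queue order, constrain the schedule.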