Skip to content

Import and Export

Save and load workflows with JSON serialization.

Overview

PyNodeWidget workflows can be exported to JSON for:

  • Persistence: Save workflows between sessions
  • Sharing: Share workflows with others
  • Version control: Track workflow changes in git
  • Templates: Create reusable workflow templates
  • Backup: Archive workflow configurations

The exported JSON includes: - Nodes (configuration, position, type) - Edges (connections between nodes) - Node values (current field values) - Viewport (zoom and pan state) - Node templates (registered node types)

Basic Export/Import

Export to File

Save current workflow:

from pynodewidget import NodeFlowWidget

flow = NodeFlowWidget()

# Build workflow...
# ...

# Export to JSON file
flow.export_json("my_workflow.json")
# ✓ Flow exported to my_workflow.json

Import from File

Load saved workflow:

flow = NodeFlowWidget()

# Load workflow
flow.load_json("my_workflow.json")
# ✓ Flow loaded from my_workflow.json

Custom Filenames

# Export with custom name
flow.export_json("data_pipeline_v2.json")

# Load from custom location
flow.load_json("/path/to/saved_workflow.json")

JSON Structure

Complete Export

Exported JSON contains:

{
  "nodes": [
    {
      "id": "node-1",
      "type": "data_loader",
      "position": {"x": 100, "y": 100},
      "data": {
        "label": "Data Loader",
        "values": {
          "file_path": "data.csv",
          "format": "csv"
        }
      }
    }
  ],
  "edges": [
    {
      "id": "edge-1",
      "source": "node-1",
      "target": "node-2",
      "sourceHandle": "data",
      "targetHandle": "input"
    }
  ],
  "viewport": {
    "x": 0,
    "y": 0,
    "zoom": 1
  },
  "node_templates": [
    {
      "type": "data_loader",
      "label": "Data Loader",
      "defaultData": {...}
    }
  ]
}

Node Structure

Each node contains:

{
  "id": "unique-node-id",
  "type": "node-type-name",
  "position": {"x": 100, "y": 200},
  "data": {
    "label": "Node Label",
    "parameters": {...},  // JSON Schema
    "inputs": [...],
    "outputs": [...],
    "values": {...}  // Current field values
  }
}

Edge Structure

Each edge contains:

{
  "id": "unique-edge-id",
  "source": "source-node-id",
  "target": "target-node-id",
  "sourceHandle": "output-handle-id",
  "targetHandle": "input-handle-id"
}

Programmatic Access

Get Flow Data

Access flow data as dictionary:

# Get complete flow data
data = flow.get_flow_data()

# Access components
nodes = data["nodes"]
edges = data["edges"]
viewport = data["viewport"]

Export to Dictionary

# Get as dict instead of file
flow_dict = {
    "nodes": flow.nodes,
    "edges": flow.edges,
    "viewport": flow.viewport,
    "node_templates": flow.node_templates
}

# Use for custom serialization
import json
json_string = json.dumps(flow_dict, indent=2)

Import from Dictionary

import json

# Load JSON string
json_string = '{"nodes": [...], "edges": [...]}'
data = json.loads(json_string)

# Apply to widget
flow.nodes = data["nodes"]
flow.edges = data["edges"]
flow.viewport = data.get("viewport", {"x": 0, "y": 0, "zoom": 1})

Node Values

Include Values in Export

Node values are automatically included:

# Set node values
flow.set_node_values("processor-1", {
    "threshold": 0.8,
    "mode": "advanced",
    "workers": 8
})

# Export (includes values)
flow.export_json("workflow_with_values.json")

Access Node Values from JSON

# Load workflow
flow.load_json("workflow_with_values.json")

# Access values
values = flow.get_node_values("processor-1")
print(values)  # {"threshold": 0.8, "mode": "advanced", "workers": 8}

Clear Values Before Export

# Clear all node values
flow.node_values = {}

# Export without values
flow.export_json("workflow_template.json")

Workflow Templates

Create Template

Export workflow as reusable template:

from pynodewidget import NodeFlowWidget

# Create template workflow
template = NodeFlowWidget(nodes=[DataLoader, Processor, OutputNode])

# Add template nodes (no connections)
template.nodes = [
    {
        "id": "loader",
        "type": "data_loader",
        "position": {"x": 50, "y": 100},
        "data": {...}
    },
    {
        "id": "processor",
        "type": "processor",
        "position": {"x": 300, "y": 100},
        "data": {...}
    },
    {
        "id": "output",
        "type": "output",
        "position": {"x": 550, "y": 100},
        "data": {...}
    }
]

# Export as template
template.export_json("data_pipeline_template.json")

Use Template

# Load template
flow = NodeFlowWidget()
flow.load_json("data_pipeline_template.json")

# Customize and use
flow.set_node_values("loader", {"file_path": "my_data.csv"})

Sharing Workflows

Export for Sharing

# Export with descriptive name
flow.export_json("ml_training_pipeline.json")

# Share the JSON file
# Users can load it with:
# flow.load_json("ml_training_pipeline.json")

Version Control

# Export with version in filename
version = "v1.2"
flow.export_json(f"workflow_{version}.json")

# Commit to git
# git add workflow_v1.2.json
# git commit -m "Update workflow to v1.2"

Documentation

Include README with workflow:

# Export workflow
flow.export_json("workflow.json")

# Create README
readme = """
# Data Processing Workflow

## Description
This workflow processes CSV data and generates visualizations.

## Nodes
- Data Loader: Load CSV from file
- Processor: Clean and transform data
- Visualizer: Generate charts

## Usage
1. Load workflow: `flow.load_json("workflow.json")`
2. Set input file: `flow.set_node_values("loader", {"file_path": "data.csv"})`
3. Run pipeline

## Requirements
- pandas
- matplotlib
"""

with open("workflow_README.md", "w") as f:
    f.write(readme)

Real-World Examples

Save Progress

Auto-save during long workflow:

import time

flow = NodeFlowWidget()

# Build complex workflow
for i in range(10):
    # Add nodes, configure...

    # Auto-save every step
    flow.export_json(f"workflow_backup_{i}.json")

    time.sleep(1)

# Final save
flow.export_json("workflow_final.json")

Workflow Variants

Create multiple configurations:

# Base workflow
flow = NodeFlowWidget()
# ... configure ...
flow.export_json("base_workflow.json")

# High-performance variant
flow.set_node_values("processor", {"workers": 16, "batch_size": 1000})
flow.export_json("workflow_high_perf.json")

# Low-memory variant
flow.set_node_values("processor", {"workers": 2, "batch_size": 100})
flow.export_json("workflow_low_mem.json")

Migration

Update saved workflows:

import json

# Load old workflow
with open("old_workflow.json", "r") as f:
    data = json.load(f)

# Update node types
for node in data["nodes"]:
    if node["type"] == "old_processor":
        node["type"] = "new_processor"
        # Migrate values
        if "data" in node and "values" in node["data"]:
            values = node["data"]["values"]
            # Update field names
            if "threads" in values:
                values["workers"] = values.pop("threads")

# Save migrated workflow
with open("migrated_workflow.json", "w") as f:
    json.dump(data, f, indent=2)

print("✓ Workflow migrated")

Batch Processing

Process multiple workflows:

import os

workflows = [
    "workflow1.json",
    "workflow2.json",
    "workflow3.json"
]

results = []

for workflow_file in workflows:
    flow = NodeFlowWidget()
    flow.load_json(workflow_file)

    # Process
    result = process_workflow(flow)
    results.append(result)

    # Export results
    flow.export_json(f"processed_{workflow_file}")

print(f"✓ Processed {len(workflows)} workflows")

Advanced Usage

Selective Export

Export only specific components:

import json

# Export only nodes
with open("nodes_only.json", "w") as f:
    json.dump({"nodes": flow.nodes}, f, indent=2)

# Export only connections
with open("edges_only.json", "w") as f:
    json.dump({"edges": flow.edges}, f, indent=2)

# Export configuration only
with open("config_only.json", "w") as f:
    json.dump({"node_templates": flow.node_templates}, f, indent=2)

Merge Workflows

Combine multiple workflows:

import json

# Load workflow A
with open("workflow_a.json", "r") as f:
    workflow_a = json.load(f)

# Load workflow B
with open("workflow_b.json", "r") as f:
    workflow_b = json.load(f)

# Merge nodes and edges
merged = {
    "nodes": workflow_a["nodes"] + workflow_b["nodes"],
    "edges": workflow_a["edges"] + workflow_b["edges"],
    "viewport": {"x": 0, "y": 0, "zoom": 1},
    "node_templates": workflow_a.get("node_templates", [])
}

# Avoid ID conflicts
# (Production code should rename IDs)

# Save merged
with open("workflow_merged.json", "w") as f:
    json.dump(merged, f, indent=2)

Extract Subgraph

Export portion of workflow:

import json

# Load full workflow
flow = NodeFlowWidget()
flow.load_json("full_workflow.json")

# Select nodes to extract
selected_ids = ["node-1", "node-2", "node-3"]

# Extract nodes
selected_nodes = [n for n in flow.nodes if n["id"] in selected_ids]

# Extract relevant edges
selected_edges = [
    e for e in flow.edges
    if e["source"] in selected_ids and e["target"] in selected_ids
]

# Create subgraph
subgraph = {
    "nodes": selected_nodes,
    "edges": selected_edges,
    "viewport": {"x": 0, "y": 0, "zoom": 1}
}

# Save
with open("subgraph.json", "w") as f:
    json.dump(subgraph, f, indent=2)

Best Practices

  • Version filenames: Include version or date in filename
  • Validate before export: Check workflow is valid
  • Include metadata: Add creation date, author, version info
  • Error handling: Use try/except for safe export/import
  • Backup before load: Export current state before loading new workflow

Troubleshooting

File not found: Check file path and current directory.

Invalid JSON: Validate JSON syntax before loading.

Missing node templates: Register all node types before loading workflow.

Large files: Check file size and confirm before loading very large workflows.

Next Steps