Schema Migration Flow
End-to-end reference for how schema changes flow through the system — from user action to database migration to frontend notification. This document ties together the individual pieces documented in Column Operations, View Operations, Processor, and Migration Coordination.
Complete Flow Overview
┌──────────────────────────────────────────────────────────────────────────────┐
│ USER ACTION │
│ (change column type, toggle nullable, rename table, create/delete view) │
└───────────────────────────────────┬──────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ QUARKUS REST API (Lambda) │
│ │
│ ColumnResource / ViewResource │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Schema Change Detection │ │
│ │ ┌─────────────────┐ ┌──────────────────────────┐ │ │
│ │ │ hasSchemaChanges │───►│ SchemaChangeDetector │ │ │
│ │ │ gate check │ │ .detectChanges() │ │ │
│ │ └─────────────────┘ └──────────┬───────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────┴──────────┐ │ │
│ │ │ │ │ │
│ │ No changes Has changes │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ 200 OK (sync) Impact Analysis │ │
│ │ │ │ │
│ └───────────────────────────────────────────┼──────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────┼──────────────┐ │
│ │ Pre-Migration Checks │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────┐ ┌────────────────────────────┐ │ │
│ │ │ Dry-Run Mode? │ │ MigrationImpactAnalyzer │ │ │
│ │ │ (?dryRun=true) │ │ - Vendor-aware lock class. │ │ │
│ │ │ → Return impact │ │ - Row count estimation │ │ │
│ │ │ without acting │ │ - FK cascade analysis │ │ │
│ │ └──────────────────┘ │ - Duration estimation │ │ │
│ │ │ - Historical rate refine. │ │ │
│ │ └────────────┬───────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────────────────────────────┼───────────────┐ │ │
│ │ │ Concurrent Migration Check │ │ │ │
│ │ │ ActiveMigrationService │ │ │ │
│ │ │ .hasActiveBlockingMigration() │ │ │ │
│ │ │ → 409 Conflict if table busy │ │ │ │
│ │ └──────────────────────────────────┼───────────────┘ │ │
│ └───────────────────────────────────────────┼──────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────┼──────────────┐ │
│ │ Migration Queuing │ │ │
│ │ ▼ │ │
│ │ 1. Apply sync changes immediately (displayName, etc.) │ │
│ │ 2. ActiveMigrationService.startMigration() │ │
│ │ → Persist to DB + add to in-memory cache │ │
│ │ 3. MigrationDurationTracker.recordMigrationStart() │ │
│ │ 4. MetadataUpdateProducer.onColumnSchemaChange() │ │
│ │ → Publish MetadataUpdateMessage to RabbitMQ │ │
│ │ 5. Broadcast SSE: view.column.schema.changing │ │
│ │ (with impact data for frontend locking) │ │
│ │ 6. Return 202 Accepted │ │
│ └──────────────────────────────────────────────────────────┘ │
└───────────────────────────────────┬──────────────────────────────────────────┘
│
│ MetadataUpdateMessage
│ via RabbitMQ
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ SPRING BOOT PROCESSOR (Fargate) │
│ │
│ EntityMetadataConsumer (@RabbitListener) │
│ │ │
│ ├── Column ops → processColumnMetadata() │
│ └── Entity ops → processEntityMetadataInternal() │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ DDL Generation & Execution │ │
│ │ │ │
│ │ HibernateSchemaTools.generateMigrationDDL() │ │
│ │ → Dialect-aware SQL (PostgreSQL/MySQL) │ │
│ │ → Generates UP + DOWN scripts │ │
│ │ │ │
│ │ FlywayMigrationService.applyMigration() │ │
│ │ → Track: PENDING → APPLYING → APPLIED/FAILED │ │
│ │ → Execute DDL on workspace database │ │
│ │ → Store in workspace_schema_migration table │ │
│ └────────────────────────────┬─────────────────────────┘ │
│ │ │
│ ┌────────────────────────────┼─────────────────────────┐ │
│ │ Progress Reporting │ (if est. > 1s) │ │
│ │ ▼ │ │
│ │ MigrationProgressPoller (every 2s) │ │
│ │ │ │ │
│ │ ├── PostgreSQL 12+: pg_stat_progress_alter_table│ │
│ │ │ → Real row counts, phase descriptions │ │
│ │ │ │ │
│ │ └── MySQL / older PG: time-based estimation │ │
│ │ → % = elapsed / estimated × 100 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ MigrationProgressProducer │ │
│ │ → MigrationProgressMessage to RabbitMQ │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ TaskCompletionProducer │
│ → TaskCompletionMessage (SUCCESS/FAILED) to RabbitMQ │
│ → Never throws — failure communicated via message │
└───────────────────────────────┬──────────────────────────────────────────────┘
│
│ TaskCompletionMessage + MigrationProgressMessage
│ via RabbitMQ
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ QUARKUS CONSUMERS (Fargate) │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ MigrationProgressConsumer (progress events) │ │
│ │ → Convert to WorkspaceEvent.columnSchemaProgress() │ │
│ │ → Broadcast via ViewEventBroadcaster │ │
│ │ → SSE: view.column.schema.progress │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ TaskCompletionBusinessLogicConsumer (completion events) │ │
│ │ │ │ │
│ │ ├── Persist TaskCompletion audit record │ │
│ │ │ │ │
│ │ └── Route to operation handler: │ │
│ │ ├── ColumnOperationHandler │ │
│ │ │ ├── Clear migration: completeMigrationByColumnUuid() │ │
│ │ │ ├── Record duration: recordMigrationCompletion() │ │
│ │ │ ├── On SUCCESS: apply schema changes to metadata, │ │
│ │ │ │ persist ColumnMetadata + ViewColumn │ │
│ │ │ └── Broadcast: view.column.schema.changed │ │
│ │ │ │ │
│ │ ├── ViewOperationHandler │ │
│ │ │ ├── On CREATE: create View + ViewColumns + metadata │ │
│ │ │ └── On DELETE: cascade delete all records │ │
│ │ │ │ │
│ │ └── BulkActionHandler │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ WorkspaceEventConsumer (SSE routing) │ │
│ │ │ │ │
│ │ ├── view.* events → ViewEventBroadcaster │ │
│ │ │ → SSE clients at /api/sse/view/{viewUuid} │ │
│ │ │ │ │
│ │ ├── workspace.* events → WorkspaceEventBroadcaster │ │
│ │ │ → SSE clients at /api/sse/workspace/{orgSlug}/{slug} │ │
│ │ │ │ │
│ │ └── organization.* events → OrganisationEventBroadcaster │ │
│ │ → SSE clients at /api/sse/organisation/{orgSlug} │ │
│ └────────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬──────────────────────────────────────────────┘
│
│ SSE (Server-Sent Events)
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ FRONTEND (Browser) │
│ │
│ EventSource connections: │
│ ├── /api/sse/view/{viewUuid} (column-level events) │
│ ├── /api/sse/workspace/{orgSlug}/{slug} (workspace-level events) │
│ └── /api/sse/organisation/{orgSlug} (org-level events) │
│ │
│ On view.column.schema.changing: │
│ → Lock affected column(s), show migration indicator │
│ → Impact level determines UI behavior (see below) │
│ │
│ On view.column.schema.progress: │
│ → Update progress bar with percent, phase, elapsed time │
│ │
│ On view.column.schema.changed: │
│ → Unlock column, reload metadata + data │
│ │
│ On view.column.schema.change.failed: │
│ → Unlock column, show error notification │
└──────────────────────────────────────────────────────────────────────────────┘
Message Types
Three RabbitMQ message types drive the migration flow:
MetadataUpdateMessage
Sent from Quarkus to the Spring Boot processor when a schema change is queued.
| Field | Type | Description |
|---|---|---|
| workspaceId | Long | Workspace numeric ID |
| workspaceSlug | String | For SSE routing |
| entityType | String | "Column" or "EntityMetadata" |
| operation | String | CREATE, UPDATE, DELETE |
| columnUuid / viewUuid | UUID | Entity identifiers |
| schemaChanges | List | Property, old/new value, SQL operation |
| operationId | UUID | Tracks this migration end-to-end |
| estimatedDurationMs | Long | For progress reporting |
| userId | Long | Who initiated this |
| originClientId | String | Browser tab ID for echo prevention |
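The field list above maps naturally onto a Java record. The following is a minimal sketch, not the project's actual class: the `SchemaChange` element type and the constructor validation are assumptions added for illustration, while the field names and types follow the table.

```java
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical element type for schemaChanges (property, old/new value, SQL operation).
record SchemaChange(String property, String oldValue, String newValue, String sqlOperation) {}

// Sketch of the payload; field names and types follow the table above.
record MetadataUpdateMessage(
        Long workspaceId,
        String workspaceSlug,
        String entityType,
        String operation,
        UUID columnUuid,
        UUID viewUuid,
        List<SchemaChange> schemaChanges,
        UUID operationId,
        Long estimatedDurationMs,
        Long userId,
        String originClientId) {

    private static final Set<String> OPERATIONS = Set.of("CREATE", "UPDATE", "DELETE");

    // Validation is an assumption of this sketch, not documented behaviour.
    MetadataUpdateMessage {
        if (operation == null || !OPERATIONS.contains(operation)) {
            throw new IllegalArgumentException("Unknown operation: " + operation);
        }
        // Defensive copy so the message stays immutable once built.
        schemaChanges = schemaChanges == null ? List.of() : List.copyOf(schemaChanges);
    }
}
```

Keeping `operationId` on the message is what lets progress and completion messages be correlated with the original request.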
MigrationProgressMessage
Sent periodically from the processor during long-running migrations (estimated > 1s).
| Field | Type | Description |
|---|---|---|
| workspaceSlug | String | SSE routing |
| columnUuid / viewUuid | UUID | What's being migrated |
| operationId | UUID | Links to the MetadataUpdateMessage |
| progressPercent | int | 0–100, or -1 if indeterminate |
| phase | String | "Rewriting table", "Building index", etc. |
| progressSource | String | PG_STAT or TIME_ESTIMATE |
| rowsProcessed / totalRows | Long | From pg_stat (PG only) |
| elapsedMs / estimatedDurationMs | Long | Timing data |
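As a rough illustration of how `progressPercent` could be derived for the two progress sources, the sketch below computes a time-based estimate (elapsed / estimated × 100) and a count-based value. The class and method names are hypothetical, and capping the time-based value at 99 until completion is an assumption of this sketch rather than documented behaviour.

```java
final class ProgressEstimator {
    // Sentinel for "indeterminate", per the progressPercent row above.
    static final int INDETERMINATE = -1;

    private ProgressEstimator() {}

    // TIME_ESTIMATE source: elapsed / estimated * 100.
    // The cap at 99 (assumed) avoids showing 100% before the completion message arrives.
    static int timeBasedPercent(long elapsedMs, long estimatedDurationMs) {
        if (estimatedDurationMs <= 0 || elapsedMs < 0) {
            return INDETERMINATE;
        }
        long percent = elapsedMs * 100 / estimatedDurationMs;
        return (int) Math.min(percent, 99);
    }

    // PG_STAT source: progress from real counters (done out of total).
    static int countBasedPercent(long done, long total) {
        if (total <= 0 || done < 0) {
            return INDETERMINATE;
        }
        return (int) Math.min(done * 100 / total, 100);
    }
}
```

Returning -1 when no estimate is available lets the frontend fall back to an indeterminate indicator instead of a misleading bar.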
TaskCompletionMessage
Sent from the processor to Quarkus when the migration finishes (success or failure).
| Field | Type | Description |
|---|---|---|
| status | String | SUCCESS or FAILED |
| entityType | String | "Column", "View", "BulkAction" |
| entityId | String | Entity identifier |
| operation | String | CREATE, UPDATE, DELETE |
| message | String | Human-readable result |
| errorMessage | String | Error details (if failed) |
| result | Map | migrationVersion, executionTimeMs, schemaChanges |
| userId / originClientId | Long / String | Passed through for SSE attribution |
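On the Quarkus side, the `entityType` field decides which operation handler receives the completion. A minimal sketch of that dispatch, assuming a simple lookup table: the `Completion` record is a cut-down stand-in for TaskCompletionMessage, and the returned strings are placeholders for the real handler calls.

```java
import java.util.Map;
import java.util.function.Function;

// Cut-down stand-in for TaskCompletionMessage with only the fields this sketch needs.
record Completion(String status, String entityType, String entityId) {}

final class CompletionRouter {
    // Handler names mirror the document; the lambdas are placeholders, not real handlers.
    private final Map<String, Function<Completion, String>> handlers = Map.of(
            "Column", c -> "ColumnOperationHandler:" + c.status(),
            "View", c -> "ViewOperationHandler:" + c.status(),
            "BulkAction", c -> "BulkActionHandler:" + c.status());

    String route(Completion completion) {
        String type = completion.entityType();
        // Map.of rejects null keys on lookup, so guard before calling get().
        Function<Completion, String> handler = type == null ? null : handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("No handler for entityType " + type);
        }
        return handler.apply(completion);
    }
}
```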
Schema Change Detection
What Triggers a Migration
The hasSchemaChanges gate in ColumnResource determines the routing:
java
boolean hasSchemaChanges = columnDTO.getAdvancedOptions() != null
    || columnDTO.getWidgetOptions() != null
    || columnDTO.getWidgetType() != null;
| Change Category | Fields | Route |
|---|---|---|
| Schema (async) | type, nullable, unique, length, precision, scale, defaultValue | updateWithSchemaDetection() → 202 |
| Widget (async) | widgetType, widgetOptions (when DB defaults/triggers affected) | updateWithSchemaDetection() → 202 |
| Presentation (sync) | displayName, position, hidden, readonly, sortable, description | updateWithAffected() → 200 |
A single PATCH can contain both sync and async changes. Sync changes are applied immediately; async changes are queued. Response is 202 if any schema change was detected.
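The mixed sync/async rule can be sketched as a small classifier over the names of the changed fields. This is an illustration only: the class is hypothetical, the field groupings are copied from the table above, and widget fields are treated as always asynchronous here even though the table limits that to changes affecting DB defaults or triggers.

```java
import java.util.Set;

final class ChangeRouting {
    // Field groupings copied from the table above.
    static final Set<String> SCHEMA_FIELDS = Set.of(
            "type", "nullable", "unique", "length", "precision", "scale", "defaultValue");
    // Simplification: treated as async unconditionally in this sketch.
    static final Set<String> WIDGET_FIELDS = Set.of("widgetType", "widgetOptions");

    private ChangeRouting() {}

    static boolean isAsync(String field) {
        return SCHEMA_FIELDS.contains(field) || WIDGET_FIELDS.contains(field);
    }

    // 202 if any changed field needs a migration, otherwise 200.
    static int statusFor(Set<String> changedFields) {
        return changedFields.stream().anyMatch(ChangeRouting::isAsync) ? 202 : 200;
    }
}
```

For example, a PATCH touching `displayName` and `nullable` would yield 202 under this sketch, while one touching only `displayName` and `position` would yield 200.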
SchemaChangeDetector
SchemaChangeDetector.detectChanges() compares old vs new column properties and produces a list of SchemaChangeDTO objects:
SchemaChangeDTO {
  property: "type" | "nullable" | "unique" | "length" | ...
  oldValue: "varchar"
  newValue: "integer"
  sqlOperation: "ALTER TABLE users ALTER COLUMN age TYPE INTEGER USING age::INTEGER"
}
SchemaChangeDetector.detectDefaultAndTriggerChanges() handles widget option changes that affect DB-level defaults and triggers (e.g., DATETIME auto-set on insert).
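The old-versus-new comparison behind detectChanges() can be sketched as a property-by-property diff. The sketch below is an assumption about the general approach, not the real detector: it works on plain maps, uses a hypothetical `PropertyChange` record, and leaves out generation of the `sqlOperation` string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical result type; the real SchemaChangeDTO also carries sqlOperation.
record PropertyChange(String property, Object oldValue, Object newValue) {}

final class ChangeDetectorSketch {
    // Compared in a fixed order so the output is deterministic.
    private static final List<String> TRACKED = List.of(
            "type", "nullable", "unique", "length", "precision", "scale", "defaultValue");

    private ChangeDetectorSketch() {}

    static List<PropertyChange> detectChanges(Map<String, Object> oldProps,
                                              Map<String, Object> newProps) {
        List<PropertyChange> changes = new ArrayList<>();
        for (String property : TRACKED) {
            Object before = oldProps.get(property);
            Object after = newProps.get(property);
            // Objects.equals treats two missing (null) values as unchanged.
            if (!Objects.equals(before, after)) {
                changes.add(new PropertyChange(property, before, after));
            }
        }
        return changes;
    }
}
```

An empty result corresponds to the "No changes" branch in the flow diagram, where the request completes synchronously.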
View-Level Schema Changes
ViewResource also routes through updateWithSchemaDetection() for:
- advancedOptions.tableName: renames the underlying table
- advancedOptions.schema: moves the table to a different schema
Impact Analysis
Before queuing a migration, MigrationImpactAnalyzer.analyze() computes a MigrationImpactDTO:
┌──────────────────────────────────────────────────────────────────┐
│ Impact Analysis Pipeline │
│ │
│ Schema Changes │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Vendor Detection │ PostgreSQL / MySQL (InnoDB / MyISAM) │
│ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Lock Classifier │ │ TableStatsService │ │
│ │ per operation │ │ - Row count │ │
│ │ per vendor │ │ - Storage engine │ │
│ └────────┬─────────┘ │ - MySQL version │ │
│ │ └────────┬─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Duration Estimation │ │
│ │ │ │
│ │ Base rates (ms/row): │ │
│ │ REWRITE: 0.05 SCAN: 0.02 INDEX: 0.03│ │
│ │ │ │
│ │ Refined by MigrationDurationTracker: │ │
│ │ → Average of last 20 actual durations │ │
│ │ → Per workspace, per vendor │ │
│ └────────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ ┌──────────────────────┐ │
│ │ FkCascadeAnalyzer │ │ Output: ImpactDTO │ │
│ │ - Walk FK graph │───►│ - level (T/B/BL) │ │
│ │ - Both directions │ │ - blocksReads/Writes │ │
│ │ - Cascade rules │ │ - estimatedDurationMs│ │
│ └──────────────────┘ │ - operations[] │ │
│ │ - affectedFkTables[] │ │
│ └──────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
Impact Levels
| Level | Lock Duration | User Experience |
|---|---|---|
| TRANSPARENT | < 100ms | Invisible — no UI change needed |
| BRIEF | 100ms – 5s | Subtle indicator on column header |
| BLOCKING | > 5s | Full migration overlay with progress |
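To make the thresholds concrete, the sketch below combines the base rates from the pipeline diagram with the level boundaries in the table. It is a simplified illustration: the type names are hypothetical, historical refinement by MigrationDurationTracker is left out, and treating exactly 5s as BRIEF is this sketch's reading of the table.

```java
enum ImpactLevel { TRANSPARENT, BRIEF, BLOCKING }

enum OperationKind {
    REWRITE(0.05), SCAN(0.02), INDEX(0.03);

    final double msPerRow; // base rates (ms/row) from the pipeline diagram

    OperationKind(double msPerRow) {
        this.msPerRow = msPerRow;
    }
}

final class ImpactEstimator {
    private ImpactEstimator() {}

    // Base estimate only; the documented refinement from past durations is omitted.
    static long estimateDurationMs(OperationKind kind, long rowCount) {
        return Math.round(kind.msPerRow * rowCount);
    }

    static ImpactLevel classify(long lockDurationMs) {
        if (lockDurationMs < 100) {
            return ImpactLevel.TRANSPARENT;
        }
        if (lockDurationMs <= 5_000) {
            return ImpactLevel.BRIEF;
        }
        return ImpactLevel.BLOCKING;
    }
}
```

Under these base rates, a table rewrite over 900,000 rows is estimated at 45,000 ms and therefore classified as BLOCKING.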
Dry-Run Mode
PATCH /api/columns/item/{uuid}?dryRun=true returns the impact analysis without executing:
json
{
"dryRun": true,
"requiresMigration": true,
"impact": {
"level": "BLOCKING",
"blocksReads": true,
"blocksWrites": true,
"estimatedDurationMs": 45000,
"operations": [
{
"property": "type",
"description": "Change type VARCHAR → INTEGER",
"sqlOperation": "ALTER TABLE ... ALTER COLUMN ... TYPE INTEGER",
"blocksReads": true,
"blocksWrites": true,
"requiresTableRewrite": true
}
]
},
"confirmationRequired": true
}
Data Protection During Migration
MigrationGuardFilter
A JAX-RS ContainerRequestFilter intercepts all /api/data/* requests and returns 503 Service Unavailable when the target table is being migrated.
Request → MigrationGuardFilter → Endpoint
│
├── No active migrations → pass through (fast path)
│
├── Extract viewUuid from path
│
├── ActiveMigrationService.findBlockingMigrationForView()
│ (in-memory cache lookup, no DB hit)
│
├── Read request + blocksReads=false → pass through
│
└── Blocked → 503 + Retry-After header
{
"error": "SCHEMA_MIGRATION_IN_PROGRESS",
"tableName": "customers",
"retryAfterSeconds": 45,
"blocksReads": true,
"blocksWrites": true
}
ActiveMigration Cache
ActiveMigrationService maintains a ConcurrentHashMap cache keyed by workspaceId:tableName:
- Populated when migration is queued (startMigration())
- Cleared on TaskCompletionMessage (completeMigration())
- 5-minute TTL safety net for missed completion messages
- Entries include the primary table AND all FK-affected tables
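The cache behaviour listed above, together with the guard filter's read/write decision, can be sketched with a ConcurrentHashMap and a lazily enforced TTL. Everything here is illustrative: the class, the `Entry` record and the method signatures are assumptions, and the clock is passed in explicitly so that expiry is easy to exercise.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ActiveMigrationCacheSketch {
    // Hypothetical cache entry; the real service likely stores more detail.
    record Entry(String operationId, boolean blocksReads, long startedAtMs) {}

    private static final long TTL_MS = Duration.ofMinutes(5).toMillis();
    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    private static String key(long workspaceId, String tableName) {
        return workspaceId + ":" + tableName;
    }

    // Registers the primary table and every FK-affected table under one operation.
    void startMigration(long workspaceId, String operationId, List<String> tables,
                        boolean blocksReads, long nowMs) {
        Entry entry = new Entry(operationId, blocksReads, nowMs);
        for (String table : tables) {
            cache.put(key(workspaceId, table), entry);
        }
    }

    // Clears every table registered for the finished operation.
    void completeMigration(String operationId) {
        cache.values().removeIf(e -> e.operationId().equals(operationId));
    }

    // True when the request should receive 503; expired entries are dropped lazily.
    boolean isBlocked(long workspaceId, String tableName, boolean isRead, long nowMs) {
        String k = key(workspaceId, tableName);
        Entry entry = cache.get(k);
        if (entry == null) {
            return false; // fast path: nothing is migrating this table
        }
        if (nowMs - entry.startedAtMs() > TTL_MS) {
            cache.remove(k, entry); // TTL safety net for a missed completion message
            return false;
        }
        // Reads pass through unless the migration also blocks reads.
        return !(isRead && !entry.blocksReads());
    }
}
```

Because every lookup is served from memory, the guard adds no database round trip to data requests.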
Processor Internals
DDL Generation
HibernateSchemaTools.generateMigrationDDL() produces dialect-aware SQL:
- Uses Hibernate's Table and Column model objects
- DynamicSessionFactoryBuilder creates the model from metadata DTOs
- Generates both UP (apply) and DOWN (rollback) scripts
- Supports: CREATE TABLE, DROP TABLE, ADD COLUMN, DROP COLUMN, ALTER COLUMN TYPE, constraints
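To show what an UP/DOWN pair looks like, here is a deliberately simplified sketch for ADD COLUMN. It does not use Hibernate and is not how HibernateSchemaTools builds its SQL; the record and class names are hypothetical, and identifiers are assumed to be validated already, with dialect-aware quoting left to the real generator.

```java
// Hypothetical holder for the two scripts stored with each migration.
record MigrationScripts(String up, String down) {}

final class AddColumnDdlSketch {
    private AddColumnDdlSketch() {}

    // Builds the apply script and its inverse for a single ADD COLUMN.
    // Assumes table, column and sqlType are already validated identifiers/types.
    static MigrationScripts addColumn(String table, String column, String sqlType) {
        String up = "ALTER TABLE " + table + " ADD COLUMN " + column + " " + sqlType;
        String down = "ALTER TABLE " + table + " DROP COLUMN " + column;
        return new MigrationScripts(up, down);
    }
}
```

Generating the DOWN script at the same time as the UP script means a rollback does not have to reconstruct the previous state later.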
Migration Tracking
Each migration is tracked in workspace_schema_migration:
| Status Flow | Description |
|---|---|
| PENDING → APPLYING → APPLIED | Happy path |
| PENDING → APPLYING → FAILED | DDL execution error |
Duplicate migrations (race conditions) are handled gracefully — UnexpectedRollbackException is treated as idempotent success.
Rollback is supported via FlywayMigrationService.rollbackMigration() using the stored DOWN script.
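The two status flows can be expressed as a small state machine that rejects any other transition. This is a sketch under the assumption that APPLIED and FAILED are terminal; how rollback is represented in workspace_schema_migration is not covered by the table, so it is not modelled here.

```java
import java.util.EnumSet;
import java.util.Set;

enum MigrationStatus {
    PENDING, APPLYING, APPLIED, FAILED;

    // Allowed next states, matching the two flows in the table above.
    Set<MigrationStatus> next() {
        switch (this) {
            case PENDING:
                return EnumSet.of(APPLYING);
            case APPLYING:
                return EnumSet.of(APPLIED, FAILED);
            default:
                return EnumSet.noneOf(MigrationStatus.class); // terminal (assumed)
        }
    }

    // Returns the target state, or throws if the transition is not in the table.
    MigrationStatus transitionTo(MigrationStatus target) {
        if (!next().contains(target)) {
            throw new IllegalStateException(this + " -> " + target + " is not allowed");
        }
        return target;
    }
}
```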
Progress Polling
For long migrations, MigrationProgressPoller runs on a separate thread:
┌─────────────────────────────────────────────────────────┐
│ MigrationProgressPoller (every 2 seconds) │
│ │
│ PostgreSQL 12+: │
│ SELECT * FROM pg_stat_progress_alter_table │
│ → phase, blocks_total, blocks_done │
│ → Compute real % from block counts │
│ │
│ MySQL / older PostgreSQL: │
│ elapsed_ms / estimated_duration_ms × 100 │
│ → Time-based estimate (less accurate) │
│ │
│ Emits MigrationProgressMessage with: │
│ progressPercent, phase, rowsProcessed, totalRows │
└─────────────────────────────────────────────────────────┘
Schema Sync (Non-Migration Path)
Schema sync is a separate flow that does NOT go through the processor. It reads the customer database directly and updates SchemaStack's metadata to match.
POST /api/workspace/{uuid}/schema/sync
│
▼
SchemaSyncOrchestrator (state machine with checkpoint/resume)
│
├── INITIATED
│ └── Connect to customer DB via JDBC
│
├── SCHEMA_EXTRACTED
│ └── Introspect tables, columns, FKs via DatabaseMetaData
│
├── ENTITIES_MERGED
│ └── Compare metadata ↔ actual schema, merge differences
│
├── RELATIONSHIPS_MERGED
│ └── Sync FK relationships
│
├── VIEW_COLUMNS_GENERATED
│ └── Create ViewColumns for new columns
│
├── HASH_UPDATED
│ └── Recompute SHA-256 schema hash
│
└── COMPLETED
Schema Hash (Quick Drift Check)
SchemaHashService.quickCheckByOrgAndUuid() provides a fast drift check:
- Computes SHA-256 hash of metadata (stored entities) vs live DB (JDBC introspection)
- Hash includes: table names, column names/types/nullable/unique/PK flags, FK references
- All elements sorted for deterministic hashing
- Stored in workspace_database_config.schemaHashUpdatedAt
- Sub-second response time vs full drift detection which requires detailed comparison
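The sorted, deterministic hashing described above can be sketched with the JDK's MessageDigest. The descriptor strings (for example "users.age:integer:nullable") and the newline separator are assumptions made for this illustration; the actual encoding used by SchemaHashService is not specified here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class SchemaHashSketch {
    private SchemaHashSketch() {}

    // Each element is a hypothetical descriptor such as "users.age:integer:nullable".
    static String hash(Collection<String> schemaElements) {
        List<String> sorted = new ArrayList<>(schemaElements);
        Collections.sort(sorted); // sort so input order cannot change the hash
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (String element : sorted) {
                digest.update(element.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) '\n'); // separator keeps "ab","c" distinct from "a","bc"
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    // Drift exists when the metadata hash and the live-database hash differ.
    static boolean hasDrift(Collection<String> metadataElements, Collection<String> liveElements) {
        return !hash(metadataElements).equals(hash(liveElements));
    }
}
```

Comparing two 64-character digests is what keeps the quick check cheap; it reports that something differs, not what.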
Full Drift Detection
SchemaDriftDetectionService.detectDrift() provides a detailed report:
- Tables added, removed, modified
- Per-table: columns added/removed/modified, PK changes, unique constraint changes
- Returns SchemaDriftResult with summary messages
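At the table level, the added/removed part of such a report reduces to two set differences. The sketch below illustrates only that step, with hypothetical names; it assumes "added" means present in the live database but missing from metadata, and it does not attempt the per-column or constraint comparison.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, table-level slice of a drift report.
record TableDrift(Set<String> added, Set<String> removed) {
    boolean hasDrift() {
        return !added.isEmpty() || !removed.isEmpty();
    }
}

final class DriftSketch {
    private DriftSketch() {}

    // added   = in the live database, not in metadata (assumed direction)
    // removed = in metadata, no longer in the live database
    static TableDrift compareTables(Set<String> metadataTables, Set<String> liveTables) {
        Set<String> added = new TreeSet<>(liveTables);
        added.removeAll(metadataTables);
        Set<String> removed = new TreeSet<>(metadataTables);
        removed.removeAll(liveTables);
        return new TableDrift(added, removed);
    }
}
```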
SSE Event Reference
Events emitted during the migration lifecycle:
| Event Type | When | Payload Highlights |
|---|---|---|
| view.column.schema.changing | Migration queued (before processor runs) | impactLevel, blocksReads/Writes, estimatedDurationMs |
| view.column.schema.progress | During migration (every 2s) | progressPercent, phase, rowsProcessed, totalRows |
| view.column.schema.changed | Migration succeeded | columnUuid, viewUuid |
| view.column.schema.change.failed | Migration failed | errorMessage |
| view.column.created | Column added | column metadata |
| view.column.deleted | Column dropped | columnUuid |
| workspace.view.created | Table created | view metadata |
| workspace.view.deleted | Table dropped | viewUuid |
Echo Filtering
- Migration lock events (schema.changing, schema.progress) are NOT echo-filtered: the originating client also needs to see the lock state
- Data update events (column.updated) ARE echo-filtered via originClientId
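The echo-filtering rule can be sketched as a single delivery predicate. The names below are hypothetical, and the full event type `view.column.updated` is an assumption based on the `view.` prefix used by the other column events.

```java
import java.util.Set;

// Minimal stand-in for a broadcast event.
record SseEvent(String type, String originClientId) {}

final class EchoFilterSketch {
    // Lock-state events every client must see, including the tab that started the migration.
    private static final Set<String> NEVER_FILTERED = Set.of(
            "view.column.schema.changing", "view.column.schema.progress");

    private EchoFilterSketch() {}

    static boolean shouldDeliver(SseEvent event, String receivingClientId) {
        if (NEVER_FILTERED.contains(event.type())) {
            return true;
        }
        // Echo filter: skip the client that originated the change.
        return event.originClientId() == null
                || !event.originClientId().equals(receivingClientId);
    }
}
```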
SSE Routing
| Event prefix | Broadcaster | Endpoint |
|---|---|---|
| view.* | ViewEventBroadcaster | /api/sse/view/{viewUuid} |
| workspace.* | WorkspaceEventBroadcaster | /api/sse/workspace/{orgSlug}/{slug} |
| organization.* | OrganisationEventBroadcaster | /api/sse/organisation/{orgSlug} |
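Prefix-based routing as tabulated above amounts to a short chain of `startsWith` checks. The sketch returns the broadcaster name as a string purely for illustration; the class and method are hypothetical, and rejecting unknown prefixes is an assumption.

```java
final class SseRouterSketch {
    private SseRouterSketch() {}

    // Maps an event type to the broadcaster named in the routing table.
    static String broadcasterFor(String eventType) {
        if (eventType.startsWith("view.")) {
            return "ViewEventBroadcaster";
        }
        if (eventType.startsWith("workspace.")) {
            return "WorkspaceEventBroadcaster";
        }
        if (eventType.startsWith("organization.")) {
            return "OrganisationEventBroadcaster";
        }
        // Assumed behaviour: unknown prefixes are rejected rather than dropped silently.
        throw new IllegalArgumentException("Unroutable event type: " + eventType);
    }
}
```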
Error Handling
Processor Failure
The processor never throws from message handlers — failure is always communicated via TaskCompletionMessage:
Processor catches exception
│
▼
TaskCompletionProducer.sendFailure()
→ status: FAILED
→ errorMessage: "ERROR: column \"age\" cannot be cast to type integer"
│
▼
Quarkus ColumnOperationHandler
│
├── Clear migration cache (ActiveMigrationService)
├── Record failure duration (MigrationDurationTracker)
└── Broadcast: view.column.schema.change.failed
│
▼
Frontend: unlock column, show error notification
Concurrent Migration Prevention
PATCH /api/columns/{uuid} with schema change
│
▼
ActiveMigrationService.hasActiveBlockingMigration(workspaceId, tableName)
│
├── No active migration → proceed
└── Active migration found → 409 Conflict
"A migration is already in progress on table 'products'"
Only BLOCKING migrations are serialized. TRANSPARENT and BRIEF migrations are allowed concurrently; PostgreSQL's own lock management handles contention.
Stale Migration Cleanup
If a TaskCompletionMessage is lost (processor crash, network failure):
- In-memory cache entries expire after 5 minutes (TTL safety net)
- The active_migration DB record remains until manually cleared or TTL-based cleanup runs
Key File Reference
| Component | File | Purpose |
|---|---|---|
| Trigger | ColumnResource.java | REST endpoint, hasSchemaChanges gate |
| Detection | SchemaChangeDetector.java | Compare old vs new properties |
| Impact | MigrationImpactAnalyzer.java | Vendor-aware classification |
| FK Analysis | FkCascadeAnalyzer.java | Walk FK graph |
| Row Count | TableStatsService.java | Query workspace DB for stats |
| Duration | MigrationDurationTracker.java | Historical rate refinement |
| Migration State | ActiveMigrationService.java | In-memory cache + DB tracking |
| Guard | MigrationGuardFilter.java | 503 response for blocked requests |
| Producer | MetadataUpdateProducer.java | Publish to RabbitMQ |
| Consumer | EntityMetadataConsumer.java | Receive and process migrations |
| DDL | HibernateSchemaTools.java | Generate dialect-aware SQL |
| Execution | FlywayMigrationService.java | Apply DDL, track status |
| Progress | MigrationProgressPoller.java | Poll pg_stat / time estimate |
| Completion | TaskCompletionBusinessLogicConsumer.java | Route to handlers |
| Column Handler | ColumnOperationHandler.java | Apply result, broadcast SSE |
| View Handler | ViewOperationHandler.java | Create/delete views |
| SSE | ViewEventBroadcaster.java | Broadcast to view clients |
| Sync | SchemaSyncOrchestrator.java | State machine for schema sync |
| Hash | SchemaHashService.java | SHA-256 drift detection |
| Drift | SchemaDriftDetectionService.java | Detailed drift report |
Cross-References
- Column Operations — individual column CRUD flows
- View Operations — individual view CRUD flows
- Processor — DDL generation internals
- Migration Coordination — lock levels, impact classification, frontend state machine
- SSE Broadcasting — event routing and connection management
- Multi-Tenancy — schema sync orchestrator details