SSE Integration Guide

Complete reference for connecting to SchemaStack's real-time Server-Sent Events streams.

SSE Endpoints

1. View Stream — /sse/view/{orgSlug}/{workspaceSlug}/{viewUuid}/stream

Auth: JWT + VIEW_DATA permission (checked at connection time)
Use: Cell edits, row inserts, column changes, bulk operations for a specific view.

typescript
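// Note: the browser's built-in EventSource ignores a `headers` option.
// These examples assume an EventSource implementation that accepts custom
// headers (for example a polyfill or a fetch-based client).
const viewSSE = new EventSource(
  `${BASE_URL}/sse/view/${orgSlug}/${workspaceSlug}/${viewUuid}/stream`,
  { headers: { Authorization: `Bearer ${jwt}` } }
);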

2. Workspace Stream — /sse/workspace/{orgSlug}/{workspaceSlug}/stream

Auth: JWT + workspace membership (org OWNER/ADMIN get automatic access)
Use: View lifecycle, column structural changes, workspace config, member changes.

typescript
const workspaceSSE = new EventSource(
  `${BASE_URL}/sse/workspace/${orgSlug}/${workspaceSlug}/stream`,
  { headers: { Authorization: `Bearer ${jwt}` } }
);

3. Organisation Stream — /sse/organisation/{orgSlug}/stream

Auth: JWT + organisation membership
Use: Workspace lifecycle, org settings, member changes, dashboard stats, subscription updates.

Supports X-Client-Id header for echo filtering.

typescript
const orgSSE = new EventSource(
  `${BASE_URL}/sse/organisation/${orgSlug}/stream`,
  { headers: { Authorization: `Bearer ${jwt}`, 'X-Client-Id': clientId } }
);

4. Task Completions — /sse/task-completions/stream

Auth: JWT required
Use: Schema processing task completion notifications.
Query param: ?workspaceId={slug} (optional filter)

typescript
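const taskSSE = new EventSource(
  `${BASE_URL}/sse/task-completions/stream?workspaceId=${encodeURIComponent(workspaceSlug)}`,
  { headers: { Authorization: `Bearer ${jwt}` } }  // JWT is required on this stream too
);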
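
Because the workspaceId filter is optional, it can help to build the URL in one place so the query string is omitted when no filter is wanted and the slug is always encoded. The following is a minimal sketch; `buildTaskStreamUrl` is a hypothetical helper, not part of the SchemaStack API, and it assumes the base URL is a bare origin with no path prefix.

```typescript
// Hypothetical helper: builds the task-completions stream URL.
// Assumes baseUrl is an origin such as "https://api.example.com";
// a path prefix on baseUrl would be dropped by the absolute path below.
function buildTaskStreamUrl(baseUrl: string, workspaceSlug?: string): string {
  const url = new URL('/sse/task-completions/stream', baseUrl);
  if (workspaceSlug) {
    // Only add the filter when a slug was supplied.
    url.searchParams.set('workspaceId', workspaceSlug);
  }
  return url.toString();
}
```

Calling it without a slug yields the unfiltered stream URL; passing a slug appends the `workspaceId` query parameter.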

Health Endpoints

| Endpoint | Returns |
| --- | --- |
| GET /sse/view/{org}/{ws}/{viewUuid}/health | { viewUuid, activeConnections } |
| GET /sse/view/health | { totalActiveViewConnections } |
| GET /sse/workspace/{org}/{ws}/health | { workspace, activeConnections } |
| GET /sse/workspace/health | { totalActiveConnections } |
| GET /sse/organisation/{orgSlug}/health | { organisation, activeConnections } |
| GET /sse/organisation/health | { totalActiveConnections } |
| GET /sse/task-completions/health | { workspace, activeConnections } |

Connection Events

On successful connection, each stream sends an initial event:

event: connection
data: {"eventType":"connection.established","timestamp":"2026-03-14T12:00:00Z",...}

All streams send keepalive pings every 30 seconds.
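
The 30-second keepalive can double as a liveness signal on the client. Below is a minimal sketch of a staleness check; `isConnectionStale` and the two-missed-pings threshold are assumptions for illustration. It also assumes the pings are observable by the client (if they are sent as SSE comment lines, a standard EventSource will not surface them, and only real events can update the last-activity time).

```typescript
const KEEPALIVE_INTERVAL_MS = 30_000; // from this guide: pings every 30 seconds

// Hypothetical helper: the connection counts as stale once more than
// `missedPings` keepalive intervals have passed without any traffic.
function isConnectionStale(
  lastActivityMs: number,
  nowMs: number,
  missedPings: number = 2,
): boolean {
  return nowMs - lastActivityMs > KEEPALIVE_INTERVAL_MS * missedPings;
}
```

A client could record `Date.now()` whenever anything arrives on the stream and run this check on a timer, closing and reopening the connection when it returns true.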


Event Envelope (WorkspaceEvent)

Every event (except task completions) follows this JSON structure:

typescript
interface WorkspaceEvent {
  eventType: string;         // Dot-notation type (see tables below)
  workspaceId: string;       // Workspace slug
  entityType: string;        // "View", "ViewColumn", "Row", "Workspace", etc.
  entityId: string;          // UUID of affected entity
  operation: string;         // "CREATE" | "UPDATE" | "DELETE" | "ARCHIVE" | "RESTORE"
  status: string;            // "STARTED" | "COMPLETED" | "FAILED"
  timestamp: string;         // ISO 8601
  userId: string;            // UUID of acting user
  originClientId?: string;   // For echo filtering (see below)
  organisationSlug?: string; // Present when event routes to org stream
  data: Record<string, any>; // Event-specific payload (includes viewUuid for view events)
}
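
Since events arrive as JSON strings, a runtime shape check before dispatching can catch malformed or unexpected messages early. This is a sketch only; `isWorkspaceEvent` and `WorkspaceEventLike` are hypothetical names, and the check covers just the fields the interface above marks as required.

```typescript
// Required envelope fields from the guide (optional ones omitted).
interface WorkspaceEventLike {
  eventType: string;
  workspaceId: string;
  entityType: string;
  entityId: string;
  operation: string;
  status: string;
  timestamp: string;
  userId: string;
  data: Record<string, unknown>;
}

const REQUIRED_STRING_FIELDS = [
  'eventType', 'workspaceId', 'entityType', 'entityId',
  'operation', 'status', 'timestamp', 'userId',
] as const;

// Hypothetical type guard: true when `value` has the required envelope shape.
function isWorkspaceEvent(value: unknown): value is WorkspaceEventLike {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  const stringsOk = REQUIRED_STRING_FIELDS.every(
    (field) => typeof record[field] === 'string',
  );
  const data = record['data'];
  return stringsOk && typeof data === 'object' && data !== null;
}
```

A message handler might call `isWorkspaceEvent(JSON.parse(e.data))` and ignore (or log) anything that fails the check.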

Payload Strategy

Events use one of five data payload strategies, depending on the event type:

| Strategy | When Used | Frontend Action |
| --- | --- | --- |
| Changeset | Entity updated: only changed fields sent as { changes: { field: newValue, ... } } | Merge changes into local state |
| Full object | Row/cell data, computed stats: complete object sent | Replace local state |
| Entity | Entity created: full DTO sent as { entity: { ...allFields } } | Insert into local state |
| Identifiers | Entity deleted/toggled: UUIDs sent, frontend refetches | Refetch from REST API |
| Signal | Config events where details are sensitive or unnecessary: empty {} | Refetch or show notification |
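
The frontend actions in the table can be sketched as a single function that maps a strategy to a state update. This is illustrative only: `applyPayload`, `Strategy`, and `ApplyResult` are hypothetical names, the strategy is assumed to be looked up from the event type by the caller (the envelope does not carry it), and for full-object events the relevant object may be nested (for example under `data.row`).

```typescript
type Strategy = 'changeset' | 'full' | 'entity' | 'identifiers' | 'signal';

interface ApplyResult<T> {
  state: T | undefined; // new local state for the entity
  refetch: boolean;     // true when the caller should reload from the REST API
}

// Hypothetical reducer for one entity's local state.
function applyPayload<T extends object>(
  strategy: Strategy,
  current: T | undefined,
  data: Record<string, unknown>,
): ApplyResult<T> {
  switch (strategy) {
    case 'changeset':
      // Nothing local to merge into: fall back to a refetch.
      if (current === undefined) return { state: undefined, refetch: true };
      // Merge only the changed fields over the existing object.
      return { state: { ...current, ...(data['changes'] as Partial<T>) }, refetch: false };
    case 'entity':
      // Full DTO of a newly created entity.
      return { state: data['entity'] as T, refetch: false };
    case 'full':
      // Complete object: replace whatever is held locally.
      return { state: data as unknown as T, refetch: false };
    case 'identifiers':
    case 'signal':
      // Not enough detail in the payload: keep state and refetch.
      return { state: current, refetch: true };
  }
}
```

Keeping the refetch decision in the return value leaves the network call to the caller, which makes the merge logic easy to unit test.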

Event Types — Complete Reference

view.* — View Stream Events

All view.* events are delivered to the View stream only. They cover everything scoped to an open spreadsheet: columns, data, presets, constraints, files, members, and guests.

| Event Type | Strategy | data fields |
| --- | --- | --- |
| view.column.created | Entity | viewUuid, entity: { columnUuid, columnName } |
| view.column.updated | Changeset | columnUuid, columnName, viewUuid, changes: { ...changedFields } |
| view.column.deleted | Identifiers | displayName, columnName, tableName, viewUuid, message |
| view.column.schema.changing | Signal | columnUuid, columnName, viewUuid, reason, impactLevel?, blocksReads?, blocksWrites?, estimatedDurationMs? |
| view.column.schema.progress | Full object | columnUuid, viewUuid, progressPercent, phase, progressSource, elapsedMs, estimatedDurationMs, totalRows?, rowsProcessed? |
| view.column.schema.changed | Signal | columnUuid, columnName, viewUuid, message |
| view.data.row.inserted | Full object | viewUuid, row: { ...completeRowWithPK } |
| view.data.cell.edited | Full object | viewUuid, columnUuid, rowId, row: { ...completeRow } |
| view.data.cells.edited | Full object | viewUuid, columnUuids, rowId, row: { ...completeRow } |
| view.data.column.filled | Identifiers | viewUuid, columnUuid, rowsAffected, value |
| view.data.bulk.edited | Identifiers | viewUuid, columnUuids, rowIds, rowsAffected |
| view.data.bulk.delete | Signal | jobId, viewUuid, viewName, successCount, errorCount, totalProcessed, message |
| view.data.bulk.update | Signal | jobId, viewUuid, viewName, successCount, errorCount, totalProcessed, message |
| view.data.bulk.duplicate | Signal | jobId, viewUuid, viewName, successCount, errorCount, totalProcessed, message |
| view.data.bulk.export | Signal | jobId, viewUuid, viewName, fileName, fileSize, mimeType, rowCount, message |
| view.constraint.created | Entity | columnUuid?, entity: { ...constraintDTO } |
| view.constraint.updated | Changeset | columnUuid?, changes: { ...changedFields } |
| view.constraint.deleted | Identifiers | columnUuid? |
| view.constraint.toggled | Identifiers | columnUuid? |
| view.constraint.reordered | Signal | {} |
| view.entity-constraint.created | Entity | viewUuid?, entity: { ...entityConstraintDTO } |
| view.entity-constraint.updated | Changeset | viewUuid?, changes: { ...changedFields } |
| view.entity-constraint.deleted | Identifiers | viewUuid? |
| view.entity-constraint.toggled | Identifiers | viewUuid? |
| view.preset.created | Entity | viewUuid?, entity: { ...filterPresetDTO } |
| view.preset.updated | Changeset | viewUuid?, changes: { ...changedFields } |
| view.preset.deleted | Identifiers | viewUuid? |
| view.file.uploaded | Entity | fileId?, viewUuid?, entity: { ...fileReference } |
| view.file.deleted | Identifiers | fileId?, viewUuid? |
| view.member.added | Entity | viewUuid, entity: { memberEmail, role, source } |
| view.member.removed | Identifiers | viewUuid, memberEmail |
| view.member.role.changed | Changeset | viewUuid, memberEmail, changes: { previousRole, newRole } |
| view.guest.created | Entity | viewUuid, entity: { guestUuid, role, expiresAt? } |
| view.guest.revoked | Identifiers | viewUuid, guestUuid |
| view.access.updated | Changeset | viewUuid, changes: { ...changedFlags } |
| view.schema.updated | Signal | viewUuid, viewSlug, viewName, message |

workspace.* — Workspace Stream Events

Delivered to: Workspace stream + Organisation stream. Includes view lifecycle (create/delete/archive) and workspace admin events.

| Event Type | Strategy | data fields |
| --- | --- | --- |
| workspace.view.created | Entity | entity: { viewSlug, viewName, tableName, message } |
| workspace.view.updated | Changeset | viewUuid, viewSlug, viewName, changes: { ...changedFields } |
| workspace.view.deleted | Identifiers | viewSlug, viewName, tableName, message |
| workspace.view.archived | Identifiers | viewSlug, viewName |
| workspace.view.restored | Identifiers | viewSlug, viewName |
| workspace.status.changed | Changeset | changes: { previousStatus, newStatus } |
| workspace.settings.updated | Changeset | changes: { ...changedFields } |
| workspace.database.configured | Signal | databaseName, connectionStatus |
| workspace.storage.configured | Signal | {} |
| workspace.api.configured | Signal | maxExpandDepth?, corsAllowedOrigins? |
| workspace.mcp.configured | Signal | accessMode |
| workspace.member.added | Entity | entity: { memberEmail, role, source } |
| workspace.member.removed | Identifiers | memberEmail |
| workspace.member.role.changed | Changeset | memberEmail, changes: { previousRole, newRole } |
| workspace.apikey.created | Entity | entity: { keyName, keyPrefix, createdAt } |
| workspace.apikey.revoked | Signal | {} |
| workspace.apikey.rotated | Changeset | changes: { keyName, keyPrefix } |
| workspace.mcpkey.created | Entity | entity: { keyName, keyPrefix, createdAt } |
| workspace.mcpkey.revoked | Signal | {} |
| workspace.mcpkey.rotated | Changeset | changes: { keyName, keyPrefix } |
| workspace.stats.updated | Full object | viewCount, memberCount, storageBytes, lastActivity, ... |
| workspace.schema.checked | Full object | hasDrift, lastSynced, checkedAt, viewCount, source? ("sync" or absent), storedHash?, currentHash? |
| workspace.schema.reset | Signal | {} |

organization.* — Organisation Events

Delivered to: Organisation stream only

| Event Type | Strategy | data fields |
| --- | --- | --- |
| organization.workspace.created | Entity | entity: { workspaceName, workspaceSlug, createdByEmail } |
| organization.workspace.updated | Changeset | workspaceName, workspaceSlug, changes: { ...changedFields } |
| organization.workspace.deleted | Identifiers | workspaceName |
| organization.settings.updated | Changeset | changes: { ...changedFields } |
| organization.member.created | Entity | entity: { email, role, status } |
| organization.member.updated | Changeset | changes: { email, role, status } |
| organization.member.deleted | Identifiers | email |
| organization.subscription.updated | Changeset | changes: { planName, status } |

dashboard.* — Dashboard Events

Delivered to: Organisation stream only

| Event Type | Strategy | data fields |
| --- | --- | --- |
| dashboard.updated | Full object | complete dashboard payload (usage stats + recent activity) |

Three-Tier Routing Summary

Event prefix                          │ View SSE │ Workspace SSE │ Org SSE
──────────────────────────────────────┼──────────┼───────────────┼─────────
view.data.*                           │    ✓     │       —       │    —
view.column.* (incl. schema.changing, │    ✓     │       —       │    —
  schema.progress, schema.changed),   │          │               │
  view.member.*, view.guest.*,        │          │               │
  view.access.*, view.schema.*        │          │               │
workspace.view.created/updated/       │    —     │       ✓       │    —
  deleted/archived/restored           │          │               │
workspace.*                           │    —     │       ✓       │    ✓
organization.*                        │    —     │       —       │    ✓
dashboard.*                           │    —     │       —       │    ✓
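
For client-side sanity checks (for example, warning when an event shows up on a stream where it is not expected), the routing can be approximated by event prefix. The sketch below follows the "Delivered to" notes in the event reference sections; `streamsFor` and `Stream` are hypothetical names, and the server's actual routing remains authoritative where the summary table is narrower.

```typescript
type Stream = 'view' | 'workspace' | 'organisation';

// Hypothetical lookup: which streams carry an event, judged by its prefix.
function streamsFor(eventType: string): Stream[] {
  const prefix = eventType.split('.')[0];
  switch (prefix) {
    case 'view':
      return ['view'];
    case 'workspace':
      return ['workspace', 'organisation'];
    case 'organization': // event prefix uses the "z" spelling
    case 'dashboard':
      return ['organisation'];
    default:
      return []; // unknown prefix: make no assumption
  }
}
```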

Echo Filtering

Prevents the originating client tab from double-applying its own mutations.

Setup

  1. Generate a UUID per browser tab (persists for tab lifetime):
typescript
const CLIENT_ID = crypto.randomUUID();
  2. Send X-Client-Id header on every REST mutation and on the SSE connection:
typescript
// REST mutations
fetch('/api/views/...', {
  method: 'PUT',
  headers: {
    'Authorization': `Bearer ${jwt}`,
    'X-Client-Id': CLIENT_ID
  }
});

// SSE connection (org stream supports it natively)
const orgSSE = new EventSource(url, {
  headers: { 'X-Client-Id': CLIENT_ID }
});
  3. Filter incoming events:
typescript
sseSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  // Skip events that originated from this tab
  if (data.originClientId === CLIENT_ID) {
    return;
  }

  // Process the event
  handleEvent(data);
};
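
The filter in step 3 skips every event from the same tab. The migration section of this guide notes that view.column.schema.changing must not be echo-filtered, so the check may be easier to maintain as a small predicate with an explicit exemption list. A minimal sketch; `isEcho`, `EchoCandidate`, and `NEVER_FILTER` are hypothetical names.

```typescript
interface EchoCandidate {
  eventType: string;
  originClientId?: string;
}

// Events the originating tab must still process.
// Per this guide, view.column.schema.changing must not be echo-filtered.
const NEVER_FILTER = new Set<string>(['view.column.schema.changing']);

// Hypothetical predicate: true when the event should be skipped as an echo.
function isEcho(event: EchoCandidate, clientId: string): boolean {
  if (NEVER_FILTER.has(event.eventType)) return false;
  return event.originClientId !== undefined && event.originClientId === clientId;
}
```

In the handler above, `if (isEcho(data, CLIENT_ID)) return;` would replace the inline comparison.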

Handling workspace.schema.checked

Fired when any admin runs a drift check (quick or full) or a schema sync. Delivered to the workspace stream and organisation stream. Use echo filtering to skip the event on the tab that triggered the check.

Payload

typescript
interface SchemaCheckedData {
  // Always present (quick check + sync)
  hasDrift: boolean;
  checkedAt: string;       // ISO 8601
  viewCount: number;

  // Present on quick check
  storedHash?: string;
  currentHash?: string;
  lastSynced?: string;     // ISO 8601

  // Present on sync completion
  source?: 'sync';         // Absent for drift checks
}

Example handler (a case inside the event switch):

typescript
case 'workspace.schema.checked': {
  const d = event.data;

  // Update local schema status panel
  store.dispatch(updateSchemaStatus({
    hasDrift: d.hasDrift,
    lastChecked: d.checkedAt,
    lastSynced: d.lastSynced ?? d.checkedAt,  // sync sets both
    viewCount: d.viewCount,
  }));

  // Optionally show a toast when drift is detected by another admin
  if (d.hasDrift) {
    toast.warn('Schema drift detected — database has changed since last sync');
  }

  // If source is 'sync', another admin just synced — refresh views/columns
  if (d.source === 'sync') {
    store.dispatch(refetchViews());
  }
  break;
}

When is it fired?

| Trigger | source | hasDrift | Notes |
| --- | --- | --- | --- |
| GET .../check-drift/quick | absent | true / false | Fast hash comparison |
| GET .../check-drift | absent | true / false | Full structural diff |
| POST .../sync-schema | "sync" | false | Always false, just synced |

Handling Migration Progress Events

Schema migrations (column type changes, adding NOT NULL, etc.) can take seconds to minutes on large tables. The system provides a three-event lifecycle for real-time migration feedback.

Event Lifecycle

view.column.schema.changing     →  view.column.schema.progress (0..N)  →  view.column.schema.changed
         (lock)                          (progress updates)                       (unlock)
  1. schema.changing — Migration queued, lock the column/view in the UI
  2. schema.progress — Real-time progress updates every 2 seconds (only for migrations > 1s)
  3. schema.changed — Migration completed (success or failure), unlock the UI

view.column.schema.changing — Lock Event

Fired when a schema migration is queued. Use the impactLevel to decide how much of the UI to lock.

typescript
interface SchemaChangingData {
  columnUuid: string;
  columnName: string;
  viewUuid: string;
  reason: string;                        // "Changing column type to INTEGER"

  // Migration impact (present for async migrations)
  impactLevel?: 'TRANSPARENT' | 'BRIEF' | 'BLOCKING';
  blocksReads?: boolean;                 // true = SELECT blocked (PG type changes)
  blocksWrites?: boolean;                // true = INSERT/UPDATE/DELETE blocked
  estimatedDurationMs?: number;          // Predicted duration based on row count + history
}

Important: Do NOT echo-filter this event. The originating client also needs to show migration state.

view.column.schema.progress — Progress Event

Fired every ~2 seconds during long-running migrations. Delivered to the View stream.

typescript
interface SchemaProgressData {
  columnUuid: string;
  viewUuid: string;
  progressPercent: number;               // 0–100, or -1 if indeterminate
  phase: string;                         // Current operation phase (see table below)
  progressSource: 'PG_STAT' | 'TIME_ESTIMATE';  // How progress was measured
  elapsedMs: number;                     // Wall time since migration started
  estimatedDurationMs: number;           // Original estimate from impact analysis
  totalRows?: number;                    // Total rows in table (when known)
  rowsProcessed?: number;               // Rows processed so far (PG_STAT only)
}

Progress source:

| Source | When Used | Accuracy |
| --- | --- | --- |
| PG_STAT | PostgreSQL 12+ ALTER TABLE operations | Block-level accuracy from pg_stat_progress_alter_table |
| TIME_ESTIMATE | MySQL, PostgreSQL < 12, or non-ALTER operations | Elapsed time / estimated duration (linear) |

PostgreSQL phases (from pg_stat_progress_alter_table):

| Phase | Description |
| --- | --- |
| Waiting | Waiting for lock acquisition |
| Rewriting table | Full table rewrite in progress (type change) |
| Validating constraint | Scanning rows for NOT NULL / CHECK validation |
| Building index | Creating or rebuilding an index |
| Rebuilding index | Rebuilding indexes after rewrite |
| Completed | Migration finished |

Time-based phases (MySQL / fallback):

| Phase | Description |
| --- | --- |
| Executing migration | Migration is running (no phase detail available) |
| Completed | Migration finished |

view.column.schema.changed — Unlock Event

Fired when the migration completes (success or failure).

typescript
interface SchemaChangedData {
  columnUuid: string;
  columnName: string;
  viewUuid: string;
  message: string;                       // "Schema change completed" or error message
}

The status field on the WorkspaceEvent envelope tells you the outcome:

  • status: "COMPLETED" → success, refresh column metadata
  • status: "FAILED" → failure, show error toast, unlock UI

Example frontend handling for all three events:

typescript
// Per-column migration state
interface MigrationState {
  columnUuid: string;
  impactLevel: string;
  blocksReads: boolean;
  blocksWrites: boolean;
  progressPercent: number;
  phase: string;
  elapsedMs: number;
  estimatedDurationMs: number;
}

const activeMigrations = new Map<string, MigrationState>();

function handleSSEEvent(event: WorkspaceEvent) {
  switch (event.eventType) {

    case 'view.column.schema.changing': {
      const d = event.data;
      activeMigrations.set(d.columnUuid, {
        columnUuid: d.columnUuid,
        impactLevel: d.impactLevel ?? 'TRANSPARENT',
        blocksReads: d.blocksReads ?? false,
        blocksWrites: d.blocksWrites ?? false,
        progressPercent: 0,
        phase: 'Starting',
        elapsedMs: 0,
        estimatedDurationMs: d.estimatedDurationMs ?? 0,
      });

      // Lock UI based on impact level
      if (d.impactLevel === 'BLOCKING' && d.blocksReads) {
        lockView(d.viewUuid, 'full');        // Overlay — no reads or writes
      } else if (d.impactLevel === 'BLOCKING') {
        lockView(d.viewUuid, 'read-only');   // Read-only — disable editing
      } else if (d.impactLevel === 'BRIEF') {
        lockColumn(d.columnUuid);            // Lock just the column header
      }
      // TRANSPARENT: no UI change needed
      break;
    }

    case 'view.column.schema.progress': {
      const d = event.data;
      const state = activeMigrations.get(d.columnUuid);
      if (state) {
        state.progressPercent = d.progressPercent;
        state.phase = d.phase;
        state.elapsedMs = d.elapsedMs;
        // Update progress bar / percentage display
        updateProgressUI(d.columnUuid, state);
      }
      break;
    }

    case 'view.column.schema.changed': {
      const d = event.data;
      activeMigrations.delete(d.columnUuid);

      // Release both lock kinds on success and on failure.
      // unlockColumn is the assumed counterpart of lockColumn (BRIEF migrations).
      unlockView(d.viewUuid);
      unlockColumn(d.columnUuid);

      if (event.status === 'COMPLETED') {
        refreshColumnMetadata(d.viewUuid);   // Fetch updated column definition
        toast.success(`Column "${d.columnName}" updated`);
      } else {
        toast.error(`Migration failed: ${d.message}`);
      }
      break;
    }
  }
}

Progress Bar UX Recommendations

| Progress Source | Recommended UI |
| --- | --- |
| PG_STAT | Determinate progress bar (0–100%) with phase label |
| TIME_ESTIMATE | Determinate progress bar, but label "Estimated" to set expectations |
| progressPercent === -1 | Indeterminate/pulsing progress bar with phase label |

When progressPercent reaches 100 but schema.changed hasn't arrived yet, keep showing "Completing..." — the final event confirms actual completion.
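
These recommendations can be collapsed into one mapping from a progress event to what the bar should display. The sketch below is an assumption-laden illustration; `toProgressView` and `ProgressView` are hypothetical names, and the exact label wording is a design choice.

```typescript
type ProgressSource = 'PG_STAT' | 'TIME_ESTIMATE';

interface ProgressView {
  mode: 'determinate' | 'indeterminate';
  percent: number | null; // null when indeterminate
  label: string;
}

// Hypothetical mapper from progress event fields to progress bar state.
function toProgressView(
  progressPercent: number,
  phase: string,
  source: ProgressSource,
): ProgressView {
  if (progressPercent === -1) {
    // Server could not measure progress: pulse and show the phase only.
    return { mode: 'indeterminate', percent: null, label: phase };
  }
  if (progressPercent >= 100) {
    // schema.changed has not arrived yet, so do not claim completion.
    return { mode: 'determinate', percent: 100, label: 'Completing...' };
  }
  // Time-based progress is a linear guess, so flag it as an estimate.
  const label = source === 'TIME_ESTIMATE' ? `${phase} (Estimated)` : phase;
  return { mode: 'determinate', percent: progressPercent, label };
}
```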

Edge Cases

  • No progress events: Short migrations (< 1s) fire only changing → changed, with no progress events in between. The UI should handle zero progress events gracefully.
  • Elapsed time exceeds the estimate: Real migrations can run longer than estimatedDurationMs. When elapsedMs > estimatedDurationMs, don't leave the progress bar pinned at 100%; show "Taking longer than expected..." instead.
  • Multiple columns migrating: Each column tracks independently. The view lock is the union of all active column locks.
  • Browser reconnect: On SSE reconnect, check if any columns are still in migration state by calling the REST API. Stale locks should self-clear via TTL.
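
Two of these edge cases lend themselves to small pure functions: deriving the view lock as the union of active column migrations, and detecting a migration that has outrun its estimate. A minimal sketch, reusing the lock levels from the handler above; `effectiveViewLock`, `isOverdue`, and `ActiveMigration` are hypothetical names.

```typescript
type ViewLock = 'none' | 'read-only' | 'full';

interface ActiveMigration {
  impactLevel: string; // 'TRANSPARENT' | 'BRIEF' | 'BLOCKING'
  blocksReads: boolean;
  elapsedMs: number;
  estimatedDurationMs: number;
}

// Strongest lock required by any active migration on the view.
function effectiveViewLock(migrations: ActiveMigration[]): ViewLock {
  let lock: ViewLock = 'none';
  for (const m of migrations) {
    if (m.impactLevel !== 'BLOCKING') continue; // BRIEF/TRANSPARENT never lock the view
    if (m.blocksReads) return 'full';           // nothing is stronger than a full lock
    lock = 'read-only';
  }
  return lock;
}

// True when a migration has run past its estimate (and an estimate exists).
function isOverdue(m: ActiveMigration): boolean {
  return m.estimatedDurationMs > 0 && m.elapsedMs > m.estimatedDurationMs;
}
```

Recomputing the lock from the full set of active migrations each time one starts or finishes avoids unlocking the view while another column is still blocking it.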

Task Completion Events (Legacy)

The /sse/task-completions/stream endpoint uses a different payload format (TaskCompletionMessage):

typescript
interface TaskCompletionMessage {
  messageId: string;
  workspaceId: string;
  entityType: string;
  entityId: string;
  status: "STARTED" | "COMPLETED" | "FAILED";
  message?: string;
  timestamp: string;
}

SSE event names: task-success, task-failure, task-started

Note: Task completions are also broadcast as workspace.status.changed events on the workspace stream. The task-completions endpoint is maintained for backward compatibility.
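
Because this stream uses named SSE events, a plain `message` listener will not receive them; each name needs its own listener. The sketch below registers all three. `subscribeToTaskEvents`, `NamedEventSource`, and `TaskMessage` are hypothetical names; the source is typed as a minimal interface so the helper can be exercised with a fake, and passing a real EventSource may need a cast depending on your DOM typings.

```typescript
type TaskEventName = 'task-started' | 'task-success' | 'task-failure';

const TASK_EVENT_NAMES: TaskEventName[] = ['task-started', 'task-success', 'task-failure'];

// Minimal slice of the EventSource interface used here.
interface NamedEventSource {
  addEventListener(name: string, listener: (e: { data: string }) => void): void;
}

// Subset of TaskCompletionMessage needed by this sketch.
interface TaskMessage {
  messageId: string;
  status: string;
  message?: string;
}

// Registers one listener per named task event and forwards parsed payloads.
function subscribeToTaskEvents(
  source: NamedEventSource,
  onTask: (name: TaskEventName, msg: TaskMessage) => void,
): void {
  for (const name of TASK_EVENT_NAMES) {
    source.addEventListener(name, (e) => {
      onTask(name, JSON.parse(e.data) as TaskMessage);
    });
  }
}
```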


Listening for Events (TypeScript Example)

typescript
function connectToViewSSE(orgSlug: string, wsSlug: string, viewUuid: string, jwt: string) {
  const url = `${BASE_URL}/sse/view/${orgSlug}/${wsSlug}/${viewUuid}/stream`;
  const source = new EventSource(url, {
    headers: { Authorization: `Bearer ${jwt}` }
  });

  // Connection established
  source.addEventListener('connection', (e) => {
    console.log('Connected:', JSON.parse(e.data));
  });

  // All workspace events come as 'message' type
  source.addEventListener('message', (e) => {
    const event: WorkspaceEvent = JSON.parse(e.data);

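    // Echo filter. Never skip schema.changing: the originating tab also
    // needs to show migration state.
    if (event.originClientId === CLIENT_ID &&
        event.eventType !== 'view.column.schema.changing') return;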

    switch (event.eventType) {
      case 'view.data.cell.edited':
        // Full-object payload: data.row carries the complete updated row
        updateCell(event.data.rowId, event.data.columnUuid, event.data.row);
        break;
      case 'view.data.row.inserted':
        insertRow(event.data.row);
        break;
      case 'view.column.created':
        // Entity payload: the new column is nested under data.entity
        addColumn(event.data.entity.columnUuid, event.data.entity.columnName);
        break;
      case 'view.column.deleted':
        removeColumn(event.data.columnName);
        break;
      // ... handle other event types
    }
  });

  source.onerror = (e) => {
    console.error('SSE error, will auto-reconnect:', e);
  };

  return source;
}

SchemaStack Internal Developer Documentation