SSE Broadcasting Architecture

How real-time events flow from REST API mutations through RabbitMQ to frontend SSE streams.

Overview

SchemaStack uses a message-first broadcasting pattern: REST API endpoints never broadcast SSE events directly. Instead, they publish events to RabbitMQ, and a separate consumer-worker instance routes them to the correct SSE streams.

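This separation is critical because, in production, the REST API runs as stateless Lambda instances while SSE connections live on long-running Fargate instances, so the instance that handles a mutation does not hold the SSE connections that need to receive the event.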

Event Flow

 ┌─────────────────────────────────────────────────────────────────┐
 │                     REST API Instance (Lambda)                  │
 │                                                                 │
 │  WorkspaceResource ──► WorkspaceService ──► WorkspaceEvent     │
 │  MemberResource   ──► WorkspaceService ──► WorkspaceEvent      │
 │  ViewResource     ──► ViewService      ──► WorkspaceEvent      │
 │  OrgResource      ──► OrgService       ──► WorkspaceEvent      │
 │                                              │                  │
 │                                    WorkspaceEventProducer       │
 │                                              │                  │
 └──────────────────────────────────────────────┼──────────────────┘

                                         RabbitMQ Exchange
                                       "workspace-events"

 ┌──────────────────────────────────────────────┼──────────────────┐
 │                 Consumer-Worker Instance (Fargate)               │
 │                                              │                  │
 │                                  WorkspaceEventConsumer          │
 │                               ┌──────┼──────────────┐          │
 │                               ▼      ▼              ▼          │
 │                ViewEventBroadcaster  WorkspaceEvent  OrganisationEvent
 │                     │               Broadcaster      Broadcaster
 │          SSE: /sse/view/             │                    │      │
 │          {org}/{ws}/{uuid}/stream    │                    │      │
 │                     │     SSE: /sse/workspace/{id}/stream │      │
 │                     │                │    SSE: /sse/organization/{slug}/stream
 │                     │                │                    │      │
 └─────────────────────┼────────────────┼────────────────────┼──────┘
                       │                │                    │
                  View Users       Data App (FE)       Admin App (FE)

Three-Tier Routing

WorkspaceEventConsumer inspects routing fields on every WorkspaceEvent to decide which SSE stream(s) receive it:

```java
// 1. Route to view-level SSE if event has a viewUuid
if (event.isViewStreamEvent()) {
    viewBroadcaster.broadcast(event);
}

// 2. Route to workspace-level SSE if workspaceId is set AND event is not view-only
// (view.* events only go to view stream, workspace.* goes to workspace + org stream)
if (event.getWorkspaceId() != null && !event.getWorkspaceId().isBlank()
        && !event.isViewOnlyEvent()) {
    workspaceBroadcaster.broadcast(event);
}

// 3. Route to org-level SSE if organisationSlug is set
if (event.getOrganisationSlug() != null && !event.getOrganisationSlug().isBlank()) {
    organisationBroadcaster.broadcast(event);
}
```
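To make the three checks easier to reason about (and to unit-test without any broadcaster), the same decision can be expressed as a pure function that returns the set of target tiers. This is a minimal sketch, not the consumer's actual code: `SseTier`, `RoutableEvent` and `TierRouter` are hypothetical names, and the record carries only the routing fields.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum naming the three SSE tiers. */
enum SseTier { VIEW, WORKSPACE, ORGANISATION }

/** Hypothetical stand-in for WorkspaceEvent that carries only the routing fields. */
record RoutableEvent(String eventType, String viewUuid, String workspaceId, String organisationSlug) {
    boolean isViewStreamEvent() { return viewUuid != null; }
    boolean isViewOnlyEvent() { return eventType != null && eventType.startsWith("view."); }
}

final class TierRouter {
    private TierRouter() {}

    /** Returns every tier the event would be broadcast to, mirroring the three checks above. */
    static Set<SseTier> tiersFor(RoutableEvent e) {
        Set<SseTier> tiers = EnumSet.noneOf(SseTier.class);
        if (e.isViewStreamEvent()) {
            tiers.add(SseTier.VIEW);
        }
        if (hasText(e.workspaceId()) && !e.isViewOnlyEvent()) {
            tiers.add(SseTier.WORKSPACE);
        }
        if (hasText(e.organisationSlug())) {
            tiers.add(SseTier.ORGANISATION);
        }
        return tiers;
    }

    private static boolean hasText(String s) { return s != null && !s.isBlank(); }
}
```

Note that step 3 has no view-only guard in the snippet above, so a view.* event stays off the org stream only when its organisationSlug is null or blank; presumably view.* events are published that way.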

Event Classification

The event type prefix determines which stream(s) receive it:

  • view.* events: All view-scoped events (data, columns, constraints, presets, files, members, guests, access, schema) — go to the view stream only
  • workspace.* events: View lifecycle (workspace.view.*), status changes, member changes, config updates, schema checks — go to workspace stream (and org stream if org slug is set)
  • organization.* / dashboard.* events: Org settings, subscription changes, dashboard stats — go to org stream only

Routing Table

| Event Category | View SSE | Workspace SSE | Org SSE |
|---|---|---|---|
| view.data.* (row, cell, bulk) | yes | | |
| view.column.*, view.member.*, view.guest.* | yes | | |
| view.constraint.*, view.entity-constraint.* | yes | | |
| view.preset.*, view.file.* | yes | | |
| view.access.updated, view.schema.updated | yes | | |
| workspace.view.* (created/updated/deleted/archived/restored) | | yes | yes |
| workspace.member.*, workspace.status.* | | yes | yes |
| workspace.database.*, workspace.storage.* | | yes | yes |
| workspace.schema.checked, workspace.schema.reset | | yes | yes |
| workspace.stats.updated | | yes | yes |
| organization.* | | | yes |
| dashboard.updated | | | yes |
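The table can also be written down as a prefix lookup, which is handy as an oracle when testing the consumer's routing. This is a sketch under assumptions: `StreamKind` and `RoutingExpectations` are hypothetical names, and the result is the maximal set of streams for a prefix (the org stream is only reached in practice when the event carries an org slug).

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum for the three table columns. */
enum StreamKind { VIEW, WORKSPACE, ORG }

final class RoutingExpectations {
    private RoutingExpectations() {}

    /** Streams an event type is expected to reach, derived from its prefix. */
    static Set<StreamKind> expectedStreams(String eventType) {
        if (eventType.startsWith("view.")) {
            return EnumSet.of(StreamKind.VIEW);
        }
        if (eventType.startsWith("workspace.")) {
            return EnumSet.of(StreamKind.WORKSPACE, StreamKind.ORG);
        }
        if (eventType.startsWith("organization.") || eventType.startsWith("dashboard.")) {
            return EnumSet.of(StreamKind.ORG);
        }
        // Unknown prefixes are not covered by the table, so nothing is expected.
        return EnumSet.noneOf(StreamKind.class);
    }
}
```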

Connection-Time Authorization

All SSE streams enforce RBAC checks at connection time, using the same cascading permission model as the REST endpoints. Once a connection is established, events are broadcast to it without per-event checks.

View Stream Authorization

The view SSE stream checks VIEW_DATA permission via ViewAccessService:

  1. Panache.withSession() loads the view with workspace
  2. ViewAccessService.requireViewPermission(view, VIEW_DATA) checks org membership + view role
  3. On success: connection registered, connection.established event sent
  4. On failure: 403/404 returned, no connection registered

Workspace Stream Authorization

The workspace SSE stream checks workspace membership via SseAuthService:

  1. Loads workspace by org slug + workspace slug
  2. Checks org membership — org OWNER/ADMIN get automatic access
  3. Non-admin users must have explicit workspace membership
  4. On failure: 403/404 returned, no connection registered
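The workspace rules above reduce to a small decision function. The sketch below is illustrative only and does not reflect SseAuthService's real signature: `OrgRole`, `Decision` and `WorkspaceStreamAuth` are hypothetical names, `MEMBER` is an assumed name for any non-admin org role, and the split between 404 (workspace not found) and 403 (access denied) is an assumption, since the steps only say "403/404".

```java
import java.util.Optional;

/** Hypothetical org roles; OWNER and ADMIN come from the text, MEMBER is assumed. */
enum OrgRole { OWNER, ADMIN, MEMBER }

/** Hypothetical outcome; NOT_FOUND maps to 404 and FORBIDDEN to 403 in this sketch. */
enum Decision { ALLOW, FORBIDDEN, NOT_FOUND }

final class WorkspaceStreamAuth {
    private WorkspaceStreamAuth() {}

    /**
     * Decides whether a workspace SSE connection may be registered.
     *
     * @param workspaceExists        whether the org slug + workspace slug resolved to a workspace
     * @param orgRole                the caller's role in the organisation, empty if not a member
     * @param hasWorkspaceMembership whether the caller has an explicit workspace membership
     */
    static Decision decide(boolean workspaceExists, Optional<OrgRole> orgRole, boolean hasWorkspaceMembership) {
        if (!workspaceExists) {
            return Decision.NOT_FOUND;
        }
        if (orgRole.isEmpty()) {
            return Decision.FORBIDDEN;
        }
        OrgRole role = orgRole.get();
        if (role == OrgRole.OWNER || role == OrgRole.ADMIN) {
            return Decision.ALLOW; // org owners and admins get automatic access
        }
        return hasWorkspaceMembership ? Decision.ALLOW : Decision.FORBIDDEN;
    }
}
```

Because the check runs once per connection, only an ALLOW result should lead to the sink being registered with the broadcaster.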

Organisation Stream Authorization

The organisation SSE stream checks org membership via SseAuthService:

  1. Loads organisation by slug
  2. Verifies the JWT user is a member of the organisation
  3. On failure: 403/404 returned, no connection registered

Task Completions Stream

Requires JWT authentication (@Authenticated). No further membership check.

Key Components

WorkspaceEvent (Unified Envelope)

A single event class in messaging-contracts serves as the envelope for all SSE events:

| Field | Purpose |
|---|---|
| eventType | Dot-notation type (e.g. workspace.member.added) |
| entityType | Entity class name (e.g. WorkspaceMember) |
| entityId | UUID of the affected entity |
| organisationSlug | Routes to org SSE stream (null = skip) |
| workspaceId | Routes to workspace SSE stream (null = skip) |
| originClientId | Echoed from X-Client-Id header for echo filtering |
| userId | Acting user's UUID |
| timestamp | ISO-8601 event time |
| data | Event-specific payload (Map); includes viewUuid for view-routable events. CREATE ops include entity (full DTO), UPDATE ops include changes (changeset) |

Routing Helpers

| Method | Purpose |
|---|---|
| getViewUuid() | Extracts viewUuid from data.viewUuid or entityId for View-type events |
| isViewOnlyEvent() | true for view.* events; skip workspace stream |
| isViewStreamEvent() | true when getViewUuid() is non-null; route to view stream |
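As an illustration of how the envelope and its routing helpers fit together, here is a minimal sketch written as a Java record. It is not the class from messaging-contracts: the name `WorkspaceEventSketch` is hypothetical, all field types are assumed to be String (the source does not state them), and "View-type events" is interpreted as entityType equal to "View".

```java
import java.util.Map;

/** Hypothetical, simplified mirror of the WorkspaceEvent envelope; field types are assumptions. */
record WorkspaceEventSketch(
        String eventType,
        String entityType,
        String entityId,
        String organisationSlug,
        String workspaceId,
        String originClientId,
        String userId,
        String timestamp,
        Map<String, Object> data) {

    /** Returns data.viewUuid if present, otherwise entityId when the entity itself is a view. */
    String getViewUuid() {
        if (data != null && data.get("viewUuid") instanceof String uuid) {
            return uuid;
        }
        // Assumption: View-type events are identified by entityType "View".
        return "View".equals(entityType) ? entityId : null;
    }

    /** True for view.* events, which skip the workspace stream. */
    boolean isViewOnlyEvent() {
        return eventType != null && eventType.startsWith("view.");
    }

    /** True when a view UUID can be resolved, so the event is routed to the view stream. */
    boolean isViewStreamEvent() {
        return getViewUuid() != null;
    }
}
```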

WorkspaceEventProducer

Pure RabbitMQ producer. Serializes WorkspaceEvent to byte[] and sends via MutinyEmitter. Has no database access, making it safe to call inside active Hibernate sessions and transactions.

ViewEventBroadcaster / WorkspaceEventBroadcaster / OrganisationEventBroadcaster

Maintain ConcurrentMap<String, Set<SseEventSink>> of active SSE connections, keyed by view UUID, workspace ID, or org slug. The broadcast() method iterates connected sinks and writes the event as JSON. Dead connections are cleaned up automatically. All three have 30-second keepalive timers.
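The registry-and-broadcast pattern described here can be sketched with plain JDK collections. This is a simplified model, not the broadcasters' actual code: `Sink` is a hypothetical stand-in for SseEventSink, `StreamRegistry` and `RecordingSink` are illustrative names, and the 30-second keepalive timer is left out.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for SseEventSink, reduced to what broadcasting needs. */
interface Sink {
    boolean isClosed();
    void send(String json);
}

final class StreamRegistry {
    // Keyed by view UUID, workspace ID, or org slug, depending on the broadcaster.
    private final ConcurrentMap<String, Set<Sink>> sinksByKey = new ConcurrentHashMap<>();

    void register(String key, Sink sink) {
        sinksByKey.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(sink);
    }

    /** Writes json to every live sink under key, dropping closed or failing ones; returns the number written. */
    int broadcast(String key, String json) {
        Set<Sink> sinks = sinksByKey.get(key);
        if (sinks == null) {
            return 0;
        }
        int delivered = 0;
        for (Sink sink : sinks) { // concurrent key sets tolerate removal during iteration
            if (sink.isClosed()) {
                sinks.remove(sink);
                continue;
            }
            try {
                sink.send(json);
                delivered++;
            } catch (RuntimeException e) {
                sinks.remove(sink); // treat a failed write as a dead connection
            }
        }
        return delivered;
    }

    int connectionCount(String key) {
        Set<Sink> sinks = sinksByKey.get(key);
        return sinks == null ? 0 : sinks.size();
    }
}

/** Demo-only sink that records what it receives. */
final class RecordingSink implements Sink {
    private final List<String> received = new CopyOnWriteArrayList<>();
    private volatile boolean closed;

    @Override public boolean isClosed() { return closed; }
    @Override public void send(String json) { received.add(json); }
    void close() { closed = true; }
    List<String> received() { return received; }
}
```

Cleaning up dead sinks lazily inside broadcast() keeps registration simple, at the cost that a closed connection lingers in the map until the next event for its key.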

WorkspaceEventConsumer

Consumes from the workspace-events-in RabbitMQ queue. Deserializes the WorkspaceEvent and routes to the appropriate broadcaster(s) using the three-tier logic above. Always acknowledges messages (even on error) to prevent queue blocking.
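The "always acknowledge" behaviour can be sketched as a try/catch/finally around the routing step. This is a simplified model under assumptions: `Delivery` is a hypothetical stand-in for the messaging library's message type, and `AlwaysAckHandler` and `CountingDelivery` are illustrative names rather than parts of the real consumer.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical stand-in for a broker message with a manual acknowledgement. */
interface Delivery {
    byte[] payload();
    void ack();
}

final class AlwaysAckHandler {
    private final Consumer<byte[]> router;

    /** @param router deserializes the payload and routes it to the broadcasters */
    AlwaysAckHandler(Consumer<byte[]> router) {
        this.router = router;
    }

    /** Routes the payload, then acks whether or not routing threw; returns true if routing succeeded. */
    boolean handle(Delivery delivery) {
        try {
            router.accept(delivery.payload());
            return true;
        } catch (RuntimeException e) {
            System.err.println("Dropping event after routing failure: " + e);
            return false;
        } finally {
            delivery.ack(); // runs on both paths, so a bad message never blocks the queue
        }
    }
}

/** Demo-only delivery that counts acknowledgements. */
record CountingDelivery(byte[] payload, AtomicInteger acks) implements Delivery {
    @Override public void ack() { acks.incrementAndGet(); }
}
```

The trade-off is that an event whose routing fails is dropped rather than redelivered.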

Critical Constraint: No Nested Sessions

Services must never call broadcast services that open Panache.withSession() from inside an active transaction. This causes Hibernate Reactive session corruption.

Safe pattern: Use WorkspaceEventProducer (RabbitMQ, no DB) inside service methods. The consumer-worker handles all SSE broadcasting in its own session context.

Unsafe pattern: Calling WorkspaceStatsBroadcastService.broadcastWorkspaceStats() or DashboardBroadcastService.broadcastDashboardUpdate() from inside a @WithTransaction block or Panache.withTransaction() lambda.

The aggregate broadcast services (WorkspaceStatsBroadcastService, DashboardBroadcastService) use Panache.withSession() internally to compute stats. They must only be called from contexts where no session is already active — typically from RabbitMQ consumers or scheduled jobs.

SSE Endpoints

| Endpoint | Broadcaster | Auth | Purpose |
|---|---|---|---|
| GET /sse/view/{org}/{ws}/{viewUuid}/stream | ViewEventBroadcaster | JWT + VIEW_DATA permission | Cell edits, column changes, bulk ops for a specific view |
| GET /sse/workspace/{org}/{ws}/stream | WorkspaceEventBroadcaster | JWT + workspace membership | View lifecycle, workspace config, members, schema checks |
| GET /sse/organisation/{orgSlug}/stream | OrganisationEventBroadcaster | JWT + org membership | Workspace lifecycle, org settings, member changes, dashboard stats |
| GET /sse/task-completions/stream | Internal | JWT | Task completion notifications (legacy) |

Health endpoints: GET /sse/view/{org}/{ws}/{viewUuid}/health, GET /sse/view/health, GET /sse/workspace/{id}/health, GET /sse/organization/{slug}/health

Echo Filtering

  1. Frontend sends X-Client-Id (UUID) on the SSE connection and on every REST mutation
  2. REST endpoint passes clientId to the service layer
  3. Service includes originClientId in the WorkspaceEvent
  4. Frontend compares event.originClientId against its own client ID
  5. If they match, the event originated from this tab — frontend skips UI updates to avoid overwriting in-progress form state
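Steps 4 and 5 amount to a single comparison. The sketch below shows the logic in Java to match the backend snippet in this document; the real check lives in the frontend, and `EchoFilter` is a hypothetical name. Treating an event without an originClientId as "not an echo" is an assumption.

```java
import java.util.Objects;

/** Hypothetical client-side filter that suppresses events caused by this client's own mutations. */
final class EchoFilter {
    private final String ownClientId;

    /** @param ownClientId the UUID this client sends in the X-Client-Id header */
    EchoFilter(String ownClientId) {
        this.ownClientId = Objects.requireNonNull(ownClientId, "ownClientId");
    }

    /** True when the event came from another client (or carries no origin) and should update the UI. */
    boolean shouldApply(String originClientId) {
        return !ownClientId.equals(originClientId);
    }
}
```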

SchemaStack Internal Developer Documentation