WeaveAzureWebPubsubServer

API reference for the WeaveAzureWebPubsubServer class

Overview

The WeaveAzureWebPubsubServer is a server-side class in the Weave.js Azure Web PubSub store that enables real-time collaboration using Azure Web PubSub as the transport layer. It serves as a store provider, handling the synchronization of shared document state across all clients connected to a specific room.

Built on top of Yjs, this class leverages Azure's managed Web PubSub infrastructure to offer scalable, low-latency communication, with support for CRDT-based document syncing, user presence (awareness), and optional persistence of shared state.

On the server side, this class handles:

  • Connection management for WebSocket clients
  • Document synchronization using CRDTs (through Yjs)
  • Broadcasting updates between users in real time
  • Awareness events
  • Optional persistence of the shared state

Import

import { WeaveAzureWebPubsubServer } from "@inditextech/weave-store-azure-web-pubsub/server";

Instantiation

const azureWebPubSubServer = new WeaveAzureWebPubsubServer(params: WeaveAzureWebPubsubServerParams);

Parameters

Prop                                       Type                           Default
syncHostConfig.resync.attemptsLimit?       number                         12
syncHostConfig.resync.checkIntervalMs?     number                         5000
syncHostConfig.heartbeat.sendIntervalMs?   number                         2500
initialState?                              initialState                   defaultInitialState
fetchRoom?                                 FetchRoom                      -
persistRoom?                               PersistRoom                    -
eventsHandlerConfig?                       WebPubSubEventHandlerOptions   -
pubsubConfig                               WeaveAzureWebPubsubConfig      -
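
The three syncHostConfig defaults in the table can be read as one nested object. The sketch below assumes that any omitted field falls back to its documented default; the helper resolveSyncHostConfig and the local DeepPartial type are illustrative names and are not part of the package API.

```typescript
// Shape copied from WeaveStoreAzureWebPubsubSyncHostOptions in this reference.
type WeaveStoreAzureWebPubsubSyncHostOptions = {
  heartbeat: { sendIntervalMs: number };
  resync: { checkIntervalMs: number; attemptsLimit: number };
};

// Local stand-in for the DeepPartial helper type (assumption, defined here
// only so the sketch is self-contained).
type DeepPartial<T> = {
  [K in keyof T]?: T[K] extends object ? DeepPartial<T[K]> : T[K];
};

// Defaults as listed in the parameters table.
const DEFAULT_SYNC_HOST_CONFIG: WeaveStoreAzureWebPubsubSyncHostOptions = {
  heartbeat: { sendIntervalMs: 2500 },
  resync: { checkIntervalMs: 5000, attemptsLimit: 12 },
};

// Hypothetical helper: fills every omitted field with its documented default.
function resolveSyncHostConfig(
  overrides: DeepPartial<WeaveStoreAzureWebPubsubSyncHostOptions> = {},
): WeaveStoreAzureWebPubsubSyncHostOptions {
  return {
    heartbeat: {
      sendIntervalMs:
        overrides.heartbeat?.sendIntervalMs ??
        DEFAULT_SYNC_HOST_CONFIG.heartbeat.sendIntervalMs,
    },
    resync: {
      checkIntervalMs:
        overrides.resync?.checkIntervalMs ??
        DEFAULT_SYNC_HOST_CONFIG.resync.checkIntervalMs,
      attemptsLimit:
        overrides.resync?.attemptsLimit ??
        DEFAULT_SYNC_HOST_CONFIG.resync.attemptsLimit,
    },
  };
}

// Override only the resync attempts limit; the other values keep their defaults.
const resolved = resolveSyncHostConfig({ resync: { attemptsLimit: 3 } });
```

Whether the class merges partial settings exactly this way is not stated in this reference, so treat the helper as a way to reason about the defaults rather than a description of the internals.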

Methods

getMiddleware

getMiddleware();

This method returns the middleware that you need to attach to your Express server to manage Azure Web PubSub connectivity.

clientConnect

clientConnect(roomId: string): Promise<string>

This method returns a secure connection URL that the client uses to connect to the Azure Web PubSub infrastructure.
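
A typical use of clientConnect is inside an HTTP endpoint that hands the connection URL to the browser. The following framework-agnostic sketch shows that flow; the RoomConnector interface is a structural stand-in for the one method used here, and handleConnectRequest, ConnectResult, and the error message are hypothetical names chosen for illustration.

```typescript
// Structural stand-in for the single method this sketch needs. An instance of
// WeaveAzureWebPubsubServer satisfies it, per the signature documented above.
interface RoomConnector {
  clientConnect(roomId: string): Promise<string>;
}

// Hypothetical response shape for an HTTP endpoint.
type ConnectResult =
  | { status: 200; body: { url: string } }
  | { status: 400; body: { error: string } };

// Hypothetical helper: validates the room id, then asks the server for the
// connection URL that the client will use.
async function handleConnectRequest(
  server: RoomConnector,
  roomId: string | undefined,
): Promise<ConnectResult> {
  if (roomId === undefined || roomId.trim() === "") {
    return { status: 400, body: { error: "roomId is required" } };
  }
  const url = await server.clientConnect(roomId);
  return { status: 200, body: { url } };
}
```

In an Express application this helper could be called from a route handler, passing the room id taken from the request and sending back the returned status and body.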

TypeScript types

type WeaveStoreAzureWebPubsubConfig = {
  endpoint: string;
  persistIntervalMs?: number;
  hubName: string;
  auth?: {
    key?: string;
    custom?: TokenCredential;
  };
  connectionHandlers?: DeepPartial<WeaveAzureWebPubsubSyncHandlerOptions>;
};

type WeaveAzureWebPubsubSyncHandlerOptions = {
  onConnect?: (
    connectionId: string,
    queries: Record<string, string[]> | undefined,
  ) => Promise<void>;
  onConnected?: (connectionId: string) => Promise<void>;
  removeConnection?: (connectionId: string) => Promise<void>;
  getConnectionRoom?: (connectionId: string) => Promise<string | null>;
  getRoomConnections?: (roomId: string) => Promise<string[]>;
  persistIntervalMs?: number;
};

type WeaveStoreAzureWebPubsubOptions = {
  roomId: string;
  url: string;
  fetchClient?: FetchClient;
  syncClientOptions?: DeepPartial<WeaveStoreAzureWebPubSubSyncClientOptions>;
};

type WeaveStoreAzureWebPubsubOnStoreFetchConnectionUrlEvent = {
  loading: boolean;
  error: Error | null;
};

type FetchClient = (
  input: string | URL | globalThis.Request,
  init?: RequestInit,
) => Promise<Response>;

type FetchInitialState = (doc: Doc) => void;
type PersistRoom = (
  roomId: string,
  actualState: Uint8Array<ArrayBufferLike>,
) => Promise<void>;
type FetchRoom = (roomId: string) => Promise<Uint8Array | null>;

type WeaveStoreAzureWebPubsubEvents = {
  onConnect: WeaveStoreAzureWebPubsubOnConnectEvent;
  onConnected: WeaveStoreAzureWebPubsubOnConnectedEvent;
  onDisconnected: WeaveStoreAzureWebPubsubOnDisconnectedEvent;
};

type WeaveStoreAzureWebPubsubOnConnectEvent = {
  context: ConnectionContext;
  queries: Record<string, string[]> | undefined;
};

type WeaveStoreAzureWebPubsubOnConnectedEvent = {
  context: ConnectionContext;
  queries?: Record<string, string[]>;
};

type WeaveStoreAzureWebPubsubOnDisconnectedEvent = {
  context: ConnectionContext;
  queries?: Record<string, string[]>;
};

type WeaveStoreAzureWebPubsubOnWebsocketOpenEvent = {
  group: string;
  event: WebSocket.Event;
  connectionAttempt: number;
};

type WeaveStoreAzureWebPubsubOnWebsocketJoinGroupEvent = {
  group: string;
  connectionAttempt: number;
};

type WeaveStoreAzureWebPubsubOnWebsocketMessageEvent = {
  group: string;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  event: WebSocket.MessageEvent;
};

type WeaveStoreAzureWebPubsubOnWebsocketCloseEvent = {
  group: string;
  event: CloseEvent;
  connectionAttempt: number;
};

type WeaveStoreAzureWebPubsubOnWebsocketReconnectEvent = {
  group: string;
  connectionAttempt: number;
  timeoutMs: number;
};

type WeaveStoreAzureWebPubsubOnWebsocketErrorEvent = {
  group: string;
  error: ErrorEvent;
  connectionAttempt: number;
};

type WeaveStoreAzureWebPubSubSyncHostClientConnectOptions = {
  expirationTimeInMinutes?: number;
};

// Client types

type WeaveStoreAzureWebPubSubSyncClientConnectionStatusKeys =
  keyof typeof WEAVE_STORE_AZURE_WEB_PUBSUB_CONNECTION_STATUS;
type WeaveStoreAzureWebPubSubSyncClientConnectionStatus =
  (typeof WEAVE_STORE_AZURE_WEB_PUBSUB_CONNECTION_STATUS)[WeaveStoreAzureWebPubSubSyncClientConnectionStatusKeys];

enum MessageType {
  System = "system",
  JoinGroup = "joinGroup",
  SendToGroup = "sendToGroup",
}

enum MessageDataType {
  Init = "init",
  Sync = "sync",
  Awareness = "awareness",
}

interface MessageData {
  payloadId?: string;
  index?: number;
  type?: "heartbeat" | "resync" | "chunk" | "end";
  totalChunks?: number;
  group: string;
  t: string; // type / target uuid
  f: string; // origin uuid
  c: string; // base64 encoded binary data
}

interface Message {
  type: string;
  fromUserId: string;
  from: string;
  group: string;
  data: MessageData;
}

type MessageHandler = (
  encoder: Encoder,
  decoder: Decoder,
  client: WeaveStoreAzureWebPubSubSyncClient,
  clientId: string,
  emitSynced: boolean,
  messageType: number,
) => void;

type WeaveStoreAzureWebPubsubSyncHandlerDestroyRoomStatusKeys =
  keyof typeof WEAVE_STORE_AZURE_WEB_PUBSUB_DESTROY_ROOM_STATUS;
type WeaveStoreAzureWebPubsubSyncHandlerDestroyRoomStatus =
  (typeof WEAVE_STORE_AZURE_WEB_PUBSUB_DESTROY_ROOM_STATUS)[WeaveStoreAzureWebPubsubSyncHandlerDestroyRoomStatusKeys];

type WeaveStoreAzureWebPubSubSyncClientOptions = {
  heartbeat: {
    checkWindowTimeMs: number;
    checkIntervalMs: number;
  };
};

type WeaveStoreAzureWebPubsubSyncHostOptions = {
  heartbeat: {
    sendIntervalMs: number;
  };
  resync: {
    checkIntervalMs: number;
    attemptsLimit: number;
  };
};
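
The PersistRoom and FetchRoom callback types listed above can be satisfied by any storage backend. As a minimal sketch, the pair below keeps room snapshots in an in-memory Map; the Map, the variable names, and the reading of null as "no stored state for this room" are assumptions made for illustration, and a real deployment would use durable storage.

```typescript
// Callback shapes as documented in this reference. Plain Uint8Array is used
// instead of Uint8Array<ArrayBufferLike> so the sketch compiles on older
// TypeScript versions.
type PersistRoom = (roomId: string, actualState: Uint8Array) => Promise<void>;
type FetchRoom = (roomId: string) => Promise<Uint8Array | null>;

// Illustrative in-memory storage keyed by room id.
const roomSnapshots = new Map<string, Uint8Array>();

const persistRoom: PersistRoom = async (roomId, actualState) => {
  // Copy the bytes so later changes to the caller's buffer cannot alter
  // the stored snapshot.
  roomSnapshots.set(roomId, actualState.slice());
};

const fetchRoom: FetchRoom = async (roomId) => {
  // Return null when nothing has been stored for this room (assumed meaning).
  return roomSnapshots.get(roomId) ?? null;
};
```

These two functions could then be passed as the fetchRoom and persistRoom parameters when constructing the server.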

Shared-state initial value

If initialState is not defined, defaultInitialState is used. It creates a Stage node with five child Layer nodes, in the following order (bottom to top):

  • gridLayer: is the layer used by WeaveStageGridPlugin to render the reference grid elements.
  • mainLayer: is the main layer where all the nodes added by the users live.
  • selectionLayer: is the layer used by WeaveNodesSelectionPlugin to render the selection overlay elements.
  • usersPointersLayer: is the layer used by WeaveUsersPointersPlugin to render the users pointers overlay elements.
  • utilityLayer: is a general-purpose layer that any plugin can use.
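
The layer ordering above can be pictured as a small tree. The sketch below builds that tree as a plain object; the SketchNode shape, the "stage" key, and the function name are assumptions for illustration only and do not reproduce the actual shared-state schema or the real defaultInitialState implementation.

```typescript
// Layer keys in bottom-to-top order, as listed above.
const DEFAULT_LAYER_ORDER = [
  "gridLayer",
  "mainLayer",
  "selectionLayer",
  "usersPointersLayer",
  "utilityLayer",
] as const;

// Illustrative node shape (assumption); the real shared-state schema may differ.
type SketchNode = {
  key: string;
  type: "stage" | "layer";
  children: SketchNode[];
};

// Builds one Stage node whose children are the five layers, bottom first.
function buildDefaultStageSketch(): SketchNode {
  return {
    key: "stage",
    type: "stage",
    children: DEFAULT_LAYER_ORDER.map(
      (key): SketchNode => ({ key, type: "layer", children: [] }),
    ),
  };
}

const stageSketch = buildDefaultStageSketch();
```

Because the array order is bottom to top, nodes added by users to mainLayer render above the grid and below the selection and pointer overlays.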

See the source code that defines the defaultInitialState function.