Subscribing

Server-side subscription covers two operations: minting tokens for client use and consuming streams on the server. The v4 SDK exposes both standalone helpers and client aliases:

  • getSubscriptionToken(app, ...) or inngest.realtime.token(...)
  • subscribe({ app, ... }) or inngest.realtime.subscribe(...)

getSubscriptionToken(app, options)

Mints a scoped subscription token on the server. The token authorizes a client to subscribe to specific topics on a specific channel.

  • Name
    app
    Type
    Inngest
    Required
    required
    Description

    Your Inngest client instance.

  • Name
    options.channel
    Type
    ChannelInstance | string
    Required
    required
    Description

    The channel to authorize. Can be a channel instance or a plain string.

  • Name
    options.topics
    Type
    string[]
    Required
    required
    Description

    The topics the token grants access to. The client can only subscribe to these topics.

Returns Promise<Token> with key (the JWT string), channel, and topics.

import { getSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const token = await getSubscriptionToken(inngest, {
  channel: ch,
  topics: ["status", "tokens"],
});

// token.key     - JWT string for the client
// token.channel  - the channel instance/name
// token.topics   - ["status", "tokens"]

Always mint tokens on the server. Never expose your Inngest signing key to the client. The token is scoped to the specified channel and topics, so a client cannot use it to access other channels.

If you already have an Inngest client instance in scope, inngest.realtime.token({ channel, topics }) is equivalent to getSubscriptionToken(inngest, { channel, topics }).

Framework examples

// app/actions.ts
"use server";
import { getSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";

export async function getRealtimeToken(runId: string) {
  const ch = pipelineChannel({ runId });
  const token = await getSubscriptionToken(inngest, {
    channel: ch,
    topics: ["status", "tokens"],
  });
  return token.key;
}

subscribe(options)

Creates a server-side subscription to a realtime channel. Without onMessage, it returns a ReadableStream-based subscription object. With onMessage, it returns a callback subscription handle.

Stream subscription

Without onMessage, subscribe returns a stream subscription.

  • Name
    app
    Type
    Inngest
    Required
    optional
    Description

    Your Inngest client instance. Used to resolve connection details.

  • Name
    channel
    Type
    ChannelInstance | string
    Required
    required
    Description

    The channel to subscribe to.

  • Name
    topics
    Type
    string[]
    Required
    required
    Description

    The topics to subscribe to.

  • Name
    key
    Type
    string
    Required
    optional
    Description

    A pre-minted JWT token key. If not provided, app is used to mint a token automatically.

  • Name
    validate
    Type
    boolean
    Required
    optional
    Description

    Enable schema validation on incoming messages. Defaults to true.

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const stream = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status", "tokens"],
});

// ReadableStream - use getReader()
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  console.log(value.topic, value.data);
}

The same API is available as inngest.realtime.subscribe({ channel, topics }), which omits the app option because the client is already known.

Stream methods

The returned stream has additional helper methods:

  • Name
    getJsonStream()
    Type
    ReadableStream<Message>
    Required
    optional
    Description

    Returns a new ReadableStream that emits parsed JSON messages. Each call creates a fresh reader view of future messages only.

  • Name
    getEncodedStream()
    Type
    ReadableStream<Uint8Array>
    Required
    optional
    Description

    Returns a new ReadableStream with SSE-formatted Uint8Array chunks (data: {...}\n\n). Useful for piping the subscription through a streaming HTTP response.

  • Name
    close(reason?)
    Type
    (reason?: string) => void
    Required
    optional
    Description

    Closes the underlying WebSocket connection.

  • Name
    unsubscribe(reason?)
    Type
    (reason?: string) => void
    Required
    optional
    Description

    Alias for close().

// Stream JSON to another consumer
const jsonStream = stream.getJsonStream();
const response = new Response(jsonStream);

// Stream as SSE
const sseStream = stream.getEncodedStream();
return new Response(sseStream, {
  headers: { "Content-Type": "text/event-stream" },
});

// Clean up
stream.close();

Callback subscription

Pass onMessage to use an event-driven pattern instead of streams.

  • Name
    onMessage
    Type
    (message: Message) => void
    Required
    required
    Description

    Called for each incoming message.

  • Name
    onError
    Type
    (error: unknown) => void
    Required
    optional
    Description

    Called when a connection error occurs.

Returns Promise<{ close, unsubscribe }>.

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const sub = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status"],
  onMessage: (message) => {
    console.log(`[${message.topic}]`, message.data);
  },
  onError: (err) => {
    console.error("Subscription error:", err);
  },
});

// Clean up when done
sub.close();

Message shape

Each message received from a subscription includes:

  • Name
    topic
    Type
    string
    Required
    optional
    Description

    The topic name this message was published to.

  • Name
    channel
    Type
    string
    Required
    optional
    Description

    The resolved channel name.

  • Name
    data
    Type
    TData
    Required
    optional
    Description

    The message payload, typed according to the topic's schema.

  • Name
    kind
    Type
    "data" | "run" | "datastream-start" | "datastream-end" | "chunk"
    Required
    optional
    Description

    The message kind. Most topic publishes are "data". Run lifecycle updates arrive as "run".

  • Name
    runId
    Type
    string | undefined
    Required
    optional
    Description

    The Inngest function run ID, if the message was published from within a function.

  • Name
    fnId
    Type
    string | undefined
    Required
    optional
    Description

    The Inngest function ID.

  • Name
    createdAt
    Type
    Date
    Required
    optional
    Description

    When the message was created.

Run messages are platform lifecycle events. Their data is intentionally broad, while topic messages remain typed from your channel schema.

Server-side stream example

A complete example using subscribe to monitor a long-running function and react to its output:

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { workflowChannel } from "./channels";

async function monitorWorkflow(runId: string) {
  const ch = workflowChannel({ runId });

  const stream = await subscribe({
    app: inngest,
    channel: ch,
    topics: ["status", "result"],
  });

  const reader = stream.getReader();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    if (value.topic === "status") {
      console.log("Status:", value.data.message);
    }

    if (value.topic === "result") {
      console.log("Result:", value.data);
      stream.close();
    }
  }
}