Subscribing
Server-side subscription covers two operations: minting tokens for client use and consuming streams on the server. The v4 SDK exposes both standalone helpers and client aliases:
getSubscriptionToken(app, ...)orinngest.realtime.token(...)subscribe({ app, ... })orinngest.realtime.subscribe(...)
getSubscriptionToken(app, options)
Mints a scoped subscription token on the server. The token authorizes a client to subscribe to specific topics on a specific channel.
- Name
app- Type
- Inngest
- Required
- required
- Description
Your Inngest client instance.
- Name
options.channel- Type
- ChannelInstance | string
- Required
- required
- Description
The channel to authorize. Can be a channel instance or a plain string.
- Name
options.topics- Type
- string[]
- Required
- required
- Description
The topics the token grants access to. The client can only subscribe to these topics.
Returns Promise<Token> with key (the JWT string), channel, and topics.
import { getSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
const ch = pipelineChannel({ runId: "abc123" });
const token = await getSubscriptionToken(inngest, {
channel: ch,
topics: ["status", "tokens"],
});
// token.key - JWT string for the client
// token.channel - the channel instance/name
// token.topics - ["status", "tokens"]
Always mint tokens on the server. Never expose your Inngest signing key to the client. The token is scoped to the specified channel and topics, so a client cannot use it to access other channels.
If you already have an Inngest client instance in scope, inngest.realtime.token({ channel, topics }) is equivalent to getSubscriptionToken(inngest, { channel, topics }).
Framework examples
// app/actions.ts
"use server";
import { getSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";
export async function getRealtimeToken(runId: string) {
const ch = pipelineChannel({ runId });
const token = await getSubscriptionToken(inngest, {
channel: ch,
topics: ["status", "tokens"],
});
return token.key;
}
subscribe(options)
Creates a server-side subscription to a realtime channel. Without onMessage, it returns a ReadableStream-based subscription object. With onMessage, it returns a callback subscription handle.
Stream subscription
Without onMessage, subscribe returns a stream subscription.
- Name
app- Type
- Inngest
- Required
- optional
- Description
Your Inngest client instance. Used to resolve connection details.
- Name
channel- Type
- ChannelInstance | string
- Required
- required
- Description
The channel to subscribe to.
- Name
topics- Type
- string[]
- Required
- required
- Description
The topics to subscribe to.
- Name
key- Type
- string
- Required
- optional
- Description
A pre-minted JWT token key. If not provided,
appis used to mint a token automatically.
- Name
validate- Type
- boolean
- Required
- optional
- Description
Enable schema validation on incoming messages. Defaults to
true.
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
const ch = pipelineChannel({ runId: "abc123" });
const stream = await subscribe({
app: inngest,
channel: ch,
topics: ["status", "tokens"],
});
// ReadableStream - use getReader()
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value.topic, value.data);
}
The same API is available as inngest.realtime.subscribe({ channel, topics }), which omits the app option because the client is already known.
Stream methods
The returned stream has additional helper methods:
- Name
getJsonStream()- Type
- ReadableStream<Message>
- Required
- optional
- Description
Returns a new
ReadableStreamthat emits parsed JSON messages. Each call creates a fresh reader view of future messages only.
- Name
getEncodedStream()- Type
- ReadableStream<Uint8Array>
- Required
- optional
- Description
Returns a new
ReadableStreamwith SSE-formattedUint8Arraychunks (data: {...}\n\n). Useful for piping the subscription through a streaming HTTP response.
- Name
close(reason?)- Type
- (reason?: string) => void
- Required
- optional
- Description
Closes the underlying WebSocket connection.
- Name
unsubscribe(reason?)- Type
- (reason?: string) => void
- Required
- optional
- Description
Alias for
close().
// Stream JSON to another consumer
const jsonStream = stream.getJsonStream();
const response = new Response(jsonStream);
// Stream as SSE
const sseStream = stream.getEncodedStream();
return new Response(sseStream, {
headers: { "Content-Type": "text/event-stream" },
});
// Clean up
stream.close();
Callback subscription
Pass onMessage to use an event-driven pattern instead of streams.
- Name
onMessage- Type
- (message: Message) => void
- Required
- required
- Description
Called for each incoming message.
- Name
onError- Type
- (error: unknown) => void
- Required
- optional
- Description
Called when a connection error occurs.
Returns Promise<{ close, unsubscribe }>.
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
const ch = pipelineChannel({ runId: "abc123" });
const sub = await subscribe({
app: inngest,
channel: ch,
topics: ["status"],
onMessage: (message) => {
console.log(`[${message.topic}]`, message.data);
},
onError: (err) => {
console.error("Subscription error:", err);
},
});
// Clean up when done
sub.close();
Message shape
Each message received from a subscription includes:
- Name
topic- Type
- string
- Required
- optional
- Description
The topic name this message was published to.
- Name
channel- Type
- string
- Required
- optional
- Description
The resolved channel name.
- Name
data- Type
- TData
- Required
- optional
- Description
The message payload, typed according to the topic's schema.
- Name
kind- Type
- "data" | "run" | "datastream-start" | "datastream-end" | "chunk"
- Required
- optional
- Description
The message kind. Most topic publishes are
"data". Run lifecycle updates arrive as"run".
- Name
runId- Type
- string | undefined
- Required
- optional
- Description
The Inngest function run ID, if the message was published from within a function.
- Name
fnId- Type
- string | undefined
- Required
- optional
- Description
The Inngest function ID.
- Name
createdAt- Type
- Date
- Required
- optional
- Description
When the message was created.
Run messages are platform lifecycle events. Their data is intentionally broad, while topic messages remain typed from your channel schema.
Server-side stream example
A complete example using subscribe to monitor a long-running function and react to its output:
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { workflowChannel } from "./channels";
async function monitorWorkflow(runId: string) {
const ch = workflowChannel({ runId });
const stream = await subscribe({
app: inngest,
channel: ch,
topics: ["status", "result"],
});
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value.topic === "status") {
console.log("Status:", value.data.message);
}
if (value.topic === "result") {
console.log("Result:", value.data);
stream.close();
}
}
}