The world is going realtime, and so should your tasks. With the Streams API, you can stream data from your tasks to the outside world in realtime. This is useful for a variety of use cases, including AI.

How it works

The Streams API is a simple API that allows you to send data from your tasks to the outside world in realtime using the metadata system. You can send any kind of data that is streamed in realtime, but the most common use case is to send streaming output from streaming LLM providers, like OpenAI.

Usage

To use the Streams API, you need to register a stream with a specific key using metadata.stream. The following example uses the OpenAI SDK with stream: true to stream the output of the LLM model in realtime:

import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true,
    });

    // Register the stream with the key "openai"
    // This will "tee" the stream and send it to the metadata system
    const stream = await metadata.stream("openai", completion);

    let text = "";

    // You can read the returned stream as an async iterator
    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // The type of the chunk is determined by the provider
      text += chunk.choices.map((choice) => choice.delta?.content).join("");
    }

    return { text };
  },
});

You can then subscribe to the stream using the runs.subscribeToRun method:

runs.subscribeToRun should be used from your backend or another task. To subscribe to a run from your frontend, you can use our React hooks.

import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  // Use a for-await loop to subscribe to the stream
  for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
    }
  }
}

You can register and subscribe to multiple streams in the same task. Let’s add a stream from the response body of a fetch request:

import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider
  fetch: string; // The response body will be an array of strings
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true,
    });

    // Register the stream with the key "openai"
    await metadata.stream("openai", completion);

    const response = await fetch("https://jsonplaceholder.typicode.com/posts");

    if (!response.body) {
      return;
    }

    // Register the stream with the key "fetch"
    // Pipe the response.body through a TextDecoderStream to convert it to a string
    await metadata.stream("fetch", response.body.pipeThrough(new TextDecoderStream()));
  },
});

You may notice above that we aren’t consuming either of the streams in the task. In the background, we’ll wait until all streams are consumed before the task is considered complete (with a max timeout of 60 seconds). If you have a longer running stream, make sure to consume it in the task.

And then subscribing to the streams:

import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  // Use a for-await loop to subscribe to the stream
  for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
      case "fetch": {
        // part.chunk is a string
        console.log("Received fetch chunk", part.chunk);
        break;
      }
    }
  }
}

React hooks

If you’re building a frontend application, you can use our React hooks to subscribe to streams. Here’s an example of how you can use the useRealtimeRunWithStreams hook to subscribe to a stream:

import { useRealtimeRunWithStreams } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your React component
function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccessToken: string }) {
  const { run, streams } = useRealtimeRunWithStreams<typeof myTask, STREAMS>(runId, {
    accessToken: publicAccessToken,
  });

  if (!run) {
    return <div>Loading...</div>;
  }

  return (
    <div>
      <h1>Run ID: {run.id}</h1>
      <h2>Streams:</h2>
      <ul>
        {Object.entries(streams).map(([key, value]) => (
          <li key={key}>
            <strong>{key}</strong>: {JSON.stringify(value)}
          </li>
        ))}
      </ul>
    </div>
  );
}

Read more about using the React hooks in the React hooks documentation.

Usage with the ai SDK

The ai SDK provides a higher-level API for working with AI models. You can use the ai SDK with the Streams API by using the streamText method:

import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk/v3";
import { streamText } from "ai";
import { z } from "zod";

export type STREAMS = {
  openai: string;
};

export const aiStreaming = schemaTask({
  id: "ai-streaming",
  description: "Stream data from the AI sdk",
  schema: z.object({
    model: z.string().default("o1-preview"),
    prompt: z.string().default("Hello, how are you?"),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
    });

    // pass the textStream to the metadata system
    const stream = await metadata.stream("openai", result.textStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      text += chunk; // chunk is a string
    }

    return { text };
  },
});

And then render the stream in your frontend:

import { useRealtimeRunWithStreams } from "@trigger.dev/sdk/v3";
import type { aiStreaming, STREAMS } from "./trigger/ai-streaming";

function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccessToken: string }) {
  const { streams } = useRealtimeRunWithStreams<typeof aiStreaming, STREAMS>(runId, {
    accessToken: publicAccessToken,
  });

  if (!streams.openai) {
    return <div>Loading...</div>;
  }

  const text = streams.openai.join(""); // `streams.openai` is an array of strings

  return (
    <div>
      <h2>OpenAI response:</h2>
      <p>{text}</p>
    </div>
  );
}

Using tools and fullStream

When calling streamText, you can provide a tools object that allows the LLM to use additional tools. You can then access the tool call and results using the fullStream method:

import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk/v3";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";

const tools = {
  getWeather: tool({
    description: "Get the weather in a location",
    parameters: z.object({
      location: z.string().describe("The location to get the weather for"),
    }),
    execute: async ({ location }) => ({
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    }),
  }),
};

export type STREAMS = {
  // Give the stream a type of TextStreamPart along with the tools
  openai: TextStreamPart<{ getWeather: typeof tools.getWeather }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Fransico? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools, // Pass in the tools to use
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});

Now you can get access to the tool call and results in your frontend:

import { useRealtimeRunWithStreams } from "@trigger.dev/sdk/v3";
import { useRealtimeRunWithStreams } from "@trigger.dev/sdk/v3";
import type { aiStreamingWithTools, STREAMS } from "./trigger/ai-streaming";

function MyComponent({ runId, publicAccessToken }: { runId: string; publicAccessToken: string }) {
  const { streams } = useRealtimeRunWithStreams<typeof aiStreamingWithTools, STREAMS>(runId, {
    accessToken: publicAccessToken,
  });

  if (!streams.openai) {
    return <div>Loading...</div>;
  }

  // streams.openai is an array of TextStreamPart
  const toolCall = streams.openai.find(
    (stream) => stream.type === "tool-call" && stream.toolName === "getWeather"
  );
  const toolResult = streams.openai.find((stream) => stream.type === "tool-result");
  const textDeltas = streams.openai.filter((stream) => stream.type === "text-delta");

  const text = textDeltas.map((delta) => delta.textDelta).join("");
  const weatherLocation = toolCall ? toolCall.args.location : undefined;
  const weather = toolResult ? toolResult.result.temperature : undefined;

  return (
    <div>
      <h2>OpenAI response:</h2>
      <p>{text}</p>
      <h2>Weather:</h2>
      <p>
        {weatherLocation
          ? `The weather in ${weatherLocation} is ${weather} degrees.`
          : "No weather data"}
      </p>
    </div>
  );
}

Using toolTask

As you can see above, we defined a tool which will be used in the aiStreamingWithTools task. You can also define a Trigger.dev task that can be used as a tool, and will automatically be invoked with triggerAndWait when the tool is called. This is done using the toolTask function:

import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask, toolTask } from "@trigger.dev/sdk/v3";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";

export const getWeather = toolTask({
  id: "get-weather",
  description: "Get the weather for a location",
  // Define the parameters for the tool, which becomes the task payload
  parameters: z.object({
    location: z.string(),
  }),
  run: async ({ location }) => {
    // return mock data
    return {
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    };
  },
});

export type STREAMS = {
  // Give the stream a type of TextStreamPart along with the tools
  openai: TextStreamPart<{ getWeather: typeof getWeather.tool }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Fransico? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools: {
        getWeather: getWeather.tool, // pass weatherTask.tool as a tool
      },
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});