NOTE: THIS PAGE IS UNDER CONSTRUCTION FOR OUR NEW STREAMING TECHNIQUE. Please contact [email protected] if you are urgently working to implement streaming.

Tennr workflows with text output support streaming the results back to the client as each token is generated. This is useful for ensuring rapid visual feedback for users, particularly in chat applications.

However, there is a noticeable increase in complexity when handling these streamed results on the client side. This document aims to provide some guidance on how to handle these results with examples from our own frontends.

In order to trigger streaming at the workflow run endpoint, you must set the stream parameter to true in the request body.

Next.js

The following code shows an example of how to call the API in stream mode and how to parse and handle the messages as they are received from the backend.

Notice that we parse two types of data: normal responses and the sources object. The sources object is sent after all normal message content, as a JSON object of the form {"sources": [...]}; an example of the array held under the sources key is shown below.

[
  {
    "embedding": "The text that was used as a source.",
    "metadata": {
      ...various metadata for internal usage...
    },
    "confidence": 0.855
  },
  {
    "embedding": "More text that was used as a source.",
    "metadata": {
      ...various metadata for internal usage...
    },
    "confidence": 0.766
  }
]

We recommend combining this code with state management callbacks (onDataReceived and onSourcesReceived in our example) to update your UI as new content is delivered.

/**
 * Calls the workflow run endpoint in stream mode and forwards the streamed
 * output to the supplied callbacks as it arrives.
 *
 * The response body is treated as a sequence of messages separated by a blank
 * line (`"\n\n"`); a leading `data: ` prefix is stripped from every line of a
 * message and the literal message `[DONE]` is skipped. Everything from the
 * first occurrence of `{"sources":` onwards is buffered rather than forwarded,
 * and is parsed as JSON once the stream ends.
 *
 * Failures (non-OK response, missing body, read errors, exceptions thrown by a
 * callback) are logged with `console.error` and never thrown to the caller.
 *
 * @param agentId - Sent as `agentId` in the request body.
 * @param input - Sent as `input` in the request body.
 * @param pastMessages - Sent as `pastMessages` in the request body (optional for the API).
 * @param apiKey - Sent in the `Authorization` header as `api-key <apiKey>`.
 * @param onDataReceived - Invoked once per non-empty text message, in arrival order.
 * @param onSourcesReceived - Invoked at most once, after the stream ends, with
 *   the `sources` property of the buffered JSON object.
 * @returns A promise that settles once the stream has been fully consumed or
 *   the request has failed.
 */
export async function runWorkflow(
  agentId: string,
  input: string | null,
  pastMessages: {
    content: string,
    role: string | null,
  }[],
  apiKey: string,
  onDataReceived: (data: string) => void,
  onSourcesReceived: (sources: any) => void
): Promise<void> {
  // Pause between consecutive reads, kept from the original implementation.
  const readIntervalMs = 20;
  // Hoisted so the patterns are not rebuilt for every message.
  const dataPrefix = /^data: /;
  const repeatedBreaks = /(<br\s*\/?\s*>[\s\r\n]*){2,}/g;

  try {
    const response = await fetch(
      `https://agent.tennr.com/api/v1/workflow/run`,
      {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          Authorization: `api-key ${apiKey}`,
        },
        body: JSON.stringify({
          agentId,
          input,
          stream: true,
          pastMessages, // optional
        }),
      }
    );
    if (!response.ok || !response.body) {
      console.error("Failed to fetch workflow response.", response);
      return;
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder("utf-8");
    let messageAccumulator = ""; // Holds the trailing, not-yet-terminated message
    let sourceString = ""; // Raw JSON text of the sources object

    // Handles one complete message (the text between two blank-line separators).
    const handleMessage = (rawMessage: string): void => {
      let message = rawMessage
        .split("\n")
        .map((line) => line.replace(dataPrefix, ""))
        .join("\n");

      if (message === "[DONE]") {
        return;
      }

      // Collapse any run of two or more <br/> tags into exactly two.
      message = message.replace(repeatedBreaks, "<br/> <br/>");

      // Once the sources object has started, everything that follows belongs to it.
      if (sourceString) {
        sourceString += message;
        return;
      }

      // Split the message at the start of the sources JSON, if present.
      const sourceStartIndex = message.indexOf(`{"sources":`);
      if (sourceStartIndex !== -1) {
        sourceString = message.substring(sourceStartIndex);
        message = message.substring(0, sourceStartIndex);
      }

      // Send any normal content back to the caller's state management.
      if (message.length > 0) {
        onDataReceived(message);
      }
    };

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }

        // `stream: true` keeps the bytes of a multi-byte character that was
        // split across chunks buffered until the rest of it arrives.
        messageAccumulator += decoder.decode(value, { stream: true });

        // The last element is either "" or an incomplete message; keep it
        // for the next chunk.
        const messages = messageAccumulator.split("\n\n");
        messageAccumulator = messages.pop() ?? "";

        for (const message of messages) {
          handleMessage(message);
        }

        await new Promise<void>((resolve) => setTimeout(resolve, readIntervalMs));
      }

      // Flush whatever the decoder is still holding, then handle a final
      // message that was not terminated by a blank line.
      messageAccumulator += decoder.decode();
      if (messageAccumulator.trim().length > 0) {
        handleMessage(messageAccumulator);
      }

      // Handle sources once we know we've received everything.
      if (sourceString.length > 0) {
        try {
          const parsed: { sources?: unknown } = JSON.parse(sourceString);
          onSourcesReceived(parsed.sources);
        } catch (error) {
          console.error("Failed to parse sources object.", error);
        }
      }
    } finally {
      reader.releaseLock();
    }
  } catch (e) {
    console.error(e);
  }
}