Streaming
How to get real-time feedback from workflows
NOTE: THIS PAGE IS UNDER CONSTRUCTION FOR OUR NEW STREAMING TECHNIQUE. Please contact [email protected] if you are urgently working to implement streaming.
Tennr workflows with text output support streaming the results back to the client as each token is generated. This is useful for ensuring rapid visual feedback for users, particularly in chat applications.
However, there is a noticeable increase in complexity when handling these streamed results on the client side. This document aims to provide some guidance on how to handle these results with examples from our own frontends.
In order to trigger streaming at the workflow run endpoint, you must set the stream
parameter to true
in the request body.
Next.js
The following code shows an example of how to call the API in stream mode and how to parse and handle the messages as they are received from the backend.
Notice that we parse two types of data, normal responses and the sources object. The sources object is returned after all normal message content is sent, and its example structure can be seen below.
[
{
"embedding": "The text that was used as a source.",
"metadata": {
...various metadata for internal usage...
},
"confidence": 0.855
},
{
"embedding": "More text that was used as a source.",
"metadata": {
...various metadata for internal usage...
},
"confidence": 0.766
}
]
We recommend combining this code with state management callbacks (onDataReceived and onSourcesReceived in our example) to update your UI as new content is delivered.
export async function runWorkflow(
agentId: string,
input: string | null,
pastMessages: {
content: string,
role: string | null,
}[],
apiKey: string,
onDataReceived: (data: string) => void,
onSourcesReceived: (sources: any) => void
) {
try {
const response = await fetch(
`https://agent.tennr.com/api/v1/workflow/run`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `api-key ${apiKey}`,
},
body: JSON.stringify({
agentId,
input,
stream: true,
pastMessages, // optional
}),
}
);
if (!response.ok) {
console.error("Failed to fetch workflow response.", response);
return;
}
if (!response.body) {
console.error("Failed to fetch workflow response.", response);
return;
}
const reader = response?.body?.getReader();
let decoder = new TextDecoder("utf-8");
let timeoutId: string | number | NodeJS.Timeout;
let messageAccumulator = ""; // Accumulator for incomplete messages
let sourceString = ""; // String for the sources object
const read = () => {
reader
.read()
.then(({ done, value }) => {
if (done) {
// Handle sources once we know we've received everything
if (sourceString && sourceString.length > 0) {
// Parse the text to get the sources object
try {
const parsed = JSON.parse(sourceString);
onSourcesReceived(parsed.sources);
} catch (error) {
console.error("Failed to parse sources object.", error);
}
}
return;
}
let decodedValue = decoder.decode(value);
// Append the decoded chunk to the message accumulator
messageAccumulator += decodedValue;
// Split the accumulated data into messages based on the double newline separator
const messages = messageAccumulator.split("\n\n");
// If the last message is incomplete, keep it in the accumulator
messageAccumulator = messages.pop() || "";
for (let message of messages) {
message = message
.split("\n")
.map((line) => line.replace(/^data: /, ""))
.join("\n");
// now replace the first one with just "data"
// Now we have to remove all whitespace
if (message === "[DONE]") {
continue;
}
// Always replace any sequences of multiple <br/> with just one of them
const regex = /(<br\s*\/?\s*>[\s\r\n]*){2,}/g;
message = message.replace(regex, "<br/> <br/>");
// If we're already working on sources just keep accumulating that object
if (sourceString) {
sourceString += message;
continue;
} else {
// Look for the beginning of the sources JSON in the received string
const sourceStartIndex = message.indexOf(`{"sources":`);
if (sourceStartIndex !== -1) {
sourceString = message.substring(sourceStartIndex);
message = message.substring(0, sourceStartIndex);
}
}
// Send any normal messages back to your state management
if (message && message.length > 0) {
onDataReceived(message);
}
}
timeoutId = setTimeout(read, 20);
})
.catch((error) => {
console.error(error);
});
};
read();
} catch (e) {
console.error(e);
}
}