Skip to content

Commit

Permalink
Revert "Revert "feat: stream response from chat interface""
Browse files Browse the repository at this point in the history
This reverts commit 990e345.
  • Loading branch information
gmpetrov committed Apr 28, 2023
1 parent 990e345 commit f217426
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 39 deletions.
1 change: 1 addition & 0 deletions next.config.base.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const nextConfig = {
webpack: (config, { buildId, dev, isServer, defaultLoaders, webpack }) => {
config.externals.push({
playwright: true,
'pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js': true,
'@huggingface/inference': 'commonjs @huggingface/inference',
replicate: 'commonjs replicate',
'cohere-ai': 'commonjs cohere-ai',
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@headlessui/react": "^1.7.13",
"@heroicons/react": "^2.0.16",
"@hookform/resolvers": "^3.0.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@mui/icons-material": "^5.11.11",
"@mui/joy": "5.0.0-alpha.72",
"@mui/lab": "5.0.0-alpha.124",
Expand Down Expand Up @@ -62,7 +63,7 @@
"ioredis": "^5.3.1",
"js-yaml": "^4.1.0",
"jszip": "^3.10.1",
"langchain": "^0.0.56",
"langchain": "^0.0.64",
"mammoth": "^1.5.1",
"mime-types": "^2.1.35",
"nanoid": "^4.0.2",
Expand Down
122 changes: 105 additions & 17 deletions pages/agents/[agentId]/index.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import {
EventStreamContentType,
fetchEventSource,
} from '@microsoft/fetch-event-source';
import ArrowForwardRoundedIcon from '@mui/icons-material/ArrowForwardRounded';
import AutoGraphRoundedIcon from '@mui/icons-material/AutoGraphRounded';
import ChevronRightRoundedIcon from '@mui/icons-material/ChevronRightRounded';
Expand Down Expand Up @@ -44,7 +48,7 @@ import { getAgent } from '@app/pages/api/agents/[id]';
import { BulkDeleteDatasourcesSchema } from '@app/pages/api/datasources/bulk-delete';
import { RouteNames } from '@app/types';
import agentToolFormat from '@app/utils/agent-tool-format';
import { ApiErrorType } from '@app/utils/api-error';
import { ApiError, ApiErrorType } from '@app/utils/api-error';
import getRootDomain from '@app/utils/get-root-domain';
import { fetcher } from '@app/utils/swr-fetcher';
import { withAuth } from '@app/utils/withAuth';
Expand Down Expand Up @@ -86,6 +90,7 @@ export default function AgentPage() {
}

const history = [...state.history, { from: 'human', message }];
const nextIndex = history.length;

setState({
history: history as any,
Expand All @@ -95,18 +100,101 @@ export default function AgentPage() {
let error = '';

try {
const { data } = await axios.post(
`/api/agents/${getAgentQuery?.data?.id}/query`,
{
const ctrl = new AbortController();
let buffer = '';

class RetriableError extends Error {}
class FatalError extends Error {}

await fetchEventSource(`/api/agents/${getAgentQuery?.data?.id}/query`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
// Accept: 'text/event-stream',
},
body: JSON.stringify({
streaming: true,
query: message,
}
);
}),
signal: ctrl.signal,

async onopen(response) {
if (
response.ok &&
response.headers.get('content-type') === EventStreamContentType
) {
return; // everything's good
} else if (
response.status >= 400 &&
response.status < 500 &&
response.status !== 429
) {
if (response.status === 402) {
throw new ApiError(ApiErrorType.USAGE_LIMIT);
}
// client-side errors are usually non-retriable:
throw new FatalError();
} else {
throw new RetriableError();
}
},
onclose() {
// if the server closes the connection unexpectedly, retry:
throw new RetriableError();
},
onerror(err) {
console.log('on error', err, Object.keys(err));
if (err instanceof FatalError) {
ctrl.abort();
throw err; // rethrow to stop the operation
} else if (err instanceof ApiError) {
console.log('ApiError', ApiError);
throw err;
} else {
// do nothing to automatically retry. You can also
// return a specific retry interval here.
}
},

onmessage: (event) => {
if (event.data === '[DONE]') {
ctrl.abort();
} else if (event.data?.startsWith('[ERROR]')) {
ctrl.abort();

setState({
history: [
...history,
{
from: 'agent',
message: event.data.replace('[ERROR]', ''),
} as any,
],
});
} else {
// const data = JSON.parse(event.data || `{}`);
buffer += event.data as string;
console.log(buffer);

const h = [...history];

if (h?.[nextIndex]) {
h[nextIndex].message = `${buffer}`;
} else {
h.push({ from: 'agent', message: buffer });
}

answer = data?.answer;
setState({
history: h as any,
});
}
},
});
} catch (err) {
if (err instanceof AxiosError) {
if (err.response?.data?.error) {
error = err.response?.data?.error;
console.log('err', err);
if (err instanceof ApiError) {
if (err?.message) {
error = err?.message;

if (error === ApiErrorType.USAGE_LIMIT) {
answer =
Expand All @@ -117,15 +205,15 @@ export default function AgentPage() {
} else {
answer = `Error: ${error}`;
}

setState({
history: [
...history,
{ from: 'agent', message: answer as string },
] as any,
});
}
}

setState({
history: [
...history,
{ from: 'agent', message: answer as string },
] as any,
});
};

const handleChangeTab = (tab: string) => {
Expand Down
27 changes: 21 additions & 6 deletions pages/api/agents/[id]/query.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Usage } from '@prisma/client';
import { ConsoleCallbackHandler } from 'langchain/dist/callbacks';
import { NextApiResponse } from 'next';

import { AppNextApiRequest, ChatRequest } from '@app/types';
Expand Down Expand Up @@ -47,13 +46,25 @@ export const chatAgentRequest = async (
});

if (agent?.ownerId !== session?.user?.id) {
throw new Error('Unauthorized');
throw new ApiError(ApiErrorType.UNAUTHORIZED);
}

const manager = new AgentManager({ agent, topK: 3 });

if (data.streaming) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
});
}

const streamData = (data: string) => {
res.write(`data: ${data}\n\n`);
};

const [answer] = await Promise.all([
manager.query(data.query),
manager.query(data.query, data.streaming ? streamData : undefined),
prisma.usage.update({
where: {
id: agent?.owner?.usage?.id,
Expand All @@ -64,9 +75,13 @@ export const chatAgentRequest = async (
}),
]);

return {
answer,
};
if (data.streaming) {
streamData('[DONE]');
} else {
return {
answer,
};
}
};

handler.post(respond(chatAgentRequest));
Expand Down
32 changes: 23 additions & 9 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions types/dtos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export type UpdateResponseSchema = z.infer<typeof UpdateResponseSchema>;

export const ChatRequest = z.object({
query: z.string(),
streaming: z.boolean().optional().default(false),
});

export type ChatRequest = z.infer<typeof ChatRequest>;
Expand Down
3 changes: 2 additions & 1 deletion utils/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ export default class AgentManager {
this.topK = topK;
}

async query(input: string) {
async query(input: string, stream?: any) {
if (this.agent.tools.length <= 1) {
const { answer } = await chat({
prompt: this.agent.prompt as string,
datastore: this.agent?.tools[0]?.datastore as any,
query: input,
topK: this.topK,
stream,
});

return answer;
Expand Down
Loading

1 comment on commit f217426

@vercel
Copy link

@vercel vercel bot commented on f217426 Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.