diff --git a/.env b/.env index def8707..1a8f4e1 100644 --- a/.env +++ b/.env @@ -7,10 +7,11 @@ NEXTAUTH_URL="https://demo.waternetwork.cn/" # 为前端暴露的变量添加 NEXT_PUBLIC_ 前缀 NEXT_PUBLIC_BACKEND_URL="https://server.waternetwork.cn" NEXT_PUBLIC_COPILOT_URL="https://agent.waternetwork.cn" +NEXT_PUBLIC_AUDIO_SERVICE_URL="http://127.0.0.1:18083" NEXT_PUBLIC_MAP_URL="https://geoserver.waternetwork.cn/geoserver" NEXT_PUBLIC_MAP_WORKSPACE="tjwater" NEXT_PUBLIC_MAP_EXTENT="13490131, 3630016, 13525879, 3666968.25" # NEXT_PUBLIC_MAP_AVAILABLE_LAYERS="junctions, pipes, reservoirs, scada" NEXT_PUBLIC_NETWORK_NAME="tjwater" NEXT_PUBLIC_MAPBOX_TOKEN="pk.eyJ1IjoiemhpZnUiLCJhIjoiY205azNyNGY1MGkyZDJxcTJleDUwaHV1ZCJ9.wOmSdOnDDdre-mB1Lpy6Fg" -NEXT_PUBLIC_TIANDITU_TOKEN="e3e8ad95ee911741fa71ed7bff2717ec" \ No newline at end of file +NEXT_PUBLIC_TIANDITU_TOKEN="e3e8ad95ee911741fa71ed7bff2717ec" diff --git a/src/components/chat/GlobalChatbox.voice.ts b/src/components/chat/GlobalChatbox.voice.ts index cce50d9..2524c2b 100644 --- a/src/components/chat/GlobalChatbox.voice.ts +++ b/src/components/chat/GlobalChatbox.voice.ts @@ -1,6 +1,31 @@ import { useCallback, useEffect, useRef, useState } from "react"; +import config from "@/config/config"; import type { SpeechState } from "./GlobalChatbox.types"; +type AudioStreamStartResponse = { + stream_id?: string; + audio_url?: string; + status_url?: string; + result_url?: string; + sample_rate?: number; + channels?: number; + error?: string; +}; + +type AudioStreamStatusResponse = { + state?: "starting" | "running" | "done" | "failed" | "closed"; + ready?: boolean; + failed?: boolean; + closed?: boolean; + status_text?: string; + error?: string; +}; + +type AudioStreamResultResponse = { + run_status?: string; + error?: string; +}; + // WebKit Speech Recognition compatibility interface SpeechRecognitionEvent extends Event { readonly resultIndex: number; @@ -29,70 +54,534 @@ declare global { new (): SpeechRecognition; prototype: SpeechRecognition; }; + webkitAudioContext?: typeof AudioContext; } } export function useSpeechSynthesis() { const [speechState, setSpeechState] = useState("idle"); const [speakingMessageId, setSpeakingMessageId] = useState(null); - const utteranceRef = useRef(null); + const audioContextRef = useRef(null); + const streamAbortControllerRef = useRef(null); + const activeSourceNodesRef = useRef>(new Set()); + const streamIdRef = useRef(null); + const closeUrlRef = useRef(null); + const statusUrlRef = useRef(null); + const resultUrlRef = useRef(null); + const statusPollTimeoutRef = useRef(null); + const playbackTokenRef = useRef(0); - const isSupported = typeof window !== "undefined" && "speechSynthesis" in window; + const isSupported = + typeof window !== "undefined" && + typeof window.FormData !== "undefined" && + (typeof window.AudioContext !== "undefined" || + typeof window.webkitAudioContext !== "undefined"); - const stop = useCallback(() => { - if (!isSupported) return; - window.speechSynthesis.cancel(); - utteranceRef.current = null; + const trimTrailingSlash = useCallback((value: string) => value.replace(/\/+$/, ""), []); + + const buildServiceUrl = useCallback( + (path: string) => `${trimTrailingSlash(config.AUDIO_SERVICE_URL)}${path.startsWith("/") ? path : `/${path}`}`, + [trimTrailingSlash], + ); + + const resolveServiceUrl = useCallback( + (pathOrUrl: string) => { + if (/^https?:\/\//i.test(pathOrUrl)) { + return pathOrUrl; + } + return buildServiceUrl(pathOrUrl); + }, + [buildServiceUrl], + ); + + const withQueryParams = useCallback( + (urlString: string, params: Record) => { + const url = new URL(urlString); + Object.entries(params).forEach(([key, value]) => { + url.searchParams.set(key, value); + }); + return url.toString(); + }, + [], + ); + + const readErrorMessage = useCallback(async (response: Response, fallback: string) => { + try { + const payload = (await response.json()) as { error?: string; message?: string }; + return payload.error || payload.message || fallback; + } catch { + return fallback; + } + }, []); + + const closeStream = useCallback(async (closeUrl: string) => { + const response = await fetch(closeUrl, { + method: "POST", + }); + + if (!response.ok) { + console.error("[GlobalChatbox] Failed to close audio stream:", closeUrl); + } + }, []); + + const stopStatusPolling = useCallback(() => { + if (statusPollTimeoutRef.current !== null) { + window.clearTimeout(statusPollTimeoutRef.current); + statusPollTimeoutRef.current = null; + } + }, []); + + const fetchStreamResult = useCallback( + async (resultUrl: string) => { + const response = await fetch(resultUrl); + if (response.status === 202) { + return false; + } + if (!response.ok) { + throw new Error( + await readErrorMessage( + response, + `Audio stream result failed with status ${response.status}`, + ), + ); + } + + const payload = (await response.json()) as AudioStreamResultResponse; + if (payload.error) { + throw new Error(payload.error); + } + + return true; + }, + [readErrorMessage], + ); + + const clearAudio = useCallback(async () => { + const abortController = streamAbortControllerRef.current; + streamAbortControllerRef.current = null; + abortController?.abort(); + + activeSourceNodesRef.current.forEach((source) => { + try { + source.onended = null; + source.stop(); + } catch { + // ignore stop errors when source already ended + } + source.disconnect(); + }); + activeSourceNodesRef.current.clear(); + + const audioContext = audioContextRef.current; + audioContextRef.current = null; + if (!audioContext) return; + + try { + await audioContext.close(); + } catch { + // ignore close errors when context already closed + } + }, []); + + const playPcmStream = useCallback( + async ({ + audioUrl, + sampleRate, + channels, + playbackToken, + }: { + audioUrl: string; + sampleRate: number; + channels: number; + playbackToken: number; + }) => { + const AudioContextCtor = window.AudioContext ?? window.webkitAudioContext; + if (!AudioContextCtor) { + throw new Error("WebAudio AudioContext is not available in this browser"); + } + + const abortController = new AbortController(); + streamAbortControllerRef.current = abortController; + + const response = await fetch(withQueryParams(audioUrl, { format: "pcm" }), { + signal: abortController.signal, + }); + if (!response.ok) { + throw new Error( + await readErrorMessage(response, `Audio stream failed with status ${response.status}`), + ); + } + if (!response.body) { + throw new Error("Audio stream response body is missing"); + } + + const audioContext = new AudioContextCtor({ + sampleRate, + }); + audioContextRef.current = audioContext; + + const reader = response.body.getReader(); + const bytesPerFrame = Math.max(1, channels) * 2; + let bufferedRemainder = new Uint8Array(0); + let nextStartTime = audioContext.currentTime + 0.05; + let activeSources = 0; + let streamEnded = false; + let resolvePlaybackDrain: (() => void) | null = null; + const playbackDrainPromise = new Promise((resolve) => { + resolvePlaybackDrain = resolve; + }); + + const maybeResolvePlaybackDrain = () => { + if (streamEnded && activeSources === 0) { + resolvePlaybackDrain?.(); + } + }; + + const schedulePcmChunk = (pcmBytes: Uint8Array) => { + const frameCount = pcmBytes.byteLength / bytesPerFrame; + if (frameCount <= 0) return; + + const buffer = audioContext.createBuffer(Math.max(1, channels), frameCount, sampleRate); + const view = new DataView(pcmBytes.buffer, pcmBytes.byteOffset, pcmBytes.byteLength); + for (let frame = 0; frame < frameCount; frame += 1) { + for (let channel = 0; channel < Math.max(1, channels); channel += 1) { + const sampleIndex = frame * Math.max(1, channels) + channel; + const pcm = view.getInt16(sampleIndex * 2, true); + buffer.getChannelData(channel)[frame] = pcm / 32768; + } + } + + const source = audioContext.createBufferSource(); + source.buffer = buffer; + source.connect(audioContext.destination); + const sourceStartTime = Math.max(nextStartTime, audioContext.currentTime + 0.01); + nextStartTime = sourceStartTime + buffer.duration; + + activeSources += 1; + activeSourceNodesRef.current.add(source); + source.onended = () => { + activeSources -= 1; + activeSourceNodesRef.current.delete(source); + source.disconnect(); + maybeResolvePlaybackDrain(); + }; + source.start(sourceStartTime); + }; + + const concatUint8Arrays = (a: Uint8Array, b: Uint8Array) => { + if (a.byteLength === 0) return b; + if (b.byteLength === 0) return a; + const merged = new Uint8Array(a.byteLength + b.byteLength); + merged.set(a); + merged.set(b, a.byteLength); + return merged; + }; + + while (true) { + if (playbackToken !== playbackTokenRef.current) { + throw new DOMException("PCM stream playback cancelled", "AbortError"); + } + + const { done, value } = await reader.read(); + if (done) break; + if (!value || value.byteLength === 0) continue; + + const merged = concatUint8Arrays(bufferedRemainder, value); + const alignedByteLength = merged.byteLength - (merged.byteLength % bytesPerFrame); + if (alignedByteLength === 0) { + bufferedRemainder = new Uint8Array(merged); + continue; + } + + const alignedChunk = merged.slice(0, alignedByteLength); + bufferedRemainder = new Uint8Array(merged.slice(alignedByteLength)); + schedulePcmChunk(alignedChunk); + } + + streamEnded = true; + maybeResolvePlaybackDrain(); + await playbackDrainPromise; + }, + [readErrorMessage, withQueryParams], + ); + + const stopPlayback = useCallback(async () => { + await clearAudio(); + stopStatusPolling(); + + const closeUrl = closeUrlRef.current; + streamIdRef.current = null; + closeUrlRef.current = null; + statusUrlRef.current = null; + resultUrlRef.current = null; setSpeechState("idle"); setSpeakingMessageId(null); - }, [isSupported]); + + if (closeUrl) { + try { + await closeStream(closeUrl); + } catch (error) { + console.error("[GlobalChatbox] Failed to close audio stream:", error); + } + } + }, [clearAudio, closeStream, stopStatusPolling]); + + const pollStreamStatus = useCallback( + (playbackToken: number, statusUrl: string, resultUrl: string) => { + stopStatusPolling(); + + statusPollTimeoutRef.current = window.setTimeout(async () => { + if ( + playbackToken !== playbackTokenRef.current || + statusUrlRef.current !== statusUrl || + resultUrlRef.current !== resultUrl + ) { + return; + } + + try { + const response = await fetch(statusUrl); + if (!response.ok) { + throw new Error( + await readErrorMessage( + response, + `Audio stream status failed with status ${response.status}`, + ), + ); + } + + const payload = (await response.json()) as AudioStreamStatusResponse; + if ( + playbackToken !== playbackTokenRef.current || + statusUrlRef.current !== statusUrl || + resultUrlRef.current !== resultUrl + ) { + return; + } + + if (payload.failed || payload.state === "failed") { + console.error( + "[GlobalChatbox] Audio stream failed:", + payload.error || payload.status_text || statusUrl, + ); + playbackTokenRef.current += 1; + void stopPlayback(); + return; + } + + if (payload.closed || payload.state === "closed") { + stopStatusPolling(); + return; + } + + if (payload.ready || payload.state === "done") { + try { + const isResultReady = await fetchStreamResult(resultUrl); + if (isResultReady) { + stopStatusPolling(); + return; + } + } catch (error) { + console.error("[GlobalChatbox] Failed to fetch audio stream result:", error); + } + } + + pollStreamStatus(playbackToken, statusUrl, resultUrl); + } catch (error) { + if ( + playbackToken === playbackTokenRef.current && + statusUrlRef.current === statusUrl && + resultUrlRef.current === resultUrl + ) { + console.error("[GlobalChatbox] Failed to poll audio stream status:", error); + pollStreamStatus(playbackToken, statusUrl, resultUrl); + } + } + }, 1000); + }, + [fetchStreamResult, readErrorMessage, stopPlayback, stopStatusPolling], + ); + + const stop = useCallback(() => { + playbackTokenRef.current += 1; + void stopPlayback(); + }, [stopPlayback]); const speak = useCallback( - (messageId: string, text: string) => { - if (!isSupported || !text) return; - window.speechSynthesis.cancel(); + async (messageId: string, text: string) => { + const normalizedText = text.trim(); + if (!isSupported || !normalizedText) return; - const utterance = new SpeechSynthesisUtterance(text); - utterance.lang = "zh-CN"; - utterance.rate = 1; - utterance.onend = () => { - setSpeechState("idle"); - setSpeakingMessageId(null); - utteranceRef.current = null; - }; - utterance.onerror = () => { - setSpeechState("idle"); - setSpeakingMessageId(null); - utteranceRef.current = null; - }; - utterance.onpause = () => setSpeechState("paused"); - utterance.onresume = () => setSpeechState("playing"); + const playbackToken = playbackTokenRef.current + 1; + playbackTokenRef.current = playbackToken; + await stopPlayback(); - utteranceRef.current = utterance; setSpeakingMessageId(messageId); setSpeechState("playing"); - window.speechSynthesis.speak(utterance); + + try { + const formData = new FormData(); + formData.append("text", normalizedText); + formData.append("demo_id", "demo-1"); + + const response = await fetch(buildServiceUrl("/api/generate-stream/start"), { + method: "POST", + body: formData, + }); + + if (!response.ok) { + throw new Error( + await readErrorMessage( + response, + `Audio stream start failed with status ${response.status}`, + ), + ); + } + + const payload = (await response.json()) as AudioStreamStartResponse; + const streamId = payload.stream_id; + const sampleRate = + typeof payload.sample_rate === "number" && payload.sample_rate > 0 + ? payload.sample_rate + : 24000; + const channels = + typeof payload.channels === "number" && payload.channels > 0 + ? payload.channels + : 1; + const audioUrl = payload.audio_url + ? resolveServiceUrl(payload.audio_url) + : buildServiceUrl( + `/api/generate-stream/${encodeURIComponent(streamId ?? "")}/audio?format=pcm`, + ); + const rawStatusUrl = payload.status_url + ? resolveServiceUrl(payload.status_url) + : buildServiceUrl(`/api/generate-stream/${encodeURIComponent(streamId ?? "")}/status`); + const statusUrl = withQueryParams(rawStatusUrl, { compact: "1" }); + const rawResultUrl = payload.result_url + ? resolveServiceUrl(payload.result_url) + : buildServiceUrl(`/api/generate-stream/${encodeURIComponent(streamId ?? "")}/result`); + const resultUrl = withQueryParams(rawResultUrl, { + compact: "1", + include_audio: "0", + }); + const closeUrl = buildServiceUrl( + `/api/generate-stream/${encodeURIComponent(streamId ?? "")}/close`, + ); + + if (!streamId) { + throw new Error(payload.error || "Audio stream start response is missing stream_id"); + } + + if (playbackToken !== playbackTokenRef.current) { + await closeStream(closeUrl); + return; + } + + streamIdRef.current = streamId; + closeUrlRef.current = closeUrl; + statusUrlRef.current = statusUrl; + resultUrlRef.current = resultUrl; + + pollStreamStatus(playbackToken, statusUrl, resultUrl); + await playPcmStream({ + audioUrl, + sampleRate, + channels, + playbackToken, + }); + + if (playbackToken !== playbackTokenRef.current) { + return; + } + + await clearAudio(); + if (streamIdRef.current === streamId) { + streamIdRef.current = null; + closeUrlRef.current = null; + statusUrlRef.current = null; + resultUrlRef.current = null; + setSpeechState("idle"); + setSpeakingMessageId(null); + } + stopStatusPolling(); + await fetchStreamResult(resultUrl).catch((error) => { + console.error("[GlobalChatbox] Failed to fetch audio stream result:", error); + }); + await closeStream(closeUrl); + } catch (error) { + await clearAudio(); + if ( + error instanceof DOMException && + error.name === "AbortError" && + playbackToken !== playbackTokenRef.current + ) { + return; + } + const closeUrl = closeUrlRef.current; + streamIdRef.current = null; + closeUrlRef.current = null; + statusUrlRef.current = null; + resultUrlRef.current = null; + setSpeechState("idle"); + setSpeakingMessageId(null); + if (closeUrl) { + try { + await closeStream(closeUrl); + } catch (closeError) { + console.error("[GlobalChatbox] Failed to close audio stream:", closeError); + } + } + console.error("[GlobalChatbox] Failed to play audio stream:", error); + } }, - [isSupported], + [ + buildServiceUrl, + clearAudio, + closeStream, + fetchStreamResult, + isSupported, + playPcmStream, + readErrorMessage, + resolveServiceUrl, + pollStreamStatus, + stopPlayback, + stopStatusPolling, + withQueryParams, + ], ); const pause = useCallback(() => { - if (!isSupported) return; - window.speechSynthesis.pause(); + if (!isSupported || !audioContextRef.current) return; + void audioContextRef.current.suspend().then( + () => { + setSpeechState("paused"); + }, + (error) => { + console.error("[GlobalChatbox] Failed to pause PCM playback:", error); + }, + ); }, [isSupported]); const resume = useCallback(() => { - if (!isSupported) return; - window.speechSynthesis.resume(); - }, [isSupported]); + if (!isSupported || !audioContextRef.current) return; + void audioContextRef.current.resume().then( + () => { + setSpeechState("playing"); + }, + (error) => { + playbackTokenRef.current += 1; + void stopPlayback(); + console.error("[GlobalChatbox] Failed to resume audio playback:", error); + }, + ); + }, [isSupported, stopPlayback]); useEffect(() => { return () => { - if (typeof window !== "undefined" && "speechSynthesis" in window) { - window.speechSynthesis.cancel(); - } + playbackTokenRef.current += 1; + void stopPlayback(); }; - }, []); + }, [stopPlayback]); return { speechState, speakingMessageId, speak, pause, resume, stop, isSupported }; } diff --git a/src/config/config.ts b/src/config/config.ts index ca689f2..9a9a256 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -1,6 +1,8 @@ export const config = { BACKEND_URL: process.env.NEXT_PUBLIC_BACKEND_URL || "http://127.0.0.1:8000", COPILOT_URL: process.env.NEXT_PUBLIC_COPILOT_URL || "http://127.0.0.1:8787", + AUDIO_SERVICE_URL: + process.env.NEXT_PUBLIC_AUDIO_SERVICE_URL || "http://127.0.0.1:18083", MAP_URL: process.env.NEXT_PUBLIC_MAP_URL || "http://127.0.0.1:8080/geoserver", MAP_WORKSPACE: process.env.NEXT_PUBLIC_MAP_WORKSPACE || "tjwater", MAP_EXTENT: process.env.NEXT_PUBLIC_MAP_EXTENT