This commit is contained in:
yinsx
2025-12-26 11:29:31 +08:00
parent 14bfdcbf51
commit 43ffe4486a
12 changed files with 196 additions and 40 deletions

View File

@ -3,38 +3,51 @@ import sys
import os
from pathlib import Path
import glob
import tempfile
import shutil
from datetime import datetime
class A2FService:
    """Run the Audio2Face-3D sample client as a subprocess and locate its CSV output."""

    def __init__(self, a2f_url="192.168.1.39:52000"):
        """Resolve the client script/config paths and make sure the output dir exists.

        :param a2f_url: ``host:port`` string forwarded to the client via ``--url``.
        """
        self.base_dir = Path(__file__).parent.parent.parent
        self.output_dir = self.base_dir / "data" / "output"
        app_dir = (
            self.base_dir
            / "external"
            / "Audio2Face-3D-Samples"
            / "scripts"
            / "audio2face_3d_microservices_interaction_app"
        )
        self.a2f_script = app_dir / "a2f_3d.py"
        self.config_file = app_dir / "config" / "config_james.yml"
        self.a2f_url = a2f_url
        os.makedirs(self.output_dir, exist_ok=True)

    def audio_to_csv(self, audio_path: str) -> tuple[str, str]:
        """Run inference on one audio file inside a private working directory.

        Each call gets its own temporary directory so that concurrent calls
        cannot pick up each other's ``output_*`` folders.

        :param audio_path: Path of the audio file handed to the client script.
        :return: ``(csv_path, temp_work_dir)``. The caller owns ``temp_work_dir``
            and must delete it once the CSV has been consumed.
        :raises RuntimeError: if the client exits non-zero, or no ``output_*``
            directory / ``animation_frames.csv`` is produced.
        """
        # The timestamp in the prefix is only there to make leftover
        # directories easy to date; mkdtemp already guarantees uniqueness.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
        temp_work_dir = tempfile.mkdtemp(prefix=f"a2f_work_{timestamp}_")

        try:
            cmd = [
                sys.executable,
                str(self.a2f_script),
                "run_inference",
                # The child runs with cwd=temp_work_dir, so a relative path
                # would no longer resolve; anchor it to our cwd first.
                os.path.abspath(audio_path),
                str(self.config_file),
                "--url",
                self.a2f_url,
            ]

            # stderr is folded into stdout so the error message carries both.
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                cwd=temp_work_dir,
            )
            if result.returncode != 0:
                raise RuntimeError(f"A2F inference failed: {result.stdout}")

            # Take the lexicographically last output_* directory.
            output_dirs = sorted(glob.glob(os.path.join(temp_work_dir, "output_*")))
            if not output_dirs:
                raise RuntimeError(f"No output directory found in {temp_work_dir}")

            csv_path = os.path.join(output_dirs[-1], "animation_frames.csv")
            if not os.path.exists(csv_path):
                raise RuntimeError(f"CSV file not found: {csv_path}")

            return csv_path, temp_work_dir
        except BaseException:
            # Also covers KeyboardInterrupt/SystemExit so the directory is
            # never leaked; the original exception propagates unchanged.
            shutil.rmtree(temp_work_dir, ignore_errors=True)
            raise

View File

@ -22,6 +22,7 @@ class TextRequest(BaseModel):
split_punctuations: str = None
max_sentence_length: int = None
first_sentence_split_size: int = None
tts_provider: str = 'pyttsx3' # 'pyttsx3' 或 'edge-tts'
@app.get('/health')
async def health():
@ -30,7 +31,10 @@ async def health():
@app.post('/text-to-blendshapes')
async def text_to_blendshapes(request: TextRequest):
try:
service = TextToBlendShapesService(lang=request.language)
service = TextToBlendShapesService(
lang=request.language,
tts_provider=request.tts_provider
)
result = service.text_to_blend_shapes(
request.text,
segment=request.segment,
@ -46,7 +50,10 @@ async def text_to_blendshapes(request: TextRequest):
@app.post('/text-to-blendshapes/stream')
async def text_to_blendshapes_stream(request: TextRequest):
async def generate():
service = TextToBlendShapesService(lang=request.language)
service = TextToBlendShapesService(
lang=request.language,
tts_provider=request.tts_provider
)
try:
for message in service.iter_text_to_blend_shapes_stream(
request.text,

View File

@ -0,0 +1,29 @@
import os
import asyncio
import edge_tts
class EdgeTTSService:
    """Synchronous wrapper around the asynchronous edge-tts client."""

    # Voice used when ``lang`` has no entry in ``voice_map``.
    DEFAULT_VOICE = 'zh-CN-XiaoxiaoNeural'

    def __init__(self, lang='zh-CN'):
        """Store the language and the language -> voice lookup table.

        :param lang: Language tag used to pick a voice from ``voice_map``.
        """
        self.lang = lang
        # Voice options per language tag.
        self.voice_map = {
            'zh-CN': 'zh-CN-XiaoxiaoNeural',  # Xiaoxiao
            'zh-TW': 'zh-TW-HsiaoChenNeural',
            'en-US': 'en-US-AriaNeural'
        }

    def text_to_audio(self, text: str, output_path: str) -> str:
        """Synthesize ``text`` with edge-tts and write the audio to ``output_path``.

        NOTE(review): the original docstring says this produces a WAV file, but
        nothing here sets an output format; edge-tts appears to emit compressed
        (MP3) audio regardless of the file extension -- confirm that downstream
        consumers accept it, or convert before use.

        :param text: Text to speak.
        :param output_path: Destination file; missing parent directories are created.
        :return: ``output_path``, unchanged.
        """
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        voice = self.voice_map.get(self.lang, self.DEFAULT_VOICE)
        self._run_blocking(self._async_text_to_audio(text, output_path, voice))
        return output_path

    @staticmethod
    def _run_blocking(coro):
        """Run ``coro`` to completion from synchronous code and return its result."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: the plain path is safe.
            return asyncio.run(coro)

        # asyncio.run() refuses to start inside a running loop, so give the
        # coroutine its own loop on a helper thread. The calling thread still
        # blocks until it finishes, matching this method's synchronous contract.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as executor:
            return executor.submit(asyncio.run, coro).result()

    async def _async_text_to_audio(self, text: str, output_path: str, voice: str):
        """Generate the audio asynchronously and save it to ``output_path``."""
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(output_path)

View File

@ -6,14 +6,26 @@ import queue
import threading
from datetime import datetime
from tts_service import TTSService
from edge_tts_service import EdgeTTSService
from a2f_service import A2FService
from blend_shape_parser import BlendShapeParser
class TextToBlendShapesService:
DEFAULT_SPLIT_PUNCTUATIONS = '。!?;!?;,'
def __init__(self, lang='zh-CN', a2f_url="192.168.1.39:52000"):
self.tts = TTSService(lang=lang)
def __init__(self, lang='zh-CN', a2f_url="192.168.1.39:52000", tts_provider='edge-tts'):
"""
初始化服务
:param lang: 语言
:param a2f_url: A2F服务地址
:param tts_provider: TTS提供商 ('pyttsx3' 或 'edge-tts')
"""
# 根据选择初始化TTS服务
if tts_provider == 'edge-tts':
self.tts = EdgeTTSService(lang=lang)
else:
self.tts = TTSService(lang=lang)
self.a2f = A2FService(a2f_url=a2f_url)
self.parser = BlendShapeParser()
@ -67,7 +79,18 @@ class TextToBlendShapesService:
yield {'type': 'error', 'message': '文本为空'}
return
yield {'type': 'status', 'stage': 'split', 'sentences': len(sentences), 'message': f'已拆分为 {len(sentences)} 个句子'}
yield {
'type': 'status',
'stage': 'split',
'sentences': len(sentences),
'sentence_texts': sentences, # 发送句子文本列表
'message': f'已拆分为 {len(sentences)} 个句子'
}
# 打印句子列表用于调试
print(f"[调试] 发送给前端的句子列表:")
for i, s in enumerate(sentences):
print(f" [{i}] {s}")
# 使用队列来收集处理完成的句子
result_queue = queue.Queue()
@ -126,6 +149,7 @@ class TextToBlendShapesService:
is_continuation = self.is_continuation[next_index] if next_index < len(self.is_continuation) else False
print(f"[主线程] 正在推送句子 {next_index}{len(frames)}{'(连续)' if is_continuation else ''}")
print(f"[调试] 句子 {next_index} 对应文本: {sentences[next_index] if next_index < len(sentences) else 'N/A'}")
# 如果不是连续句子,重置累计时间
if not is_continuation and next_index > 0:
@ -135,7 +159,6 @@ class TextToBlendShapesService:
# 调整时间码:从累计时间开始
frame['timeCode'] = cumulative_time + frame['timeCode']
frame['sentenceIndex'] = next_index
frame['isContinuation'] = is_continuation
total_frames += 1
yield {'type': 'frame', 'frame': frame}
@ -157,6 +180,7 @@ class TextToBlendShapesService:
start_time = time.time()
print(f"[线程 {index}] 开始处理: {sentence[:30]}...")
print(f"[调试] 线程 {index} 实际处理的完整文本: [{sentence}] (长度: {len(sentence)}字)")
_, audio_path = self._prepare_output_paths(output_dir, suffix=f's{index:03d}')
print(f"[线程 {index}] TTS 开始...")
@ -166,7 +190,7 @@ class TextToBlendShapesService:
print(f"[线程 {index}] TTS 完成,耗时 {tts_time:.2f}A2F 开始...")
a2f_start = time.time()
csv_path = self.a2f.audio_to_csv(audio_path)
csv_path, temp_dir = self.a2f.audio_to_csv(audio_path) # 接收临时目录路径
a2f_time = time.time() - a2f_start
print(f"[线程 {index}] A2F 完成,耗时 {a2f_time:.2f}秒,解析中...")
@ -174,6 +198,14 @@ class TextToBlendShapesService:
frames = list(self.parser.iter_csv_to_blend_shapes(csv_path))
parse_time = time.time() - parse_start
# 解析完成后清理临时目录
import shutil
try:
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"[线程 {index}] 已清理临时目录: {temp_dir}")
except Exception as e:
print(f"[线程 {index}] 清理临时目录失败: {e}")
total_time = time.time() - start_time
print(f"[线程 {index}] 完成!生成了 {len(frames)} 帧 | 总耗时: {total_time:.2f}秒 (TTS: {tts_time:.2f}s, A2F: {a2f_time:.2f}s, 解析: {parse_time:.2f}s)")
@ -239,12 +271,15 @@ class TextToBlendShapesService:
length = len(first)
parts = []
if length <= 12:
# 12字以内分两部
if length <= 8:
# 8字以下不拆
parts = [first]
elif length <= 12:
# 8-12字分两部分
mid = length // 2
parts = [first[:mid], first[mid:]]
else:
# 12字之后前6字再6字剩下的
# 12字以上前6字再6字剩下的
parts = [first[:6], first[6:12], first[12:]]
# 替换第一句为多个小句