Server Preparation
First, we need to prepare a server; here an RTX 4090 server is used.
Connect to the server you have already created; MobaXterm can be used for the SSH connection:
ssh funhpc@<server IP>
Environment Setup
Next, clone the official code, create a virtual environment, and install the corresponding dependencies. If the download is too slow, you can use the proxy shown below instead:
git clone https://github.com/bytedance/LatentSync.git
# git clone https://gitproxy.click/https://github.com/bytedance/LatentSync.git
Then run the one-click environment setup script provided by the official repo:
source setup_env.sh
Note that the downloads may be slow here; I switched the model downloads from HuggingFace to ModelScope:
#!/bin/bash
# Create a new conda environment
conda create -y -n latentsync python=3.10.13
conda activate latentsync
# Install ffmpeg
conda install -y -c conda-forge ffmpeg
# Python dependencies
pip install -r requirements.txt -i https://repo.huaweicloud.com/repository/pypi/simple
pip install modelscope -i https://repo.huaweicloud.com/repository/pypi/simple
# OpenCV dependencies
apt -y install libgl1
# Download the checkpoints required for inference from ModelScope
modelscope download --model ByteDance/LatentSync-1.5 whisper/tiny.pt --local_dir checkpoints
modelscope download --model ByteDance/LatentSync-1.5 latentsync_unet.pt --local_dir checkpoints
modelscope download --model zhuzhukeji/sd-vae-ft-mse --local_dir stabilityai/sd-vae-ft-mse
If the downloads succeed, the checkpoints folder should look like this:
./checkpoints/
|-- latentsync_unet.pt
|-- whisper
| `-- tiny.pt
The official inference script is ./inference.sh, which you can run directly to try it out.
FastAPI Deployment
To build the FastAPI deployment, however, we need to look at what this script does internally; it simply runs a Python script:
python -m scripts.inference \
--unet_config_path "configs/unet/stage2.yaml" \
--inference_ckpt_path "checkpoints/latentsync_unet.pt" \
--inference_steps 20 \
--guidance_scale 2.0 \
--video_path "assets/demo1_video.mp4" \
--audio_path "assets/demo1_audio.wav" \
--video_out_path "video_out.mp4"
So we inspect the inference script under the scripts folder, which is the official end-to-end inference code. It runs through the following steps:
- Input checks: verify that the input video and audio file paths exist.
- Device and precision setup: choose float16 or float32 precision based on GPU support to optimize compute efficiency.
- Model loading:
  - Load the DDIM Scheduler used for denoising scheduling during inference;
  - Load the Whisper model specified by the config to extract audio features;
  - Load the VAE model for encoding/decoding between image space and latent space;
  - Load the pretrained 3D UNet conditional model as the denoising network.
- Pipeline initialization: assemble the components above into a LipsyncPipeline dedicated to audio-driven video generation.
- Run inference: run the pipeline with the given video and audio paths, generate a video synchronized with the audio, and save the result.
import argparse
import os
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature
def main(config, args):
if not os.path.exists(args.video_path):
raise RuntimeError(f"Video path '{args.video_path}' not found")
if not os.path.exists(args.audio_path):
raise RuntimeError(f"Audio path '{args.audio_path}' not found")
# Check if the GPU supports float16
is_fp16_supported = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7
dtype = torch.float16 if is_fp16_supported else torch.float32
print(f"Input video path: {args.video_path}")
print(f"Input audio path: {args.audio_path}")
print(f"Loaded checkpoint path: {args.inference_ckpt_path}")
scheduler = DDIMScheduler.from_pretrained("configs")
if config.model.cross_attention_dim == 768:
whisper_model_path = "checkpoints/whisper/small.pt"
elif config.model.cross_attention_dim == 384:
whisper_model_path = "checkpoints/whisper/tiny.pt"
else:
raise NotImplementedError("cross_attention_dim must be 768 or 384")
audio_encoder = Audio2Feature(
model_path=whisper_model_path,
device="cuda",
num_frames=config.data.num_frames,
audio_feat_length=config.data.audio_feat_length,
)
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=dtype)
vae.config.scaling_factor = 0.18215
vae.config.shift_factor = 0
denoising_unet, _ = UNet3DConditionModel.from_pretrained(
OmegaConf.to_container(config.model),
args.inference_ckpt_path,
device="cpu",
)
denoising_unet = denoising_unet.to(dtype=dtype)
pipeline = LipsyncPipeline(
vae=vae,
audio_encoder=audio_encoder,
denoising_unet=denoising_unet,
scheduler=scheduler,
).to("cuda")
if args.seed != -1:
set_seed(args.seed)
else:
torch.seed()
print(f"Initial seed: {torch.initial_seed()}")
pipeline(
video_path=args.video_path,
audio_path=args.audio_path,
video_out_path=args.video_out_path,
video_mask_path=args.video_out_path.replace(".mp4", "_mask.mp4"),
num_frames=config.data.num_frames,
num_inference_steps=args.inference_steps,
guidance_scale=args.guidance_scale,
weight_dtype=dtype,
width=config.data.resolution,
height=config.data.resolution,
mask_image_path=config.data.mask_image_path,
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--unet_config_path", type=str, default="configs/unet.yaml")
parser.add_argument("--inference_ckpt_path", type=str, required=True)
parser.add_argument("--video_path", type=str, required=True)
parser.add_argument("--audio_path", type=str, required=True)
parser.add_argument("--video_out_path", type=str, required=True)
parser.add_argument("--inference_steps", type=int, default=20)
parser.add_argument("--guidance_scale", type=float, default=1.0)
parser.add_argument("--seed", type=int, default=1247)
args = parser.parse_args()
config = OmegaConf.load(args.unet_config_path)
main(config, args)
After walking through this code, we can extract its core steps and move on to the FastAPI deployment. Here is a ready-made script:
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, status, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel, Field
from enum import Enum
from fastapi.staticfiles import StaticFiles
from typing import Optional, Dict, List
import uuid
import os
import asyncio
import shutil
import logging
from pathlib import Path
from omegaconf import OmegaConf
import torch
import subprocess
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from latentsync.whisper.audio2feature import Audio2Feature
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from accelerate.utils import set_seed
import base64
import mimetypes
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration parameters
MODEL_CONFIG_PATH = "configs/unet/stage2.yaml"
MODEL_CKPT_PATH = "checkpoints/latentsync_unet.pt"
TEMP_DIR = Path("/data/coding/tmp")
TEMP_DIR.mkdir(exist_ok=True)
STATIC_DIR = Path("/data/coding/static/results")
STATIC_DIR.mkdir(parents=True, exist_ok=True)
MAX_CONCURRENT_TASKS = 5
VALID_API_KEYS = {"your-api-key"}  # safer to read this from an environment variable
# Authentication
security = HTTPBearer(auto_error=False)
# Task status enum
class TaskStatus(str, Enum):
IN_QUEUE = "InQueue"
IN_PROGRESS = "InProgress"
SUCCEEDED = "Succeed"
FAILED = "Failed"
CANCELLED = "Cancelled"
# Task data model
class TaskData(BaseModel):
status: TaskStatus = TaskStatus.IN_QUEUE
params: dict
created_at: int = Field(default_factory=lambda: int(datetime.now().timestamp()))
started_at: Optional[int] = None
completed_at: Optional[int] = None
download_url: Optional[str] = None
reason: Optional[str] = None
# Request model
class InferenceRequest(BaseModel):
video_source: str = Field(..., description="Base64 encoded video")
audio_source: str = Field(..., description="Base64 encoded audio")
inference_steps: int = 20
guidance_scale: float = 2.0
seed: int = 1247
# Status query request model
class TaskStatusRequest(BaseModel):
request_id: str
def load_pipeline():
try:
config = OmegaConf.load(MODEL_CONFIG_PATH)
dtype = torch.float32
whisper_model_path = "checkpoints/whisper/small.pt" if config.model.cross_attention_dim == 768 else "checkpoints/whisper/tiny.pt"
audio_encoder = Audio2Feature(
model_path=whisper_model_path,
device="cuda",
num_frames=config.data.num_frames,
audio_feat_length=config.data.audio_feat_length,
)
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=dtype)
vae.config.scaling_factor = 0.18215
vae.config.shift_factor = 0
denoising_unet, _ = UNet3DConditionModel.from_pretrained(
OmegaConf.to_container(config.model),
MODEL_CKPT_PATH,
device="cpu",
)
denoising_unet = denoising_unet.to(dtype=dtype).to("cpu")
pipeline = LipsyncPipeline(
vae=vae,
audio_encoder=audio_encoder,
denoising_unet=denoising_unet,
scheduler=DDIMScheduler.from_pretrained("configs"),
).to("cuda")
return pipeline, config
except Exception as e:
logger.error("Model loading failed:", exc_info=True)
raise
# Lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
        # Check that FFmpeg is available
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info("FFmpeg check passed")
        # Check the model files
if not Path(MODEL_CKPT_PATH).exists():
raise RuntimeError(f"Checkpoint not found at {MODEL_CKPT_PATH}")
if not Path(MODEL_CONFIG_PATH).exists():
raise RuntimeError(f"Config not found at {MODEL_CONFIG_PATH}")
        # Load the model
app.state.pipeline, app.state.config = load_pipeline()
logger.info("Model loaded successfully")
        # Initialize the task system
app.state.tasks: Dict[str, TaskData] = {}
app.state.pending_queue: List[str] = []
app.state.task_lock = asyncio.Lock()
app.state.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
        app.state.base_url = "http://<IP>:<port>/static/results"  # change to your actual host or domain
        # Start the background processor
asyncio.create_task(task_processor())
yield
except Exception as e:
logger.error("Startup failed:", exc_info=True)
raise
app = FastAPI(title="Lipsync API", version="1.0.0", lifespan=lifespan)
# Key configuration: mount the static files directory
app.mount("/static", StaticFiles(directory="/data/coding/static"), name="static")
# Authentication dependency
async def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)):
    if not credentials or credentials.scheme != "Bearer":
        raise HTTPException(401, {"status": "Failed", "reason": "Invalid authentication credentials"})
    if credentials.credentials not in VALID_API_KEYS:
        raise HTTPException(401, {"status": "Failed", "reason": "Invalid API key"})
return True
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content=exc.detail
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
error_msg = exc.errors()[0]['msg']
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"status": "Failed", "reason": f"参数校验失败: {error_msg}"}
)
# Submit task endpoint
@app.post("/lipsync/submit", response_model=dict)
async def submit_task(
request: InferenceRequest,
auth: bool = Depends(verify_auth)
):
request_id = str(uuid.uuid4())
async with app.state.task_lock:
app.state.tasks[request_id] = TaskData(params=request.dict())
app.state.pending_queue.append(request_id)
return {"request_id": request_id}
# Query task status endpoint
@app.post("/lipsync/status", response_model=dict)
async def get_status(
request: TaskStatusRequest,
auth: bool = Depends(verify_auth)
):
task = app.state.tasks.get(request.request_id)
if not task:
raise HTTPException(
status_code=404,
detail={"status": "Failed", "reason": "无效的任务ID"}
)
    # Default response structure (results defaults to null)
response = {
"status": task.status.value,
"reason": task.reason,
"queue_position": None, # 默认为 null
"results": None # 默认为 null
}
    # Queue position (only for the InQueue state)
if task.status == TaskStatus.IN_QUEUE:
try:
queue_pos = app.state.pending_queue.index(request.request_id) + 1
response["queue_position"] = queue_pos
except ValueError:
response["queue_position"] = 0
    # Handle the succeeded state
if task.status == TaskStatus.SUCCEEDED:
response["results"] = {
"video": [{"url": task.download_url}],
"timings": {
"inference": task.completed_at - task.started_at
}
}
return response
# Cancel task endpoint
@app.post("/lipsync/cancel", response_model=dict)
async def cancel_task(
request: TaskStatusRequest,
auth: bool = Depends(verify_auth)
):
async with app.state.task_lock:
task = app.state.tasks.get(request.request_id)
if not task:
            raise HTTPException(404, {"status": "Failed", "reason": "Invalid task ID"})
if task.status != TaskStatus.IN_QUEUE:
            raise HTTPException(400, {"status": "Failed", "reason": f"Only queued tasks can be cancelled; current status: {task.status}"})
try:
app.state.pending_queue.remove(request.request_id)
except ValueError:
pass
task.status = TaskStatus.CANCELLED
        task.reason = "Cancelled by user"
return {"status": "Succeed", "reason": "取消排队任务成功"}
# Background task processor
async def task_processor():
executor = ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TASKS)
while True:
async with app.state.semaphore:
request_id = await get_next_task()
if request_id:
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, lambda: process_task(request_id))
await asyncio.sleep(0.1)
async def get_next_task():
async with app.state.task_lock:
if app.state.pending_queue:
return app.state.pending_queue.pop(0)
return None
def process_task(request_id: str):
task = app.state.tasks.get(request_id)
if not task:
return
try:
task.status = TaskStatus.IN_PROGRESS
task.started_at = int(datetime.now().timestamp())
        # Create a temporary directory
temp_dir = TEMP_DIR / request_id
temp_dir.mkdir(parents=True, exist_ok=True)
        # Save the media files
video_path = save_media(task.params['video_source'], temp_dir, 'video')
audio_path = save_media(task.params['audio_source'], temp_dir, 'audio')
        # Prepare the output paths
output_filename = f"output_{request_id}{video_path.suffix}"
output_path = temp_dir / output_filename
mask_path = temp_dir / f"mask_{output_filename}"
        # Run inference
set_seed(task.params['seed']) if task.params['seed'] != -1 else torch.seed()
app.state.pipeline(
video_path=str(video_path),
audio_path=str(audio_path),
video_out_path=str(output_path),
video_mask_path=str(mask_path),
num_frames=app.state.config.data.num_frames,
num_inference_steps=task.params['inference_steps'],
guidance_scale=task.params['guidance_scale'],
weight_dtype=torch.float32,
width=app.state.config.data.resolution,
height=app.state.config.data.resolution,
mask_image_path=app.state.config.data.mask_image_path,
)
        # Save the result
result_path = STATIC_DIR / output_filename
shutil.move(str(output_path), str(result_path))
task.download_url = f"{app.state.base_url}/{output_filename}"
task.status = TaskStatus.SUCCEEDED
task.completed_at = int(datetime.now().timestamp())
except Exception as e:
task.status = TaskStatus.FAILED
task.reason = str(e)
task.completed_at = int(datetime.now().timestamp())
logger.error(f"Task {request_id} failed: {str(e)}")
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def save_media(content: str, temp_dir: Path, media_type: str) -> Path:
try:
data = base64.b64decode(content)
except Exception as e:
raise ValueError(f"Invalid base64 {media_type} content: {str(e)}")
ext_map = {'video': '.mp4', 'audio': '.wav'}
ext = ext_map[media_type]
path = temp_dir / f"input_{media_type}{ext}"
with open(path, 'wb') as f:
f.write(data)
detected_type, _ = mimetypes.guess_type(path.name)
type_whitelist = {
'video': ['video/mp4', 'video/quicktime', 'video/x-msvideo'],
'audio': ['audio/wav', 'audio/x-wav']
}
if detected_type not in type_whitelist[media_type]:
path.unlink()
raise ValueError(f"Unsupported {media_type} format: {detected_type}")
return path
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8081)
After activating the latentsync virtual environment and changing into the LatentSync directory, start this script to launch the model deployment.
This FastAPI service provides a concurrent, access-controlled API that accepts audio and video input, generates lip-synced video with the diffusion model, and supports asynchronous task submission, status queries, and task cancellation.
Script Walkthrough
1. Initialization
Model loading: load the config file (configs/unet/stage2.yaml); load the Whisper model specified by the config for audio feature extraction; load the VAE model for converting between latent space and image space; load the pretrained 3D UNet model as the diffusion denoising network; build the LipsyncPipeline used for subsequent inference.
Environment checks: verify that FFmpeg is available (used for audio/video processing) and that the model weights and config file exist.
Task queue initialization: set the maximum number of concurrent tasks (to prevent resource exhaustion); create the task-status dictionary, pending queue, and lock to keep access thread-safe; start the background processor task_processor to work through the queued tasks.
2. API Endpoints
(1) Submit a task: /lipsync/submit
Accepts Base64-encoded video and audio files; returns a unique request ID (request_id) used to query the result later; adds the task to the waiting queue.
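For illustration, a minimal client sketch for this endpoint might look like the following. The service address, API key, and file names (http://localhost:8081, your-api-key, demo.mp4, demo.wav) are assumptions for the example, not values from the script; adjust them to your deployment.
import base64
import requests

API_URL = "http://localhost:8081"   # assumed address; matches the port used in uvicorn.run above
API_KEY = "your-api-key"            # must be one of the entries in VALID_API_KEYS
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

def b64(path: str) -> str:
    # Read a local file and return its Base64-encoded content as a string
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()

payload = {
    "video_source": b64("demo.mp4"),   # hypothetical local files
    "audio_source": b64("demo.wav"),
    "inference_steps": 20,
    "guidance_scale": 2.0,
    "seed": 1247,
}
resp = requests.post(f"{API_URL}/lipsync/submit", json=payload, headers=HEADERS)
request_id = resp.json()["request_id"]
print("request_id:", request_id)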
(2) Query task status: /lipsync/status
Input: the task ID.
Output: the current task status (queued / in progress / succeeded / failed / cancelled); if queued, the current position in the queue; if succeeded, the download URL of the generated video and the inference time; if failed, the error reason.
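A polling loop against this endpoint could look like the sketch below, reusing API_URL, HEADERS, and request_id from the submit example above; the 5-second interval is an arbitrary choice.
import time
import requests

while True:
    resp = requests.post(f"{API_URL}/lipsync/status",
                         json={"request_id": request_id}, headers=HEADERS)
    data = resp.json()
    if data["status"] == "InQueue":
        print("queue position:", data["queue_position"])
    elif data["status"] == "Succeed":
        print("video url:", data["results"]["video"][0]["url"])
        print("inference seconds:", data["results"]["timings"]["inference"])
        break
    elif data["status"] in ("Failed", "Cancelled"):
        print("task ended:", data["reason"])
        break
    time.sleep(5)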
(3) Cancel a task: /lipsync/cancel
Allows the user to cancel a task that is still in the queue (in-progress tasks cannot be cancelled).
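Cancelling a queued task is a single call (again reusing API_URL, HEADERS, and request_id from the examples above):
resp = requests.post(f"{API_URL}/lipsync/cancel",
                     json={"request_id": request_id}, headers=HEADERS)
print(resp.json())  # {"status": "Succeed", ...} only if the task was still queued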
3. Background Task Processing
Concurrency control: a Semaphore limits how many tasks may run at the same time (5 by default).
Task execution flow:
- Take the next task from the queue;
- Decode the Base64 audio and video and save them as temporary files;
- Call LipsyncPipeline to run inference and generate the synced video;
- Move the output video to the static directory and update the task status;
- Clean up the temporary files.
4. Security and Authentication
Token authentication is implemented with HTTPBearer; a custom API key check is supported (the key is hard-coded in the script; in practice it is safer to read it from an environment variable, as sketched below); invalid or unauthorized requests receive a structured JSON error response.
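As a hedged sketch of the environment-variable approach mentioned above (the variable name LIPSYNC_API_KEYS is an assumption, not part of the original script), the hard-coded set could be replaced with:
import os

# Start the service with e.g.: LIPSYNC_API_KEYS="key1,key2" python <your_script>.py
VALID_API_KEYS = {
    key.strip() for key in os.environ.get("LIPSYNC_API_KEYS", "").split(",") if key.strip()
}
if not VALID_API_KEYS:
    raise RuntimeError("LIPSYNC_API_KEYS is not set; refusing to start without any API key")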
5. Exception Handling
Custom exception handlers catch parameter-validation errors and return a detailed message, and catch HTTP exceptions and return them in a unified format.
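From the client's perspective, every failure therefore arrives in the same {"status": "Failed", "reason": ...} shape; a small sketch of handling it, reusing the client setup from the earlier examples:
resp = requests.post(f"{API_URL}/lipsync/status",
                     json={"request_id": "nonexistent-id"}, headers=HEADERS)
if resp.status_code != 200:
    body = resp.json()   # unified error format: {"status": "Failed", "reason": ...}
    print(body["status"], body["reason"])
else:
    print(resp.json())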