import requests
import json
import subprocess
from typing import Iterator
from datetime import datetime
import logging
# Logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] - [%(levelname)s] %(message)s',
)

# Audio is streamed to an mpv player process through its stdin, so the mpv
# player must be installed (intended for Linux/macOS systems).
mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
mpv_process = subprocess.Popen(
    mpv_command,
    stdin=subprocess.PIPE,
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)

# Replace this placeholder with the API token from the MiniMax developer platform.
token = '输入你在minimax开发者平台获取的token,前往 https://platform.minimaxi.com/user-center/basic-information/interface-key 获取'
def ccv2_audio_stream(text) -> Iterator[str]:
    """Stream a chat completion with speech output and yield its audio chunks.

    Sends ``text`` as the user message to the MiniMax ``chatcompletion_v2``
    endpoint with streaming and ``speech_output`` enabled, then parses the
    ``data:`` lines of the streamed response. Assistant text deltas and
    tool-call notices are logged as they arrive.

    Args:
        text: The user message to send.

    Yields:
        Every non-empty ``audio_content`` value found in an assistant delta.
        These are strings; ``audio_play`` in this file hex-decodes them.
    """
    payload = {
        "model": "MiniMax-Text-01",
        "messages": [
            {
                "role": "system",
                "name": "MM智能助理",
                "content": "MM智能助理是一款智能小助手",
            },
            {
                "role": "user",
                "name": "用户",
                "content": text,
            },
        ],
        "stream": True,
        "tools": [
            {"type": "web_search"},
        ],
        "tool_choice": "auto",
        "max_tokens": 1024,
        "stream_options": {  # enable speech output
            "speech_output": True,
        },
        "voice_setting": {
            "model": "speech-01-turbo-240228",
            "voice_id": "female-tianmei",
        },
    }
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {token}',
    }
    logging.info(f"【文本输入】{text}")
    # HTTPS, so the bearer token is not sent in clear text.
    response = requests.post(
        "https://api.minimax.chat/v1/text/chatcompletion_v2",
        headers=headers,
        json=payload,
        stream=True,
    )
    try:
        logging.info(f"Get response, trace-id: {response.headers.get('Trace-Id')}")
        if not response.ok:
            logging.error(f"Unexpected HTTP status: {response.status_code}")

        prefix = "data:"
        chunk_count = 0
        # Iterate raw bytes and decode as UTF-8 ourselves: requests would
        # otherwise fall back to ISO-8859-1 for text/* responses that carry
        # no charset, which garbles non-ASCII text.
        for raw_line in response.iter_lines():
            line = raw_line.decode("utf-8", errors="replace")
            if not line.startswith(prefix):
                continue
            chunk_count += 1
            logging.debug(f"[sse] data chunck-{chunk_count}")

            # Remove the prefix by slicing; str.strip("data:") strips a
            # character set from both ends rather than a prefix.
            data = line[len(prefix):].strip()
            try:
                resp = json.loads(data)
            except json.JSONDecodeError:
                # Non-JSON payloads (e.g. an end-of-stream marker) carry no delta.
                logging.debug(f"[sse] skip non-JSON data: {data!r}")
                continue
            if not isinstance(resp, dict):
                continue

            choices = resp.get("choices")
            if not choices or not choices[0].get("delta"):
                continue
            delta = choices[0]["delta"]
            if delta.get("role") != "assistant":  # only handle assistant replies
                continue
            if delta.get("content"):
                logging.info(f"【文本输出】 {delta['content']}")
            if delta.get("audio_content"):
                yield delta["audio_content"]
            if delta.get("tool_calls"):
                logging.info("【搜索中】...")
    finally:
        # Release the streamed connection even if the consumer stops early.
        response.close()
def audio_play(audio_stream: Iterator[str]):
    """Play hex-encoded audio chunks as they arrive and save them locally.

    Each chunk is hex-decoded and written to the stdin of the module-level
    ``mpv_process``. When the stream is exhausted, all decoded bytes are
    written to ``ccv2_audio_<YYYYmmdd-HHMMSS>.mp3`` in the current working
    directory. Nothing is written when no audio data was received.

    Args:
        audio_stream: Iterator of hex strings; ``None`` and ``'\\n'`` items
            are skipped.

    Raises:
        ValueError: If a chunk is not valid hexadecimal.
    """
    parts = []
    player_alive = True
    for chunk in audio_stream:
        if chunk is None or chunk == '\n':
            continue
        decoded_hex = bytes.fromhex(chunk)
        # Collect pieces and join once at the end instead of repeatedly
        # re-copying a growing bytes object.
        parts.append(decoded_hex)
        if not player_alive:
            continue
        try:
            mpv_process.stdin.write(decoded_hex)  # type: ignore
            mpv_process.stdin.flush()  # type: ignore
        except BrokenPipeError:
            # The player exited; keep collecting so the audio is still saved.
            player_alive = False
            logging.warning("mpv is no longer accepting audio; playback stopped")

    audio = b"".join(parts)
    if not audio:
        return
    now = datetime.now().strftime('%Y%m%d-%H%M%S')
    file_name = f'ccv2_audio_{now}.mp3'
    with open(file_name, 'wb') as file:
        file.write(audio)
    logging.info(f"音频文件保存成功: {file_name}")
if __name__ == '__main__':
    # Ask the assistant to introduce itself, then play and save the spoken reply.
    audio_chunks = ccv2_audio_stream("请介绍自己")
    audio_play(audio_chunks)