"""Second-stage LLM dispatcher for the massage-robot voice assistant.

A user utterance is first classified into a short operation code (three
parallel LLM votes), then ``DashscopeClient.handle_request`` either returns a
fixed command string for that code or opens a streaming chat completion whose
prompt has been enriched with the robot's current status.
"""
import random
from pathlib import Path
import time
import sys
import os

# The project-local ``tools`` package is resolved relative to the working
# directory, so the path tweak must run before the imports below.
sys.path.append("./")
from tools.yaml_operator import read_yaml
from tools.log import CustomLogger
import requests

current_file_path = os.path.abspath(__file__)
Language_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
# One directory above ``Language_Path`` is added to ``sys.path`` so that
# packages living there can be imported.
MassageRobot_Dobot_Path = os.path.dirname(Language_Path)
sys.path.append(MassageRobot_Dobot_Path)

try:
    # Direct execution of this file (module found as a top-level name).
    from LLM_dify import DifyClient
except ImportError:
    # Imported from outside as part of its package.
    from .LLM_dify import DifyClient
from datetime import datetime
import json
from openai import OpenAI
import re
import threading
import subprocess
import os
import requests
from VortXDB.client import VTXClient
import asyncio
import websockets

# Local UI endpoint that receives streamed assistant text.
_AI_RESPONSE_URL = 'http://127.0.0.1:5000/ai_respon'
# Sentence-ending punctuation used to cut streamed text into printable chunks.
_PUNCTUATION_RE = re.compile(r'[。,,;!?]')


def _pop_complete_sentences(text, start):
    """Split off the sentences of ``text`` that are complete after ``start``.

    Args:
        text: Everything streamed so far.
        start: Index of the first character not yet emitted.

    Returns:
        ``(sentences, new_start)`` where ``sentences`` are the stripped,
        non-empty chunks ending at a punctuation mark and ``new_start`` is the
        updated cursor. A punctuation mark sitting exactly at the cursor is
        not treated as a boundary; it stays in the following chunk.
    """
    sentences = []
    for match in _PUNCTUATION_RE.finditer(text, start):
        index = match.start()
        if index > start:
            chunk = text[start:index + 1].strip()
            start = index + 1
            if chunk:
                sentences.append(chunk)
    return sentences, start


class DashscopeClient:
    """Classify user utterances and route them to commands or LLM chats."""

    # Operations answered by a streaming chat completion on ``chat_message``.
    _CHAT_OPERATIONS = (
        'jtrq', 'xzsj', 'qthd', 'lddx', 'wddx', 'dldw', 'pldw', 'cjdx',
        'amjd', 'amdt', 'ambw', 'gnjs', 'cqjm', 'ylcx', 'zsdw', 'jtxw', 'dayh',
    )
    # Status-query operations whose answer is "<value><unit>".
    _LEVEL_OPERATIONS = ('lddx', 'wddx', 'dldw', 'pldw', 'cjdx', 'zsdw')
    # Display names used when reporting the body part being massaged.
    _BODY_PART_LABELS = {
        'back': '背部',
        'shoulder': '肩颈',
        'belly': '腹部',
        'back_shoulder': '肩颈和背部',
    }
    # Display names used when reporting the mounted massage head.
    _HEAD_LABELS = {
        'thermotherapy_head': '热疗按摩头',
        'shockwave_head': '冲击波按摩头',
        'ball_head': '全能滚珠按摩头',
        'finger_head': '指疗通络按摩头',
        'roller_head': '狼牙滚珠按摩头',
    }

    def __init__(self):
        """Create the LLM/Dify clients, default status and prompt templates.

        Reads the OpenAI-compatible key and base URL from VortXDB and loads
        four prompt files from ``LLM/config`` relative to the working
        directory.
        """
        self.weather_info=None
        self.vtxdb = VTXClient()
        self.Sillcon_OpenAI_api_key = self.vtxdb.get("robot_config", "Language.LLM.Sillcon_OpenAI_api_key")
        self.Sillcon_OpenAI_BaseUrl = self.vtxdb.get("robot_config", "Language.LLM.Sillcon_OpenAI_BaseUrl")
        self.client = OpenAI(api_key=self.Sillcon_OpenAI_api_key, base_url=self.Sillcon_OpenAI_BaseUrl)
        # NOTE(security): the credentials below are hard-coded in source.
        # TODO: load them from VortXDB like the key above and rotate them.
        self.client_deep=OpenAI(base_url='https://infer-modelarts-cn-southwest-2.modelarts-infer.com/v1/infers/952e4f88-ef93-4398-ae8d-af37f63f0d8e/v1/',api_key='flZrvJ8oGURp5V7JLKLwLUyOIe2_sRL1FaKhz7SN6jIk1XE-Chye1KxhFyN8dVbMMIUiIS0r0ZRzFatiVFPNWg')
        self.client_search=DifyClient(base_url='https://robotstorm.tech/dify/v1',api_key='app-9vrC0QkaVFbj1g2rKiwkyzv3')
        self.client_search_deep=DifyClient(base_url='https://robotstorm.tech/dify/v1',api_key='app-KFatoFKPMwjIt2za2paXvVA7')
        self.massage_method_suggestion=DifyClient(base_url='https://robotstorm.tech/dify/v1',api_key='app-Kcm2KEaWmAIS5FiMmsX5hcyd')
        self.logger = CustomLogger()
        # Default robot status; the keys below are the ones read by
        # ``handle_request``.
        self.massage_status_languge={'task_time':'3','progress':'30','force': 0, 'press': 0, 'frequency': 0, 'temperature': 0, 'gear': 0,'body_part':'back','current_head':'thermotherapy_head','shake':5,'speed':1,'direction':1,'start_pos':'滑肉左','end_pos':'滑肉右','massage_path':'line'}
        self.deep_thinking_flag=False
        self.search_flag=False
        self.suggestion_flag=False
        self.suggestion_deekseek_flag=False
        self.stop_event = threading.Event()
        self.thread = None
        self.suggestion_mode_flag=False

        # Load the system prompts.
        with open('LLM/config/classify_prompts_new.txt', 'r') as f:
            self.classify_prompts = f.read().strip()
        self.classify_message = [{'role': 'system', 'content': self.classify_prompts}]
        with open('LLM/config/chat_prompts_new.txt', 'r') as f:
            self.chat_prompts = f.read().strip()
        self.chat_message = [{'role': 'system', 'content': self.chat_prompts}]
        # NOTE: this is the SAME list object as ``chat_message`` (an alias,
        # not a copy), so weather turns see the whole chat history.
        self.weather_message = self.chat_message
        with open('LLM/config/retrieve_prompts.txt', 'r') as f:
            self.retrieve_prompts = f.read().strip()
        self.retrieve_message = [{'role': 'system', 'content': self.retrieve_prompts}]
        with open('LLM/config/adjust_volumn_prompts.txt', 'r') as f:
            self.adjust_volumn_prompts = f.read().strip()
        self.adjust_volumn_message = [{'role': 'system', 'content': self.adjust_volumn_prompts}]

        self.model = {
            'classify_model1': 'Qwen/Qwen2-7B-Instruct',
            'chat_model': 'Qwen/Qwen2-7B-Instruct',
            'retrieve_model': 'Qwen/Qwen2-7B-Instruct',
            'adjust_volumn_model': 'Qwen/Qwen2-7B-Instruct',
            'deep_thinking_model': 'DeepSeek-R1'
        }

        # Operation code -> fixed command string returned to the caller.
        self.operations = {
            'ksam': '按摩过程调整:开始,无',
            'tzam': '按摩过程调整:停止,无',
            'ldzd': '按摩过程调整:变重,一点',
            'ldjx': '按摩过程调整:变轻,一点',
            'wdzj': '按摩过程调整:升温,一点',
            'wdjx': '按摩过程调整:降温,一点',
            'dlzd': '按摩过程调整:增电,一点',
            'dljx': '按摩过程调整:减电,一点',
            'pljk': '按摩过程调整:增频,一点',
            'pljd': '按摩过程调整:减频,一点',
            'cjlz': '按摩过程调整:增冲,一点',
            'cjlj': '按摩过程调整:减冲,一点',
            'zszj': '按摩过程调整:增速,一点',
            'zsjx': '按摩过程调整:减速,一点',
            'gbzx': '按摩过程调整:换向,一点',
            'dayh': '按摩过程调整:时间,加长',
            'jtrq': '',
            'xzsj': '',
            'jttq': 'weather',
            'jtxw': 'news',
            'qthd': '',
            'bfyy': '音乐播放调整:播放,无',
            'tzbf': '音乐播放调整:停播,无',
            'ssyy': '音乐播放调整:上首',
            'xsyy': '音乐播放调整:下首',
        }
        self.task_mapping = {
            "finger": "指疗通络",
            "shockwave": "点阵按摩",
            "roller": "滚滚刺疗",
            "thermotherapy": "深部热疗",
            "stone": "温砭舒揉",
            "ball": "全能滚珠"
        }
        self.body_mapping = {
            "back": "背部",
            "belly": "腹部",
            "waist": "腰部",
            "shoulder": "肩颈",
            "leg": "腿部"
        }
        self.pathTypeMap = {
            'line': "循经直推法",
            'in_spiral': "螺旋内揉法",
            'out_spiral': "螺旋外散法",
            'ellipse': "周天环摩法",
            'lemniscate': "双环疏经法",
            'cycloid': "摆浪通络法",
            'point': "定穴点按法",
        }

    def xzsj(self, **kwargs):
        """Return the current local time as ``'当前时间:HH:MM:SS'``."""
        return '当前时间:' + datetime.now().strftime('%H:%M:%S')

    def jtrq(self, **kwargs):
        """Print and return the current local date as ``'当前日期:YYYY-MM-DD'``."""
        date_result = datetime.now().strftime('%Y-%m-%d')
        print('当前日期:'+date_result)
        return '当前日期:'+date_result

    def get_volume(self):
        """Query the PulseAudio master volume through ``amixer``.

        Returns:
            The raw ``amixer`` stdout text, or ``None`` when the command
            exits non-zero or cannot be run.
        """
        try:
            result = subprocess.run(
                ["/usr/bin/amixer", "-D", "pulse", "get", "Master"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
        except Exception as e:
            print("发生错误:", e)
            return None
        if result.returncode != 0:
            print("查询音量失败:", result.stderr)
            return None
        return result.stdout

    # ------------------------------------------------------------------
    # LLM helpers
    # ------------------------------------------------------------------
    def _complete_once(self, model_key, messages):
        """Run one non-streaming completion and return the message content.

        Generation stops at the first ``}`` (the stop token), so the returned
        text does not contain the closing brace.
        """
        raw = self.client.chat.completions.create(
            model=self.model[model_key],
            messages=messages,
            stream=False,
            stop=["}"],
            timeout=10
        ).json()
        return json.loads(raw)['choices'][0]['message']['content']

    @staticmethod
    def _extract_braced(content):
        """Return the text inside the first ``{...}`` of ``content + '}'``.

        The closing brace is re-appended because it was consumed as the stop
        token. Raises ``IndexError`` when ``content`` holds no single-line
        ``{`` group.
        """
        return re.findall(r'\{(.*?)\}', content + "}")[0]

    def _stream_completion(self, messages, deep=False):
        """Open a streaming chat completion on the normal or deep client."""
        client = self.client_deep if deep else self.client
        model_key = 'deep_thinking_model' if deep else 'chat_model'
        return client.chat.completions.create(
            model=self.model[model_key],
            messages=messages,
            stream=True,
            timeout=10
        )

    # ------------------------------------------------------------------
    # handle_request helpers
    # ------------------------------------------------------------------
    def _restart_ui(self):
        """Restart the ``ui_next_app`` systemd service through ``sudo -S``."""
        try:
            # NOTE(security): the sudo password is hard-coded and fed on
            # stdin. TODO: replace with a sudoers NOPASSWD rule.
            subprocess.run(
                ['/bin/sudo', '-S', 'systemctl', 'restart', 'ui_next_app'],
                input='jsfb\n',
                text=True,
                check=True
            )
            print("服务 ui_next_app 已成功重启")
        except subprocess.CalledProcessError as e:
            print(f"重启服务失败,错误信息: {e}")

    def _adjust_volume(self, human_input):
        """Ask the LLM for a volume instruction and forward it to the local API.

        Nothing is posted when the model call or the extraction fails;
        previously the POST step ran with an unbound variable and failed with
        a swallowed ``NameError``.
        """
        self.adjust_volumn_message = [
            {'role': 'system', 'content': self.adjust_volumn_prompts},
            {"role": "user", "content": human_input},
        ]
        try:
            adjust_volumn_result = self._extract_braced(
                self._complete_once('adjust_volumn_model', self.adjust_volumn_message)
            )
            print("adjust_volumn_result:",adjust_volumn_result)
        except Exception as e:
            self.logger.log_error(f"音量调节模型有问题: {e}")
            return
        try:
            requests.post(
                'http://127.0.0.1:5000/adjust_volume',
                json={'adjust_volumn_result': adjust_volumn_result}
            )
        except requests.RequestException as e:
            print(f"处理音量调节指令时发生错误: {e}")

    def _append_volume(self, human_input):
        """Return ``human_input`` with the current volume percentage appended.

        The input is returned unchanged (and an error is logged) when the
        volume cannot be read or parsed.
        """
        volumn_result = self.get_volume()
        print("volumn_result:",volumn_result)
        # get_volume() yields None on failure; treat that as "no match".
        match = re.search(r'(\d+)%', volumn_result or '')
        if not match:
            self.logger.log_error("无法提取出音量信息")
            return human_input
        volume = int(match.group(1))
        print("int(match.group(1)):",volume)
        human_input = f"{human_input} {volume}"
        print("human_input:",human_input)
        return human_input

    def _run_suggestion(self, user_id, query):
        """Worker-thread body: stream a massage suggestion and store the plan.

        Streams the Dify answer, forwarding reasoning and content deltas to
        the local UI endpoint, then looks for a JSON object in the full
        answer, stores it in VortXDB under a generated name and notifies the
        websocket listener. The loop exits early once ``stop_event`` is set.
        """
        full_content = ''
        last_index = 0
        response_generator = self.massage_method_suggestion.chat_completion(
            user_id=user_id, query=query, callback=self.finished_callback)
        for response in response_generator:
            if self.stop_event.is_set():
                print("线程停止信号收到,终止执行。")
                break
            if not response.choices:
                continue
            delta = response.choices[0].delta
            reasoning_result = getattr(delta, "reasoning_content", None)
            result = getattr(delta, "content", None)
            if reasoning_result:
                print(reasoning_result)
                requests.post(_AI_RESPONSE_URL, data={'message':reasoning_result,'isReasoning':True})
            if result:
                full_content += result
                # Content is withheld from the UI while the "deep thinking
                # trace" node is running (flag toggled by finished_callback).
                if not self.suggestion_deekseek_flag:
                    requests.post(_AI_RESPONSE_URL, data={'message':result})
                sentences, last_index = _pop_complete_sentences(full_content, last_index)
                for sentence in sentences:
                    print(sentence)

        # Flush the tail that does not end with a punctuation mark.
        tail = full_content[last_index:].strip()
        if tail:
            print(tail)

        try:
            json_match = re.search(r'\{[\s\S]*\}', full_content)
            if not json_match:
                print("未找到 JSON 格式数据。")
                return
            massage_plan = json.loads(json_match.group())
            try:
                plan_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
                task_name = self.task_mapping[massage_plan['choose_task']]
                body_name = self.body_mapping[massage_plan['body_part']]
                result = f"{task_name}-{body_name}-{massage_plan['title']}{plan_timestamp}"
                result1 = f"按摩头以及部位选择:{task_name}-{body_name}"
                massage_plan['can_delete'] = True
                self.vtxdb.set("massage_plan", result, massage_plan)
                requests.post(_AI_RESPONSE_URL, data={'message':f"____\n{result1}\n"})
                requests.post(_AI_RESPONSE_URL, data={'message':f"____\n按摩方案名称: {result}"})
                asyncio.run(self.send_message("massage_plan_finish"))
            except Exception as e:
                self.logger.log_error(f"检查参数服务器key格式或者没写进参数服务器{e}")
            print("提取到按摩方案 JSON:")
            print(json.dumps(massage_plan, indent=2, ensure_ascii=False))
        except json.JSONDecodeError as e:
            print("JSON 解码失败:", e)

    def handle_request(self, operation, **kwargs):
        """Execute one classified operation.

        Args:
            operation: Operation code produced by ``classify_threading``.
            **kwargs: ``human_input`` carries the user's original utterance.

        Returns:
            * ``dict`` with ``chat_message`` and ``response_generator`` for
              operations answered by a streaming LLM reply;
            * a status string for ``sftj`` (work continues in a daemon
              thread);
            * otherwise the fixed command string from ``self.operations``
              (empty string for unknown codes).
        """
        self.stop_thread()
        human_input = kwargs.get('human_input')

        # Snapshot of the robot status values that status queries report.
        status = self.massage_status_languge
        current_head = str(status['current_head'])
        function_to_status = {
            'lddx': str(status['force']),
            'wddx': str(status['temperature']),
            'dldw': str(status['gear']),
            # The thermotherapy head reports its shake level as "frequency";
            # every other head reports the frequency value.
            'pldw': str(status['shake']) if current_head == 'thermotherapy_head'
                    else str(status['frequency']),
            'cjdx': str(status['press']),
            'amjd': str(status['progress']),
            'amdt': current_head,
            'ambw': str(status['body_part']),
            'gnjs': current_head,
            'zsdw': str(status['speed'])
        }
        status_result = function_to_status.get(operation)
        print('status_result:',status_result)

        if operation in self._LEVEL_OPERATIONS:
            self.unit_status = "牛" if operation in ('lddx', 'cjdx') else "档"
            human_input = f"{human_input} {status_result}{self.unit_status} "
        elif operation == 'ambw':
            label = self._BODY_PART_LABELS.get(status_result)
            if label:
                human_input = f"{human_input} 目前按摩部位:{label}"
        elif operation in ('amdt', 'gnjs'):
            label = self._HEAD_LABELS.get(status_result)
            if label:
                human_input = f"{human_input} 目前使用按摩头:{label}"
        elif operation == 'amjd':
            massage_progress = int(status['progress'])
            massage_task_time = int(status['task_time'])
            total_time = massage_task_time * 12
            remaining_time = total_time - round(total_time * massage_progress / 100)
            print("massage_progress:",massage_progress)
            print("massage_task_time:",massage_task_time)
            print("remaining_time:",remaining_time)
            human_input=f"{human_input} 目前按摩进度:{massage_progress},按摩剩余时间:{remaining_time}"
        elif operation == 'jtrq':
            human_input = self.jtrq()
        elif operation == 'xzsj':
            human_input = self.xzsj()
        elif operation == 'cqjm':
            self._restart_ui()
        elif operation == 'yltj':
            self._adjust_volume(human_input)
        elif operation == 'ylcx':
            human_input = self._append_volume(human_input)
        elif operation == 'dayh':
            massage_path = self.pathTypeMap.get(status['massage_path'])
            if massage_path is not None:
                print(massage_path)
            human_input = f"{human_input} 目前按摩穴位:从{status['start_pos']}到{status['end_pos']} 目前按摩手法:{massage_path}"
        elif operation == 'sftj':
            self.suggestion_mode_flag=True
            self.suggestion_flag=False
            user_id = os.uname()[1]
            self.stop_thread()
            asyncio.run(self.send_message("massage_plan_start"))
            self.thread = threading.Thread(
                target=self._run_suggestion, args=(user_id, human_input), daemon=True)
            self.stop_event.clear()
            self.thread.start()
            # Return immediately; the suggestion is produced by the thread.
            return "守护线程已启动处理suggestion"

        deep = bool(self.deep_thinking_flag)
        search = bool(self.search_flag)

        if operation in self._CHAT_OPERATIONS:
            # NOTE: the user turn stays in the shared history; assistant
            # replies are not appended here.
            self.chat_message.append({"role": "user", "content": human_input})
            open_question = operation in ('qthd', 'jtxw')
            if open_question and search:
                dify_client = self.client_search_deep if deep else self.client_search
                response_generator = dify_client.chat_completion(
                    user_id=os.uname()[1], query=human_input, callback=self.finished_callback)
            else:
                # Deep thinking only applies to open questions / news.
                response_generator = self._stream_completion(
                    self.chat_message, deep=deep and open_question)
            if operation=='dayh':
                human_input=f"{human_input} 按摩过程调整:时间,加长"
            return {
                'chat_message': human_input,
                'response_generator': response_generator
            }

        if operation == 'jttq':
            self.weather_info = self.get_info('/get_weather')
            print("weather_info",self.weather_info)
            time_result = datetime.now().strftime('%H:%M:%S')
            human_input = f"天气提问:{human_input} 天气数据:{self.weather_info} (其中daily.0.fxDate为今日日期, 当前时间为{time_result}) 你需要结合geo_info和时间来分析当前用户提问的是接下来几天daily还是接下来几小时hourly还是当前天气now, 你需要综合分析并准确根据数据回答,不需要给分析过程,给出分析后的信息就可以,回复用户询问的相应时间点的天气就可以,回复需要简短,一句话说完"
            self.weather_message.append({"role": "user", "content": human_input})
            try:
                response_generator = self._stream_completion(self.weather_message, deep=deep)
            finally:
                # The bulky weather prompt must not stay in the shared chat
                # history, even when opening the stream fails.
                self.weather_message.pop()
            return {
                'chat_message': "询问天气",
                'response_generator': response_generator
            }

        # Everything else maps to a fixed command string.
        return self.operations.get(operation, self.operations['qthd'])

    async def send_message(self,data):
        """Send ``data`` as one websocket message to ``ws://localhost:8766``."""
        uri = "ws://localhost:8766"
        async with websockets.connect(uri) as websocket:
            await websocket.send(data)
            print(f"消息已发送:{data}")

    def stop_thread(self):
        """Signal the suggestion worker to stop and wait up to one second."""
        if hasattr(self, 'thread') and self.thread and self.thread.is_alive():
            self.stop_event.set()
            self.thread.join(timeout=1)
            print("旧线程安全终止")

    def finished_callback(self,event_type,event_title):
        """Track Dify workflow node events and update the suggestion flags.

        Args:
            event_type: ``"node_started"`` or ``"node_finished"``.
            event_title: Title of the workflow node that raised the event.
        """
        event = (event_type, event_title)
        if event == ("node_finished", "massage_method_suggestion_llm"):
            self.suggestion_flag=True
            print("self.suggestion_flag:",self.suggestion_flag)
            print("执行到播报结束...")
        if event == ("node_started", "吹水"):
            self.suggestion_flag=False
            print("self.suggestion_flag:",self.suggestion_flag)
            print("执行到开始json...")
        if event == ("node_started", "深度思考轨迹"):
            self.suggestion_deekseek_flag=True
            print("self.suggestion_deekseek_flag:",self.suggestion_deekseek_flag)
            print("执行到开始深度思考轨迹...")
        if event == ("node_finished", "深度思考轨迹"):
            self.suggestion_deekseek_flag=False
            print("self.suggestion_deekseek_flag:",self.suggestion_deekseek_flag)
            print("执行到结束深度思考轨迹...")

    def classify_threading(self, human_input):
        """Classify ``human_input`` by majority vote of three parallel LLM calls.

        Returns:
            The operation code returned by at least two calls (or by the only
            call that succeeded); ``"cwsb"`` when there is no agreement or
            every call failed.
        """
        start_time = time.time()
        return_dict = {}
        return_lock = threading.Lock()  # guards return_dict across workers

        messages = [
            {'role': 'system', 'content': self.classify_prompts},
            {"role": "user", "content": human_input},
        ]
        self.classify_message = messages

        def process_model_reply(index):
            """Run one classification call and record its answer."""
            thread_start_time = time.time()
            try:
                question_function = self._extract_braced(
                    self._complete_once('classify_model1', messages))
                with return_lock:
                    return_dict[f"Thread-{index}"] = question_function
                thread_end_time = time.time()
                print(f"Thread-{index}: Processed question_function: {question_function} 运行时间:{thread_end_time-thread_start_time}")
            except Exception as e:
                self.logger.log_error(f"Error in Thread-{index} processing question_function: {e}")

        threads = [
            threading.Thread(target=process_model_reply, args=(i + 1,))
            for i in range(3)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        total_time = time.time() - start_time
        print(f"Total time for processing: {total_time:.2f} seconds")

        results = list(return_dict.values())
        if len(set(results)) == 1:
            # Unanimous (also covers a single surviving answer).
            return results[0]
        for result in results:
            if results.count(result) > 1:
                return result
        # No answers at all, or no two answers agree.
        return "cwsb"

    def music_keyword_retrieve(self,human_input):
        """Ask the LLM for the music keyword contained in ``human_input``.

        Returns:
            The raw model reply, or ``None`` when the call fails.
        """
        try:
            self.retrieve_message = [
                {'role': 'system', 'content': self.retrieve_prompts},
                {"role": "user", "content": human_input},
            ]
            question_retrieve = self._complete_once('retrieve_model', self.retrieve_message)
            print(question_retrieve)
            return question_retrieve
        except Exception as e:
            self.logger.log_error(f"获得音乐关键词失败: {e}")
            return None

    def get_info(self,endpoint,params=None):
        """GET ``endpoint`` on the local service and return the decoded JSON.

        Returns:
            The parsed JSON body on HTTP 200, otherwise ``None`` (non-200
            status, request failure or a body that is not valid JSON).
        """
        base_url = 'http://127.0.0.1:5000'
        url = f"{base_url}{endpoint}"
        try:
            response = requests.get(url, params=params)
            if response.status_code == 200:
                return response.json()
            print(f"Error: {response.status_code}, {response.text}")
            return None
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding errors on older ``requests``.
            print(f"请求失败: {e}")
            return None

    def chat(self,human_input):
        """Classify ``human_input`` and execute the resulting operation.

        Returns:
            * ``dict`` with ``question_function``, ``chat_message`` and, for
              streamed answers, ``response_generator``;
            * a fixed apology string when the utterance is not understood;
            * ``None`` when an unexpected error occurs (it is logged).
        """
        try:
            return_dict = {}
            question_function = self.classify_threading(human_input)
            self.logger.log_blue(f"提取函数后值:{question_function}")
            valid_functions = (
                'ksam', 'tzam', 'ldzd', 'ldjx', 'jtrq', 'xzsj', 'jttq', 'jtxw',
                'dlzd', 'dljx', 'pljk', 'pljd', 'cjlz', 'cjlj', 'zszj', 'zsjx',
                'gbzx', 'dayh', 'lddx', 'wddx', 'dldw', 'pldw', 'cjdx', 'amjd',
                'amdt', 'ambw', 'gnjs', 'cqjm', 'ylcx', 'yltj', 'zsdw', 'wdzj',
                'wdjx', 'qthd', 'cwsb', 'bfyy', 'tzbf', 'ssyy', 'xsyy', 'sftj',
            )
            if question_function not in valid_functions:
                question_function = 'cwsb'
            return_dict.update({'question_function': question_function})

            if question_function == 'cwsb':
                return '我没有理解您的意思,请重新提问。'

            time5=time.time()
            chat_response = self.handle_request(question_function, human_input=human_input)
            print("chat_response:",chat_response)
            if question_function=='bfyy':
                question_retrieve=self.music_keyword_retrieve(human_input)
                # Keep the default play command when no keyword was
                # obtained, instead of emitting "...:None".
                if question_retrieve is not None:
                    chat_response=f"音乐播放调整:{question_retrieve}"
            time6=time.time()
            self.logger.log_blue(f"二层时间:{time6-time5}")

            if isinstance(chat_response, dict):
                return_dict.update({
                    'chat_message': chat_response.get('chat_message', ''),
                    'response_generator': chat_response.get('response_generator', ''),
                })
            else:
                return_dict.update({'chat_message': chat_response})
            return return_dict
        except Exception as e:
            self.logger.log_error(f"二层模型问题: {e}")
            return None


def main():
    """Interactive console loop: read a question, print the streamed reply."""
    dashscopeclient = DashscopeClient()
    while True:
        human_input = input("请输入问题:")
        if human_input.lower() in ('exit', 'quit', 'q'):
            print("程序结束")
            break
        time1 = time.time()
        return_dict = dashscopeclient.chat(human_input)
        print(return_dict)

        # chat() returns a str (not understood) or None (error); only a dict
        # can carry a response stream.
        if isinstance(return_dict, dict):
            full_content = ''
            last_index = 0
            for response in return_dict.get('response_generator', ''):
                if not response.choices:
                    continue
                delta = response.choices[0].delta
                reasoning_result = getattr(delta, "reasoning_content", None)
                result = getattr(delta, "content", None)
                if reasoning_result:
                    print(reasoning_result)
                if result:
                    full_content += result
                    sentences, last_index = _pop_complete_sentences(full_content, last_index)
                    for sentence in sentences:
                        print(sentence)

        time2 = time.time()
        print(time2-time1)


if __name__ == '__main__':
    main()