如何在 Python 中将 websocket 中的 JSON 数据解析为变量

How to parse JSON data from websocket to variable in Python

提问人:Norbert Yuhas 提问时间:7/27/2023 最后编辑:Norbert Yuhas 更新时间:7/27/2023 访问量:38

问:

我正在使用 Asterisk 的 AGI 进行语音识别。它通过 websocket 连接并接收 JSON 格式的响应。该脚本收集识别出的单词并将它们拼接成一个短语。到这一步为止,一切都正常工作。

我需要进一步使用收集到的短语(先发送到 Telegram,然后通过 API 发送到帮助台系统),但如果我直接把 text 当作变量来使用,它就不起作用。如何才能让 text 成为一个可用的变量?

#!/usr/bin/python3
from asterisk.agi import *
import os
from websocket import create_connection
import json
import traceback
import requests

# File descriptor from which startAGI() reads the raw chunks (via os.read)
# that it forwards to the recognizer.
AUDIO_FD = 3
# NOTE(review): the two constants below are not referenced anywhere in the
# code shown here; presumably left over from an HTTP-based recognizer --
# confirm before removing.
CONTENT_TYPE = 'audio/l16; rate=8000; channels=1'
ACCEPT = 'audio/pcm'

def telegram_bot_sendtext(text):
    """Send ``text`` to the configured Telegram chat through the Bot API.

    The message is posted with ``sendMessage`` in Markdown parse mode, as a
    reply to message id 2 of the chat.

    Args:
        text: Message body to deliver.

    Returns:
        The decoded JSON body of the Bot API response.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    bot_token = '6069wxts_nWcA'
    bot_chatID = '-10300'
    url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage'
    # Hand the fields to requests as query parameters so they are
    # percent-encoded; pasting raw text into the URL breaks as soon as the
    # message contains '&', '#', '+' or similar characters.
    params = {
        'chat_id': bot_chatID,
        'reply_to_message_id': '2',
        'parse_mode': 'Markdown',
        'text': text,
    }
    # A timeout keeps a stalled HTTP request from blocking the caller forever.
    response = requests.get(url, params=params, timeout=10)
    return response.json()

def process_chunk(agi, ws, buf):
    """Send one audio chunk to the recognizer and speak back what it heard.

    Args:
        agi: AGI session; used for logging (``verbose``) and playback
            (``stream_file``).
        ws: Open websocket connection to the recognition server.
        buf: Raw audio bytes to forward.

    Returns:
        The recognized phrase (the ``word`` entries of ``result`` joined by
        single spaces), or ``None`` when the reply has no ``result`` key or
        no words.

    Raises:
        subprocess.CalledProcessError: If ``espeak`` or ``sox`` exits with a
            non-zero status.
        FileNotFoundError: If ``espeak`` or ``sox`` is not installed.
    """
    import subprocess  # local import: not part of the file's import list

    agi.verbose("Processing chunk")
    ws.send_binary(buf)
    res = json.loads(ws.recv())
    agi.verbose("Result: " + str(res))
    if 'result' not in res:
        return None
    text = " ".join(w['word'] for w in res['result'])
    if not text:
        # Nothing was recognized, so there is nothing to synthesize.
        return None

    raw_wav = "/tmp/response22.wav"
    out_wav = "/tmp/response.wav"
    try:
        # Argument lists (no shell) keep quotes or '$' in the recognized text
        # from being interpreted as shell syntax.  The text is passed as str:
        # concatenating text.encode('utf-8') to a str raises TypeError on
        # Python 3.
        subprocess.run(["espeak", "-w", raw_wav, text], check=True)
        subprocess.run(["sox", raw_wav, "-r", "8000", out_wav], check=True)
        # stream_file takes the path without the file extension.
        agi.stream_file("/tmp/response")
    finally:
        # Remove both temporary files, including the intermediate one.
        for path in (raw_wav, out_wav):
            try:
                os.remove(path)
            except FileNotFoundError:
                # The file was never created because an earlier step failed.
                pass
    return text

def startAGI():
    """Run the session: feed audio to the recognizer, then report the phrases.

    Reads chunks from ``AUDIO_FD`` until an empty read, passes each one to
    ``process_chunk`` and collects the phrases it returns.  After the
    websocket is closed, the collected phrases are joined with spaces and
    sent to Telegram in a single message.
    """
    agi = AGI()
    agi.verbose("EAGI script started...")
    ani = agi.env['agi_callerid']
    did = agi.env['agi_extension']
    agi.verbose("Call answered from: %s to %s" % (ani, did))
    ws = create_connection("ws://localhost:2700")
    # ``text`` is local to process_chunk, so the phrases have to come back
    # through its return value and be collected here.
    phrases = []
    try:
        # The config message is sent inside the try so that a failure here is
        # logged and the socket is still closed.
        ws.send('{ "config" : { "sample_rate" : 8000 } }')
        agi.verbose("Connection created")
        while True:
            data = os.read(AUDIO_FD, 8000)
            if not data:
                break
            phrase = process_chunk(agi, ws, data)
            if phrase:
                phrases.append(phrase)
    except Exception as err:
        # verbose() messages are single-line, so flatten the traceback.
        agi.verbose(''.join(traceback.format_exception(type(err), err, err.__traceback__)).replace('\n', ' '))
    finally:
        ws.close()

    if phrases:
        try:
            telegram_bot_sendtext(" ".join(phrases))
        except Exception as exc:
            # Log through AGI rather than print(): an AGI script's stdout is
            # the command channel to Asterisk.
            agi.verbose("Post_Auth_Script_Telega : Error : " + str(exc))

if __name__ == "__main__":
    # Start the session only when the file is executed as a script, so that
    # importing the module has no side effects.
    startAGI()

JSON 如下所示

Result: {
'result': [
{'conf': 1.0, 'end': 2.07, 'start': 1.71, 'word': 'One'}, 
{'conf': 1.0, 'end': 2.34, 'start': 2.07, 'word': 'Two'}, 
], 
'text': 'One Two'}
标签:python-3.x、variables、join、websocket

评论


答:

0赞 Norbert Yuhas 7/27/2023 #1

最终可以正常工作的版本(把 telegram_bot_sendtext(text) 的调用移到了 process_chunk 内部,因为 text 只在该函数中被赋值):

#!/usr/bin/python3

from asterisk.agi import *
import os
from websocket import create_connection
import json
import traceback
import requests

# File descriptor from which startAGI() reads the raw chunks (via os.read)
# that it forwards to the recognizer.
AUDIO_FD = 3
# NOTE(review): the two constants below are not referenced anywhere in the
# code shown here; presumably left over from an HTTP-based recognizer --
# confirm before removing.
CONTENT_TYPE = 'audio/l16; rate=8000; channels=1'
ACCEPT = 'audio/pcm'

def telegram_bot_sendtext(text):
    """Send ``text`` to the configured Telegram chat through the Bot API.

    The message is posted with ``sendMessage`` in Markdown parse mode, as a
    reply to message id 2 of the chat.

    Args:
        text: Message body to deliver.

    Returns:
        The decoded JSON body of the Bot API response.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    bot_token = '606922aNwxts_nWcA'
    bot_chatID = '-100100'
    url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage'
    # Hand the fields to requests as query parameters so they are
    # percent-encoded; pasting raw text into the URL breaks as soon as the
    # message contains '&', '#', '+' or similar characters.
    params = {
        'chat_id': bot_chatID,
        'reply_to_message_id': '2',
        'parse_mode': 'Markdown',
        'text': text,
    }
    # A timeout keeps a stalled HTTP request from blocking the caller forever.
    response = requests.get(url, params=params, timeout=10)
    return response.json()

def process_chunk(agi, ws, buf):
    """Send one audio chunk to the recognizer, report and speak the result.

    When the reply contains recognized words, the phrase is sent to Telegram
    (best effort), synthesized with ``espeak``, resampled to 8000 Hz with
    ``sox`` and played back through the AGI session.

    Args:
        agi: AGI session; used for logging (``verbose``) and playback
            (``stream_file``).
        ws: Open websocket connection to the recognition server.
        buf: Raw audio bytes to forward.

    Returns:
        The recognized phrase (the ``word`` entries of ``result`` joined by
        single spaces), or ``None`` when the reply has no ``result`` key or
        no words.

    Raises:
        subprocess.CalledProcessError: If ``espeak`` or ``sox`` exits with a
            non-zero status.
        FileNotFoundError: If ``espeak`` or ``sox`` is not installed.
    """
    import subprocess  # local import: not part of the file's import list

    agi.verbose("Processing chunk")
    ws.send_binary(buf)
    res = json.loads(ws.recv())
    agi.verbose("Result: " + str(res))
    if 'result' not in res:
        return None
    text = " ".join(w['word'] for w in res['result'])
    if not text:
        # Nothing was recognized: skip the Telegram message and the playback.
        return None

    try:
        telegram_bot_sendtext(text)
    except Exception as exc:
        # Best effort: a failed notification must not stop the call.  Log via
        # AGI rather than print(): an AGI script's stdout is the command
        # channel to Asterisk.
        agi.verbose("Error : " + str(exc))

    raw_wav = "/tmp/response22.wav"
    out_wav = "/tmp/response.wav"
    try:
        # Argument lists (no shell) keep quotes or '$' in the recognized text
        # from being interpreted as shell syntax.  The text is passed as str:
        # concatenating text.encode('utf-8') to a str raises TypeError on
        # Python 3.
        subprocess.run(["espeak", "-w", raw_wav, text], check=True)
        subprocess.run(["sox", raw_wav, "-r", "8000", out_wav], check=True)
        # stream_file takes the path without the file extension.
        agi.stream_file("/tmp/response")
    finally:
        # Remove both temporary files, including the intermediate one.
        for path in (raw_wav, out_wav):
            try:
                os.remove(path)
            except FileNotFoundError:
                # The file was never created because an earlier step failed.
                pass
    return text

def startAGI():
    """Run the session: stream audio chunks to the recognizer until EOF.

    Reads chunks of up to 8000 bytes from ``AUDIO_FD`` and passes each one
    to ``process_chunk`` until an empty read.  Any exception is logged
    through ``agi.verbose`` and the websocket is always closed.
    """
    agi = AGI()
    agi.verbose("EAGI script started...")
    ani = agi.env['agi_callerid']
    did = agi.env['agi_extension']
    agi.verbose("Call answered from: %s to %s" % (ani, did))
    ws = create_connection("ws://localhost:2700")
    try:
        # The config message is sent inside the try so that a failure here is
        # logged and the socket is still closed.
        ws.send('{ "config" : { "sample_rate" : 8000 } }')
        agi.verbose("Connection created")
        while True:
            data = os.read(AUDIO_FD, 8000)
            if not data:
                break
            process_chunk(agi, ws, data)
    except Exception as err:
        # verbose() messages are single-line, so flatten the traceback.
        agi.verbose(''.join(traceback.format_exception(type(err), err, err.__traceback__)).replace('\n', ' '))
    finally:
        ws.close()

if __name__ == "__main__":
    # Start the session only when the file is executed as a script, so that
    # importing the module has no side effects.
    startAGI()

评论

0赞 Community 7/31/2023
正如目前所写的那样,你的答案尚不清楚。请编辑以添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。您可以在帮助中心找到有关如何写出好答案的更多信息。