如何在树莓派中将我的 arduino 代码添加到我现有的 Web 应用程序代码中

How to add my arduino code to my existing web app code in raspberry pi

提问人:win 提问时间:11/16/2023 最后编辑:win 更新时间:11/16/2023 访问量:28

问:

当我添加这个从 arduino 获取数据的 python 代码时,我的 Web 应用程序代码中的图形和显示传感器值不会显示在 Web 中。它只显示未知,但不显示实际读数。

import serial
import time

if __name__ == '__main__':
    ser = serial.Serial('/dev/ttyACM0',9600, timeout=1)
    ser.flush()

    while True:
        if ser.in_waiting > 0:
            data = ser.readline().decode('utf-8').rstrip()  # Read and decode the incoming data
            print(data)

    

这是我使用 python 代码创建的现有 Web 应用程序代码,该代码将从 arduino 获取数据:

这是 app.py 代码

import json
from flask import Flask, render_template, request, jsonify,redirect, Response
from flask_socketio import SocketIO
from threading import Lock
from mq6_module import mq6_modules
import RPi.GPIO as GPIO
from datetime import datetime
from mq6_module import Mq6Module
from power import Power_data


# GPIO Pin Configuration

LED_Pin = 23
Buzzer_Pin = 26
Solenoid_Pin = 12


# Setup GPIO
GPIO.setmode(GPIO.BCM)

GPIO.setup(LED_Pin, GPIO.OUT)
GPIO.setup(Buzzer_Pin, GPIO.OUT)
GPIO.setup(Solenoid_Pin, GPIO.OUT)


# Set the initial state of the GPIO pins
GPIO.output(LED_Pin, GPIO.LOW)
GPIO.output(Buzzer_Pin, GPIO.LOW)
GPIO.output(Solenoid_Pin, GPIO.HIGH)

thread = None
thread_lock = Lock()

app = Flask(__name__)
app.config["SECRET_KEY"] = "group3"
socketio = SocketIO(app, cors_allowed_origins="*")


"""
Background Thread
"""
def background_thread():
    while True:
        for mq6 in mq6_modules:
            gasvalue = mq6.get_sensor_readings()
        
            sensor_readings = {
            
                "gas_value": gasvalue,
             
            }
            sensor_json = json.dumps(sensor_readings)
            socketio.emit("updateSensorData", json.dumps(sensor_readings))
            socketio.sleep(1)

"""
Get current date time
"""
def get_current_datetime():
    now = datetime.now()
    return now.strftime("%m/%d/%Y %H:%M:%S")
"""
electrical
"""
ser = serial.Serial('/dev/ttyACM0', 9600, timeout=1)
ser.flush()

def read_serial():
    while True:
        if ser.in_waiting > 0:
            data = ser.readline().decode('utf-8').rstrip()
            socketio.emit('sensor_data', data)
        
        time.sleep(0.1)

"""
Generate random sequence of sensor values and send it to our clients
"""
def background_thread():
    print("Generating sensor values")
    while True:
        gas_sensor_value = mq6_modules[0].mq.MQPercentage()["GAS_LPG"] 
    

        socketio.emit('updateSensorData', {'value': gas_sensor_value, "date": get_current_datetime()})
        socketio.sleep(1)

  

"""
Serve root index file
"""
@app.route("/")
def index():

    return render_template("index.html")



@app.route('/gas_value')
def gas_value():
    gas_value = mq6_modules[0].mq.MQPercentage()["GAS_LPG"]   # Assuming the first instance is used
    return jsonify({'gas_value': gas_value})


# API endpoint to control LED, buzzer, and solenoid valve based on gas value
@app.route('/control')
def control_led_buzzer_valve():
    gas_value = mq6_modules[0].mq.MQPercentage()["GAS_LPG"]  # Assuming the first instance is used
    valve_status = ''
    if gas_value >= 0.2:
        GPIO.output(LED_Pin, GPIO.HIGH)  # Turn on LED
        GPIO.output(Buzzer_Pin, GPIO.HIGH)  # Turn on Buzzer
        GPIO.output(Solenoid_Pin, GPIO.LOW)  # Close the valve
        valve_status = 'Shutdown'  # Valve is shut down
    else:
        GPIO.output(LED_Pin, GPIO.LOW)  # Turn off LED
        GPIO.output(Buzzer_Pin, GPIO.LOW)  # Turn off Buzzer
        GPIO.output(Solenoid_Pin, GPIO.HIGH)  # Open the valve
        valve_status = 'Open'  # Valve is open

    return jsonify({'gas_value': gas_value, 'valve_status': valve_status})





"""
Decorator for connect
"""


@socketio.on("connect")
def connect():
    global thread
    print("Client connected")

    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(background_thread)
        


"""
Decorator for disconnect
"""
@socketio.on("disconnect")
def disconnect():
    print("Client disconnected", request.sid)

if __name__ == "__main__":
    t = threading.Thread(target=read_serial)
    t.daemon = True
    t.start()
   socketio.run(app, port=5000, host="0.0.0.0", debug=True)
python html 烧瓶 websocket 树莓-pi3

评论

0赞 Community 11/16/2023
请修剪您的代码,以便更轻松地找到您的问题。请遵循这些准则,以创建最小的可重现示例
0赞 toyota Supra 11/16/2023
这是链接。deviceplus.com/raspberry-pi/......

答: 暂无答案