提问人:Jerome 提问时间:11/16/2023 更新时间:11/16/2023 访问量:21
协调异步进程:管理来自两个函数的数据流以控制另一个依赖函数
Coordinating Asynchronous Processes: Managing Data Flow from two functions to control another dependent function
问:
只是想问一下我需要使用什么方法来完成基于 python 的项目。
描述: 因此,我正在开发一个移动机器人,我正在使用 rplidar 和 uwb 分别用于防撞和定位。
处理时间(如果单独测试):
- 对于 rplidar 数据 “get_data_from_LIDAR()” : 2-20ms
- 对于uwb数据“main_visualizaiton()”:1300-2100ms
我希望它们在系统激活时同时运行,但是它们发送/更新数据的进程完成时间不同,这对我来说是异步的,因为 rplidar 太快而 uwb 很慢。
方法/关注点: 我已经尝试将 uwb 的功能放入 rplidar,反之亦然,但它会导致 rplidar 出现错误,即“不正确的起始描述符字节和错误的主体大小”,有时它会卡住。 问题是 uwb 的处理速度太慢,这就是它影响 rpliar 功能的原因。
问题: 来自这两个函数的两个数据对于操作“controlCenter()”的函数都很重要。 要发送的数据在“#Send 要处理的数据”的注释下。我只是希望它们以某种方式单独运行,但仍然更新需要在“controlCenter”中使用的变量,但我还不知道我需要为此做什么方法。
我感谢解决此问题的任何指导或建议。 谢谢。
顺便说一句,这是 python 片段:
#Encapsulation
class robotController:
def __init__(self):
#Securing RPLidar Connection
#Securing UWB Connection
def get_data_from_LIDAR(self): #Total Process Time: 2ms- 20ms
if (self.con_scan):
try:
print(self.lidar.info) #Print LIDAR info
for scan in self.lidar.iter_scans(max_buf_meas=500):
self.scan_data = [0]*360
for (_, angle, distance) in scan:
intAngle = floor(angle)
index = min([359, intAngle])
self.scan_data[index] = distance
# Send Data to Process for ObstacleDetection/Collision Avoidance v Motor Control
angle340_359 = self.scan_data[340:360] # 340-359 [LEFT FRONT]
angle0_20 = self.scan_data[0:21] # 0-20 [RIGHT FRONT]
angle270_339 = self.scan_data[270:340] # 270-339 [LEFT SIDEMOST]
angle21_90 = self.scan_data[21:91] # 21-90 [RIGHT SIDEMOST]
self.cycle += 1
except KeyboardInterrupt:
self.con_scan = False
print("\nLIDAR Stopped!")
self.lidar.stop()
self.lidar.disconnect()
GPIO.cleanup()
sys.exit()
except Exception as e:
print(f"Error: {e}")
def uwb_data_visualization(self): #Total Process Time: 1300ms- 2100ms
while True:
curTime = int(time.time()*1000)
if (curTime - self.lastExecTime) > self.elapsedTime:
node_count = 0
list = self.read_data() # Read UWB data
for one in list:
if one["A"] == "1782":
self.clean(self.t_a1) # Clear and update anchor A1782's range
self.a1_range = self.uwb_range_offset(float(one["R"]))
self.draw_uwb_anchor(-250, 150, "A1782(0,0)", self.a1_range, self.t_a1)
node_count += 1
if one["A"] == "1783":
self.clean(self.t_a2) # Clear and update anchor A1783's range
self.a2_range = self.uwb_range_offset(float(one["R"]))
self.draw_uwb_anchor(-250 + self.meter2pixel * self.distance_a1_a2, 150, "A1783(" +
str(self.distance_a1_a2)+")", self.a2_range, self.t_a2)
node_count += 1
if node_count == 2: # Calculate the tag's position based on the two anchor ranges (CHANGING)
# Coordinates of Robot Current Position
x1, y1 = self.tag_pos(self.a2_range, self.a1_range, self.distance_a1_a2)
print("x1:", x1,"m ","y1:", y1,"m")
self.clean(self.t_a3)
self.draw_uwb_tag(x1, y1, "TAGi", self.t_a3)
# Coordinates of Target Position
x2 = 1.2
y2 = 1.4
#print("x2:", x2,"m ","y2:", y2,"m")
#Euclidean Linear Distance Computation
distance_between_centers = ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
#print("Distance inBetween: ", distance_between_centers)
distance_threshold = 0.75 # Define a distance threshold (adjust as needed)
if distance_between_centers >= distance_threshold:
self.draw_line(x1,y1,x2,y2,color="black")
self.is_delivery_comple = False
else:
self.draw_line(x1,y1,x2,y2,color="yellow")
self.is_delivery_complete = True
'''Send Data to Process for Positioning v Motor Control
x1, y1,
x2, y2,
distance_between_centers,
is_delivery_complete
'''
# Pause briefly to allow for updates
self.lastExecTime = curTime #50ms
turtle.mainloop()
def controlCenter(self):
#Received Data from UWB and RPlidar
#<No line here for receiving of data>
#Sample motor contorl logic
if is_delivery_complete:
print("stop")
else:
print("forward")
if angle340_359 >= 650 || angle0_20 >= 650:
print("rotateCW")
else:
print("something")
#Initialization
robot = robotController()
#Activating System's Process
while True:
robot.controlCenter()
答: 暂无答案
评论