提问人:PWR460 提问时间:11/11/2023 更新时间:11/11/2023 访问量:22
Plotly Dash Graph 需要刷新页面才能更新
Plotly Dash Graph Requires Page Refresh to Update
问:
我正在尝试通过用新的(散点)迹线覆盖数据来更新一个 go.Figure。这个图显示在我的 Dash 仪表板上,并按时间间隔自动重新加载。但是,当我更新该图后,仪表板中的绘图会停止更新(已更新的点或之后新添加的点不显示),并且变得无法交互(将鼠标悬停在点上不会显示悬停文本,无法拖动和缩放等)。后端中的 go.Figure 按预期工作,因为重新加载页面后,我可以与更新后的绘图正常交互。go.Figure 是从后端获取的,下面的代码中省略了相应的函数调用。
(据我所知)我的代码没有抛出或记录异常。
我发现了一个类似的帖子,描述了与我遇到的相同问题。这篇文章中标记的解决方案建议将 dcc.Graph 对象的 animate 属性设置为 False,但是,这并不能解决我的问题。
这是我的代码:
仪表板(Dashboard):
from embedding_projector.projector.embedding_projector import EmbeddingProjector
from embedding_projector.projector.projector_plot_manager import ProjectorPlotManager
from embedding_projector.projector.plots_enum import PlotsEnum
from dashboard.dahsboard_settings import DashboardSettings
from dashboard.dashboard_layout import DashboardLayout
# Module-level state shared by the Dash callback and the Dashboard methods.

# Every projector registered so far, keyed by its ``id``.
_registered_projectors: dict[str, EmbeddingProjector] = {}

# Projector currently selected for display; ``None`` until one is activated.
_active_projector: EmbeddingProjector = None

# Plot manager of the active projector; the refresh callback reads its figures.
_active_plot_manager: ProjectorPlotManager = None

# Not read or written anywhere in the visible code.
_clicked_plot: PlotsEnum = None
class Dashboard():
    """Dash application wrapper that serves the plots of the active projector.

    The Dash app and its refresh callback are created in the class body, and
    the projector registry lives in module-level globals, so all of them are
    shared by every ``Dashboard`` instance.
    """

    # Default settings; each instance stores its own value in ``__init__``.
    _settings: DashboardSettings = DashboardSettings()

    def __init__(self, settings: DashboardSettings) -> None:
        """Store ``settings`` and install the layout built from them.

        Args:
            settings: Dashboard configuration, forwarded to
                ``DashboardLayout``.
        """
        # The original only rebound a module-level ``_settings`` name, which
        # left the ``_settings`` attribute declared on this class untouched.
        # The module-level binding is kept for backward compatibility and the
        # value is now also stored on the instance.
        global _settings
        _settings = settings
        self._settings = settings
        self.app.layout = DashboardLayout(settings).get_layout()

    # NOTE Multiple callbacks for single output: https://community.plotly.com/t/multiple-callbacks-for-an-output/51247
    app = DashProxy(__name__, prevent_initial_callbacks=True, transforms=[MultiplexerTransform()])
    app.logger.setLevel(logging.WARNING)
    # Placeholder layout; ``__init__`` replaces it with the real one.
    app.layout = dbc.Container()

    # Defined without ``self`` on purpose: it is registered with Dash through
    # the decorator rather than used as a method.
    @app.callback(
        [Output('latest-model-plot', 'figure'),
         Output('previous-model-plot', 'figure')],
        Input('refresh-graph-interval', 'n_intervals')
    )
    def refresh_graph_interval(n_intervals):
        """Return the current figures of the active plot manager.

        Triggered by the ``refresh-graph-interval`` component. Leaves both
        graphs unchanged while no projector is active.
        """
        if _active_plot_manager is None:
            return no_update, no_update
        latest_model_plot, previous_model_plot = _active_plot_manager.get_plots()
        return latest_model_plot, previous_model_plot

    def register_projector(self, projector: EmbeddingProjector):
        """Add ``projector`` to the shared registry under ``projector.id``."""
        _registered_projectors[projector.id] = projector

    def set_active_projector(self, projector_id):
        """Make the registered projector ``projector_id`` the displayed one.

        Raises:
            KeyError: If no projector was registered under ``projector_id``.
        """
        global _active_projector
        global _active_plot_manager
        _active_projector = _registered_projectors[projector_id]
        _active_plot_manager = _active_projector.get_plot_manager()
Dashboard 布局:
from dash import html, dcc
import dash_bootstrap_components as dbc
from dashboard.dahsboard_settings import DashboardSettings
class DashboardLayout():
    """Builds the Dash component tree shown by the dashboard."""

    _settings: DashboardSettings

    def __init__(self, settings: DashboardSettings):
        """Keep the settings that the layout is built from."""
        self._settings = settings

    def get_layout(self):
        """Return the root container holding the stylesheet and both graphs.

        The interval component's period is taken from
        ``settings.graph_refresh_rate_ms``.
        """
        stylesheet = html.Link(
            rel='stylesheet',
            href='./assets/styling.css'
        )
        refresh_timer = dcc.Interval(
            id="refresh-graph-interval",
            disabled=False,
            interval=self._settings.graph_refresh_rate_ms,
        )
        latest_graph = dcc.Graph(id='latest-model-plot', figure={}, animate=False)
        previous_graph = dcc.Graph(id='previous-model-plot', figure={}, animate=False)
        graph_column = html.Div(
            className="graph-column",
            children=[refresh_timer, latest_graph, previous_graph],
        )
        # The trailing ``...`` is kept verbatim from the original listing;
        # presumably it stands for components omitted from the post.
        return dbc.Container([
            stylesheet,
            graph_column,
            ...
        ])
绘图管理器(ProjectorPlotManager):
import time
import random
import numpy as np
import plotly.graph_objects as go
import pandas as pd
import copy
import matplotlib.colors as mcolors
from pyparsing import Iterable
from projector.plot_settings import PlotSettings
from projector.plots_enum import PlotsEnum
from embedding_projector.utils.misc import *
class ProjectorPlotManager():
    """Maintains the "latest" and "previous" scatter figures of a projector.

    Each data point is drawn as its own single-marker ``go.Scatter`` trace, so
    every point carries its own opacity, hover text and uid.
    """

    name: str
    _latest_model_plot: go.Figure
    _previous_model_plot: go.Figure
    _settings: PlotSettings
    # The containers below are created per instance in ``__init__``. As
    # class-level ``[]`` / ``{}`` defaults they were shared (and mutated) by
    # every manager.
    _labels_shown_in_legend_latest: list
    _labels_shown_in_legend_previous: list
    _labels_dict: dict
    _color_map: dict

    def __init__(self, name: str, settings: PlotSettings):
        """Create a manager with two empty figures.

        Args:
            name: Display name of this manager.
            settings: Label names and colours used when building traces.
        """
        self.name = name
        self._settings = settings
        self._labels_shown_in_legend_latest = []
        self._labels_shown_in_legend_previous = []
        self._labels_dict = {}
        self._color_map = {}
        self._resolve_label_settings()
        self._latest_model_plot = go.Figure()
        self._previous_model_plot = go.Figure()

    def _resolve_label_settings(self):
        """Fill the index-to-label and label-to-colour maps from the settings.

        Index ``-1`` maps to the unclassified label; the configured labels
        follow from index 0.
        """
        self._labels_dict[-1] = self._settings.unclassified_label
        for i, label in enumerate(self._settings.labels):
            self._labels_dict[i] = label
        self._color_map[self._settings.unclassified_label] = self._settings.unclassified_label_color
        self._color_map.update(self._settings.label_colors)

    def get_plots(self):
        """Return ``(latest_figure, previous_figure)``."""
        return self._latest_model_plot, self._previous_model_plot

    def get_labels(self):
        """Return the unclassified label followed by all configured labels."""
        return [self._settings.unclassified_label, *self._settings.labels]

    def update_plots(self, data_new, time_points, labels = None, update_previous_plot: bool = False):
        """Rebuild the latest figure from ``data_new``.

        Args:
            data_new: One row of coordinates per point.
            time_points: One time point per row of ``data_new``.
            labels: Label index per row (NaN counts as unclassified), or
                ``None`` to mark every point as unclassified.
            update_previous_plot: When true and the latest figure already has
                traces, snapshot it as the previous figure first.
        """
        if update_previous_plot and len(self._latest_model_plot.data) > 0:
            self._previous_model_plot = copy.deepcopy(self._latest_model_plot)
            self._labels_shown_in_legend_previous = self._labels_shown_in_legend_latest.copy()

        point_count = len(data_new)
        opacity_traces = self._get_opacity_traces()
        # Carry over the opacities of the existing traces. Pad with fully
        # opaque entries when there are more points than traces, and trim when
        # there are fewer; otherwise the column assignment in
        # ``_group_by_labels`` fails on the length mismatch.
        if len(opacity_traces) < point_count:
            opacity_traces += [1] * (point_count - len(opacity_traces))
        else:
            opacity_traces = opacity_traces[:point_count]

        self._labels_shown_in_legend_latest.clear()
        # ``label_count`` is required by ``_process_labels`` when ``labels``
        # is None; without it that case always raised.
        labels = self._process_labels(labels, label_count=point_count)
        scatter_traces = self._create_scatter_traces(data_new, labels, time_points, self._labels_shown_in_legend_latest, opacity_traces)
        # NOTE(review): without ``overwrite=True``, ``Figure.update`` merges
        # the new traces into the existing ones position by position, so
        # traces beyond the current count are never added. Overwriting
        # replaces the whole trace list. Confirm against the installed plotly
        # version.
        self._latest_model_plot.update(data=scatter_traces, overwrite=True)

    def _create_scatter_traces(self, data: Iterable, labels, time_points: list[float], legend_labels: list[str], opacity_values: list[float] = None):
        """Build one scatter trace per data point, grouped by label.

        ``legend_labels`` is extended in place with every label that gets a
        legend entry. Only the first trace of such a label shows the entry.
        """
        scatter_traces = []
        for label, grouped_data in self._group_by_labels(data, labels, time_points, opacity_values):
            show_legend = label not in legend_labels
            if show_legend:
                legend_labels.append(label)
            is_unclassified = label == self._settings.unclassified_label
            legend_rank = 1 if (is_unclassified and show_legend) else None
            label_color = self._color_map[label]
            for data_point in np.array(grouped_data):
                scatter_traces.append(
                    self._create_scatter_trace(data_point, label, label_color, show_legend, legend_rank)
                )
                show_legend = False  # Only show the legend once for each label
        return scatter_traces

    def _create_scatter_trace(self, data: Iterable, label: str, label_color: str = None, show_legend: bool = False, legend_rank: int = None):
        """Build the single-marker trace for one row of the grouped frame.

        ``data[0]`` and ``data[1]`` are used as x and y. ``data[3]`` and
        ``data[4]`` are read as time point and opacity, which matches the
        column order produced by ``_group_by_labels`` when the input has
        exactly two coordinate columns.
        """
        x = data[0]
        y = data[1]
        time_point = data[3]  # matches the index of the "timepoints" column of the data
        opacity = data[4] if data[4] is not None else 1
        return go.Scatter(
            x=[x],
            y=[y],
            mode='markers',
            text=f"{label}; T{time_point}",
            name=label,
            legendgroup=label,
            marker=dict(color=label_color),
            showlegend=show_legend,
            legendrank=legend_rank,
            opacity=opacity,
            ids=(time_point,),
            uid=get_trace_uid(x, y, time_point)
        )

    def _group_by_labels(self, data, labels: list[str], time_points: list[float], opacity_values: list[float]):
        """Return ``data`` as a frame grouped by label.

        The ``labels``, ``time_points`` and ``opacity`` columns are appended
        after the coordinate columns, in that order.
        """
        df = pd.DataFrame(data)
        df['labels'] = labels
        df['time_points'] = time_points
        df['opacity'] = opacity_values
        return df.groupby('labels')

    # label count is only needed when labels is None, this will set the labels to the unclassified label
    def _process_labels(self, labels: Iterable, label_count: int = None) -> Iterable[str]:
        """Translate label indices into label names.

        NaN entries are treated as index ``-1`` (unclassified). Any resulting
        label without a colour gets a random unused one.

        Raises:
            Exception: If both ``labels`` and ``label_count`` are ``None``.
            KeyError: If an index has no entry in the label map.
        """
        if labels is None:
            if label_count is None:
                raise Exception("Error processing labels: When labels is none, label_count needs to be provided.")
            return [self._settings.unclassified_label] * label_count

        label_indices = np.nan_to_num(labels, nan=-1)
        label_names = [self._labels_dict[index] for index in label_indices]
        for label in label_names:
            if label not in self._color_map:
                self._color_map[label] = self._get_random_plotly_color()
        return label_names

    def _get_opacity_traces(self) -> list[float]:
        """Return the opacity of every trace in the latest figure, in order."""
        return [trace.opacity for trace in self._latest_model_plot.data]

    def _get_random_plotly_color(self):
        """Return a random CSS4 colour name that no label uses yet.

        Raises:
            Exception: If every CSS4 colour name is already in use.
        """
        colors_in_use = set(self._color_map.values())
        # Choosing among the unused names directly replaces the original
        # retry-until-unused loop with the same uniform choice.
        available_colors = [color for color in mcolors.CSS4_COLORS if color not in colors_in_use]
        if not available_colors:
            raise Exception('all colors available to plotly are already being used')
        return random.choice(available_colors)
def get_trace_uid(x_point : float, y_point : float, time_point : float) -> str:
    """Build a trace uid string from a point's coordinates and time point."""
    return "X{}Y{}T{}".format(x_point, y_point, time_point)
答: 暂无答案
评论