Plotly Dash Graph 需要刷新页面才能更新

Plotly Dash Graph Requires Page Refresh to Update

提问人:PWR460 提问时间:11/11/2023 更新时间:11/11/2023 访问量:22

问:

我正在尝试更新一个 go。通过使用新的(分散的)迹线覆盖数据来计算。这个数字显示在我的达世币仪表板上,并按时间间隔自动重新加载。但是,当我更新图时,它会停止更新绘图(更新的点或之后新添加的点不显示)并在仪表板中变得非交互(将鼠标悬停在点上不会显示悬停文本,无法拖动和缩放等)。go.后端中的图按预期工作,因为我可以在重新加载页面时与更新的绘图进行良好的交互。go.Figure 是从后端调用的,下面的代码中省略了对此的函数调用。

(据我所知)我的代码没有抛出或记录异常。

我发现了一个类似的帖子,描述了与我遇到的相同问题。这篇文章中标记的解决方案建议将 dcc.Graph 对象的 animate 属性设置为 False,但是,这并不能解决我的问题。

这是我的代码:

挡泥板:

from embedding_projector.projector.embedding_projector import EmbeddingProjector
from embedding_projector.projector.projector_plot_manager import ProjectorPlotManager
from embedding_projector.projector.plots_enum import PlotsEnum
from dashboard.dahsboard_settings import DashboardSettings
from dashboard.dashboard_layout import DashboardLayout


_registered_projectors : dict[str, EmbeddingProjector] = {}
_active_projector : EmbeddingProjector = None
_active_plot_manager : ProjectorPlotManager = None

_clicked_plot : PlotsEnum = None

class Dashboard():
    _settings : DashboardSettings = DashboardSettings()
    
    def __init__(self, settings : DashboardSettings) -> None:
        global _settings
        _settings = settings

        global _layout
        self.app.layout = DashboardLayout(settings).get_layout()


    # NOTE Multiple callbacks for single output: https://community.plotly.com/t/multiple-callbacks-for-an-output/51247
    app = DashProxy(__name__, prevent_initial_callbacks=True, transforms=[MultiplexerTransform()])
    app.logger.setLevel(logging.WARNING)
    app.layout = dbc.Container()

    @app.callback(
        [Output('latest-model-plot', 'figure'),
        Output('previous-model-plot', 'figure')],
        Input('refresh-graph-interval', 'n_intervals')
    )
    def refresh_graph_interval(n_intervals):
        if _active_plot_manager is not None:
            latest_model_plot, previous_model_plot = _active_plot_manager.get_plots()
            return latest_model_plot, previous_model_plot
        return no_update, no_update

    def register_projector(self, projector : EmbeddingProjector):
            _registered_projectors[projector.id] = projector
    
    def set_active_projector(self, projector_id):
        global _active_projector
        global _active_plot_manager
        _active_projector = _registered_projectors[projector_id]
        _active_plot_manager = _active_projector.get_plot_manager()

Dashbaord 布局:

from dash import html, dcc
import dash_bootstrap_components as dbc 
from dashboard.dahsboard_settings import DashboardSettings


class DashboardLayout():
    _settings : DashboardSettings
    
    def __init__(self, settings : DashboardSettings):
        self._settings = settings

    def get_layout(self):
        return dbc.Container([
            html.Link(
                rel='stylesheet',
                href='./assets/styling.css'
            ),

            html.Div(className="graph-column",
                children=[
                    dcc.Interval(id="refresh-graph-interval", disabled=False, interval=self._settings.graph_refresh_rate_ms),
                    dcc.Graph(id='latest-model-plot', figure={}, animate=False),
                    dcc.Graph(id='previous-model-plot', figure={}, animate=False),
                ]
            ),

        ...
    ])

标图:

import time
import random
import numpy as np
import plotly.graph_objects as go
import pandas as pd
import copy
import matplotlib.colors as mcolors

from pyparsing import Iterable
from projector.plot_settings import PlotSettings
from projector.plots_enum import PlotsEnum
from embedding_projector.utils.misc import *

class ProjectorPlotManager():
    name : str
    _latest_model_plot : go.Figure
    _labels_shown_in_legend_latest = []

    _previous_model_plot : go.Figure
    _labels_shown_in_legend_previous = []

    _labels_dict = {}
    _color_map = {}
    _settings : PlotSettings


    def __init__(self, name : str, settings : PlotSettings):
        self.name = name
        self._settings = settings
        self._resolve_label_settings()

        self._latest_model_plot = go.Figure()
        self._previous_model_plot = go.Figure()

    def _resolve_label_settings(self):
        self._labels_dict[-1] = self._settings.unclassified_label
        for i, label in enumerate(self._settings.labels):
            self._labels_dict[i] = label

        self._color_map[self._settings.unclassified_label] = self._settings.unclassified_label_color
        self._color_map.update(self._settings.label_colors)


    def get_plots(self):
        return self._latest_model_plot, self._previous_model_plot


    def get_labels(self):
        labels = [self._settings.unclassified_label]
        labels.extend(self._settings.labels)
        return labels
    
   
    def update_plots(self, data_new, time_points, labels = None, update_previous_plot : bool = False):
        if update_previous_plot and len(self._latest_model_plot.data) > 0:
            self._previous_model_plot = copy.deepcopy(self._latest_model_plot)
            self._labels_shown_in_legend_previous = self._labels_shown_in_legend_latest.copy()

        opacity_traces = self._get_opacity_traces()
        # if there are fewer traces in the latest plot, extend opacity_traces such that it matches the length of new_data
        if len(opacity_traces) < len(data_new):
            opacity_traces += [1] * (len(data_new) - len(opacity_traces))

        self._labels_shown_in_legend_latest.clear()
        labels = self._process_labels(labels)
        scatter_traces = self._create_scatter_traces(data_new, labels, time_points, self._labels_shown_in_legend_latest, opacity_traces)
        self._latest_model_plot.update(data=scatter_traces)


    def _create_scatter_traces(self, data : Iterable, labels, time_points : list[float], legend_labels : list[str], opacity_values : list[float] = None):
        scatter_traces = []
        for label, grouped_data in self._group_by_labels(data, labels, time_points, opacity_values):
            if label not in legend_labels:
                show_legend = True
                legend_labels.append(label)
            else: show_legend = False

            if label == self._settings.unclassified_label and show_legend:
                legend_rank = 1
            else: legend_rank = None

            label_color = self._color_map[label]
            for data_point in np.array(grouped_data):
                scatter_trace = self._create_scatter_trace(data_point, label, label_color, show_legend, legend_rank)
                scatter_traces.append(scatter_trace)
                if show_legend: show_legend = False # Only show the legend once for each label
        return scatter_traces


    def _create_scatter_trace(self, data : Iterable, label : str, label_color : str = None, show_legend : bool = False, legend_rank : int = None):
        x=data[0]
        y=data[1]
        time_point = data[3] # matches the index of the "timepoints" column of the data
        opacity = data[4] if data[4] is not None else 1
        
        return go.Scatter(
            x=[x],
            y=[y],
            mode='markers',
            text=f"{label}; T{time_point}",
            name=label,
            legendgroup=label,
            marker=dict(color=label_color),
            showlegend=show_legend,
            legendrank=legend_rank,
            opacity=opacity,
            ids = (time_point,),
            uid = get_trace_uid(x,y, time_point)
        )

    
    def _group_by_labels(self, data, labels : list[str], time_points : list[float], opacity_values : list[float]):
        df = pd.DataFrame(data)
        df['labels'] = labels
        df['time_points'] = time_points
        df['opacity'] = opacity_values
        label_groupings = df.groupby('labels')
        return label_groupings


    # label count is only needed when labels is None, this will set the labels to the unclassified label
    def _process_labels(self, labels : Iterable, label_count : int = None) -> Iterable[str]:
        if labels is None:
            if label_count is None:
                raise Exception("Error processing labels: When labels is none, label_count needs to be provided.")
            return [self._settings.unclassified_label] * label_count
        
        labels = np.nan_to_num(labels, nan=-1)
        labels = list(map(lambda label: self._labels_dict[label], labels))

        for label in labels: 
            if label not in self._color_map:
                self._color_map[label] = self._get_random_plotly_color()

        return labels
    

    def _get_opacity_traces(self) -> list[float]:
        opacity_traces = []
        for trace in self._latest_model_plot.data:
            opacity_traces.append(trace.opacity)
        return opacity_traces


    def _get_random_plotly_color(self):
        colors_in_use = list(self._color_map.values())
        named_colors_list = list(mcolors.CSS4_COLORS.keys())
        if colors_in_use is not None and set(named_colors_list).issubset(set(colors_in_use)):
            raise Exception('all colors available to plotly are already being used')

        random_color = random.choice(named_colors_list)
        while colors_in_use is not None and random_color in colors_in_use:
            random_color = random.choice(named_colors_list)
        return random_color


def get_trace_uid(x_point : float, y_point : float, time_point : float) -> str:
    return f"X{x_point}Y{y_point}T{time_point}"
python-3.x plotly-dash

评论


答: 暂无答案