Implementing SSE in Angular

Asked by: mksk23 Asked: 6/22/2023 Last edited by: Marc Glisse, mksk23 Updated: 6/22/2023 Views: 919

Q:

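I'm trying to implement SSE in my test application. The server side is already set up, and I'm just consuming the endpoint (api/v1/sse/document). The scenario is that when I perform a scan, the scan should appear in my test client as well as in my main application. The problem is that it only appears in my test client when I refresh the page. I've been writing some code, but I'm still unable to get the events to update automatically in the test client.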

Here is the code I wrote:

[sse.service.ts]

[document-list.component.ts]

public ngOnInit(): void {
        this.getDocuments();
        this.registerServerSentEvent();
    }

    public ngOnDestroy(): void {
        this.closeServerSentEvent();
    }

    /**
     * getDocuments: call the api service to get the list of documents.
     */
    public getDocuments(): void {
        this.aService.getDocuments().subscribe((documents: Documents[]) => {
            this.documents = documents;
        });
    }

    /**
     * markDocumentAsProcessed: call the api service to mark the document as processed.
     */
    public markDocumentAsProcessed(document: Documents): void {
        this.aService.markDocumentProcessed(document.document.id).subscribe({
            next: () => {
                // Remove the processed document from the list
                this.documents = this.documents.filter((doc) => doc.document.id !== document.document.id);
                this.closeDialog();
            },
            error: (error) => {
                console.log("markDocumentProcessed Error:", error);
                // Handle the error here
            },
        });
    }

    /**
     * showDetails: call the api service to get the document image and display it in a dialog.
     */
    public showDetails(document: Documents): void {
        this.aService.getDocumentImage(document.document.id).subscribe((image: Blob) => {
            const url = window.URL.createObjectURL(image);
            const safeUrl: SafeUrl = this.sanitizer.bypassSecurityTrustUrl(url);
            this.selectedDocument = {...document, imageDataURL: safeUrl};
            this.displayDialog = true;
        });
    }

    /**
     * closeDialog: close the dialog after pressing the process button.
     */
    private closeDialog(): void {
        this.displayDialog = false;
        this.selectedDocument = null;
    }

    private registerServerSentEvent(): void {
        const sseUrl = `${this.aService.config.url}api/v1/sse/document`;
        this.sseSubscription = this.sseService.getServerSentEvent(sseUrl).subscribe((event: MessageEvent) => {
            const documentEvent = JSON.parse(event.data);
            const eventType = documentEvent.type;
            const eventData = documentEvent.data;

            switch (eventType) {
                case "NewDocument": {
                    // Process new document event
                    break;
                }

                case "ViewDocument": {
                    // Process view document event
                    break;
                }

                case "WatchlistStatusUpdate": {
                    // Process watchlist status update event
                    break;
                }

                case "DocumentProcessed": {
                    // Process document processed event
                    const processedDocumentId = eventData.documentId;
                    this.updateProcessedDocument(processedDocumentId);
                    break;
                }
            }
        });
    }

    private updateProcessedDocument(processedDocumentId: string): void {
        // Find the processed document in the documents list
        const processedDocumentIndex = this.documents.findIndex((doc) => doc.document.id === processedDocumentId);
        if (processedDocumentIndex !== -1) {
            // Remove the processed document from the list
            this.documents.splice(processedDocumentIndex, 1);
            // Update any other UI-related logic or perform additional actions as needed
        }
    }

    private closeServerSentEvent(): void {
        if (this.sseSubscription) {
            this.sseSubscription.unsubscribe();
            this.sseService.closeEventSource();
        }
    }
}

This is the service I use to fetch the documents, etc.

[a.service.ts]

public getDocuments(): Observable<Documents[]> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        // Errors propagate to the subscriber unchanged.
        return this.http.get<Documents[]>(`${url}api/v1/document/`, {
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

    /**
     * markDocumentProcessed: calls the api service to mark the document as processed.
     *
     */
    public markDocumentProcessed(documentId: string): Observable<Documents[]> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        const requestBody = {
            DocumentId: documentId,
        };

        // Errors propagate to the subscriber unchanged.
        return this.http.post<Documents[]>(`${url}api/v1/document/processed`, requestBody, {
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

    /**
     * getDocumentImage: calls the api service to get the document image.
     *
     */
    public getDocumentImage(documentId: string): Observable<Blob> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        return this.http.get(`${url}api/v1/document/${documentId}/image/Photo`, {
            responseType: "blob",
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

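I'm not getting any errors in the console and my endpoint returns 200, but my test client does not receive events automatically; I have to refresh to see the changes.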

angular typescript event-handling server-sent-events

A:

0 votes mat.hudak 6/22/2023 #1

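The problem is that you are using the SSE endpoint as a regular GET endpoint. Naturally, you only get updates when you ask for them, for example by reloading the page.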

Here is what you are doing (taken from the screenshot of your SSE service):

public getServerSentEvent(url: string): Observable<MessageEvent> {
  const token = this.aService.config.authorization;
  // WARNING - SSE doesn't have an API for Headers, this won't work if used as proper SSE
  const headers = new HttpHeaders({
    Authorization: token,
  });

  return new Observable<MessageEvent>((observer) => {
    // HERE - you are not creating the SSE Event Source, just getting from it
    this.http.get(url, {headers, responseType: "text"}).subscribe({
      //...
    })
  })
}

So you call HTTP GET on the SSE endpoint and then dress the result up as an SSE event. As you have seen, that does not work.

You should open a new EventSource instead, which actually behaves like SSE:

// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }
  // Open new channel, create new EventSource - provide your own URL
  this.eventSource = new EventSource(this.yourSSEurl);

  // Process default event
  this.eventSource.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Add your own EVENTS handling...
  /*
    enum SSE_EVENTS {
      NewDocument = 'NewDocument',
      //...
    }
  */
  Object.keys(SSE_EVENTS).forEach(key => {
    this.eventSource.addEventListener(SSE_EVENTS[key], event => {
      this.zone.run(() => this.processSseEvent(event));
    });
  });

  // Process connection opened
  this.eventSource.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  this.eventSource.onerror = (error: any) => {
    this.reconnectOnError();
  };
}
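
The answer does not show `processSseEvent`. As a rough sketch of what it might do for the question's document list, the function below parses the payload and returns an updated list. The `{ type, data }` envelope and `data.documentId` come from the question's handler; the name `applyDocumentEvent` and the assumption that a `NewDocument` payload carries a complete list entry are hypothetical.

```typescript
// Shape taken from the question's code: each list entry wraps a document with an id.
interface Documents {
  document: { id: string };
}

// Envelope the question's handler reads after JSON.parse(event.data).
interface DocumentEvent {
  type: string;
  data: any; // payload layout per event type is an assumption
}

// Hypothetical helper: returns a NEW array instead of mutating the old one,
// so change detection (including OnPush) sees a changed reference.
function applyDocumentEvent(documents: Documents[], rawData: string): Documents[] {
  let parsed: DocumentEvent | null;
  try {
    parsed = JSON.parse(rawData);
  } catch (err) {
    return documents; // ignore malformed payloads
  }
  if (!parsed || typeof parsed.type !== "string") {
    return documents;
  }

  switch (parsed.type) {
    case "NewDocument":
      // Assumption: data is a full list entry of the Documents shape.
      return [...documents, parsed.data as Documents];
    case "DocumentProcessed": {
      const processedId = parsed.data?.documentId;
      return documents.filter((doc) => doc.document.id !== processedId);
    }
    default:
      // ViewDocument, WatchlistStatusUpdate, and unknown types leave the list as-is.
      return documents;
  }
}
```

In the component this could be called as `this.documents = applyDocumentEvent(this.documents, event.data)` from inside the `zone.run` callback, so the view refreshes without a page reload.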

See my older answer: Handling SSE reconnect on error. There you can find a nearly working SSE service with everything you need.
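
The snippet above resets `reconnectFrequencySec` to 1 in `onopen` and calls `reconnectOnError()` in `onerror`, but the reconnect logic itself is not shown here. One common way to implement it is exponential backoff; the following is only a sketch under that assumption. The names `nextReconnectDelaySec` and `ReconnectScheduler`, and the 64-second cap, are hypothetical and not taken from the answer.

```typescript
// Hypothetical upper bound for the wait between reconnect attempts.
const MAX_RECONNECT_SEC = 64;

// Doubles the delay after each failure, never exceeding the cap.
function nextReconnectDelaySec(currentSec: number, maxSec: number = MAX_RECONNECT_SEC): number {
  return Math.min(currentSec * 2, maxSec);
}

class ReconnectScheduler {
  private delaySec = 1;
  private timer: ReturnType<typeof setTimeout> | null = null;
  private connect: () => void;

  // connect is whatever re-creates the EventSource, e.g. createSseEventSource.
  constructor(connect: () => void) {
    this.connect = connect;
  }

  // Call from onopen: a successful connection resets the backoff.
  reset(): void {
    this.delaySec = 1;
  }

  // Call from onerror: schedules one reconnect attempt and returns the wait used.
  schedule(): number {
    const waitSec = this.delaySec;
    this.cancel();
    this.timer = setTimeout(() => {
      this.timer = null;
      this.connect();
    }, waitSec * 1000);
    this.delaySec = nextReconnectDelaySec(this.delaySec);
    return waitSec;
  }

  // Call when the service or component is destroyed.
  cancel(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}
```

Note that a browser `EventSource` already retries on its own in many failure cases; a manual scheduler like this is mainly useful when the source ends up closed and has to be re-created.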

Comments

0 votes mksk23 6/23/2023
Is there any other way to pass the token? I need to pass the token as a header, otherwise I get a 401 Unauthorized error in the console.
0 votes mat.hudak 6/23/2023
Not with EventSource. I guess there are only workarounds.
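
One such workaround, sketched here rather than taken from the thread, is to read the stream with `fetch`, which does accept an `Authorization` header, and parse the `text/event-stream` format by hand. The names `SseParser` and `streamSseWithAuth` are hypothetical. The sketch ignores the `retry` field and bare-CR line endings, and unlike `EventSource` it does not reconnect on its own. Other options, if the server supports them, are cookie-based auth via `new EventSource(url, { withCredentials: true })` or a token in the query string.

```typescript
interface SseEvent {
  event: string; // "message" when the server sends no "event:" field
  data: string;
  id?: string;
}

// Incremental parser: feed decoded text chunks, get back the events completed so far.
class SseParser {
  private buffer = "";
  private eventName = "";
  private dataLines: string[] = [];
  private lastId: string | undefined = undefined;

  feed(chunk: string): SseEvent[] {
    this.buffer += chunk;
    const lines = this.buffer.split(/\r?\n/);
    this.buffer = lines.pop() ?? ""; // keep the trailing partial line for the next chunk
    const events: SseEvent[] = [];

    for (const line of lines) {
      if (line === "") {
        // A blank line ends the current event.
        if (this.dataLines.length > 0) {
          events.push({
            event: this.eventName || "message",
            data: this.dataLines.join("\n"),
            id: this.lastId,
          });
        }
        this.eventName = "";
        this.dataLines = [];
        continue;
      }
      if (line.startsWith(":")) continue; // comment line, often used as keep-alive

      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // one optional leading space

      if (field === "event") this.eventName = value;
      else if (field === "data") this.dataLines.push(value);
      else if (field === "id") this.lastId = value;
    }
    return events;
  }
}

// Opens the stream with an Authorization header and reports each parsed event.
async function streamSseWithAuth(
  url: string,
  token: string,
  onEvent: (event: SseEvent) => void,
  signal?: AbortSignal,
): Promise<void> {
  const response = await fetch(url, {
    headers: { Accept: "text/event-stream", Authorization: token },
    signal,
  });
  if (!response.ok || !response.body) {
    throw new Error(`SSE request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const parser = new SseParser();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    for (const event of parser.feed(decoder.decode(value, { stream: true }))) {
      onEvent(event);
    }
  }
}
```

In Angular, the `onEvent` callback would still need to run inside `NgZone.run` (or update a signal or subject) for the view to refresh, and aborting the `AbortSignal` in `ngOnDestroy` closes the connection.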