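Asked by: mksk23 Asked: 6/22/2023 Last edited by: Marc Glisse, mksk23 Updated: 6/22/2023 Views: 919
Implementing SSE in Angular
Q:
I am trying to implement SSE in my test application. The server side is already set up; I just consume the endpoint (api/v1/sse/document). The scenario is that when I perform a scan, the scan should appear in my test client as well as in my main application. The problem is that it only appears in my test client when I refresh the page. I have written some code, but I still cannot get the events to update automatically in the test client.
Here is the code I wrote
[document-list.component.ts]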
  public ngOnInit(): void {
    this.getDocuments();
    this.registerServerSentEvent();
  }

  public ngOnDestroy(): void {
    this.closeServerSentEvent();
  }

  /**
   * getDocuments: call the api service to get the list of documents.
   */
  public getDocuments(): void {
    this.aService.getDocuments().subscribe((documents: Documents[]) => {
      this.documents = documents;
    });
  }

  /**
   * markDocumentAsProcessed: call the api service to mark the document as processed.
   */
  public markDocumentAsProcessed(document: Documents): void {
    this.aService.markDocumentProcessed(document.document.id).subscribe({
      next: () => {
        // Remove the processed document from the list
        this.documents = this.documents.filter((doc) => doc.document.id !== document.document.id);
        this.closeDialog();
      },
      error: (error) => {
        console.log("markDocumentProcessed Error:", error);
        // Handle the error here
      },
    });
  }

  /**
   * showDetails: call the api service to get the document image and display it in a dialog.
   */
  public showDetails(document: Documents): void {
    this.aService.getDocumentImage(document.document.id).subscribe((image: Blob) => {
      const url = window.URL.createObjectURL(image);
      const safeUrl: SafeUrl = this.sanitizer.bypassSecurityTrustUrl(url);
      this.selectedDocument = {...document, imageDataURL: safeUrl};
      this.displayDialog = true;
    });
  }

  /**
   * closeDialog: close the dialog after pressing the process button.
   */
  private closeDialog(): void {
    this.displayDialog = false;
    this.selectedDocument = null;
  }

  private registerServerSentEvent(): void {
    const sseUrl = `${this.aService.config.url}api/v1/sse/document`;
    this.sseSubscription = this.sseService.getServerSentEvent(sseUrl).subscribe((event: MessageEvent) => {
      const documentEvent = JSON.parse(event.data);
      const eventType = documentEvent.type;
      const eventData = documentEvent.data;
      switch (eventType) {
        case "NewDocument": {
          // Process new document event
          break;
        }
        case "ViewDocument": {
          // Process view document event
          break;
        }
        case "WatchlistStatusUpdate": {
          // Process watchlist status update event
          break;
        }
        case "DocumentProcessed": {
          // Process document processed event
          const processedDocumentId = eventData.documentId;
          this.updateProcessedDocument(processedDocumentId);
          break;
        }
      }
    });
  }

  private updateProcessedDocument(processedDocumentId: string): void {
    // Find the processed document in the documents list
    const processedDocumentIndex = this.documents.findIndex((doc) => doc.document.id === processedDocumentId);
    if (processedDocumentIndex !== -1) {
      // Remove the processed document from the list
      this.documents.splice(processedDocumentIndex, 1);
      // Update any other UI-related logic or perform additional actions as needed
    }
  }

  private closeServerSentEvent(): void {
    if (this.sseSubscription) {
      this.sseSubscription.unsubscribe();
      this.sseService.closeEventSource();
    }
  }
}
This is the service I use to fetch the documents and so on
[a.service.ts]
public getDocuments(): Observable<Documents[]> {
  let url = this.config.url;
  if (!url.endsWith("/")) {
    url += "/";
  }
  return this.http
    .get<Documents[]>(`${url}api/v1/document/`, {
      headers: {
        authorization: this.config.authorization,
      },
    })
    .pipe(
      map((data) => {
        return data;
      }),
      catchError((error) => {
        throw error;
      })
    );
}

/**
 * markDocumentProcessed: calls the api service to mark the document as processed.
 *
 */
public markDocumentProcessed(documentId: string): Observable<Documents[]> {
  let url = this.config.url;
  if (!url.endsWith("/")) {
    url += "/";
  }
  const requestBody = {
    DocumentId: documentId,
  };
  return this.http
    .post<Documents[]>(`${url}api/v1/document/processed`, requestBody, {
      headers: {
        authorization: this.config.authorization,
      },
    })
    .pipe(
      map((data) => {
        return data;
      }),
      catchError((error) => {
        throw error;
      })
    );
}

/**
 * getDocumentImage: calls the api service to get the document image.
 *
 */
public getDocumentImage(documentId: string): Observable<Blob> {
  let url = this.config.url;
  if (!url.endsWith("/")) {
    url += "/";
  }
  return this.http.get(`${url}api/v1/document/${documentId}/image/Photo`, {
    responseType: "blob",
    headers: {
      authorization: this.config.authorization,
    },
  });
}
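I get no errors in the console and my endpoint returns 200, but my test client does not receive the events automatically; I have to refresh to see the changes.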
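For illustration, the event handling in the component above could be sketched as a pure function that returns the updated list. In the posted code the "NewDocument" case is only a comment, so a new scan would not be added to the list even once events arrive. This is a minimal sketch, not the author's implementation: the `DocumentEntry` type, the `applyDocumentEvent` name and the payload shape of "NewDocument" are assumptions; only `document.id` and `data.documentId` appear in the question.

```typescript
// Hypothetical shapes: only `document.id` and `data.documentId` appear in the question.
interface DocumentEntry {
  document: { id: string };
}

type DocumentEvent =
  | { type: "NewDocument"; data: DocumentEntry } // payload shape is an assumption
  | { type: "DocumentProcessed"; data: { documentId: string } }
  | { type: "ViewDocument" | "WatchlistStatusUpdate"; data: unknown };

// Returns a new array instead of mutating the existing one.
function applyDocumentEvent(documents: DocumentEntry[], event: DocumentEvent): DocumentEntry[] {
  switch (event.type) {
    case "NewDocument": {
      const added = event.data;
      const newId = added.document.id;
      // Skip duplicates in case the same event is delivered twice
      const exists = documents.some((doc) => doc.document.id === newId);
      return exists ? documents : [...documents, added];
    }
    case "DocumentProcessed": {
      const processedId = event.data.documentId;
      return documents.filter((doc) => doc.document.id !== processedId);
    }
    default:
      // ViewDocument and WatchlistStatusUpdate leave the list unchanged in this sketch
      return documents;
  }
}
```

Inside the subscription this could be used as `this.documents = applyDocumentEvent(this.documents, JSON.parse(event.data))`.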
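A:
0 votes
mat.hudak
6/22/2023
#1
The problem is that you are using the SSE endpoint as a regular GET endpoint. Naturally, you only get updates when you ask for them, for example by reloading the page.
This is what you are doing (taken from the screenshot of your SSE service):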
public getServerSentEvent(url: string): Observable<MessageEvent> {
  const token = this.aService.config.authorization;
  // WARNING - SSE doesn't have an API for Headers, this won't work if used as proper SSE
  const headers = new HttpHeaders({
    Authorization: token,
  });
  return new Observable<MessageEvent>((observer) => {
    // HERE - you are not creating the SSE Event Source, just getting from it
    this.http.get(url, {headers, responseType: "text"}).subscribe({
      //...
    })
  })
}
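So you are calling HTTP GET on the SSE endpoint and then dressing the response up as an SSE event. As you can see, that does not work.
You should open a new EventSource instead, which will actually behave like SSE: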
// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }
  // Open new channel, create new EventSource - provide your own URL
  this.eventSource = new EventSource(this.yourSSEurl);

  // Process default event
  this.eventSource.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Add your own EVENTS handling...
  /*
    enum SSE_EVENTS {
      NewDocument = 'NewDocument',
      //...
    }
  */
  Object.keys(SSE_EVENTS).forEach(key => {
    this.eventSource.addEventListener(SSE_EVENTS[key], event => {
      this.zone.run(() => this.processSseEvent(event));
    });
  });

  // Process connection opened
  this.eventSource.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  this.eventSource.onerror = (error: any) => {
    this.reconnectOnError();
  };
}
See my older answer: Handling SSE reconnect on error. There you can find a nearly working SSE service with everything you need.
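The snippet above resets `reconnectFrequencySec` to 1 in `onopen` and calls `reconnectOnError()` in `onerror`, but neither body is shown here. One way this could work is a doubling backoff, sketched below. The names `nextReconnectDelaySec` and `ReconnectScheduler`, the doubling rule and the 64 second cap are assumptions for illustration, not the code from the linked answer; `connect` stands in for `createSseEventSource()`.

```typescript
// Hypothetical helper: doubles the delay after each failed attempt, capped at maxSec.
function nextReconnectDelaySec(currentSec: number, maxSec: number = 64): number {
  return Math.min(currentSec * 2, maxSec);
}

// Minimal timer-based scheduler (an assumption, not the linked answer's code).
class ReconnectScheduler {
  private delaySec = 1;
  private timer: ReturnType<typeof setTimeout> | null = null;
  private readonly connect: () => void;
  private readonly maxSec: number;

  constructor(connect: () => void, maxSec: number = 64) {
    this.connect = connect;
    this.maxSec = maxSec;
  }

  // Call from onopen: a successful connection resets the backoff.
  reset(): void {
    this.delaySec = 1;
  }

  // Call from onerror: schedules one reconnect and returns the wait in seconds.
  scheduleReconnect(): number {
    const waitSec = this.delaySec;
    this.cancel();
    this.timer = setTimeout(() => {
      this.timer = null;
      this.connect();
    }, waitSec * 1000);
    this.delaySec = nextReconnectDelaySec(this.delaySec, this.maxSec);
    return waitSec;
  }

  // Call when the service is destroyed so no reconnect fires afterwards.
  cancel(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}
```

In the service, `onopen` would call `reset()`, `onerror` would close the current EventSource and call `scheduleReconnect()`, and the close method would call `cancel()`.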
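Comments
0 votes
mksk23
6/23/2023
Is there another way to pass the token? I need to pass the token as a header, otherwise I get a 401 Unauthorized error in the console.
0 votes
mat.hudak
6/23/2023
Not with EventSource; I think there are only workarounds.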
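One such workaround, sketched here under assumptions, is to read the stream with `fetch`, which does accept headers, and parse the `text/event-stream` format by hand. The names `SseMessage`, `parseSseBuffer` and `streamSse` are hypothetical. The parser is deliberately simplified: it handles only the `event` and `data` fields and `\n` or `\r\n` line endings, and ignores `id` and `retry`. Unlike EventSource, this approach does not reconnect by itself.

```typescript
interface SseMessage {
  event: string; // "message" when the server sends no `event:` field
  data: string;
}

// Parses every complete event block in `buffer` and returns the unparsed remainder.
function parseSseBuffer(buffer: string): { messages: SseMessage[]; rest: string } {
  const blocks = buffer.replace(/\r\n/g, "\n").split("\n\n");
  const rest = blocks.pop() ?? ""; // last piece may be an incomplete event
  const messages: SseMessage[] = [];
  for (const block of blocks) {
    let event = "message";
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line === "" || line.startsWith(":")) continue; // comment or keep-alive
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // one optional leading space
      if (field === "event") event = value;
      else if (field === "data") dataLines.push(value);
    }
    if (dataLines.length > 0) {
      messages.push({ event, data: dataLines.join("\n") });
    }
  }
  return { messages, rest };
}

// Opens the stream with an Authorization header and forwards each parsed message.
async function streamSse(
  url: string,
  token: string,
  onMessage: (message: SseMessage) => void,
  signal?: AbortSignal,
): Promise<void> {
  const response = await fetch(url, {
    headers: { Authorization: token, Accept: "text/event-stream" },
    signal,
  });
  if (!response.ok || response.body === null) {
    throw new Error(`SSE request failed with status ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const parsed = parseSseBuffer(buffer);
    buffer = parsed.rest;
    parsed.messages.forEach((message) => onMessage(message));
  }
}
```

Passing an `AbortController` signal and calling `abort()` in `ngOnDestroy` would close the connection. Whether the server also accepts the token in another form (query parameter, cookie) depends on the backend and is not covered by this sketch.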