使用 express 和 axios 对 Transfer-Encoding 分块 http 请求进行错误处理

Error handling with Transfer-Encoding chunked http requests with express and axios

提问人:Roe 提问时间:9/30/2023 更新时间:10/1/2023 访问量:94

问:

我试图使用流从JS中的数据库中获取大量数据,以防止一次将所有数据加载到内存中。我使用 express 作为我的服务器和一个使用 Axios 获取数据的 nodeJS 客户端。我已经成功地通过流式传输获取了数据,但不知道如何处理流式传输时发生的错误。

Express 服务器:

app.get('/stream', async (req, res) => {
    try {
      const cursor = //fetch data with a limit & skip (MongoDB)
      while(cursor.hasNext()) {
        const data = await cursor.next()

        const writeToStream = new Promise((resolve) => {
          res.write(data, 'utf8', () => {
            console.log("batch sent");
            resolve()
          })
  
        })
        await writeToStream
      }
      res.end()
    } catch(error) {
      console.log(`error: ${error.message}`)
      
      //How do i send the status & error message to the requestor? 
      //return res.status(400).end(error.message) // <-- wanted behavior
  })

客户:

    try {
      const res = await axios({
        url: 'http://localhost:3000/test',
        responseType: 'stream'
      })

      const { data } = res
      data.pipe(someStream)

      data.on('end', () => { 
        //stream finished
      })

      data.on('error', (error) => { // First Option
        //error without a status or message
        //res.status(error.status).send(error.message) // <-- wanted behavior  
      })
    } catch(error) {  // Second Option
      //error without a status or message
      //return res.status(error.status).send(error.message) // <-- wanted behavior
    }
      

客户端上的错误处理有效(代码运行),但我无法弄清楚如何从服务器向客户端发送状态和消息,指示错误并指定它。

版本:“axios”: “^1.5.1”, “express”: “^4.18.2”

希望得到一些帮助。 提前致谢!

JavaScript Express 错误处理 Axios 分块编码

评论


答:

0赞 Amir Hossein Baghernezad 10/1/2023 #1

问题是,在将标头发送到客户端后,您无法设置标头。因此,当您使用标头启动流时,该流已作为 .res.write200

你能做的就是使用一个技巧。您可以为 和 设置固定前缀。通过这种方式,您可以区分什么是真实数据,什么是真实错误。我知道这不是最有效的方法,但有效并且似乎是合理的,因为流堆栈本身根本不提供错误管理机制DATAERROR

server.ts

import express from 'express'

const app = express()
const port = 3000

const DATA_PREFIX = "DATA:"
const ERROR_PREFIX = "ERROR:"

app.get('/stream', async (req, res) => {
  try {
    // I have replaced the query to MongoDB with some sample data.
    const sample_datas = ["data1", "data2", "data3", "data4", "data5"]

    for (let dt of sample_datas) {
      const writeToStream = new Promise((resolve) => {

        // if dt == "data4", then simulate an error
        if (dt == "data4") {
          throw new Error("data4 has problems.")
        }

        res.write(DATA_PREFIX + dt, 'utf8', () => {
          console.log("batch sent");
          resolve(true)
        })
      })
      await writeToStream
    }

    res.end()
  } catch (error) {
    console.log(`error: ${error.message}`)

    //How do i send the status & error message to the requestor?
    res.write(ERROR_PREFIX + error.message)
    res.end()
  }
})

app.listen(port, () => {
  console.log(`Example app listening on port ${port}`)
})

client.ts

import axios from 'axios'
import fs from 'fs'
import stream from 'stream'

const DATA_PREFIX = "DATA:"
const ERROR_PREFIX = "ERROR:"

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

const main = async () => {

  await sleep(2000)

  try {
    const res = await axios({
      url: 'http://localhost:3000/stream',
      responseType: 'stream'
    })

    const { data } = res

    const writableStream = fs.createWriteStream('__output__.txt');

    const appendText = new stream.Transform({
      transform(chunk: Buffer, encoding, callback) {

        const chunk_str = chunk.toString('utf8')

        if (chunk_str.startsWith(DATA_PREFIX)) {
          this.push(Buffer.from(chunk_str.replace(DATA_PREFIX, ""), 'utf8'))
        }
        else if (chunk_str.startsWith(ERROR_PREFIX)) {
          const error = chunk_str.replace(ERROR_PREFIX, "")
          error_function(new Error(error))
        }

        callback();
      }
    });

    data.pipe(appendText).pipe(writableStream);

    data.on('end', () => {
      console.log("stream finished")
    })

    const error_function = (error: Error) => {
      console.log("data.on error:", error)
    }

    data.on('error', error_function)

  } catch (error) {
    console.log("catch error:", error)
  }
}

main()

评论

0赞 Roe 10/7/2023
这也可以在浏览器端 js 上完成吗?
0赞 Amir Hossein Baghernezad 10/7/2023
@Roe 是的,除了您可以删除并用自己的任务方法替换它的部分之外,它还是一个受良好支持的 Web API。fsstream.Transform
0赞 Mikhail Ozolins 10/1/2023 #2

Express 服务器:在服务器代码中,您希望确保在发送数据时出现问题时,请清楚地告诉客户端。方法如下:

  1. 我们使用一个名为 cursor.forEach() 的方法在从数据库获取数据时将数据发送到客户端。
  2. 如果在发送数据时发生不好的事情,我们会让客户知道,“哎呀,出了点问题!”,我们也会告诉他们这是我们的错(HTTP 状态代码 500)。
  3. 如果在我们开始发送数据之前就出了问题,我们仍然会告诉客户同样的事情。

客户端:在客户端方面,您在错误部分做得很好,但让我们分解一下:

  1. 当服务器发送数据时,我们会捕获它并将其放在某个地方,例如将其保存到文件中。
  2. 我们正在检查所有数据的发送时间(例如完成下载)。
  3. 如果在获取数据时出现问题,我们会说,“哎呀,出事了!”并记录发生的事情。我们也可以处理这个问题,比如停止下载。

通过进行这些更改,您将能够在发送数据时处理问题,并且您将以简单明了的方式告诉客户。

评论

0赞 Roe 10/7/2023
我想专门为我的 api 指定状态和错误消息。如何在发送数据时发送状态 500?