Understanding Web.Scotty stream

Asked by m0nhawk on 11/11/2023, last edited by m0nhawk, modified 11/16/2023, viewed 108 times

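Question:

Getting back into Haskell for the first time since 2013, I'm writing a small Web.Scotty service to manage an S3 bucket (using Amazonka-2.0).

The Web.Scotty part and the Amazonka part are both pretty clear on their own, but I'm not sure how to make them work together: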

main :: IO ()
main = do
    env <- Amazonka.newEnv Amazonka.discover
    scotty 3000 (app env)

app :: Amazonka.Env -> ScottyM ()
app env = do
    get "/stream-file" $ do
        runResourceT $ do
            resp <- runResourceT $ Amazonka.send env (newGetObject "bucket" "file")

            (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))

            lift $ stream $ \send flush -> do
                (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)

I tried removing the inner runResourceT here, and nothing changed:

resp <- Amazonka.send env (newGetObject "bucket" "file")

This works and successfully prints to the console:

(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))

This does not work (when the printing part above is commented out) and fails with the following error:

lift $ stream $ \send flush -> do
    (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)

Error:

HttpExceptionRequest Request {
  host                 = "bucket.s3.us-east-1.amazonaws.com"
  port                 = 443
  secure               = True
  requestHeaders       = [("X-Amz-Content-SHA256",""),("X-Amz-Date",""),("Host","bucket.s3.us-east-1.amazonaws.com"),("Authorization","<REDACTED>")]
  path                 = "/file"
  queryString          = ""
  method               = "GET"
  proxy                = Nothing
  rawBody              = False
  redirectCount        = 0
  responseTimeout      = ResponseTimeoutMicro 70000000
  requestVersion       = HTTP/1.1
  proxySecureMode      = ProxySecureWithConnect
}
 ConnectionClosed

What am I missing?

Tags: haskell, conduit, scotty, amazonka

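Comments

0 votes, fghibellini, 11/11/2023
I'm not very familiar with the library, but aren't you closing the connection by wrapping Amazonka.send in runResourceT?
0 votes, fghibellini, 11/11/2023
My expectation is that runResourceT should wrap the entire monadic action that works with the response, i.e. the whole do block, which your code already does as well. Could you try getting rid of the inner runResourceT?
0 votes, m0nhawk, 11/11/2023
@fghibellini That's an interesting suggestion. I tried it, but it's still the same problem. I'm trying to debug it with ghci-dap, and something unexpected happens when it steps into stream.
0 votes, m0nhawk, 11/11/2023
@fghibellini, if that were the case, the conduit with print wouldn't work either.
0 votes, fghibellini, 11/11/2023
Maybe stream only "configures" how the body is sent rather than actually performing the IO, so the IO would still run after runResourceT has finished. The print version might succeed simply because it is fast enough. Some operations (such as closing a connection) don't always wait for completion; you could test this by adding a sleep in the print case.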
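fghibellini's first point is that an inner runResourceT releases whatever was allocated inside it as soon as that inner block exits. This can be checked without AWS. The following is a minimal sketch that assumes only the resourcet package. openConn and closeConn are hypothetical stand-ins for the S3 connection, and an IORef log records the order of events.

```haskell
-- Requires the resourcet package.
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (allocate, runResourceT)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-ins for opening and closing the S3 connection; they only
-- append to a log so the order of events can be inspected afterwards.
openConn :: IORef [String] -> IO ()
openConn logRef = modifyIORef' logRef (++ ["open"])

closeConn :: IORef [String] -> () -> IO ()
closeConn logRef () = modifyIORef' logRef (++ ["close"])

-- Same shape as the question: an inner runResourceT around the "send",
-- then work on the response inside the outer runResourceT.
nestedScopes :: IO [String]
nestedScopes = do
  logRef <- newIORef []
  runResourceT $ do
    _ <- runResourceT $ allocate (openConn logRef) (closeConn logRef)
    -- The inner scope has already exited here, so "close" was logged
    -- before we get a chance to read the body.
    liftIO $ modifyIORef' logRef (++ ["read body"])
  readIORef logRef

main :: IO ()
main = nestedScopes >>= print
```

Running main prints ["open","close","read body"]. The inner scope closes the "connection" before the body is read. As m0nhawk reports, removing the inner runResourceT is not enough on its own here, because stream also defers the read past the outer scope.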

Answers:

0 votes, fghibellini, 11/11/2023, answer #1

If you try:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Data.Binary.Builder (fromByteString)
import Web.Scotty
import Data.Conduit ((.|), ConduitT, yield, runConduit)
import qualified Data.Conduit.Combinators as CC
import Control.Monad.IO.Class
import Control.Lens
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (threadDelay)
import Data.ByteString (ByteString)

import Data.IORef

slowSource :: MonadIO m => IORef Bool -> ConduitT a ByteString m ()
slowSource state = do
  x <- liftIO $ readIORef state
  yield ("state: " <> (if x then "T" else "F") <> "\n")
  liftIO $ threadDelay 1000000
  slowSource state

main :: IO ()
main = do
    state <- newIORef False
    scotty 3000 (app state)

app :: IORef Bool -> ScottyM ()
app state = do
    get "/stream-file" $ do
      liftIO $ writeIORef state True

      stream $ \send flush -> do
          runConduit $ slowSource state .| CC.map fromByteString .| CC.mapM_ (\chunk -> liftIO (send chunk >> flush))

      liftIO $ writeIORef state False

you'll get:

curl http://localhost:3000/stream-file
state: F
state: F
state: F
state: F
state: F
^C

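This shows that stream only "sets up" the conduit. The conduit actually runs after the handler has finished, i.e. after your resources (in your case, the connection to AWS) have already been released.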

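0 votes, K. A. Buhr, 11/16/2023, answer #2

It looks like Amazonka requires the ResourceT in which Amazonka.send runs to stay open until the body conduit has actually been streamed. This is half-documented in the Amazonka.Response module.

In your code, the stream call sets up the streaming action but does not actually execute the sinkBody. So the outer ResourceT finishes and releases the connection before Scotty invokes the streaming action (including executing sinkBody).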
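Here is a network-free sketch of this failure mode, assuming only the resourcet package. handler plays the role of the Scotty handler. An IORef flag stands in for the HTTP connection, and the returned IO action plays the role of the callback passed to stream. These names and the "ConnectionClosed" string are hypothetical; the string only echoes the error in the question.

```haskell
-- Requires the resourcet package.
import Control.Monad.Trans.Resource (allocate, runResourceT)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical handler: opens a stand-in "connection" (an IORef flag that is
-- True while open) and returns an IO action to be run later, the way a
-- Scotty handler hands its callback to stream instead of running it.
handler :: IO (IO String)
handler = runResourceT $ do
  (_, conn) <- allocate (newIORef True) (\ref -> writeIORef ref False)
  -- Nothing is read from the connection yet; we only build the action.
  pure $ do
    isOpen <- readIORef conn
    pure (if isOpen then "read body" else "ConnectionClosed")

main :: IO ()
main = do
  deferred <- handler -- runResourceT has returned, so the release already ran
  deferred >>= putStrLn
```

main prints ConnectionClosed. The release action registered with allocate runs when runResourceT returns, which is before the deferred action executes.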

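It seems safest and simplest to run the Scotty server inside a single ResourceT that is opened when the server starts and closed only when the server terminates. (I was worried this might leak connections, but Amazonka seems to do enough connection management of its own that this isn't a problem.)

To do this without major brain surgery on the Scotty package, you can define the following function. It lets you "unlift" the ResourceT transformer: basically, you do everything in IO, with an "escape hatch" into a single shared ResourceT: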

runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)
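The following sketch shows why a single shared ResourceT fixes this. It reuses runWithResourceT exactly as defined above, but replaces Scotty and Amazonka with a hypothetical IORef-based connection. It assumes the resourcet and unliftio-core packages (withRunInIO comes from Control.Monad.IO.Unlift).

```haskell
{-# LANGUAGE RankNTypes #-}
-- Requires the resourcet and unliftio-core packages.
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.IO.Unlift (withRunInIO)
import Control.Monad.Trans.Resource (ResourceT, allocate, runResourceT)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

-- Same definition as above: one ResourceT scope for the whole run, plus an
-- "escape hatch" that runs ResourceT IO actions inside that scope.
runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)

-- Logs, in order, the deferred read and the (hypothetical) connection close.
sharedScope :: IO [String]
sharedScope = do
  logRef <- newIORef []
  let say msg = modifyIORef' logRef (++ [msg])
  runWithResourceT $ \withResourceT -> do
    -- "Handler" phase: open the connection in the shared scope and return
    -- a deferred action, the way a Scotty handler hands a callback to stream.
    deferred <- withResourceT $ do
      (_, conn) <- allocate (newIORef True) (\ref -> writeIORef ref False >> say "close")
      pure $ do
        isOpen <- readIORef conn
        say (if isOpen then "read body" else "ConnectionClosed")
    -- "Streaming" phase: the handler's withResourceT call has returned,
    -- but the shared scope is still open, so the connection is too.
    deferred
  readIORef logRef

main :: IO ()
main = sharedScope >>= print
```

It prints ["read body","close"]. The deferred action still sees an open connection, and the release only runs when runWithResourceT itself returns. In the server, that corresponds to shutdown.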

With this function, you can run your application in a single active ResourceT context, like so:

main :: IO ()
main = do
  ...
  runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)

Here, app runs in the ordinary IO-based ScottyM monad and uses withResourceT where needed. I avoided sinkBody because it calls its own fresh runResourceT via runConduitRes. Instead, I run the body conduit manually using withResourceT:

app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
  resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
  stream $ \send flush -> do
    withResourceT $ runConduit $
      (resp ^. getObjectResponse_body._ResponseBody)
      .| mapC fromByteString
      .| mapM_C (liftIO . send)
    flush
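The handler above flushes once, after the whole body, while fghibellini's example flushes after every chunk. The sketch below shows a hypothetical helper, sendChunks, for the per-chunk variant, assuming the conduit and bytestring packages. StreamingBody is redefined locally with the same shape as Network.Wai's type so the example runs without a server, and send and flush are faked with an IORef.

```haskell
-- Requires the conduit and bytestring packages.
import Conduit (ConduitT, mapM_C, runConduit, yieldMany, (.|))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Builder as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Same shape as Network.Wai's StreamingBody (what Scotty's stream takes),
-- redefined here so the sketch does not depend on wai.
type StreamingBody = (B.Builder -> IO ()) -> IO () -> IO ()

-- Hypothetical helper: send each chunk of the source as soon as it arrives
-- and flush after every chunk.
sendChunks :: MonadIO m => ConduitT () ByteString m () -> (B.Builder -> IO ()) -> IO () -> m ()
sendChunks source send flush =
  runConduit $ source .| mapM_C (\chunk -> liftIO (send (B.byteString chunk) >> flush))

-- With m ~ IO, the partially applied helper is exactly a StreamingBody.
demoBody :: StreamingBody
demoBody = sendChunks (yieldMany (map BC.pack ["hello ", "world"]))

-- Runs demoBody against fake send/flush that record what would be written.
capture :: IO [String]
capture = do
  out <- newIORef []
  let send b = modifyIORef' out (++ [BLC.unpack (B.toLazyByteString b)])
      flush = modifyIORef' out (++ ["<flush>"])
  demoBody send flush
  readIORef out

main :: IO ()
main = capture >>= print
```

It prints ["hello ","<flush>","world","<flush>"]. In the handler above, this would presumably be used as stream $ \send flush -> withResourceT $ sendChunks (resp ^. getObjectResponse_body._ResponseBody) send flush, which has not been tested against S3. Flushing per chunk pushes data to the client as soon as it arrives, at the cost of more, smaller writes.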

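Here's my full program. I tested it, and it seems to work. Connections sometimes stay open for a while (e.g., 30 seconds or so), but they do eventually close, so it doesn't appear to be leaking anything.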

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Amazonka
import Amazonka.S3
import Amazonka.S3.Lens
import Conduit
import Control.Lens
import Data.Binary.Builder
import System.IO
import Web.Scotty

runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)

main :: IO ()
main = do
  logger <- newLogger Debug stdout
  discover <- newEnv Amazonka.discover
  let env = discover
        { Amazonka.logger = logger
        , Amazonka.region = Amazonka.Ohio
        }
  runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)

app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
  resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
  stream $ \send flush -> do
    withResourceT $ runConduit $
      (resp ^. getObjectResponse_body._ResponseBody)
      .| mapC fromByteString
      .| mapM_C (liftIO . send)
    flush