提问人:m0nhawk 提问时间:11/11/2023 最后编辑:m0nhawk 更新时间:11/16/2023 访问量:108
了解 Web.Scotty 流
Understanding Web.Scotty stream
问:
自 2013 年以来接触 Haskell,我正在编写一个小型 Web.Scotty 服务来管理 S3 存储桶(使用 Amazonka-2.0)。
Web.Scotty 部分和 Amazonka 非常清楚,但我不确定如何让它协同工作:
main :: IO ()
main = do
env <- Amazonka.newEnv Amazonka.discover
scotty 3000 (app env)
app :: Amazonka.Env -> ScottyM ()
app env = do
get "/stream-file" $ do
runResourceT $ do
resp <- runResourceT $ Amazonka.send env (newGetObject "bucket" "file")
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))
lift $ stream $ \send flush -> do
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)
我尝试在此处删除,没有任何更改:runResourceT
resp <- Amazonka.send env (newGetObject "bucket" "file")
这将起作用并成功打印到控制台:
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))
这不起作用(如果打印部分被注释掉),并出现错误:
lift $ stream $ \send flush -> do
(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)
错误:
HttpExceptionRequest Request {
host = "bucket.s3.us-east-1.amazonaws.com"
port = 443
secure = True
requestHeaders = [("X-Amz-Content-SHA256",""),("X-Amz-Date",""),("Host","bucket.s3.us-east-1.amazonaws.com"),("Authorization","<REDACTED>")]
path = "/file"
queryString = ""
method = "GET"
proxy = Nothing
rawBody = False
redirectCount = 0
responseTimeout = ResponseTimeoutMicro 70000000
requestVersion = HTTP/1.1
proxySecureMode = ProxySecureWithConnect
}
ConnectionClosed
我错过了什么?
答:
如果您尝试:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Data.Binary.Builder (fromByteString)
import Web.Scotty
import Web.Scotty
import Data.Conduit ((.|), ConduitT, yield, runConduit)
import qualified Data.Conduit.Combinators as CC
import Control.Monad.IO.Class
import Control.Lens
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (threadDelay)
import Data.ByteString (ByteString)
import Data.IORef
slowSource :: MonadIO m => IORef Bool -> ConduitT a ByteString m ()
slowSource state = do
x <- liftIO $ readIORef state
yield ("state: " <> (if x then "T" else "F") <> "\n")
liftIO $ threadDelay 1000000
slowSource state
main :: IO ()
main = do
state <- newIORef False
scotty 3000 (app state)
app :: IORef Bool -> ScottyM ()
app state = do
get "/stream-file" $ do
liftIO $ writeIORef state True
stream $ \send flush -> do
runConduit $ slowSource state .| CC.map fromByteString .| CC.mapM_ (\chunk -> liftIO (send chunk >> flush))
liftIO $ writeIORef state False
你会知道的:
curl http://localhost:3000/stream-file
state: F
state: F
state: F
state: F
state: F
^C
这表明实际上只是“设置”了管道,但它实际上是在处理程序完成后执行的,即在您的资源被释放之后(在您的例子中是与 AWS 的连接)。stream
看起来 Amazonka 要求执行操作的通道保持打开状态,直到实际流式传输正文管道。这在 Amazonka.Response
模块中是有一半的记录。ResourceT
Amazonka.send
在您的代码中,该调用设置了流式处理操作,但实际上并未执行,因此外部会结束并允许在 Scotty 调用流式处理操作(包括执行 .stream
sinkBody
ResourceT
sinkBody
在 Scotty 服务器中运行一个在服务器启动时打开,仅在服务器终止时关闭的单个服务器似乎最安全、最简单。(我担心这可能会泄露连接,但 Amazonka 似乎参与了足够的连接管理,这不是问题。ResourceT
要做到这一点,而不给 Scotty 包做大的脑部手术,你可以定义以下函数,允许你“解除”变压器 -- 基本上,用一个“逃生舱口”对单个共享来做所有事情:ResourceT
IO
ResourceT
runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)
有了这个函数,你就可以在单个活动上下文中运行你的应用程序,如下所示:ResourceT
main :: IO ()
main = do
...
runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)
其中在基于 IO 的普通 monad 中运行,在需要时使用。我在这里避免了,因为它调用了自己的 fresh via .取而代之的是,我使用以下命令手动运行身体导管:app
ScottyM
withResourceT
sinkBody
runResourceT
runConduitRes
withResourceT
app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
stream $ \send flush -> do
withResourceT $ runConduit $
(resp ^. getObjectResponse_body._ResponseBody)
.| mapC fromByteString
.| mapM_C (liftIO . send)
flush
这是我的完整程序。我测试了它,它似乎有效。连接有时会保持打开一段时间(例如,30 秒左右),但它们最终会关闭,因此它似乎没有泄漏任何东西。
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Amazonka
import Amazonka.S3
import Amazonka.S3.Lens
import Conduit
import Control.Lens
import Data.Binary.Builder
import System.IO
import Web.Scotty
runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)
main :: IO ()
main = do
logger <- newLogger Debug stdout
discover <- newEnv Amazonka.discover
let env = discover
{ Amazonka.logger = logger
, Amazonka.region = Amazonka.Ohio
}
runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)
app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
stream $ \send flush -> do
withResourceT $ runConduit $
(resp ^. getObjectResponse_body._ResponseBody)
.| mapC fromByteString
.| mapM_C (liftIO . send)
flush
评论
Amazonka.send
runResourceT
runResourceT
do
runResourceT
stream
stream
runResourceT
print