{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.Sync (
LoopCheck (..),
newLoopCheck,
syncWithSender,
syncWithSender',
makeOutput,
makeOutputIO,
enqueueOutputSIO,
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Network.Control
import Network.HTTP.Semantics.IO
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
-- | Hand a new 'Output' for the stream to the output queue and stay in
-- step with whoever consumes it.
--
-- The 'Output' and its matching @pop@ action come from 'makeOutput'. The
-- 'Output' is placed on the context's @outputQ@, and the rest of the work
-- is delegated to 'syncWithSender'', which returns once @pop@ yields
-- 'Done' or the 'LoopCheck' reports that the loop must stop.
syncWithSender
    :: Context
    -> Stream
    -> OutputType
    -> LoopCheck
    -> IO ()
syncWithSender ctx@Context{..} strm otyp lc = do
    (pop, out) <- makeOutput strm otyp
    enqueueOutput outputQ out
    syncWithSender' ctx pop lc
-- | Build an 'Output' for the stream together with the action used to
-- wait for its outcome.
--
-- The two halves communicate through a private, initially empty 'MVar':
--
-- * the 'Output''s @outputSync@ callback stores 'Done' when called with
--   'Nothing' and @'Cont' ot@ when called with @'Just' ot@;
-- * the returned @IO Sync@ action takes that value, blocking until the
--   callback has been invoked.
makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput strm otyp = do
    var <- newEmptyMVar
    let -- Translate the callback argument into a 'Sync' and publish it.
        push = putMVar var . maybe Done Cont
        pop = takeMVar var
        out =
            Output
                { outputStream = strm
                , outputType = otyp
                , outputSync = push
                }
    return (pop, out)
-- | Build an 'Output' whose @outputSync@ callback feeds continuations
-- straight back into the context's @outputQ@.
--
-- Unlike 'makeOutput' no 'MVar' is involved and nothing has to wait on the
-- result: a callback argument of 'Nothing' is a no-op, while @'Just' ot@
-- enqueues @ot@ on @outputQ@.
makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO Context{..} strm otyp = out
  where
    push :: Maybe Output -> IO ()
    push Nothing = return ()
    push (Just ot) = enqueueOutput outputQ ot

    out :: Output
    out =
        Output
            { outputStream = strm
            , outputType = otyp
            , outputSync = push
            }
-- | Create an 'Output' with 'makeOutputIO' and put it on the context's
-- @outputQ@.
--
-- The call returns as soon as the 'Output' has been enqueued; any
-- continuation is re-enqueued by the 'Output''s own callback (see
-- 'makeOutputIO').
enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO ctx@Context{..} strm otyp = do
    let out :: Output
        out = makeOutputIO ctx strm otyp
    enqueueOutput outputQ out
-- | Repeatedly wait on @pop@ and re-enqueue continuations until finished.
--
-- Each iteration runs @pop@:
--
-- * 'Done' ends the loop;
-- * @'Cont' newout@ first consults 'checkLoop'. If it answers 'True',
--   @newout@ is put on the context's @outputQ@ and the loop continues;
--   if it answers 'False', the loop ends without enqueuing @newout@.
syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context{..} pop lc = loop
  where
    loop :: IO ()
    loop = do
        s <- pop
        case s of
            Done -> return ()
            Cont newout -> do
                cont <- checkLoop lc
                when cont $ do
                    enqueueOutput outputQ newout
                    loop
-- | Create the 'LoopCheck' for a stream.
--
-- The timeout flag starts out as 'False', the optional queue is stored
-- as given, and the window is the stream's own @streamTxFlow@ variable.
newLoopCheck :: Stream -> Maybe (TBQueue StreamingChunk) -> IO LoopCheck
newLoopCheck strm mtbq = do
    tovar <- newTVarIO False
    return $
        LoopCheck
            { lcTBQ = mtbq
            , lcTimeout = tovar
            , lcWindow = streamTxFlow strm
            }
-- | The conditions 'checkLoop' inspects before another continuation is
-- enqueued.
data LoopCheck = LoopCheck
    { lcTBQ :: Maybe (TBQueue StreamingChunk)
    -- ^ When present, 'checkLoop' waits until this queue is non-empty.
    , lcTimeout :: TVar Bool
    -- ^ Once 'True', 'checkLoop' answers 'False' without waiting.
    , lcWindow :: TVar TxFlow
    -- ^ 'checkLoop' waits until the window size read from here is positive.
    }
-- | Decide whether the synchronisation loop may enqueue another output.
--
-- Returns 'False' straight away when @lcTimeout@ is set. Otherwise it
-- blocks until both the optional queue is non-empty ('waitStreaming'')
-- and the window size is positive ('waitStreamWindowSizeSTM'), then
-- returns 'True'.
--
-- Everything runs in a single STM transaction, so a blocked call is
-- re-evaluated whenever any variable it has read changes, @lcTimeout@
-- included.
checkLoop :: LoopCheck -> IO Bool
checkLoop LoopCheck{..} = atomically $ do
    tout <- readTVar lcTimeout
    if tout
        then return False
        else do
            waitStreaming' lcTBQ
            waitStreamWindowSizeSTM lcWindow
            return True
-- | Block the enclosing transaction until the queue has at least one
-- element.
--
-- With 'Nothing' there is nothing to wait for and the transaction simply
-- continues. The queue is only inspected; no element is removed.
waitStreaming' :: Maybe (TBQueue a) -> STM ()
waitStreaming' Nothing = return ()
waitStreaming' (Just tbq) = do
    isEmpty <- isEmptyTBQueue tbq
    -- 'check' retries the transaction while the queue is still empty.
    check (not isEmpty)
-- | Block the enclosing transaction until the window size derived from
-- the given 'TxFlow' variable is strictly positive.
waitStreamWindowSizeSTM :: TVar TxFlow -> STM ()
waitStreamWindowSizeSTM txf = do
    w <- txWindowSize <$> readTVar txf
    -- 'check' retries the transaction while the window is zero or negative.
    check (w > 0)