module BfHaskell.StreamingAPI.StreamingProcessor
(
runStreamingHandler
) where
import BfHaskell.Common.Logging
import BfHaskell.DSL.Login (LoginHandler)
import BfHaskell.StreamingAPI.CommandExecutor (runCommandExecutor)
import BfHaskell.StreamingAPI.SslClient (runSslClient)
import BfHaskell.StreamingAPI.StreamingComm (CommCentre, StreamingControlMessage (..),
addControlMessage,
ccMarketCache,
ccOrderCache,
newCommCentre,
readClientUpdate,
tryReadClientUpdate)
import BfHaskell.StreamingAPI.Types (StreamingConnectionInfo (..),
StreamingHandler (..),
scStore)
import qualified Control.Concurrent.Async as AS
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Lens (view)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Polysemy
import Polysemy.Async
import Polysemy.Error
import Polysemy.Output
import Polysemy.Resource
runStreamingHandler :: Members '[Embed IO,
Output LogMessage,
Async,
Resource,
LoginHandler,
Error String] r
=> StreamingConnectionInfo
-> InterpreterFor StreamingHandler r
runStreamingHandler connectionInfo sem = do
comm <- liftIO newCommCentre
bracket (startThreads comm) (stopThreads comm) (const $ interpretStreaming comm)
where
startThreads comm = do
sslClientThread <- async $ runSslClient comm connectionInfo
cmdExecutorThread <- async $ runCommandExecutor sslClientThread comm
return (sslClientThread, cmdExecutorThread)
stopThreads :: Members '[Embed IO,
Async,
Resource,
LoginHandler,
Output LogMessage,
Error String] r
=> CommCentre
-> (AS.Async (Maybe ()), AS.Async (Maybe ()))
-> Sem r ()
stopThreads comm (sslClientThread, cmdExecutorThread) = do
liftIO $ addControlMessage comm SCMStopService
void $ await sslClientThread
void $ await cmdExecutorThread
interpretStreaming comm =
interpret (\case
GetNextStreamMessage -> readClientUpdate comm
TryGetNextStreamMessage -> tryReadClientUpdate comm
SubscribeToMarkets marketFilter ->
addControlMessage comm $ SCMSubscribeToMarkets marketFilter
GetMarketCache -> do
cache <- embed $ readTVarIO $ view ccMarketCache comm
return $ view scStore cache
SubscribeToOrders -> addControlMessage comm SCMSubscribeToOrders
GetOrderCache -> do
cache <- embed $ readTVarIO $ view ccOrderCache comm
return $ view scStore cache
) sem