Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async runtime scaling #946

Merged
merged 18 commits into from
Aug 23, 2023
Prev Previous commit
Next Next commit
Stub: async scheduler
  • Loading branch information
jaspervdj committed Aug 4, 2022
commit 7f411cbb5c9e136c9e8369509b828e195c1a4938
272 changes: 133 additions & 139 deletions lib/Hakyll/Core/Runtime.hs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ emptyScheduler = Scheduler {..}


--------------------------------------------------------------------------------
schedulerOutOfDate
schedulerMarkOutOfDate
:: Map Identifier (Compiler SomeItem)
-> Set Identifier
-> Scheduler
-> (Scheduler, [String])
schedulerOutOfDate universe modified scheduler@Scheduler {..} =
schedulerMarkOutOfDate universe modified scheduler@Scheduler {..} =
( scheduler
{ schedulerQueue = schedulerQueue <> Seq.fromList (Map.keys todo)
, schedulerDone = schedulerDone <>
Expand All @@ -204,20 +204,29 @@ schedulerOutOfDate universe modified scheduler@Scheduler {..} =


--------------------------------------------------------------------------------
data Pop
= PopOk Identifier (Compiler SomeItem)
| PopError
| PopFinished
| PopStarve
data SchedulerStep
-- | The scheduler instructs to offer some work on the given item. It
-- also returns the number of threads that can be resumed after they have
-- starved.
= SchedulerWork Identifier (Compiler SomeItem) Int
-- | There's currently no work available, but there will be after other
-- threads have finished whatever they are doing.
| SchedulerStarve
-- | We've finished all work.
| SchedulerFinish
-- | An error occurred. You can retrieve the errors from 'schedulerErrors'.
| SchedulerError


--------------------------------------------------------------------------------
schedulerPop :: Scheduler -> (Scheduler, Pop)
schedulerPop :: Scheduler -> (Scheduler, SchedulerStep)
schedulerPop scheduler@Scheduler {..} = case Seq.viewl schedulerQueue of
Seq.EmptyL
| Set.null schedulerWorking -> (scheduler, PopFinished)
| Set.null schedulerWorking -> (scheduler, SchedulerFinish)
| otherwise ->
(scheduler {schedulerStarved = schedulerStarved + 1}, PopStarve)
( scheduler {schedulerStarved = schedulerStarved + 1}
, SchedulerStarve
)
x Seq.:< xs
| x `Set.member` schedulerDone ->
trace ("ignoring identifier " <> show x <> " (done)") $
Expand All @@ -233,14 +242,14 @@ schedulerPop scheduler@Scheduler {..} = case Seq.viewl schedulerQueue of
( scheduler
{ schedulerErrors = (Just x, "Compiler not found") : schedulerErrors
}
, PopError
, SchedulerError
)
Just c ->
( scheduler
{ schedulerQueue = xs
, schedulerWorking = Set.insert x schedulerWorking
}
, PopOk x c
, SchedulerWork x c 0
)


Expand All @@ -256,42 +265,33 @@ schedulerBlock
-> [(Identifier, Snapshot)]
-> Compiler SomeItem
-> Scheduler
-> (Scheduler, Block)
-> (Scheduler, SchedulerStep)
schedulerBlock identifier deps0 compiler scheduler@Scheduler {..}
| null deps1 =
trace ("done for identifier " <> show identifier <> ", " <> show deps1 <> ", " <> show deps0) $
( scheduler
-- TODO: Not needed? Should we just continue directly?
{ schedulerBlocked = Set.delete identifier schedulerBlocked
, schedulerQueue = Seq.singleton identifier <> schedulerQueue
, schedulerWorking = Set.delete identifier schedulerWorking
, schedulerTodo = Map.insert identifier compiler schedulerTodo
}
, BlockContinue
)
| otherwise =
( scheduler
{ schedulerQueue =
-- Optimization: move deps to the front and item to the back
Seq.fromList depIds <>
schedulerQueue <>
Seq.singleton identifier
, schedulerTodo =
trace ("insert for identifier " <> show identifier) $
Map.insert identifier
(Compiler $ \_ -> pure $ CompilerRequire deps0 compiler)
schedulerTodo
, schedulerWorking = Set.delete identifier schedulerWorking
, schedulerBlocked = Set.insert identifier schedulerBlocked
, schedulerTriggers = foldl'
(\acc (depId, _) ->
trace ("identifier " <> show identifier <> " blocked on " <> show depId) $
Map.insertWith Set.union depId (Set.singleton identifier) acc)
schedulerTriggers
deps1
}
, BlockBlocked
, SchedulerWork identifier compiler 0
)
| otherwise = schedulerPop $ scheduler
{ schedulerQueue =
-- Optimization: move deps to the front and item to the back
Seq.fromList depIds <>
schedulerQueue <>
Seq.singleton identifier
, schedulerTodo =
trace ("insert for identifier " <> show identifier) $
Map.insert identifier
(Compiler $ \_ -> pure $ CompilerRequire deps0 compiler)
schedulerTodo
, schedulerWorking = Set.delete identifier schedulerWorking
, schedulerBlocked = Set.insert identifier schedulerBlocked
, schedulerTriggers = foldl'
(\acc (depId, _) ->
trace ("identifier " <> show identifier <> " blocked on " <> show depId) $
Map.insertWith Set.union depId (Set.singleton identifier) acc)
schedulerTriggers
deps1
}
where
deps1 = filter (not . done) deps0
depIds = map fst deps1
Expand Down Expand Up @@ -323,35 +323,37 @@ schedulerUnblock identifier scheduler@Scheduler {..} =

--------------------------------------------------------------------------------
schedulerSnapshot
:: Identifier -> Snapshot -> Compiler SomeItem -> Scheduler -> (Scheduler, Int)
:: Identifier -> Snapshot -> Compiler SomeItem
-> Scheduler -> (Scheduler, SchedulerStep)
schedulerSnapshot identifier snapshot compiler scheduler@Scheduler {..} =
schedulerUnblock identifier $ scheduler
{ schedulerQueue = Seq.singleton identifier <> schedulerQueue
, schedulerWorking = Set.delete identifier schedulerWorking
, schedulerSnapshots = Set.insert (identifier, snapshot) schedulerSnapshots
, schedulerTodo =
trace ("snapshot for identifier " <> show identifier <> " " <> snapshot) $
Map.insert identifier compiler schedulerTodo
}
let (scheduler', resume) = schedulerUnblock identifier scheduler
{ schedulerSnapshots =
Set.insert (identifier, snapshot) schedulerSnapshots
} in
(scheduler', SchedulerWork identifier compiler resume)


--------------------------------------------------------------------------------
schedulerWrite
:: Identifier
-> [Dependency]
-> Scheduler
-> (Scheduler, Int)
schedulerWrite identifier depFacts scheduler@Scheduler {..} =
schedulerUnblock identifier $ scheduler
{ schedulerWorking = Set.delete identifier schedulerWorking
, schedulerFacts = Map.insert identifier depFacts schedulerFacts
, schedulerDone =
trace ("write for identifier " <> show identifier) $
Set.insert identifier schedulerDone
, schedulerTodo =
trace ("delete for identifier " <> show identifier) $
Map.delete identifier schedulerTodo
}
-> (Scheduler, SchedulerStep)
schedulerWrite identifier depFacts scheduler0@Scheduler {..} =
let (scheduler1, resume) = schedulerUnblock identifier scheduler0
{ schedulerWorking = Set.delete identifier schedulerWorking
, schedulerFacts = Map.insert identifier depFacts schedulerFacts
, schedulerDone =
trace ("write for identifier " <> show identifier) $
Set.insert identifier schedulerDone
, schedulerTodo =
trace ("delete for identifier " <> show identifier) $
Map.delete identifier schedulerTodo
}
(scheduler2, step) = schedulerPop scheduler1 in
case step of
SchedulerWork i c n -> (scheduler2, SchedulerWork i c (n + resume))
_ -> (scheduler2, step)


--------------------------------------------------------------------------------
Expand Down Expand Up @@ -450,7 +452,7 @@ scheduleOutOfDate2 = do
schedulerRef <- runtimeScheduler <$> ask
let modified = Set.filter (resourceModified provider) (Map.keysSet universe)
msgs <- liftIO . IORef.atomicModifyIORef' schedulerRef $
schedulerOutOfDate universe modified
schedulerMarkOutOfDate universe modified

-- Print messages
mapM_ (Logger.debug logger) msgs
Expand All @@ -474,13 +476,19 @@ pickAndChase = do
--------------------------------------------------------------------------------
pickAndChase2 :: ReaderT RuntimeRead IO ()
pickAndChase2 = do
(continue, _) <- chase2
when continue pickAndChase2
scheduler <- runtimeScheduler <$> ask
pop <- liftIO . IORef.atomicModifyIORef' scheduler $ schedulerPop
go pop
where
go SchedulerFinish = pure ()
go SchedulerStarve = pure ()
go SchedulerError = pure ()
go (SchedulerWork i c _) = work i c >>= go


--------------------------------------------------------------------------------
chase2 :: ReaderT RuntimeRead IO (Bool, Int)
chase2 = do
work :: Identifier -> Compiler SomeItem -> ReaderT RuntimeRead IO SchedulerStep
work id' compiler = do
logger <- runtimeLogger <$> ask
provider <- runtimeProvider <$> ask
universe <- runtimeUniverse <$> ask
Expand All @@ -494,76 +502,62 @@ chase2 = do
, ()
)

pop <- liftIO . IORef.atomicModifyIORef' scheduler $ schedulerPop
case pop of
PopError -> pure (False, 0)
PopFinished -> pure (False, 0)
PopStarve -> pure (False, 0)
PopOk id' compiler -> do
let cread = CompilerRead
{ compilerConfig = config
, compilerUnderlying = id'
, compilerProvider = provider
, compilerUniverse = Map.keysSet universe
, compilerRoutes = routes
, compilerStore = store
, compilerLogger = logger
}
result <- liftIO $ runCompiler compiler cread
case result of
CompilerError e -> do
let msgs = case compilerErrorMessages e of
[] -> ["Compiler failed but no info given, try running with -v?"]
es -> es
for_ msgs . addError $ Just id'
return (False, 0)

CompilerSnapshot snapshot c -> do
threads <- liftIO . IORef.atomicModifyIORef' scheduler $
schedulerSnapshot id' snapshot c
pure (True, threads)

CompilerDone (SomeItem item) cwrite -> do
-- Print some info
let facts = compilerDependencies cwrite
cacheHits
| compilerCacheHits cwrite <= 0 = "updated"
| otherwise = "cached "
Logger.message logger $ cacheHits ++ " " ++ show id'

-- Sanity check
unless (itemIdentifier item == id') $ addError (Just id') $
"The compiler yielded an Item with Identifier " ++
show (itemIdentifier item) ++ ", but we were expecting " ++
"an Item with Identifier " ++ show id' ++ " " ++
"(you probably want to call makeItem to solve this problem)"

-- Write if necessary
(mroute, _) <- liftIO $ runRoutes routes provider id'
case mroute of
Nothing -> return ()
Just route -> do
let path = destinationDirectory config </> route
liftIO $ makeDirectories path
liftIO $ write path item
Logger.debug logger $ "Routed to " ++ path

Logger.message logger $ "Saved _final for " <> show id'
liftIO $ save store item
threads <- liftIO . IORef.atomicModifyIORef' scheduler $
schedulerWrite id' facts
pure (True, threads)

CompilerRequire reqs c -> do
block <- liftIO . IORef.atomicModifyIORef' scheduler $
schedulerBlock id' reqs c
{-
schedulerBlock id' reqs $ Compiler $
\_ -> pure $ CompilerRequire reqs c
-}
case block of
BlockContinue -> pure (True, 0)
BlockBlocked -> pure (True, 0)
let cread = CompilerRead
{ compilerConfig = config
, compilerUnderlying = id'
, compilerProvider = provider
, compilerUniverse = Map.keysSet universe
, compilerRoutes = routes
, compilerStore = store
, compilerLogger = logger
}
result <- liftIO $ runCompiler compiler cread
case result of
CompilerError e -> do
let msgs = case compilerErrorMessages e of
[] -> ["Compiler failed but no info given, try running with -v?"]
es -> es
for_ msgs . addError $ Just id'
return SchedulerError

CompilerSnapshot snapshot c -> do
liftIO . IORef.atomicModifyIORef' scheduler $
schedulerSnapshot id' snapshot c
-- TODO: continue with threads

CompilerDone (SomeItem item) cwrite -> do
-- Print some info
let facts = compilerDependencies cwrite
cacheHits
| compilerCacheHits cwrite <= 0 = "updated"
| otherwise = "cached "
Logger.message logger $ cacheHits ++ " " ++ show id'

-- Sanity check
unless (itemIdentifier item == id') $ addError (Just id') $
"The compiler yielded an Item with Identifier " ++
show (itemIdentifier item) ++ ", but we were expecting " ++
"an Item with Identifier " ++ show id' ++ " " ++
"(you probably want to call makeItem to solve this problem)"

-- Write if necessary
(mroute, _) <- liftIO $ runRoutes routes provider id'
case mroute of
Nothing -> return ()
Just route -> do
let path = destinationDirectory config </> route
liftIO $ makeDirectories path
liftIO $ write path item
Logger.debug logger $ "Routed to " ++ path

Logger.message logger $ "Saved _final for " <> show id'
liftIO $ save store item
liftIO . IORef.atomicModifyIORef' scheduler $
schedulerWrite id' facts

CompilerRequire reqs c -> do
liftIO . IORef.atomicModifyIORef' scheduler $
schedulerBlock id' reqs c


--------------------------------------------------------------------------------
Expand Down