Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(par): add mover functionality to parallel capabilities #1427

Merged
merged 15 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions autotest/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@

import flopy
import numpy as np
from common_regression import (COMPARE_PROGRAMS, adjust_htol,
get_mf6_comparison, get_mf6_files,
get_namefiles, get_rclose, get_regression_files,
setup_mf6, setup_mf6_comparison)
from common_regression import (
COMPARE_PROGRAMS,
adjust_htol,
get_mf6_comparison,
get_mf6_files,
get_namefiles,
get_rclose,
get_regression_files,
setup_mf6,
setup_mf6_comparison,
)
from flopy.mbase import BaseModel
from flopy.mf6 import MFSimulation
from flopy.utils.compare import compare_heads
Expand Down Expand Up @@ -243,7 +250,7 @@ def __init__(
self.build = build
self.check = check
self.parallel = parallel
self.ncpus = ncpus
self.ncpus = [ncpus] if isinstance(ncpus, int) else ncpus
self.api_func = api_func
self.compare = compare
self.outp = None
Expand Down Expand Up @@ -526,6 +533,7 @@ def run_sim_or_model(
workspace: Union[str, os.PathLike],
target: Union[str, os.PathLike],
xfail: bool = False,
ncpus: int = 1,
) -> Tuple[bool, List[str]]:
"""
Run a simulation or model with FloPy.
Expand All @@ -536,6 +544,8 @@ def run_sim_or_model(
The target executable to use
xfail : bool
Whether to expect failure
ncpus : int
The number of CPUs for a parallel run
"""

# make sure workspace exists
Expand Down Expand Up @@ -570,14 +580,12 @@ def run_sim_or_model(
# via MF6 executable
elif "mf6" in target.name:
# parallel test if configured
if self.parallel:
if self.parallel and ncpus > 1:
print(
f"Parallel test {self.name} on {self.ncpus} processes"
)
try:
success, buff = run_parallel(
workspace, target, self.ncpus
)
success, buff = run_parallel(workspace, target, ncpus)
except Exception:
warn(
"MODFLOW 6 parallel test",
Expand Down Expand Up @@ -687,19 +695,31 @@ def run(self):
self.sims = sims
nsims = len(sims)
self.buffs = list(repeat(None, nsims))

assert len(self.xfail) in [
1,
nsims,
], f"Invalid xfail: expected a single boolean or one for each model"
if len(self.xfail) == 1 and nsims:
self.xfail = list(repeat(self.xfail[0], nsims))

assert len(self.ncpus) in [
1,
nsims,
], f"Invalid ncpus: expected a single integer or one for each model"
if len(self.ncpus) == 1 and nsims:
self.ncpus = list(repeat(self.ncpus[0], nsims))

write_input(*sims, overwrite=self.overwrite, verbose=self.verbose)
else:
self.sims = [MFSimulation.load(sim_ws=self.workspace)]
self.buffs = [None]
assert (
len(self.xfail) == 1
), f"Invalid xfail: expected a single boolean or one for each model"
), f"Invalid xfail: expected a single boolean"
assert (
len(self.ncpus) == 1
), f"Invalid ncpus: expected a single integer"

# run models/simulations
for i, sim_or_model in enumerate(self.sims):
Expand All @@ -716,7 +736,10 @@ def run(self):
else tgts.get(exe_path.stem, tgts["mf6"])
)
xfail = self.xfail[i]
success, buff = self.run_sim_or_model(workspace, target, xfail)
ncpus = self.ncpus[i]
success, buff = self.run_sim_or_model(
workspace, target, xfail, ncpus
)
self.buffs[i] = buff # store model output for assertions later
assert success, (
f"{'Simulation' if 'mf6' in str(target) else 'Model'} "
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Based on sft01 gwf model, but split into two gwf models test gwf-gwf and
mvr. The single base model is split using the model splitter into two models.
The single model is run as the regression model
mvr. The single model is run as the regression model

The final split model look like:

Expand All @@ -18,9 +17,7 @@

from framework import TestFramework

# from flopy.mf6.utils import Mf6Splitter

cases = ["sfr01gwfgwf"]
cases = ["gwf_exgmvr"]

# properties for single model combination
lx = 14.0
Expand Down
45 changes: 45 additions & 0 deletions autotest/test_par_gwf_exgmvr01.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
This tests reuses the simulation data in test_gwf_exgmvr01.py
and runs it in parallel on two cpus with

cpu 1: 'left'
cpu 2: 'right'

so we can compare the parallel coupling of 'left' + 'right'
with a serial 'single'
"""

import pytest

from framework import TestFramework

cases = ["par_exgmvr01"]


def build_models(idx, test):
from test_gwf_exgmvr01 import build_models as build

sim, sim_ref = build(idx, test)
return sim, sim_ref


def check_output(idx, test):
from test_gwf_exgmvr01 import check_output as check

check(idx, test)


@pytest.mark.parallel
@pytest.mark.parametrize("idx, name", enumerate(cases))
def test_mf6model(idx, name, function_tmpdir, targets):
test = TestFramework(
name=name,
workspace=function_tmpdir,
targets=targets,
build=lambda t: build_models(idx, t),
check=lambda t: check_output(idx, t),
parallel=True,
ncpus=(2, 1),
compare=None,
)
test.run()
1 change: 1 addition & 0 deletions make/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ $(OBJDIR)/GridConnection.o \
$(OBJDIR)/DistributedVariable.o \
$(OBJDIR)/gwt1.o \
$(OBJDIR)/gwf3.o \
$(OBJDIR)/GwfExchangeMover.o \
$(OBJDIR)/SerialRouter.o \
$(OBJDIR)/Timer.o \
$(OBJDIR)/LinearSolverFactory.o \
Expand Down
7 changes: 4 additions & 3 deletions msvs/mf6core.vfproj
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,16 @@
<File RelativePath="..\src\Distributed\VirtualModel.f90"/>
<File RelativePath="..\src\Distributed\VirtualSolution.f90"/></Filter>
<Filter Name="Exchange">
<File RelativePath="..\src\Exchange\gwfgwfidm.f90"/>
<File RelativePath="..\src\Exchange\gwfgwtidm.f90"/>
<File RelativePath="..\src\Exchange\gwtgwtidm.f90"/>
<File RelativePath="..\src\Exchange\BaseExchange.f90"/>
<File RelativePath="..\src\Exchange\DisConnExchange.f90"/>
<File RelativePath="..\src\Exchange\GhostNode.f90"/>
<File RelativePath="..\src\Exchange\GwfExchangeMover.f90"/>
<File RelativePath="..\src\Exchange\GwfGwfExchange.f90"/>
<File RelativePath="..\src\Exchange\gwfgwfidm.f90"/>
<File RelativePath="..\src\Exchange\GwfGwtExchange.f90"/>
<File RelativePath="..\src\Exchange\gwfgwtidm.f90"/>
<File RelativePath="..\src\Exchange\GwtGwtExchange.f90"/>
<File RelativePath="..\src\Exchange\gwtgwtidm.f90"/>
<File RelativePath="..\src\Exchange\NumericalExchange.f90"/></Filter>
<Filter Name="Model">
<Filter Name="Connection">
Expand Down
28 changes: 27 additions & 1 deletion src/Distributed/Mapper.f90
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ subroutine add_exchange_vars(this)
integer(I4B) :: iconn
class(SpatialModelConnectionType), pointer :: conn
class(VirtualExchangeType), pointer :: virt_exg
character(len=LENMEMPATH) :: virt_mem_path
character(len=LENMEMPATH) :: virt_mem_path, local_mem_path

do iconn = 1, baseconnectionlist%Count()
conn => get_smc_from_list(baseconnectionlist, iconn)
Expand All @@ -60,11 +60,37 @@ subroutine add_exchange_vars(this)
virt_mem_path = virt_exg%get_vrt_mem_path('NODEM1', '')
call this%map_data_full(0, 'NODEM1', conn%prim_exchange%memoryPath, &
'NODEM1', virt_mem_path, (/STG_BFR_CON_DF/))

! these are only present when there is a mover:
if (virt_exg%has_mover()) then
local_mem_path = create_mem_path(virt_exg%name, 'MVR')
virt_mem_path = virt_exg%get_vrt_mem_path('QPACTUAL_M1', 'MVR')
call this%map_data_full(conn%owner%idsoln, 'QPACTUAL_M1', &
local_mem_path, 'QPACTUAL_M1', &
virt_mem_path, (/STG_BFR_EXG_FC/))
virt_mem_path = virt_exg%get_vrt_mem_path('ID_MAPPED_M1', 'MVR')
call this%map_data_full(conn%owner%idsoln, 'ID_MAPPED_M1', &
local_mem_path, 'ID_MAPPED_M1', &
virt_mem_path, (/STG_AFT_CON_RP/))
end if
end if
if (.not. virt_exg%v_model2%is_local) then
virt_mem_path = virt_exg%get_vrt_mem_path('NODEM2', '')
call this%map_data_full(0, 'NODEM2', conn%prim_exchange%memoryPath, &
'NODEM2', virt_mem_path, (/STG_BFR_CON_DF/))

! these are only present when there is a mover:
if (virt_exg%has_mover()) then
local_mem_path = create_mem_path(virt_exg%name, 'MVR')
virt_mem_path = virt_exg%get_vrt_mem_path('QPACTUAL_M2', 'MVR')
call this%map_data_full(conn%owner%idsoln, 'QPACTUAL_M2', &
local_mem_path, 'QPACTUAL_M2', &
virt_mem_path, (/STG_BFR_EXG_FC/))
virt_mem_path = virt_exg%get_vrt_mem_path('ID_MAPPED_M2', 'MVR')
call this%map_data_full(conn%owner%idsoln, 'ID_MAPPED_M2', &
local_mem_path, 'ID_MAPPED_M2', &
virt_mem_path, (/STG_AFT_CON_RP/))
end if
end if
end do

Expand Down
11 changes: 11 additions & 0 deletions src/Distributed/VirtualExchange.f90
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ module VirtualExchangeModule
procedure :: prepare_stage => vx_prepare_stage
procedure :: get_send_items => vx_get_send_items
procedure :: get_recv_items => vx_get_recv_items
procedure :: has_mover => vx_has_mover
procedure :: destroy => vx_destroy
! private
procedure, private :: init_virtual_data
Expand Down Expand Up @@ -241,6 +242,16 @@ subroutine vx_get_send_items(this, stage, rank, virtual_items)

end subroutine vx_get_send_items

!> @brief Checks if there is an active mover in the exchange
!<
function vx_has_mover(this) result(has_mover)
class(VirtualExchangeType) :: this
logical(LGP) :: has_mover

has_mover = .false.

end function vx_has_mover

subroutine vx_destroy(this)
class(VirtualExchangeType) :: this

Expand Down
Loading