From 72e55220b50f3e2851a6acaf9af54dd7f4f2ec64 Mon Sep 17 00:00:00 2001 From: Victor Azizi Date: Wed, 14 Feb 2024 13:13:13 +0100 Subject: [PATCH] using hdf5 with access layer (#712) * require minimal typing_extension version * split into mdsplusimashandle and hdf5imashandle --- pyproject.toml | 2 +- src/duqtools/__init__.py | 2 +- src/duqtools/ids/__handle.py | 303 ++++++++++++++++++++ src/duqtools/ids/__init__.py | 4 + src/duqtools/ids/_handle.py | 394 +------------------------- src/duqtools/ids/_hdf5handle.py | 106 +++++++ src/duqtools/ids/_mdsplushandle.py | 113 ++++++++ src/duqtools/ids/_tmp.py | 391 +++++++++++++++++++++++++ src/duqtools/merge.py | 6 +- src/duqtools/systems/jetto/_system.py | 1 - tests/ids/test_imas_handler.py | 15 +- 11 files changed, 944 insertions(+), 393 deletions(-) create mode 100644 src/duqtools/ids/__handle.py create mode 100644 src/duqtools/ids/_hdf5handle.py create mode 100644 src/duqtools/ids/_mdsplushandle.py create mode 100644 src/duqtools/ids/_tmp.py diff --git a/pyproject.toml b/pyproject.toml index edd593bc..1566388d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ dependencies = [ "scipy >= 1.09", "streamlit >= 1.18", "tqdm", - "typing-extensions", + "typing-extensions >= 4.7.1", "xarray", ] diff --git a/src/duqtools/__init__.py b/src/duqtools/__init__.py index 07ef696a..0c13451a 100644 --- a/src/duqtools/__init__.py +++ b/src/duqtools/__init__.py @@ -8,7 +8,7 @@ def fix_dependencies(): 'jetto_tools>=1.8.8', 'scipy>=1.09', 'jinja2>=3.0.0', - 'typing_extensions>=4.5.0', + 'typing_extensions>=4.7.1', ] import pkg_resources # noqa diff --git a/src/duqtools/ids/__handle.py b/src/duqtools/ids/__handle.py new file mode 100644 index 00000000..82c3774f --- /dev/null +++ b/src/duqtools/ids/__handle.py @@ -0,0 +1,303 @@ +from __future__ import annotations + +import logging +import os +import re +from abc import abstractmethod +from contextlib import contextmanager +from getpass import getuser +from pathlib import Path 
+from typing import TYPE_CHECKING, Sequence + +from imas2xarray import squash_placeholders +from pydantic import field_validator + +from ._copy import add_provenance_info +from ._mapping import IDSMapping +from ._schema import ImasBaseModel + +if TYPE_CHECKING: + import xarray as xr + from imas2xarray import Variable + +logger = logging.getLogger(__name__) + +IMAS_PATTERN = re.compile( + r'^((?P<user>[\\\/\w]*)\/)?(?P<db>\w+)\/(?P<shot>\d+)\/(?P<run>\d+)$') + + +def _patch_str_repr(obj: object): + """Reset str/repr methods to default.""" + import types + + def true_repr(x): + type_ = type(x) + module = type_.__module__ + qualname = type_.__qualname__ + return f'<{module}.{qualname} object at {hex(id(x))}>' + + obj.__str__ = types.MethodType(true_repr, obj) # type: ignore + obj.__repr__ = types.MethodType(true_repr, obj) # type: ignore + + +class _ImasHandle(ImasBaseModel): + + def __str__(self): + return f'{self.user}/{self.db}/{self.shot}/{self.run}' + + @classmethod + def from_string(cls, string: str) -> _ImasHandle: + """Return location from formatted string. + + Format: + + <user>/<db>/<shot>/<run> + <db>/<shot>/<run> + + Default to the current user if the user is not specified. + + For example: + + g2user/jet/91234/555 + + Parameters + ---------- + string : str + Input string containing imas db path + + Returns + ------- + ImasHandle + """ + match = IMAS_PATTERN.match(string) + + if match: + return cls(**match.groupdict()) + + raise ValueError(f'Could not match {string!r}') + + @field_validator('user') + def user_rel_path(cls, v, values): + # Override user if we have a relative location + if relative_location := values.data['relative_location']: + logger.info( + f'Updating imasdb location with relative location {relative_location}' + ) + return os.path.abspath(relative_location) + return v + + def validate(self): + """Validate the user. + + If the user is a path, then create it. + + Raises + ------ + ValueError: + If the user is invalid. 
+ """ + if self.is_local_db: + # jintrac v220922 + self.path().parent.mkdir(parents=True, exist_ok=True) + elif self.user == getuser() or self.user == 'public': + # jintrac v210921 + pass + else: + raise ValueError(f'Invalid user: {self.user}') + + def to_string(self) -> str: + """Generate string representation of Imas location.""" + return f'{self.user}/{self.db}/{self.shot}/{self.run}' + + @property + def is_local_db(self): + """Return True if the handle points to a local imas database.""" + return self.user.startswith('/') + + @abstractmethod + def path(self) -> Path: + pass + + def get_raw_data(self, ids: str = 'core_profiles', **kwargs): + """Get data from IDS entry. + + Parameters + ---------- + ids : str, optional + Name of profiles to open. + **kwargs + These keyword parameters are passed to `ImasHandle.open()`. + + Returns + ------- + data + """ + with self.open(**kwargs) as data_entry: + data = data_entry.get(ids) + + # reset string representation because output is extremely lengthy + _patch_str_repr(data) + + return data + + def get(self, ids: str = 'core_profiles') -> IDSMapping: + """Map the data to a dict-like structure. + + Parameters + ---------- + ids : str, optional + Name of profiles to open + + Returns + ------- + IDSMapping + """ + raw_data = self.get_raw_data(ids) + return IDSMapping(raw_data) + + def get_all_variables( + self, + extra_variables: Sequence[Variable] = [], + squash: bool = True, + ids: str = 'core_profiles', + **kwargs, + ) -> xr.Dataset: + """Get all variables that duqtools knows of from selected ids from the + dataset. + + This function looks up the data location from the + `duqtools.config.var_lookup` table + + Parameters + ---------- + extra_variables : Sequence[Variable] + Extra variables to load in addition to the ones known by duqtools. + squash : bool + Squash placeholder variables + + Returns + ------- + ds : xarray + The data in `xarray` format. 
+ **kwargs + These keyword arguments are passed to `IDSMapping.to_xarray()` + + Raises + ------ + ValueError + When variables are from multiple IDSs. + """ + from duqtools.config import var_lookup + + idsvar_lookup = var_lookup.filter_ids(ids) + variables = list( + set(list(extra_variables) + list(idsvar_lookup.keys()))) + return self.get_variables(variables, + squash, + empty_var_ok=True, + **kwargs) + + def get_variables( + self, + variables: Sequence[str | Variable], + squash: bool = True, + **kwargs, + ) -> xr.Dataset: + """Get variables from data set. + + This function looks up the data location from the + `duqtools.config.var_lookup` table, and returns + + Parameters + ---------- + variables : Sequence[Union[str, Variable]] + Variable names of the data to load. + squash : bool + Squash placeholder variables + + Returns + ------- + ds : xarray + The data in `xarray` format. + **kwargs + These keyword arguments are passed to `IDSMapping.to_xarray()` + + Raises + ------ + ValueError + When variables are from multiple IDSs. + """ + from duqtools.config import var_lookup + var_models = var_lookup.lookup(variables) + + idss = {var.ids for var in var_models} + + if len(idss) > 1: + raise ValueError( + f'All variables must belong to the same IDS, got {idss}') + + ids = var_models[0].ids + + data_map = self.get(ids) + + ds = data_map.to_xarray(variables=var_models, **kwargs) + + if squash: + ds = squash_placeholders(ds) + + return ds + + @abstractmethod + def entry(self, backend=None): + pass + + @contextmanager + def open(self, create: bool = False): + """Context manager to open database entry. + + Parameters + ---------- + create : bool, optional + Create empty database entry if it does not exist. 
+ + Yields + ------ + entry : `imas.DBEntry` + Opened IMAS database entry + """ + entry = self.entry() + opcode, _ = entry.open() + + if opcode == 0: + logger.debug('Data entry opened: %s', self) + elif create: + cpcode, _ = entry.create() + if cpcode == 0: + logger.debug('Data entry created: %s', self) + else: + raise OSError( + f'Cannot create data entry: {self}. ' + f'Create a new db first using `imasdb {self.db}`') + else: + raise OSError(f'Data entry does not exist: {self}') + + try: + yield entry + finally: + entry.close() + + def update_from(self, mapping: IDSMapping): + """Synchronize updated data back to IMAS db entry. + + Shortcut for 'put' command. + + Parameters + ---------- + mapping : IDSMapping + Points to an IDS mapping of the data that should be written + to this handle. + """ + add_provenance_info(handle=self) + + with self.open() as db_entry: + mapping._ids.put(db_entry=db_entry) diff --git a/src/duqtools/ids/__init__.py b/src/duqtools/ids/__init__.py index f7bcd0c0..66ac7fc3 100644 --- a/src/duqtools/ids/__init__.py +++ b/src/duqtools/ids/__init__.py @@ -3,14 +3,18 @@ import logging from ._handle import ImasHandle +from ._hdf5handle import HDF5ImasHandle from ._imas import imas_mocked from ._mapping import IDSMapping +from ._mdsplushandle import MdsplusImasHandle from ._merge import merge_data logger = logging.getLogger(__name__) __all__ = [ 'ImasHandle', + 'MdsplusImasHandle', + 'HDF5ImasHandle', 'merge_data', 'imas_mocked', 'IDSMapping', diff --git a/src/duqtools/ids/_handle.py b/src/duqtools/ids/_handle.py index 58a36e80..d2572b12 100644 --- a/src/duqtools/ids/_handle.py +++ b/src/duqtools/ids/_handle.py @@ -1,391 +1,15 @@ from __future__ import annotations -import logging import os -import re -from contextlib import contextmanager -from getpass import getuser -from pathlib import Path -from typing import TYPE_CHECKING, List, Sequence -from imas2xarray import squash_placeholders -from pydantic import field_validator +from ._hdf5handle import 
HDF5ImasHandle +from ._mdsplushandle import MdsplusImasHandle -from ..operations import add_to_op_queue -from ._copy import add_provenance_info, copy_ids_entry -from ._imas import imas, imasdef -from ._mapping import IDSMapping -from ._schema import ImasBaseModel +backend = os.environ.get('JINTRAC_IMAS_BACKEND', 'HDF5') -if TYPE_CHECKING: - import xarray as xr - from imas2xarray import Variable - -logger = logging.getLogger(__name__) - -_FILENAME = 'ids_{shot}{run:04d}{suffix}' -_IMASDB = ('{db}', '3', '0') -GLOBAL_PATH_TEMPLATE = str(Path.home().parent.joinpath('{user}', 'public', - 'imasdb', *_IMASDB, - _FILENAME)) -LOCAL_PATH_TEMPLATE = str(Path('{user}', *_IMASDB, _FILENAME)) -PUBLIC_PATH_TEMPLATE = str(Path('shared', 'imasdb', *_IMASDB, _FILENAME)) - -SUFFIXES = ( - '.datafile', - '.characteristics', - '.tree', -) - -IMAS_PATTERN = re.compile( - r'^((?P<user>[\\\/\w]*)\/)?(?P<db>\w+)\/(?P<shot>\d+)\/(?P<run>\d+)$') - - -def _patch_str_repr(obj: object): - """Reset str/repr methods to default.""" - import types - - def true_repr(x): - type_ = type(x) - module = type_.__module__ - qualname = type_.__qualname__ - return f'<{module}.{qualname} object at {hex(id(x))}>' - - obj.__str__ = types.MethodType(true_repr, obj) # type: ignore - obj.__repr__ = types.MethodType(true_repr, obj) # type: ignore - - -class ImasHandle(ImasBaseModel): - - def __str__(self): - return f'{self.user}/{self.db}/{self.shot}/{self.run}' - - @classmethod - def from_string(cls, string: str) -> ImasHandle: - """Return location from formatted string. - - Format: - - <user>/<db>/<shot>/<run> - <db>/<shot>/<run> - - Default to the current user if the user is not specified. 
- - For example: - - g2user/jet/91234/555 - - Parameters - ---------- - string : str - Input string containing imas db path - - Returns - ------- - ImasHandle - """ - match = IMAS_PATTERN.match(string) - - if match: - return cls(**match.groupdict()) - - raise ValueError(f'Could not match {string!r}') - - @field_validator('user') - def user_rel_path(cls, v, values): - # Override user if we have a relative location - if relative_location := values.data['relative_location']: - logger.info( - f'Updating imasdb location with relative location {relative_location}' - ) - return os.path.abspath(relative_location) - return v - - def validate(self): - """Validate the user. - - If the user is a path, then create it. - - Raises - ------ - ValueError: - If the user is invalid. - """ - if self.is_local_db: - # jintrac v220922 - self.path().parent.mkdir(parents=True, exist_ok=True) - elif self.user == getuser() or self.user == 'public': - # jintrac v210921 - pass - else: - raise ValueError(f'Invalid user: {self.user}') - - def to_string(self) -> str: - """Generate string representation of Imas location.""" - return f'{self.user}/{self.db}/{self.shot}/{self.run}' - - @property - def is_local_db(self): - """Return True if the handle points to a local imas database.""" - return self.user.startswith('/') - - def path(self, suffix=SUFFIXES[0]) -> Path: - """Return location as Path.""" - imas_home = os.environ.get('IMAS_HOME') - - if self.is_local_db: - template = LOCAL_PATH_TEMPLATE - elif imas_home and self.user == 'public': - template = imas_home + '/' + PUBLIC_PATH_TEMPLATE - else: - template = GLOBAL_PATH_TEMPLATE - - return Path( - template.format(user=self.user, - db=self.db, - shot=self.shot, - run=self.run, - suffix=suffix)) - - def paths(self) -> List[Path]: - """Return location of all files as a list of Paths.""" - return [self.path(suffix) for suffix in SUFFIXES] - - def imasdb_path(self) -> Path: - """Return path to imasdb.""" - return self.path().parents[2] - - def 
exists(self) -> bool: - """Return true if the directory exists. - - Returns - ------- - bool - """ - path = self.path() - return all(path.with_suffix(sf).exists() for sf in SUFFIXES) - - def copy_data_to(self, destination: ImasHandle): - """Copy ids entry to given destination. - - Parameters - ---------- - destination : ImasHandle - Copy data to a new location. - """ - logger.debug('Copy %s to %s', self, destination) - - try: - copy_ids_entry(self, destination) - except Exception as err: - raise OSError(f'Failed to copy {self}') from err - - @add_to_op_queue('Removing ids', '{self}') - def delete(self): - """Remove data from entry.""" - # ERASE_PULSE operation is yet supported by IMAS as of June 2022 - path = self.path() - for suffix in SUFFIXES: - to_delete = path.with_suffix(suffix) - logger.debug('Removing %s', to_delete) - try: - to_delete.unlink() - except FileNotFoundError: - logger.warning('%s does not exist', to_delete) - - def get_raw_data(self, ids: str = 'core_profiles', **kwargs): - """Get data from IDS entry. - - Parameters - ---------- - ids : str, optional - Name of profiles to open. - **kwargs - These keyword parameters are passed to `ImasHandle.open()`. - - Returns - ------- - data - """ - with self.open(**kwargs) as data_entry: - data = data_entry.get(ids) - - # reset string representation because output is extremely lengthy - _patch_str_repr(data) - - return data - - def get(self, ids: str = 'core_profiles') -> IDSMapping: - """Map the data to a dict-like structure. - - Parameters - ---------- - ids : str, optional - Name of profiles to open - - Returns - ------- - IDSMapping - """ - raw_data = self.get_raw_data(ids) - return IDSMapping(raw_data) - - def get_all_variables( - self, - extra_variables: Sequence[Variable] = [], - squash: bool = True, - ids: str = 'core_profiles', - **kwargs, - ) -> xr.Dataset: - """Get all variables that duqtools knows of from selected ids from the - dataset. 
- - This function looks up the data location from the - `duqtools.config.var_lookup` table - - Parameters - ---------- - extra_variables : Sequence[Variable] - Extra variables to load in addition to the ones known by duqtools. - squash : bool - Squash placeholder variables - - Returns - ------- - ds : xarray - The data in `xarray` format. - **kwargs - These keyword arguments are passed to `IDSMapping.to_xarray()` - - Raises - ------ - ValueError - When variables are from multiple IDSs. - """ - from duqtools.config import var_lookup - - idsvar_lookup = var_lookup.filter_ids(ids) - variables = list( - set(list(extra_variables) + list(idsvar_lookup.keys()))) - return self.get_variables(variables, - squash, - empty_var_ok=True, - **kwargs) - - def get_variables( - self, - variables: Sequence[str | Variable], - squash: bool = True, - **kwargs, - ) -> xr.Dataset: - """Get variables from data set. - - This function looks up the data location from the - `duqtools.config.var_lookup` table, and returns - - Parameters - ---------- - variables : Sequence[Union[str, Variable]] - Variable names of the data to load. - squash : bool - Squash placeholder variables - - Returns - ------- - ds : xarray - The data in `xarray` format. - **kwargs - These keyword arguments are passed to `IDSMapping.to_xarray()` - - Raises - ------ - ValueError - When variables are from multiple IDSs. 
- """ - from duqtools.config import var_lookup - var_models = var_lookup.lookup(variables) - - idss = {var.ids for var in var_models} - - if len(idss) > 1: - raise ValueError( - f'All variables must belong to the same IDS, got {idss}') - - ids = var_models[0].ids - - data_map = self.get(ids) - - ds = data_map.to_xarray(variables=var_models, **kwargs) - - if squash: - ds = squash_placeholders(ds) - - return ds - - def entry(self, backend=imasdef.MDSPLUS_BACKEND): - """Return reference to `imas.DBEntry.` - - Parameters - ---------- - backend : optional - Which IMAS backend to use - - Returns - ------ - entry : `imas.DBEntry` - IMAS database entry - """ - return imas.DBEntry(backend, self.db, self.shot, self.run, self.user) - - @contextmanager - def open(self, backend=imasdef.MDSPLUS_BACKEND, create: bool = False): - """Context manager to open database entry. - - Parameters - ---------- - backend : optional - Which IMAS backend to use - create : bool, optional - Create empty database entry if it does not exist. - - Yields - ------ - entry : `imas.DBEntry` - Opened IMAS database entry - """ - entry = self.entry(backend=backend) - opcode, _ = entry.open() - - if opcode == 0: - logger.debug('Data entry opened: %s', self) - elif create: - cpcode, _ = entry.create() - if cpcode == 0: - logger.debug('Data entry created: %s', self) - else: - raise OSError( - f'Cannot create data entry: {self}. ' - f'Create a new db first using `imasdb {self.db}`') - else: - raise OSError(f'Data entry does not exist: {self}') - - try: - yield entry - finally: - entry.close() - - def update_from(self, mapping: IDSMapping): - """Synchronize updated data back to IMAS db entry. - - Shortcut for 'put' command. - - Parameters - ---------- - mapping : IDSMapping - Points to an IDS mapping of the data that should be written - to this handle. 
- """ - add_provenance_info(handle=self) - - with self.open() as db_entry: - mapping._ids.put(db_entry=db_entry) +if backend == 'MDSPLUS': + ImasHandle = MdsplusImasHandle # type: ignore +elif backend == 'HDF5': + ImasHandle = HDF5ImasHandle # type: ignore +else: + ImasHandle = HDF5ImasHandle # type: ignore diff --git a/src/duqtools/ids/_hdf5handle.py b/src/duqtools/ids/_hdf5handle.py new file mode 100644 index 00000000..0258cfef --- /dev/null +++ b/src/duqtools/ids/_hdf5handle.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import logging +import os +import shutil +from pathlib import Path +from typing import TYPE_CHECKING, List + +from ..operations import add_to_op_queue +from .__handle import _ImasHandle +from ._imas import imas, imasdef + +if TYPE_CHECKING: + + pass + +logger = logging.getLogger(__name__) + +_IMASDB = ('{db}', '3', '{shot}', '{run}') +GLOBAL_PATH_TEMPLATE = str(Path.home().parent.joinpath('{user}', 'public', + 'imasdb', *_IMASDB)) +LOCAL_PATH_TEMPLATE = str(Path('{user}', *_IMASDB)) +PUBLIC_PATH_TEMPLATE = str(Path('shared', 'imasdb', *_IMASDB)) + + +class HDF5ImasHandle(_ImasHandle): + + def path(self) -> Path: + """Return location as Path.""" + imas_home = os.environ.get('IMAS_HOME') + + if self.is_local_db: + template = LOCAL_PATH_TEMPLATE + elif imas_home and self.user == 'public': + template = imas_home + '/' + PUBLIC_PATH_TEMPLATE + else: + template = GLOBAL_PATH_TEMPLATE + + return Path( + template.format(user=self.user, + db=self.db, + shot=self.shot, + run=self.run)) + + def paths(self) -> List[Path]: + """Return location of all files as a list of Paths.""" + return [path for path in self.path().glob('*.h5')] + + def imasdb_path(self) -> Path: + """Return path to imasdb.""" + return self.path().parents[3] + + def exists(self) -> bool: + """Return true if the directory exists. 
+ + Returns + ------- + bool + """ + return self.path().exists() + + @add_to_op_queue('Copy imas data', + 'from {self} to {destination}', + quiet=True) + def copy_data_to(self, destination: _ImasHandle): + """Copy ids entry to given destination. + + Parameters + ---------- + destination : ImasHandle + Copy data to a new location. + """ + logger.debug('Copy %s to %s', self, destination) + + destination.path().mkdir(parents=True, exist_ok=True) + + for src_file in self.paths(): + dst_file = destination.path() / src_file.name + shutil.copyfile(src_file, dst_file) + + @add_to_op_queue('Removing ids', '{self}') + def delete(self): + """Remove data from entry.""" + # ERASE_PULSE operation is yet supported by IMAS as of June 2022 + for path in self.paths(): + logger.debug('Removing %s', path) + try: + path.unlink() + except FileNotFoundError: + logger.warning('%s does not exist', path) + + def entry(self): + """Return reference to `imas.DBEntry.` + + Parameters + ---------- + backend : optional + Which IMAS backend to use + + Returns + ------ + entry : `imas.DBEntry` + IMAS database entry + """ + return imas.DBEntry(imasdef.HDF5_BACKEND, self.db, self.shot, self.run, + self.user) diff --git a/src/duqtools/ids/_mdsplushandle.py b/src/duqtools/ids/_mdsplushandle.py new file mode 100644 index 00000000..c455dd19 --- /dev/null +++ b/src/duqtools/ids/_mdsplushandle.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING, List + +from ..operations import add_to_op_queue +from .__handle import _ImasHandle +from ._copy import copy_ids_entry +from ._imas import imas, imasdef + +if TYPE_CHECKING: + + pass + +logger = logging.getLogger(__name__) + +_FILENAME = 'ids_{shot}{run:04d}{suffix}' +_IMASDB = ('{db}', '3', '0') +GLOBAL_PATH_TEMPLATE = str(Path.home().parent.joinpath('{user}', 'public', + 'imasdb', *_IMASDB, + _FILENAME)) +LOCAL_PATH_TEMPLATE = str(Path('{user}', *_IMASDB, _FILENAME)) 
+PUBLIC_PATH_TEMPLATE = str(Path('shared', 'imasdb', *_IMASDB, _FILENAME)) + +SUFFIXES = ( + '.datafile', + '.characteristics', + '.tree', +) + + +class MdsplusImasHandle(_ImasHandle): + + def path(self, suffix=SUFFIXES[0]) -> Path: + """Return location as Path.""" + imas_home = os.environ.get('IMAS_HOME') + + if self.is_local_db: + template = LOCAL_PATH_TEMPLATE + elif imas_home and self.user == 'public': + template = imas_home + '/' + PUBLIC_PATH_TEMPLATE + else: + template = GLOBAL_PATH_TEMPLATE + + return Path( + template.format(user=self.user, + db=self.db, + shot=self.shot, + run=self.run, + suffix=suffix)) + + def paths(self) -> List[Path]: + """Return location of all files as a list of Paths.""" + return [self.path(suffix) for suffix in SUFFIXES] + + def imasdb_path(self) -> Path: + """Return path to imasdb.""" + return self.path().parents[2] + + def exists(self) -> bool: + """Return true if the directory exists. + + Returns + ------- + bool + """ + path = self.path() + return all(path.with_suffix(sf).exists() for sf in SUFFIXES) + + def copy_data_to(self, destination: _ImasHandle): + """Copy ids entry to given destination. + + Parameters + ---------- + destination : ImasHandle + Copy data to a new location. 
+ """ + logger.debug('Copy %s to %s', self, destination) + + try: + copy_ids_entry(self, destination) + except Exception as err: + raise OSError(f'Failed to copy {self}') from err + + @add_to_op_queue('Removing ids', '{self}') + def delete(self): + """Remove data from entry.""" + # ERASE_PULSE operation is yet supported by IMAS as of June 2022 + path = self.path() + for suffix in SUFFIXES: + to_delete = path.with_suffix(suffix) + logger.debug('Removing %s', to_delete) + try: + to_delete.unlink() + except FileNotFoundError: + logger.warning('%s does not exist', to_delete) + + def entry(self, backend=imasdef.MDSPLUS_BACKEND): + """Return reference to `imas.DBEntry.` + + Parameters + ---------- + backend : optional + Which IMAS backend to use + + Returns + ------ + entry : `imas.DBEntry` + IMAS database entry + """ + return imas.DBEntry(backend, self.db, self.shot, self.run, self.user) diff --git a/src/duqtools/ids/_tmp.py b/src/duqtools/ids/_tmp.py new file mode 100644 index 00000000..58a36e80 --- /dev/null +++ b/src/duqtools/ids/_tmp.py @@ -0,0 +1,391 @@ +from __future__ import annotations + +import logging +import os +import re +from contextlib import contextmanager +from getpass import getuser +from pathlib import Path +from typing import TYPE_CHECKING, List, Sequence + +from imas2xarray import squash_placeholders +from pydantic import field_validator + +from ..operations import add_to_op_queue +from ._copy import add_provenance_info, copy_ids_entry +from ._imas import imas, imasdef +from ._mapping import IDSMapping +from ._schema import ImasBaseModel + +if TYPE_CHECKING: + import xarray as xr + from imas2xarray import Variable + +logger = logging.getLogger(__name__) + +_FILENAME = 'ids_{shot}{run:04d}{suffix}' +_IMASDB = ('{db}', '3', '0') +GLOBAL_PATH_TEMPLATE = str(Path.home().parent.joinpath('{user}', 'public', + 'imasdb', *_IMASDB, + _FILENAME)) +LOCAL_PATH_TEMPLATE = str(Path('{user}', *_IMASDB, _FILENAME)) +PUBLIC_PATH_TEMPLATE = str(Path('shared', 
'imasdb', *_IMASDB, _FILENAME)) + +SUFFIXES = ( + '.datafile', + '.characteristics', + '.tree', +) + +IMAS_PATTERN = re.compile( + r'^((?P<user>[\\\/\w]*)\/)?(?P<db>\w+)\/(?P<shot>\d+)\/(?P<run>\d+)$') + + +def _patch_str_repr(obj: object): + """Reset str/repr methods to default.""" + import types + + def true_repr(x): + type_ = type(x) + module = type_.__module__ + qualname = type_.__qualname__ + return f'<{module}.{qualname} object at {hex(id(x))}>' + + obj.__str__ = types.MethodType(true_repr, obj) # type: ignore + obj.__repr__ = types.MethodType(true_repr, obj) # type: ignore + + +class ImasHandle(ImasBaseModel): + + def __str__(self): + return f'{self.user}/{self.db}/{self.shot}/{self.run}' + + @classmethod + def from_string(cls, string: str) -> ImasHandle: + """Return location from formatted string. + + Format: + + <user>/<db>/<shot>/<run> + <db>/<shot>/<run> + + Default to the current user if the user is not specified. + + For example: + + g2user/jet/91234/555 + + Parameters + ---------- + string : str + Input string containing imas db path + + Returns + ------- + ImasHandle + """ + match = IMAS_PATTERN.match(string) + + if match: + return cls(**match.groupdict()) + + raise ValueError(f'Could not match {string!r}') + + @field_validator('user') + def user_rel_path(cls, v, values): + # Override user if we have a relative location + if relative_location := values.data['relative_location']: + logger.info( + f'Updating imasdb location with relative location {relative_location}' + ) + return os.path.abspath(relative_location) + return v + + def validate(self): + """Validate the user. + + If the user is a path, then create it. + + Raises + ------ + ValueError: + If the user is invalid. 
+ """ + if self.is_local_db: + # jintrac v220922 + self.path().parent.mkdir(parents=True, exist_ok=True) + elif self.user == getuser() or self.user == 'public': + # jintrac v210921 + pass + else: + raise ValueError(f'Invalid user: {self.user}') + + def to_string(self) -> str: + """Generate string representation of Imas location.""" + return f'{self.user}/{self.db}/{self.shot}/{self.run}' + + @property + def is_local_db(self): + """Return True if the handle points to a local imas database.""" + return self.user.startswith('/') + + def path(self, suffix=SUFFIXES[0]) -> Path: + """Return location as Path.""" + imas_home = os.environ.get('IMAS_HOME') + + if self.is_local_db: + template = LOCAL_PATH_TEMPLATE + elif imas_home and self.user == 'public': + template = imas_home + '/' + PUBLIC_PATH_TEMPLATE + else: + template = GLOBAL_PATH_TEMPLATE + + return Path( + template.format(user=self.user, + db=self.db, + shot=self.shot, + run=self.run, + suffix=suffix)) + + def paths(self) -> List[Path]: + """Return location of all files as a list of Paths.""" + return [self.path(suffix) for suffix in SUFFIXES] + + def imasdb_path(self) -> Path: + """Return path to imasdb.""" + return self.path().parents[2] + + def exists(self) -> bool: + """Return true if the directory exists. + + Returns + ------- + bool + """ + path = self.path() + return all(path.with_suffix(sf).exists() for sf in SUFFIXES) + + def copy_data_to(self, destination: ImasHandle): + """Copy ids entry to given destination. + + Parameters + ---------- + destination : ImasHandle + Copy data to a new location. 
+ """ + logger.debug('Copy %s to %s', self, destination) + + try: + copy_ids_entry(self, destination) + except Exception as err: + raise OSError(f'Failed to copy {self}') from err + + @add_to_op_queue('Removing ids', '{self}') + def delete(self): + """Remove data from entry.""" + # ERASE_PULSE operation is yet supported by IMAS as of June 2022 + path = self.path() + for suffix in SUFFIXES: + to_delete = path.with_suffix(suffix) + logger.debug('Removing %s', to_delete) + try: + to_delete.unlink() + except FileNotFoundError: + logger.warning('%s does not exist', to_delete) + + def get_raw_data(self, ids: str = 'core_profiles', **kwargs): + """Get data from IDS entry. + + Parameters + ---------- + ids : str, optional + Name of profiles to open. + **kwargs + These keyword parameters are passed to `ImasHandle.open()`. + + Returns + ------- + data + """ + with self.open(**kwargs) as data_entry: + data = data_entry.get(ids) + + # reset string representation because output is extremely lengthy + _patch_str_repr(data) + + return data + + def get(self, ids: str = 'core_profiles') -> IDSMapping: + """Map the data to a dict-like structure. + + Parameters + ---------- + ids : str, optional + Name of profiles to open + + Returns + ------- + IDSMapping + """ + raw_data = self.get_raw_data(ids) + return IDSMapping(raw_data) + + def get_all_variables( + self, + extra_variables: Sequence[Variable] = [], + squash: bool = True, + ids: str = 'core_profiles', + **kwargs, + ) -> xr.Dataset: + """Get all variables that duqtools knows of from selected ids from the + dataset. + + This function looks up the data location from the + `duqtools.config.var_lookup` table + + Parameters + ---------- + extra_variables : Sequence[Variable] + Extra variables to load in addition to the ones known by duqtools. + squash : bool + Squash placeholder variables + + Returns + ------- + ds : xarray + The data in `xarray` format. 
+ **kwargs + These keyword arguments are passed to `IDSMapping.to_xarray()` + + Raises + ------ + ValueError + When variables are from multiple IDSs. + """ + from duqtools.config import var_lookup + + idsvar_lookup = var_lookup.filter_ids(ids) + variables = list( + set(list(extra_variables) + list(idsvar_lookup.keys()))) + return self.get_variables(variables, + squash, + empty_var_ok=True, + **kwargs) + + def get_variables( + self, + variables: Sequence[str | Variable], + squash: bool = True, + **kwargs, + ) -> xr.Dataset: + """Get variables from data set. + + This function looks up the data location from the + `duqtools.config.var_lookup` table, and returns + + Parameters + ---------- + variables : Sequence[Union[str, Variable]] + Variable names of the data to load. + squash : bool + Squash placeholder variables + + Returns + ------- + ds : xarray + The data in `xarray` format. + **kwargs + These keyword arguments are passed to `IDSMapping.to_xarray()` + + Raises + ------ + ValueError + When variables are from multiple IDSs. + """ + from duqtools.config import var_lookup + var_models = var_lookup.lookup(variables) + + idss = {var.ids for var in var_models} + + if len(idss) > 1: + raise ValueError( + f'All variables must belong to the same IDS, got {idss}') + + ids = var_models[0].ids + + data_map = self.get(ids) + + ds = data_map.to_xarray(variables=var_models, **kwargs) + + if squash: + ds = squash_placeholders(ds) + + return ds + + def entry(self, backend=imasdef.MDSPLUS_BACKEND): + """Return reference to `imas.DBEntry.` + + Parameters + ---------- + backend : optional + Which IMAS backend to use + + Returns + ------ + entry : `imas.DBEntry` + IMAS database entry + """ + return imas.DBEntry(backend, self.db, self.shot, self.run, self.user) + + @contextmanager + def open(self, backend=imasdef.MDSPLUS_BACKEND, create: bool = False): + """Context manager to open database entry. 
+ + Parameters + ---------- + backend : optional + Which IMAS backend to use + create : bool, optional + Create empty database entry if it does not exist. + + Yields + ------ + entry : `imas.DBEntry` + Opened IMAS database entry + """ + entry = self.entry(backend=backend) + opcode, _ = entry.open() + + if opcode == 0: + logger.debug('Data entry opened: %s', self) + elif create: + cpcode, _ = entry.create() + if cpcode == 0: + logger.debug('Data entry created: %s', self) + else: + raise OSError( + f'Cannot create data entry: {self}. ' + f'Create a new db first using `imasdb {self.db}`') + else: + raise OSError(f'Data entry does not exist: {self}') + + try: + yield entry + finally: + entry.close() + + def update_from(self, mapping: IDSMapping): + """Synchronize updated data back to IMAS db entry. + + Shortcut for 'put' command. + + Parameters + ---------- + mapping : IDSMapping + Points to an IDS mapping of the data that should be written + to this handle. + """ + add_provenance_info(handle=self) + + with self.open() as db_entry: + mapping._ids.put(db_entry=db_entry) diff --git a/src/duqtools/merge.py b/src/duqtools/merge.py index 0b5fdef4..9f9b970d 100644 --- a/src/duqtools/merge.py +++ b/src/duqtools/merge.py @@ -84,8 +84,8 @@ def merge(*, target: str, template: str, handles: list[str], variables = _resolve_variables(var_names) _merge( - handles=handles, - template=template, - target=target, + handles=handles, # type: ignore + template=template, # type: ignore + target=target, # type: ignore variables=variables, ) diff --git a/src/duqtools/systems/jetto/_system.py b/src/duqtools/systems/jetto/_system.py index e34fd6d1..ff3af306 100644 --- a/src/duqtools/systems/jetto/_system.py +++ b/src/duqtools/systems/jetto/_system.py @@ -169,7 +169,6 @@ def submit_docker(self, job: Job): } os.environ['RUNS_HOME'] = os.getcwd() - os.environ['JINTRAC_IMAS_BACKEND'] = 'MDSPLUS' container = jetto_manager.submit_job_to_docker( jetto_config, job.path, diff --git 
a/tests/ids/test_imas_handler.py b/tests/ids/test_imas_handler.py index a253291a..34412060 100644 --- a/tests/ids/test_imas_handler.py +++ b/tests/ids/test_imas_handler.py @@ -4,7 +4,7 @@ import pytest -from duqtools.ids import ImasHandle +from duqtools.ids import HDF5ImasHandle, ImasHandle, MdsplusImasHandle TEST_STRINGS = ( 'gu3ido/m0o/9234/123', @@ -55,7 +55,18 @@ def test_from_string_localdb(string, expected): def test_jintrac_v220922(): """Test local db implementation.""" user = '/some/path/imasdb' - h = ImasHandle(user=user, db='moo', shot=1, run=1) + + h = MdsplusImasHandle(user=user, db='moo', shot=1, run=1) assert h.is_local_db assert str(h.path().parent) == '/some/path/imasdb/moo/3/0' + + +def test_imas_hdf5(): + """Test local db implementation.""" + user = '/some/path/imasdb' + + h = HDF5ImasHandle(user=user, db='moo', shot=1, run=1) + + assert h.is_local_db + assert str(h.path().parent) == '/some/path/imasdb/moo/3/1'