diff --git a/pipe-cli/mount/pipefuse/audit.py b/pipe-cli/mount/pipefuse/audit.py
index 6d833d4ea2..1723c868e1 100644
--- a/pipe-cli/mount/pipefuse/audit.py
+++ b/pipe-cli/mount/pipefuse/audit.py
@@ -14,7 +14,7 @@
 
 from pipefuse.fsclient import FileSystemClientDecorator
 from pipefuse.storage import StorageLowLevelFileSystemClient
-from src.common.audit import DataAccessEntry, DataAccessType
+from src.common.audit import DataAccessEvent, DataAccessType
 
 
 class AuditFileSystemClient(FileSystemClientDecorator, StorageLowLevelFileSystemClient):
@@ -26,23 +26,23 @@ def __init__(self, inner, container):
         self._fhs = set()
 
     def upload(self, buf, path):
-        self._container.put(DataAccessEntry(path, DataAccessType.WRITE))
+        self._container.put(DataAccessEvent(path, DataAccessType.WRITE))
         self._inner.upload(buf, path)
 
     def delete(self, path):
-        self._container.put(DataAccessEntry(path, DataAccessType.DELETE))
+        self._container.put(DataAccessEvent(path, DataAccessType.DELETE))
         self._inner.delete(path)
 
     def mv(self, old_path, path):
-        self._container.put_all([DataAccessEntry(old_path, DataAccessType.READ),
-                                 DataAccessEntry(old_path, DataAccessType.DELETE),
-                                 DataAccessEntry(path, DataAccessType.WRITE)])
+        self._container.put_all([DataAccessEvent(old_path, DataAccessType.READ),
+                                 DataAccessEvent(old_path, DataAccessType.DELETE),
+                                 DataAccessEvent(path, DataAccessType.WRITE)])
         self._inner.mv(old_path, path)
 
     def download_range(self, fh, buf, path, offset=0, length=0):
         if fh not in self._fhs:
             self._fhs.add(fh)
-            self._container.put(DataAccessEntry(path, DataAccessType.READ))
+            self._container.put(DataAccessEvent(path, DataAccessType.READ))
         self._inner.download_range(fh, buf, path, offset, length)
 
     def flush(self, fh, path):
@@ -53,5 +53,5 @@ def flush(self, fh, path):
         self._inner.flush(fh, path)
 
     def new_mpu(self, path, file_size, download, mv):
-        self._container.put(DataAccessEntry(path, DataAccessType.WRITE))
+        self._container.put(DataAccessEvent(path, DataAccessType.WRITE))
         return self._inner.new_mpu(path, file_size, download, mv)
diff --git a/pipe-cli/src/common/audit.py b/pipe-cli/src/common/audit.py
index 1c0505031f..02135ca3df 100644
--- a/pipe-cli/src/common/audit.py
+++ b/pipe-cli/src/common/audit.py
@@ -15,12 +15,14 @@
 import collections
 import datetime
 import logging
-import pytz
 import socket
-import time
 from abc import abstractmethod, ABCMeta
+from contextlib import closing
 from threading import Thread
 
+import pytz
+import time
+
 try:
     from queue import Queue  # Python 3
 except ImportError:
@@ -28,12 +30,37 @@
 
 
 class DataAccessType:
-    READ = 'R'
-    WRITE = 'W'
-    DELETE = 'D'
+    READ = 'READ'
+    WRITE = 'WRITE'
+    DELETE = 'DELETE'
+
+
+class DataAccessEvent:
+
+    def __init__(self, path, type, storage=None):
+        self._path = path
+        self._type = type
+        self._storage = storage
+
+    @property
+    def path(self):
+        return self._path
+
+    @property
+    def type(self):
+        return self._type
+
+    @property
+    def storage(self):
+        return self._storage
+
+    def __hash__(self):
+        return hash((self._path, self._type, self._storage))
 
-DataAccessEntry = collections.namedtuple('DataAccessEntry', 'path,type')
-StorageDataAccessEntry = collections.namedtuple('StorageDataAccessEntry', 'storage,path,type')
+    def __eq__(self, other):
+        return isinstance(other, DataAccessEvent) and \
+               (self._path, self._type, self._storage) \
+               == (other._path, other._type, other._storage)
 
 
 def chunks(l, n):
@@ -221,12 +248,11 @@ def consume(self, entries):
         self._inner.consume([self._convert(entry) for entry in entries])
 
     def _convert(self, entry):
-        if self._storage:
-            return DataAccessEntry(self._build_path(self._storage.type, self._storage.root, entry.path), entry.type)
-        elif isinstance(entry, StorageDataAccessEntry):
-            return DataAccessEntry(self._build_path(entry.storage.type, entry.storage.root, entry.path), entry.type)
-        else:
+        storage = entry.storage or self._storage
+        if not storage:
             return entry
+        return DataAccessEvent(self._build_path(storage.type, storage.root, entry.path), entry.type,
+                               storage=storage)
 
     def _build_path(self, storage_type, storage_path, item_path):
         return '{}://{}/{}'.format(storage_type.lower(), storage_path, item_path)
@@ -244,49 +270,51 @@ def __init__(self, consumer_func, user_name, service_name):
         self._log_hostname = socket.gethostname()
         self._log_type = 'audit'
         self._log_severity = 'INFO'
-        self._type_mapping = {
-            DataAccessType.READ: 'READ',
-            DataAccessType.WRITE: 'WRITE',
-            DataAccessType.DELETE: 'DELETE'
-        }
 
     def consume(self, entries):
-        now = datetime.datetime.now(tz=pytz.utc)
-        now_str = now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
-        events_ids_gen = self._gen_event_ids()
-        try:
-            self._consumer_func([{
-                'eventId': next(events_ids_gen),
-                'messageTimestamp': now_str,
-                'hostname': self._log_hostname,
-                'serviceName': self._log_service,
-                'type': self._log_type,
-                'user': self._log_user,
-                'message': self._convert_type(entry.type) + ' ' + entry.path,
-                'severity': self._log_severity
-            } for entry in entries])
-        finally:
-            events_ids_gen.close()
+        with closing(self._ids()) as ids:
+            self._consumer_func(list(self._to_logs(entries, ids)))
 
-    def _gen_event_ids(self):
+    def _ids(self):
         """
         Generates unique ids from epoch nanoseconds.
         """
-        prev_event_ids = set()
+        prev_values = set()
         while True:
-            next_event_id = self._time_ns()
+            next_value = self._time_ns()
             while True:
-                if next_event_id not in prev_event_ids:
+                if next_value not in prev_values:
                     break
-                next_event_id += 1000
-            prev_event_ids.add(next_event_id)
-            yield next_event_id
+                next_value += 1000
+            prev_values.add(next_value)
+            yield next_value
 
     def _time_ns(self):
         return int(time.time() * 10 ** 9)
 
-    def _convert_type(self, type):
-        return self._type_mapping.get(type, 'ACCESS')
+    def _to_logs(self, entries, ids):
+        now = datetime.datetime.now(tz=pytz.utc)
+        now_str = now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
+        for entry in entries:
+            yield {
+                'eventId': next(ids),
+                'messageTimestamp': now_str,
+                'hostname': self._log_hostname,
+                'serviceName': self._log_service,
+                'type': self._log_type,
+                'user': self._log_user,
+                'message': entry.type + ' ' + entry.path,
+                'severity': self._log_severity,
+                'storageId': self._get_storage_id(entry.storage)
+            }
+
+    def _get_storage_id(self, storage):
+        if not storage:
+            return None
+        if hasattr(storage, 'id'):
+            return storage.id
+        elif hasattr(storage, 'identifier'):
+            return storage.identifier
 
     def flush(self):
         pass
diff --git a/pipe-cli/src/model/data_storage_wrapper.py b/pipe-cli/src/model/data_storage_wrapper.py
index fe64f3d5e8..46fdc60035 100644
--- a/pipe-cli/src/model/data_storage_wrapper.py
+++ b/pipe-cli/src/model/data_storage_wrapper.py
@@ -117,11 +117,11 @@ def __get_storage_wrapper(cls, bucket, relative_path, *args, **kwargs):
             raise RuntimeError('There is no data storage wrapper for %s storage type.' % bucket.type)
 
     @classmethod
-    def get_operation_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_operation_manager(cls, source_wrapper, destination_wrapper, events, command):
         manager_types = source_wrapper.get_type(), destination_wrapper.get_type()
         if manager_types in DataStorageWrapper._transfer_manager_suppliers:
             supplier = DataStorageWrapper._transfer_manager_suppliers[manager_types]
-            return supplier(source_wrapper, destination_wrapper, audit, command)
+            return supplier(source_wrapper, destination_wrapper, events, command)
         else:
             raise RuntimeError('Transferring files between the following storage types %s -> %s is not supported.'
                                % manager_types)
@@ -253,7 +253,7 @@ def get_type(self):
         pass
 
     @abstractmethod
-    def get_restore_manager(self, audit):
+    def get_restore_manager(self, events):
         pass
 
     @abstractmethod
@@ -261,7 +261,7 @@ def get_list_manager(self, show_versions=False):
         pass
 
     @abstractmethod
-    def get_delete_manager(self, audit, versioning):
+    def get_delete_manager(self, events, versioning):
         pass
 
 
@@ -302,14 +302,14 @@ def get_file_download_uri(self, relative_path):
     def delete_item(self, relative_path):
         S3BucketOperations.delete_item(self, relative_path, session=self.session)
 
-    def get_restore_manager(self, audit):
-        return S3BucketOperations.get_restore_manager(self, audit)
+    def get_restore_manager(self, events):
+        return S3BucketOperations.get_restore_manager(self, events)
 
     def get_list_manager(self, show_versions=False):
         return S3BucketOperations.get_list_manager(self, show_versions=show_versions)
 
-    def get_delete_manager(self, audit, versioning):
-        return S3BucketOperations.get_delete_manager(self, audit, versioning)
+    def get_delete_manager(self, events, versioning):
+        return S3BucketOperations.get_delete_manager(self, events, versioning)
 
 
 class AzureBucketWrapper(CloudDataStorageWrapper):
@@ -330,7 +330,7 @@ def build_wrapper(cls, root_bucket, relative_path, versioning=False, init=True):
     def get_type(self):
         return WrapperType.AZURE
 
-    def get_restore_manager(self, audit):
+    def get_restore_manager(self, events):
         raise RuntimeError('Versioning is not supported by AZURE cloud provider')
 
     def get_list_manager(self, show_versions=False):
@@ -338,10 +338,10 @@ def get_list_manager(self, show_versions=False):
             raise RuntimeError('Versioning is not supported by AZURE cloud provider')
         return AzureListingManager(self._blob_service(read=True, write=False), self.bucket)
 
-    def get_delete_manager(self, audit, versioning):
+    def get_delete_manager(self, events, versioning):
         if versioning:
             raise RuntimeError('Versioning is not supported by AZURE cloud provider')
-        return AzureDeleteManager(self._blob_service(read=True, write=True), audit, self.bucket)
+        return AzureDeleteManager(self._blob_service(read=True, write=True), events, self.bucket)
 
     def _blob_service(self, read, write):
         if write or not self.service:
@@ -361,14 +361,14 @@ def build_wrapper(cls, root_bucket, relative_path, init=True, *args, **kwargs):
     def get_type(self):
         return WrapperType.GS
 
-    def get_restore_manager(self, audit):
-        return GsRestoreManager(self._storage_client(write=True, versioning=True), audit, self)
+    def get_restore_manager(self, events):
+        return GsRestoreManager(self._storage_client(write=True, versioning=True), events, self)
 
     def get_list_manager(self, show_versions=False):
         return GsListingManager(self._storage_client(versioning=show_versions), self.bucket, show_versions)
 
-    def get_delete_manager(self, audit, versioning):
-        return GsDeleteManager(self._storage_client(write=True, versioning=versioning), audit, self.bucket)
+    def get_delete_manager(self, events, versioning):
+        return GsDeleteManager(self._storage_client(write=True, versioning=versioning), events, self.bucket)
 
     def _storage_client(self, read=True, write=False, versioning=False):
         return GsBucketOperations.get_client(self.bucket, read=read, write=write, versioning=versioning)
diff --git a/pipe-cli/src/utilities/audit.py b/pipe-cli/src/utilities/audit.py
index 387a6de9c9..09182ebdc0 100644
--- a/pipe-cli/src/utilities/audit.py
+++ b/pipe-cli/src/utilities/audit.py
@@ -17,11 +17,10 @@
 from src.api.log import SystemLog
 from src.common.audit import CloudPipelineAuditConsumer, LoggingAuditConsumer, StoragePathAuditConsumer, \
     ChunkingAuditConsumer, BufferingAuditConsumer, QueueAuditContainer, AuditDaemon, AuditContextManager, \
-    DataAccessEntry, StorageDataAccessEntry, DataAccessType
+    DataAccessEvent, DataAccessType
 from src.config import Config
 
-DataAccessEntry = DataAccessEntry
-StorageDataAccessEntry = StorageDataAccessEntry
+DataAccessEvent = DataAccessEvent
 DataAccessType = DataAccessType
 
 
diff --git a/pipe-cli/src/utilities/datastorage_operations.py b/pipe-cli/src/utilities/datastorage_operations.py
index 25de957d8c..99defb8765 100644
--- a/pipe-cli/src/utilities/datastorage_operations.py
+++ b/pipe-cli/src/utilities/datastorage_operations.py
@@ -123,7 +123,7 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags
 
         audit_ctx = auditing()
         manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper,
-                                                           audit=audit_ctx.container, command=command)
+                                                           events=audit_ctx.container, command=command)
         items = files_to_copy if file_list else source_wrapper.get_items(quiet=quiet)
         items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check,
                                   include, exclude, force, quiet, skip_existing, verify_destination,
@@ -234,7 +234,7 @@ def storage_remove_item(cls, path, yes, version, hard_delete, recursive, exclude
             click.echo('Removing {} ...'.format(path), nl=False)
 
         with auditing() as audit:
-            manager = source_wrapper.get_delete_manager(audit=audit, versioning=version or hard_delete)
+            manager = source_wrapper.get_delete_manager(events=audit, versioning=version or hard_delete)
             manager.delete_items(source_wrapper.path,
                                  version=version, hard_delete=hard_delete,
                                  exclude=exclude, include=include,
@@ -313,7 +313,7 @@ def restore(cls, path, version, recursive, exclude, include):
             click.echo('Flag --recursive (-r) is required to restore folders.', err=True)
             sys.exit(1)
         with auditing() as audit:
-            manager = source_wrapper.get_restore_manager(audit=audit)
+            manager = source_wrapper.get_restore_manager(events=audit)
             manager.restore_version(version, exclude, include, recursive=recursive)
 
     @classmethod
diff --git a/pipe-cli/src/utilities/storage/azure.py b/pipe-cli/src/utilities/storage/azure.py
index 9cf86610dd..230b1a3047 100644
--- a/pipe-cli/src/utilities/storage/azure.py
+++ b/pipe-cli/src/utilities/storage/azure.py
@@ -23,7 +23,7 @@
 from datetime import timedelta, datetime
 
 from src.model.datastorage_usage_model import StorageUsage
-from src.utilities.audit import StorageDataAccessEntry, DataAccessType
+from src.utilities.audit import DataAccessEvent, DataAccessType
 from src.utilities.encoding_utilities import to_string
 from src.utilities.storage.storage_usage import StorageUsageAccumulator
 
@@ -66,9 +66,9 @@ def callback(source_key, size, quiet, lock=None):
 
 class AzureManager:
 
-    def __init__(self, blob_service, audit=None):
+    def __init__(self, blob_service, events=None):
         self.service = blob_service
-        self.audit = audit
+        self.events = events
 
     def get_max_connections(self, io_threads):
         return max(io_threads, 1) if io_threads is not None else 2
@@ -142,8 +142,8 @@ def get_file_tags(self, relative_path):
 
 class AzureDeleteManager(AzureManager, AbstractDeleteManager):
 
-    def __init__(self, blob_service, audit, bucket):
-        super(AzureDeleteManager, self).__init__(blob_service, audit)
+    def __init__(self, blob_service, events, bucket):
+        super(AzureDeleteManager, self).__init__(blob_service, events)
         self.bucket = bucket
         self.delimiter = StorageOperations.PATH_SEPARATOR
         self.listing_manager = AzureListingManager(self.service, self.bucket)
@@ -194,7 +194,7 @@ def __delete_blob(self, blob_name, exclude, include, prefix=None):
                 return False
             if PatternMatcher.match_any(file_name, exclude, default=False):
                 return False
-        self.audit.put(StorageDataAccessEntry(self.bucket, blob_name, DataAccessType.DELETE))
+        self.events.put(DataAccessEvent(blob_name, DataAccessType.DELETE, storage=self.bucket))
         self.service.delete_blob(self.bucket.path, blob_name)
         return True
 
@@ -236,8 +236,8 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         progress_callback = AzureProgressPercentage.callback(full_path, size, quiet, lock)
         if progress_callback:
             progress_callback(0, size)
-        self.audit.put_all([StorageDataAccessEntry(source_wrapper.bucket, full_path, DataAccessType.READ),
-                            StorageDataAccessEntry(destination_wrapper.bucket, destination_path, DataAccessType.WRITE)])
+        self.events.put_all([DataAccessEvent(full_path, DataAccessType.READ, storage=source_wrapper.bucket),
+                             DataAccessEvent(destination_path, DataAccessType.WRITE, storage=destination_wrapper.bucket)])
         self.service.copy_blob(destination_bucket, destination_path, source_blob_url,
                                requires_sync=sync_copy)
         if not sync_copy:
@@ -245,7 +245,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         if progress_callback:
             progress_callback(size, size)
         if clean:
-            self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, full_path, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(full_path, DataAccessType.DELETE, storage=source_wrapper.bucket))
             source_service.delete_blob(source_wrapper.bucket.path, full_path)
         return TransferResult(source_key=full_path, destination_key=destination_path, destination_version=None,
                               tags=StorageOperations.parse_tags(tags))
@@ -287,11 +287,11 @@ def transfer(self, source_wrapper, destination_wrapper, path=None,
             self.create_local_folder(destination_key, lock)
 
         progress_callback = AzureProgressPercentage.callback(source_key, size, quiet, lock)
-        self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_key, DataAccessType.READ))
+        self.events.put(DataAccessEvent(source_key, DataAccessType.READ, storage=source_wrapper.bucket))
         self.service.get_blob_to_path(source_wrapper.bucket.path, source_key, to_string(destination_key),
                                       progress_callback=progress_callback)
         if clean:
-            self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_key, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(source_key, DataAccessType.DELETE, storage=source_wrapper.bucket))
             self.service.delete_blob(source_wrapper.bucket.path, source_key)
 
 
@@ -317,7 +317,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         destination_tags = StorageOperations.generate_tags(tags, source_key)
         progress_callback = AzureProgressPercentage.callback(relative_path, size, quiet, lock)
         max_connections = self.get_max_connections(io_threads)
-        self.audit.put(StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE))
+        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
         self.service.create_blob_from_path(destination_wrapper.bucket.path, destination_key, to_string(source_key),
                                            metadata=destination_tags,
                                            progress_callback=progress_callback,
@@ -364,7 +364,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         destination_tags = StorageOperations.generate_tags(tags, source_key)
         progress_callback = AzureProgressPercentage.callback(relative_path, size, quiet, lock)
         max_connections = self.get_max_connections(io_threads)
-        self.audit.put(StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE))
+        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
         self.service.create_blob_from_stream(destination_wrapper.bucket.path, destination_key, _SourceUrlIO(source_key),
                                              metadata=destination_tags,
                                              progress_callback=progress_callback,
@@ -452,24 +452,24 @@ def _apply_host(self, request, operation_context, retry_context):
 class AzureBucketOperations:
 
     @classmethod
-    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, events, command):
         blob_service = cls.get_blob_service(destination_wrapper.bucket, read=True, write=True)
-        return TransferBetweenAzureBucketsManager(blob_service, audit)
+        return TransferBetweenAzureBucketsManager(blob_service, events)
 
     @classmethod
-    def get_download_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_download_manager(cls, source_wrapper, destination_wrapper, events, command):
         blob_service = cls.get_blob_service(source_wrapper.bucket, read=True, write=command == 'mv')
-        return AzureDownloadManager(blob_service, audit)
+        return AzureDownloadManager(blob_service, events)
 
     @classmethod
-    def get_upload_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_upload_manager(cls, source_wrapper, destination_wrapper, events, command):
         blob_service = cls.get_blob_service(destination_wrapper.bucket, read=True, write=True)
-        return AzureUploadManager(blob_service, audit)
+        return AzureUploadManager(blob_service, events)
 
     @classmethod
-    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, events, command):
         blob_service = cls.get_blob_service(destination_wrapper.bucket, read=True, write=True)
-        return TransferFromHttpOrFtpToAzureManager(blob_service, audit)
+        return TransferFromHttpOrFtpToAzureManager(blob_service, events)
 
     @classmethod
     def get_blob_service(cls, *args, **kwargs):
diff --git a/pipe-cli/src/utilities/storage/gs.py b/pipe-cli/src/utilities/storage/gs.py
index a335f16dff..52d2b15813 100644
--- a/pipe-cli/src/utilities/storage/gs.py
+++ b/pipe-cli/src/utilities/storage/gs.py
@@ -25,7 +25,7 @@
 from urllib3.connection import VerifiedHTTPSConnection
 
 from src.model.datastorage_usage_model import StorageUsage
-from src.utilities.audit import StorageDataAccessEntry, DataAccessType
+from src.utilities.audit import DataAccessEvent, DataAccessType
 from src.utilities.encoding_utilities import to_string
 
 try:
@@ -382,9 +382,9 @@ def __init__(self, size, progress_callback, attempts=DEFAULT_RESUME_ATTEMPTS, *a
 
 class GsManager:
 
-    def __init__(self, client, audit=None):
+    def __init__(self, client, events=None):
         self.client = client
-        self.audit = audit
+        self.events = events
 
     def custom_blob(self, bucket, blob_name, progress_callback, size):
         return _CustomBlob(
@@ -515,8 +515,8 @@ def get_file_tags(self, relative_path):
 
 class GsDeleteManager(GsManager, AbstractDeleteManager):
 
-    def __init__(self, client, audit, bucket):
-        super(GsDeleteManager, self).__init__(client, audit)
+    def __init__(self, client, events, bucket):
+        super(GsDeleteManager, self).__init__(client, events)
         self.bucket = bucket
         self.delimiter = StorageOperations.PATH_SEPARATOR
 
@@ -598,7 +598,7 @@ def _item_blobs_for_deletion(self, bucket, item, hard_delete):
 
     def _delete_blob(self, blob, exclude, include, prefix=None):
         if self._is_matching_delete_filters(blob.name, exclude, include, prefix):
-            self.audit.put(StorageDataAccessEntry(self.bucket, blob.name, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(blob.name, DataAccessType.DELETE, storage=self.bucket))
             blob.delete()
             return True
         return False
@@ -624,8 +624,8 @@ def _get_listing_manager(self, show_versions):
 
 class GsRestoreManager(GsManager, AbstractRestoreManager):
 
-    def __init__(self, client, audit, wrapper):
-        super(GsRestoreManager, self).__init__(client, audit)
+    def __init__(self, client, events, wrapper):
+        super(GsRestoreManager, self).__init__(client, events)
         self.wrapper = wrapper
         self.listing_manager = GsListingManager(self.client, self.wrapper.bucket, show_versions=True)
 
@@ -646,7 +646,7 @@ def restore_version(self, version, exclude, include, recursive):
                 raise RuntimeError('Version "%s" doesn\'t exist.' % version)
             if not item.deleted and item.version == version:
                 raise RuntimeError('Version "%s" is already the latest version.' % version)
-            self.audit.put(StorageDataAccessEntry(self.wrapper.bucket, blob.name, DataAccessType.WRITE))
+            self.events.put(DataAccessEvent(blob.name, DataAccessType.WRITE, storage=self.wrapper.bucket))
             restored_blob = bucket.copy_blob(blob, bucket, blob.name, source_generation=int(version))
             item.name = self.wrapper.path
             item.version = version
@@ -658,14 +658,14 @@ def restore_version(self, version, exclude, include, recursive):
             item = file_items[0]
             if not item.deleted:
                 raise RuntimeError('Latest file version is not deleted. Please specify "--version" parameter.')
-            self.audit.put(StorageDataAccessEntry(self.wrapper.bucket, item.name, DataAccessType.WRITE))
+            self.events.put(DataAccessEvent(item.name, DataAccessType.WRITE, storage=self.wrapper.bucket))
             restored_blob = self._restore_latest_archived_version(bucket, item)
             self._flush_restore_results([(item, restored_blob)], flush_size=1)
         else:
             restoring_results = []
             for item in all_items:
                 if item.deleted:
-                    self.audit.put(StorageDataAccessEntry(self.wrapper.bucket, item.name, DataAccessType.WRITE))
+                    self.events.put(DataAccessEvent(item.name, DataAccessType.WRITE, storage=self.wrapper.bucket))
                     restored_blob = self._restore_latest_archived_version(bucket, item)
                     restoring_results.append((item, restored_blob))
             restoring_results = self._flush_restore_results(restoring_results)
@@ -725,8 +725,8 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         source_bucket = source_client.bucket(source_wrapper.bucket.path)
         source_blob = source_bucket.blob(full_path)
         destination_bucket = self.client.bucket(destination_wrapper.bucket.path)
-        self.audit.put_all([StorageDataAccessEntry(source_wrapper.bucket, source_blob.name, DataAccessType.READ),
-                            StorageDataAccessEntry(destination_wrapper.bucket, destination_path, DataAccessType.WRITE)])
+        self.events.put_all([DataAccessEvent(source_blob.name, DataAccessType.READ, storage=source_wrapper.bucket),
+                             DataAccessEvent(destination_path, DataAccessType.WRITE, storage=destination_wrapper.bucket)])
         destination_blob = source_bucket.copy_blob(source_blob, destination_bucket, destination_path, client=self.client)
         # Transfer between buckets in GCP is almost an instant operation.
         # Therefore, the progress bar can be updated only once.
@@ -737,7 +737,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         if progress_callback is not None:
             progress_callback(size)
         if clean:
-            self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_blob.name, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(source_blob.name, DataAccessType.DELETE, storage=source_wrapper.bucket))
             source_blob.delete()
         return TransferResult(source_key=full_path, destination_key=destination_path,
                               destination_version=destination_blob.generation,
@@ -747,7 +747,7 @@ class GsDownloadManager(GsManager, AbstractTransferManager):
 
     DEFAULT_BUFFERING_SIZE = 1024 * 1024  # 1MB
 
-    def __init__(self, client, audit, buffering=DEFAULT_BUFFERING_SIZE):
+    def __init__(self, client, events, buffering=DEFAULT_BUFFERING_SIZE):
         """
         Google cloud storage download manager that performs either resumable downloading or
         parallel downloading depending on file size.
@@ -767,7 +767,7 @@ def __init__(self, client, audit, buffering=DEFAULT_BUFFERING_SIZE):
         :param buffering: Buffering size for file system flushing. Defaults to DEFAULT_BUFFERING_SIZE
         and can be overridden with CP_CLI_DOWNLOAD_BUFFERING_SIZE environment variable.
         """
-        GsManager.__init__(self, client, audit)
+        GsManager.__init__(self, client, events)
         self._buffering = int(os.environ.get(CP_CLI_DOWNLOAD_BUFFERING_SIZE) or buffering)
 
     def get_destination_key(self, destination_wrapper, relative_path):
@@ -794,7 +794,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
             progress_callback = None
         self._replace_default_download_chunk_size(self._buffering)
         transfer_config = self._get_transfer_config(io_threads)
-        self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_key, DataAccessType.READ))
+        self.events.put(DataAccessEvent(source_key, DataAccessType.READ, storage=source_wrapper.bucket))
         if size > transfer_config.multipart_threshold:
             bucket = self.client.bucket(source_wrapper.bucket.path)
             blob = self.custom_blob(bucket, source_key, None, size)
@@ -813,7 +813,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
             blob = self.custom_blob(bucket, source_key, progress_callback, size)
             self._download_to_file(blob, to_string(destination_key))
         if clean:
-            self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_key, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(source_key, DataAccessType.DELETE, storage=source_wrapper.bucket))
             blob.delete()
 
     def _get_transfer_config(self, io_threads=None):
@@ -877,7 +877,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
             progress_callback = None
         transfer_config = self._get_transfer_config(size, io_threads)
         destination_tags = StorageOperations.generate_tags(tags, source_key)
-        self.audit.put(StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE))
+        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
         if size > transfer_config.multipart_threshold:
             upload_client = GsCompositeUploadClient(destination_wrapper.bucket.path, destination_key,
                                                     destination_tags,
@@ -959,7 +959,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         source_key = self.get_source_key(source_wrapper, path)
         destination_key = self.get_destination_key(destination_wrapper, relative_path)
 
-        self.audit.put(StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE))
+        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
         if StorageOperations.show_progress(quiet, size, lock):
             progress_callback = ProgressPercentage(relative_path, size)
         else:
            progress_callback = None
@@ -1068,24 +1068,24 @@ def __init__(self, bucket, read, write, refresh_credentials, versioning=False):
 class GsBucketOperations:
 
     @classmethod
-    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, events, command):
         client = GsBucketOperations.get_client(destination_wrapper.bucket, read=True, write=True)
-        return TransferBetweenGsBucketsManager(client, audit)
+        return TransferBetweenGsBucketsManager(client, events)
 
     @classmethod
-    def get_download_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_download_manager(cls, source_wrapper, destination_wrapper, events, command):
         client = GsBucketOperations.get_client(source_wrapper.bucket, read=True, write=command == 'mv')
-        return GsDownloadManager(client, audit)
+        return GsDownloadManager(client, events)
 
     @classmethod
-    def get_upload_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_upload_manager(cls, source_wrapper, destination_wrapper, events, command):
         client = GsBucketOperations.get_client(destination_wrapper.bucket, read=True, write=True)
-        return GsUploadManager(client, audit)
+        return GsUploadManager(client, events)
 
     @classmethod
-    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, events, command):
         client = GsBucketOperations.get_client(destination_wrapper.bucket, read=True, write=True)
-        return TransferFromHttpOrFtpToGsManager(client, audit)
+        return TransferFromHttpOrFtpToGsManager(client, events)
 
     @classmethod
     def get_client(cls, *args, **kwargs):
diff --git a/pipe-cli/src/utilities/storage/s3.py b/pipe-cli/src/utilities/storage/s3.py
index 44c043f3e1..788a34eabd 100644
--- a/pipe-cli/src/utilities/storage/s3.py
+++ b/pipe-cli/src/utilities/storage/s3.py
@@ -16,7 +16,7 @@
 from botocore.endpoint import BotocoreHTTPSession, MAX_POOL_CONNECTIONS
 
 from src.model.datastorage_usage_model import StorageUsage
-from src.utilities.audit import StorageDataAccessEntry, DataAccessType
+from src.utilities.audit import DataAccessEvent, DataAccessType
 from src.utilities.datastorage_lifecycle_manager import DataStorageLifecycleManager
 from src.utilities.encoding_utilities import to_string
 from src.utilities.storage.s3_proxy_utils import AwsProxyConnectWithHeadersHTTPSAdapter
@@ -97,9 +97,9 @@ def _copy_object_task_main(self, client, copy_source, bucket, key, extra_args, c
 
 class StorageItemManager(object):
 
-    def __init__(self, session, audit=None, bucket=None, region_name=None, cross_region=False):
+    def __init__(self, session, events=None, bucket=None, region_name=None, cross_region=False):
         self.session = session
-        self.audit = audit
+        self.events = events
         self.region_name = region_name
         _boto_config = S3BucketOperations.get_proxy_config(cross_region=cross_region)
         self.s3 = session.resource('s3', config=_boto_config,
@@ -168,8 +168,8 @@ def get_transfer_config(self, io_threads):
 
 class DownloadManager(StorageItemManager, AbstractTransferManager):
 
-    def __init__(self, session, bucket, audit, region_name=None):
-        super(DownloadManager, self).__init__(session, audit=audit, bucket=bucket, region_name=region_name)
+    def __init__(self, session, bucket, events, region_name=None):
+        super(DownloadManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name)
 
     def get_destination_key(self, destination_wrapper, relative_path):
         if destination_wrapper.path.endswith(os.path.sep):
@@ -194,19 +194,19 @@ def transfer(self, source_wrapper, destination_wrapper, path=None,
             progress_callback = ProgressPercentage(relative_path, size)
         else:
             progress_callback = None
-        self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_key, DataAccessType.READ))
+        self.events.put(DataAccessEvent(source_key, DataAccessType.READ, storage=source_wrapper.bucket))
         self.bucket.download_file(source_key, to_string(destination_key),
                                   Callback=progress_callback,
                                   Config=transfer_config)
         if clean:
-            self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, source_key, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(source_key, DataAccessType.DELETE, storage=source_wrapper.bucket))
             source_wrapper.delete_item(source_key)
 
 
 class UploadManager(StorageItemManager, AbstractTransferManager):
 
-    def __init__(self, session, bucket, audit, region_name=None):
-        super(UploadManager, self).__init__(session, audit=audit, bucket=bucket, region_name=region_name)
+    def __init__(self, session, bucket, events, region_name=None):
+        super(UploadManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name)
 
     def get_destination_key(self, destination_wrapper, relative_path):
         return S3BucketOperations.normalize_s3_path(destination_wrapper, relative_path)
@@ -236,7 +236,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
             progress_callback = ProgressPercentage(relative_path, size)
         else:
             progress_callback = None
-        self.audit.put(StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE))
+        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
         self.bucket.upload_file(to_string(source_key), destination_key,
                                 Callback=progress_callback,
                                 Config=transfer_config,
@@ -250,8 +250,8 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
 
 class TransferFromHttpOrFtpToS3Manager(StorageItemManager, AbstractTransferManager):
 
-    def __init__(self, session, bucket, audit, region_name=None):
-        super(TransferFromHttpOrFtpToS3Manager, self).__init__(session, audit=audit, bucket=bucket,
+    def __init__(self, session, bucket, events, region_name=None):
+        super(TransferFromHttpOrFtpToS3Manager, self).__init__(session, events=events, bucket=bucket,
                                                                region_name=region_name)
 
     def get_destination_key(self, destination_wrapper, relative_path):
@@ -281,7 +281,7 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         }
         TransferManager.ALLOWED_UPLOAD_ARGS.append('Tagging')
         file_stream = urlopen(source_key)
-        self.audit.put(StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE))
+        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
         if StorageItemManager.show_progress(quiet, size, lock):
             self.bucket.upload_fileobj(file_stream, destination_key,
                                        Callback=ProgressPercentage(relative_path, size), ExtraArgs=extra_args)
@@ -294,9 +294,9 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
 
 class TransferBetweenBucketsManager(StorageItemManager, AbstractTransferManager):
 
-    def __init__(self, session, bucket, audit, region_name=None, cross_region=False):
+    def __init__(self, session, bucket, events, region_name=None, cross_region=False):
         self.cross_region = cross_region
-        super(TransferBetweenBucketsManager, self).__init__(session, audit=audit, bucket=bucket,
+        super(TransferBetweenBucketsManager, self).__init__(session, events=events, bucket=bucket,
                                                             region_name=region_name, cross_region=cross_region)
 
     def get_destination_key(self, destination_wrapper, relative_path):
@@ -323,15 +323,15 @@ def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path
         extra_args = {
             'ACL': 'bucket-owner-full-control'
         }
-        self.audit.put_all([StorageDataAccessEntry(source_wrapper.bucket, path, DataAccessType.READ),
-                            StorageDataAccessEntry(destination_wrapper.bucket, destination_key, DataAccessType.WRITE)])
+        self.events.put_all([DataAccessEvent(path, DataAccessType.READ, storage=source_wrapper.bucket),
+                             DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket)])
         if StorageItemManager.show_progress(quiet, size, lock):
             self.bucket.copy(copy_source, destination_key, Callback=ProgressPercentage(relative_path, size),
                              ExtraArgs=extra_args, SourceClient=source_client)
         else:
             self.bucket.copy(copy_source, destination_key, ExtraArgs=extra_args, SourceClient=source_client)
         if clean:
-            self.audit.put(StorageDataAccessEntry(source_wrapper.bucket, path, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(path, DataAccessType.DELETE, storage=source_wrapper.bucket))
             source_wrapper.delete_item(path)
         version = self.get_uploaded_s3_file_version(destination_wrapper.bucket.path, destination_key)
         return TransferResult(source_key=path, destination_key=destination_key, destination_version=version,
@@ -366,8 +366,8 @@ def convert_object_tags(cls, object_tags):
 
 class RestoreManager(StorageItemManager, AbstractRestoreManager):
     VERSION_NOT_EXISTS_ERROR = 'Version "%s" doesn\'t exist.'
 
-    def __init__(self, bucket, session, audit, region_name=None):
-        super(RestoreManager, self).__init__(session, audit=audit, region_name=region_name)
+    def __init__(self, bucket, session, events, region_name=None):
+        super(RestoreManager, self).__init__(session, events=events, region_name=region_name)
         self.bucket = bucket
         self.listing_manager = bucket.get_list_manager(True)
@@ -387,7 +387,7 @@ def restore_file_version(self, version, bucket, client, file_items):
         relative_path = self.bucket.path
         self._validate_version(bucket, client, version, file_items)
         try:
-            self.audit.put(StorageDataAccessEntry(self.bucket.bucket, relative_path, DataAccessType.WRITE))
+            self.events.put(DataAccessEvent(relative_path, DataAccessType.WRITE, storage=self.bucket.bucket))
             copied_object = client.copy_object(Bucket=bucket, Key=relative_path,
                                                CopySource=dict(Bucket=bucket, Key=relative_path, VersionId=version))
             client.delete_objects(Bucket=bucket, Delete=dict(Objects=[dict(Key=relative_path, VersionId=version)]))
@@ -507,8 +507,8 @@ def restore_folder(self, bucket, client, exclude, include, recursive):
         self._restore_objects(client, bucket, restore_items)
 
     def _restore_objects(self, client, bucket, items):
-        self.audit.put_all([StorageDataAccessEntry(self.bucket.bucket, item['Key'], DataAccessType.WRITE)
-                            for item in items])
+        self.events.put_all([DataAccessEvent(item['Key'], DataAccessType.WRITE, storage=self.bucket.bucket)
+                             for item in items])
         client.delete_objects(Bucket=bucket, Delete=dict(Objects=items))
 
     def _validate_version(self, bucket, client, version, file_items):
@@ -524,8 +524,8 @@ def _validate_version(self, bucket, client, version, file_items):
 
 class DeleteManager(StorageItemManager, AbstractDeleteManager):
 
-    def __init__(self, bucket, session, audit, region_name=None):
-        super(DeleteManager, self).__init__(session, audit=audit, region_name=region_name)
+    def __init__(self, bucket, session, events, region_name=None):
+        super(DeleteManager, self).__init__(session, events=events, region_name=region_name)
         self.bucket = bucket
 
     def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False):
@@ -540,7 +540,7 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v
                 delete_items.append(dict(Key=prefix, VersionId=version))
             else:
                 delete_items.append(dict(Key=prefix))
-            self.audit.put(StorageDataAccessEntry(self.bucket.bucket, prefix, DataAccessType.DELETE))
+            self.events.put(DataAccessEvent(prefix, DataAccessType.DELETE, storage=self.bucket.bucket))
             client.delete_objects(Bucket=bucket, Delete=dict(Objects=delete_items))
             if self.bucket.bucket.policy.versioning_enabled:
                 if version:
@@ -587,8 +587,8 @@ def delete_items(self, relative_path, recursive=False, exclude=[], include=[], v
     def _delete_objects(self, client, bucket, hard_delete, items):
         if not self.bucket.bucket.policy.versioning_enabled or hard_delete:
             self._delete_all_object_tags(items)
-        self.audit.put_all([StorageDataAccessEntry(self.bucket.bucket, item['Key'], DataAccessType.DELETE)
-                            for item in items])
+        self.events.put_all([DataAccessEvent(item['Key'], DataAccessType.DELETE, storage=self.bucket.bucket)
+                             for item in items])
         client.delete_objects(Bucket=bucket, Delete=dict(Objects=items))
 
     def _delete_all_object_tags(self, items, chunk_size=100):
@@ -1016,14 +1016,14 @@ def get_list_manager(cls, source_wrapper, show_versions=False):
                                   region_name=source_wrapper.bucket.region)
 
     @classmethod
-    def get_delete_manager(cls, source_wrapper, audit, versioning=False):
+    def get_delete_manager(cls, source_wrapper, events, versioning=False):
         session = cls.assumed_session(source_wrapper.bucket.identifier, None, 'mv', versioning=versioning)
-        return DeleteManager(source_wrapper, session, audit, source_wrapper.bucket.region)
+        return DeleteManager(source_wrapper, session, events, source_wrapper.bucket.region)
 
     @classmethod
-    def get_restore_manager(cls, source_wrapper, audit):
+    def get_restore_manager(cls, source_wrapper, events):
         session = cls.assumed_session(source_wrapper.bucket.identifier, None, 'mv', versioning=True)
-        return RestoreManager(source_wrapper, session, audit, source_wrapper.bucket.region)
+        return RestoreManager(source_wrapper, session, events, source_wrapper.bucket.region)
 
     @classmethod
     def delete_item(cls, storage_wrapper, relative_path, session=None):
@@ -1067,7 +1067,7 @@ def refresh():
         return Session(botocore_session=s)
 
     @classmethod
-    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, events, command):
         source_id = source_wrapper.bucket.identifier
         destination_id = destination_wrapper.bucket.identifier
         session = cls.assumed_session(source_id, destination_id, command)
@@ -1075,31 +1075,31 @@ def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrappe
         source_wrapper.session = session
         destination_bucket = destination_wrapper.bucket.path
         cross_region = destination_wrapper.bucket.region != source_wrapper.bucket.region
-        return TransferBetweenBucketsManager(session, destination_bucket, audit,
+        return TransferBetweenBucketsManager(session, destination_bucket, events,
                                              destination_wrapper.bucket.region, cross_region)
 
     @classmethod
-    def get_download_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_download_manager(cls, source_wrapper, destination_wrapper, events, command):
         source_id = source_wrapper.bucket.identifier
         session = cls.assumed_session(source_id, None, command)
         # replace session to be able to delete source for move
         source_wrapper.session = session
         source_bucket = source_wrapper.bucket.path
-        return DownloadManager(session, source_bucket, audit, source_wrapper.bucket.region)
+        return DownloadManager(session, source_bucket, events, source_wrapper.bucket.region)
 
     @classmethod
-    def get_upload_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_upload_manager(cls, source_wrapper, destination_wrapper, events, command):
         destination_id = destination_wrapper.bucket.identifier
         session = cls.assumed_session(None, destination_id, command)
        destination_bucket = destination_wrapper.bucket.path
-        return UploadManager(session, destination_bucket, audit, destination_wrapper.bucket.region)
+        return UploadManager(session, destination_bucket, events, destination_wrapper.bucket.region)
 
     @classmethod
-    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, audit, command):
+    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, events, command):
         destination_id = destination_wrapper.bucket.identifier
         session = cls.assumed_session(None, destination_id, command)
         destination_bucket = destination_wrapper.bucket.path
-        return TransferFromHttpOrFtpToS3Manager(session, destination_bucket, audit, destination_wrapper.bucket.region)
+        return TransferFromHttpOrFtpToS3Manager(session, destination_bucket, events, destination_wrapper.bucket.region)
 
     @classmethod
     def get_full_path(cls, path, param):
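
Usage note (illustrative sketch, not part of the patch): the snippet below exercises the DataAccessEvent value object introduced in src/common/audit.py above, using only names defined in this diff. The file path is an invented example, and the snippet assumes the pipe-cli sources are importable on PYTHONPATH.

    # Hypothetical usage sketch; 'data/sample.txt' is a made-up path.
    from src.common.audit import DataAccessEvent, DataAccessType

    read_event = DataAccessEvent('data/sample.txt', DataAccessType.READ)
    write_event = DataAccessEvent('data/sample.txt', DataAccessType.WRITE)

    # __eq__ and __hash__ compare (path, type, storage), so equal events collapse in a set.
    assert read_event != write_event
    assert len({read_event, DataAccessEvent('data/sample.txt', DataAccessType.READ)}) == 1

    # The log consumer in this patch builds its message as entry.type + ' ' + entry.path,
    # e.g. 'READ data/sample.txt' now that DataAccessType holds full words.
    print(read_event.type + ' ' + read_event.path)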