Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use storage id in data access audit in pipe #3195

Merged
merged 2 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add storage id to audit logs in pipe
  • Loading branch information
tcibinan committed Apr 6, 2023
commit fd3776accf09b0ddf2bb4cb77e47c767141533e2
16 changes: 8 additions & 8 deletions pipe-cli/mount/pipefuse/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from pipefuse.fsclient import FileSystemClientDecorator
from pipefuse.storage import StorageLowLevelFileSystemClient
from src.common.audit import DataAccessEntry, DataAccessType
from src.common.audit import DataAccessEvent, DataAccessType


class AuditFileSystemClient(FileSystemClientDecorator, StorageLowLevelFileSystemClient):
Expand All @@ -26,23 +26,23 @@ def __init__(self, inner, container):
self._fhs = set()

def upload(self, buf, path):
    """Record a WRITE audit event for *path*, then delegate the upload."""
    self._container.put(DataAccessEvent(path, DataAccessType.WRITE))
    self._inner.upload(buf, path)

def delete(self, path):
    """Record a DELETE audit event for *path*, then delegate the deletion."""
    self._container.put(DataAccessEvent(path, DataAccessType.DELETE))
    self._inner.delete(path)

def mv(self, old_path, path):
    """Audit a move as READ+DELETE of the source and WRITE of the destination, then delegate."""
    self._container.put_all([DataAccessEvent(old_path, DataAccessType.READ),
                             DataAccessEvent(old_path, DataAccessType.DELETE),
                             DataAccessEvent(path, DataAccessType.WRITE)])
    self._inner.mv(old_path, path)

def download_range(self, fh, buf, path, offset=0, length=0):
    """Delegate a ranged read, auditing a READ event only once per file handle.

    The first read on a given handle records the event; subsequent ranged
    reads of the same open file are not re-audited to avoid flooding the log.
    """
    if fh not in self._fhs:
        self._fhs.add(fh)
        self._container.put(DataAccessEvent(path, DataAccessType.READ))
    self._inner.download_range(fh, buf, path, offset, length)

def flush(self, fh, path):
Expand All @@ -53,5 +53,5 @@ def flush(self, fh, path):
self._inner.flush(fh, path)

def new_mpu(self, path, file_size, download, mv):
    """Record a WRITE audit event for a new multipart upload, then delegate."""
    self._container.put(DataAccessEvent(path, DataAccessType.WRITE))
    return self._inner.new_mpu(path, file_size, download, mv)
106 changes: 66 additions & 40 deletions pipe-cli/src/common/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,37 @@


class DataAccessType:
    """Constants naming the kinds of data access recorded in audit events."""
    READ = 'READ'
    WRITE = 'WRITE'
    DELETE = 'DELETE'

# NOTE(review): legacy tuple-based audit records; DataAccessEvent appears to
# supersede them — confirm no remaining callers before removing these.
DataAccessEntry = collections.namedtuple('DataAccessEntry', 'path,type')
StorageDataAccessEntry = collections.namedtuple('StorageDataAccessEntry', 'storage,path,type')

class DataAccessEvent:
    """Immutable record of a single data access.

    Holds the accessed path, the access type (see DataAccessType) and an
    optional storage object the path belongs to. Hashable and comparable by
    value, so events can be deduplicated in sets.
    """

    def __init__(self, path, type, storage=None):
        self._path = path
        self._type = type
        self._storage = storage

    @property
    def path(self):
        return self._path

    @property
    def type(self):
        return self._type

    @property
    def storage(self):
        return self._storage

    def __hash__(self):
        return hash((self._path, self._type, self._storage))

    def __eq__(self, other):
        return isinstance(other, DataAccessEvent) and \
               (self._path, self._type, self._storage) \
               == (other._path, other._type, other._storage)

    # Explicit __ne__ keeps != consistent with == on Python 2, where it is
    # not derived automatically from __eq__.
    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return 'DataAccessEvent(path=%r, type=%r, storage=%r)' % (self._path, self._type, self._storage)


def chunks(l, n):
Expand Down Expand Up @@ -221,12 +246,11 @@ def consume(self, entries):
self._inner.consume([self._convert(entry) for entry in entries])

def _convert(self, entry):
    """Qualify an event's path with its storage scheme and root when known.

    Prefers the event's own storage; falls back to the consumer-level storage.
    Events with no storage at all are passed through unchanged.
    """
    storage = entry.storage or self._storage
    if not storage:
        return entry
    return DataAccessEvent(self._build_path(storage.type, storage.root, entry.path), entry.type,
                           storage=storage)

def _build_path(self, storage_type, storage_path, item_path):
return '{}://{}/{}'.format(storage_type.lower(), storage_path, item_path)
Expand All @@ -244,49 +268,51 @@ def __init__(self, consumer_func, user_name, service_name):
self._log_hostname = socket.gethostname()
self._log_type = 'audit'
self._log_severity = 'INFO'
self._type_mapping = {
DataAccessType.READ: 'READ',
DataAccessType.WRITE: 'WRITE',
DataAccessType.DELETE: 'DELETE'
}

def consume(self, entries):
    """Convert audit entries to structured log records and hand them to the consumer func.

    The event-id generator is closed explicitly once the batch is consumed:
    plain generators are not context managers, so a `with` statement cannot
    be used on self._ids() directly.
    """
    event_ids = self._ids()
    try:
        self._consumer_func(list(self._to_logs(entries, event_ids)))
    finally:
        event_ids.close()

def _ids(self):
    """
    Generates unique ids from epoch nanoseconds.

    If the clock yields a value already issued by this generator, the value
    is bumped in 1000 ns steps until unique.
    """
    issued = set()
    while True:
        candidate = self._time_ns()
        while candidate in issued:
            candidate += 1000
        issued.add(candidate)
        yield candidate

def _time_ns(self):
return int(time.time() * 10 ** 9)

def _to_logs(self, entries, event_ids):
    """Lazily convert audit entries into log record dicts for the consumer.

    All records in one batch share a single UTC timestamp (millisecond
    precision); each record draws a unique id from *event_ids*.
    """
    now = datetime.datetime.now(tz=pytz.utc)
    now_str = now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]  # trim to milliseconds
    for entry in entries:
        yield {
            'eventId': next(event_ids),
            'messageTimestamp': now_str,
            'hostname': self._log_hostname,
            'serviceName': self._log_service,
            'type': self._log_type,
            'user': self._log_user,
            'message': entry.type + ' ' + entry.path,
            'severity': self._log_severity,
            'storageId': self._get_storage_id(entry.storage)
        }

def _get_storage_id(self, storage):
if not storage:
return None
if hasattr(storage, 'id'):
return storage.id
elif hasattr(storage, 'identifier'):
return storage.identifier

def flush(self):
    """No-op: records are handed to the consumer function immediately in consume()."""
Expand Down
30 changes: 15 additions & 15 deletions pipe-cli/src/model/data_storage_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ def __get_storage_wrapper(cls, bucket, relative_path, *args, **kwargs):
raise RuntimeError('There is no data storage wrapper for %s storage type.' % bucket.type)

@classmethod
def get_operation_manager(cls, source_wrapper, destination_wrapper, events, command):
    """Return a transfer manager for the (source, destination) storage type pair.

    *events* is the audit events container forwarded to the manager supplier.
    Raises RuntimeError when no supplier is registered for the pair.
    """
    manager_types = source_wrapper.get_type(), destination_wrapper.get_type()
    if manager_types in DataStorageWrapper._transfer_manager_suppliers:
        supplier = DataStorageWrapper._transfer_manager_suppliers[manager_types]
        return supplier(source_wrapper, destination_wrapper, events, command)
    else:
        raise RuntimeError('Transferring files between the following storage types %s -> %s is not supported.'
                           % manager_types)
Expand Down Expand Up @@ -253,15 +253,15 @@ def get_type(self):
pass

@abstractmethod
def get_restore_manager(self, events):
    """Return a restore manager wired to the given audit events container."""
    pass

@abstractmethod
def get_list_manager(self, show_versions=False):
    """Return a listing manager; show_versions toggles listing of object versions."""
    pass

@abstractmethod
def get_delete_manager(self, events, versioning):
    """Return a delete manager.

    *events* is the audit events container; *versioning* enables
    version-aware deletion where the provider supports it.
    """
    pass


Expand Down Expand Up @@ -302,14 +302,14 @@ def get_file_download_uri(self, relative_path):
def delete_item(self, relative_path):
    # Delegates to the low-level S3 operations using this wrapper's session.
    S3BucketOperations.delete_item(self, relative_path, session=self.session)

def get_restore_manager(self, events):
    """Return the S3 restore manager, forwarding the audit events container."""
    return S3BucketOperations.get_restore_manager(self, events)

def get_list_manager(self, show_versions=False):
    # No audit events container is passed here — listing is not audited.
    return S3BucketOperations.get_list_manager(self, show_versions=show_versions)

def get_delete_manager(self, events, versioning):
    """Return the S3 delete manager, forwarding the audit events container and versioning flag."""
    return S3BucketOperations.get_delete_manager(self, events, versioning)


class AzureBucketWrapper(CloudDataStorageWrapper):
Expand All @@ -330,18 +330,18 @@ def build_wrapper(cls, root_bucket, relative_path, versioning=False, init=True):
def get_type(self):
    # Identifies this wrapper as the Azure storage implementation.
    return WrapperType.AZURE

def get_restore_manager(self, events):
    """Always raises: restore requires versioning, which Azure blob storage lacks here."""
    raise RuntimeError('Versioning is not supported by AZURE cloud provider')

def get_list_manager(self, show_versions=False):
    # Version listing is rejected up front: this provider has no versioning support.
    if show_versions:
        raise RuntimeError('Versioning is not supported by AZURE cloud provider')
    return AzureListingManager(self._blob_service(read=True, write=False), self.bucket)

def get_delete_manager(self, events, versioning):
    """Return the Azure delete manager wired to the audit events container.

    Raises RuntimeError when version-aware deletion is requested, since
    Azure blob storage versioning is not supported by this provider.
    """
    if versioning:
        raise RuntimeError('Versioning is not supported by AZURE cloud provider')
    return AzureDeleteManager(self._blob_service(read=True, write=True), events, self.bucket)

def _blob_service(self, read, write):
if write or not self.service:
Expand All @@ -361,14 +361,14 @@ def build_wrapper(cls, root_bucket, relative_path, init=True, *args, **kwargs):
def get_type(self):
    # Identifies this wrapper as the Google Storage implementation.
    return WrapperType.GS

def get_restore_manager(self, events):
    """Return the GS restore manager over a write-enabled, versioning-aware client."""
    return GsRestoreManager(self._storage_client(write=True, versioning=True), events, self)

def get_list_manager(self, show_versions=False):
    # The client is created versioning-aware only when versions are to be shown.
    return GsListingManager(self._storage_client(versioning=show_versions), self.bucket, show_versions)

def get_delete_manager(self, events, versioning):
    """Return the GS delete manager; the client mirrors the requested versioning mode."""
    return GsDeleteManager(self._storage_client(write=True, versioning=versioning), events, self.bucket)

def _storage_client(self, read=True, write=False, versioning=False):
    # Thin factory over GsBucketOperations.get_client bound to this wrapper's bucket.
    return GsBucketOperations.get_client(self.bucket, read=read, write=write, versioning=versioning)
Expand Down
5 changes: 2 additions & 3 deletions pipe-cli/src/utilities/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
from src.api.log import SystemLog
from src.common.audit import CloudPipelineAuditConsumer, LoggingAuditConsumer, StoragePathAuditConsumer, \
ChunkingAuditConsumer, BufferingAuditConsumer, QueueAuditContainer, AuditDaemon, AuditContextManager, \
DataAccessEntry, StorageDataAccessEntry, DataAccessType
DataAccessEvent, DataAccessType
from src.config import Config

# NOTE(review): re-exports so callers can import audit names from this module.
# Both the legacy *Entry aliases and the new DataAccessEvent appear here
# mid-migration — confirm the Entry aliases can be dropped once all callers
# use DataAccessEvent.
DataAccessEntry = DataAccessEntry
StorageDataAccessEntry = StorageDataAccessEntry
DataAccessEvent = DataAccessEvent
DataAccessType = DataAccessType


Expand Down
6 changes: 3 additions & 3 deletions pipe-cli/src/utilities/datastorage_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags

audit_ctx = auditing()
manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper,
audit=audit_ctx.container, command=command)
events=audit_ctx.container, command=command)
items = files_to_copy if file_list else source_wrapper.get_items(quiet=quiet)
items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check,
include, exclude, force, quiet, skip_existing, verify_destination,
Expand Down Expand Up @@ -234,7 +234,7 @@ def storage_remove_item(cls, path, yes, version, hard_delete, recursive, exclude
click.echo('Removing {} ...'.format(path), nl=False)

with auditing() as audit:
manager = source_wrapper.get_delete_manager(audit=audit, versioning=version or hard_delete)
manager = source_wrapper.get_delete_manager(events=audit, versioning=version or hard_delete)
manager.delete_items(source_wrapper.path,
version=version, hard_delete=hard_delete,
exclude=exclude, include=include,
Expand Down Expand Up @@ -313,7 +313,7 @@ def restore(cls, path, version, recursive, exclude, include):
click.echo('Flag --recursive (-r) is required to restore folders.', err=True)
sys.exit(1)
with auditing() as audit:
manager = source_wrapper.get_restore_manager(audit=audit)
manager = source_wrapper.get_restore_manager(events=audit)
manager.restore_version(version, exclude, include, recursive=recursive)

@classmethod
Expand Down
Loading