Skip to content

Commit

Permalink
Issue #1628: pipe shall allow to export the node utilization report
Browse files Browse the repository at this point in the history
  • Loading branch information
ekazachkova committed Dec 4, 2020
1 parent 15d6041 commit c448a8a
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 6 deletions.
31 changes: 31 additions & 0 deletions pipe-cli/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from src.config import Config, ConfigNotFoundError, silent_print_config_info, is_frozen
from src.model.pipeline_run_filter_model import DEFAULT_PAGE_SIZE, DEFAULT_PAGE_INDEX
from src.model.pipeline_run_model import PriceType
from src.utilities.cluster_monitoring_manager import ClusterMonitoringManager
from src.utilities.du_format_type import DuFormatType
from src.utilities.pipeline_run_share_manager import PipelineRunShareManager
from src.utilities.tool_operations import ToolOperations
Expand Down Expand Up @@ -1533,6 +1534,36 @@ def remove_share_run(run_id, shared_user, shared_group, share_ssh):
PipelineRunShareManager().remove(run_id, shared_user, shared_group, share_ssh)


@cli.group()
def cluster():
""" Cluster commands
"""
pass


@cluster.command(name='monitor')
@click.option('-i', '--instance-id', required=False, help='The cloud instance ID. This option cannot be used '
'in conjunction with the --run_id option')
@click.option('-r', '--run-id', required=False, help='The pipeline run ID. This option cannot be used '
'in conjunction with the --instance-id option')
@click.option('-o', '--output', help='The output file for monitoring report. If not specified the report file will '
'be generated in the current folder.')
@click.option('-df', '--date-from', required=False, help='The start date for monitoring data collection. '
'If not specified the current date and time will be used.')
@click.option('-dt', '--date-to', required=False, help='The end date for monitoring data collection. If not '
'specified a --date-from option value minus 1 day will be used.')
@click.option('-p', '--interval', required=False, help='The time interval. This option shall have the following format:'
' <N>m for minutes or <N>h for hours, where <N> is the required '
'number of minutes/hours. Default: 1m.')
@click.option('-u', '--user', required=False, callback=set_user_token, expose_value=False, help=USER_OPTION_DESCRIPTION)
@Config.validate_access_token
def monitor(instance_id, run_id, output, date_from, date_to, interval):
"""
Downloads node utilization report
"""
ClusterMonitoringManager().generate_report(instance_id, run_id, output, date_from, date_to, interval)


# Used to run a PyInstaller "freezed" version
if getattr(sys, 'frozen', False):
cli(sys.argv[1:])
27 changes: 27 additions & 0 deletions pipe-cli/src/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# limitations under the License.

import json
import sys

import click
import requests
import urllib3

Expand Down Expand Up @@ -69,6 +72,30 @@ def call(self, method, data, http_method=None, error_message=None):
else:
return response_data

def download(self, url_path, file_path):
url = '{}/{}'.format(self.__config__.api.strip('/'), url_path)
headers = {
'Accept': 'application/octet-stream',
'Authorization': 'Bearer {}'.format(self.__config__.get_token())
}
response = requests.get(url, headers=headers, verify=False, proxies=self.__proxies__)
self._validate_octet_stream_response(response)
self._write_response_to_file(file_path, response)

@classmethod
def _write_response_to_file(cls, file_path, response):
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=16 * 1024):
if chunk:
f.write(chunk)

@classmethod
def _validate_octet_stream_response(cls, response):
if response.status_code == 200:
return
click.echo('Server responded with status: {}. {}'.format(str(response.status_code), response.text), err=True)
sys.exit(1)

@classmethod
def instance(cls):
return cls()
9 changes: 8 additions & 1 deletion pipe-cli/src/api/cluster.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017-2019 EPAM Systems, Inc. (https://www.epam.com/)
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,3 +56,10 @@ def list_instance_types(cls):
for instance_type_json in response_data['payload']:
result.append(ClusterInstanceTypeModel.load(instance_type_json))
return result

@classmethod
def download_usage_report(cls, instance_id, date_from, date_to, interval, file_path):
api = cls.instance()
url_path = 'cluster/node/%s/usage/report?interval=%s&from=%s&to=%s' \
% (instance_id, interval, date_from, date_to)
api.download(url_path, file_path)
89 changes: 89 additions & 0 deletions pipe-cli/src/utilities/cluster_monitoring_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys

import click
import os
from src.api.cluster import Cluster
from src.utilities import date_utilities

from src.api.pipeline_run import PipelineRun


OUTPUT_FILE_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'


class ClusterMonitoringManager:

def __init__(self):
pass

@classmethod
def generate_report(cls, instance_id, run_id, output, raw_from_date, raw_to_date, interval):
if not instance_id and not run_id:
click.echo("One of '--instance-id' or '--run-id' options shall be specified", err=True)
sys.exit(1)
if instance_id and run_id:
click.echo("Both options '--instance-id' and '--run-id' cannot be set together", err=True)
sys.exit(1)
if not instance_id:
run = PipelineRun.get(run_id)
if not run:
click.echo("Pipeline run '%d' cannot be found" % run_id, err=True)
sys.exit(1)
instance_id = dict(run.instance)['nodeName']
if not instance_id:
click.echo("Instance ID cannot be found for run '%d'" % run_id, err=True)
sys.exit(1)
if not interval:
interval = '1m'
date_to = date_utilities.now() if not raw_to_date else date_utilities.format_date(raw_to_date)
date_from = date_utilities.minus_day(date_to) if not raw_from_date \
else date_utilities.format_date(raw_from_date)
output_path = cls._build_output_path(output, run_id, instance_id, date_from, date_to, interval)
Cluster.download_usage_report(instance_id, date_from, date_to,
cls._convert_to_duration_format(interval), output_path)
click.echo("Usage report downloaded to '%s'" % output_path)

@staticmethod
def _build_output_path(output, run_id, instance_id, date_from, date_to, interval):
if not output:
output_name = ClusterMonitoringManager._build_output_file_name(run_id, instance_id, date_from, date_to,
interval)
return os.path.abspath(output_name)
output_path = os.path.abspath(output)
if output.endswith(os.path.sep) and not os.path.exists(output_path):
os.makedirs(output_path)
if output.endswith(os.path.sep) or os.path.isdir(output_path):
output_name = ClusterMonitoringManager._build_output_file_name(run_id, instance_id, date_from, date_to,
interval)
return os.path.join(output_path, output_name)
output_name = os.path.basename(output_path)
folder_path = output_path.rstrip(output_name)
if not os.path.exists(folder_path):
os.makedirs(folder_path)
return output_path

@staticmethod
def _build_output_file_name(run_id, instance_id, date_from, date_to, interval):
instance_indicator = str(run_id or instance_id)
return "cluster_monitor_%s_%s_%s_%s.csv" % (
instance_indicator,
date_utilities.format_date(date_from, OUTPUT_FILE_DATE_FORMAT),
date_utilities.format_date(date_to, OUTPUT_FILE_DATE_FORMAT),
interval)

@staticmethod
def _convert_to_duration_format(raw_interval):
return "PT{}".format(raw_interval).upper()
37 changes: 32 additions & 5 deletions pipe-cli/src/utilities/date_utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017-2019 EPAM Systems, Inc. (https://www.epam.com/)
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,16 @@
from ..config import Config


DATE_TIME_PARAMETER_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_TIME_SERVER_FORMAT = '%Y-%m-%d %H:%M:%S.%f'


def server_date_representation(date_string):
if not date_string:
return None
date = None
try:
date = datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f')
date = datetime.datetime.strptime(date_string, DATE_TIME_SERVER_FORMAT)
except ValueError:
pass
if date is None:
Expand All @@ -32,18 +36,28 @@ def server_date_representation(date_string):
pass
if date is not None:
date_with_time_zone = pytz.utc.localize(date, is_dst=None)
return date_with_time_zone.astimezone(Config.instance().timezone()).strftime('%Y-%m-%d %H:%M:%S')
return date_with_time_zone.astimezone(Config.instance().timezone()).strftime(DATE_TIME_PARAMETER_FORMAT)
return None


def parse_date_parameter(date_string):
# accepts string with date in local timezone and converts it to another string of UTC date
date = _parse_date(date_string)
return _to_string_with_timezone(date)


def format_date(date_string, date_format=DATE_TIME_PARAMETER_FORMAT):
date = _parse_date(date_string)
return _to_string_with_timezone(date, date_format)


def _parse_date(date_string):
if not date_string:
return None
date = None
error = '"{}" does not match format "yyyy-MM-dd HH:mm:ss" or "yyyy-MM-dd"'.format(date_string)
try:
date = datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S')
date = datetime.datetime.strptime(date_string, DATE_TIME_PARAMETER_FORMAT)
error = None
except ValueError:
try:
Expand All @@ -53,5 +67,18 @@ def parse_date_parameter(date_string):
pass
if error:
raise RuntimeError(error)
return date


def _to_string_with_timezone(date, date_format=DATE_TIME_SERVER_FORMAT):
date_with_time_zone = Config.instance().timezone().localize(date, is_dst=None)
return date_with_time_zone.astimezone(pytz.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
return date_with_time_zone.astimezone(pytz.utc).strftime(date_format)


def now():
return _to_string_with_timezone(datetime.datetime.now(), DATE_TIME_PARAMETER_FORMAT)


def minus_day(date_string):
date = datetime.datetime.strptime(date_string, DATE_TIME_PARAMETER_FORMAT)
return _to_string_with_timezone(date - datetime.timedelta(days=1), DATE_TIME_PARAMETER_FORMAT)

0 comments on commit c448a8a

Please sign in to comment.