Skip to content

Commit

Permalink
mgr/diskprediction: add prototype diskprediction module
Browse files Browse the repository at this point in the history
This module is written by Rick Chen <rick.chen@prophetstor.com> and
provides both a built-in local predictor and a cloud mode that queries
a cloud service (provided by ProphetStor) to predict device failures.

Signed-off-by: Rick Chen <rick.chen@prophetstor.com>
Signed-off-by: Sage Weil <sage@redhat.com>
  • Loading branch information
Rick Chen authored and liewegas committed Sep 17, 2018
1 parent f557a78 commit 4abb79f
Show file tree
Hide file tree
Showing 108 changed files with 6,378 additions and 0 deletions.
5 changes: 5 additions & 0 deletions COPYING
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,8 @@ Files: src/include/timegm.h
Copyright (C) Copyright Howard Hinnant
Copyright (C) Copyright 2010-2011 Vicente J. Botet Escriba
License: Boost Software License, Version 1.0

Files: src/pybind/mgr/diskprediction/predictor/models/*
Copyright: None
License: Public domain

345 changes: 345 additions & 0 deletions doc/mgr/diskprediction.rst

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions doc/mgr/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ sensible.
Writing plugins <plugins>
Writing orchestrator plugins <orchestrator_modules>
Dashboard plugin <dashboard>
DiskPrediction plugin <diskprediction>
Local pool plugin <localpool>
RESTful plugin <restful>
Zabbix plugin <zabbix>
Expand Down
3 changes: 3 additions & 0 deletions qa/tasks/mgr/test_module_selftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def test_prometheus(self):
def test_influx(self):
self._selftest_plugin("influx")

def test_diskprediction(self):
self._selftest_plugin("diskprediction")

def test_telegraf(self):
self._selftest_plugin("telegraf")

Expand Down
2 changes: 2 additions & 0 deletions src/pybind/mgr/diskprediction/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from __future__ import absolute_import
from .module import Module
38 changes: 38 additions & 0 deletions src/pybind/mgr/diskprediction/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import absolute_import

from ..common import timeout, TimeoutError


class BaseAgent(object):

measurement = ''

def __init__(self, mgr_module, obj_sender, timeout=30):
self.data = []
self._client = None
self._client = obj_sender
self._logger = mgr_module.log
self._module_inst = mgr_module
self._timeout = timeout

def run(self):
try:
self._collect_data()
self._run()
except TimeoutError:
self._logger.error('{} failed to execute {} task'.format(
__name__, self.measurement))

def __nonzero__(self):
if not self._module_inst and not self._client:
return False
else:
return True

@timeout()
def _run(self):
pass

@timeout()
def _collect_data(self):
pass
61 changes: 61 additions & 0 deletions src/pybind/mgr/diskprediction/agent/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import absolute_import

from .. import BaseAgent
from ...common import DP_MGR_STAT_FAILED, DP_MGR_STAT_WARNING, DP_MGR_STAT_OK

AGENT_VERSION = '1.0.0'


class MetricsField(object):
def __init__(self):
self.tags = {}
self.fields = {}
self.timestamp = None

def __str__(self):
return str({
'tags': self.tags,
'fields': self.fields,
'timestamp': self.timestamp
})


class MetricsAgent(BaseAgent):

def log_summary(self, status_info):
try:
if status_info:
measurement = status_info['measurement']
success_count = status_info['success_count']
failure_count = status_info['failure_count']
total_count = success_count + failure_count
display_string = \
'%s agent stats in total count: %s, success count: %s, failure count: %s.'
self._logger.info(
display_string % (measurement, total_count, success_count, failure_count)
)
except Exception as e:
self._logger.error(str(e))

def _run(self):
collect_data = self.data
result = {}
if collect_data:
status_info = self._client.send_info(collect_data, self.measurement)
# show summary info
self.log_summary(status_info)
# write sub_agent buffer
total_count = status_info['success_count'] + status_info['failure_count']
if total_count:
if status_info['success_count'] == 0:
self._module_inst.status = \
{'status': DP_MGR_STAT_FAILED,
'reason': 'failed to send metrics data to the server'}
elif status_info['failure_count'] == 0:
self._module_inst.status = \
{'status': DP_MGR_STAT_OK}
else:
self._module_inst.status = \
{'status': DP_MGR_STAT_WARNING,
'reason': 'failed to send partial metrics data to the server'}
return result
146 changes: 146 additions & 0 deletions src/pybind/mgr/diskprediction/agent/metrics/ceph_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from __future__ import absolute_import

import socket

from . import MetricsAgent, MetricsField
from ...common.clusterdata import ClusterAPI


class CephCluster(MetricsField):
""" Ceph cluster structure """
measurement = 'ceph_cluster'

def __init__(self):
super(CephCluster, self).__init__()
self.tags['cluster_id'] = None
self.fields['agenthost'] = None
self.tags['agenthost_domain_id'] = None
self.fields['cluster_health'] = ''
self.fields['num_mon'] = None
self.fields['num_mon_quorum'] = None
self.fields['num_osd'] = None
self.fields['num_osd_up'] = None
self.fields['num_osd_in'] = None
self.fields['osd_epoch'] = None
self.fields['osd_bytes'] = None
self.fields['osd_bytes_used'] = None
self.fields['osd_bytes_avail'] = None
self.fields['num_pool'] = None
self.fields['num_pg'] = None
self.fields['num_pg_active_clean'] = None
self.fields['num_pg_active'] = None
self.fields['num_pg_peering'] = None
self.fields['num_object'] = None
self.fields['num_object_degraded'] = None
self.fields['num_object_misplaced'] = None
self.fields['num_object_unfound'] = None
self.fields['num_bytes'] = None
self.fields['num_mds_up'] = None
self.fields['num_mds_in'] = None
self.fields['num_mds_failed'] = None
self.fields['mds_epoch'] = None


class CephClusterAgent(MetricsAgent):
measurement = 'ceph_cluster'

def _collect_data(self):
# process data and save to 'self.data'
obj_api = ClusterAPI(self._module_inst)
cluster_id = obj_api.get_cluster_id()

c_data = CephCluster()
cluster_state = obj_api.get_health_status()
c_data.tags['cluster_id'] = cluster_id
c_data.fields['cluster_health'] = str(cluster_state)
c_data.fields['agenthost'] = socket.gethostname()
c_data.tags['agenthost_domain_id'] = \
'%s_%s' % (cluster_id, c_data.fields['agenthost'])
c_data.fields['osd_epoch'] = obj_api.get_osd_epoch()
c_data.fields['num_mon'] = len(obj_api.get_mons())
c_data.fields['num_mon_quorum'] = \
len(obj_api.get_mon_status().get('quorum', []))

osds = obj_api.get_osds()
num_osd_up = 0
num_osd_in = 0
for osd_data in osds:
if osd_data.get('up'):
num_osd_up = num_osd_up + 1
if osd_data.get('in'):
num_osd_in = num_osd_in + 1
if osds:
c_data.fields['num_osd'] = len(osds)
else:
c_data.fields['num_osd'] = 0
c_data.fields['num_osd_up'] = num_osd_up
c_data.fields['num_osd_in'] = num_osd_in
c_data.fields['num_pool'] = len(obj_api.get_osd_pools())

df_stats = obj_api.get_df_stats()
total_bytes = df_stats.get('total_bytes', 0)
total_used_bytes = df_stats.get('total_used_bytes', 0)
total_avail_bytes = df_stats.get('total_avail_bytes', 0)
c_data.fields['osd_bytes'] = total_bytes
c_data.fields['osd_bytes_used'] = total_used_bytes
c_data.fields['osd_bytes_avail'] = total_avail_bytes
if total_bytes and total_avail_bytes:
c_data.fields['osd_bytes_used_percentage'] = \
round(float(total_used_bytes) / float(total_bytes) * 100, 4)
else:
c_data.fields['osd_bytes_used_percentage'] = 0.0000

pg_stats = obj_api.get_pg_stats()
num_bytes = 0
num_object = 0
num_object_degraded = 0
num_object_misplaced = 0
num_object_unfound = 0
num_pg_active = 0
num_pg_active_clean = 0
num_pg_peering = 0
for pg_data in pg_stats:
num_pg_active = num_pg_active + len(pg_data.get('acting'))
if 'active+clean' in pg_data.get('state'):
num_pg_active_clean = num_pg_active_clean + 1
if 'peering' in pg_data.get('state'):
num_pg_peering = num_pg_peering + 1

stat_sum = pg_data.get('stat_sum', {})
num_object = num_object + stat_sum.get('num_objects', 0)
num_object_degraded = \
num_object_degraded + stat_sum.get('num_objects_degraded', 0)
num_object_misplaced = \
num_object_misplaced + stat_sum.get('num_objects_misplaced', 0)
num_object_unfound = \
num_object_unfound + stat_sum.get('num_objects_unfound', 0)
num_bytes = num_bytes + stat_sum.get('num_bytes', 0)

c_data.fields['num_pg'] = len(pg_stats)
c_data.fields['num_object'] = num_object
c_data.fields['num_object_degraded'] = num_object_degraded
c_data.fields['num_object_misplaced'] = num_object_misplaced
c_data.fields['num_object_unfound'] = num_object_unfound
c_data.fields['num_bytes'] = num_bytes
c_data.fields['num_pg_active'] = num_pg_active
c_data.fields['num_pg_active_clean'] = num_pg_active_clean
c_data.fields['num_pg_peering'] = num_pg_active_clean

filesystems = obj_api.get_file_systems()
num_mds_in = 0
num_mds_up = 0
num_mds_failed = 0
mds_epoch = 0
for fs_data in filesystems:
num_mds_in = \
num_mds_in + len(fs_data.get('mdsmap', {}).get('in', []))
num_mds_up = \
num_mds_up + len(fs_data.get('mdsmap', {}).get('up', {}))
num_mds_failed = \
num_mds_failed + len(fs_data.get('mdsmap', {}).get('failed', []))
mds_epoch = mds_epoch + fs_data.get('mdsmap', {}).get('epoch', 0)
c_data.fields['num_mds_in'] = num_mds_in
c_data.fields['num_mds_up'] = num_mds_up
c_data.fields['num_mds_failed'] = num_mds_failed
c_data.fields['mds_epoch'] = mds_epoch
self.data.append(c_data)
Loading

0 comments on commit 4abb79f

Please sign in to comment.