Skip to content

Commit

Permalink
Merge branch 'noc-2551' into 'master'
Browse files Browse the repository at this point in the history
#2551 Add User Subscription to Notification Group.

See merge request noc/noc!8474
  • Loading branch information
Dmitry Volodin committed Oct 9, 2024
2 parents e92cd27 + 6e1d753 commit e52fc49
Show file tree
Hide file tree
Showing 32 changed files with 1,523 additions and 420 deletions.
2 changes: 1 addition & 1 deletion aaa/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
("sa.UserAccess", "user"),
("kb.KBUserBookmark", "user"),
("main.Checkpoint", "user"),
("main.NotificationGroupUser", "user"),
("main.NotificationGroupUserSubscription", "user"),
("main.ReportSubscription", "run_as"),
("ip.PrefixBookmark", "user"),
("kb.KBEntryHistory", "user"),
Expand Down
4 changes: 2 additions & 2 deletions commands/wipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def wipe_user(self, o):
:type o: User
:return: None
"""
from noc.main.models.notificationgroup import NotificationGroupUser
from noc.main.models.notificationgroup import NotificationGroupUserSubscription
from noc.main.models.audittrail import AuditTrail
from noc.aaa.models.usercontact import UserContact
from noc.aaa.models.permission import Permission
Expand All @@ -233,7 +233,7 @@ def wipe_user(self, o):
AuditTrail.objects.filter(user=o.username).delete()
# Clean NotificationGroupUser
with self.log("Cleaning notification groups"):
NotificationGroupUser.objects.filter(user=o).delete()
NotificationGroupUserSubscription.objects.filter(user=o).delete()
# Clean User contact
with self.log("Cleaning user contact"):
UserContact.objects.filter(user=o).delete()
Expand Down
4 changes: 2 additions & 2 deletions core/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_matcher(op: str, field: str, value: Any) -> Callable:
case "$regex":
value = re.compile(value)
case "$in" | "$all" | "$any":
value = frozenset(str(v) for v in value)
value = frozenset(v for v in value)
case "$eq":
value = value
case _:
Expand Down Expand Up @@ -84,7 +84,7 @@ def match_regex(rx: re.Pattern, field: str, ctx: Dict[str, Any]) -> bool:


def match_in(c_iter: FrozenSet, field: str, ctx: Dict[str, Any]) -> bool:
return str(ctx[field]) in c_iter
return ctx[field] in c_iter


def match_all(c_iter: FrozenSet, field: str, ctx: Dict[str, Any]) -> bool:
Expand Down
65 changes: 51 additions & 14 deletions core/mx.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from noc.core.service.loader import get_service
from noc.core.comp import DEFAULT_ENCODING
from noc.core.ioloop.util import run_sync
from noc.models import get_model_id


@dataclass
Expand Down Expand Up @@ -43,6 +44,7 @@ class Message(object):
MX_PROFILE_ID = "Profile-Id"
MX_LABELS = "Labels"
MX_RESOURCE_GROUPS = "Resource-Group-Ids"
MX_WATCH_FOR_ID = "Watch-For-Id"
# Notification headers
MX_TO = "To"
MX_NOTIFICATION = b"notification"
Expand Down Expand Up @@ -76,6 +78,40 @@ class MessageType(enum.Enum):
OTHER = "other"


@dataclass(frozen=True)
class MetaConfig(object):
header: str
is_list: bool = False


CONFIGS = {
"watch_for": MetaConfig(MX_WATCH_FOR_ID),
"profile": MetaConfig(MX_PROFILE_ID),
"groups": MetaConfig(MX_RESOURCE_GROUPS, is_list=True),
"administrative_domain": MetaConfig(MX_ADMINISTRATIVE_DOMAIN_ID),
"from": MetaConfig(MX_DATA_ID),
"labels": MetaConfig(MX_LABELS, is_list=True),
}


class MessageMeta(enum.Enum):
@property
def config(self) -> MetaConfig:
return CONFIGS[self.value]

WATCH_FOR = "watch_for"
PROFILE = "profile"
GROUPS = "groups"
ADM_DOMAIN = "administrative_domain"
FROM = "from"
LABELS = "labels"

def clean_header_value(self, value: Any) -> bytes:
if self.config.is_list:
return MX_H_VALUE_SPLITTER.join(value).encode(DEFAULT_ENCODING)
return str(value).encode(DEFAULT_ENCODING)


MESSAGE_HEADERS = {
MX_SHARDING_KEY,
MX_CHANGE_ID,
Expand All @@ -102,11 +138,11 @@ def send_message(
"""
Build message and schedule to send to mx service
:param data: Data for transmit
:param message_type: Message type
:param headers: additional message headers
:param sharding_key: Key for sharding over MX services
:return:
Attrs:
data: Data for transmit
message_type: Message type
headers: additional message headers
sharding_key: Key for sharding over MX services
"""
msg_headers = {
MX_MESSAGE_TYPE: message_type.value,
Expand Down Expand Up @@ -134,11 +170,11 @@ def send_notification(
):
"""
Send notification to notification_group ot address
:param subject: Notification Title
:param body: Notification body
:param to: notification address
:param notification_method: Notification method (for to param)
:return:
Attrs:
subject: Notification Title
body: Notification body
to: notification address
notification_method: Notification method (for to param)
"""
if notification_method not in NOTIFICATION_METHODS:
raise ValueError("Unknown notification method: %s" % notification_method)
Expand All @@ -155,11 +191,12 @@ def send_notification(


def get_mx_partitions() -> int:
"""
Get number of MX stream partitions
:return:
"""
"""Get number of MX stream partitions"""
from noc.core.msgstream.config import get_stream

cfg = get_stream(MX_STREAM)
return cfg.get_partitions()


def get_subscription_id(o) -> str:
return f"m:{get_model_id(o)}:{o.id}"
7 changes: 6 additions & 1 deletion core/router/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
MX_NOTIFICATION_METHOD,
MX_NOTIFICATION_DELAY,
MX_NOTIFICATION_GROUP_ID,
MX_WATCH_FOR_ID,
MessageMeta,
)
from noc.config import config

Expand Down Expand Up @@ -173,7 +175,10 @@ def iter_action(
ng = self.get_notification_group(msg.headers.get(MX_NOTIFICATION_GROUP_ID))
if not ng:
return
for method, headers, render_template in ng.iter_actions():
for method, headers, render_template in ng.iter_actions(
message_type.decode(),
{MessageMeta.WATCH_FOR: msg.headers[MX_WATCH_FOR_ID].decode()},
):
yield NOTIFICATION_METHODS[method].decode(), headers, body


Expand Down
78 changes: 40 additions & 38 deletions core/router/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,23 @@ def __init__(self):
# self.out_queue: Optional[QBuffer] = None

def load(self):
"""
Load up all the rules and populate the chains
:return:
"""
"""Load up all the rules and populate the chains"""
from noc.main.models.messageroute import MessageRoute

num = 0
for num, route in enumerate(
MessageRoute.objects.filter(is_active=True).order_by("order"), start=1
):
self.chains[route.type] += [Route.from_data(route.get_route_config())]
cfg = route.get_route_config()
cfg["type"] = cfg["type"].value
self.chains[route.type] += [Route.from_data(cfg)]
logger.info("Loading %s route", num)
self.rebuild_chains()

def has_route(self, route_id: str) -> bool:
"""
Check Route already exists in chains
:param route_id:
:return:
route_id: Router identifier
"""
return route_id in self.routes

Expand All @@ -73,28 +71,28 @@ def change_route(self, data):
* change type = delete + insert
* change order = reorder
* change data = update
:param data:
:return:
Attrs:
data:
"""
r = Route.from_data(data)
route_id = data["id"]
to_rebuild = set()
if not self.has_route(route_id):
self.routes[data["id"]] = r
to_rebuild.add(r.type)
to_rebuild |= r.m_types
logger.info("[%s|%s] Insert route", route_id, data["name"])
self.rebuild_chains(to_rebuild)
return
if self.routes[route_id].type != r.type:
if self.routes[route_id].m_types != r.m_types:
# rebuild
logger.info(
"[%s|%s] Change route chain: %s -> %s",
route_id,
data["name"],
self.routes[route_id].type,
r.type,
b";".join(sorted(self.routes[route_id].m_types)),
b";".join(sorted(r.m_types)),
)
to_rebuild.add([r.type, self.routes[route_id].type])
to_rebuild |= r.m_types.symmetric_difference(self.routes[route_id].m_types)
self.routes[route_id].set_type(r.type)
if self.routes[route_id].order != r.order:
logger.info(
Expand All @@ -105,7 +103,7 @@ def change_route(self, data):
r.order,
)
self.routes[route_id].set_order(r.order)
to_rebuild.add(r.type)
to_rebuild |= r.m_types
if self.routes[route_id].is_differ(data):
logger.info("[%s|%s] Update route", route_id, data["name"])
self.routes[route_id].update(data)
Expand All @@ -115,37 +113,43 @@ def change_route(self, data):
def delete_route(self, route_id: str):
"""
Delete Route from Chains
:param route_id:
:return:
Attrs:
route_id: Router Identifiers
"""
r_type = None
if route_id in self.routes:
logger.info("[%s|%s] Delete route", route_id, self.routes[route_id].name)
r_type = self.routes[route_id].type
r_type = self.routes[route_id].m_types
del self.routes[route_id]
if r_type:
self.rebuild_chains([r_type], deleted=True)
self.rebuild_chains(r_type, deleted=True)

def rebuild_chains(self, r_types: Optional[Iterable[str]] = None, deleted: bool = False):
def rebuild_chains(self, r_types: Optional[Iterable[bytes]] = None, deleted: bool = False):
"""
Rebuild Router Chains
Need lock ?
:param r_types: List types for rebuild chains
:param deleted: Route was deleted
:return:
Attrs:
r_types: List types for rebuild chains
deleted: Route was deleted flag
"""
chains = defaultdict(list)
r_types = frozenset(r_types) if r_types else None
for rid, r in self.routes.items():
if r_types and r.type not in r_types and rid != self.DEFAULT_CHAIN:
if r_types and not r.m_types.intersection(r_types) and rid != self.DEFAULT_CHAIN:
continue
chains[r.type].append(r)
elif r_types:
updated_types = r.m_types.intersection(r_types)
else:
updated_types = r.m_types
for tt in updated_types:
chains[tt].append(r)
if deleted:
# Remove last route
for rt in set(r_types) - set(chains):
chains[rt] = []
for chain in chains:
logger.info("[%s] Rebuild chain", chain)
self.chains[chain.encode(encoding=DEFAULT_ENCODING)] = list(
self.chains[chain] = list(
sorted(
[r for r in chains[chain]],
key=operator.attrgetter("order"),
Expand Down Expand Up @@ -174,8 +178,7 @@ async def publish(
def route_sync(self, msg: Message):
"""
Synchronize method
:param msg:
:return:
msg: Route Message
"""
run_sync(partial(self.route_message, msg))

Expand All @@ -189,13 +192,12 @@ def get_message(
) -> Message:
"""
Build message
:param data: Data for transmit
:param message_type: Message type
:param headers: additional message headers
:param sharding_key: Key for sharding
:param raw_value:
:return:
Attrs:
data: Data for transmit
message_type: Message type
headers: additional message headers
sharding_key: Key for sharding
raw_value:
"""
msg_headers = {
MX_MESSAGE_TYPE: message_type.encode(DEFAULT_ENCODING),
Expand All @@ -215,9 +217,9 @@ def get_message(
async def route_message(self, msg: Message, msg_id: Optional[str] = None):
"""
Route message by rule
:param msg:
:param msg_id:
:return:
Attrs:
msg: Received Message
msg_id: Message sequence number
"""
mt = msg.headers.get(MX_MESSAGE_TYPE)
if not mt:
Expand Down
Loading

0 comments on commit e52fc49

Please sign in to comment.