feat:add apollo configuration to load env file #11210

Merged 16 commits on Dec 9, 2024
Changes from 1 commit
4 changes: 4 additions & 0 deletions api/configs/__init__.py
@@ -1,3 +1,7 @@
from extensions.ext_configuration import ConfigurationCenter

from .app_config import DifyConfig

dify_config = DifyConfig()
# read settings from the configuration center
configuration = ConfigurationCenter()
10 changes: 6 additions & 4 deletions api/configs/middleware/__init__.py
@@ -5,6 +5,7 @@
from pydantic_settings import BaseSettings

from configs.middleware.cache.redis_config import RedisConfig
from configs.middleware.configuration import ConfigurationCenterConfig
from configs.middleware.storage.aliyun_oss_storage_config import AliyunOSSStorageConfig
from configs.middleware.storage.amazon_s3_storage_config import S3StorageConfig
from configs.middleware.storage.azure_blob_storage_config import AzureBlobStorageConfig
@@ -41,8 +42,8 @@
class StorageConfig(BaseSettings):
STORAGE_TYPE: str = Field(
description="Type of storage to use."
" Options: 'local', 's3', 'aliyun-oss', 'azure-blob', 'baidu-obs', 'google-storage', 'huawei-obs', "
"'oci-storage', 'tencent-cos', 'volcengine-tos', 'supabase'. Default is 'local'.",
" Options: 'local', 's3', 'aliyun-oss', 'azure-blob', 'baidu-obs', 'google-storage', 'huawei-obs', "
"'oci-storage', 'tencent-cos', 'volcengine-tos', 'supabase'. Default is 'local'.",
default="local",
)

@@ -55,7 +56,7 @@ class StorageConfig(BaseSettings):
class VectorStoreConfig(BaseSettings):
VECTOR_STORE: Optional[str] = Field(
description="Type of vector store to use for efficient similarity search."
" Set to None if not using a vector store.",
" Set to None if not using a vector store.",
default=None,
)

@@ -68,7 +69,7 @@ class VectorStoreConfig(BaseSettings):
class KeywordStoreConfig(BaseSettings):
KEYWORD_STORE: str = Field(
description="Method for keyword extraction and storage."
" Default is 'jieba', a Chinese text segmentation library.",
" Default is 'jieba', a Chinese text segmentation library.",
default="jieba",
)

@@ -224,6 +225,7 @@ class InternalTestConfig(BaseSettings):
class MiddlewareConfig(
# place the configs in alphabet order
CeleryConfig,
ConfigurationCenterConfig,
DatabaseConfig,
KeywordStoreConfig,
RedisConfig,
13 changes: 13 additions & 0 deletions api/configs/middleware/configuration/__init__.py
@@ -0,0 +1,13 @@
import os

from pydantic import Field

from configs.middleware.configuration.apollo import ApolloConfig


class ConfigurationCenterConfig(ApolloConfig):
# configuration type
CONFIGURATION_TYPE: str = Field(
description="configuration type",
default=os.environ.get("CONFIGURATION_TYPE", "")
)
30 changes: 30 additions & 0 deletions api/configs/middleware/configuration/apollo/__init__.py
@@ -0,0 +1,30 @@
import os

from pydantic import Field
from pydantic_settings import BaseSettings


class ApolloConfig(BaseSettings):
"""
Packaging build information
"""

APOLLO_APP_ID: str = Field(
description="apollo app_id",
default=os.environ.get("APOLLO_APP_ID", ""),
)

APOLLO_CLUSTER: str = Field(
description="apollo cluster",
default=os.environ.get("APOLLO_CLUSTER", ""),
)

APOLLO_CONFIG_URL: str = Field(
description="apollo config url",
default=os.environ.get("APOLLO_CONFIG_URL", ""),
)

APOLLO_NAMESPACE: str = Field(
description="apollo namespace",
default=os.environ.get("APOLLO_NAMESPACE", ""),
)
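For reference, ApolloConfig follows the usual pydantic-settings pattern: each field falls back to the matching environment variable, so the Apollo connection details can be supplied through the process environment or the .env file this PR targets. A minimal sketch of how the settings resolve (the sample values below are placeholders for illustration, not part of this PR):

import os

# placeholder values for illustration only
os.environ["APOLLO_APP_ID"] = "dify-api"
os.environ["APOLLO_CONFIG_URL"] = "http://apollo-config:8080"
os.environ["APOLLO_CLUSTER"] = "default"
os.environ["APOLLO_NAMESPACE"] = "application"

from configs.middleware.configuration.apollo import ApolloConfig

cfg = ApolloConfig()
print(cfg.APOLLO_APP_ID, cfg.APOLLO_CONFIG_URL)  # -> dify-api http://apollo-config:8080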
Empty file.
Empty file.
278 changes: 278 additions & 0 deletions api/extensions/configuration/apollo/apollo_client.py
@@ -0,0 +1,278 @@
import hashlib
import json
import logging
import os
import threading
import time
from pathlib import Path

from extensions.configuration.apollo.python_3x import http_request, makedirs_wrapper
from extensions.configuration.apollo.util import init_ip, CONFIGURATIONS, get_value_from_dict, \
no_key_cache_key, NOTIFICATION_ID, NAMESPACE_NAME, url_encode_wrapper, signature

logger = logging.getLogger(__name__)


class ApolloClient:

def __init__(self, config_url, app_id, cluster='default', secret='', start_hot_update=True,
change_listener=None, _notification_map=None):

# Core routing parameters
self.config_url = config_url
self.cluster = cluster
self.app_id = app_id

# Non-core parameters
self.ip = init_ip()
self.secret = secret

# Check the parameter variables

# Private control variables
self._cycle_time = 5
self._stopping = False
self._cache = {}
self._no_key = {}
self._hash = {}
self._pull_timeout = 75
self._cache_file_path = os.path.expanduser('~') + '/data/apollo/cache/'
self._long_poll_thread = None
self._change_listener = change_listener # "add" "delete" "update"
if _notification_map is None:
_notification_map = {'application': -1}
self._notification_map = _notification_map
self.last_release_key = None
# Private startup method
self._path_checker()
if start_hot_update:
self._start_hot_update()

# start the heartbeat thread
heartbeat = threading.Thread(target=self._heart_beat)
heartbeat.daemon = True
heartbeat.start()

def get_json_from_net(self, namespace='application'):
url = '{}/configs/{}/{}/{}?releaseKey={}&ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
"", self.ip)
try:
code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
if code == 200:
data = json.loads(body)
data = data["configurations"]
return_data = {CONFIGURATIONS: data}
return return_data
else:
return None
except Exception as e:
logger.error(str(e))
return None

def get_value(self, key, default_val=None, namespace='application'):
try:
# read memory configuration
namespace_cache = self._cache.get(namespace)
val = get_value_from_dict(namespace_cache, key)
if val is not None:
return val

no_key = no_key_cache_key(namespace, key)
if no_key in self._no_key:
return default_val

# read the network configuration
namespace_data = self.get_json_from_net(namespace)
val = get_value_from_dict(namespace_data, key)
if val is not None:
self._update_cache_and_file(namespace_data, namespace)
return val

# read the file configuration
namespace_cache = self._get_local_cache(namespace)
val = get_value_from_dict(namespace_cache, key)
if val is not None:
self._update_cache_and_file(namespace_cache, namespace)
return val

# nothing was found anywhere: record the miss and return the default value
self._set_local_cache_none(namespace, key)
return default_val
except Exception as e:
logger.error("get_value has error, [key is %s], [namespace is %s], [error is %s], ",
key, namespace, e)
return default_val

# Record that the key is missing in this namespace instead of caching the caller's default value,
# so each call stays accurate: caching the default here could return a stale value
# if a later call passes a different default.
def _set_local_cache_none(self, namespace, key):
no_key = no_key_cache_key(namespace, key)
self._no_key[no_key] = key

def _start_hot_update(self):
self._long_poll_thread = threading.Thread(target=self._listener)
# Run the long-poll loop as a daemon thread so it exits automatically when the main thread exits.
self._long_poll_thread.daemon = True
self._long_poll_thread.start()

def stop(self):
self._stopping = True
logger.info("Stopping listener...")

# Invoke the registered change listener; log a warning and continue if it raises.
def _call_listener(self, namespace, old_kv, new_kv):
if self._change_listener is None:
return
if old_kv is None:
old_kv = {}
if new_kv is None:
new_kv = {}
try:
for key in old_kv:
new_value = new_kv.get(key)
old_value = old_kv.get(key)
if new_value is None:
# If the new value is None, the key has been deleted.
self._change_listener("delete", namespace, key, old_value)
continue
if new_value != old_value:
self._change_listener("update", namespace, key, new_value)
continue
for key in new_kv:
new_value = new_kv.get(key)
old_value = old_kv.get(key)
if old_value is None:
self._change_listener("add", namespace, key, new_value)
except BaseException as e:
logger.warning(str(e))

def _path_checker(self):
if not os.path.isdir(self._cache_file_path):
makedirs_wrapper(self._cache_file_path)

# update the local cache and file cache
def _update_cache_and_file(self, namespace_data, namespace='application'):
# update the local cache
self._cache[namespace] = namespace_data
# update the file cache
new_string = json.dumps(namespace_data)
new_hash = hashlib.md5(new_string.encode('utf-8')).hexdigest()
if self._hash.get(namespace) == new_hash:
pass
else:
file_path = Path(self._cache_file_path) / f"{self.app_id}_configuration_{namespace}.txt"
file_path.write_text(new_string)
self._hash[namespace] = new_hash

# get the configuration from the local file
def _get_local_cache(self, namespace='application'):
cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")
if os.path.isfile(cache_file_path):
with open(cache_file_path) as f:
result = json.loads(f.readline())
return result
return {}

def _long_poll(self):
notifications = []
for key in self._cache:
namespace_data = self._cache[key]
notification_id = -1
if NOTIFICATION_ID in namespace_data:
notification_id = self._cache[key][NOTIFICATION_ID]
notifications.append({
NAMESPACE_NAME: key,
NOTIFICATION_ID: notification_id
})
try:
# nothing to poll yet, return immediately
if len(notifications) == 0:
return
url = '{}/notifications/v2'.format(self.config_url)
params = {
'appId': self.app_id,
'cluster': self.cluster,
'notifications': json.dumps(notifications, ensure_ascii=False)
}
param_str = url_encode_wrapper(params)
url = url + '?' + param_str
code, body = http_request(url, self._pull_timeout, headers=self._sign_headers(url))
http_code = code
if http_code == 304:
logger.debug('No change, loop...')
return
if http_code == 200:
data = json.loads(body)
for entry in data:
namespace = entry[NAMESPACE_NAME]
n_id = entry[NOTIFICATION_ID]
logger.info("%s has changes: notificationId=%d", namespace, n_id)
self._get_net_and_set_local(namespace, n_id, call_change=True)
return
else:
logger.warning('Sleep...')
except Exception as e:
logger.warning(str(e))

def _get_net_and_set_local(self, namespace, n_id, call_change=False):
namespace_data = self.get_json_from_net(namespace)
namespace_data[NOTIFICATION_ID] = n_id
old_namespace = self._cache.get(namespace)
self._update_cache_and_file(namespace_data, namespace)
if self._change_listener is not None and call_change:
old_kv = old_namespace.get(CONFIGURATIONS)
new_kv = namespace_data.get(CONFIGURATIONS)
self._call_listener(namespace, old_kv, new_kv)

def _listener(self):
logger.info('start long_poll')
while not self._stopping:
self._long_poll()
time.sleep(self._cycle_time)
logger.info("stopped, long_poll")

# build the signature headers required when an access secret is configured
def _sign_headers(self, url):
headers = {}
if self.secret == '':
return headers
uri = url[len(self.config_url):len(url)]
time_unix_now = str(int(round(time.time() * 1000)))
headers['Authorization'] = 'Apollo ' + self.app_id + ':' + signature(time_unix_now, uri, self.secret)
headers['Timestamp'] = time_unix_now
return headers

def _heart_beat(self):
while not self._stopping:
for namespace in self._notification_map:
self._do_heart_beat(namespace)
time.sleep(60 * 10)  # every 10 minutes

def _do_heart_beat(self, namespace):
url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
self.ip)
try:
code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
if code == 200:
data = json.loads(body)
if self.last_release_key == data["releaseKey"]:
return None
self.last_release_key = data["releaseKey"]
data = data["configurations"]
self._update_cache_and_file(data, namespace)
else:
return None
except Exception as e:
logger.error(str(e))
return None

def get_all_dicts(self, namespace):
namespace_data = self._cache.get(namespace)
if namespace_data is None:
namespace_data = self.get_json_from_net(namespace).get(CONFIGURATIONS)
self._update_cache_and_file(namespace_data, namespace)
return namespace_data
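As a usage sketch (the endpoint, app id, and keys below are placeholders, not values from this PR), the client is constructed with the Apollo routing parameters and can be queried directly; an optional change listener receives ("add" | "update" | "delete", namespace, key, value) callbacks from the hot-update loop:

from extensions.configuration.apollo.apollo_client import ApolloClient

def on_change(action, namespace, key, value):
    # called by the long-poll thread when a key is added, updated, or deleted
    print(f"{action}: {namespace}/{key} -> {value}")

client = ApolloClient(
    config_url="http://apollo-config:8080",  # placeholder Apollo config service URL
    app_id="dify-api",                       # placeholder app id
    cluster="default",
    change_listener=on_change,               # optional; omit to disable callbacks
)

# falls back from the in-memory cache to the network, then to the local file cache
value = client.get_value("STORAGE_TYPE", default_val="local", namespace="application")
everything = client.get_all_dicts("application")
client.stop()  # stop the long-poll listener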