
Merge pull request #14 from ConnectAI-E/feature-feishuwiki
Feature feishuwiki
lloydzhou authored Nov 13, 2023
2 parents e7aeb59 + e8225b4 commit 8fdf747
Showing 5 changed files with 154 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
@@ -5,7 +5,7 @@ RUN sed -i "s@http://deb.debian.org@http://mirrors.aliyun.com@g" /etc/apt/source

RUN apt-get update && apt-get install -y libcurl4-openssl-dev libffi-dev libxml2-dev g++\
&& pip3 install torch==2.0.1+cpu torchvision==0.15.2+cpu -f https://download.pytorch.org/whl/torch_stable.html -i https://pypi.tuna.tsinghua.edu.cn/simple \
&& pip3 install requests Flask gunicorn gevent bson Flask-Session Flask-SQLAlchemy ujson pycurl bcrypt langchain sentence_transformers pdf2image pytesseract elasticsearch_dsl redis unstructured PyMuPDF bs4 openai flask[async] Cython flask-cors python-docx python-pptx markdown celery -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn --no-cache-dir
&& pip3 install requests Flask gunicorn gevent bson Flask-Session Flask-SQLAlchemy ujson pycurl bcrypt langchain sentence_transformers pdf2image pytesseract elasticsearch_dsl redis unstructured PyMuPDF bs4 openai==0.28.1 flask[async] Cython flask-cors python-docx python-pptx markdown pandas openpyxl celery -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn --no-cache-dir

ADD ./docker/entrypoint.sh /entrypoint.sh
ADD ./docker/wait-for-it.sh /wait-for-it.sh
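The notable pin here is `openai==0.28.1`: the 1.x releases of the `openai` package removed the module-level `ChatCompletion` interface this codebase appears to rely on, so the pin keeps the pre-1.0 call style working. `pandas` and `openpyxl` are also added, presumably for loading spreadsheet documents. A minimal sketch of the call style the pin preserves (the key and model name are placeholders):

```python
# Pre-1.0 openai call style, kept working by the openai==0.28.1 pin.
import openai

openai.api_key = "sk-..."  # placeholder key
resp = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",  # placeholder model
    messages=[{"role": "user", "content": "ping"}],
)
print(resp["choices"][0]["message"]["content"])
```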
57 changes: 56 additions & 1 deletion server/celery_app.py
@@ -4,12 +4,13 @@
from datetime import datetime
from hashlib import md5
from langchain.schema import Document
from models import get_user, get_collection_by_id, Search, purge_document_by_id
from models import get_user, get_collection_by_id, Search, purge_document_by_id, get_document_id_by_uniqid
from tasks import (
celery,
SitemapLoader, LOADER_MAPPING,
NamedTemporaryFile,
embedding_single_document, get_status_by_id, embed_query,
LarkWikiLoader,
LarkDocLoader,
YuqueDocLoader,
)
@@ -80,6 +81,60 @@ def embed_documents(fileUrl, fileType, fileName, collection_id, openai=False, un
return document_ids


@celery.task()
def embed_feishuwiki(collection_id, openai=False):
    # TODO: sync the Feishu wiki
    # 1. fetch the list of already-indexed documents
    # 2. fetch the list of nodes in the wiki
    # 3. diff the two sets: embed the newly added files into the collection, purge the removed ones
collection = get_collection_by_id(None, collection_id)
user = get_user(collection.user_id)
extra = user.extra.to_dict()
client = extra.get('client', {})
loader = LarkWikiLoader(collection.space_id, **client)
    nodes = list(loader.get_nodes())  # materialize once: the loader returns a generator
    current_document_ids = {node['obj_token'] for node in nodes}
    titles = {node['obj_token']: node.get('title', '') for node in nodes}  # obj_token -> node title, used as the file name below
logging.info("debug current_document_ids %r", current_document_ids)
response = Search(index="document").filter(
"term", type="feishudoc"
).filter(
"term", status=0,
).filter(
"term", collection_id=collection_id,
).extra(
from_=0, size=10000
).sort({"modified": {"order": "desc"}}).execute()
exists_document_ids = set([doc.uniqid for doc in response])
logging.info("debug exists_document_ids %r", exists_document_ids)
new_document_ids = current_document_ids - exists_document_ids
deleted_document_ids = exists_document_ids - current_document_ids
for uniqid in exists_document_ids:
try:
            # this runs on a schedule, so duplicates can get inserted; keep one copy and purge the rest
document_ids = [doc.meta.id for doc in response if doc.uniqid == uniqid]
if len(document_ids) > 1:
for document_id in document_ids[:-1]:
purge_document_by_id(document_id)
except Exception as e:
logging.error(e)
    for uniqid in deleted_document_ids:
        try:
            # get_document_id_by_uniqid may return None, hence the `or []`
            document_ids = get_document_id_by_uniqid(collection_id, uniqid) or []
            for document_id in document_ids:
                purge_document_by_id(document_id)
        except Exception as e:
            logging.error(e)

    for document_id in new_document_ids:
        task = embed_documents.delay(
            f'https://feishu.cn/docx/{document_id}',  # f-string: interpolate the document id
            'feishudoc',
            titles.get(document_id, ''), collection_id, False, uniqid=document_id
        )
logging.info("debug add new document %r %r", document_id, task)
    return list(new_document_ids), list(exists_document_ids)  # lists: sets are not JSON-serializable as a task result


@celery.task()
def sync_feishudoc(openai=False):
document_ids = []
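`embed_feishuwiki` is a recurring sync: it diffs the wiki's current nodes against the indexed documents, embeds the new ones, and purges the removed ones. The diff does not show how the task is scheduled; a minimal sketch using Celery's beat scheduler, where the hourly cadence and the collection id are assumptions:

```python
# Hypothetical beat schedule for the sync task; cadence and collection id are placeholders.
from celery.schedules import crontab

from tasks import celery  # the shared Celery app also imported by celery_app.py

celery.conf.beat_schedule = {
    "sync-feishu-wiki": {
        "task": "celery_app.embed_feishuwiki",
        "schedule": crontab(minute=0),        # top of every hour
        "args": ("<collection_id>", False),   # (collection_id, openai)
    },
}
```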
17 changes: 13 additions & 4 deletions server/models.py
@@ -90,6 +90,9 @@ class Collection(ESDocument):
name = Text(analyzer='ik_max_word')
    description = Text(analyzer='ik_max_word')  # knowledge base description
    summary = Text(analyzer='ik_max_word')  # knowledge base summary
    # Feishu import
    type = Keyword()  # collection type; Keyword keeps the value from being tokenized
    space_id = Keyword()  # space_id of the imported Feishu wiki

class Index:
name = 'collection'
@@ -178,12 +181,14 @@ def save_user(openid='', name='', **kwargs):
return user


def get_collections(user_id, page, size):
def get_collections(user_id, keyword, page, size):
s = Search(index="collection").filter(
"term", user_id=user_id,
).filter(
"term", status=0,
).extra(from_=page*size-size, size=size)
if keyword:
s = s.query("match", name=keyword)
    # execute the query
response = s.execute()
total = response.hits.total.value
@@ -204,14 +209,16 @@ def get_collection_by_id(user_id, collection_id):
return collection


def save_collection(user_id, name, description, collection_id=None):
def save_collection(user_id, name, description, collection_id=None, type='', space_id=''):
collection_id = collection_id or ObjID.new_id()
collection = Collection(
meta={'id': collection_id},
user_id=user_id,
name=name,
description=description,
summary='',
type=type,
space_id=space_id,
status=0,
created=datetime.now(),
modified=datetime.now(),
@@ -266,13 +273,15 @@ def get_document_id_by_uniqid(collection_id, uniqid):
        return None  # do not raise here
        # raise NotFound()
    # callers expect a list here
return [response[0].meta.id]
return [i.meta.id for i in response]


def get_documents_by_collection_id(user_id, collection_id, page, size):
def get_documents_by_collection_id(user_id, collection_id, keyword, page, size):
collection = get_collection_by_id(user_id, collection_id)
    assert collection, 'collection not found'
s = Search(index="document").filter("term", collection_id=collection_id).filter("term", status=0).extra(from_=page*size-size, size=size).sort({"created": {"order": "desc"}})
if keyword:
s = s.query("match", name=keyword)
response = s.execute()
total = response.hits.total.value
    # return the search results (a list of document instances)
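The new `type` and `space_id` fields are mapped as `Keyword`, so `term` filters compare the stored value exactly instead of matching analyzed tokens. A minimal sketch of the kind of lookup this mapping enables (the user id is a placeholder; index and field names follow the mapping above):

```python
# Hypothetical exact-match query over the new Keyword fields.
from elasticsearch_dsl import Search

s = Search(index="collection") \
    .filter("term", user_id="<user_id>") \
    .filter("term", type="feishuwiki") \
    .filter("term", status=0)
for hit in s.execute():
    print(hit.meta.id, hit.space_id)
```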
55 changes: 50 additions & 5 deletions server/routes.py
@@ -41,7 +41,7 @@
get_document_by_id,
get_relation_count_by_id,
)
from celery_app import embed_documents, get_status_by_id
from celery_app import embed_documents, get_status_by_id, embed_feishuwiki
from sse import ServerSentEvents
from tasks import LarkDocLoader, YuqueDocLoader, LarkWikiLoader  # LarkWikiLoader is used by the new routes below

@@ -317,9 +317,10 @@ def api_save_collection_client(platform):
def api_collections():
page = request.args.get('page', default=1, type=int)
size = request.args.get('size', default=20, type=int)
keyword = request.args.get('keyword', default='', type=str)
size = 10000 if size > 10000 else size
user_id = session.get('user_id', '')
collections, total = get_collections(user_id, page, size)
collections, total = get_collections(user_id, keyword, page, size)

return jsonify({
'code': 0,
@@ -335,14 +336,56 @@ def api_collections():
})


@app.route('/api/collection/feishu/wiki', methods=['GET'])
def api_get_feishu_wiki():
user_id = session.get('user_id', '')
user = get_user(user_id)
extra = user.extra.to_dict()
client = extra.get('client', {})
    loader = LarkWikiLoader('', **client)  # no space_id needed when only listing spaces
return jsonify({
'code': 0,
'msg': 'success',
'data': list(loader.get_spaces()),
})


@app.route('/api/collection', methods=['POST'])
def api_save_collection():
user_id = session.get('user_id', '')
type = request.json.get('type', '')
space_id = request.json.get('space_id', '')
name = request.json.get('name')
description = request.json.get('description')
app.logger.info("debug %r", [name, description])
collection_id = save_collection(user_id, name, description)
app.logger.info("debug %r", [name, description, type, url])
if type == 'feishuwiki':
try:
user = get_user(user_id)
extra = user.extra.to_dict()
client = extra.get('client', {})
loader = LarkWikiLoader(space_id, **client)
            info = loader.get_info()
name = info['data']['space']['name']
description = info['data']['space']['description']
collection_id = save_collection(user_id, name, description, type=type, space_id=space_id)
            # kick off the Feishu import task asynchronously
task = embed_feishuwiki.delay(collection_id, False)
return jsonify({
'code': 0,
'msg': 'success',
'data': {
'id': collection_id,
'collection_id': collection_id,
},
})
except Exception as e:
app.logger.error(e)
return jsonify({
'code': -1,
'msg': str(e)
})

collection_id = save_collection(user_id, name, description)
return jsonify({
'code': 0,
'msg': 'success',
Expand All @@ -364,6 +407,7 @@ def api_collection_by_id(collection_id):
'msg': 'success',
'data': {
'id': collection.meta.id,
'type': collection.type if hasattr(collection, 'type') else '',
'name': collection.name,
'description': collection.description,
'created': collection.created_at,
@@ -399,9 +443,10 @@ def api_delete_collection_by_id(collection_id):
def api_get_documents_by_collection_id(collection_id):
page = request.args.get('page', default=1, type=int)
size = request.args.get('size', default=20, type=int)
keyword = request.args.get('keyword', default='', type=str)
size = 10000 if size > 10000 else size
user_id = session.get('user_id', '')
documents, total = get_documents_by_collection_id(user_id, collection_id, page, size)
documents, total = get_documents_by_collection_id(user_id, collection_id, keyword, page, size)

return jsonify({
'code': 0,
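Taken together, the new routes let a client list the Feishu wiki spaces visible to its bound app and then create a collection from one of them; name and description are filled in server-side from the space info. A sketch of that round trip with `requests`, where the base URL and session cookie are assumptions:

```python
# Hypothetical client-side round trip for the new endpoints.
import requests

s = requests.Session()
s.cookies.set("session", "<session-cookie>")  # placeholder auth cookie
base = "http://localhost"                     # placeholder base URL

# 1. list the importable Feishu wiki spaces
spaces = s.get(f"{base}/api/collection/feishu/wiki").json()["data"]

# 2. create a collection backed by the first space; this also queues embed_feishuwiki
resp = s.post(f"{base}/api/collection", json={
    "type": "feishuwiki",
    "space_id": spaces[0]["space_id"],
}).json()
print(resp["data"]["collection_id"])
```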
34 changes: 34 additions & 0 deletions server/tasks.py
@@ -176,6 +176,40 @@ def post(self, url, **kwargs):
return self.request('POST', url, **kwargs)


class LarkWikiLoader(object):
def __init__(self, space_id, **kwargs):
app.logger.info("debug %r", kwargs)
self.kwargs = kwargs
self.client = Lark(**kwargs)
self.space_id = space_id

def get_info(self):
url = f"{self.client.host}/open-apis/wiki/v2/spaces/{self.space_id}"
return self.client.get(url).json()

    def get_spaces(self):
        page_token = ''
        while True:
            # re-fetch inside the loop so page_token actually advances
            url = f"{self.client.host}/open-apis/wiki/v2/spaces?page_size=50&page_token={page_token}"
            res = self.client.get(url).json()
            for item in res.get('data', {}).get('items', []):
                yield item
            if not res.get('data', {}).get('has_more'):
                break
            page_token = res['data']['page_token']

    def get_nodes(self):
        page_token = ''
        while True:
            # re-fetch inside the loop so page_token actually advances
            url = f"{self.client.host}/open-apis/wiki/v2/spaces/{self.space_id}/nodes?page_size=50&page_token={page_token}"
            res = self.client.get(url).json()
            for item in res.get('data', {}).get('items', []):
                yield item
            if not res.get('data', {}).get('has_more'):
                break
            page_token = res['data']['page_token']


class LarkDocLoader(object):
def __init__(self, fileUrl, document_id, **kwargs):
app.logger.info("debug %r", kwargs)
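`LarkWikiLoader` pages through the wiki API 50 items at a time and exposes the results as generators, so callers can iterate without handling page tokens themselves. A minimal usage sketch, where the space id and the `Lark` client kwargs (`app_id`, `app_secret`) are assumptions:

```python
# Hypothetical usage; the space id and client credentials are placeholders.
loader = LarkWikiLoader("spc_xxx", app_id="cli_xxx", app_secret="***")

info = loader.get_info()
print(info["data"]["space"]["name"])

for node in loader.get_nodes():  # generator; fetches further pages lazily
    print(node["obj_token"], node.get("title"))
```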
