This repository has been archived by the owner on May 4, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Connector.py
69 lines (56 loc) · 1.86 KB
/
Connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from datetime import datetime
import pymongo
from CaseBonita.Data.Database import MongoDB
class MongoConnector:
    """
    Small convenience wrapper around a lazily created ``pymongo.MongoClient``.

    Every operation addresses a collection inside the database selected at
    construction time. The client is created against ``localhost:27017``.
    """

    # Class-level default; the real client is created on first access of
    # ``connection`` and then stored on the instance.
    _connection = None

    def __init__(self, db=MongoDB.RODO_DB):
        """
        :param db: key used to select the database on the client
            (presumably the database name -- confirm against ``MongoDB``)
        """
        self._db = db

    @property
    def connection(self):
        """
        Return the MongoDB client, creating it on first access.

        :rtype: pymongo.MongoClient
        """
        if self._connection is None:
            self._connection = pymongo.MongoClient('localhost', 27017)
        return self._connection

    def insert_documents_into_collection(self, docs, collection_name):
        """
        This function inserts a list of documents into a collection.

        An empty ``docs`` is a no-op: ``insert_many`` rejects an empty
        document list, so nothing is sent to the server in that case.

        :param list docs: documents list
        :param str collection_name: name of the target collection
        """
        # Materialise once so that emptiness can be checked reliably even for
        # one-shot iterables; a non-iterable argument still raises TypeError.
        docs = list(docs)
        if not docs:
            return
        collection = self.connection[self._db][collection_name]
        collection.insert_many(docs)

    def insert_document_into_collection(self, doc, collection_name):
        """
        This function inserts a single document into a collection.

        :param dict doc: document
        :param str collection_name: name of the target collection
        """
        self.insert_documents_into_collection([doc], collection_name)

    def find_documents_in_collection(self, query, collection_name):
        """
        Run a ``find`` query against a collection.

        :param dict query: filter passed to ``find``; ``None`` is replaced by
            an empty filter ``{}``
        :param str collection_name: name of the collection to search
        :returns: the object returned by ``Collection.find`` (a cursor, not a
            list); iterate it or wrap it in ``list(...)`` to get the documents
        :rtype: pymongo.cursor.Cursor
        """
        if query is None:
            query = {}
        collection = self.connection[self._db][collection_name]
        return collection.find(query)
if __name__ == '__main__':
    # Manual smoke test: write one sample playlist document into the
    # 'playlists' collection of the database selected by MongoDB.RODO_DB.
    sample_tracks = [
        {'title': "DoMe", 'artist': "Please"},
    ]
    sample_playlist = {
        'platform': "test_platform",
        'playlist_url': 'test_playlist_url.com',
        'playlist': sample_tracks,
        'timestamp': datetime.utcnow(),
    }
    target_collection = 'playlists'

    connector = MongoConnector(MongoDB.RODO_DB)
    connector.insert_documents_into_collection([sample_playlist], target_collection)