-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsensors.py
125 lines (95 loc) · 3.18 KB
/
sensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from phidias.Types import *
import threading
import time
import telegram
import configparser
class TIMEOUT(Reactor):
    """Belief asserted as ``TIMEOUT("ON")`` by ``Timer.sense`` when its wait expires."""
class message(Reactor):
    """Belief asserted by ``Chatbot.sense`` as ``message(chat_id, text)`` for each incoming text."""
# Load agent settings from 'config.ini'. The path is relative, so it is
# resolved against the current working directory of the process.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open; a missing file
# therefore only surfaces as an error from the config.get() call below.
config.read('config.ini')
# Telegram bot token; handed to telegram.Bot in Chatbot.on_start.
TELEGRAM_TOKEN = config.get('AGENT', 'TELEGRAM_TOKEN')
class HotwordDetect(Sensor):
    """Skeleton sensor for hotword detection.

    No detection engine is wired in yet: ``sense`` only idles, waking once
    per second, until ``on_stop`` clears ``self.running``.
    """

    def on_start(self):
        """Set the running flag and announce start-up on stdout."""
        self.running = True
        print("\nStarting Hotword detection...")
        # put instantiation hotword code here

    def on_stop(self):
        """Announce shutdown on stdout and clear the running flag."""
        print("\nStopping Hotword detection...")
        self.running = False

    def sense(self):
        """Loop until the running flag is cleared, sleeping 1 s per pass."""
        # Truthiness test rather than an identity check against True.
        while self.running:
            time.sleep(1)
            # --------------> put hotword detection code here <---------------
            # when right hotword is detected: self.assert_belief(HOTWORD_DETECTED("ON"))
            # NOTE(review): HOTWORD_DETECTED is not defined in this file; it
            # must be declared before the placeholder line above is enabled.
class UtteranceDetect(Sensor):
    """Skeleton sensor for utterance detection.

    The detection engine is still a placeholder; ``sense`` simply idles
    while ``self.running`` is truthy.
    """

    def on_start(self):
        """Raise the running flag and print the start-up banner."""
        self.running = True
        print("\nStarting utterance detection...")
        # instantiate the detection engine here

    def on_stop(self):
        """Print the shutdown banner and lower the running flag."""
        print("\nStopping utterance detection...")
        self.running = False

    def sense(self):
        """Idle in one-second steps until ``on_stop`` lowers the flag."""
        poll_interval = 1  # seconds between checks of the running flag
        while self.running:
            time.sleep(poll_interval)
            # --------------> put utterance detection code here <---------------
            # when incoming new utterance detected: self.assert_belief(SST(utterance))
class Timer(Sensor):
    """One-shot timeout sensor that asserts ``TIMEOUT("ON")`` on expiry.

    ``sense`` blocks on a ``threading.Event`` with a timeout. Setting the
    event wakes it early, either to re-arm the wait (restart) or to leave
    without asserting anything (stop).
    """

    def on_start(self, uTimeout):
        """Arm the timer.

        Args:
            uTimeout: callable; its return value is used as the timeout
                passed to ``Event.wait`` (seconds).
        """
        self.event = threading.Event()
        self.timeout = uTimeout()
        self.do_restart = False

    def on_restart(self, uTimeout):
        """Wake ``sense`` and make it wait again.

        The ``uTimeout`` argument is accepted but not read here, so the wait
        is repeated with the timeout captured in ``on_start``.
        """
        self.do_restart = True
        self.event.set()

    def on_stop(self):
        """Wake ``sense`` so it can notice the stop request and return."""
        self.do_restart = False
        self.event.set()

    def sense(self):
        """Wait for expiry, restart or stop; assert ``TIMEOUT`` only on expiry."""
        while True:
            self.event.wait(self.timeout)
            self.event.clear()
            if not self.do_restart:
                break
            # A restart was requested: consume the flag and wait once more.
            self.do_restart = False
        # NOTE(review): self.stopped is not set anywhere in this class;
        # presumably it is provided by the Sensor base class — confirm.
        if not self.stopped:
            self.assert_belief(TIMEOUT("ON"))
class Chatbot(Sensor):
    """Sensor that polls Telegram for updates and asserts ``message`` beliefs.

    Updates that carry a message are buffered in ``self.msgs`` and handed
    out one per loop iteration as ``message(chat_id, text)``.
    """

    def on_start(self):
        """Create the module-global bot and reset the polling state."""
        # BOT is a module-level global because the Reply action sends
        # its messages through the same bot instance.
        global BOT
        BOT = telegram.Bot(TELEGRAM_TOKEN)
        self.update_id = None  # offset for the next get_updates call
        self.msgs = []         # buffered updates not yet turned into beliefs

    def sense(self):
        """Fetch pending updates and assert one belief per text message.

        Runs forever. When the buffer is empty it calls ``get_updates``
        with a 10 second timeout. Updates without a message are
        acknowledged but not buffered; messages without text are skipped.
        """
        while True:
            if not self.msgs:
                for update in BOT.get_updates(offset=self.update_id, timeout=10):
                    # The next offset must be one past the id just received.
                    # Deriving it from the update itself (instead of counting
                    # updates) stays correct when ids are not contiguous.
                    self.update_id = update.update_id + 1
                    if update.message:
                        self.msgs.append(update)
            if not self.msgs:
                continue

            update = self.msgs.pop(0)
            text = update.message.text
            print(text)
            if text is None:
                continue

            chat_id = update.message.chat.id
            # Debug output only: chat id followed by the lower-cased words.
            message_data = [chat_id, *text.lower().split()]
            print(message_data)
            self.assert_belief(message(chat_id, text))
class Reply(Action):
    """Action that sends a text message to a Telegram chat via ``BOT``."""

    def execute(self, *args):
        """Send the space-joined arguments to the chat named by the first one.

        Args:
            *args: callables. ``args[0]()`` yields the destination chat id;
                each remaining ``arg()`` yields one piece of the message.

        Raises:
            IndexError: if called without any argument.
            NameError: if ``BOT`` has not been created yet (it is assigned
                in ``Chatbot.on_start``).
        """
        recipient = args[0]()
        # str() lets non-string values (e.g. numbers) be sent instead of
        # making join() raise TypeError; strings pass through unchanged.
        # The local is named `text` so it does not shadow the module-level
        # `message` belief class.
        text = " ".join(str(part()) for part in args[1:])
        BOT.sendMessage(recipient, text)