Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuwei Yan committed Jan 4, 2025
1 parent eaae06d commit c892a61
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
24 changes: 12 additions & 12 deletions pycityagent/message/messager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,30 @@
@ray.remote
class Messager:
def __init__(
self, hostname:str, port:int=1883, username=None, password=None, timeout=math.inf
self, hostname:str, port:int=1883, username=None, password=None, timeout=60
):
self.client = Client(
hostname, port=port, username=username, password=password, timeout=timeout
)
self.connected = False # 是否已连接标志
self.message_queue = asyncio.Queue() # 用于存储接收到的消息
self.subscribers = {} # 订阅者信息,topic -> Agent 映射
self.receive_messages_task = None

async def __aexit__(self, exc_type, exc_value, traceback):
await self.stop()

async def connect(self):
try:
await self.client.__aenter__()
self.connected = True
logger.info("Connected to MQTT Broker")
except Exception as e:
self.connected = False
logger.error(f"Failed to connect to MQTT Broker: {e}")
for i in range(3):
try:
await self.client.__aenter__()
self.connected = True
logger.info("Connected to MQTT Broker")
return
except Exception as e:
logger.error(f"Attempt {i+1}: Failed to connect to MQTT Broker: {e}")
await asyncio.sleep(10)
self.connected = False
logger.error("All connection attempts failed.")

async def disconnect(self):
await self.client.__aexit__(None, None, None)
Expand All @@ -52,9 +55,6 @@ async def subscribe(self, topics: Union[str, List[str]], agents: Union[Any, List
topics = [topics]
if not isinstance(agents, list):
agents = [agents]
for topic, agent in zip(topics, agents):
self.subscribers[topic] = agent
logger.info(f"Subscribed to {topic} for Agent {agent._uuid}")
await self.client.subscribe(topics, qos=1)

async def receive_messages(self):
Expand Down
2 changes: 1 addition & 1 deletion pycityagent/simulation/agentgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def init_agents(self):
agent.set_messager(self.messager)
topic = (f"exps/{self.exp_id}/agents/{agent._uuid}/#", 1)
topics.append(topic)
agents.append(agent)
agents.append(agent.uuid)
await self.messager.subscribe.remote(topics, agents)
self.message_dispatch_task = asyncio.create_task(self.message_dispatch())
if self.enable_avro:
Expand Down

0 comments on commit c892a61

Please sign in to comment.