import asyncio import hashlib import json import logging import os import random import time from typing import Dict, List, Tuple import cryptography.hazmat.primitives.asymmetric.ed25519 as ed25519 from cryptography.hazmat.primitives import serialization # Custom protocol for node communication class PiNetworkProtocol(asyncio.Protocol): def __init__(self, node: 'PiNode'): self.node = node self.transport = None self.buffer = bytearray() def connection_made(self, transport: asyncio.Transport) -> None: self.transport = transport self.node.log.info(f"Connected to {transport.get_extra_info('peername')}") def data_received(self, data: bytes) -> None: self.buffer.extend(data) while self.buffer: msg_len = int.from_bytes(self.buffer[:4], 'big') if len(self.buffer) < msg_len + 4: break msg = self.buffer[4:msg_len + 4] self.buffer = self.buffer[msg_len + 4:] self.handle_message(msg) def handle_message(self, msg: bytes) -> None: msg_type = msg[0] if msg_type == 0x01: # Node announcement self.handle_node_announcement(msg[1:]) elif msg_type == 0x02: # Transaction self.handle_transaction(msg[1:]) elif msg_type == 0x03: # Block self.handle_block(msg[1:]) else: self.node.log.warning(f"Unknown message type {msg_type}") def handle_node_announcement(self, msg: bytes) -> None: node_id = msg[:32] node_addr = msg[32:64] self.node.add_node(node_id, node_addr) def handle_transaction(self, msg: bytes) -> None: tx = self.node.deserialize_transaction(msg) self.node.process_transaction(tx) def handle_block(self, msg: bytes) -> None: block = self.node.deserialize_block(msg) self.node.process_block(block) def send_message(self, msg: bytes) -> None: self.transport.write(len(msg).to_bytes(4, 'big') + msg) class PiNode: def __init__(self, node_id: bytes, node_addr: bytes, private_key: ed25519.Ed25519PrivateKey): self.node_id = node_id self.node_addr = node_addr self.private_key = private_key self.public_key = private_key.public_key() self.nodes: Dict[bytes, bytes] = {} self.transactions: List[Tuple[bytes, bytes]] = [] self.blocks: List[bytes] = [] self.log = logging.getLogger(f"PiNode-{node_id.hex()}") def add_node(self, node_id: bytes, node_addr: bytes) -> None: self.nodes[node_id] = node_addr self.log.info(f"Added node {node_id.hex()} at {node_addr.hex()}") def deserialize_transaction(self, msg: bytes) -> Tuple[bytes, bytes]: tx_id = msg[:32] tx_data = msg[32:] return tx_id, tx_data def deserialize_block(self, msg: bytes) -> bytes: block_id = msg[:32] block_data = msg[32:] return block_id, block_data def process_transaction(self, tx: Tuple[bytes, bytes]) -> None: # Verify transaction signature tx_id, tx_data = tx signature = tx_data[-64:] tx_data = tx_data[:-64] if not self.verify_signature(tx_id, tx_data, signature): self.log.warning(f"Invalid transaction signature for {tx_id.hex()}") return # Process transaction self.log.info(f"Processing transaction {tx_id.hex()}") # ... def process_block(self, block: bytes) -> None: # Verify block signature block_id, block_data = block signature = block_data[-64:] block_data = block_data[:-64] if not self.verify_signature(block_id, block_data, signature): self.log.warning(f"Invalid block signature for {block_id.hex()}") return # Process block self.log.info(f"Processing block {block_id.hex()}") # ... def verify_signature(self, msg_id: bytes, msg_data: bytes, signature: bytes) -> bool: public_key = self.public_key try: public_key.verify(signature, msg_id + msg_data, padding.cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding) return True except cryptography.exceptions.InvalidSignature: return False asyncdef start(self) -> None: loop = asyncio.get_event_loop() coro = loop.create_connection(lambda: PiNetworkProtocol(self), self.node_addr, self.node_addr) await coro def generate_node_id() -> bytes: return os.urandom(32) def generate_private_key() -> ed25519.Ed25519PrivateKey: return ed25519.Ed25519PrivateKey.generate() def generate_node(node_id: bytes, node_addr: bytes) -> PiNode: private_key = generate_private_key() public_key = private_key.public_key() return PiNode(node_id, node_addr, private_key) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) node_id = generate_node_id() node_addr = socket.gethostbyname(socket.gethostname()) + ":5000" node = generate_node(node_id, node_addr.encode()) try: asyncio.run(node.start()) except KeyboardInterrupt: pass