import hashlib
import time
from datetime import datetime, timedelta

import jwt
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from cryptography.fernet import Fernet


class SecureAPINode:
    """A node in the secure API mesh.

    Each node carries a random 4D position (x, y, z + security level), a
    random load capacity, its own Fernet key, and a dictionary of security
    metrics that ``update_security_metrics`` mutates over time.
    """

    def __init__(self, node_id, node_type="endpoint"):
        """Create a node.

        Args:
            node_id: Identifier used as the key in connection tables and graphs.
            node_type: One of ``endpoint``, ``gateway``, ``security_checkpoint``.
        """
        self.node_id = node_id
        self.node_type = node_type  # endpoint, gateway, security_checkpoint
        self.position = np.random.randn(4)  # 4D space: x,y,z + security_level
        self.load_capacity = np.random.uniform(0.5, 1.0)
        self.connections = {}  # peer node_id -> SecureAPILink
        self.tls_sessions = {}  # Track active TLS sessions
        self.security_metrics = {
            'trust_score': 1.0,
            'latency': 0.0,
            'encryption_strength': 1.0,
            'threat_level': 0.0,
        }
        # The cipher must exist as soon as the node does: encrypt_payload and
        # decrypt_payload read self.cipher_suite unconditionally.
        self.key = Fernet.generate_key()
        self.cipher_suite = Fernet(self.key)

    def register_tls_session(self, session_id, tls_context):
        """Register existing TLS session for reuse.

        Args:
            session_id: Key under which the session is stored.
            tls_context: Session object; ``get_active_sessions`` calls its
                ``is_active()`` method.
        """
        self.tls_sessions[session_id] = tls_context

    def get_active_sessions(self):
        """Return the registered sessions whose ``is_active()`` is truthy."""
        return {
            session_id: context
            for session_id, context in self.tls_sessions.items()
            if context.is_active()
        }

    def encrypt_payload(self, data):
        """Encrypt data for transmission with this node's Fernet key.

        Args:
            data: ``bytes``/``bytearray`` are encrypted as-is; anything else
                is converted with ``str()`` and encoded first.

        Returns:
            The Fernet token produced by ``self.cipher_suite.encrypt``.
        """
        # Bytes must not go through str(): that would encrypt the textual
        # repr "b'...'" and make layered (already-encrypted) payloads
        # impossible to unwrap.
        if isinstance(data, (bytes, bytearray)):
            raw = bytes(data)
        else:
            raw = str(data).encode()
        return self.cipher_suite.encrypt(raw)

    def decrypt_payload(self, encrypted_data):
        """Decrypt a token created with this node's key and decode it to ``str``."""
        return self.cipher_suite.decrypt(encrypted_data).decode()

    def update_security_metrics(self, network_stats):
        """Update node security based on network conditions.

        Args:
            network_stats: Network-wide statistics; accepted for interface
                compatibility but not read by the current implementation.
        """
        metrics = self.security_metrics
        metrics['trust_score'] *= 0.95  # Decay factor
        metrics['threat_level'] = np.random.uniform(0, 0.1)  # Simulated threats

        # Adjust encryption strength based on threat level
        if metrics['threat_level'] > 0.05:
            metrics['encryption_strength'] = 1.0

        # The 4th position component mirrors the trust score, so route
        # weights (which use position distance) react to trust changes.
        self.position[3] = metrics['trust_score']


class SecureAPILink:
    """A connection between two ``SecureAPINode`` instances."""

    def __init__(self, node_a, node_b):
        """Create a link between ``node_a`` and ``node_b``."""
        self.nodes = (node_a, node_b)
        self.security_strength = 1.0
        self.encrypted_channel = True
        self.last_health_check = datetime.now()
        self.health_check_interval = timedelta(seconds=30)

    def generate_session_token(self, signing_key='secret'):
        """Create an HS256-signed JWT describing the link.

        Args:
            signing_key: Key passed to ``jwt.encode``.

        Returns:
            The value returned by ``jwt.encode`` for the link payload.
        """
        # NOTE(security): the default key is a publicly known literal kept
        # only for backward compatibility; callers should supply a real
        # secret via ``signing_key``.
        payload = {
            'node_a': self.nodes[0].node_id,
            'node_b': self.nodes[1].node_id,
            'timestamp': datetime.now().timestamp(),
            'security_strength': self.security_strength,
        }
        return jwt.encode(payload, signing_key, algorithm='HS256')

    def health_check(self):
        """Verify link security and performance.

        Returns:
            ``True`` if less than ``health_check_interval`` has elapsed since
            the last check. Otherwise the link's ``security_strength`` is
            refreshed to the lower of the two endpoint trust scores and the
            result is whether that strength exceeds 0.7.
        """
        current_time = datetime.now()
        if current_time - self.last_health_check < self.health_check_interval:
            return True

        # Perform security checks
        self.security_strength = min(
            self.nodes[0].security_metrics['trust_score'],
            self.nodes[1].security_metrics['trust_score'],
        )
        self.last_health_check = current_time
        return self.security_strength > 0.7


class SecureAPINetwork:
    """A randomly connected mesh of ``SecureAPINode`` objects."""

    def __init__(self, num_nodes=10):
        """Build ``num_nodes`` nodes (types assigned round-robin) and link them."""
        self.nodes = {}
        self.links = []
        self.security_threshold = 0.8
        self.route_cache = {}  # (source_id, target_id) -> list of node ids

        # Initialize different types of nodes
        node_types = ['endpoint', 'gateway', 'security_checkpoint']
        for i in range(num_nodes):
            node_type = node_types[i % len(node_types)]
            self.nodes[i] = SecureAPINode(i, node_type)

        # Create initial secure connections
        self._initialize_secure_mesh()

    def _initialize_secure_mesh(self):
        """Create initial secure network topology.

        Every unordered pair of nodes is linked with probability 0.3.
        """
        for node_a in self.nodes.values():
            for node_b in self.nodes.values():
                # Visit each unordered pair once and skip self-pairs.
                if node_a.node_id >= node_b.node_id:
                    continue
                if np.random.random() < 0.3:  # 30% chance of connection
                    link = SecureAPILink(node_a, node_b)
                    self.links.append(link)
                    node_a.connections[node_b.node_id] = link
                    node_b.connections[node_a.node_id] = link

    def find_secure_route(self, source_id, target_id):
        """Find most secure route between nodes.

        Edge weight is ``(2 - security_strength) * distance`` between the two
        node positions, so stronger and closer links are preferred.

        Args:
            source_id: Id of the starting node.
            target_id: Id of the destination node.

        Returns:
            A list of node ids from source to target, or ``None`` when no
            path exists or either id is not in the network. Successful
            results are cached until ``update_network`` clears the cache.
        """
        cache_key = (source_id, target_id)
        if cache_key in self.route_cache:
            return self.route_cache[cache_key]

        # Create graph for pathfinding
        G = nx.Graph()
        G.add_nodes_from(self.nodes)
        for link in self.links:
            node_a, node_b = link.nodes
            weight = (2 - link.security_strength) * np.linalg.norm(
                node_a.position - node_b.position
            )
            G.add_edge(node_a.node_id, node_b.node_id, weight=weight)

        try:
            path = nx.shortest_path(G, source_id, target_id, weight='weight')
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            # NodeNotFound: an id that is not part of the network has no
            # route either; report it the same way as a disconnected pair.
            return None

        self.route_cache[cache_key] = path
        return path

    def secure_transmit(self, source_id, target_id, data):
        """Securely transmit data between nodes.

        The payload is encrypted once by every node on the route, in route
        order, so the last node's layer is the outermost one.

        Args:
            source_id: Id of the sending node.
            target_id: Id of the receiving node.
            data: Payload to send.

        Returns:
            A dict with ``status``, per-hop ``route`` details, the number of
            ``encryption_layers`` and the ``final_payload`` token.

        Raises:
            ValueError: If no route exists between the two nodes.
        """
        route = self.find_secure_route(source_id, target_id)
        if not route:
            raise ValueError("No secure route available")

        encrypted_data = data
        route_info = []

        # Encrypt data through each node in the route
        for hop_id in route:
            current_node = self.nodes[hop_id]
            route_info.append({
                'node_id': current_node.node_id,
                'type': current_node.node_type,
                'security_level': current_node.security_metrics['trust_score'],
            })
            encrypted_data = current_node.encrypt_payload(encrypted_data)

        return {
            'status': 'transmitted',
            'route': route_info,
            'encryption_layers': len(route),
            'final_payload': encrypted_data,
        }

    def update_network(self):
        """Update network security state.

        Refreshes every node's metrics, drops links that fail their health
        check (from both the link list and the endpoint connection tables),
        and invalidates the route cache.
        """
        # Update node security metrics
        network_stats = {
            'avg_trust': np.mean([
                node.security_metrics['trust_score']
                for node in self.nodes.values()
            ])
        }

        for node in self.nodes.values():
            node.update_security_metrics(network_stats)

        # Check and update link health
        healthy_links = []
        for link in self.links:
            if link.health_check():
                healthy_links.append(link)
                continue
            # Keep node.connections consistent with self.links so a pruned
            # link cannot be reached through its endpoints.
            node_a, node_b = link.nodes
            node_a.connections.pop(node_b.node_id, None)
            node_b.connections.pop(node_a.node_id, None)
        self.links = healthy_links

        self.route_cache.clear()  # Invalidate cache after updates


def visualize_secure_network(network):
    """Visualize the secure API network.

    Nodes are placed at the first two components of their position and
    coloured by type; edges are coloured by link security strength using the
    ``RdYlGn`` colormap.

    Args:
        network: A ``SecureAPINetwork`` (anything exposing ``nodes`` and
            ``links`` in the same shape works).
    """
    G = nx.Graph()

    # Add nodes with different colors based on type
    type_colors = {'endpoint': 'lightblue', 'gateway': 'lightgreen'}
    colors = []
    for node in network.nodes.values():
        G.add_node(node.node_id, pos=(node.position[0], node.position[1]))
        colors.append(type_colors.get(node.node_type, 'lightcoral'))

    # Add edges, remembering each link's strength on the edge itself
    for link in network.links:
        G.add_edge(
            link.nodes[0].node_id,
            link.nodes[1].node_id,
            security=link.security_strength,
        )

    # nx.draw pairs edge_color entries with G.edges() order, which can differ
    # from link insertion order, so derive the colours from G.edges() itself.
    edge_colors = [
        plt.cm.RdYlGn(strength) for _, _, strength in G.edges(data='security')
    ]

    pos = nx.get_node_attributes(G, 'pos')

    plt.figure(figsize=(12, 8))
    nx.draw(G, pos, node_color=colors, edge_color=edge_colors,
            with_labels=True, node_size=500, font_size=10)
    plt.title("Secure API Mesh Network")
    plt.show()


def main():
    """Run the example: build a network, age it, transmit once, and plot it."""
    # Create and initialize network
    network = SecureAPINetwork(num_nodes=15)

    # Simulate network updates
    for _ in range(5):
        network.update_network()
        time.sleep(1)

    # Attempt secure transmission
    try:
        result = network.secure_transmit(0, 5, "Sensitive API Data")
    except ValueError as e:
        print(f"Transmission failed: {e}")
    else:
        print("Transmission successful!")
        print(f"Route taken: {result['route']}")
        print(f"Encryption layers: {result['encryption_layers']}")

    # Visualize network
    visualize_secure_network(network)


# Example usage
if __name__ == "__main__":
    main()