# Refactored and Documented Dimy.py - Client-side DIMY protocol implementation

import argparse
import hashlib
import os
import pickle
import random
import socket
import sys
import threading
import time
import uuid
from collections import defaultdict, deque
from hmac import compare_digest

import requests
from Crypto.Hash import SHA256
from Crypto.PublicKey import ECC
from pybloom_live import BloomFilter
from secretsharing import PlaintextToHexSecretSharer

# Constants and configuration parameters
BROADCAST_IP = '255.255.255.255'
BROADCAST_PORT = 5005
BLOOM_CAPACITY = 1000
BLOOM_ERROR_RATE = 0.001
PACKET_DROP_PROB = 0.1  # Simulated packet drop rate
CBF_UPLOAD_PORT = 8000  # HTTP port used for the CBF upload endpoint
SERVER_TIMEOUT = 10  # Seconds to wait on any server connection/request


def load_config_from_env():
    """
    Load configuration values from environment variables.

    Reads DIMY_T, DIMY_K, DIMY_N, DIMY_SERVER_IP and DIMY_SERVER_PORT,
    falling back to 60, 3, 5, "127.0.0.1" and 5000 respectively.

    Returns:
        (t, k, n, ip, port) with t, k, n and port converted to int.

    Raises:
        RuntimeError: if a numeric variable cannot be converted to int.
    """
    try:
        t = int(os.getenv("DIMY_T", 60))
        k = int(os.getenv("DIMY_K", 3))
        n = int(os.getenv("DIMY_N", 5))
        ip = os.getenv("DIMY_SERVER_IP", "127.0.0.1")
        port = int(os.getenv("DIMY_SERVER_PORT", 5000))
        return t, k, n, ip, port
    except (TypeError, ValueError) as e:
        raise RuntimeError(f"Failed to load configuration from environment: {e}") from e


class DimyClient:
    """
    A client implementing the DIMY privacy-preserving Bluetooth encounter tracing protocol.

    Records ephemeral identifiers (EphIDs), reconstructs encounters from shares,
    generates/rotates Bloom filters, and uploads encoded exposure data to a
    central server.
    """

    def __init__(self, t, k, n, server_ip, server_port):
        """
        Initialize client parameters, keys, Bloom filters, and network socket.

        Constructing a client also binds the broadcast socket, creates the
        first DBF and starts the DBF-rotation and QBF-generation threads.

        Arguments:
            t           - EphID rotation interval (in seconds)
            k           - Minimum shares required for EphID reconstruction
            n           - Total shares generated per EphID
            server_ip   - IP of the central server
            server_port - Port to send QBF on central server

        Raises:
            ValueError: if not (k >= 3 and n >= 5 and k < n).
            RuntimeError: if the UDP broadcast socket cannot be set up.
        """
        # Safety check on protocol parameters
        if not (k >= 3 and n >= 5 and k < n):
            raise ValueError("Invalid values: Ensure that k >= 3, n >= 5, and k < n.")

        self.client_id = str(uuid.uuid4())  # Unique ID for server reporting
        self.t = t
        self.k = k
        self.n = n
        self.server_ip = server_ip
        self.server_port = int(server_port)

        # EphID and cryptographic state
        self.ephemeral_id = None
        self.ephemeral_id_hash = None
        self.shares = []
        self.received_shares = defaultdict(list)

        # ECDH keypair for encounter ID generation
        self.private_key = ECC.generate(curve='P-256')
        self.public_key = self.private_key.public_key()

        # Bloom filter states
        self.dbf_window = deque()  # Sliding window of daily Bloom filters
        self.dbf_interval = 5  # Seconds between DBF rotations
        self.max_dbfs = 6  # Number of DBFs to merge into a QBF
        self.lock = threading.Lock()  # Thread synchronization
        self.uploaded_cbf = False  # Once CBF is uploaded, DBF rotation ends
        self.encounter_ids_seen = set()  # Tracks all encounters for QBF/CBF
        self.qbf_debug_encids = set()  # Debugging data sent with QBF
        self.cbf_debug_ids = set()  # Stores CBF EncIDs for late upload

        self._setup_broadcast_socket()
        self._start_new_dbf()
        self._start_dbf_rotation()
        self._start_qbf_generation()

    def _setup_broadcast_socket(self):
        """
        Set up UDP socket for Bluetooth-like broadcasting (uses IP broadcasting).

        The socket is bound to BROADCAST_PORT on all interfaces and is used
        for both sending and receiving shares.

        Raises:
            RuntimeError: if the socket cannot be created, configured or bound.
        """
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind(('', BROADCAST_PORT))
        except Exception as e:
            raise RuntimeError(f"Failed to set up UDP socket: {e}") from e

    def _start_new_dbf(self):
        """
        Rotates in a new, empty daily Bloom filter (DBF) into the sliding window.

        When the window already holds max_dbfs filters the oldest one is
        dropped first.
        """
        with self.lock:
            if len(self.dbf_window) >= self.max_dbfs:
                self.dbf_window.popleft()
            fresh = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
            self.dbf_window.append((time.time(), fresh))
            print(f"🟩 [DBF ROTATION] New DBF created. Total: {len(self.dbf_window)}")

    def _start_dbf_rotation(self):
        """
        Periodically initiate new DBFs at a fixed interval (DBF rotation thread).

        The daemon thread stops once the CBF has been uploaded.
        """
        def rotate():
            try:
                while not self.uploaded_cbf:
                    time.sleep(self.dbf_interval)
                    self._start_new_dbf()
            except Exception as e:
                print(f"[THREAD ERROR] DBF rotation thread failed: {e}")

        threading.Thread(target=rotate, daemon=True).start()

    def _start_qbf_generation(self):
        """
        Periodically generates and uploads a query Bloom filter (QBF) that merges
        all DBFs in the sliding window.

        The daemon thread stops once the CBF has been uploaded.
        """
        def generate_qbf():
            time.sleep(5)  # Wait before first QBF
            try:
                while not self.uploaded_cbf:
                    time.sleep(self.dbf_interval * self.max_dbfs)
                    with self.lock:
                        qbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
                        self.qbf_debug_encids = set(self.encounter_ids_seen)
                        for _, dbf in self.dbf_window:
                            # union() returns a NEW filter; the result must be
                            # kept or the QBF stays empty.
                            qbf = qbf.union(dbf)
                        encid_count = len(self.qbf_debug_encids)
                    print(f"🟪 [QBF] QBF created with {encid_count} EncIDs")
                    # Network I/O happens outside the lock so that encounter
                    # insertion is not blocked while waiting on the server.
                    self.send_qbf_to_server(qbf)
            except Exception as e:
                print(f"[THREAD ERROR] QBF generation failed: {e}")

        threading.Thread(target=generate_qbf, daemon=True).start()

    def send_qbf_to_server(self, qbf):
        """
        Uploads the QBF containing encounter history to the analysis server.

        The payload is sent over a TCP connection to (server_ip, server_port)
        and the server's reply (up to 1024 bytes) is printed. Failures are
        reported on stdout and not raised.
        """
        try:
            payload = self._build_payload_for_server(qbf, "qbf")
            payload["encids"] = list(self.qbf_debug_encids)
            # NOTE(security): the wire format is pickle; the receiving side must
            # only unpickle data from trusted clients.
            with socket.create_connection((self.server_ip, self.server_port),
                                          timeout=SERVER_TIMEOUT) as s:
                s.sendall(pickle.dumps(payload))
                s.shutdown(socket.SHUT_WR)  # Signal end of request to the server
                print("[DEBUG] Waiting for server response...")
                response = s.recv(1024).decode()
                print(f"[SERVER RESPONSE] Risk analysis result: {response}")
        except Exception as e:
            print(f"[ERROR] QBF send failed: {e}")

    def upload_cbf_to_server(self):
        """
        Constructs and uploads the final consolidated CBF for contact upload/reporting phase.

        The CBF is built from the EncIDs captured in qbf_debug_encids and
        POSTed as JSON to /upload_cbf. On HTTP 200 the uploaded_cbf flag is
        set, which stops DBF rotation and QBF generation. Failures are
        reported on stdout and not raised.
        """
        with self.lock:
            cbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
            self.cbf_debug_ids = set(self.qbf_debug_encids)
            for encid in self.cbf_debug_ids:
                cbf.add(encid.encode())
            cbf.count = len(self.cbf_debug_ids)

        payload = self._build_payload_for_server(cbf, "cbf")
        try:
            # A timeout is required: requests waits indefinitely by default.
            response = requests.post(
                f"http://{self.server_ip}:{CBF_UPLOAD_PORT}/upload_cbf",
                json=payload,
                timeout=SERVER_TIMEOUT,
            )
            if response.status_code == 200:
                print("🟠[UPLOAD] CBF successfully uploaded to server.")
                self.uploaded_cbf = True
            else:
                print(f"[UPLOAD ERROR] Server error {response.status_code}")
        except Exception as e:
            print(f"[UPLOAD ERROR] {e}")

    def _build_payload_for_server(self, filter_obj, filter_type):
        """
        Serializes a Bloom filter as a JSON payload for upload.

        Returns:
            dict mapping filter_type to the hex-encoded pickle of filter_obj,
            plus this client's "client_id".
        """
        return {
            filter_type: pickle.dumps(filter_obj).hex(),
            "client_id": self.client_id,
        }

    def insert_into_daily_bloom_filter(self, encounter_id):
        """
        Record a new encounter ID into the most recent DBF and track for QBF/CBF use.

        Encounter IDs that were already seen are not inserted again.
        """
        with self.lock:
            self.qbf_debug_encids.add(encounter_id)
            if encounter_id in self.encounter_ids_seen:
                print(f"🟠 [DBF] Duplicate EncID: {encounter_id[:8]} skipped.")
                return
            self.encounter_ids_seen.add(encounter_id)
            if self.dbf_window:
                _, bf = self.dbf_window[-1]
                bf.add(encounter_id.encode())
                print(f"🟦 [DBF] Inserted: {encounter_id[:8]}... (total: {bf.count})")

    def generate_ephemeral_id(self):
        """
        Generate a new secure 32-byte EphID in hexadecimal form.
        """
        return os.urandom(32).hex()

    def start_ephemeral_id_rotation(self):
        """
        Periodically generate new EphIDs and split them into secret shares.

        Every t seconds a daemon thread creates a fresh EphID, splits it into
        n shares (k needed to recover) and prefixes each share with the
        SHA-256 hex digest of the EphID.
        """
        def rotate():
            try:
                while True:
                    self.ephemeral_id = self.generate_ephemeral_id()
                    self.ephemeral_id_hash = hashlib.sha256(self.ephemeral_id.encode()).hexdigest()
                    raw_shares = PlaintextToHexSecretSharer.split_secret(
                        self.ephemeral_id, self.k, self.n)
                    # Publish the prefixed list in a single assignment so the
                    # broadcast thread never observes un-prefixed shares.
                    self.shares = [f"{self.ephemeral_id_hash}:{s}" for s in raw_shares]
                    print(f"⬜[SHARE GEN] {len(self.shares)} shares for EphID {self.ephemeral_id[:8]}")
                    time.sleep(self.t)
            except Exception as e:
                print(f"[THREAD ERROR] EphID rotation failed: {e}")

        threading.Thread(target=rotate, daemon=True).start()

    def perform_diffie_hellman(self, peer_pubkey_str):
        """
        Perform Elliptic Curve Diffie-Hellman to derive a shared secret (encounter ID).

        Returns:
            SHA-256 hex digest of the shared point's x coordinate, or None if
            the peer key cannot be imported or the exchange fails.
        """
        try:
            peer_key = ECC.import_key(peer_pubkey_str)
            shared_point = peer_key.pointQ * self.private_key.d
            shared_secret = int(shared_point.x).to_bytes(32, 'big')
            return SHA256.new(shared_secret).hexdigest()
        except Exception as e:
            print("[ERROR] DH Key Exchange failed:", e)
            return None

    def broadcast_shares(self):
        """
        Continuously broadcast current EphID shares with public key.

        One share is sent every 3 seconds as "<hash>:<share>|<pubkey PEM>",
        with newlines in the PEM escaped as a literal backslash-n.
        """
        def send():
            try:
                # The keypair never changes, so the escaped PEM is computed once.
                pubkey_pem = self.public_key.export_key(format='PEM').replace('\n', '\\n')
                while True:
                    current_shares = self.shares
                    if not current_shares:
                        # No EphID generated yet; avoid spinning at full speed.
                        time.sleep(0.1)
                        continue
                    for share in current_shares:
                        message = f"{share}|{pubkey_pem}"
                        self.sock.sendto(message.encode(), (BROADCAST_IP, BROADCAST_PORT))
                        print(f"📡[BROADCAST] Sent share: {share[:16]}...")
                        time.sleep(3)
            except Exception as e:
                print(f"[THREAD ERROR] Broadcast thread failed: {e}")

        threading.Thread(target=send, daemon=True).start()

    def parse_received_packet(self, msg):
        """
        Parse and validate a received broadcast message with share and public key.

        Returns:
            (hash_val, share, pubkey)

        Raises:
            ValueError: if the message lacks the "|" or ":" separators.
        """
        try:
            share_part, pubkey_part = msg.split("|", 1)
            hash_val, share = share_part.split(":", 1)
            pubkey = pubkey_part.replace('\\n', '\n')
            return hash_val, share, pubkey
        except Exception as e:
            raise ValueError(f"Malformed packet: {msg[:30]} - {e}") from e

    def listen_for_shares(self):
        """
        Continuously listen for EphID shares. Reconstruct full EphIDs if enough
        shares are received.

        After k distinct shares for the same hash arrive, the EphID is
        recovered and verified against the advertised hash; on success an
        encounter ID is derived via ECDH and stored in the current DBF. The
        collected shares for that hash are cleared after every attempt.
        """
        def listen():
            try:
                own_pubkey_pem = self.public_key.export_key(format='PEM')
                while True:
                    data, _ = self.sock.recvfrom(1024)

                    # Random drop to simulate network loss
                    if random.random() < PACKET_DROP_PROB:
                        print("❌[DROP] Random packet drop simulated")
                        continue

                    try:
                        # UnicodeDecodeError is a ValueError subclass, so an
                        # undecodable datagram is skipped like a malformed one
                        # instead of terminating the listener thread.
                        msg = data.decode()
                        hash_val, share, peer_pubkey = self.parse_received_packet(msg)
                    except ValueError as e:
                        print("[PARSE ERROR]", e)
                        continue

                    # The socket is bound to the port we broadcast on, so our
                    # own packets come back to us; they are not encounters.
                    if peer_pubkey == own_pubkey_pem:
                        continue

                    collected = self.received_shares[hash_val]
                    if share in collected:
                        # Retransmission of a share we already hold; duplicate
                        # points would break secret recovery.
                        continue
                    collected.append(share)
                    print(f"📥 Received share for hash {hash_val[:8]} ({len(self.received_shares[hash_val])})")

                    if len(collected) < self.k:
                        continue

                    try:
                        ephid = PlaintextToHexSecretSharer.recover_secret(collected[:self.k])
                        recovered_hash = hashlib.sha256(ephid.encode()).hexdigest()
                        if compare_digest(recovered_hash, hash_val):
                            enc_id = self.perform_diffie_hellman(peer_pubkey)
                            if enc_id:
                                print(f"🟨 Encounter ID generated: {enc_id[:8]}")
                                self.insert_into_daily_bloom_filter(enc_id)
                        else:
                            print("❌[HASH MISMATCH]")
                    except Exception as e:
                        print("[RECONSTRUCTION ERROR]", e)
                    finally:
                        self.received_shares[hash_val] = []
                        print(f"🔁 Shares for hash {hash_val[:8]} cleared")
            except Exception as e:
                print("[ERROR] Listener failed:", e)

        threading.Thread(target=listen, daemon=True).start()

    def run(self):
        """
        Start the full client operation: share rotation, broadcast, reception,
        and periodic upload.

        Blocks forever; the worker threads are daemons and end with the process.
        """
        self.start_ephemeral_id_rotation()
        self.broadcast_shares()
        self.listen_for_shares()

        # Schedule contact-based filter upload after enough rotations
        threading.Timer(self.dbf_interval * self.max_dbfs, self.upload_cbf_to_server).start()

        while True:
            time.sleep(1)


def main():
    """
    Command-line entry point.

    Configuration comes either from environment variables (--env) or from the
    five positional arguments t, k, n, server_ip and server_port. Prints help
    and exits with status 1 if any positional value is missing, and exits with
    status 1 on any startup or runtime error.
    """
    parser = argparse.ArgumentParser(description="DIMY Contact Tracing Client")
    parser.add_argument("--env", action="store_true", help="Parse config from environment variables")
    parser.add_argument("t", nargs="?", type=int, help="Ephemeral ID rotation interval in seconds")
    parser.add_argument("k", nargs="?", type=int, help="Minimum shares for EphID")
    parser.add_argument("n", nargs="?", type=int, help="Total shares per EphID")
    parser.add_argument("server_ip", nargs="?", type=str, help="Server IP for QBF/CBF")
    parser.add_argument("server_port", nargs="?", type=int, help="Server port (TCP for QBF)")
    args = parser.parse_args()

    try:
        if args.env:
            t, k, n, ip, port = load_config_from_env()
        else:
            if not all([args.t, args.k, args.n, args.server_ip, args.server_port]):
                parser.print_help()
                sys.exit(1)
            t, k, n, ip, port = args.t, args.k, args.n, args.server_ip, args.server_port

        client = DimyClient(t, k, n, ip, port)
        client.run()
    except Exception as e:
        print("[FATAL ERROR]", e)
        sys.exit(1)


if __name__ == "__main__":
    main()