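# Source: 4337-client/Dimy.py (369 lines, 15 KiB, Python)
# Repository-viewer header text ("Files", "Raw / Normal View / History") converted to comments.
# Timestamp shown by the viewer: 2025-04-21 10:09:36 +00:00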
# Refactored and Documented Dimy.py - Client-side DIMY protocol implementation
import sys
import socket
import threading
import time
import os
import random
import hashlib
import argparse
import requests
import pickle
import uuid
from collections import defaultdict, deque
from secretsharing import PlaintextToHexSecretSharer
from Crypto.PublicKey import ECC
from Crypto.Hash import SHA256
from pybloom_live import BloomFilter
from hmac import compare_digest
# Constants and configuration parameters
# Destination address used when sending shares (IPv4 limited-broadcast address).
BROADCAST_IP = '255.255.255.255'
# UDP port the client both binds to (receive) and sends shares to.
BROADCAST_PORT = 5005
# Capacity and false-positive rate passed to every BloomFilter the client creates
# (DBF, QBF and CBF all use the same pair of values).
BLOOM_CAPACITY = 1000
BLOOM_ERROR_RATE = 0.001
# The listener discards a received packet when random.random() falls below this value.
PACKET_DROP_PROB = 0.1 # Simulated packet drop rate
def load_config_from_env():
    """
    Load configuration values from environment variables.

    Variables read (with the default used when unset):
        DIMY_T            - EphID rotation interval in seconds (60)
        DIMY_K            - minimum shares needed for reconstruction (3)
        DIMY_N            - total shares generated per EphID (5)
        DIMY_SERVER_IP    - server address (127.0.0.1)
        DIMY_SERVER_PORT  - server TCP port (5000)

    Returns:
        Tuple ``(t, k, n, ip, port)`` with the numeric values converted to int.

    Raises:
        RuntimeError: if a numeric variable cannot be parsed as an integer.
        The original ValueError is attached as ``__cause__``.
    """
    try:
        t = int(os.getenv("DIMY_T", 60))
        k = int(os.getenv("DIMY_K", 3))
        n = int(os.getenv("DIMY_N", 5))
        ip = os.getenv("DIMY_SERVER_IP", "127.0.0.1")
        port = int(os.getenv("DIMY_SERVER_PORT", 5000))
    except ValueError as e:
        # int() on a non-numeric string is the only failure mode here; chain it
        # so the traceback still shows which conversion went wrong.
        raise RuntimeError(f"Failed to load configuration from environment: {e}") from e
    return t, k, n, ip, port
class DimyClient:
    """
    A client implementing the DIMY privacy-preserving encounter tracing protocol.

    The client broadcasts secret shares of a rotating ephemeral identifier (EphID)
    over UDP, rebuilds peers' EphIDs from the shares it receives, derives an
    encounter ID (EncID) through ECDH for every verified EphID, records EncIDs in
    a sliding window of Bloom filters (DBFs), periodically sends the merged
    window (QBF) to the server over TCP and uploads a contact Bloom filter (CBF)
    over HTTP.
    """

    def __init__(self, t, k, n, server_ip, server_port):
        """
        Initialize client parameters, keys, Bloom filters, and network socket.

        Also starts the DBF rotation and QBF generation background threads.

        Arguments:
            t - EphID rotation interval (in seconds)
            k - Minimum shares required for EphID reconstruction
            n - Total shares generated per EphID
            server_ip - IP of the central server
            server_port - Port to send QBF on central server

        Raises:
            ValueError: if k < 3, n < 5 or k >= n.
            RuntimeError: if the UDP broadcast socket cannot be set up.
        """
        # Safety check on protocol parameters
        if not (k >= 3 and n >= 5 and k < n):
            raise ValueError("Invalid values: Ensure that k >= 3, n >= 5, and k < n.")

        self.client_id = str(uuid.uuid4())  # Unique ID for server reporting
        self.t = t
        self.k = k
        self.n = n
        self.server_ip = server_ip
        self.server_port = int(server_port)

        # EphID and cryptographic state
        self.ephemeral_id = None
        self.ephemeral_id_hash = None
        self.shares = []
        self.received_shares = defaultdict(list)  # hash of EphID -> distinct shares seen

        # ECDH keypair for encounter ID generation
        self.private_key = ECC.generate(curve='P-256')
        self.public_key = self.private_key.public_key()

        # Bloom filter states
        self.dbf_window = deque()        # Sliding window of (creation time, DBF)
        self.dbf_interval = 5            # Seconds between DBF rotations
        self.max_dbfs = 6                # Number of DBFs to merge into a QBF
        self.lock = threading.Lock()     # Guards dbf_window and the EncID sets
        self.uploaded_cbf = False        # Once CBF is uploaded, background loops end
        self.encounter_ids_seen = set()  # Tracks all encounters for QBF/CBF
        self.qbf_debug_encids = set()    # Debugging data sent with QBF
        self.cbf_debug_ids = set()       # Stores CBF EncIDs for late upload

        self._setup_broadcast_socket()
        self._start_new_dbf()
        self._start_dbf_rotation()
        self._start_qbf_generation()

    def _setup_broadcast_socket(self):
        """
        Set up the UDP socket used for sending and receiving share broadcasts.

        Raises:
            RuntimeError: if the socket cannot be created, configured or bound.
        """
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('', BROADCAST_PORT))
        except OSError as e:
            # Do not leak the descriptor when configuration or bind fails.
            if sock is not None:
                sock.close()
            raise RuntimeError(f"Failed to set up UDP socket: {e}") from e
        self.sock = sock

    def _start_new_dbf(self):
        """
        Append a new, empty DBF to the sliding window, evicting the oldest one
        when the window already holds ``max_dbfs`` filters.
        """
        with self.lock:
            if len(self.dbf_window) >= self.max_dbfs:
                self.dbf_window.popleft()
            fresh = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
            self.dbf_window.append((time.time(), fresh))
            print(f"🟩 [DBF ROTATION] New DBF created. Total: {len(self.dbf_window)}")

    def _start_dbf_rotation(self):
        """
        Start a daemon thread that creates a new DBF every ``dbf_interval``
        seconds until the CBF has been uploaded.
        """
        def rotate():
            try:
                while not self.uploaded_cbf:
                    time.sleep(self.dbf_interval)
                    self._start_new_dbf()
            except Exception as e:
                print(f"[THREAD ERROR] DBF rotation thread failed: {e}")

        threading.Thread(target=rotate, daemon=True).start()

    def _start_qbf_generation(self):
        """
        Start a daemon thread that periodically merges all DBFs in the sliding
        window into a query Bloom filter (QBF) and sends it to the server.
        """
        def generate_qbf():
            time.sleep(5)  # Wait before first QBF
            try:
                while not self.uploaded_cbf:
                    time.sleep(self.dbf_interval * self.max_dbfs)
                    with self.lock:
                        qbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
                        for _, dbf in self.dbf_window:
                            # union() returns a NEW filter and leaves its operands
                            # untouched, so the result must be re-assigned.
                            qbf = qbf.union(dbf)
                        self.qbf_debug_encids = set(self.encounter_ids_seen)
                        encid_count = len(self.qbf_debug_encids)
                    print(f"🟪 [QBF] QBF created with {encid_count} EncIDs")
                    # Network I/O happens outside the lock so the listener is not blocked.
                    self.send_qbf_to_server(qbf)
            except Exception as e:
                print(f"[THREAD ERROR] QBF generation failed: {e}")

        threading.Thread(target=generate_qbf, daemon=True).start()

    def send_qbf_to_server(self, qbf):
        """
        Send the QBF (plus the debug EncID list) to the server over TCP and
        print the server's reply. Failures are logged, not raised.

        The caller must not hold ``self.lock``: it is acquired briefly here to
        snapshot the EncID set.
        """
        try:
            payload = self._build_payload_for_server(qbf, "qbf")
            with self.lock:
                payload["encids"] = list(self.qbf_debug_encids)
            with socket.create_connection((self.server_ip, self.server_port), timeout=10) as s:
                s.sendall(pickle.dumps(payload))
                s.shutdown(socket.SHUT_WR)  # Signal end of request to the server
                print("[DEBUG] Waiting for server response...")
                response = s.recv(1024).decode()
                print(f"[SERVER RESPONSE] Risk analysis result: {response}")
        except Exception as e:
            print(f"[ERROR] QBF send failed: {e}")

    def upload_cbf_to_server(self):
        """
        Build the contact Bloom filter (CBF) from the recorded EncIDs and upload
        it via HTTP POST. On HTTP 200 the client is marked as uploaded, which
        ends the DBF rotation and QBF loops. Failures are logged, not raised.
        """
        with self.lock:
            cbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
            self.cbf_debug_ids = set(self.qbf_debug_encids)
            for encid in self.cbf_debug_ids:
                cbf.add(encid.encode())
            cbf.count = len(self.cbf_debug_ids)
            payload = self._build_payload_for_server(cbf, "cbf")

        # The HTTP call runs outside the lock and with a timeout so an
        # unresponsive server cannot stall the other threads or hang forever.
        try:
            response = requests.post(
                f"http://{self.server_ip}:8000/upload_cbf", json=payload, timeout=10
            )
            if response.status_code == 200:
                print("🟠[UPLOAD] CBF successfully uploaded to server.")
                self.uploaded_cbf = True
            else:
                print(f"[UPLOAD ERROR] Server error {response.status_code}")
        except Exception as e:
            print(f"[UPLOAD ERROR] {e}")

    def _build_payload_for_server(self, filter_obj, filter_type):
        """
        Build the upload payload: the pickled filter as a hex string under the
        key ``filter_type``, plus this client's ID.

        NOTE(security): the filter travels as a pickle; whoever unpickles it
        must trust the sender, since unpickling can execute arbitrary code.
        """
        return {
            filter_type: pickle.dumps(filter_obj).hex(),
            "client_id": self.client_id
        }

    def insert_into_daily_bloom_filter(self, encounter_id):
        """
        Record a new encounter ID into the most recent DBF and track it for
        QBF/CBF use. An EncID that was already recorded is skipped.
        """
        with self.lock:
            self.qbf_debug_encids.add(encounter_id)
            if encounter_id in self.encounter_ids_seen:
                print(f"🟠 [DBF] Duplicate EncID: {encounter_id[:8]} skipped.")
                return
            self.encounter_ids_seen.add(encounter_id)
            if self.dbf_window:
                _, bf = self.dbf_window[-1]
                bf.add(encounter_id.encode())
                print(f"🟦 [DBF] Inserted: {encounter_id[:8]}... (total: {bf.count})")

    def generate_ephemeral_id(self):
        """
        Generate a new random 32-byte EphID and return it as a 64-character hex string.
        """
        return os.urandom(32).hex()

    def start_ephemeral_id_rotation(self):
        """
        Start a daemon thread that every ``t`` seconds generates a new EphID and
        splits it into ``n`` secret shares, each prefixed with the EphID's SHA-256.
        """
        def rotate():
            try:
                while True:
                    ephid = self.generate_ephemeral_id()
                    ephid_hash = hashlib.sha256(ephid.encode()).hexdigest()
                    raw_shares = PlaintextToHexSecretSharer.split_secret(ephid, self.k, self.n)
                    tagged = [f"{ephid_hash}:{s}" for s in raw_shares]
                    # Publish the finished list in a single assignment so the
                    # broadcast thread never sees shares without the hash prefix.
                    self.ephemeral_id = ephid
                    self.ephemeral_id_hash = ephid_hash
                    self.shares = tagged
                    print(f"⬜[SHARE GEN] {len(self.shares)} shares for EphID {self.ephemeral_id[:8]}")
                    time.sleep(self.t)
            except Exception as e:
                print(f"[THREAD ERROR] EphID rotation failed: {e}")

        threading.Thread(target=rotate, daemon=True).start()

    def perform_diffie_hellman(self, peer_pubkey_str):
        """
        Perform Elliptic Curve Diffie-Hellman with the peer's public key.

        Returns:
            The SHA-256 hex digest of the shared point's x-coordinate (the
            encounter ID), or None if the key exchange fails.
        """
        try:
            peer_key = ECC.import_key(peer_pubkey_str)
            shared_point = peer_key.pointQ * self.private_key.d
            shared_secret = int(shared_point.x).to_bytes(32, 'big')
            return SHA256.new(shared_secret).hexdigest()
        except Exception as e:
            print("[ERROR] DH Key Exchange failed:", e)
            return None

    def broadcast_shares(self):
        """
        Start a daemon thread that continuously broadcasts the current EphID
        shares together with the public key, one share every 3 seconds.
        """
        def send():
            try:
                while True:
                    # Newlines in the PEM are escaped so the key stays on one line.
                    pubkey_pem = self.public_key.export_key(format='PEM').replace('\n', '\\n')
                    for share in self.shares:
                        message = f"{share}|{pubkey_pem}"
                        self.sock.sendto(message.encode(), (BROADCAST_IP, BROADCAST_PORT))
                        print(f"📡[BROADCAST] Sent share: {share[:16]}...")
                        time.sleep(3)
            except Exception as e:
                print(f"[THREAD ERROR] Broadcast thread failed: {e}")

        threading.Thread(target=send, daemon=True).start()

    def parse_received_packet(self, msg):
        """
        Parse a received broadcast message of the form ``<hash>:<share>|<pubkey>``.

        Returns: (hash_val, share, pubkey) with the escaped newlines in the
        public key restored.

        Raises:
            ValueError: if the message does not contain both separators.
        """
        try:
            share_part, pubkey_part = msg.split("|", 1)
            hash_val, share = share_part.split(":", 1)
            pubkey = pubkey_part.replace('\\n', '\n')
            return hash_val, share, pubkey
        except Exception as e:
            raise ValueError(f"Malformed packet: {msg[:30]} - {e}")

    def _record_share(self, hash_val, share):
        """Store ``share`` for ``hash_val`` unless already held; return the distinct count."""
        bucket = self.received_shares[hash_val]
        # Every share is re-broadcast many times; a repeated share adds no
        # information and would corrupt reconstruction if counted toward k.
        if share not in bucket:
            bucket.append(share)
        return len(bucket)

    def _reconstruct_and_record(self, hash_val, peer_pubkey):
        """Rebuild the EphID from k held shares, verify its hash and record the EncID."""
        try:
            ephid = PlaintextToHexSecretSharer.recover_secret(self.received_shares[hash_val][:self.k])
            recovered_hash = hashlib.sha256(ephid.encode()).hexdigest()
            if compare_digest(recovered_hash, hash_val):
                enc_id = self.perform_diffie_hellman(peer_pubkey)
                if enc_id:
                    print(f"🟨 Encounter ID generated: {enc_id[:8]}")
                    self.insert_into_daily_bloom_filter(enc_id)
            else:
                print("❌[HASH MISMATCH]")
        except Exception as e:
            print("[RECONSTRUCTION ERROR]", e)
        finally:
            # Remove the entry entirely so the table does not grow by one
            # empty list per EphID ever seen.
            self.received_shares.pop(hash_val, None)
            print(f"🔁 Shares for hash {hash_val[:8]} cleared")

    def _handle_datagram(self, data):
        """Process one received UDP payload: decode, simulate loss, parse, store, reconstruct."""
        try:
            msg = data.decode()
        except UnicodeDecodeError as e:
            # A single undecodable packet must not terminate the listener.
            print("[PARSE ERROR]", e)
            return

        # Random drop to simulate network loss
        if random.random() < PACKET_DROP_PROB:
            print("❌[DROP] Random packet drop simulated")
            return

        try:
            hash_val, share, peer_pubkey = self.parse_received_packet(msg)
        except ValueError as e:
            print("[PARSE ERROR]", e)
            return

        # The socket also receives this client's own broadcasts; ignore them so
        # the client never records an encounter with itself.
        if hash_val == self.ephemeral_id_hash:
            return

        held = self._record_share(hash_val, share)
        print(f"📥 Received share for hash {hash_val[:8]} ({held})")
        if held >= self.k:
            self._reconstruct_and_record(hash_val, peer_pubkey)

    def listen_for_shares(self):
        """
        Start a daemon thread that continuously receives EphID shares and
        reconstructs a full EphID once ``k`` distinct shares are held for it.
        """
        def listen():
            try:
                while True:
                    data, _ = self.sock.recvfrom(1024)
                    self._handle_datagram(data)
            except Exception as e:
                print("[ERROR] Listener failed:", e)

        threading.Thread(target=listen, daemon=True).start()

    def run(self):
        """
        Start the full client operation: share rotation, broadcast, reception,
        and the scheduled CBF upload. Blocks forever.
        """
        self.start_ephemeral_id_rotation()
        self.broadcast_shares()
        self.listen_for_shares()
        # Schedule contact-based filter upload after enough rotations
        threading.Timer(self.dbf_interval * self.max_dbfs, self.upload_cbf_to_server).start()
        while True:
            time.sleep(1)
def main():
    """
    Command-line entry point for the DIMY client.

    Configuration comes either from environment variables (``--env``) or from
    the five positional arguments ``t k n server_ip server_port``. If any
    positional value is missing (or falsy) the help text is printed and the
    process exits with status 1. Any exception raised while building or running
    the client is reported as a fatal error and also exits with status 1.
    """
    cli = argparse.ArgumentParser(description="DIMY Contact Tracing Client")
    cli.add_argument("--env", action="store_true", help="Parse config from environment variables")

    # All positionals are optional at the parser level; completeness is checked below.
    positional_specs = (
        ("t", int, "Ephemeral ID rotation interval in seconds"),
        ("k", int, "Minimum shares for EphID"),
        ("n", int, "Total shares per EphID"),
        ("server_ip", str, "Server IP for QBF/CBF"),
        ("server_port", int, "Server port (TCP for QBF)"),
    )
    for arg_name, arg_type, arg_help in positional_specs:
        cli.add_argument(arg_name, nargs="?", type=arg_type, help=arg_help)

    parsed = cli.parse_args()

    try:
        if parsed.env:
            settings = load_config_from_env()
        else:
            settings = (parsed.t, parsed.k, parsed.n, parsed.server_ip, parsed.server_port)
            if not all(settings):
                cli.print_help()
                sys.exit(1)  # SystemExit is not an Exception, so it passes the handler below
        interval, threshold, total, host, port = settings
        DimyClient(interval, threshold, total, host, port).run()
    except Exception as err:
        print("[FATAL ERROR]", err)
        sys.exit(1)


if __name__ == "__main__":
    main()