From 5601d6101e6d389044d13c966737d9080cf59ea2 Mon Sep 17 00:00:00 2001 From: Pepsi Date: Mon, 21 Apr 2025 10:09:36 +0000 Subject: [PATCH] Upload files to "/" --- Dimy.py | 369 +++++++++++++++++++++++++++++++++++++++++++++++ Dockerfile | 15 ++ requirements.txt | 8 + 3 files changed, 392 insertions(+) create mode 100644 Dimy.py create mode 100644 Dockerfile create mode 100644 requirements.txt diff --git a/Dimy.py b/Dimy.py new file mode 100644 index 0000000..a2f07aa --- /dev/null +++ b/Dimy.py @@ -0,0 +1,369 @@ +# Refactored and Documented Dimy.py - Client-side DIMY protocol implementation + +import sys +import socket +import threading +import time +import os +import random +import hashlib +import argparse +import requests +import pickle +import uuid +from collections import defaultdict, deque +from secretsharing import PlaintextToHexSecretSharer +from Crypto.PublicKey import ECC +from Crypto.Hash import SHA256 +from pybloom_live import BloomFilter +from hmac import compare_digest + + +# Constants and configuration parameters +BROADCAST_IP = '255.255.255.255' +BROADCAST_PORT = 5005 +BLOOM_CAPACITY = 1000 +BLOOM_ERROR_RATE = 0.001 +PACKET_DROP_PROB = 0.1 # Simulated packet drop rate + +def load_config_from_env(): + """ + Load configuration values from environment variables. + """ + try: + t = int(os.getenv("DIMY_T", 60)) + k = int(os.getenv("DIMY_K", 3)) + n = int(os.getenv("DIMY_N", 5)) + ip = os.getenv("DIMY_SERVER_IP", "127.0.0.1") + port = int(os.getenv("DIMY_SERVER_PORT", 5000)) + return t, k, n, ip, port + except Exception as e: + raise RuntimeError(f"Failed to load configuration from environment: {e}") + +class DimyClient: + """ + A client implementing the DIMY privacy-preserving Bluetooth encounter tracing protocol. + Records ephemeral identifiers (EphIDs), reconstructs encounters from shares, + generates/rotates Bloom filters, and uploads encoded exposure data to a central server. + """ + + def __init__(self, t, k, n, server_ip, server_port): + """ + Initialize client parameters, keys, Bloom filters, and network socket. + Arguments: + t - EphID rotation interval (in seconds) + k - Minimum shares required for EphID reconstruction + n - Total shares generated per EphID + server_ip - IP of the central server + server_port - Port to send QBF on central server + """ + + # Safety check on protocol parameters + if not (k >= 3 and n >= 5 and k < n): + raise ValueError("Invalid values: Ensure that k >= 3, n >= 5, and k < n.") + + self.client_id = str(uuid.uuid4()) # Unique ID for server reporting + self.t = t + self.k = k + self.n = n + self.server_ip = server_ip + self.server_port = int(server_port) + + # EphID and cryptographic state + self.ephemeral_id = None + self.ephemeral_id_hash = None + self.shares = [] + self.received_shares = defaultdict(list) + + # ECDH keypair for encounter ID generation + self.private_key = ECC.generate(curve='P-256') + self.public_key = self.private_key.public_key() + + # Bloom filter states + self.dbf_window = deque() # Sliding window of daily Bloom filters + self.dbf_interval = 5 # Seconds between DBF rotations + self.max_dbfs = 6 # Number of DBFs to merge into a QBF + self.lock = threading.Lock() # Thread synchronization + self.uploaded_cbf = False # Once CBF is uploaded, DBF rotation ends + self.encounter_ids_seen = set() # Tracks all encounters for QBF/CBF + self.qbf_debug_encids = set() # Debugging data sent with QBF + self.cbf_debug_ids = set() # Stores CBF EncIDs for late upload + + self._setup_broadcast_socket() + self._start_new_dbf() + self._start_dbf_rotation() + self._start_qbf_generation() + + + + def _setup_broadcast_socket(self): + """ + Set up UDP socket for Bluetooth-like broadcasting (uses IP broadcasting). + """ + try: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind(('', BROADCAST_PORT)) + except Exception as e: + raise RuntimeError(f"Failed to set up UDP socket: {e}") + + def _start_new_dbf(self): + """ + Rotates in a new, empty daily Bloom filter (DBF) into the sliding window. + """ + with self.lock: + if len(self.dbf_window) >= self.max_dbfs: + self.dbf_window.popleft() + self.dbf_window.append((time.time(), BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE))) + print(f"🟩 [DBF ROTATION] New DBF created. Total: {len(self.dbf_window)}") + + def _start_dbf_rotation(self): + """ + Periodically initiate new DBFs at a fixed interval (DBF rotation thread). + """ + def rotate(): + try: + while not self.uploaded_cbf: + time.sleep(self.dbf_interval) + self._start_new_dbf() + except Exception as e: + print(f"[THREAD ERROR] DBF rotation thread failed: {e}") + threading.Thread(target=rotate, daemon=True).start() + + def _start_qbf_generation(self): + """ + Periodically generates and uploads a query Bloom filter (QBF) + that merges all DBFs in the sliding window. + """ + def generate_qbf(): + time.sleep(5) # Wait before first QBF + try: + while not self.uploaded_cbf: + time.sleep(self.dbf_interval * self.max_dbfs) + with self.lock: + qbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE) + self.qbf_debug_encids = set(self.encounter_ids_seen) + for _, dbf in self.dbf_window: + qbf.union(dbf) + print(f"🟪 [QBF] QBF created with {len(self.qbf_debug_encids)} EncIDs") + self.send_qbf_to_server(qbf) + except Exception as e: + print(f"[THREAD ERROR] QBF generation failed: {e}") + threading.Thread(target=generate_qbf, daemon=True).start() + + def send_qbf_to_server(self, qbf): + """ + Uploads the QBF containing encounter history to the analysis server. + """ + try: + payload = self._build_payload_for_server(qbf, "qbf") + payload["encids"] = list(self.qbf_debug_encids) + with socket.create_connection((self.server_ip, self.server_port), timeout=10) as s: + s.sendall(pickle.dumps(payload)) + s.shutdown(socket.SHUT_WR) + print("[DEBUG] Waiting for server response...") + response = s.recv(1024).decode() + print(f"[SERVER RESPONSE] Risk analysis result: {response}") + except Exception as e: + print(f"[ERROR] QBF send failed: {e}") + + def upload_cbf_to_server(self): + """ + Constructs and uploads the final consolidated CBF for contact upload/reporting phase. + """ + with self.lock: + cbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE) + self.cbf_debug_ids = set(self.qbf_debug_encids) + for encid in self.cbf_debug_ids: + cbf.add(encid.encode()) + cbf.count = len(self.cbf_debug_ids) + payload = self._build_payload_for_server(cbf, "cbf") + + try: + response = requests.post(f"http://{self.server_ip}:8000/upload_cbf", json=payload) + if response.status_code == 200: + print("🟠[UPLOAD] CBF successfully uploaded to server.") + self.uploaded_cbf = True + else: + print(f"[UPLOAD ERROR] Server error {response.status_code}") + except Exception as e: + print(f"[UPLOAD ERROR] {e}") + + def _build_payload_for_server(self, filter_obj, filter_type): + """ + Serializes a Bloom filter as a JSON payload for upload. + """ + return { + filter_type: pickle.dumps(filter_obj).hex(), + "client_id": self.client_id + } + + def insert_into_daily_bloom_filter(self, encounter_id): + """ + Record a new encounter ID into the most recent DBF and track for QBF/CBF use. + """ + with self.lock: + self.qbf_debug_encids.add(encounter_id) + + if encounter_id in self.encounter_ids_seen: + print(f"🟠 [DBF] Duplicate EncID: {encounter_id[:8]} skipped.") + return + + self.encounter_ids_seen.add(encounter_id) + if self.dbf_window: + _, bf = self.dbf_window[-1] + bf.add(encounter_id.encode()) + print(f"🟦 [DBF] Inserted: {encounter_id[:8]}... (total: {bf.count})") + + def generate_ephemeral_id(self): + """ + Generate a new secure 32-byte EphID in hexadecimal form. + """ + return os.urandom(32).hex() + + def start_ephemeral_id_rotation(self): + """ + Periodically generate new EphIDs and split them into secret shares. + """ + def rotate(): + try: + while True: + self.ephemeral_id = self.generate_ephemeral_id() + self.ephemeral_id_hash = hashlib.sha256(self.ephemeral_id.encode()).hexdigest() + self.shares = PlaintextToHexSecretSharer.split_secret(self.ephemeral_id, self.k, self.n) + self.shares = [f"{self.ephemeral_id_hash}:{s}" for s in self.shares] + print(f"⬜[SHARE GEN] {len(self.shares)} shares for EphID {self.ephemeral_id[:8]}") + time.sleep(self.t) + except Exception as e: + print(f"[THREAD ERROR] EphID rotation failed: {e}") + threading.Thread(target=rotate, daemon=True).start() + + def perform_diffie_hellman(self, peer_pubkey_str): + """ + Perform Elliptic Curve Diffie-Hellman to derive a shared secret (encounter ID). + """ + try: + peer_key = ECC.import_key(peer_pubkey_str) + shared_point = peer_key.pointQ * self.private_key.d + shared_secret = int(shared_point.x).to_bytes(32, 'big') + return SHA256.new(shared_secret).hexdigest() + except Exception as e: + print("[ERROR] DH Key Exchange failed:", e) + return None + + def broadcast_shares(self): + """ + Continuously broadcast current EphID shares with public key. + """ + def send(): + try: + while True: + pubkey_pem = self.public_key.export_key(format='PEM').replace('\n', '\\n') + for share in self.shares: + message = f"{share}|{pubkey_pem}" + self.sock.sendto(message.encode(), (BROADCAST_IP, BROADCAST_PORT)) + print(f"📡[BROADCAST] Sent share: {share[:16]}...") + time.sleep(3) + except Exception as e: + print(f"[THREAD ERROR] Broadcast thread failed: {e}") + threading.Thread(target=send, daemon=True).start() + + def parse_received_packet(self, msg): + """ + Parse and validate a received broadcast message with share and public key. + Returns: (hash_val, share, pubkey) + """ + try: + share_part, pubkey_part = msg.split("|", 1) + hash_val, share = share_part.split(":", 1) + pubkey = pubkey_part.replace('\\n', '\n') + return hash_val, share, pubkey + except Exception as e: + raise ValueError(f"Malformed packet: {msg[:30]} - {e}") + + def listen_for_shares(self): + """ + Continuously listen for EphID shares. Reconstruct full EphIDs if enough shares are received. + """ + def listen(): + try: + while True: + data, _ = self.sock.recvfrom(1024) + msg = data.decode() + + # Random drop to simulate network loss + if random.random() < PACKET_DROP_PROB: + print("❌[DROP] Random packet drop simulated") + continue + + try: + hash_val, share, peer_pubkey = self.parse_received_packet(msg) + except ValueError as e: + print("[PARSE ERROR]", e) + continue + + self.received_shares[hash_val].append(share) + print(f"📥 Received share for hash {hash_val[:8]} ({len(self.received_shares[hash_val])})") + + if len(self.received_shares[hash_val]) >= self.k: + try: + ephid = PlaintextToHexSecretSharer.recover_secret(self.received_shares[hash_val][:self.k]) + recovered_hash = hashlib.sha256(ephid.encode()).hexdigest() + + if compare_digest(recovered_hash, hash_val): + enc_id = self.perform_diffie_hellman(peer_pubkey) + if enc_id: + print(f"🟨 Encounter ID generated: {enc_id[:8]}") + self.insert_into_daily_bloom_filter(enc_id) + else: + print("❌[HASH MISMATCH]") + except Exception as e: + print("[RECONSTRUCTION ERROR]", e) + finally: + self.received_shares[hash_val] = [] + print(f"🔁 Shares for hash {hash_val[:8]} cleared") + except Exception as e: + print("[ERROR] Listener failed:", e) + threading.Thread(target=listen, daemon=True).start() + + def run(self): + """ + Start the full client operation: share rotation, broadcast, reception, and periodic upload. + """ + self.start_ephemeral_id_rotation() + self.broadcast_shares() + self.listen_for_shares() + # Schedule contact-based filter upload after enough rotations + threading.Timer(self.dbf_interval * self.max_dbfs, self.upload_cbf_to_server).start() + while True: + time.sleep(1) +def main(): + parser = argparse.ArgumentParser(description="DIMY Contact Tracing Client") + parser.add_argument("--env", action="store_true", help="Parse config from environment variables") + parser.add_argument("t", nargs="?", type=int, help="Ephemeral ID rotation interval in seconds") + parser.add_argument("k", nargs="?", type=int, help="Minimum shares for EphID") + parser.add_argument("n", nargs="?", type=int, help="Total shares per EphID") + parser.add_argument("server_ip", nargs="?", type=str, help="Server IP for QBF/CBF") + parser.add_argument("server_port", nargs="?", type=int, help="Server port (TCP for QBF)") + + args = parser.parse_args() + + try: + if args.env: + t, k, n, ip, port = load_config_from_env() + else: + if not all([args.t, args.k, args.n, args.server_ip, args.server_port]): + parser.print_help() + sys.exit(1) + t, k, n, ip, port = args.t, args.k, args.n, args.server_ip, args.server_port + + client = DimyClient(t, k, n, ip, port) + client.run() + except Exception as e: + print("[FATAL ERROR]", e) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..60ef00f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.13 + +WORKDIR /usr/src/app + +COPY requirements.txt ./ + +RUN pip install --no-cache-dir -r requirements.txt +RUN pip install git+https://github.com/blockstack/secret-sharing +RUN rm requirements.txt + +COPY Dimy.py . + +EXPOSE 5005/udp + +CMD ["sh", "-c", "if [ \"$USE_ENV\" = \"true\" ]; then python3 Dimy.py --env; else python3 Dimy.py $T $K $N $SERVER_IP $SERVER_PORT; fi"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7f3cc4c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +bitarray==3.3.1 +cffi==1.17.1 +cryptography==44.0.2 +pybloom_live==4.0.0 +pycparser==2.22 +xxhash==3.5.0 +requests +pycryptodome \ No newline at end of file