Upload files to "/"
This commit is contained in:
369
Dimy.py
Normal file
369
Dimy.py
Normal file
@@ -0,0 +1,369 @@
|
||||
# Refactored and Documented Dimy.py - Client-side DIMY protocol implementation
|
||||
|
||||
import sys
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
import hashlib
|
||||
import argparse
|
||||
import requests
|
||||
import pickle
|
||||
import uuid
|
||||
from collections import defaultdict, deque
|
||||
from secretsharing import PlaintextToHexSecretSharer
|
||||
from Crypto.PublicKey import ECC
|
||||
from Crypto.Hash import SHA256
|
||||
from pybloom_live import BloomFilter
|
||||
from hmac import compare_digest
|
||||
|
||||
|
||||
# Constants and configuration parameters
|
||||
BROADCAST_IP = '255.255.255.255'
|
||||
BROADCAST_PORT = 5005
|
||||
BLOOM_CAPACITY = 1000
|
||||
BLOOM_ERROR_RATE = 0.001
|
||||
PACKET_DROP_PROB = 0.1 # Simulated packet drop rate
|
||||
|
||||
def load_config_from_env():
|
||||
"""
|
||||
Load configuration values from environment variables.
|
||||
"""
|
||||
try:
|
||||
t = int(os.getenv("DIMY_T", 60))
|
||||
k = int(os.getenv("DIMY_K", 3))
|
||||
n = int(os.getenv("DIMY_N", 5))
|
||||
ip = os.getenv("DIMY_SERVER_IP", "127.0.0.1")
|
||||
port = int(os.getenv("DIMY_SERVER_PORT", 5000))
|
||||
return t, k, n, ip, port
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to load configuration from environment: {e}")
|
||||
|
||||
class DimyClient:
|
||||
"""
|
||||
A client implementing the DIMY privacy-preserving Bluetooth encounter tracing protocol.
|
||||
Records ephemeral identifiers (EphIDs), reconstructs encounters from shares,
|
||||
generates/rotates Bloom filters, and uploads encoded exposure data to a central server.
|
||||
"""
|
||||
|
||||
def __init__(self, t, k, n, server_ip, server_port):
|
||||
"""
|
||||
Initialize client parameters, keys, Bloom filters, and network socket.
|
||||
Arguments:
|
||||
t - EphID rotation interval (in seconds)
|
||||
k - Minimum shares required for EphID reconstruction
|
||||
n - Total shares generated per EphID
|
||||
server_ip - IP of the central server
|
||||
server_port - Port to send QBF on central server
|
||||
"""
|
||||
|
||||
# Safety check on protocol parameters
|
||||
if not (k >= 3 and n >= 5 and k < n):
|
||||
raise ValueError("Invalid values: Ensure that k >= 3, n >= 5, and k < n.")
|
||||
|
||||
self.client_id = str(uuid.uuid4()) # Unique ID for server reporting
|
||||
self.t = t
|
||||
self.k = k
|
||||
self.n = n
|
||||
self.server_ip = server_ip
|
||||
self.server_port = int(server_port)
|
||||
|
||||
# EphID and cryptographic state
|
||||
self.ephemeral_id = None
|
||||
self.ephemeral_id_hash = None
|
||||
self.shares = []
|
||||
self.received_shares = defaultdict(list)
|
||||
|
||||
# ECDH keypair for encounter ID generation
|
||||
self.private_key = ECC.generate(curve='P-256')
|
||||
self.public_key = self.private_key.public_key()
|
||||
|
||||
# Bloom filter states
|
||||
self.dbf_window = deque() # Sliding window of daily Bloom filters
|
||||
self.dbf_interval = 5 # Seconds between DBF rotations
|
||||
self.max_dbfs = 6 # Number of DBFs to merge into a QBF
|
||||
self.lock = threading.Lock() # Thread synchronization
|
||||
self.uploaded_cbf = False # Once CBF is uploaded, DBF rotation ends
|
||||
self.encounter_ids_seen = set() # Tracks all encounters for QBF/CBF
|
||||
self.qbf_debug_encids = set() # Debugging data sent with QBF
|
||||
self.cbf_debug_ids = set() # Stores CBF EncIDs for late upload
|
||||
|
||||
self._setup_broadcast_socket()
|
||||
self._start_new_dbf()
|
||||
self._start_dbf_rotation()
|
||||
self._start_qbf_generation()
|
||||
|
||||
|
||||
|
||||
def _setup_broadcast_socket(self):
|
||||
"""
|
||||
Set up UDP socket for Bluetooth-like broadcasting (uses IP broadcasting).
|
||||
"""
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.sock.bind(('', BROADCAST_PORT))
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to set up UDP socket: {e}")
|
||||
|
||||
def _start_new_dbf(self):
|
||||
"""
|
||||
Rotates in a new, empty daily Bloom filter (DBF) into the sliding window.
|
||||
"""
|
||||
with self.lock:
|
||||
if len(self.dbf_window) >= self.max_dbfs:
|
||||
self.dbf_window.popleft()
|
||||
self.dbf_window.append((time.time(), BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)))
|
||||
print(f"🟩 [DBF ROTATION] New DBF created. Total: {len(self.dbf_window)}")
|
||||
|
||||
def _start_dbf_rotation(self):
|
||||
"""
|
||||
Periodically initiate new DBFs at a fixed interval (DBF rotation thread).
|
||||
"""
|
||||
def rotate():
|
||||
try:
|
||||
while not self.uploaded_cbf:
|
||||
time.sleep(self.dbf_interval)
|
||||
self._start_new_dbf()
|
||||
except Exception as e:
|
||||
print(f"[THREAD ERROR] DBF rotation thread failed: {e}")
|
||||
threading.Thread(target=rotate, daemon=True).start()
|
||||
|
||||
def _start_qbf_generation(self):
|
||||
"""
|
||||
Periodically generates and uploads a query Bloom filter (QBF)
|
||||
that merges all DBFs in the sliding window.
|
||||
"""
|
||||
def generate_qbf():
|
||||
time.sleep(5) # Wait before first QBF
|
||||
try:
|
||||
while not self.uploaded_cbf:
|
||||
time.sleep(self.dbf_interval * self.max_dbfs)
|
||||
with self.lock:
|
||||
qbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
|
||||
self.qbf_debug_encids = set(self.encounter_ids_seen)
|
||||
for _, dbf in self.dbf_window:
|
||||
qbf.union(dbf)
|
||||
print(f"🟪 [QBF] QBF created with {len(self.qbf_debug_encids)} EncIDs")
|
||||
self.send_qbf_to_server(qbf)
|
||||
except Exception as e:
|
||||
print(f"[THREAD ERROR] QBF generation failed: {e}")
|
||||
threading.Thread(target=generate_qbf, daemon=True).start()
|
||||
|
||||
def send_qbf_to_server(self, qbf):
|
||||
"""
|
||||
Uploads the QBF containing encounter history to the analysis server.
|
||||
"""
|
||||
try:
|
||||
payload = self._build_payload_for_server(qbf, "qbf")
|
||||
payload["encids"] = list(self.qbf_debug_encids)
|
||||
with socket.create_connection((self.server_ip, self.server_port), timeout=10) as s:
|
||||
s.sendall(pickle.dumps(payload))
|
||||
s.shutdown(socket.SHUT_WR)
|
||||
print("[DEBUG] Waiting for server response...")
|
||||
response = s.recv(1024).decode()
|
||||
print(f"[SERVER RESPONSE] Risk analysis result: {response}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] QBF send failed: {e}")
|
||||
|
||||
def upload_cbf_to_server(self):
|
||||
"""
|
||||
Constructs and uploads the final consolidated CBF for contact upload/reporting phase.
|
||||
"""
|
||||
with self.lock:
|
||||
cbf = BloomFilter(capacity=BLOOM_CAPACITY, error_rate=BLOOM_ERROR_RATE)
|
||||
self.cbf_debug_ids = set(self.qbf_debug_encids)
|
||||
for encid in self.cbf_debug_ids:
|
||||
cbf.add(encid.encode())
|
||||
cbf.count = len(self.cbf_debug_ids)
|
||||
payload = self._build_payload_for_server(cbf, "cbf")
|
||||
|
||||
try:
|
||||
response = requests.post(f"http://{self.server_ip}:8000/upload_cbf", json=payload)
|
||||
if response.status_code == 200:
|
||||
print("🟠[UPLOAD] CBF successfully uploaded to server.")
|
||||
self.uploaded_cbf = True
|
||||
else:
|
||||
print(f"[UPLOAD ERROR] Server error {response.status_code}")
|
||||
except Exception as e:
|
||||
print(f"[UPLOAD ERROR] {e}")
|
||||
|
||||
def _build_payload_for_server(self, filter_obj, filter_type):
|
||||
"""
|
||||
Serializes a Bloom filter as a JSON payload for upload.
|
||||
"""
|
||||
return {
|
||||
filter_type: pickle.dumps(filter_obj).hex(),
|
||||
"client_id": self.client_id
|
||||
}
|
||||
|
||||
def insert_into_daily_bloom_filter(self, encounter_id):
|
||||
"""
|
||||
Record a new encounter ID into the most recent DBF and track for QBF/CBF use.
|
||||
"""
|
||||
with self.lock:
|
||||
self.qbf_debug_encids.add(encounter_id)
|
||||
|
||||
if encounter_id in self.encounter_ids_seen:
|
||||
print(f"🟠 [DBF] Duplicate EncID: {encounter_id[:8]} skipped.")
|
||||
return
|
||||
|
||||
self.encounter_ids_seen.add(encounter_id)
|
||||
if self.dbf_window:
|
||||
_, bf = self.dbf_window[-1]
|
||||
bf.add(encounter_id.encode())
|
||||
print(f"🟦 [DBF] Inserted: {encounter_id[:8]}... (total: {bf.count})")
|
||||
|
||||
def generate_ephemeral_id(self):
|
||||
"""
|
||||
Generate a new secure 32-byte EphID in hexadecimal form.
|
||||
"""
|
||||
return os.urandom(32).hex()
|
||||
|
||||
def start_ephemeral_id_rotation(self):
|
||||
"""
|
||||
Periodically generate new EphIDs and split them into secret shares.
|
||||
"""
|
||||
def rotate():
|
||||
try:
|
||||
while True:
|
||||
self.ephemeral_id = self.generate_ephemeral_id()
|
||||
self.ephemeral_id_hash = hashlib.sha256(self.ephemeral_id.encode()).hexdigest()
|
||||
self.shares = PlaintextToHexSecretSharer.split_secret(self.ephemeral_id, self.k, self.n)
|
||||
self.shares = [f"{self.ephemeral_id_hash}:{s}" for s in self.shares]
|
||||
print(f"⬜[SHARE GEN] {len(self.shares)} shares for EphID {self.ephemeral_id[:8]}")
|
||||
time.sleep(self.t)
|
||||
except Exception as e:
|
||||
print(f"[THREAD ERROR] EphID rotation failed: {e}")
|
||||
threading.Thread(target=rotate, daemon=True).start()
|
||||
|
||||
def perform_diffie_hellman(self, peer_pubkey_str):
|
||||
"""
|
||||
Perform Elliptic Curve Diffie-Hellman to derive a shared secret (encounter ID).
|
||||
"""
|
||||
try:
|
||||
peer_key = ECC.import_key(peer_pubkey_str)
|
||||
shared_point = peer_key.pointQ * self.private_key.d
|
||||
shared_secret = int(shared_point.x).to_bytes(32, 'big')
|
||||
return SHA256.new(shared_secret).hexdigest()
|
||||
except Exception as e:
|
||||
print("[ERROR] DH Key Exchange failed:", e)
|
||||
return None
|
||||
|
||||
def broadcast_shares(self):
|
||||
"""
|
||||
Continuously broadcast current EphID shares with public key.
|
||||
"""
|
||||
def send():
|
||||
try:
|
||||
while True:
|
||||
pubkey_pem = self.public_key.export_key(format='PEM').replace('\n', '\\n')
|
||||
for share in self.shares:
|
||||
message = f"{share}|{pubkey_pem}"
|
||||
self.sock.sendto(message.encode(), (BROADCAST_IP, BROADCAST_PORT))
|
||||
print(f"📡[BROADCAST] Sent share: {share[:16]}...")
|
||||
time.sleep(3)
|
||||
except Exception as e:
|
||||
print(f"[THREAD ERROR] Broadcast thread failed: {e}")
|
||||
threading.Thread(target=send, daemon=True).start()
|
||||
|
||||
def parse_received_packet(self, msg):
|
||||
"""
|
||||
Parse and validate a received broadcast message with share and public key.
|
||||
Returns: (hash_val, share, pubkey)
|
||||
"""
|
||||
try:
|
||||
share_part, pubkey_part = msg.split("|", 1)
|
||||
hash_val, share = share_part.split(":", 1)
|
||||
pubkey = pubkey_part.replace('\\n', '\n')
|
||||
return hash_val, share, pubkey
|
||||
except Exception as e:
|
||||
raise ValueError(f"Malformed packet: {msg[:30]} - {e}")
|
||||
|
||||
def listen_for_shares(self):
|
||||
"""
|
||||
Continuously listen for EphID shares. Reconstruct full EphIDs if enough shares are received.
|
||||
"""
|
||||
def listen():
|
||||
try:
|
||||
while True:
|
||||
data, _ = self.sock.recvfrom(1024)
|
||||
msg = data.decode()
|
||||
|
||||
# Random drop to simulate network loss
|
||||
if random.random() < PACKET_DROP_PROB:
|
||||
print("❌[DROP] Random packet drop simulated")
|
||||
continue
|
||||
|
||||
try:
|
||||
hash_val, share, peer_pubkey = self.parse_received_packet(msg)
|
||||
except ValueError as e:
|
||||
print("[PARSE ERROR]", e)
|
||||
continue
|
||||
|
||||
self.received_shares[hash_val].append(share)
|
||||
print(f"📥 Received share for hash {hash_val[:8]} ({len(self.received_shares[hash_val])})")
|
||||
|
||||
if len(self.received_shares[hash_val]) >= self.k:
|
||||
try:
|
||||
ephid = PlaintextToHexSecretSharer.recover_secret(self.received_shares[hash_val][:self.k])
|
||||
recovered_hash = hashlib.sha256(ephid.encode()).hexdigest()
|
||||
|
||||
if compare_digest(recovered_hash, hash_val):
|
||||
enc_id = self.perform_diffie_hellman(peer_pubkey)
|
||||
if enc_id:
|
||||
print(f"🟨 Encounter ID generated: {enc_id[:8]}")
|
||||
self.insert_into_daily_bloom_filter(enc_id)
|
||||
else:
|
||||
print("❌[HASH MISMATCH]")
|
||||
except Exception as e:
|
||||
print("[RECONSTRUCTION ERROR]", e)
|
||||
finally:
|
||||
self.received_shares[hash_val] = []
|
||||
print(f"🔁 Shares for hash {hash_val[:8]} cleared")
|
||||
except Exception as e:
|
||||
print("[ERROR] Listener failed:", e)
|
||||
threading.Thread(target=listen, daemon=True).start()
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Start the full client operation: share rotation, broadcast, reception, and periodic upload.
|
||||
"""
|
||||
self.start_ephemeral_id_rotation()
|
||||
self.broadcast_shares()
|
||||
self.listen_for_shares()
|
||||
# Schedule contact-based filter upload after enough rotations
|
||||
threading.Timer(self.dbf_interval * self.max_dbfs, self.upload_cbf_to_server).start()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="DIMY Contact Tracing Client")
|
||||
parser.add_argument("--env", action="store_true", help="Parse config from environment variables")
|
||||
parser.add_argument("t", nargs="?", type=int, help="Ephemeral ID rotation interval in seconds")
|
||||
parser.add_argument("k", nargs="?", type=int, help="Minimum shares for EphID")
|
||||
parser.add_argument("n", nargs="?", type=int, help="Total shares per EphID")
|
||||
parser.add_argument("server_ip", nargs="?", type=str, help="Server IP for QBF/CBF")
|
||||
parser.add_argument("server_port", nargs="?", type=int, help="Server port (TCP for QBF)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
if args.env:
|
||||
t, k, n, ip, port = load_config_from_env()
|
||||
else:
|
||||
if not all([args.t, args.k, args.n, args.server_ip, args.server_port]):
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
t, k, n, ip, port = args.t, args.k, args.n, args.server_ip, args.server_port
|
||||
|
||||
client = DimyClient(t, k, n, ip, port)
|
||||
client.run()
|
||||
except Exception as e:
|
||||
print("[FATAL ERROR]", e)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
15
Dockerfile
Normal file
15
Dockerfile
Normal file
@@ -0,0 +1,15 @@
|
||||
FROM python:3.13
|
||||
|
||||
WORKDIR /usr/src/app
|
||||
|
||||
COPY requirements.txt ./
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
RUN pip install git+https://github.com/blockstack/secret-sharing
|
||||
RUN rm requirements.txt
|
||||
|
||||
COPY Dimy.py .
|
||||
|
||||
EXPOSE 5005/udp
|
||||
|
||||
CMD ["sh", "-c", "if [ \"$USE_ENV\" = \"true\" ]; then python3 Dimy.py --env; else python3 Dimy.py $T $K $N $SERVER_IP $SERVER_PORT; fi"]
|
||||
8
requirements.txt
Normal file
8
requirements.txt
Normal file
@@ -0,0 +1,8 @@
|
||||
bitarray==3.3.1
|
||||
cffi==1.17.1
|
||||
cryptography==44.0.2
|
||||
pybloom_live==4.0.0
|
||||
pycparser==2.22
|
||||
xxhash==3.5.0
|
||||
requests
|
||||
pycryptodome
|
||||
Reference in New Issue
Block a user