Skip to content

Instantly share code, notes, and snippets.

@spllr
Last active April 20, 2020 07:22
Show Gist options
  • Save spllr/d04c76bbc5b748bd819d53d14c9a4091 to your computer and use it in GitHub Desktop.
Save spllr/d04c76bbc5b748bd819d53d14c9a4091 to your computer and use it in GitHub Desktop.
Demo implementation of "Privacy-Preserving Contact Tracing" introduced by Apple and Google during the COVID‑19 pandemic.
#!/usr/bin/env ruby
# Usage: privacy_preserving_contact_tracing_test.rb [options]
#
# Specific options:
# -K, --tracing-key=TRACING_KEY Tracing key, 32 bytes, HEX encoded
# -DDAILY_TRACING_KEY, Daily tracing key, 16 bytes, HEX encoded
# --daily-tracing-key When provided TRACING_KEY is ignored
# -t, --time=TIME Date and/ or time to use start
# -d, --days=NUM_DAYS Number of days to generate identifiers for
# -r, --full-day Generate all identifier for the day. Ignored if -d is provided
# -f, --format=FORMAT Select output format (txt, csv)
# -o, --output=FILE Path to write output to
##
# Demo implementation of "Privacy-Preserving Contact Tracing" introduced by
# Apple and Google during the COVID‑19 pandemic.
#
# This code is written solely for the purpose of getting a better understanding
# of what is proposed.
#
# @author Klaas Speller
# @see https://www.apple.com/newsroom/2020/04/apple-and-google-partner-on-covid-19-contact-tracing-technology/
# @see https://www.apple.com/covid19/contacttracing/
#
# LICENSE:
#
# Copyright 2020 Klaas Speller <klaas@recce.nl, @spllr>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
require 'openssl'
require 'optparse'
require 'optparse/time'
require 'csv'
##
# ContactTracing Crypto
#
# The CT Module contains all functions described in the
# *ContactTracing-CryptographySpecification.pdf*.
#
# The implementation tries to stay close to the definitiions of each function.
#
# @see https://covid19-static.cdn-apple.com/applications/covid19/current/static/contact-tracing/pdf/ContactTracing-CryptographySpecification.pdf
#
module CT
TEN_MINUTES=60*10
ONE_DAY=(60 * 60 * 24)
class << self
##
# DayNumber
#
# Provides a number for each 24-hour window. These time windows are based
# on Unix Epoch Time. DayNumber
#
# (Number of Seconds since Epoch) / 60 × 60 × 24
#
# DayNumber is encoded as a 32-bit (uint32_t) unsigned little-endian value.
#
def day_number(seconds_since_epoch=Time.now.to_i)
seconds_since_epoch / (60 * 60 * 24)
end
##
# Seconds Since Start of DayNumber
#
# where Seconds Since Start of DayNumber:
#
# Number of Seconds since Epoch % (60*60*24)
#
def second_since_start_of_day(seconds_since_epoch=Time.now.to_i)
seconds_since_epoch % (60 * 60 * 24)
end
def start_of_day(seconds_since_epoch=Time.now.to_i)
seconds_since_epoch - second_since_start_of_day(seconds_since_epoch)
end
def end_of_day(seconds_since_epoch=Time.now.to_i)
start_of_day(seconds_since_epoch) + (60 * 60 * 24)
end
##
# TimeIntervalNumber
#
# Provides a number for each 10-minute window in a 24-hour window as
# defined by DayNumber. This value will be in the [0,143] interval.
#
# TimeNumberInterval = Seconds Since Start of DayNumber / (60 × 10)
#
def time_interval_number(seconds_since_epoch=Time.now.to_i)
second_since_start_of_day(seconds_since_epoch) / (60 * 10)
end
##
# CRNG designates a cryptographic random number generator.
#
# Output ← CRNG(OutputLength)
#
def CRNG(output_length)
OpenSSL::Random.random_bytes(output_length)
end
##
# HMAC
#
# HMAC designates the HMAC function as defined by IETF RFC 2104, using the
# SHA-256 hash function:
#
# Output ← HMAC(Key, Data)
#
def HMAC(key, data)
OpenSSL::HMAC.hexdigest("SHA256", key, data)
end
##
# HKDF
#
# HKDF designates the HKDF function as defined by IETF RFC 5869, using the
# SHA-256 hash function:
#
# Output ← HKDF(Key, Salt, Info, OutputLength)
#
def HKDF(key, salt, info, output_length)
OpenSSL::KDF.hkdf(key, salt: salt || '', info: info, length: output_length, hash: "SHA256")
end
##
# Truncation
#
# Truncate defines a truncation function:
# Output ← Truncate(Data, L)
#
# The Truncate function returns the first L bytes of the data.
# The input data size being greater or equal to L is a precondition.
#
def truncate(data, length)
data[(0...length)]
end
##
# Concatenation
#
# We use the symbol || to denote concatenation.
#
def concat(lhs, rhs)
lhs.to_s + rhs.to_s
end
##
# Tracing Key
#
# The Tracing Key is generated when contact tracing is enabled on the
# device and is securely stored on the device.
# The 32-byte Tracing Key is derived as follows:
#
# tk ← CRNG(32)
#
# The Tracing Key never leaves the device.
#
def generate_tracing_key
CRNG(32)
end
##
# Daily Tracing Key
#
# A Daily Tracing Key is generated for every 24-hour window where the
# protocol is advertising. From the Tracing Key, we derive the 16-byte
# Daily Tracing Key in the following way:
#
# dtki ← HKDF(tk,NULL,(UTF8("CT-DTK")||Di),16)
#
# where Di is the DayNumber for the 24-hour window the broadcast is in.
#
def generate_daily_tracing_key(tracking_key, seconds_since_epoch=Time.now.to_i)
di = day_number(seconds_since_epoch)
HKDF(tracking_key, nil, concat("CT-DTK", [di].pack("V")), 16)
end
##
# Rolling Proximity Identifier
# The Rolling Proximity Identifiers are privacy-preserving identifiers that
# are sent in Bluetooth Advertisements.
# Each time the Bluetooth MAC address changes, we derive a new Rolling
# Proximity Identifier:
#
# RPIi,j ← Truncate(HMAC(dkti,(UTF8("CT-RPI")||TINj)),16)
#
# Where:
#
# • TINj is the TimeIntervalNumber for the time at which the BLE MAC
# address changes.
#
# The 16-byte Rolling Proximity Identifier is broadcasted over Bluetooth
# LE. The use of 16-byte Contact Tracing identifiers yields a low probability
# of collisions and limits the risk of false positive matches, while keeping
# device storage requirements low.
#
def generate_rolling_proximity_identifier(tracing_key: nil, daily_tracing_key: nil, time: Time.now.to_i)
tinj = time_interval_number(time)
dtki = daily_tracing_key || generate_daily_tracing_key(tracing_key, time)
rpii_j = truncate(HMAC(dtki, concat("CT-RPI", [tinj].pack("c"))), 16)
[rpii_j, dtki, tinj]
end
end
end
##
# Options
#
options = {
format: :csv
}
OptionParser.new do |opts|
opts.banner = "Usage: privacy_preserving_contact_tracing_test.rb [options]"
opts.separator ""
opts.separator "Specific options:"
opts.on("-KTRACING_KEY", "--tracing-key=TRACING_KEY", "Tracing key, 32 bytes, HEX encoded ") do |key|
options[:tracing_key] = [key].pack("H*")
end
opts.on("-DDAILY_TRACING_KEY", "--daily-tracing-key=DAILY_TRACING_KEY", "Daily tracing key, 16 bytes, HEX encoded", "When provided TRACING_KEY is ignored") do |key|
options[:daily_tracing_key] = [key].pack("H*")
end
opts.on("-tTIME", "--time=TIME", Time, "Date and/ or time to use start") do |time|
options[:time] = time.to_i
end
opts.on("-dNUM_DAYS", "--days=NUM_DAYS", Integer, "Number of days to generate identifiers for") do |days|
options[:days] = days.to_i
end
opts.on("-r", "--full-day", "Generate all identifier for the day. Ignored if -d is provided") do |gen_all|
options[:gen_all] = gen_all
end
FORMATS=[:txt, :csv]
opts.on("-fFORMAT", "--format=FORMAT", FORMATS, "Select output format (#{FORMATS.join(", ")})") do |type|
options[:format] = type
end
opts.on("-oFILE", "--output=FILE", String, "Path to write output to") do |pathname|
options[:pathname] = File.expand_path(pathname)
end
end.parse!
def dump_txt(options, tracing_key: nil, daily_tracing_key: nil, rolling_identifier: nil, time: 0)
output = options[:output]
output.write "-------\n"
output.write "time: #{time}\n"
output.write "day number: #{CT.day_number(time)}\n"
output.write "time interval number: #{CT.time_interval_number(time)}\n"
output.write "tracing key: #{tracing_key ? tracing_key.unpack("H*")[0] : "xxxxxxxxxxxxxxxxxxx"}\n"
output.write "daily tracing key: #{daily_tracing_key.unpack("H*")[0]}\n"
output.write "rolling proximity identifier: #{rolling_identifier.unpack("H*")[0]}\n"
output.write "\n"
end
def dump_csv(options, tracing_key: nil, daily_tracing_key: nil, rolling_identifier: nil, time: 0)
output = options[:output]
output << [
(tracing_key ? tracing_key.unpack("H*")[0] : "xxxxxxxxxxxxxxxxxxx"),
daily_tracing_key.unpack("H*")[0],
rolling_identifier.unpack("H*")[0],
time.to_s,
CT.day_number(time).to_s,
CT.time_interval_number(time).to_s
]
end
##
# Dump info to the console
#
def dump_info(options, tracing_key: nil, daily_tracing_key: nil, time: 0)
rpii_j, dtki, tinj = CT.generate_rolling_proximity_identifier(
tracing_key: tracing_key,
daily_tracing_key: daily_tracing_key,
time: time)
unless options[:output] == $stdout
$stderr.write "."
end
case options[:format]
when :txt
dump_txt(options, tracing_key: tracing_key, daily_tracing_key: dtki, rolling_identifier: rpii_j, time: time)
when :csv
dump_csv(options, tracing_key: tracing_key, daily_tracing_key: dtki, rolling_identifier: rpii_j, time: time)
end
end
##
# Generate rolling proximity identifiers (RPI)
#
start_time = options[:time] || Time.now.to_i
end_time = options[:days] ? (start_time + options[:days] * CT::ONE_DAY) : start_time
dtk = options[:daily_tracing_key]
tk = dtk ? nil : (options[:tracing_key] || CT.generate_tracing_key)
time_range = options[:gen_all] ? (CT.start_of_day(start_time)...CT.end_of_day(end_time)) : (start_time...end_time)
num_items = time_range.count <= CT::TEN_MINUTES ? 1 : time_range.step(CT::TEN_MINUTES).count
$stderr.write "generating #{num_items} rolling identifiers\n"
$stderr.write "format: #{options[:format]}\n"
$stderr.write "output: #{options[:pathname] ? options[:pathname] : 'stdout'}\n\n\n"
options[:output] = case options[:format]
when :txt then (options[:pathname] ? File.open(options[:pathname], "w+") : $stdout)
when :csv
output = options[:pathname] ? CSV.open(options[:pathname], "w+", headers: true) : CSV.new($stdout, headers: true)
output << ["Tracing Key", "Daily Tracing Key", "Rolling Proximity Identifier", "Time", "Day Number", "Time Interval Number"]
output
end
if time_range.count <= CT::TEN_MINUTES
dump_info(options, tracing_key: tk, daily_tracing_key: dtk, time: time_range.first)
else
time_range.step(CT::TEN_MINUTES) do |time|
dump_info(options, tracing_key: tk, daily_tracing_key: dtk, time: time)
end
end
$stderr.write "\n\ndone\n"
options[:output].close if options[:pathname]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment