Skip to content

Instantly share code, notes, and snippets.

@tomato42
Created October 15, 2023 18:45
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomato42/46b8f6a1a05fb894d3a26b3ace6d6df1 to your computer and use it in GitHub Desktop.
Save tomato42/46b8f6a1a05fb894d3a26b3ace6d6df1 to your computer and use it in GitHub Desktop.
Example server checking password with a timing side-channel and a tlsfuzzer script to attack it
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <byteswap.h>
void doprocessing (int sock, const char *password);

/*
 * Entry point of the example password server.
 *
 * De-obfuscates the expected password, binds a TCP socket to port 5001 on
 * all IPv4 interfaces and then serves clients one at a time, forever: every
 * accepted connection is handed to doprocessing() before the next one is
 * accepted.
 *
 * argc/argv are not used.  The process terminates with status 1 when any of
 * the socket set-up calls or accept() fails.
 */
int main( int argc, char *argv[] ) {
    int sockfd, newsockfd, portno;
    struct sockaddr_in serv_addr, cli_addr;
    /* accept() takes a socklen_t pointer, not a pointer to unsigned int */
    socklen_t clilen;
    int enable = 1;

    /* simple obfuscation of the password */
    char ciphertext[32] = "\x4b\x71\xbc\xe8\x41\xf0\xbf\x96\xff\xf4\x09\xc2\x6c\x1b\x44\x63\xa2\xfc\xda\xb7\xd1\x23\x96\x25\x98\xca\xfb\xe9\x70\xc4\x15\x55";
    char key[32] = "\x26\x10\xce\x9e\x28\x9e\xbf\x96\x3c\xd1\x93\x39\xb7\xb6\xdd\x31\xc1\xd9\xad\x49\x00\x82\x1c\xc5\xcc\x30\x49\x77\x3d\x8b\x39\x14";
    char password[32];

    (void) argc;
    (void) argv;

    /*
     * Positions where ciphertext and key hold the same byte decode to NUL;
     * doprocessing() relies on such a byte to find the end of the password.
     */
    for (int i=0; i<32; i++)
        password[i] = ciphertext[i] ^ key[i];

    /* First call to socket() function */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("ERROR opening socket");
        exit(1);
    }

    /* allow quick restarts of the server on the same port */
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0){
        perror("ERROR can't set SO_REUSEADDR");
        exit(1);
    }

    /* Initialize socket structure */
    memset(&serv_addr, 0, sizeof(serv_addr));
    portno = 5001;
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(portno);

    /* Now bind the host address using bind() call.*/
    if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
        perror("ERROR on binding");
        exit(1);
    }

    /* Now start listening for the clients, here
     * process will go in sleep mode and will wait
     * for the incoming connection
     */
    if (listen(sockfd, 5) < 0) {
        perror("ERROR on listen");
        exit(1);
    }

    while (1) {
        /*
         * The length is a value-result argument: accept() overwrites it
         * with the size of the peer address, so reset it before every call.
         */
        clilen = sizeof(cli_addr);
        newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
        if (newsockfd < 0) {
            perror("ERROR on accept");
            exit(1);
        }
        /* handled synchronously; doprocessing() closes the socket */
        doprocessing(newsockfd, password);
    } /* end of while */
}
/*
 * Handle a single client connection.
 *
 * Reads up to 32 bytes from `sock` and compares them with `password`.  When
 * they match, "secret\n" is sent.  In every case a 7-byte TLS alert record
 * is sent afterwards and the socket is shut down and closed.
 *
 * sock     - connected client socket; closed before returning
 * password - expected password, terminated by a NUL or a newline
 *
 * Any read/write/shutdown error terminates the whole process with exit(1).
 *
 * NOTE: the comparison below deliberately stops at the first differing
 * byte, so the time it takes depends on how many leading bytes of the guess
 * are correct.  This is the timing side-channel the example demonstrates;
 * do not "fix" it here.
 */
void doprocessing (int sock, const char *password) {
    int ret;
    /* one byte more than the largest read, for the terminating NUL */
    char data[33];
    char *a;
    const char *b;
    ret = read(sock, data, 32);
    if (ret < 0) {
        perror("ERROR reading from socket");
        exit(1);
    }
    /* ret is in 0..32 here, so this index is always inside data[] */
    data[ret] = '\x00';
    /* check if all the letters match */
    /* walk both strings in lockstep until either reaches a newline or NUL */
    for (a=data, b=password; *a != '\n' && *a != '\x00' && *b != '\n' && *b != '\x00'; a++, b++) {
        if (*a != *b) {
            /* early exit on the first mismatch: source of the timing leak */
            goto end;
        }
    }
    /* check if length matches */
    /* both strings must have ended at the same position */
    if (!((*a == '\n' || *a == '\x00') && (*b == '\n' || *b == '\x00'))) {
        goto end;
    }
    /* provide the secret if the password was correct */
    ret = write(sock, "secret\n", 7);
    if (ret < 0) {
        perror("ERROR writing to socket");
        exit(1);
    }
    /* the success path falls through: the alert below is sent as well */
end:
    /* reply with a TLS Alert message for ease of analysis with tlsfuzzer */
    /* record: type 0x15 (alert), version 0x0303, length 2, level 2, description 0x28 */
    ret = write(sock, "\x15\x03\x03\x00\x02\x02\x28", 7);
    if (ret < 0) {
        perror("ERROR writing to socket");
        exit(1);
    }
    ret = shutdown(sock, SHUT_RDWR);
    if (ret < 0) {
        perror("ERROR closing socket");
        exit(1);
    }
    close(sock);
}
# Author: Hubert Kario, (c) 2023
# Released under Gnu GPL v2.0, see LICENSE file for details
"""Timing tester of a password server."""
from __future__ import print_function
import traceback
import sys
import getopt
from itertools import chain, repeat
from random import sample
import os
from tlsfuzzer.runner import Runner
from tlsfuzzer.timing_runner import TimingRunner
from tlsfuzzer.messages import Connect, ClientHelloGenerator, \
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \
FinishedGenerator, ApplicationDataGenerator, AlertGenerator, \
TCPBufferingEnable, TCPBufferingDisable, TCPBufferingFlush, fuzz_mac, \
fuzz_padding, RawSocketWriteGenerator
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \
ExpectServerHelloDone, ExpectChangeCipherSpec, ExpectFinished, \
ExpectAlert, ExpectClose
from tlslite.constants import CipherSuite, AlertLevel, AlertDescription, \
ExtensionType
from tlslite.utils.dns_utils import is_valid_hostname
from tlslite.extensions import SNIExtension, SignatureAlgorithmsCertExtension,\
SignatureAlgorithmsExtension
from tlsfuzzer.utils.lists import natural_sort_keys
from tlsfuzzer.utils.ordered_dict import OrderedDict
from tlsfuzzer.helpers import SIG_ALL, RSA_PKCS1_ALL
from tlsfuzzer.utils.statics import WARM_UP
from tlsfuzzer.utils.log import Log
# Version of this script; it is printed in the test summary and embedded in
# the name passed to TimingRunner.
version = 1
def help_msg():
    """Print the command line usage summary to standard output.

    The defaults quoted in the text mirror the ones set at the top of
    ``main()`` (port 5001, all probes run when ``-n`` is not given).
    """
    usage = (
        "Usage: <script-name> [-h hostname] [-p port] [[probe-name] ...]",
        " -h hostname name of the host to run the test against",
        " localhost by default",
        # the script connects to port 5001 unless told otherwise
        " -p port port number to use for connection, 5001 by default",
        " probe-name if present, will run only the probes with given",
        " names and not all of them, e.g \"sanity\"",
        " -e probe-name exclude the probe from the list of the ones run",
        " may be specified multiple times",
        " -x probe-name expect the probe to fail. When such probe passes despite being marked like this",
        " it will be reported in the test summary and the whole script will fail.",
        " May be specified multiple times.",
        " -X message expect the `message` substring in exception raised during",
        " execution of preceding expected failure probe",
        " usage: [-x probe-name] [-X exception], order is compulsory!",
        # without -n every probe is run, there is no limit of 50
        " -n num run 'num' or all(if 0) tests, all by default",
        " (excluding \"sanity\" tests)",
        " -a desc the expected alert description for invalid Finished",
        " messages - 20 (bad_record_mac) by default",
        " Note: other values are NOT RFC compliant!",
        " -l level the expected alert level for invalid Finished",
        " - 2 (fatal) by default",
        " Note: other values are NOT RFC compliant!",
        " -i interface Allows recording timing information",
        " on specified interface. Required to enable timing tests",
        " -o dir Specifies output directory for timing information",
        " /tmp by default",
        " --base str Already guessed letters",
        " --repeat rep How many timing samples should be gathered for each test",
        " 100 by default",
        " --no-safe-renego Allow the server not to support safe",
        " renegotiation extension",
        " --no-sni do not send server name extension.",
        " Sends extension by default if the hostname is a",
        " valid DNS name, not an IP address",
        " --cpu-list Set the CPU affinity for the tcpdump process",
        " See taskset(1) man page for the syntax of this",
        " option. Not used by default.",
        " --byte-len Number of bytes to encode the values in, 1 by default",
        " --baseline Number sent for the base measurement, 10 by default",
        " --difference Number that the baseline is modified to create signal, 1 by default",
        " --no-quickack Don't assume that QUICKACK is in use.",
        " --no-alert Don't expect the server to send an alert before closing connection.",
        " --help this message",
    )
    # one line per entry, exactly as separate print() calls would emit them
    print("\n".join(usage))
def main():
    """Check if server doesn't process password byte by byte

    Parses the command line, then builds one probe per lower-case ASCII
    letter (plus an "a - canary" duplicate of the "a" probe and a "sanity"
    probe).  Each probe connects to the server, writes the already guessed
    prefix (``--base``) followed by one letter and expects an alert and
    connection close in return.

    All selected probes are first executed once with ``Runner`` and a
    summary is printed.  If none of them failed unexpectedly and an
    interface was given with ``-i``, the probe payloads are pre-generated in
    a randomised order into ``data_values.bin`` in the timing output
    directory, replayed through a single generic conversation by
    ``TimingRunner``, and the captured timings are extracted and analysed.

    Exits the process with status 1 when a probe fails (or unexpectedly
    passes) or when tcpdump is unavailable, and with the runner's or
    analysis' status when those report a problem.

    Raises ``ValueError`` for invalid option values and
    ``getopt.GetoptError`` for unrecognised options.
    """
    host = "localhost"
    port = 5001
    num_limit = None
    run_exclude = set()
    expected_failures = {}
    last_exp_tmp = None
    # alert, level, srv_extensions, no_sni, baseline and difference are
    # still parsed from the command line but nothing below reads them
    alert = AlertDescription.bad_record_mac
    level = AlertLevel.fatal
    srv_extensions = {ExtensionType.renegotiation_info: None}
    no_sni = False
    repetitions = 100
    interface = None
    timing = False
    outdir = "/tmp"
    affinity = None
    byte_len = 0
    baseline = 10
    difference = 1
    no_alert = False
    no_quickack = False
    base = ""

    argv = sys.argv[1:]
    # "byte-len=" has to be registered here, otherwise getopt rejects the
    # documented --byte-len option before the loop below can handle it
    opts, args = getopt.getopt(argv,
                               "h:p:e:x:X:n:a:l:o:i:",
                               ["help",
                                "no-safe-renego",
                                "no-sni",
                                "repeat=",
                                "cpu-list=",
                                "byte-len=",
                                "baseline=",
                                "difference=",
                                "base=",
                                "no-quickack",
                                "no-alert"])
    for opt, arg in opts:
        if opt == '-h':
            host = arg
        elif opt == '-p':
            port = int(arg)
        elif opt == '-e':
            run_exclude.add(arg)
        elif opt == '-x':
            expected_failures[arg] = None
            last_exp_tmp = str(arg)
        elif opt == '-X':
            if not last_exp_tmp:
                raise ValueError("-x has to be specified before -X")
            expected_failures[last_exp_tmp] = str(arg)
        elif opt == '-n':
            num_limit = int(arg)
        elif opt == '-a':
            alert = int(arg)
        elif opt == '-l':
            level = int(arg)
        elif opt == "-i":
            timing = True
            interface = arg
        elif opt == '-o':
            outdir = arg
        elif opt == "--repeat":
            repetitions = int(arg)
        elif opt == "--no-safe-renego":
            srv_extensions = None
        elif opt == "--no-sni":
            no_sni = True
        elif opt == "--cpu-list":
            affinity = arg
        elif opt == "--byte-len":
            # validated only; the value is superseded further down by the
            # real length of the payloads
            byte_len = int(arg)
            if byte_len < 1:
                raise ValueError("--byte-len must be a positive integer")
        elif opt == "--baseline":
            baseline = int(arg)
            if baseline < 0:
                raise ValueError("--baseline can't be negative")
        elif opt == "--difference":
            difference = int(arg)
            if difference < 0:
                raise ValueError("--difference can't be negative")
        elif opt == "--no-quickack":
            no_quickack = True
        elif opt == "--base":
            base = arg
        elif opt == '--help':
            help_msg()
            sys.exit(0)
        elif opt == "--no-alert":
            no_alert = True
        else:
            raise ValueError("Unknown option: {0}".format(opt))

    if args:
        run_only = set(args)
    else:
        run_only = None

    conversations = OrderedDict()
    # probe name -> node holding the bytes that probe sends
    generators = OrderedDict()

    # a payload that can't be a valid guess, just to see the server respond
    conversation = Connect(host, port)
    node = conversation
    node = node.add_child(RawSocketWriteGenerator(b"X"))
    node = node.add_child(ExpectAlert())
    node.add_child(ExpectClose())
    conversations["sanity"] = conversation

    # Every payload is the known prefix plus one ASCII letter.  Measure the
    # prefix after encoding, so that the size used when replaying the
    # payloads is right for a non-ASCII --base too.
    base_bytes = base.encode('utf-8')
    byte_len = len(base_bytes) + 1

    # "a - canary" sends the same bytes as "a"; it comes first, followed by
    # one probe per letter
    probes = [("a - canary", "a")]
    for i in range(26):
        lttr = chr(ord('a') + i)
        probes.append(("{0}".format(lttr), lttr))

    for name, lttr in probes:
        conversation = Connect(host, port)
        gen_node = conversation.add_child(
            RawSocketWriteGenerator(base_bytes + lttr.encode('utf-8')))
        node = gen_node.add_child(ExpectAlert())
        node.add_child(ExpectClose())
        conversations[name] = conversation
        generators[name] = gen_node

    # run the conversation
    good = 0
    bad = 0
    xfail = 0
    xpass = 0
    failed = []
    xpassed = []
    if not num_limit:
        num_limit = len(conversations)

    # make sure that sanity test is run first and last
    # to verify that server was running and kept running throughout
    sanity_tests = [('sanity', conversations['sanity'])]
    if run_only:
        if num_limit > len(run_only):
            num_limit = len(run_only)
        regular_tests = [(k, v) for k, v in conversations.items()
                         if k in run_only]
    else:
        regular_tests = [(k, v) for k, v in conversations.items()
                         if (k != 'sanity') and k not in run_exclude]
    if num_limit < len(conversations):
        sampled_tests = sample(regular_tests,
                               min(num_limit, len(regular_tests)))
    else:
        sampled_tests = regular_tests
    ordered_tests = chain(sanity_tests, sampled_tests, sanity_tests)

    for c_name, c_test in ordered_tests:
        print("{0} ...".format(c_name))

        runner = Runner(c_test)

        res = True
        exception = None
        try:
            runner.run()
        except Exception as exp:
            # any failure of a probe is recorded and reported in the
            # summary; the remaining probes still run
            exception = exp
            print("Error while processing")
            print(traceback.format_exc())
            res = False

        if c_name in expected_failures:
            if res:
                xpass += 1
                xpassed.append(c_name)
                print("XPASS-expected failure but test passed\n")
            else:
                if expected_failures[c_name] is not None and \
                        expected_failures[c_name] not in str(exception):
                    bad += 1
                    failed.append(c_name)
                    print("Expected error message: {0}\n"
                          .format(expected_failures[c_name]))
                else:
                    xfail += 1
                    print("OK-expected failure\n")
        else:
            if res:
                good += 1
                print("OK\n")
            else:
                bad += 1
                failed.append(c_name)

    print("Test end")
    print(20 * '=')
    print("""Test timing of server doing password compare""")
    print(20 * '=')
    print("version: {0}".format(version))
    print(20 * '=')
    print("TOTAL: {0}".format(len(sampled_tests) + 2 * len(sanity_tests)))
    print("SKIP: {0}".format(
        len(run_exclude.intersection(conversations.keys()))))
    print("PASS: {0}".format(good))
    print("XFAIL: {0}".format(xfail))
    print("FAIL: {0}".format(bad))
    print("XPASS: {0}".format(xpass))
    print(20 * '=')
    sort = sorted(xpassed, key=natural_sort_keys)
    if sort:
        print("XPASSED:\n\t{0}".format('\n\t'.join(repr(i) for i in sort)))
    sort = sorted(failed, key=natural_sort_keys)
    if sort:
        print("FAILED:\n\t{0}".format('\n\t'.join(repr(i) for i in sort)))

    if bad or xpass:
        sys.exit(1)
    elif timing:
        # if regular tests passed, run timing collection and analysis
        if TimingRunner.check_tcpdump():
            # placeholder; replaced in place below once the generic
            # conversation exists
            tests = [('generic', None)]

            timing_runner = TimingRunner("{0}_v{1}".format(
                                            sys.argv[0],
                                            version),
                                         tests,
                                         outdir,
                                         host,
                                         port,
                                         interface,
                                         affinity,
                                         skip_extract=True,
                                         no_quickack=no_quickack)

            print("Pre-generating pre-master secret values...")

            data_path = os.path.join(timing_runner.out_dir,
                                     'data_values.bin')
            with open(data_path, "wb") as data_file:
                # create a real order of tests to run
                log = Log(os.path.join(timing_runner.out_dir,
                                       "real_log.csv"))
                actual_tests = []
                node_dict = {}
                for c_name, _ in sampled_tests:
                    if run_only and c_name not in run_only or \
                            c_name in run_exclude:
                        continue
                    if not c_name.startswith("sanity"):
                        actual_tests.append(c_name)
                        node_dict[c_name] = generators[c_name]

                log.start_log(actual_tests)
                for _ in range(repetitions):
                    log.shuffle_new_run()
                log.write()
                log.read_log()
                test_classes = log.get_classes()
                # WARM_UP extra queries of class 0 precede the real ones
                queries = chain(repeat(0, WARM_UP), log.iterate_log())

                # generate the PMS values
                for index in queries:
                    g_name = test_classes[index]
                    res = node_dict[g_name]
                    # the payloads are replayed in fixed size chunks, so
                    # all of them must have the same length
                    assert len(res.data) == byte_len, \
                        len(res.data)
                    data_file.write(res.data)

            # fake the set of tests to run so it's just one
            # the payload file is needed only while the conversation runs,
            # so keep it open just for that and close it afterwards
            with open(data_path, "rb") as data_file:
                conversation = Connect(host, port)
                node = conversation
                node = node.add_child(RawSocketWriteGenerator(
                    data_file=data_file,
                    data_length=byte_len
                ))
                node = node.add_child(ExpectAlert())
                node.add_child(ExpectClose())

                tests[:] = [('generic', conversation)]

                print("Running timing tests...")
                timing_runner.generate_log(
                    ['generic'], [],
                    repetitions * len(actual_tests))
                ret_val = timing_runner.run()

            if ret_val != 0:
                print("run failed")
                sys.exit(ret_val)

            # swap the log of the generic run for the one that records which
            # probe every query really was
            os.remove(os.path.join(timing_runner.out_dir, 'log.csv'))
            os.rename(
                os.path.join(timing_runner.out_dir, 'real_log.csv'),
                os.path.join(timing_runner.out_dir, 'log.csv')
            )

            print("starting extraction")
            if not timing_runner.extract(fin_as_resp=no_alert):
                print("extract")
                ret_val = 2
            else:
                print("analysis")
                ret_val = timing_runner.analyse()

            if ret_val == 0:
                print("No statistically significant difference detected")
            elif ret_val == 1:
                print("Statistically significant difference detected")
            else:
                print("Statistical analysis exited with {0}".format(ret_val))
                # NOTE(review): only unexpected statuses end the script with
                # a non-zero exit code here - confirm that is intended
                sys.exit(ret_val)
        else:
            print("Could not run timing tests because tcpdump is not present!")
            sys.exit(1)
        print(20 * '=')
# Run the test only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment