Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save keefertaylor/7d30214fb42775dfccabeadb963ae4bc to your computer and use it in GitHub Desktop.
Save keefertaylor/7d30214fb42775dfccabeadb963ae4bc to your computer and use it in GitHub Desktop.
# TODO(keefertaylor): Signature needs to be included in the asset.
import smartpy as sp
#####################################################################
# Global types
#####################################################################
# SmartPy type of the value stored per asset in the oracle data map.
# It is a right-nested chain of pairs, in this order:
#   (start, (end, (open, (high, (low, (close, volume))))))
# Exported as a module-level constant as a convenience.
oracleDataType = sp.TPair(
    sp.TTimestamp,                          # Start
    sp.TPair(
        sp.TTimestamp,                      # End
        sp.TPair(
            sp.TNat,                        # Open
            sp.TPair(
                sp.TNat,                    # High
                sp.TPair(
                    sp.TNat,                # Low
                    sp.TPair(
                        sp.TNat,            # Close
                        sp.TNat             # Volume
                    )
                )
            )
        )
    )
)
# A regular Python class that implements FIFO operations but does not store
# the queue's data itself: every method operates on the record passed as
# `data` (fields `first`, `last`, `sum` and `saved`).
class FifoDataType:
    def __call__(self):
        """Build the initial queue record.

        `first` is 0 and `last` is -1, so `len` reports 0 for a fresh record.
        """
        return sp.record(first = 0, last = -1, sum = sp.nat(0), saved = {0: sp.nat(0)})

    # Typically used in entry points.
    def pop(self, data):
        """Remove the oldest element and subtract it from the running sum.

        The guard requires `first < last`, so the only remaining element of a
        queue cannot be popped.
        """
        sp.verify(data.first < data.last)
        data.sum = sp.as_nat(data.sum - data.saved[data.first])
        del data.saved[data.first]
        data.first += 1

    # Typically used in entry points.
    def push(self, data, element):
        """Append `element` at the new `last` index and add it to the running sum."""
        data.last += 1
        data.sum += element
        data.saved[data.last] = element

    # May be used in entry points, and also by a contract to read its state.
    def head(self, data):
        """Return the oldest element, i.e. the one stored at index `first`."""
        return data.saved[data.first]

    def len(self, data):
        """Return the number of queued elements, derived from the two indices."""
        span = data.last - data.first
        return span + 1


# Only one instance of FifoDataType is needed; it is shared by all users.
fifoDT = FifoDataType()
#####################################################################
# Oracle Contract
#####################################################################
class OracleContract(sp.Contract):
def __init__(
self,
publicKey,
initialData
):
self.init(
publicKey = publicKey,
oracleData = initialData
)
# Example Param Format:
# { Elt "BAT-USDC"(Pair "spsig1CV6hTejhwTzVopzRe2hj7DZ5S1RLAGdNtWrbU8twGiGmEHHEZAJx2HbWm1VaWc9Y1zE6ZSxHrvXGaWu2CuvK2c2Wk57e7" (Pair 1595482380 (Pair 1595482440 (Pair 265759 (Pair 265822 (Pair 265759 (Pair 265822 650000000))))))); }
@sp.entry_point
def update(self, params):
# Iterate over input data.
keyValueList = params.items()
sp.for assetData in keyValueList:
assetName = assetData.key
signature = sp.fst(assetData.value)
newData = sp.snd(assetData.value)
# Verify Oracle is tracking this asset.
sp.verify(
self.data.oracleData.contains(assetName),
"Oracle does not track asset"
)
# Verify signature.
bytes = sp.pack(newData)
sp.verify(
sp.check_signature(self.data.publicKey.open_some(), signature, bytes),
"Bad signature"
)
# Verify start timestamp is newer than the last update.
# TODO(Luke|Keefer): Should we use start or end?
oldData = self.data.oracleData[assetName]
oldStartTime = sp.fst(oldData)
newStartTime = sp.fst(newData)
sp.verify(newStartTime > oldStartTime, "Update in past")
# Replace the data.
self.data.oracleData[assetName] = newData
# Example Param Format:
# spsig...
@sp.entry_point
def revoke(self, signature):
bytes = sp.pack(sp.none)
publicKey = self.data.publicKey.open_some()
sp.verify(sp.check_signature(publicKey, signature, bytes))
self.data.publicKey = sp.none
# Example param format
# Pair <asset> (Pair <contract> <entry>)
@sp.entry_point
def push(self, params):
# Disassemble params for component parts.
assetName = sp.fst(params)
contractAndEntryPointPair = sp.snd(params)
contractAddress = sp.fst(contractAndEntryPointPair)
entryPoint = sp.snd(contractAndEntryPointPair)
# Pull data
oracleData = self.data.oracleData[assetName]
# Create the Big map
parameter = sp.big_map(
l = {
assetName: oracleData
},
tkey = sp.TString,
tvalue = oracleDataType
)
parameterType = sp.TBigMap(sp.TString, oracleDataType)
# Open handle to new contract and send data.
contractHandle = sp.contract(
parameterType,
contractAddress,
entry_point = entryPoint
).open_some()
sp.transfer(parameter, sp.mutez(0), contractHandle)
#####################################################################
# Normalizer Contract
#####################################################################
# Normalizer Contract
class NormalizerContract(sp.Contract):
def __init__(
self,
assetCode = "XTZ-USD",
oracleContractAddress = sp.address("KT1QLPABNCD4z1cSYVv3ntYDYgtWTed7LkYr"),
numDataPoints = sp.int(3)
):
# Populate the queues with an initial zero elements
pricesQueue = fifoDT()
volumesQueue = fifoDT()
self.init(assetCode = assetCode,
computedPrice = 0,
prices = pricesQueue,
volumes = volumesQueue,
# Timestamp of last update
lastUpdateTime = sp.timestamp(0),
# Whitelisted contract which can call updates.
oracleContract = oracleContractAddress,
# Number of data points to store.
numDataPoints = numDataPoints
)
@sp.entry_point
def update(self, params):
# Verify the sender is the whitelisted oracle contract.
sp.verify(
sp.sender == self.data.oracleContract,
message = "Can only be called by the oracle contract."
)
# Retrieve the asset data from the map.
assetData = params[self.data.assetCode]
# Require updates be monotonically increasing in start times.
# TODO(Luke|Keefer): Should this use end or start time?
updateStartTime = sp.fst(assetData)
sp.verify(updateStartTime > self.data.lastUpdateTime)
# Update the last updated time.
self.data.lastUpdateTime = updateStartTime
# Extract required information
endPair = sp.snd(assetData)
openPair = sp.snd(endPair)
highPair = sp.snd(openPair)
lowPair = sp.snd(highPair)
closeAndVolumePair = sp.snd(lowPair)
# Calculate the the price for this data point.
# average price * volume
high = sp.fst(highPair)
low = sp.fst(lowPair)
close = sp.fst(closeAndVolumePair)
volume = sp.snd(closeAndVolumePair)
volumePrice = ((high + low + close) / 3) * volume
# Push the latest items to the FIFO queue
fifoDT.push(self.data.prices, volumePrice)
fifoDT.push(self.data.volumes, volume)
# Trim the queue if it exceeds the number of data points.
sp.if fifoDT.len(self.data.prices) > self.data.numDataPoints:
fifoDT.pop(self.data.prices)
fifoDT.pop(self.data.volumes)
# Calculate the volume
self.data.computedPrice = self.data.prices.sum / self.data.volumes.sum
@sp.entry_point
def get(self, params):
# TODO(keefertaylor): Implement.
pass
#####################################################################
# Normalizer Tests
#####################################################################
@sp.add_test(name = "Fails with bad contract data")
def test():
    """An update sent by an address other than the whitelisted oracle is rejected."""
    scenario = sp.test_scenario()
    scenario.h1("Fails when data is pushed from bad address")

    # GIVEN a Normalizer contract whitelisted to an address
    normalizer = NormalizerContract(oracleContractAddress = defaultOracleContractAddress)
    scenario += normalizer

    # AND an update for the tracked asset
    update = makeBigMap(
        assetCode = "XTZ-USD",
        start = sp.timestamp(1595104501),
        end = sp.timestamp(1595104531),
        open = 3059701,
        high = 1,
        low = 2,
        close = 3,
        volume = 4
    )

    # WHEN the update is pushed from the wrong address
    # THEN the update fails.
    wrongSender = sp.address("KT1FrRkunqmB7futF3EyRwTt8f7fPEVJW39P")
    scenario += normalizer.update(update).run(sender = wrongSender, valid = False)
@sp.add_test(name = "Fails with updates from the same time")
def test():
    """A second update with the same start time as the first is rejected."""
    scenario = sp.test_scenario()
    # Heading matches this scenario; it used to duplicate the heading of the
    # "updates from the past" test.
    scenario.h1("Fails with updates from the same time")

    # GIVEN a Normalizer contract with an update at a given time = 1
    updateTime = sp.timestamp(1)
    contract = NormalizerContract()
    scenario += contract
    scenario += contract.update(
        makeBigMap(
            assetCode = "XTZ-USD",
            start = updateTime,
            end = sp.timestamp(1595104531),
            open = 3059701,
            high = 1,
            low = 2,
            close = 3,
            volume = 4
        )
    ).run(sender = defaultOracleContractAddress)

    # WHEN an update is provided at the same time.
    # THEN the update fails.
    scenario += contract.update(
        makeBigMap(
            assetCode = "XTZ-USD",
            start = updateTime,
            end = sp.timestamp(1595104531),
            open = 3059701,
            high = 1,
            low = 2,
            close = 3,
            volume = 4
        )
    ).run(sender = defaultOracleContractAddress, valid = False)
@sp.add_test(name = "Fails with updates from the past time")
def test():
    """An update whose start time precedes the last accepted one is rejected."""
    scenario = sp.test_scenario()
    scenario.h1("Fails with updates from the past")

    # GIVEN a Normalizer contract with an update at a current time and a time in the past
    currentTime = sp.timestamp(2)
    pastTime = sp.timestamp(1)
    normalizer = NormalizerContract()
    scenario += normalizer

    currentUpdate = makeBigMap(
        assetCode = "XTZ-USD",
        start = currentTime,
        end = sp.timestamp(1595104531),
        open = 3059701,
        high = 1,
        low = 2,
        close = 3,
        volume = 4
    )
    scenario += normalizer.update(currentUpdate).run(sender = defaultOracleContractAddress)

    # WHEN an update is provided from the past.
    # THEN the update fails.
    pastUpdate = makeBigMap(
        assetCode = "XTZ-USD",
        start = pastTime,
        end = sp.timestamp(1595104531),
        open = 3059701,
        high = 1,
        low = 2,
        close = 3,
        volume = 4
    )
    scenario += normalizer.update(pastUpdate).run(
        sender = defaultOracleContractAddress,
        valid = False
    )
@sp.add_test(name = "Fails with updates for the wrong asset")
def test():
    """An update that only carries another asset's data is rejected."""
    scenario = sp.test_scenario()
    scenario.h1("Fails with updates from the wrong asset")

    # GIVEN a Normalizer contract for the bitcoin price
    normalizer = NormalizerContract(assetCode = "BTC-USD")
    scenario += normalizer

    # WHEN an update is provided for XTZ-USD
    # THEN the update fails.
    xtzUpdate = makeBigMap(
        assetCode = "XTZ-USD",
        start = sp.timestamp(1595104530),
        end = sp.timestamp(1595104531),
        open = 3059701,
        high = 1,
        low = 2,
        close = 3,
        volume = 4
    )
    scenario += normalizer.update(xtzUpdate).run(
        sender = defaultOracleContractAddress,
        valid = False
    )
@sp.add_test(name = "Normalizes One Data Point")
def test():
    """After a single update the computed price equals that update's price."""
    scenario = sp.test_scenario()
    scenario.h1("Normalizes One Data Point")

    # GIVEN a Normalizer contract.
    normalizer = NormalizerContract()
    scenario += normalizer

    high, low, close, volume = 1, 2, 3, 4

    # WHEN an update is provided
    update = makeBigMap(
        assetCode = "XTZ-USD",
        start = sp.timestamp(1595104530),
        end = sp.timestamp(1595104531),
        open = 3059701,
        high = high,
        low = low,
        close = close,
        volume = volume
    )
    scenario += normalizer.update(update).run(sender = defaultOracleContractAddress)

    # THEN the ComputedPrice is the VWAP.
    volumePrice = computeVWAP(
        high = high,
        low = low,
        close = close,
        volume = volume
    )
    expected = volumePrice // volume
    scenario.verify(normalizer.data.computedPrice == expected)
@sp.add_test(name = "Normalizes Three Data Points")
def test():
    """With three updates in the window, the price is weighted across all three."""
    scenario = sp.test_scenario()
    scenario.h1("Normalizes Three Data Points")

    # GIVEN a Normalizer contract
    normalizer = NormalizerContract()
    scenario += normalizer

    # WHEN three updates are provided
    dataPoints = [
        # (start, end, high, low, close, volume)
        (1595104530, 1595104531, 1, 2, 3, 4),
        (1595104532, 1595104533, 5, 6, 7, 8),
        (1595104534, 1595104535, 9, 10, 11, 12),
    ]
    for (start, end, high, low, close, volume) in dataPoints:
        scenario += normalizer.update(
            makeBigMap(
                assetCode = "XTZ-USD",
                start = sp.timestamp(start),
                end = sp.timestamp(end),
                open = 3059701,
                high = high,
                low = low,
                close = close,
                volume = volume
            )
        ).run(sender = defaultOracleContractAddress)

    # THEN the ComputedPrice is the VWAP of the updates
    volumePriceTotal = sum(
        computeVWAP(high = high, low = low, close = close, volume = volume)
        for (_, _, high, low, close, volume) in dataPoints
    )
    volumeTotal = sum(point[5] for point in dataPoints)
    expected = volumePriceTotal // volumeTotal
    scenario.verify(normalizer.data.computedPrice == expected)
@sp.add_test(name = "Bounds computation to the number of data points")
def test():
    """Only the most recent `numDataPoints` updates contribute to the price."""
    scenario = sp.test_scenario()
    scenario.h1("Bounds computation to the number of data points")

    # GIVEN a Normalizer contract set to only hold two data points
    numDataPoints = 2
    normalizer = NormalizerContract(numDataPoints = numDataPoints)
    scenario += normalizer

    # WHEN three updates are provided
    dataPoints = [
        # (start, end, high, low, close, volume)
        (1595104530, 1595104531, 1, 2, 3, 4),
        (1595104532, 1595104533, 5, 6, 7, 8),
        (1595104534, 1595104535, 9, 10, 11, 12),
    ]
    for (start, end, high, low, close, volume) in dataPoints:
        scenario += normalizer.update(
            makeBigMap(
                assetCode = "XTZ-USD",
                start = sp.timestamp(start),
                end = sp.timestamp(end),
                open = 3059701,
                high = high,
                low = low,
                close = close,
                volume = volume
            )
        ).run(sender = defaultOracleContractAddress)

    # THEN the contract is only tracking two updates
    scenario.verify(fifoDT.len(normalizer.data.prices) == 2)
    scenario.verify(fifoDT.len(normalizer.data.volumes) == 2)

    # AND the computed price is the VWAP of the latter two updates
    retained = dataPoints[-numDataPoints:]
    volumePriceTotal = sum(
        computeVWAP(high = high, low = low, close = close, volume = volume)
        for (_, _, high, low, close, volume) in retained
    )
    volumeTotal = sum(point[5] for point in retained)
    expected = volumePriceTotal // volumeTotal
    scenario.verify(normalizer.data.computedPrice == expected)
#####################################################################
# Oracle Tests
#####################################################################
@sp.add_test(name = "Update Once With Valid Data")
def test():
    """A correctly signed update replaces the stored data for the asset."""
    scenario = sp.test_scenario()
    scenario.h1("Update Once With Valid Data")

    # GIVEN an Oracle contract
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # AND an update
    start = sp.timestamp(1)
    end = sp.timestamp(2)
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    updateData = (start, (end, (openPrice, (high, (low, (close, volume))))))
    signature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData),
        message_format = 'Raw'
    )

    # WHEN the oracle is updated
    parameter = sp.map(
        l = {"XTZ-USD": sp.pair(signature, updateData)},
        tkey = sp.TString,
        # Signature paired with the same nested pairs as `oracleDataType`.
        tvalue = sp.TPair(sp.TSignature, oracleDataType)
    )
    scenario += oracle.update(parameter)

    # THEN the oracle contains the data points
    stored = oracle.data.oracleData["XTZ-USD"]
    endPair = sp.snd(stored)
    openPair = sp.snd(endPair)
    highPair = sp.snd(openPair)
    lowPair = sp.snd(highPair)
    closeAndVolumePair = sp.snd(lowPair)

    actualStart = sp.fst(stored)
    actualEnd = sp.fst(endPair)
    actualOpen = sp.fst(openPair)
    actualHigh = sp.fst(highPair)
    actualLow = sp.fst(lowPair)
    actualClose = sp.fst(closeAndVolumePair)
    actualVolume = sp.snd(closeAndVolumePair)

    scenario.verify(start == actualStart)
    scenario.verify(end == actualEnd)
    scenario.verify(openPrice == actualOpen)
    scenario.verify(high == actualHigh)
    scenario.verify(low == actualLow)
    scenario.verify(close == actualClose)
    scenario.verify(volume == actualVolume)
@sp.add_test(name = "Second Update Overwrites First Update")
def test():
    """After two valid updates the oracle holds the data of the later one."""
    scenario = sp.test_scenario()
    scenario.h1("Second Update Overwrites First Update")

    # GIVEN an Oracle contract
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # Signature paired with the same nested pairs as `oracleDataType`.
    updateType = sp.TPair(sp.TSignature, oracleDataType)

    # AND two updates
    start1 = sp.timestamp(1)
    end1 = sp.timestamp(2)
    open1 = 3
    high1 = 4
    low1 = 5
    close1 = 6
    volume1 = 7
    updateData1 = (start1, (end1, (open1, (high1, (low1, (close1, volume1))))))
    signature1 = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData1),
        message_format = 'Raw'
    )

    start2 = sp.timestamp(8)
    end2 = sp.timestamp(9)
    open2 = 10
    high2 = 11
    low2 = 12
    close2 = 13
    volume2 = 14
    updateData2 = (start2, (end2, (open2, (high2, (low2, (close2, volume2))))))
    signature2 = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData2),
        message_format = 'Raw'
    )

    # WHEN the oracle is updated
    parameter1 = sp.map(
        l = {"XTZ-USD": sp.pair(signature1, updateData1)},
        tkey = sp.TString,
        tvalue = updateType
    )
    parameter2 = sp.map(
        l = {"XTZ-USD": sp.pair(signature2, updateData2)},
        tkey = sp.TString,
        tvalue = updateType
    )
    scenario += oracle.update(parameter1)
    scenario += oracle.update(parameter2)

    # THEN the oracle contains the data points of the latter update
    stored = oracle.data.oracleData["XTZ-USD"]
    endPair = sp.snd(stored)
    openPair = sp.snd(endPair)
    highPair = sp.snd(openPair)
    lowPair = sp.snd(highPair)
    closeAndVolumePair = sp.snd(lowPair)

    actualStart = sp.fst(stored)
    actualEnd = sp.fst(endPair)
    actualOpen = sp.fst(openPair)
    actualHigh = sp.fst(highPair)
    actualLow = sp.fst(lowPair)
    actualClose = sp.fst(closeAndVolumePair)
    actualVolume = sp.snd(closeAndVolumePair)

    scenario.verify(start2 == actualStart)
    scenario.verify(end2 == actualEnd)
    scenario.verify(open2 == actualOpen)
    scenario.verify(high2 == actualHigh)
    scenario.verify(low2 == actualLow)
    scenario.verify(close2 == actualClose)
    scenario.verify(volume2 == actualVolume)
@sp.add_test(name = "Update Fails With Data From The Same Timestamp")
def test():
    """Submitting the same signed update twice fails the second time."""
    scenario = sp.test_scenario()
    scenario.h1("Update Fails With Data From The Same Timestamp")

    # GIVEN an Oracle contract
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # AND an update
    start = sp.timestamp(1)
    end = sp.timestamp(2)
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    updateData = (start, (end, (openPrice, (high, (low, (close, volume))))))
    signature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData),
        message_format = 'Raw'
    )

    # WHEN the oracle is updated twice
    parameter = sp.map(
        l = {"XTZ-USD": sp.pair(signature, updateData)},
        tkey = sp.TString,
        # Signature paired with the same nested pairs as `oracleDataType`.
        tvalue = sp.TPair(sp.TSignature, oracleDataType)
    )
    scenario += oracle.update(parameter)

    # THEN the second update fails
    scenario += oracle.update(parameter).run(valid = False)
@sp.add_test(name = "Update Fails With Data From The Past")
def test():
    """A signed update older than the stored data is rejected."""
    scenario = sp.test_scenario()
    scenario.h1("Update Fails With Data From The Past")

    # GIVEN an Oracle contract with some initial data.
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # Signature paired with the same nested pairs as `oracleDataType`.
    updateType = sp.TPair(sp.TSignature, oracleDataType)

    # AND a first, accepted update starting at time 2
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    currentData = (
        sp.timestamp(2),
        (sp.timestamp(3), (openPrice, (high, (low, (close, volume)))))
    )
    currentSignature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(currentData),
        message_format = 'Raw'
    )
    currentParameter = sp.map(
        l = {"XTZ-USD": sp.pair(currentSignature, currentData)},
        tkey = sp.TString,
        tvalue = updateType
    )
    scenario += oracle.update(currentParameter)

    # WHEN the oracle is updated with a time in the past
    pastData = (
        sp.timestamp(1),
        (sp.timestamp(2), (openPrice, (high, (low, (close, volume)))))
    )
    pastSignature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(pastData),
        message_format = 'Raw'
    )
    pastParameter = sp.map(
        l = {"XTZ-USD": sp.pair(pastSignature, pastData)},
        tkey = sp.TString,
        tvalue = updateType
    )

    # THEN the update in the past fails
    scenario += oracle.update(pastParameter).run(valid = False)
@sp.add_test(name = "Update Fails With Untracked Asset")
def test():
    """A signed update for an asset the oracle does not hold is rejected."""
    scenario = sp.test_scenario()
    scenario.h1("Update Fails With Untracked Asset")

    # GIVEN an Oracle contract
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    start = sp.timestamp(2)
    end = sp.timestamp(3)
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    updateData = (start, (end, (openPrice, (high, (low, (close, volume))))))
    signature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData),
        message_format = 'Raw'
    )

    # WHEN the oracle is updated with an untracked asset
    parameter = sp.map(
        l = {
            "BTC-USD": sp.pair(signature, updateData) # Not XTZ-USD
        },
        tkey = sp.TString,
        # Signature paired with the same nested pairs as `oracleDataType`.
        tvalue = sp.TPair(sp.TSignature, oracleDataType)
    )

    # THEN the update fails.
    scenario += oracle.update(parameter).run(valid = False)
@sp.add_test(name = "Update Fails With Bad Signature")
def test():
    """An update signed by a key other than the oracle's is rejected."""
    scenario = sp.test_scenario()
    scenario.h1( "Update Fails With Bad Signature")

    # GIVEN an Oracle contract
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # AND an update signed by an alternative key
    alternativeAccount = sp.test_account("AlternativeAccount")
    alternativeSecretKey = alternativeAccount.secret_key

    start = sp.timestamp(1)
    end = sp.timestamp(2)
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    updateData = (start, (end, (openPrice, (high, (low, (close, volume))))))
    signature = sp.make_signature(
        alternativeSecretKey,
        sp.pack(updateData),
        message_format = 'Raw'
    )

    # WHEN the oracle is updated
    parameter = sp.map(
        l = {"XTZ-USD": sp.pair(signature, updateData)},
        tkey = sp.TString,
        # Signature paired with the same nested pairs as `oracleDataType`.
        tvalue = sp.TPair(sp.TSignature, oracleDataType)
    )

    # THEN the update fails
    scenario += oracle.update(parameter).run(valid = False)
@sp.add_test(name = "Revokes An Oracle")
def test():
    """A correctly signed revoke clears the key and blocks later updates."""
    scenario = sp.test_scenario()
    scenario.h1("Revokes An Oracle")

    # GIVEN an oracle contract and a correctly signed revoke message.
    revokeSignature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(sp.none),
        message_format = 'Raw'
    )

    # TODO(keefertaylor): Refactor `initialData` to a global constant.
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # WHEN revoke is called.
    scenario += oracle.revoke(revokeSignature)

    # THEN the oracle is revoked
    scenario.verify(~oracle.data.publicKey.is_some())

    # AND future updates fail
    start = sp.timestamp(1)
    end = sp.timestamp(2)
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    updateData = (start, (end, (openPrice, (high, (low, (close, volume))))))
    updateSignature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData),
        message_format = 'Raw'
    )
    parameter = sp.map(
        l = {"XTZ-USD": sp.pair(updateSignature, updateData)},
        tkey = sp.TString,
        # Signature paired with the same nested pairs as `oracleDataType`.
        tvalue = sp.TPair(sp.TSignature, oracleDataType)
    )
    scenario += oracle.update(parameter).run(valid = False)
@sp.add_test(name = "Incorrect Revoke Fails to Revoke An Oracle")
def test():
    """A revoke signed by the wrong key fails and leaves the oracle usable."""
    scenario = sp.test_scenario()
    scenario.h1("Incorrect Revoke Fails to Revoke An Oracle")

    # GIVEN an oracle contract and a revoke message signed by another account.
    incorrectAccount = sp.test_account("Incorrect_Account")
    revokeSignature = sp.make_signature(
        incorrectAccount.secret_key,
        sp.pack(sp.none),
        message_format = 'Raw'
    )

    # TODO(keefertaylor): Refactor `initialData` to a global constant.
    initialData = (sp.timestamp(0), (sp.timestamp(0), (0, (0, (0, (0, 0))))))
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {"XTZ-USD": initialData},
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # WHEN revoke is called
    # THEN the call fails
    scenario += oracle.revoke(revokeSignature).run(valid = False)

    # AND future updates succeed
    start = sp.timestamp(1)
    end = sp.timestamp(2)
    openPrice = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    updateData = (start, (end, (openPrice, (high, (low, (close, volume))))))
    updateSignature = sp.make_signature(
        testAccountSecretKey,
        sp.pack(updateData),
        message_format = 'Raw'
    )
    parameter = sp.map(
        l = {"XTZ-USD": sp.pair(updateSignature, updateData)},
        tkey = sp.TString,
        # Signature paired with the same nested pairs as `oracleDataType`.
        tvalue = sp.TPair(sp.TSignature, oracleDataType)
    )
    scenario += oracle.update(parameter)
@sp.add_test(name = "E2E Push Test")
def test():
    """Pushing from the oracle to the normalizer updates the normalized price."""
    scenario = sp.test_scenario()
    scenario.h1("E2E Push Test")

    # GIVEN an Oracle contract
    start = 1
    end = 2
    open = 3
    high = 4
    low = 5
    close = 6
    volume = 7
    initialData = (
        sp.timestamp(start),
        (sp.timestamp(end), (open, (high, (low, (close, volume)))))
    )
    oracle = OracleContract(
        publicKey = testAccountPublicKey,
        initialData = sp.big_map(
            l = {
                "XTZ-USD": initialData
            },
            tkey = sp.TString,
            tvalue = oracleDataType
        )
    )
    scenario += oracle

    # And a normalizer contract.
    normalizer = NormalizerContract(
        oracleContractAddress = oracle.address
    )
    scenario += normalizer

    # WHEN an update is pushed from the oracle to the normalizer
    # The target must be the normalizer's `update` entry point; the oracle's
    # own `update` takes signed data, not the big map that `push` sends.
    parameter = ("XTZ-USD", (normalizer.address, "update"))
    scenario += oracle.push(parameter)

    # THEN the normalizer contains the VWAP.
    # `computeVWAP` returns price * volume, so divide by the volume, as the
    # single-data-point normalizer test does.
    expectedVWAP = computeVWAP(high, low, close, volume) // volume
    scenario.verify(normalizer.data.computedPrice == expectedVWAP)
#####################################################################
# Test Helpers
#####################################################################
# Default Oracle Contract Keys
# Test account whose key pair is used as the oracle's signer in the tests.
testAccount = sp.test_account("Test1")
# Wrapped in an option because OracleContract opens its `publicKey` storage
# with `open_some()` and resets it to `sp.none` on revoke.
testAccountPublicKey = sp.some(testAccount.public_key)
# Secret key the tests pass to `sp.make_signature` to sign updates and revokes.
testAccountSecretKey = testAccount.secret_key
# Compute the volume-weighted contribution of one data point.
def computeVWAP(high, low, close, volume):
    """Return the floored average of high, low and close, multiplied by volume.

    The result is not divided by the volume; the normalizer tests divide it
    (or a sum of such values) by the total volume themselves.
    """
    averagePrice = (high + low + close) // 3
    return averagePrice * volume
# Default address of the Oracle Contract.
# Same address as the default `oracleContractAddress` of NormalizerContract, so
# tests can use it as the sender for normalizers built with default arguments.
# Exported for convenience
defaultOracleContractAddress = sp.address("KT1QLPABNCD4z1cSYVv3ntYDYgtWTed7LkYr")
# Convenience function to make the data for an Oracle.
def makeOracleDataPairs(start, end, open, high, low, close, volume):
    """Nest the seven fields into right-leaning pairs.

    The result has the shape
    (start, (end, (open, (high, (low, (close, volume)))))),
    which is the layout described by `oracleDataType`.
    """
    return sp.pair(
        start,
        sp.pair(
            end,
            sp.pair(
                open,
                sp.pair(
                    high,
                    sp.pair(
                        low,
                        sp.pair(close, volume)
                    )
                )
            )
        )
    )
# Convenience function to make a map for a single asset code.
def makeBigMap(assetCode, start, end, open, high, low, close, volume):
    """Return a big map with one entry: `assetCode` -> nested oracle data pairs."""
    assetData = makeOracleDataPairs(start, end, open, high, low, close, volume)
    return sp.big_map(
        l = {assetCode: assetData},
        tkey = sp.TString,
        tvalue = oracleDataType
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment