|
import smartpy as sp |
|
|
|
# Reference: |
|
# https://github.com/hen-community/objkt-swap/blob/jagracar/v3-prototype/smart-py/objkt_swap_v3.py |
|
|
|
|
|
## |
|
## ## Meta-Programming Configuration |
|
## |
|
## The `FA2_config` class holds the meta-programming configuration. |
|
## |
|
class FA2_config: |
|
def __init__(self, |
|
debug_mode = False, |
|
single_asset = False, |
|
non_fungible = False, |
|
add_mutez_transfer = False, |
|
readable = True, |
|
force_layouts = True, |
|
support_operator = True, |
|
assume_consecutive_token_ids = True, |
|
store_total_supply = True, |
|
lazy_entry_points = False, |
|
allow_self_transfer = False, |
|
use_token_metadata_offchain_view = False |
|
): |
|
|
|
if debug_mode: |
|
self.my_map = sp.map |
|
else: |
|
self.my_map = sp.big_map |
|
# The option `debug_mode` makes the code generation use |
|
# regular maps instead of big-maps, hence it makes inspection |
|
# of the state of the contract easier. |
|
|
|
self.use_token_metadata_offchain_view = use_token_metadata_offchain_view |
|
# Include offchain view for accessing the token metadata (requires TZIP-016 contract metadata) |
|
|
|
self.single_asset = single_asset |
|
# This makes the contract save some gas and storage by |
|
# working only for the token-id `0`. |
|
|
|
self.non_fungible = non_fungible |
|
# Enforce the non-fungibility of the tokens, i.e. the fact |
|
# that total supply has to be 1. |
|
|
|
self.readable = readable |
|
# The `readable` option is a legacy setting that we keep around |
|
# only for benchmarking purposes. |
|
# |
|
# User-accounts are kept in a big-map: |
|
# `(user-address * token-id) -> ownership-info`. |
|
# |
|
# For the Babylon protocol, one had to use `readable = False` |
|
# in order to use `PACK` on the keys of the big-map. |
|
|
|
self.force_layouts = force_layouts |
|
# The specification requires all interface-fronting records |
|
# and variants to be *right-combs;* we keep |
|
# this parameter to be able to compare performance & code-size. |
|
|
|
self.support_operator = support_operator |
|
# The operator entry-points always have to be there, but there is |
|
# definitely a use-case for having them completely empty (saving |
|
# storage and gas when `support_operator` is `False). |
|
|
|
self.assume_consecutive_token_ids = assume_consecutive_token_ids |
|
# For a previous version of the TZIP specification, it was |
|
# necessary to keep track of the set of all tokens in the contract. |
|
# |
|
# The set of tokens is for now still available; this parameter |
|
# guides how to implement it: |
|
# If `true` we don't need a real set of token ids, just to know how |
|
# many there are. |
|
|
|
self.store_total_supply = store_total_supply |
|
# Whether to store the total-supply for each token (next to |
|
# the token-metadata). |
|
|
|
self.add_mutez_transfer = add_mutez_transfer |
|
# Add an entry point for the administrator to transfer tez potentially |
|
# in the contract's balance. |
|
|
|
self.lazy_entry_points = lazy_entry_points |
|
# |
|
# Those are “compilation” options of SmartPy into Michelson. |
|
# |
|
|
|
self.allow_self_transfer = allow_self_transfer |
|
# Authorize call of `transfer` entry_point from self |
|
name = "FA2" |
|
if debug_mode: |
|
name += "-debug" |
|
if single_asset: |
|
name += "-single_asset" |
|
if non_fungible: |
|
name += "-nft" |
|
if add_mutez_transfer: |
|
name += "-mutez" |
|
if not readable: |
|
name += "-no_readable" |
|
if not force_layouts: |
|
name += "-no_layout" |
|
if not support_operator: |
|
name += "-no_ops" |
|
if not assume_consecutive_token_ids: |
|
name += "-no_toknat" |
|
if not store_total_supply: |
|
name += "-no_totsup" |
|
if lazy_entry_points: |
|
name += "-lep" |
|
if allow_self_transfer: |
|
name += "-self_transfer" |
|
self.name = name |
|
|
|
## ## Auxiliary Classes and Values |
|
## |
|
## The definitions below implement SmartML-types and functions for various |
|
## important types. |
|
## |
|
token_id_type = sp.TNat |
|
|
|
class Error_message: |
|
def __init__(self, config): |
|
self.config = config |
|
self.prefix = "FA2_" |
|
def make(self, s): return (self.prefix + s) |
|
def token_undefined(self): return self.make("TOKEN_UNDEFINED") |
|
def insufficient_balance(self): return self.make("INSUFFICIENT_BALANCE") |
|
def not_operator(self): return self.make("NOT_OPERATOR") |
|
def not_owner(self): return self.make("NOT_OWNER") |
|
def operators_unsupported(self): return self.make("OPERATORS_UNSUPPORTED") |
|
def not_admin(self): return self.make("NOT_ADMIN") |
|
def not_admin_or_operator(self): return self.make("NOT_ADMIN_OR_OPERATOR") |
|
def paused(self): return self.make("PAUSED") |
|
|
|
## The current type for a batched transfer in the specification is as |
|
## follows: |
|
## |
|
## ```ocaml |
|
## type transfer = { |
|
## from_ : address; |
|
## txs: { |
|
## to_ : address; |
|
## token_id : token_id; |
|
## amount : nat; |
|
## } list |
|
## } list |
|
## ``` |
|
## |
|
## This class provides helpers to create and force the type of such elements. |
|
## It uses the `FA2_config` to decide whether to set the right-comb layouts. |
|
class Batch_transfer: |
|
def __init__(self, config): |
|
self.config = config |
|
def get_transfer_type(self): |
|
tx_type = sp.TRecord(to_ = sp.TAddress, |
|
token_id = token_id_type, |
|
amount = sp.TNat) |
|
if self.config.force_layouts: |
|
tx_type = tx_type.layout( |
|
("to_", ("token_id", "amount")) |
|
) |
|
transfer_type = sp.TRecord(from_ = sp.TAddress, |
|
txs = sp.TList(tx_type)).layout( |
|
("from_", "txs")) |
|
return transfer_type |
|
def get_type(self): |
|
return sp.TList(self.get_transfer_type()) |
|
def item(self, from_, txs): |
|
v = sp.record(from_ = from_, txs = txs) |
|
return sp.set_type_expr(v, self.get_transfer_type()) |
|
## |
|
## `Operator_param` defines type types for the `%update_operators` entry-point. |
|
class Operator_param: |
|
def __init__(self, config): |
|
self.config = config |
|
def get_type(self): |
|
t = sp.TRecord( |
|
owner = sp.TAddress, |
|
operator = sp.TAddress, |
|
token_id = token_id_type) |
|
if self.config.force_layouts: |
|
t = t.layout(("owner", ("operator", "token_id"))) |
|
return t |
|
def make(self, owner, operator, token_id): |
|
r = sp.record(owner = owner, |
|
operator = operator, |
|
token_id = token_id) |
|
return sp.set_type_expr(r, self.get_type()) |
|
|
|
## The class `Ledger_key` defines the key type for the main ledger (big-)map: |
|
## |
|
## - In *“Babylon mode”* we also have to call `sp.pack`. |
|
## - In *“single-asset mode”* we can just use the user's address. |
|
class Ledger_key: |
|
def __init__(self, config): |
|
self.config = config |
|
def make(self, user, token): |
|
user = sp.set_type_expr(user, sp.TAddress) |
|
token = sp.set_type_expr(token, token_id_type) |
|
if self.config.single_asset: |
|
result = user |
|
else: |
|
result = sp.pair(user, token) |
|
if self.config.readable: |
|
return result |
|
else: |
|
return sp.pack(result) |
|
|
|
## For now a value in the ledger is just the user's balance. Previous |
|
## versions of the specification required more information; potential |
|
## extensions may require other fields. |
|
class Ledger_value: |
|
def get_type(): |
|
return sp.TRecord(balance = sp.TNat) |
|
def make(balance): |
|
return sp.record(balance = balance) |
|
|
|
## The link between operators and the addresses they operate is kept |
|
## in a *lazy set* of `(owner × operator × token-id)` values. |
|
## |
|
## A lazy set is a big-map whose keys are the elements of the set and |
|
## values are all `Unit`. |
|
class Operator_set: |
|
def __init__(self, config): |
|
self.config = config |
|
def inner_type(self): |
|
return sp.TRecord(owner = sp.TAddress, |
|
operator = sp.TAddress, |
|
token_id = token_id_type |
|
).layout(("owner", ("operator", "token_id"))) |
|
def key_type(self): |
|
if self.config.readable: |
|
return self.inner_type() |
|
else: |
|
return sp.TBytes |
|
def make(self): |
|
return self.config.my_map(tkey = self.key_type(), tvalue = sp.TUnit) |
|
def make_key(self, owner, operator, token_id): |
|
metakey = sp.record(owner = owner, |
|
operator = operator, |
|
token_id = token_id) |
|
metakey = sp.set_type_expr(metakey, self.inner_type()) |
|
if self.config.readable: |
|
return metakey |
|
else: |
|
return sp.pack(metakey) |
|
def add(self, set, owner, operator, token_id): |
|
set[self.make_key(owner, operator, token_id)] = sp.unit |
|
def remove(self, set, owner, operator, token_id): |
|
del set[self.make_key(owner, operator, token_id)] |
|
def is_member(self, set, owner, operator, token_id): |
|
return set.contains(self.make_key(owner, operator, token_id)) |
|
|
|
class Balance_of: |
|
def request_type(): |
|
return sp.TRecord( |
|
owner = sp.TAddress, |
|
token_id = token_id_type).layout(("owner", "token_id")) |
|
def response_type(): |
|
return sp.TList( |
|
sp.TRecord( |
|
request = Balance_of.request_type(), |
|
balance = sp.TNat).layout(("request", "balance"))) |
|
def entry_point_type(): |
|
return sp.TRecord( |
|
callback = sp.TContract(Balance_of.response_type()), |
|
requests = sp.TList(Balance_of.request_type()) |
|
).layout(("requests", "callback")) |
|
|
|
class Token_meta_data: |
|
def __init__(self, config): |
|
self.config = config |
|
|
|
def get_type(self): |
|
return sp.TRecord(token_id = sp.TNat, token_info = sp.TMap(sp.TString, sp.TBytes)) |
|
|
|
def set_type_and_layout(self, expr): |
|
sp.set_type(expr, self.get_type()) |
|
|
|
## The set of all tokens is represented by a `nat` if we assume that token-ids |
|
## are consecutive, or by an actual `(set nat)` if not. |
|
## |
|
## - Knowing the set of tokens is useful for throwing accurate error messages. |
|
## - Previous versions of the specification required this set for functional |
|
## behavior (operators interface had to deal with “all tokens”). |
|
class Token_id_set: |
|
def __init__(self, config): |
|
self.config = config |
|
def empty(self): |
|
if self.config.assume_consecutive_token_ids: |
|
# The "set" is its cardinal. |
|
return sp.nat(0) |
|
else: |
|
return sp.set(t = token_id_type) |
|
def add(self, totalTokens, tokenID): |
|
if self.config.assume_consecutive_token_ids: |
|
sp.verify(totalTokens == tokenID, message = "Token-IDs should be consecutive") |
|
totalTokens.set(tokenID + 1) |
|
else: |
|
totalTokens.add(tokenID) |
|
def contains(self, totalTokens, tokenID): |
|
if self.config.assume_consecutive_token_ids: |
|
return (tokenID < totalTokens) |
|
else: |
|
return totalTokens.contains(tokenID) |
|
def cardinal(self, totalTokens): |
|
if self.config.assume_consecutive_token_ids: |
|
return totalTokens |
|
else: |
|
return sp.len(totalTokens) |
|
|
|
## |
|
## ## Implementation of the Contract |
|
## |
|
## `mutez_transfer` is an optional entry-point, hence we define it “outside” the |
|
## class: |
|
def mutez_transfer(contract, params): |
|
sp.verify(sp.sender == contract.data.administrator) |
|
sp.set_type(params.destination, sp.TAddress) |
|
sp.set_type(params.amount, sp.TMutez) |
|
sp.send(params.destination, params.amount) |
|
## |
|
## The `FA2` class builds a contract according to an `FA2_config` and an |
|
## administrator address. |
|
## It is inheriting from `FA2_core` which implements the strict |
|
## standard and a few other classes to add other common features. |
|
## |
|
## - We see the use of |
|
## [`sp.entry_point`](https://smartpy.io/docs/introduction/entry_points) |
|
## as a function instead of using annotations in order to allow |
|
## optional entry points. |
|
## - The storage field `metadata_string` is a placeholder, the build |
|
## system replaces the field annotation with a specific version-string, such |
|
## as `"version_20200602_tzip_b916f32"`: the version of FA2-smartpy and |
|
## the git commit in the TZIP [repository](https://gitlab.com/tzip/tzip) that |
|
## the contract should obey. |
|
class FA2_core(sp.Contract): |
|
def __init__(self, config, metadata, **extra_storage): |
|
self.config = config |
|
self.error_message = Error_message(self.config) |
|
self.operator_set = Operator_set(self.config) |
|
self.operator_param = Operator_param(self.config) |
|
self.token_id_set = Token_id_set(self.config) |
|
self.ledger_key = Ledger_key(self.config) |
|
self.token_meta_data = Token_meta_data(self.config) |
|
self.batch_transfer = Batch_transfer(self.config) |
|
if self.config.add_mutez_transfer: |
|
self.transfer_mutez = sp.entry_point(mutez_transfer) |
|
if config.lazy_entry_points: |
|
self.add_flag("lazy-entry-points") |
|
self.add_flag("initial-cast") |
|
self.exception_optimization_level = "default-line" |
|
self.init( |
|
ledger = self.config.my_map(tvalue = Ledger_value.get_type()), |
|
token_metadata = self.config.my_map(tkey = sp.TNat, tvalue = self.token_meta_data.get_type()), |
|
token_royalties = self.config.my_map(tkey = sp.TNat, tvalue = sp.TRecord( |
|
# Who first created/minted this token |
|
creator = sp.TAddress, |
|
# Total amount of royalties, e.g. 1000 => 100% |
|
total_amount = sp.TNat, |
|
# Beneficiaries included in splits |
|
beneficiaries = sp.TList( |
|
sp.TRecord( |
|
# Address to receive benefit |
|
address = sp.TAddress, |
|
# Amount to receive, e.g. 250 => 25% |
|
amount = sp.TNat |
|
) |
|
) |
|
)), |
|
operators = self.operator_set.make(), |
|
all_tokens = self.token_id_set.empty(), |
|
metadata = metadata, |
|
**extra_storage |
|
) |
|
|
|
if self.config.store_total_supply: |
|
self.update_initial_storage( |
|
total_supply = self.config.my_map(tkey = sp.TNat, tvalue = sp.TNat), |
|
) |
|
|
|
@sp.entry_point |
|
def transfer(self, params): |
|
sp.verify( ~self.is_paused(), message = self.error_message.paused() ) |
|
sp.set_type(params, self.batch_transfer.get_type()) |
|
sp.for transfer in params: |
|
current_from = transfer.from_ |
|
sp.for tx in transfer.txs: |
|
if self.config.single_asset: |
|
sp.verify(tx.token_id == 0, message = "single-asset: token-id <> 0") |
|
|
|
sender_verify = ((self.is_administrator(sp.sender)) | |
|
(current_from == sp.sender)) |
|
message = self.error_message.not_owner() |
|
if self.config.support_operator: |
|
message = self.error_message.not_operator() |
|
sender_verify |= (self.operator_set.is_member(self.data.operators, |
|
current_from, |
|
sp.sender, |
|
tx.token_id)) |
|
if self.config.allow_self_transfer: |
|
sender_verify |= (sp.sender == sp.self_address) |
|
sp.verify(sender_verify, message = message) |
|
sp.verify( |
|
self.data.token_metadata.contains(tx.token_id), |
|
message = self.error_message.token_undefined() |
|
) |
|
# If amount is 0 we do nothing now: |
|
sp.if (tx.amount > 0): |
|
from_user = self.ledger_key.make(current_from, tx.token_id) |
|
sp.verify( |
|
(self.data.ledger[from_user].balance >= tx.amount), |
|
message = self.error_message.insufficient_balance()) |
|
to_user = self.ledger_key.make(tx.to_, tx.token_id) |
|
self.data.ledger[from_user].balance = sp.as_nat( |
|
self.data.ledger[from_user].balance - tx.amount) |
|
sp.if self.data.ledger.contains(to_user): |
|
self.data.ledger[to_user].balance += tx.amount |
|
sp.else: |
|
self.data.ledger[to_user] = Ledger_value.make(tx.amount) |
|
sp.else: |
|
pass |
|
|
|
@sp.entry_point |
|
def balance_of(self, params): |
|
# paused may mean that balances are meaningless: |
|
sp.verify( ~self.is_paused(), message = self.error_message.paused()) |
|
sp.set_type(params, Balance_of.entry_point_type()) |
|
def f_process_request(req): |
|
user = self.ledger_key.make(req.owner, req.token_id) |
|
sp.verify(self.data.token_metadata.contains(req.token_id), message = self.error_message.token_undefined()) |
|
sp.if self.data.ledger.contains(user): |
|
balance = self.data.ledger[user].balance |
|
sp.result( |
|
sp.record( |
|
request = sp.record( |
|
owner = sp.set_type_expr(req.owner, sp.TAddress), |
|
token_id = sp.set_type_expr(req.token_id, sp.TNat)), |
|
balance = balance)) |
|
sp.else: |
|
sp.result( |
|
sp.record( |
|
request = sp.record( |
|
owner = sp.set_type_expr(req.owner, sp.TAddress), |
|
token_id = sp.set_type_expr(req.token_id, sp.TNat)), |
|
balance = 0)) |
|
res = sp.local("responses", params.requests.map(f_process_request)) |
|
destination = sp.set_type_expr(params.callback, sp.TContract(Balance_of.response_type())) |
|
sp.transfer(res.value, sp.mutez(0), destination) |
|
|
|
@sp.offchain_view(pure = True) |
|
def get_balance(self, req): |
|
"""This is the `get_balance` view defined in TZIP-12.""" |
|
sp.set_type( |
|
req, sp.TRecord( |
|
owner = sp.TAddress, |
|
token_id = sp.TNat |
|
).layout(("owner", "token_id"))) |
|
user = self.ledger_key.make(req.owner, req.token_id) |
|
sp.verify(self.data.token_metadata.contains(req.token_id), message = self.error_message.token_undefined()) |
|
sp.result(self.data.ledger[user].balance) |
|
|
|
|
|
@sp.entry_point |
|
def update_operators(self, params): |
|
sp.set_type(params, sp.TList( |
|
sp.TVariant( |
|
add_operator = self.operator_param.get_type(), |
|
remove_operator = self.operator_param.get_type() |
|
) |
|
)) |
|
if self.config.support_operator: |
|
sp.for update in params: |
|
with update.match_cases() as arg: |
|
with arg.match("add_operator") as upd: |
|
sp.verify( |
|
(upd.owner == sp.sender) | self.is_administrator(sp.sender), |
|
message = self.error_message.not_admin_or_operator() |
|
) |
|
self.operator_set.add(self.data.operators, |
|
upd.owner, |
|
upd.operator, |
|
upd.token_id) |
|
with arg.match("remove_operator") as upd: |
|
sp.verify( |
|
(upd.owner == sp.sender) | self.is_administrator(sp.sender), |
|
message = self.error_message.not_admin_or_operator() |
|
) |
|
self.operator_set.remove(self.data.operators, |
|
upd.owner, |
|
upd.operator, |
|
upd.token_id) |
|
else: |
|
sp.failwith(self.error_message.operators_unsupported()) |
|
|
|
# this is not part of the standard but can be supported through inheritance. |
|
def is_paused(self): |
|
return sp.bool(False) |
|
|
|
# this is not part of the standard but can be supported through inheritance. |
|
def is_administrator(self, sender): |
|
return sp.bool(False) |
|
|
|
class FA2_administrator(FA2_core): |
|
def is_administrator(self, sender): |
|
return sender == self.data.administrator |
|
|
|
@sp.entry_point |
|
def set_administrator(self, params): |
|
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin()) |
|
self.data.administrator = params |
|
|
|
class FA2_pause(FA2_core): |
|
def is_paused(self): |
|
return self.data.paused |
|
|
|
@sp.entry_point |
|
def set_pause(self, params): |
|
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin()) |
|
self.data.paused = params |
|
|
|
class FA2_change_metadata(FA2_core): |
|
@sp.entry_point |
|
def set_metadata(self, k, v): |
|
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin()) |
|
self.data.metadata[k] = v |
|
|
|
class FA2_mint(FA2_core): |
|
@sp.entry_point |
|
def mint(self, params): |
|
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin()) |
|
# We don't check for pauseness because we're the admin. |
|
if self.config.single_asset: |
|
sp.verify(params.token_id == 0, message = "single-asset: token-id <> 0") |
|
if self.config.non_fungible: |
|
sp.verify(params.amount == 1, message = "NFT-asset: amount <> 1") |
|
sp.verify( |
|
~ self.token_id_set.contains(self.data.all_tokens, params.token_id), |
|
message = "NFT-asset: cannot mint twice same token" |
|
) |
|
user = self.ledger_key.make(params.address, params.token_id) |
|
sp.if self.data.ledger.contains(user): |
|
self.data.ledger[user].balance += params.amount |
|
sp.else: |
|
self.data.ledger[user] = Ledger_value.make(params.amount) |
|
sp.if ~ self.token_id_set.contains(self.data.all_tokens, params.token_id): |
|
self.token_id_set.add(self.data.all_tokens, params.token_id) |
|
self.data.token_metadata[params.token_id] = sp.record( |
|
token_id = params.token_id, |
|
token_info = params.metadata |
|
) |
|
self.data.token_royalties[params.token_id] = sp.record( |
|
creator = params.creator, |
|
total_amount = params.royalty_total, |
|
beneficiaries = params.royalty_beneficiaries |
|
) |
|
if self.config.store_total_supply: |
|
self.data.total_supply[params.token_id] = params.amount + self.data.total_supply.get(params.token_id, default_value = 0) |
|
|
|
@sp.entry_point |
|
def update_royalties (self, params): |
|
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin()) |
|
sp.verify(self.data.token_royalties.contains(params.token_id), 'token_id does not exist') |
|
royalties = self.data.token_royalties[params.token_id] |
|
sp.verify(royalties.creator == params.creator, 'creator does not match original token creator') |
|
royalties = sp.record( |
|
creator = params.creator, |
|
total_amount = params.royalty_total, |
|
beneficiaries = params.royalty_beneficiaries |
|
) |
|
|
|
@sp.onchain_view(name="getRoyalties") |
|
def getRoyalties(self, token_id): |
|
# Define the input parameter data type |
|
sp.set_type(token_id, sp.TNat) |
|
|
|
# Check that the token id is present in the royalties big map |
|
sp.verify(self.data.token_royalties.contains(token_id), "The provided token_id doesn't exist") |
|
|
|
# Return the royalty record |
|
sp.result(self.data.token_royalties[token_id]) |
|
|
|
@sp.onchain_view(name="getCreator") |
|
def getCreator(self, token_id): |
|
# Define the input parameter data type |
|
sp.set_type(token_id, sp.TNat) |
|
|
|
# Check that the token id is present in the royalties big map |
|
sp.verify(self.data.token_royalties.contains(token_id), "The provided token_id doesn't exist") |
|
|
|
# Return the royalty record |
|
sp.result(self.data.token_royalties[token_id].creator) |
|
|
|
class FA2_token_metadata(FA2_core): |
|
def set_token_metadata_view(self): |
|
def token_metadata(self, tok): |
|
""" |
|
Return the token-metadata URI for the given token. |
|
|
|
For a reference implementation, dynamic-views seem to be the |
|
most flexible choice. |
|
""" |
|
sp.set_type(tok, sp.TNat) |
|
sp.result(self.data.token_metadata[tok]) |
|
|
|
self.token_metadata = sp.offchain_view(pure = True, doc = "Get Token Metadata")(token_metadata) |
|
|
|
def make_metadata(symbol, name, decimals): |
|
"Helper function to build metadata JSON bytes values." |
|
return (sp.map(l = { |
|
# Remember that michelson wants map already in ordered |
|
"decimals" : sp.utils.bytes_of_string("%d" % decimals), |
|
"name" : sp.utils.bytes_of_string(name), |
|
"symbol" : sp.utils.bytes_of_string(symbol) |
|
})) |
|
|
|
|
|
class FA2(FA2_change_metadata, FA2_token_metadata, FA2_mint, FA2_administrator, FA2_pause, FA2_core): |
|
|
|
@sp.offchain_view(pure = True) |
|
def count_tokens(self): |
|
"""Get how many tokens are in this FA2 contract. |
|
""" |
|
sp.result(self.token_id_set.cardinal(self.data.all_tokens)) |
|
|
|
@sp.offchain_view(pure = True) |
|
def does_token_exist(self, tok): |
|
"Ask whether a token ID is exists." |
|
sp.set_type(tok, sp.TNat) |
|
sp.result(self.data.token_metadata.contains(tok)) |
|
|
|
@sp.offchain_view(pure = True) |
|
def all_tokens(self): |
|
if self.config.assume_consecutive_token_ids: |
|
sp.result(sp.range(0, self.data.all_tokens)) |
|
else: |
|
sp.result(self.data.all_tokens.elements()) |
|
|
|
@sp.offchain_view(pure = True) |
|
def total_supply(self, tok): |
|
if self.config.store_total_supply: |
|
sp.result(self.data.total_supply[tok]) |
|
else: |
|
sp.set_type(tok, sp.TNat) |
|
sp.result("total-supply not supported") |
|
|
|
@sp.offchain_view(pure = True) |
|
def is_operator(self, query): |
|
sp.set_type(query, |
|
sp.TRecord(token_id = sp.TNat, |
|
owner = sp.TAddress, |
|
operator = sp.TAddress).layout( |
|
("owner", ("operator", "token_id")))) |
|
sp.result( |
|
self.operator_set.is_member(self.data.operators, |
|
query.owner, |
|
query.operator, |
|
query.token_id) |
|
) |
|
|
|
def __init__(self, config, metadata, admin): |
|
# Let's show off some meta-programming: |
|
if config.assume_consecutive_token_ids: |
|
self.all_tokens.doc = """ |
|
This view is specified (but optional) in the standard. |
|
|
|
This contract is built with assume_consecutive_token_ids = |
|
True, so we return a list constructed from the number of tokens. |
|
""" |
|
else: |
|
self.all_tokens.doc = """ |
|
This view is specified (but optional) in the standard. |
|
|
|
This contract is built with assume_consecutive_token_ids = |
|
False, so we convert the set of tokens from the storage to a list |
|
to fit the expected type of TZIP-16. |
|
""" |
|
list_of_views = [ |
|
self.get_balance |
|
, self.does_token_exist |
|
, self.count_tokens |
|
, self.all_tokens |
|
, self.is_operator |
|
] |
|
|
|
if config.store_total_supply: |
|
list_of_views = list_of_views + [self.total_supply] |
|
if config.use_token_metadata_offchain_view: |
|
self.set_token_metadata_view() |
|
list_of_views = list_of_views + [self.token_metadata] |
|
|
|
metadata_base = { |
|
"version": config.name # will be changed if using fatoo. |
|
, "description" : ( |
|
"This is a didactic reference implementation of FA2," |
|
+ " a.k.a. TZIP-012, using SmartPy.\n\n" |
|
+ "This particular contract uses the configuration named: " |
|
+ config.name + "." |
|
) |
|
, "interfaces": ["TZIP-012", "TZIP-016"] |
|
, "authors": [ |
|
"Seb Mondet <https://seb.mondet.org>" |
|
] |
|
, "homepage": "https://gitlab.com/smondet/fa2-smartpy" |
|
, "views": list_of_views |
|
, "source": { |
|
"tools": ["SmartPy"] |
|
, "location": "https://gitlab.com/smondet/fa2-smartpy.git" |
|
} |
|
, "permissions": { |
|
"operator": |
|
"owner-or-operator-transfer" if config.support_operator else "owner-transfer" |
|
, "receiver": "owner-no-hook" |
|
, "sender": "owner-no-hook" |
|
} |
|
, "fa2-smartpy": { |
|
"configuration" : |
|
dict([(k, getattr(config, k)) for k in dir(config) if "__" not in k and k != 'my_map']) |
|
} |
|
} |
|
self.init_metadata("metadata_base", metadata_base) |
|
FA2_core.__init__(self, config, metadata, paused = False, administrator = admin) |
|
|
|
## ## Tests |
|
## |
|
## ### Auxiliary Consumer Contract |
|
## |
|
## This contract is used by the tests to be on the receiver side of |
|
## callback-based entry-points. |
|
## It stores facts about the results in order to use `scenario.verify(...)` |
|
## (cf. |
|
## [documentation](https://smartpy.io/docs/scenarios/testing)). |
|
class View_consumer(sp.Contract): |
|
def __init__(self, contract): |
|
self.contract = contract |
|
self.init(last_sum = 0, |
|
operator_support = not contract.config.support_operator) |
|
|
|
@sp.entry_point |
|
def reinit(self): |
|
self.data.last_sum = 0 |
|
# It's also nice to make this contract have more than one entry point. |
|
|
|
@sp.entry_point |
|
def receive_balances(self, params): |
|
sp.set_type(params, Balance_of.response_type()) |
|
self.data.last_sum = 0 |
|
sp.for resp in params: |
|
self.data.last_sum += resp.balance |
|
|
|
## |
|
## ## Global Environment Parameters |
|
## |
|
## The build system communicates with the python script through |
|
## environment variables. |
|
## The function `environment_config` creates an `FA2_config` given the |
|
## presence and values of a few environment variables. |
|
def global_parameter(env_var, default): |
|
try: |
|
if os.environ[env_var] == "true" : |
|
return True |
|
if os.environ[env_var] == "false" : |
|
return False |
|
return default |
|
except: |
|
return default |
|
|
|
def environment_config(): |
|
return FA2_config( |
|
debug_mode = global_parameter("debug_mode", False), |
|
single_asset = global_parameter("single_asset", False), |
|
non_fungible = global_parameter("non_fungible", False), |
|
add_mutez_transfer = global_parameter("add_mutez_transfer", False), |
|
readable = global_parameter("readable", True), |
|
force_layouts = global_parameter("force_layouts", True), |
|
support_operator = global_parameter("support_operator", True), |
|
assume_consecutive_token_ids = |
|
global_parameter("assume_consecutive_token_ids", True), |
|
store_total_supply = global_parameter("store_total_supply", False), |
|
lazy_entry_points = global_parameter("lazy_entry_points", False), |
|
allow_self_transfer = global_parameter("allow_self_transfer", False), |
|
use_token_metadata_offchain_view = global_parameter("use_token_metadata_offchain_view", True), |
|
) |
|
|
|
class MRKT (sp.Contract): |
|
|
|
LISTING_TYPE = sp.TRecord( |
|
# address that created listing |
|
issuer=sp.TAddress, |
|
# token FA2 contract address |
|
fa2=sp.TAddress, |
|
# token ID being listed |
|
token_id=sp.TNat, |
|
# number of editions being listed at this price |
|
token_amount=sp.TNat, |
|
# the price of each edition in mutez |
|
xtz_per_token=sp.TMutez, |
|
# royalty total |
|
royalty_total=sp.TNat, |
|
# royalties |
|
royalty_beneficiaries=sp.TList( |
|
sp.TRecord( |
|
# Address to receive benefit |
|
address = sp.TAddress, |
|
# Amount to receive, e.g. 250 => 25% |
|
amount = sp.TNat |
|
).layout(("address", "amount")) |
|
) |
|
).layout( |
|
("issuer", |
|
("fa2", |
|
("token_id", |
|
("token_amount", |
|
("xtz_per_token", |
|
("royalty_total", "royalty_beneficiaries") |
|
) |
|
) |
|
) |
|
) |
|
) |
|
) |
|
|
|
LISTING_TYPE_NO_ISSUER = sp.TRecord( |
|
# token FA2 contract address |
|
fa2=sp.TAddress, |
|
# token ID being listed |
|
token_id=sp.TNat, |
|
# number of editions being listed at this price |
|
token_amount=sp.TNat, |
|
# the price of each edition in mutez |
|
xtz_per_token=sp.TMutez, |
|
# royalty total |
|
royalty_total=sp.TNat, |
|
# royalties |
|
royalty_beneficiaries=sp.TList( |
|
sp.TRecord( |
|
# Address to receive benefit |
|
address = sp.TAddress, |
|
# Amount to receive, e.g. 250 => 25% |
|
amount = sp.TNat |
|
).layout(("address", "amount")) |
|
) |
|
).layout( |
|
("fa2", |
|
("token_id", |
|
("token_amount", |
|
("xtz_per_token", |
|
("royalty_total", "royalty_beneficiaries") |
|
) |
|
) |
|
) |
|
) |
|
) |
|
|
|
def __init__(self, metadata): |
|
# storage types |
|
self.init_type(sp.TRecord( |
|
metadata=sp.TBigMap(sp.TString, sp.TBytes), |
|
listings=sp.TBigMap(sp.TNat, MRKT.LISTING_TYPE), |
|
counter=sp.TNat |
|
)) |
|
|
|
# initialize storage |
|
self.init( |
|
metadata=metadata, |
|
listings=sp.big_map(), |
|
counter=0 |
|
) |
|
|
|
def check_no_tez_transfer(self): |
|
"""Checks that no tez were transferred in the operation.""" |
|
sp.verify(sp.amount == sp.tez(0), "The operation does not need tez transfers") |
|
|
|
# @sp.onchain_view() |
|
# def getRoyalties(self, listing_id): |
|
# """Check if a given listing id is present in the listings big map.""" |
|
# # Define the input parameter data type |
|
# sp.set_type(listing_id, sp.TNat) |
|
|
|
# # Check that the listing id is present in the listings big map |
|
# sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist") |
|
|
|
# # get listing |
|
# listing = self.data.listings[listing_id] |
|
|
|
# rec = sp.view("getRoyalties", listing.fa2, listing.token_id, t = sp.TRecord( |
|
# amount = sp.TNat, |
|
# beneficiary = sp.TAddress |
|
# )).open_some("invalid getRoyalties view") |
|
|
|
# # Return the royalty information |
|
# sp.result(rec) |
|
|
|
@sp.entry_point |
|
def collect(self, listing_id): |
|
"""Collects one edition of a token that has already been listed.""" |
|
# Define the input parameter data type |
|
sp.set_type(listing_id, sp.TNat) |
|
|
|
# Check that the listing id is present in the listings big map |
|
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist") |
|
|
|
# Check that the collector is not the creator of the listing |
|
listing = self.data.listings[listing_id] |
|
sp.verify(sp.sender != listing.issuer, "The collector cannot be the listing issuer") |
|
|
|
# Check that the provided tez amount is exactly the edition price |
|
sp.verify(sp.amount == listing.xtz_per_token, "The sent tez amount does not coincide with the edition price") |
|
|
|
# Check that there is at least one edition available to collect |
|
sp.verify(listing.token_amount > 0, "All editions have already been collected") |
|
|
|
# Handle tez tranfers if the edition price is not zero |
|
sp.if listing.xtz_per_token != sp.tez(0): |
|
# get royalty info from token contract |
|
royalties = sp.view("getRoyalties", listing.fa2, listing.token_id, t = sp.TRecord( |
|
creator = sp.TAddress, |
|
total_amount = sp.TNat, |
|
beneficiaries = sp.TList( |
|
sp.TRecord( |
|
# Address to receive benefit |
|
address = sp.TAddress, |
|
# Amount to receive, e.g. 250 => 25% |
|
amount = sp.TNat |
|
) |
|
) |
|
)).open_some("invalid getRoyalties view") |
|
|
|
# token royalties |
|
# these are fixed and defined by the token creator |
|
amount = sp.local("amount", sp.amount) |
|
sp.if royalties.total_amount > 0: |
|
sp.for benefit in royalties.beneficiaries: |
|
split = sp.local("split", sp.split_tokens(listing.xtz_per_token, benefit.amount, royalties.total_amount)) |
|
# Send the royalties to beneficiary |
|
sp.if split.value > sp.mutez(0): |
|
sp.send(benefit.address, split.value) |
|
amount.value = amount.value - split.value |
|
sp.verify(amount.value >= sp.mutez(0), "xtz amount < 0") |
|
|
|
# listing royalties |
|
# these are defined by the listing issuer |
|
remaining = sp.local("remaining", amount.value) |
|
sp.if listing.royalty_total > 0: |
|
sp.for benefit in listing.royalty_beneficiaries: |
|
split = sp.local("split", sp.split_tokens(remaining.value, benefit.amount, listing.royalty_total)) |
|
# Send the royalties to beneficiary |
|
sp.if split.value > sp.mutez(0): |
|
sp.send(benefit.address, split.value) |
|
amount.value = amount.value - split.value |
|
sp.verify(amount.value >= sp.mutez(0), "xtz amount < 0") |
|
|
|
# Send what is left to the listing issuer |
|
sp.send(listing.issuer, amount.value) |
|
|
|
# Transfer the token edition to the collector |
|
self.fa2_transfer( |
|
fa2=listing.fa2, |
|
from_=sp.self_address, |
|
to_=sp.sender, |
|
token_id=listing.token_id, |
|
token_amount=1 |
|
) |
|
|
|
# Update the number of editions available in the listings big map |
|
listing.token_amount = sp.as_nat(listing.token_amount - 1) |
|
|
|
@sp.entry_point |
|
def create_listing(self, params): |
|
"""Lists several editions of a token at a fixed price.""" |
|
|
|
# Define the input parameter data type |
|
sp.set_type(params, MRKT.LISTING_TYPE_NO_ISSUER) |
|
|
|
# Check that no tez have been transferred |
|
self.check_no_tez_transfer() |
|
|
|
# Check that at least one edition will be listed |
|
sp.verify(params.token_amount > 0, "At least one edition needs to be listed") |
|
|
|
# Transfer all the editions to the marketplace account |
|
self.fa2_transfer( |
|
fa2=params.fa2, |
|
from_=sp.sender, |
|
to_=sp.self_address, |
|
token_id=params.token_id, |
|
token_amount=params.token_amount) |
|
|
|
# Update the listings bigmap with the new listing information |
|
self.data.listings[self.data.counter] = sp.record( |
|
issuer=sp.sender, |
|
fa2=params.fa2, |
|
token_id=params.token_id, |
|
token_amount=params.token_amount, |
|
xtz_per_token=params.xtz_per_token, |
|
royalty_total=params.royalty_total, |
|
royalty_beneficiaries = params.royalty_beneficiaries |
|
) |
|
|
|
# Increase the listing counter |
|
self.data.counter += 1 |
|
|
|
@sp.entry_point |
|
def cancel_listing(self, listing_id): |
|
"""Cancels an existing listing.""" |
|
|
|
# Define the input parameter data type |
|
sp.set_type(listing_id, sp.TNat) |
|
|
|
# Check that no tez have been transferred |
|
self.check_no_tez_transfer() |
|
|
|
# Check that the listing id is present in the listings big map |
|
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist") |
|
|
|
# Check that the listing issuer is cancelling the listing |
|
listing = self.data.listings[listing_id] |
|
sp.verify(sp.sender == listing.issuer, "Only the listing issuer can cancel the listing") |
|
|
|
# Check that there is at least one listed edition |
|
sp.verify(listing.token_amount > 0, "All editions have been collected") |
|
|
|
# Transfer the remaining token editions back to the issuer |
|
self.fa2_transfer( |
|
fa2=listing.fa2, |
|
from_=sp.self_address, |
|
to_=sp.sender, |
|
token_id=listing.token_id, |
|
token_amount=listing.token_amount) |
|
|
|
# Delete the listing |
|
del self.data.listings[listing_id] |
|
|
|
def fa2_transfer(self, fa2, from_, to_, token_id, token_amount): |
|
"""Transfers a number of editions of a FA2 token between two addresses.""" |
|
# Get a handle to the FA2 token transfer entry point |
|
c = sp.contract( |
|
t=sp.TList(sp.TRecord( |
|
from_=sp.TAddress, |
|
txs=sp.TList(sp.TRecord( |
|
to_=sp.TAddress, |
|
token_id=sp.TNat, |
|
amount=sp.TNat).layout(("to_", ("token_id", "amount")))))), |
|
address=fa2, |
|
entry_point="transfer").open_some() |
|
|
|
# Transfer the FA2 token editions to the new address |
|
sp.transfer( |
|
arg=sp.list([sp.record( |
|
from_=from_, |
|
txs=sp.list([sp.record( |
|
to_=to_, |
|
token_id=token_id, |
|
amount=token_amount)]))]), |
|
amount=sp.mutez(0), |
|
destination=c) |
|
|
|
@sp.onchain_view() |
|
def has_listing(self, listing_id): |
|
"""Check if a given listing id is present in the listings big map.""" |
|
# Define the input parameter data type |
|
sp.set_type(listing_id, sp.TNat) |
|
|
|
# Return True if the listing id is present in the listings big map |
|
sp.result(self.data.listings.contains(listing_id)) |
|
|
|
@sp.onchain_view() |
|
def get_listing(self, listing_id): |
|
"""Returns the complete information from a given listing id.""" |
|
# Define the input parameter data type |
|
sp.set_type(listing_id, sp.TNat) |
|
|
|
# Check that the listing id is present in the listings big map |
|
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist") |
|
|
|
# Return the listing information |
|
sp.result(self.data.listings[listing_id]) |
|
|
|
@sp.onchain_view() |
|
def get_listings_counter(self): |
|
"""Returns the listings counter.""" |
|
sp.result(self.data.counter) |
|
|
|
if "templates" not in __name__: |
|
@sp.add_test(name = "SpectrumColors") |
|
def test(): |
|
artist = sp.test_account("artist") |
|
collector1 = sp.test_account("collector1") |
|
collector2 = sp.test_account("collector2") |
|
|
|
market = MRKT(sp.utils.metadata_of_url("ipfs://aaa")) |
|
scenario = sp.test_scenario() |
|
scenario.h1("MRKT") |
|
scenario += market |
|
|
|
admin = sp.test_account("Administrator") |
|
alice = sp.test_account("Alice") |
|
bob = sp.test_account("Robert") |
|
# Let's display the accounts: |
|
scenario.h2("Accounts") |
|
scenario.show([admin, alice, bob]) |
|
fa2 = FA2( |
|
config = environment_config(), |
|
metadata = sp.utils.metadata_of_url("https://example.com"), |
|
admin = admin.address) |
|
scenario += fa2 |
|
|
|
tok0_md = FA2.make_metadata( |
|
name = "The Token Zero", |
|
decimals = 2, |
|
symbol= "TK0" ) |
|
|
|
token_id = 0 |
|
scenario += fa2.mint(address = alice.address, |
|
amount = 50, |
|
metadata = tok0_md, |
|
token_id = token_id, |
|
creator = alice.address, |
|
royalty_total = 1000, |
|
royalty_beneficiaries = sp.list(l = [ |
|
# 25% to alice |
|
sp.record(address = alice.address, amount = sp.nat(250)), |
|
# 2.5% to bob |
|
sp.record(address = bob.address, amount = sp.nat(25)) |
|
])).run(sender = admin) |
|
|
|
scenario += fa2.update_operators([ |
|
sp.variant("add_operator", sp.record( |
|
owner=alice.address, |
|
operator=market.address, |
|
token_id=token_id))]).run(sender=alice) |
|
|
|
edition_count = 50 |
|
edition_price = 1 |
|
scenario += market.create_listing( |
|
fa2 = fa2.address, |
|
token_id = 0, |
|
token_amount = edition_count, |
|
xtz_per_token = sp.tez(edition_price), |
|
royalty_total = 0, |
|
royalty_beneficiaries = sp.list() |
|
).run(sender = alice.address) |
|
|
|
listing_id = sp.as_nat(market.get_listings_counter() - 1) |
|
|
|
scenario += market.collect( |
|
listing_id |
|
).run(sender = bob.address, amount = sp.tez(edition_price)) |
|
|
|
scenario.p("Counter").show(market.get_listing(listing_id)) |
|
|
|
sp.add_compilation_target("MRKT", MRKT(sp.utils.metadata_of_url("ipfs://aaa"))) |