Skip to content

Instantly share code, notes, and snippets.

@xen
Created July 18, 2018 10:07
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save xen/70564a2915e5415523fb3f86c4487b13 to your computer and use it in GitHub Desktop.
Save xen/70564a2915e5415523fb3f86c4487b13 to your computer and use it in GitHub Desktop.
from web3 import Web3, HTTPProvider, IPCProvider
web3 = Web3(IPCProvider('/Users/xen/Library/Ethereum/rinkeby/geth.ipc'))
# web3 = Web3(IPCProvider('/Users/xen/Library/Ethereum/geth.ipc'))
print(web3.eth.blockNumber)
print(web3.eth.accounts)
import sys
import time
import pprint
from web3.providers.eth_tester import EthereumTesterProvider
from web3 import Web3, IPCProvider
from solc import compile_source
def compile_source_file(file_path):
with open(file_path, 'r') as f:
source = f.read()
return compile_source(source)
def deploy_contract(w3, contract_interface):
contract = w3.eth.contract(
abi=contract_interface['abi'],
bytecode=contract_interface['bin'])
tx = contract.deploy(args=[], transaction={
'from': w3.eth.accounts[0], 'gas': 410000})
address = w3.eth.getTransactionReceipt(tx)['contractAddress']
return address
def wait_for_receipt(w3, tx_hash, poll_interval):
while True:
tx_receipt = w3.eth.getTransactionReceipt(tx_hash)
if tx_receipt:
return tx_receipt
time.sleep(poll_interval)
# w3 = Web3(IPCProvider('/Users/xen/Library/Ethereum/rinkeby/geth.ipc'))
w3 = Web3(EthereumTesterProvider())
contract_source_path = 'contracts/store.sol'
compiled_sol = compile_source_file('contracts/store.sol')
contract_id, contract_interface = compiled_sol.popitem()
address = deploy_contract(w3, contract_interface)
print("Deployed {0} to: {1}\n".format(contract_id, address))
store_var_contract = w3.eth.contract(
address=address,
abi=contract_interface['abi'])
gas_estimate = store_var_contract.functions.setVar(255).estimateGas()
print("Gas estimate to transact with setVar: {0}\n".format(gas_estimate))
if gas_estimate < 100000:
print("Sending transaction to setVar(255)\n")
tx_hash = store_var_contract.functions.setVar(255).transact()
receipt = wait_for_receipt(w3, tx_hash, 1)
print("Transaction receipt mined: \n")
pprint.pprint(dict(receipt))
else:
print("Gas cost exceeds 100000")
from web3 import Web3, TestRPCProvider, IPCProvider
import asyncio
w3 = Web3(IPCProvider('/Users/xen/Library/Ethereum/rinkeby/geth.ipc'))
print(w3.eth.blockNumber)
def handle_event(event):
print(event)
# and whatever
async def log_loop(event_filter, poll_interval):
while True:
for event in event_filter.get_new_entries():
handle_event(event)
await asyncio.sleep(poll_interval)
def main():
block_filter = w3.eth.filter('latest')
tx_filter = w3.eth.filter('pending')
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
asyncio.gather(
log_loop(block_filter, 2),
log_loop(tx_filter, 2)))
finally:
loop.close()
if __name__ == '__main__':
main()
from web3 import Web3, HTTPProvider, IPCProvider
w3 = Web3(IPCProvider('/Users/xen/Library/Ethereum/rinkeby/geth.ipc'))
contract_address = '0x8b26d0070634719b52569fB4733b88A61F460eD0'
event_filter = w3.eth.filter({'fromBlock': 0, 'toBlock': 'latest', 'address': contract_address, 'topics': [
'0x0bd8ab3a75b603beb8c382868ae3ec451c35bb41444f6b0c2175e0505424e95c']})
event_logs = w3.eth.getFilterLogs(event_filter.filter_id)
pragma solidity ^0.4.17;
contract Inbox {
string public message;
function Inbox(string initialMessage) public {
message = initialMessage;
}
function setMessage(string newMessage) public {
message = newMessage;
}
}
contract StoreVar {
uint8 public _myVar;
event MyEvent(uint indexed _var);
function setVar(uint8 _var) public {
_myVar = _var;
MyEvent(_var);
}
function getVar() public view returns (uint8) {
return _myVar;
}
}
# Store contract
MyEvent: event({_var: indexed(uint256)})
_myVar: public(uint256)
@public
def setVar(_var: uint256):
self._myVar = _var
log.MyEvent(_var)
@public
def getVar() -> uint256:
return self._myVar
# Solidity-Compatible EIP20/ERC20 Token
# Implements https://github.com/ethereum/EIPs/blob/master/EIPS/eip-20-token-standard.md
# Author: Phil Daian
# The use of the uint256 datatype as in this token is not
# recommended, as it can pose security risks.
# This token is intended as a proof of concept towards
# language interoperability and not for production use.
# Events issued by the contract
Transfer: event(
{_from: indexed(address), _to: indexed(address), _value: uint256})
Approval: event({_owner: indexed(address),
_spender: indexed(address), _value: uint256})
balances: uint256[address]
allowances: (uint256[address])[address]
num_issued: uint256
@public
@payable
def deposit():
_value: uint256 = convert(msg.value, 'uint256')
_sender: address = msg.sender
self.balances[_sender] = uint256_add(self.balances[_sender], _value)
self.num_issued = uint256_add(self.num_issued, _value)
# Fire deposit event as transfer from 0x0
log.Transfer(0x0000000000000000000000000000000000000000, _sender, _value)
@public
def withdraw(_value: uint256) -> bool:
_sender: address = msg.sender
# Make sure sufficient funds are present, op will not underflow supply
# implicitly through overflow protection
self.balances[_sender] = uint256_sub(self.balances[_sender], _value)
self.num_issued = uint256_sub(self.num_issued, _value)
send(_sender, as_wei_value(convert(_value, 'int128'), 'wei'))
# Fire withdraw event as transfer to 0x0
log.Transfer(_sender, 0x0000000000000000000000000000000000000000, _value)
return true
@public
@constant
def totalSupply() -> uint256:
return self.num_issued
@public
@constant
def balanceOf(_owner: address) -> uint256:
return self.balances[_owner]
@public
def transfer(_to: address, _value: uint256) -> bool:
_sender: address = msg.sender
# Make sure sufficient funds are present implicitly through overflow protection
self.balances[_sender] = uint256_sub(self.balances[_sender], _value)
self.balances[_to] = uint256_add(self.balances[_to], _value)
# Fire transfer event
log.Transfer(_sender, _to, _value)
return true
@public
def transferFrom(_from: address, _to: address, _value: uint256) -> bool:
_sender: address = msg.sender
allowance: uint256 = self.allowances[_from][_sender]
# Make sure sufficient funds/allowance are present implicitly through overflow protection
self.balances[_from] = uint256_sub(self.balances[_from], _value)
self.balances[_to] = uint256_add(self.balances[_to], _value)
self.allowances[_from][_sender] = uint256_sub(allowance, _value)
# Fire transfer event
log.Transfer(_from, _to, _value)
return true
@public
def approve(_spender: address, _value: uint256) -> bool:
_sender: address = msg.sender
self.allowances[_sender][_spender] = _value
# Fire approval event
log.Approval(_sender, _spender, _value)
return true
@public
@constant
def allowance(_owner: address, _spender: address) -> uint256:
return self.allowances[_owner][_spender]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment