Skip to content

Instantly share code, notes, and snippets.

@mryellow
Created June 7, 2018 23:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mryellow/2d58a618b24714daaeec4ecbebe3ac71 to your computer and use it in GitHub Desktop.
Save mryellow/2d58a618b24714daaeec4ecbebe3ac71 to your computer and use it in GitHub Desktop.
const Tx = require('ethereumjs-tx');
const asyncPriorityQueue = require('async').priorityQueue;
// web3 instance shared by every helper in this module; assigned by the
// exported factory each time it is called.
let web3;
// Crude module-wide lock around nonce lookup and broadcast: the queue worker
// sets it before preparing a transaction and sendSignedTask clears it.
let mutexLock = false;
/**
 * Builds, signs and serializes the transaction described by a task.
 *
 * The validity checks are reported through `console.assert` only; the
 * derived sender is checked for presence but is not compared against
 * `task.account`.
 *
 * @param {Object} task - Queue task.
 * @param {Object} task.txOpt - Transaction fields handed to the `Tx` constructor.
 * @param {string} task.privateKey - Hex private key, with or without a `0x` prefix.
 * @returns {string} `0x`-prefixed hex of the serialized signed transaction.
 */
const signTask = task => {
  const transaction = new Tx(task.txOpt);

  // The signer wants raw key bytes, so drop any `0x` prefix before decoding.
  const keyHex = task.privateKey.replace(/^0x/, '');
  transaction.sign(Buffer.from(keyHex, 'hex'));

  // Gather every check result first, then report them in the same order.
  const checks = [
    [transaction.validate(), 'Transaction invalid'],
    [transaction.verifySignature(), 'Signature invalid'],
    [transaction.getSenderAddress(), 'Sender invalid'],
  ];
  checks.forEach(([passed, message]) => console.assert(passed, message));

  return `0x${transaction.serialize().toString('hex')}`;
};
// `hashCallback`, `receiptCallback`
/**
 * Broadcasts a signed transaction through the module-level `web3` instance.
 *
 * The module-level `mutexLock` is released as soon as the node reports the
 * transaction hash, so the next queued task can look up its nonce. If sending
 * fails before a hash was reported, the lock is released from the error
 * handler instead. An error that arrives after the hash never touches the
 * lock, because by then another task may own it.
 *
 * @param {string} signedTx - Serialized signed transaction (hex string).
 * @param {number} [throttle] - When greater than 0, resolve with
 *   `{ transactionHash }` this many milliseconds after the hash is received
 *   instead of waiting for the receipt.
 * @returns {Promise<Object>} Resolves with the receipt, or with
 *   `{ transactionHash }` when throttled. Rejects when `signedTx` is missing,
 *   when sending fails, or when polling for the receipt fails.
 */
const sendSignedTask = (signedTx, throttle) => {
  if (!signedTx) return Promise.reject(new Error('Missing signedTx'));

  return new Promise((resolve, reject) => {
    let txHash;
    let hashReceived = false;
    let pollCount = 0;

    // Polls every 500 ms until the transaction has a receipt with a block number.
    // TODO: Back-off, or timeout?
    const checkReceipt = async hash => {
      let receipt;
      try {
        receipt = await web3.eth.getTransactionReceipt(hash);
      } catch (pollErr) {
        // Surface the failure instead of leaving the promise pending forever.
        reject(pollErr);
        return;
      }
      // Log periodically
      if (pollCount % 250 === 0) console.log(hash, receipt, pollCount);
      pollCount++;
      if (receipt && receipt.blockNumber > 0) {
        resolve(receipt);
      } else {
        setTimeout(() => checkReceipt(hash), 500);
      }
    };

    web3.eth
      .sendSignedTransaction(signedTx)
      .once('transactionHash', hash => {
        // Once we have the hash, release the lock so the next nonce can be sent.
        mutexLock = false;
        // Save the hash in case we time out and need to poll for the receipt.
        txHash = hash;
        hashReceived = true;
        // TODO: Separate from `callback` another `hashCallback`?
        // TODO: Emit an event or callback to tell the outside we've received the hash?
        // TODO: Return the PromiEvent?
        // Wait for the throttle instead of waiting for the receipt.
        if (throttle > 0) {
          setTimeout(() => {
            resolve({ transactionHash: txHash });
          }, throttle);
        }
      })
      .once('receipt', receipt => {
        resolve(receipt);
      })
      .catch(err => {
        const message = err && err.message ? String(err.message) : String(err);
        console.error(message);

        // Only release the lock if this task still owns it. After the hash
        // was reported the lock was already released and may since have been
        // taken by another task; clearing it here would let two tasks fetch
        // the same pending nonce.
        if (!hashReceived) mutexLock = false;

        const isMiningTimeout =
          message.toLowerCase().indexOf('transaction was not mined within') !== -1;

        // The transaction was broadcast but not mined in time: keep waiting.
        if (!throttle && isMiningTimeout && hashReceived) {
          checkReceipt(txHash);
          return;
        }

        // Keep the original error (stack and extra properties) when there is one.
        reject(err instanceof Error ? err : new Error(message));
      });
  });
};
//export default
module.exports = function(web3js, concurrency, throttle, callback) {
web3 = web3js;
const worker = (task, workerDone) => {
console.log('worker', task);
console.assert(task, 'Invalid task');
console.assert(task.account, 'Invalid account');
console.assert(task.privateKey, 'Invalid privateKey');
//console.assert(task.txOpt.nonce, 'Invalid nonce');
console.assert(task.txOpt.gasLimit, 'Invalid gasLimit');
console.assert(task.txOpt.gasPrice, 'Invalid gasPrice');
const _processTask = async () => {
mutexLock = true;
// Queue priority is separate to nonce, if left undefined we can count.
if (!task.txOpt.nonce)
task.txOpt.nonce = await web3.eth.getTransactionCount(
task.account,
'pending'
);
console.log('nonce', task.account, task.txOpt.nonce);
// EIP 155 `chainId`
const netId = await web3.eth.net.getId();
task.txOpt.chainId = web3.utils.toHex(netId);
const signedTx = signTask(task);
// TODO: This is resolve/reject with response
const receipt = await sendSignedTask(signedTx, throttle);
callback(receipt);
workerDone();
//setTimeout(workerDone, 30000);
};
// Waits until transaction is broadcast before starting another.
const _waitMutex = () => {
if (!mutexLock) return _processTask();
setTimeout(() => {
_waitMutex();
}, 500);
};
_waitMutex();
};
// Start with a priority queue
return asyncPriorityQueue(worker, concurrency);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment