Skip to content

Instantly share code, notes, and snippets.

@opie4624
Created January 26, 2010 22:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save opie4624/287331 to your computer and use it in GitHub Desktop.
Save opie4624/287331 to your computer and use it in GitHub Desktop.
Working through the Twisted book
#!/usr/bin/env python
# encoding: utf-8
"""
calllater.py
Created by Andrew Kraut on 2010-01-25.
"""
import sys
import os
import time
from twisted.internet import reactor
def printTime():
    """Print the current local time as HH:MM:SS on a single line."""
    stamp = time.strftime("%H:%M:%S")
    print("%s %s" % ("Current time is", stamp))
def stopReactor():
    """Announce the shutdown, print the time once more, and stop the reactor."""
    print("Stopping reactor")
    printTime()
    # Makes the blocking reactor.run() call in main() return.
    reactor.stop()
def main():
    """Schedule four time printouts and a final reactor stop, then run."""
    # Delays 1-4 seconds print the time; the call at 5 seconds stops
    # the reactor.
    for delay in (1, 2, 3, 4):
        reactor.callLater(delay, printTime)
    reactor.callLater(5, stopReactor)
    print("Running the reactor...")
    reactor.run()  # returns only after stopReactor() has run
    print("Reactor stopped.")


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
connectiontest.py
Created by Andrew Kraut on 2010-01-26.
"""
import sys
import os
from twisted.internet import reactor, defer, protocol
class CallbackAndDisconnectProtocol(protocol.Protocol):
    """Protocol that reports a successful connect and hangs up at once."""

    def connectionMade(self):
        # Fire the factory's Deferred first so the caller learns of the
        # success, then drop the connection; no data is exchanged.
        notifier = self.factory.deferred
        notifier.callback("Connected!")
        self.transport.loseConnection()
class ConnectionTestFactory(protocol.ClientFactory):
    """Client factory that exposes the connection outcome as a Deferred."""

    # Protocol class this factory uses for its connections.
    protocol = CallbackAndDisconnectProtocol

    def __init__(self):
        # Called back by the protocol on connect, errbacked below on failure.
        self.deferred = defer.Deferred()

    def clientConnectionFailed(self, connector, reason):
        """Hand the failure ``reason`` to whoever holds ``self.deferred``."""
        pending = self.deferred
        pending.errback(reason)
def testConnect(host, port):
    """Start a TCP connection attempt to ``host:port``.

    Returns the Deferred owned by a fresh ConnectionTestFactory; that
    factory and its protocol fire it when the attempt succeeds or fails.
    """
    factory = ConnectionTestFactory()
    outcome = factory.deferred
    reactor.connectTCP(host, port, factory)
    return outcome
def handleSuccess(result, port):
    """Callback: report that ``port`` accepted a connection, then stop.

    ``result`` is the value the Deferred fired with; it is not used here.
    """
    message = "Connected to port %i" % port
    print(message)
    reactor.stop()
def handleFailure(failure, port):
    """Errback: print why connecting to ``port`` failed, then stop."""
    details = (port, failure.getErrorMessage())
    print("Error connecting to port %i: %s" % details)
    reactor.stop()
def main():
    """Read ``host port`` from the command line and test the connection.

    Prints a usage line and exits with status 1 unless exactly two
    arguments are given.
    """
    if len(sys.argv) != 3:
        print("Usage: connectiontest.py host port")
        sys.exit(1)
    host, port = sys.argv[1], int(sys.argv[2])
    attempt = testConnect(host, port)
    # Either handler prints the outcome and stops the reactor.
    attempt.addCallback(handleSuccess, port)
    attempt.addErrback(handleFailure, port)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
dataforward.py
Created by Andrew Kraut on 2010-01-27.
"""
import sys
import os
import re
from twisted.internet import stdio, reactor, protocol
from twisted.protocols import basic
class DataForwardingProtocol(protocol.Protocol):
    """Relay every received chunk to ``self.output`` when one is set."""

    def __init__(self):
        # Object whose write() receives the data; while it is None (or
        # otherwise falsy) received data is dropped.
        self.output = None
        # When true, newlines are rewritten to CRLF before forwarding.
        self.normalizeNewLines = False

    def dataReceived(self, data):
        """Optionally normalize newlines in ``data``, then forward it."""
        if self.normalizeNewLines:
            # Both CRLF and bare LF come out as CRLF.
            data = re.sub(r"(\r\n|\n)", "\r\n", data)
        sink = self.output
        if sink:
            sink.write(data)
class StdioProxyProtocol(DataForwardingProtocol):
    """Network side of the proxy: ties the TCP connection to standard I/O."""

    def connectionMade(self):
        # A second forwarder writes whatever it receives to our transport,
        # with newline normalization switched on.
        fromStdin = DataForwardingProtocol()
        fromStdin.output = self.transport
        fromStdin.normalizeNewLines = True
        # Data arriving on this connection is written to the StandardIO
        # object built around that forwarder.
        self.output = stdio.StandardIO(fromStdin)
        print("Connected to server. Press ctrl-C to close connection.")
class StdioProxyFactory(protocol.ClientFactory):
    """Client factory that stops the reactor once the connection is gone."""

    # Protocol class this factory uses for its connections.
    protocol = StdioProxyProtocol

    def clientConnectionFailed(self, transport, reason):
        # Show why the connection could not be made, then shut down.
        print(reason.getErrorMessage())
        reactor.stop()

    def clientConnectionLost(self, transport, reason):
        # Nothing is left to proxy once the connection closes.
        reactor.stop()
def main():
    """Connect to the ``host port`` given on the command line and proxy stdio.

    Prints a usage line and exits with status 1 unless exactly two
    arguments are given.
    """
    if len(sys.argv) != 3:
        print("Usage: %s host port" % __file__)
        sys.exit(1)
    host, port = sys.argv[1], int(sys.argv[2])
    reactor.connectTCP(host, port, StdioProxyFactory())
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
echoserver.py
Created by Andrew Kraut on 2010-01-27.
"""
import sys
import os
from twisted.internet import reactor, protocol
from twisted.protocols import basic
class EchoProtocol(basic.LineReceiver):
    """Line-based echo: repeats each line back until the client sends 'quit'."""

    def lineReceived(self, line):
        if line != 'quit':
            self.sendLine("You said: " + line)
            return
        # 'quit' gets a farewell, then the connection is closed.
        self.sendLine("Goodbye.")
        self.transport.loseConnection()
class EchoServerFactory(protocol.ServerFactory):
    """Server factory whose connections speak EchoProtocol."""

    # Protocol class this factory uses for its connections.
    protocol = EchoProtocol
def main(port=5001):
    """Serve the echo protocol on TCP ``port`` until the reactor stops.

    ``port`` defaults to 5001, the value that used to be hard-coded, so
    an argument-less ``main()`` behaves exactly as before.
    """
    reactor.listenTCP(port, EchoServerFactory())
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
portscan.py
Created by Andrew Kraut on 2010-01-27.
"""
import sys
import os
from twisted.internet import reactor, defer
from connectiontest import testConnect
def handleAllResults(results, ports):
    """Print every port whose connection attempt succeeded, then stop.

    ``results`` is a sequence of ``(success, result)`` pairs, matched
    positionally with ``ports``.
    """
    for port, (succeeded, _outcome) in zip(ports, results):
        if succeeded:
            print("Connected to port %i" % port)
    reactor.stop()
def main():
    """Try ports 1-200 on the host named on the command line.

    Falls back to 'localhost' when no host argument is given.
    """
    host = sys.argv[1] if len(sys.argv) > 1 else 'localhost'
    ports = range(1, 201)
    attempts = []
    for port in ports:
        attempts.append(testConnect(host, port))
    # consumeErrors=True: failed attempts are reported through the result
    # list instead of being left as unhandled errors.
    gathered = defer.DeferredList(attempts, consumeErrors=True)
    gathered.addCallback(handleAllResults, ports)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
runreactor.py
Created by Andrew Kraut on 2010-01-25.
"""
import sys
import os
from twisted.internet import reactor
def main():
    """Run the reactor, printing a line before it starts and after it stops."""
    print("Running the reactor...")
    # Nothing in this script calls reactor.stop(); run() returns only when
    # the reactor is stopped some other way.
    reactor.run()
    print("Reactor stopped.")


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
tcpconnection.py
Created by Andrew Kraut on 2010-01-26.
"""
import sys
import os
from twisted.internet import reactor, protocol
class QuickDisconnectProtocol(protocol.Protocol):
    """Protocol that prints the peer's host and disconnects immediately."""

    def connectionMade(self):
        peer = self.transport.getPeer()
        print("Connected to %s." % peer.host)
        self.transport.loseConnection()
class BasicClientFactory(protocol.ClientFactory):
    """Client factory that reports how the connection ended and stops."""

    # Protocol class this factory uses for its connections.
    protocol = QuickDisconnectProtocol

    def clientConnectionFailed(self, connector, reason):
        # The connection could not be made.
        print("Connection failed: %s" % reason.getErrorMessage())
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        # An established connection has closed.
        print("Lost connection: %s" % reason.getErrorMessage())
        reactor.stop()
def main():
    """Open one TCP connection to www.google.com:80 and run the reactor."""
    factory = BasicClientFactory()
    reactor.connectTCP('www.google.com', 80, factory)
    reactor.run()


if __name__ == '__main__':
    main()
<HTML>
<HEAD>
<TITLE>Test!</TITLE>
</HEAD>
<BODY>
<H1>Test!</H1>
<P>This is a test.</P>
</BODY>
</HTML>
#!/usr/bin/env python
# encoding: utf-8
"""
updatecheck.py
Created by Andrew Kraut on 2010-02-17.
"""
import sys
from twisted.internet import reactor
from twisted.web import client
class HTTPStatusChecker(client.HTTPClientFactory):
    """HTTP client factory whose Deferred yields ``(data, status, headers)``."""

    def __init__(self, url, headers=None):
        client.HTTPClientFactory.__init__(self, url, headers=headers)
        self.status = None

        def withStatus(data):
            # Bundle the body with the status and response headers that
            # are stored on the factory.
            return (data, self.status, self.response_headers)

        self.deferred.addCallback(withStatus)

    def noPage(self, reason):
        # A 304 is delivered as a page with an empty body instead of an
        # error; everything else goes to the base class handler.
        if self.status != '304':
            client.HTTPClientFactory.noPage(self, reason)
        else:
            client.HTTPClientFactory.page(self, '')
def checkStatus(url, contextFactory=None, *args, **kwargs):
    """Request ``url`` through an HTTPStatusChecker and return its Deferred.

    Extra positional and keyword arguments are passed to HTTPStatusChecker.
    For https URLs the connection uses ``contextFactory``, or a default
    ssl.ClientContextFactory when none is supplied.
    """
    scheme, host, port, path = client._parse(url)
    factory = HTTPStatusChecker(url, *args, **kwargs)
    if scheme != 'https':
        reactor.connectTCP(host, port, factory)
        return factory.deferred
    # Imported lazily so plain-http use does not need SSL support.
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, factory, contextFactory)
    return factory.deferred
def handleFirstResult(result, url):
    """Issue a second, conditional request built from the first response.

    ``result`` is the ``(data, status, headers)`` tuple of the first
    request. Returns the Deferred of the second request, with
    handleSecondResult already attached.
    """
    data, status, headers = result
    conditional = {}
    # (response header, request header to send back) pairs.
    validators = (
        ('etag', 'If-None-Match'),
        ('last-modified', 'If-Modified-Since'),
    )
    for responseName, requestName in validators:
        values = headers.get(responseName)
        if values:
            # Header values are sequences; the first entry is used.
            conditional[requestName] = values[0]
    followUp = checkStatus(url, headers=conditional)
    return followUp.addCallback(handleSecondResult)
def handleSecondResult(result):
    """Print a one-line verdict on the conditional request, then stop.

    ``result`` is the ``(data, status, headers)`` tuple of the second request.
    """
    data, status, headers = result
    if status == '200':
        verdict = ('Page changed (or server does not support conditional '
                   'requests).')
    elif status == '304':
        verdict = 'Page is unchanged.'
    else:
        verdict = 'Unexpected Response.'
    # Status prefix and verdict share one output line, separated by a space.
    print("%s %s" % ('Second request returned status %s:' % status, verdict))
    reactor.stop()
def handleError(failure):
    """Errback: print the failure's message and stop the reactor."""
    reason = failure.getErrorMessage()
    print("%s %s" % ("Error", reason))
    reactor.stop()
def main():
    """Check whether the URL given on the command line changes between requests.

    Prints a usage line and exits with status 1 when no URL is given,
    instead of failing with an IndexError on ``sys.argv[1]``.
    """
    if len(sys.argv) < 2:
        print("Usage: %s <URL>" % __file__)
        sys.exit(1)
    url = sys.argv[1]
    checking = checkStatus(url)
    # handleFirstResult returns the second request's Deferred, so errors
    # from either request reach handleError.
    checking.addCallback(handleFirstResult, url)
    checking.addErrback(handleError)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
validate.py
Created by Andrew Kraut on 2010-02-04.
"""
from twisted.web import client
from twisted.internet import reactor
import sys, os, tempfile, webbrowser, random
def encodeForm(inputs):
    """
    Encode a dict of form inputs as one string of CRLF-joined lines.

    Keys must be strings. A value with a ``read`` method is read in full;
    any other value is encoded to utf-8. A value with a ``name`` attribute
    also gets a ``filename`` taken from the last component of that path.

    NOTE(review): the value follows its Content-Disposition line directly,
    with no blank line between them, and the random boundary is not
    returned to the caller -- confirm the receiving server accepts this.
    """
    # Boundary: 20 random lowercase letters wrapped in three dashes each side.
    letters = [chr(random.choice(range(97, 123))) for _ in range(20)]
    boundary = "---%s---" % ''.join(letters)
    parts = [boundary]
    for name, value in inputs.items():
        disposition = 'Content-Disposition: form-data; name="%s"' % name
        if hasattr(value, 'name'):
            disposition += '; filename="%s"' % os.path.split(value.name)[1]
        if hasattr(value, 'read'):
            payload = value.read()
        else:
            payload = value.encode('utf-8')
        parts.extend([disposition, payload, '', boundary])
    return "\r\n".join(parts)
def showPage(pageData):
    """Write ``pageData`` to a temporary .html file and open it in a browser.

    Stops the reactor afterwards. The file is written and closed before
    the browser is launched, so the complete page is on disk by then.
    """
    tmpfd, tmp = tempfile.mkstemp('.html')
    # Wrap the descriptor mkstemp already opened; the with-block closes
    # (and thereby flushes) it deterministically.
    with os.fdopen(tmpfd, 'wb') as page:
        page.write(pageData)
    webbrowser.open('file://' + tmp)
    reactor.stop()
def handleError(failure):
    """Errback: print the failure's message and stop the reactor."""
    reason = failure.getErrorMessage()
    print("%s %s" % ("Error:", reason))
    reactor.stop()
def main():
    """POST the file named on the command line to the W3C validator.

    Prints a usage line and exits with status 1 when no file name is
    given. The result page is shown by showPage; errors go to handleError.
    """
    if len(sys.argv) < 2:
        print("Usage: %s <file>" % __file__)
        sys.exit(1)
    filename = sys.argv[1]
    # encodeForm reads the whole file, so it can be closed right after.
    with open(filename) as fileToCheck:
        form = encodeForm({'uploaded_file': fileToCheck})
    # NOTE(review): the Content-Type header carries no boundary parameter --
    # confirm the server can parse the body without it.
    postRequest = client.getPage(
        'http://validator.w3.org/check',
        method='POST',
        headers={'Content-Type': 'multipart/form-data; charset=utf-8',
                 'Content-Length': str(len(form))},
        postdata=form
    )
    postRequest.addCallback(showPage).addErrback(handleError)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
webcat.py
Created by Andrew Kraut on 2010-01-31.
"""
import sys
import os
from twisted.web import client
from twisted.internet import reactor
def printPage(data):
    """Callback: print the downloaded page and stop the reactor."""
    print(data)
    reactor.stop()
def printError(failure):
    """Errback: write the failure's message to stderr and stop the reactor."""
    reason = failure.getErrorMessage()
    sys.stderr.write("%s %s\n" % ("Error:", reason))
    reactor.stop()
def main():
    """Fetch the URL given on the command line and print its contents.

    Prints a usage line and returns unless exactly one argument is given.
    """
    if len(sys.argv) != 2:
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    fetching = client.getPage(url)
    fetching.addCallback(printPage)
    fetching.addErrback(printError)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
webcat.py
Created by Andrew Kraut on 2010-01-31.
"""
import sys
import os
import tempfile
from twisted.web import client
from twisted.internet import reactor
def downloadToTempFile(url):
    """
    Given a URL, returns a Deferred that will be called back with the name
    of a temporary file containing the downloaded data.

    If the download fails, the temporary file is removed and the original
    failure is passed on to the caller's errbacks.
    """
    tmpfd, tempfilename = tempfile.mkstemp()
    # Only the name is needed; downloadPage is given the path, not this
    # descriptor.
    os.close(tmpfd)

    def removeTempFile(failure):
        # Best-effort cleanup: the file may already be gone, and the
        # download failure is the error worth reporting.
        try:
            os.unlink(tempfilename)
        except OSError:
            pass
        return failure

    downloading = client.downloadPage(url, tempfilename)
    downloading.addCallbacks(
        returnFilename, removeTempFile, callbackArgs=(tempfilename,)
    )
    return downloading
def returnFilename(result, filename):
    """Callback adapter: ignore ``result`` and pass ``filename`` along."""
    return filename
def printFile(filename):
    """Copy ``filename`` to stdout, delete it, and stop the reactor.

    The file is opened read-only and closed before it is deleted. It is
    deleted even if writing to stdout raises; the exception then
    propagates to the caller.
    """
    try:
        with open(filename, 'rb') as downloaded:
            for line in downloaded:
                sys.stdout.write(line)
    finally:
        os.unlink(filename)  # delete file once we're done with it
    reactor.stop()
def printError(failure):
    """Errback: write the failure's message to stderr and stop the reactor."""
    reason = failure.getErrorMessage()
    sys.stderr.write("%s %s\n" % ("Error:", reason))
    reactor.stop()
def main():
    """Download the URL given on the command line and print it.

    The download goes through a temporary file. Prints a usage line and
    returns unless exactly one argument is given.
    """
    if len(sys.argv) != 2:
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    downloading = downloadToTempFile(url)
    downloading.addCallback(printFile)
    downloading.addErrback(printError)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
webcat3.py
Created by Andrew Kraut on 2010-02-04.
"""
import sys, os, getpass, base64
from twisted.web import client, error as weberror
from twisted.internet import reactor
def printPage(data):
    """Callback: print the downloaded page and stop the reactor."""
    print(data)
    reactor.stop()
def checkHTTPError(failure, url):
    """Errback: on a 401, prompt for credentials and retry with Basic auth.

    Failures that are not ``weberror.Error`` are re-raised by ``trap``.
    Any HTTP error other than 401 is returned unchanged so later errbacks
    see it. For a 401, returns the Deferred of the authenticated retry.
    """
    failure.trap(weberror.Error)
    if failure.value.status != '401':
        return failure
    sys.stderr.write(failure.getErrorMessage() + "\n")
    # prompt for user name and password
    username = raw_input("User name: ")
    password = getpass.getpass("Password: ")
    # b64encode emits no line breaks; encodestring wraps long input with
    # newlines, which would corrupt the header value.
    basicAuth = base64.b64encode("%s:%s" % (username, password))
    authHeader = "Basic " + basicAuth
    # try to fetch the page again with authentication
    return client.getPage(url, headers={"Authorization": authHeader})
def printError(failure):
    """Errback: write the failure's message to stderr and stop the reactor."""
    reason = failure.getErrorMessage()
    sys.stderr.write("%s %s\n" % ("Error:", reason))
    reactor.stop()
def main():
    """Fetch the URL given on the command line, prompting for a login on 401.

    Prints a usage line and returns unless exactly one argument is given.
    """
    if len(sys.argv) != 2:
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    page = client.getPage(url)
    # Order matters: the 401 handler runs first, so its retry feeds
    # printPage, and any remaining failure ends up in printError.
    page.addErrback(checkHTTPError, url)
    page.addCallback(printPage)
    page.addErrback(printError)
    reactor.run()


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# encoding: utf-8
"""
webdownload.py
Created by Andrew Kraut on 2010-02-19.
"""
import sys
import os
from twisted.web import client
class HTTPProgressDownloader(client.HTTPDownloader):
    """HTTPDownloader that prints download progress as data arrives."""

    def gotHeaders(self, headers):
        """Record the expected length for a 200 response, then defer to base."""
        if self.status == '200':  # page data is on the way
            # `in` replaces the deprecated dict.has_key().
            if 'content-length' in headers:
                self.totalLength = int(headers['content-length'][0])
            else:
                # 0 means the size is unknown; pagePart then reports
                # thousands of bytes instead of a percentage.
                self.totalLength = 0
            # Float, so the percentage division in pagePart is not truncated.
            self.currentLength = 0.0
            # Blank line that the first progress update overwrites.
            print('')
        return client.HTTPDownloader.gotHeaders(self, headers)

    def pagePart(self, data):
        """Update and print the progress for this chunk, then defer to base."""
        if self.status == '200':
            self.currentLength += len(data)
            if self.totalLength:
                percent = "%i%%" % (
                    (self.currentLength / self.totalLength) * 100
                )
            else:
                percent = '%dK' % (self.currentLength / 1000)
            # ESC[1F moves the cursor to the start of the previous line, so
            # each update overwrites the last one.
            print("\033[1FProgress: " + percent)
        return client.HTTPDownloader.pagePart(self, data)
def downloadWithProgress(url, file, contextFactory=None, *args, **kwargs):
    """Download ``url`` to ``file`` with progress output; return the Deferred.

    Extra positional and keyword arguments are passed to
    HTTPProgressDownloader. For https URLs the connection uses
    ``contextFactory``, or a default ssl.ClientContextFactory when none is
    supplied.
    """
    # The module only imports reactor inside its __main__ guard, so import
    # it here to keep this function usable when the module is imported.
    from twisted.internet import reactor
    scheme, host, port, path = client._parse(url)
    factory = HTTPProgressDownloader(url, file, *args, **kwargs)
    if scheme == 'https':
        from twisted.internet import ssl
        if contextFactory is None:
            contextFactory = ssl.ClientContextFactory()
        reactor.connectSSL(host, port, factory, contextFactory)
    else:
        reactor.connectTCP(host, port, factory)
    return factory.deferred
def main():
    """Download ``<URL>`` to ``<output file>``, both given on the command line.

    Prints a usage line and exits with status 1 unless exactly two
    arguments are given, instead of failing on the tuple unpacking.
    """
    # Imported locally so main() also works when this module is imported
    # rather than run as a script.
    from twisted.internet import reactor

    def downloadComplete(result):
        print("Download Complete.")
        reactor.stop()

    def downloadError(failure):
        print("%s %s" % ("Error:", failure.getErrorMessage()))
        reactor.stop()

    if len(sys.argv) != 3:
        print("Usage: %s <URL> <output file>" % __file__)
        sys.exit(1)
    url, outputFile = sys.argv[1:]
    downloading = downloadWithProgress(url, outputFile)
    downloading.addCallback(downloadComplete)
    downloading.addErrback(downloadError)
    reactor.run()


if __name__ == '__main__':
    import sys
    from twisted.internet import reactor
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment