Created
January 26, 2010 22:45
-
-
Save opie4624/287331 to your computer and use it in GitHub Desktop.
Working through the Twisted book
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
calllater.py | |
Created by Andrew Kraut on 2010-01-25. | |
""" | |
import sys | |
import os | |
import time | |
from twisted.internet import reactor | |
def printTime():
    """Print the current local time formatted as HH:MM:SS."""
    now = time.strftime("%H:%M:%S")
    print("Current time is %s" % now)
def stopReactor():
    """Announce the shutdown, print the current time, and stop the reactor."""
    print("Stopping reactor")
    printTime()
    reactor.stop()
def main():
    """Schedule four timed printouts and a final shutdown, then run the reactor."""
    # Print the time after 1, 2, 3 and 4 seconds...
    for delay in (1, 2, 3, 4):
        reactor.callLater(delay, printTime)
    # ...and shut down on the fifth.
    reactor.callLater(5, stopReactor)

    print("Running the reactor...")
    reactor.run()
    print("Reactor stopped.")


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
connectiontest.py | |
Created by Andrew Kraut on 2010-01-26. | |
""" | |
import sys | |
import os | |
from twisted.internet import reactor, defer, protocol | |
class CallbackAndDisconnectProtocol(protocol.Protocol):
    """Fire the factory's Deferred once connected, then drop the connection."""

    def connectionMade(self):
        # Report success to whoever holds the factory's Deferred...
        pending = self.factory.deferred
        pending.callback("Connected!")
        # ...then hang up; no data is exchanged.
        self.transport.loseConnection()
class ConnectionTestFactory(protocol.ClientFactory):
    """Client factory exposing a Deferred that reports the connection outcome."""

    protocol = CallbackAndDisconnectProtocol

    def __init__(self):
        # Called back by the protocol on connect, errbacked below on failure.
        self.deferred = defer.Deferred()

    def clientConnectionFailed(self, connector, reason):
        """Pass the connection failure to the Deferred's errback chain."""
        outcome = self.deferred
        outcome.errback(reason)
def testConnect(host, port):
    """
    Start a TCP connection attempt to host:port.
    Returns the Deferred of the ConnectionTestFactory used for the attempt.
    """
    factory = ConnectionTestFactory()
    outcome = factory.deferred
    reactor.connectTCP(host, port, factory)
    return outcome
def handleSuccess(result, port):
    """Callback: report the successful connection and stop the reactor."""
    message = "Connected to port %i" % port
    print(message)
    reactor.stop()
def handleFailure(failure, port):
    """Errback: report why the connection failed and stop the reactor."""
    details = (port, failure.getErrorMessage())
    print("Error connecting to port %i: %s" % details)
    reactor.stop()
def main():
    """
    Test whether a TCP connection can be made to the host and port given
    on the command line, and print the outcome.

    Exits with status 1 after printing a usage message when the argument
    count is wrong or the port is not an integer.
    """
    usage = "Usage: connectiontest.py host port"
    if len(sys.argv) != 3:
        print(usage)
        sys.exit(1)
    host = sys.argv[1]
    try:
        port = int(sys.argv[2])
    except ValueError:
        # A non-numeric port previously surfaced as a raw traceback.
        print(usage)
        sys.exit(1)
    connecting = testConnect(host, port)
    connecting.addCallback(handleSuccess, port)
    connecting.addErrback(handleFailure, port)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
dataforward.py | |
Created by Andrew Kraut on 2010-01-27. | |
""" | |
import sys | |
import os | |
import re | |
from twisted.internet import stdio, reactor, protocol | |
from twisted.protocols import basic | |
class DataForwardingProtocol(protocol.Protocol):
    """Relay every chunk of received data to an optional output object."""

    def __init__(self):
        self.normalizeNewLines = False  # when True, rewrite line endings as CRLF
        self.output = None              # object with write(); falsy means drop data

    def dataReceived(self, data):
        """Optionally normalize newlines, then write the data to the output."""
        forwarded = data
        if self.normalizeNewLines:
            # Bare LF and existing CRLF both become a single CRLF.
            forwarded = re.sub(r"(\r\n|\n)", "\r\n", data)
        if not self.output:
            return
        self.output.write(forwarded)
class StdioProxyProtocol(DataForwardingProtocol):
    """Bridge a network connection to the process's standard I/O."""

    def connectionMade(self):
        # Forwarder handed to StandardIO: what it receives is written to
        # this connection's transport, with newlines normalized to CRLF.
        stdinRelay = DataForwardingProtocol()
        stdinRelay.normalizeNewLines = True
        stdinRelay.output = self.transport
        # Data received on this protocol is written to the StandardIO wrapper.
        self.output = stdio.StandardIO(stdinRelay)
        print("Connected to server. Press ctrl-C to close connection.")
class StdioProxyFactory(protocol.ClientFactory):
    """Factory for StdioProxyProtocol; stops the reactor when the connection ends."""

    protocol = StdioProxyProtocol

    def clientConnectionLost(self, transport, reason):
        """Stop the reactor once the connection is lost."""
        reactor.stop()

    def clientConnectionFailed(self, transport, reason):
        """Print the failure message and stop the reactor."""
        message = reason.getErrorMessage()
        print(message)
        reactor.stop()
def main():
    """Connect to the host and port from the command line and proxy stdio over it."""
    if len(sys.argv) != 3:
        print("Usage: %s host port" % __file__)
        sys.exit(1)
    host = sys.argv[1]
    port = int(sys.argv[2])
    reactor.connectTCP(host, port, StdioProxyFactory())
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
echoserver.py | |
Created by Andrew Kraut on 2010-01-27. | |
""" | |
import sys | |
import os | |
from twisted.internet import reactor, protocol | |
from twisted.protocols import basic | |
class EchoProtocol(basic.LineReceiver):
    """Echo each received line back to the sender; 'quit' ends the session."""

    def lineReceived(self, line):
        if line != 'quit':
            self.sendLine("You said: " + line)
            return
        # Say goodbye before closing the connection.
        self.sendLine("Goodbye.")
        self.transport.loseConnection()
class EchoServerFactory(protocol.ServerFactory):
    """Server factory whose connections are handled by EchoProtocol."""

    protocol = EchoProtocol
def main():
    """Listen for echo clients on TCP port 5001 and run the reactor."""
    listenPort = 5001
    factory = EchoServerFactory()
    reactor.listenTCP(listenPort, factory)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
portscan.py | |
Created by Andrew Kraut on 2010-01-27. | |
""" | |
import sys | |
import os | |
from twisted.internet import reactor, defer | |
from connectiontest import testConnect | |
def handleAllResults(results, ports):
    """
    Print every port whose connection test succeeded, then stop the reactor.
    results holds one (success, result) pair per entry in ports, in order.
    """
    for port, (success, _result) in zip(ports, results):
        if not success:
            continue
        print("Connected to port %i" % port)
    reactor.stop()
def main():
    """
    Start connection tests for ports 1 through 200 on the host named on
    the command line (default 'localhost') and report the open ones.
    """
    host = sys.argv[1] if len(sys.argv) > 1 else 'localhost'
    ports = range(1, 201)
    # Start every attempt up front; the DeferredList fires once all are done.
    testers = []
    for port in ports:
        testers.append(testConnect(host, port))
    scan = defer.DeferredList(testers, consumeErrors=True)
    scan.addCallback(handleAllResults, ports)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
runreactor.py | |
Created by Andrew Kraut on 2010-01-25. | |
""" | |
import sys | |
import os | |
from twisted.internet import reactor | |
def main():
    """Run the reactor, announcing on stdout before it starts and after it stops."""
    startNotice = "Running the reactor..."
    stopNotice = "Reactor stopped."
    print(startNotice)
    reactor.run()
    print(stopNotice)


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
tcpconnection.py | |
Created by Andrew Kraut on 2010-01-26. | |
""" | |
import sys | |
import os | |
from twisted.internet import reactor, protocol | |
class QuickDisconnectProtocol(protocol.Protocol):
    """Print the peer's host on connect, then close the connection at once."""

    def connectionMade(self):
        peer = self.transport.getPeer()
        print("Connected to %s." % peer.host)
        self.transport.loseConnection()
class BasicClientFactory(protocol.ClientFactory):
    """Client factory that reports how the connection ended and stops the reactor."""

    protocol = QuickDisconnectProtocol

    def _reportAndStop(self, template, reason):
        # Shared tail of both handlers: print the reason, then shut down.
        print(template % reason.getErrorMessage())
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        """Report a connection that was established and then closed."""
        self._reportAndStop("Lost connection: %s", reason)

    def clientConnectionFailed(self, connector, reason):
        """Report a connection that could not be established."""
        self._reportAndStop("Connection failed: %s", reason)
def main():
    """Open a TCP connection to www.google.com on port 80 and run the reactor."""
    factory = BasicClientFactory()
    reactor.connectTCP('www.google.com', 80, factory)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<HTML> | |
<HEAD> | |
<TITLE>Test!</TITLE> | |
</HEAD> | |
<BODY> | |
<H1>Test!</H1> | |
<P>This is a test.</P> | |
</BODY> | |
</HTML> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
updatecheck.py | |
Created by Andrew Kraut on 2010-02-17. | |
""" | |
import sys | |
from twisted.internet import reactor | |
from twisted.web import client | |
class HTTPStatusChecker(client.HTTPClientFactory):
    """
    HTTP client factory whose Deferred yields (data, status, headers)
    and which turns a 304 response into an empty page, not an error.
    """

    def __init__(self, url, headers=None):
        client.HTTPClientFactory.__init__(self, url, headers=headers)
        self.status = None
        self.deferred.addCallback(self._withStatus)

    def _withStatus(self, data):
        # Bundle the page data with the status and response headers.
        return (data, self.status, self.response_headers)

    def noPage(self, reason):
        """Deliver an empty page when the status is '304'; otherwise fail as usual."""
        if self.status != '304':
            client.HTTPClientFactory.noPage(self, reason)
        else:
            client.HTTPClientFactory.page(self, '')
def checkStatus(url, contextFactory=None, *args, **kwargs):
    """
    Request url through an HTTPStatusChecker and return its Deferred.
    Extra arguments go to the HTTPStatusChecker constructor; https URLs
    are connected over SSL, using contextFactory when one is supplied.
    """
    scheme, host, port, path = client._parse(url)
    factory = HTTPStatusChecker(url, *args, **kwargs)
    if scheme != 'https':
        reactor.connectTCP(host, port, factory)
        return factory.deferred
    # Only pull in SSL support when it is actually needed.
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, factory, contextFactory)
    return factory.deferred
def handleFirstResult(result, url):
    """
    Issue a second, conditional request for url based on the first response.
    The first response's etag and last-modified values, when present, are
    sent back as If-None-Match and If-Modified-Since.
    """
    data, status, headers = result
    conditional = {}
    headerPairs = (
        ('etag', 'If-None-Match'),
        ('last-modified', 'If-Modified-Since'),
    )
    for responseName, requestName in headerPairs:
        values = headers.get(responseName)
        if values:
            conditional[requestName] = values[0]
    followUp = checkStatus(url, headers=conditional)
    return followUp.addCallback(handleSecondResult)
def handleSecondResult(result):
    """
    Report on one line whether the second request shows the page changed,
    then stop the reactor.
    """
    data, status, headers = result
    verdicts = {
        '200': 'Page changed (or server does not support conditional requests).',
        '304': 'Page is unchanged.',
    }
    verdict = verdicts.get(status, 'Unexpected Response.')
    print('Second request returned status %s: %s' % (status, verdict))
    reactor.stop()
def handleError(failure):
    """Errback: print the failure's message and stop the reactor."""
    message = failure.getErrorMessage()
    print("Error %s" % message)
    reactor.stop()
def main():
    """
    Fetch the URL given on the command line twice and report whether the
    second, conditional request shows the page as changed.

    Prints a usage message and returns when the URL argument is missing
    or extra arguments are given.
    """
    if len(sys.argv) != 2:
        # Previously a missing argument raised IndexError.
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    firstCheck = checkStatus(url)
    firstCheck.addCallback(handleFirstResult, url)
    firstCheck.addErrback(handleError)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
validate.py | |
Created by Andrew Kraut on 2010-02-04. | |
""" | |
from twisted.web import client | |
from twisted.internet import reactor | |
import sys, os, tempfile, webbrowser, random | |
def encodeForm(inputs):
    """
    Build a multipart/form-data style string from a dict of inputs.
    Keys must be strings. Each value is either a string, which is encoded
    as utf-8, or a file-like object, whose contents are read.
    """
    # Boundary: 20 random lowercase ASCII letters wrapped in dashes.
    letters = []
    for _ in range(20):
        letters.append(chr(random.choice(range(97, 123))))
    boundary = "---%s---" % ''.join(letters)

    # NOTE(review): the original indentation was lost; the trailing blank
    # line and boundary are assumed to be emitted once per input.
    # NOTE(review): no blank line separates a part's header from its
    # content, and main() does not advertise the boundary in Content-Type;
    # confirm the receiving server accepts this framing.
    lines = [boundary]
    for key, val in inputs.items():
        header = 'Content-Disposition: form-data; name="%s"' % key
        if hasattr(val, 'name'):
            # Objects with a name also advertise its base name.
            header += '; filename="%s"' % os.path.split(val.name)[1]
        isFileLike = hasattr(val, 'read')
        content = val.read() if isFileLike else val.encode('utf-8')
        lines.extend([header, content, '', boundary])
    return "\r\n".join(lines)
def showPage(pageData):
    """
    Write pageData to a temporary .html file, open that file in the
    default web browser, and stop the reactor.
    """
    tmpfd, tmp = tempfile.mkstemp('.html')
    # Write through the descriptor mkstemp already opened, without
    # closing and reopening the path. The with-block guarantees the
    # data is flushed and the handle closed before the browser is
    # pointed at the file.
    with os.fdopen(tmpfd, 'wb') as out:
        out.write(pageData)
    webbrowser.open('file://' + tmp)
    reactor.stop()
def handleError(failure):
    """Errback: print the failure's message and stop the reactor."""
    message = failure.getErrorMessage()
    print("Error: %s" % message)
    reactor.stop()
def main():
    """
    Upload the file named on the command line to the W3C validator and
    show the returned page in a web browser.

    Prints a usage message and returns when the file argument is missing
    or extra arguments are given.
    """
    if len(sys.argv) != 2:
        # Previously a missing argument raised IndexError.
        print("Usage: %s <file>" % __file__)
        return
    filename = sys.argv[1]
    # Binary mode uploads the bytes exactly as stored. The with-block
    # closes the file as soon as the form body has been built.
    with open(filename, 'rb') as fileToCheck:
        form = encodeForm({'uploaded_file': fileToCheck})
    # NOTE(review): Content-Type does not advertise the multipart boundary
    # that encodeForm generates; confirm the validator accepts the request.
    postRequest = client.getPage(
        'http://validator.w3.org/check',
        method='POST',
        headers={'Content-Type': 'multipart/form-data; charset=utf-8',
                 'Content-Length': str(len(form))},
        postdata=form
    )
    postRequest.addCallback(showPage).addErrback(handleError)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
webcat.py | |
Created by Andrew Kraut on 2010-01-31. | |
""" | |
import sys | |
import os | |
from twisted.web import client | |
from twisted.internet import reactor | |
def printPage(data):
    """Callback: print the downloaded page to stdout and stop the reactor."""
    print(data)
    reactor.stop()
def printError(failure):
    """Errback: write the failure's message to stderr and stop the reactor."""
    message = failure.getErrorMessage()
    sys.stderr.write("Error: %s\n" % message)
    reactor.stop()
def main():
    """Print the page at the URL given on the command line, or a usage line."""
    if len(sys.argv) != 2:
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    fetch = client.getPage(url)
    fetch.addCallback(printPage).addErrback(printError)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
webcat2.py | |
Created by Andrew Kraut on 2010-01-31. | |
""" | |
import sys | |
import os | |
import tempfile | |
from twisted.web import client | |
from twisted.internet import reactor | |
def downloadToTempFile(url):
    """
    Download url into a newly created temporary file.
    Returns a Deferred that is called back with the temporary file's name.
    """
    handle, path = tempfile.mkstemp()
    # Only the path is used from here on; downloadPage is given the name.
    os.close(handle)
    download = client.downloadPage(url, path)
    return download.addCallback(returnFilename, path)
def returnFilename(result, filename):
    """Callback: ignore the incoming result and return filename instead."""
    return filename
def printFile(filename):
    """
    Copy the contents of filename to stdout, delete the file, and stop
    the reactor.

    The file is deleted even when writing to stdout fails; in that case
    the exception propagates and the reactor is not stopped here.
    """
    try:
        # Read-only binary access is all that is needed. The with-block
        # closes the handle before the unlink, which matters on
        # platforms that refuse to delete an open file.
        with open(filename, 'rb') as downloaded:
            for line in downloaded:
                sys.stdout.write(line)
    finally:
        os.unlink(filename)  # delete file once we're done with it
    reactor.stop()
def printError(failure):
    """Errback: write the failure's message to stderr and stop the reactor."""
    message = failure.getErrorMessage()
    sys.stderr.write("Error: %s\n" % message)
    reactor.stop()
def main():
    """Download the URL given on the command line via a temp file and print it."""
    if len(sys.argv) != 2:
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    download = downloadToTempFile(url)
    download.addCallback(printFile).addErrback(printError)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
webcat3.py | |
Created by Andrew Kraut on 2010-02-04. | |
""" | |
import sys, os, getpass, base64 | |
from twisted.web import client, error as weberror | |
from twisted.internet import reactor | |
def printPage(data):
    """Callback: print the downloaded page to stdout and stop the reactor."""
    print(data)
    reactor.stop()
def checkHTTPError(failure, url):
    """
    Errback: retry url with HTTP basic authentication after a 401.

    Failures other than twisted.web.error.Error are re-raised by trap().
    For status '401' the user is prompted for credentials and the Deferred
    of the authenticated request is returned. Any other status returns
    the failure unchanged so later errbacks see it.
    """
    failure.trap(weberror.Error)
    if failure.value.status != '401':
        return failure
    sys.stderr.write("%s\n" % failure.getErrorMessage())
    # prompt for user name and password
    username = raw_input("User name: ")
    password = getpass.getpass("Password: ")
    # b64encode never inserts line breaks. encodestring wraps its output
    # every 76 characters, which put newlines inside the header value
    # when the credentials were long.
    basicAuth = base64.b64encode("%s:%s" % (username, password))
    authHeader = "Basic " + basicAuth
    # try to fetch the page again with authentication
    return client.getPage(url, headers={"Authorization": authHeader})
def printError(failure):
    """Errback: write the failure's message to stderr and stop the reactor."""
    message = failure.getErrorMessage()
    sys.stderr.write("Error: %s\n" % message)
    reactor.stop()
def main():
    """
    Print the page at the URL given on the command line; checkHTTPError
    handles HTTP errors before the page is printed.
    """
    if len(sys.argv) != 2:
        print("Usage: %s <URL>" % __file__)
        return
    url = sys.argv[1]
    fetch = client.getPage(url)
    fetch.addErrback(checkHTTPError, url)
    fetch.addCallback(printPage)
    fetch.addErrback(printError)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
webdownload.py | |
Created by Andrew Kraut on 2010-02-19. | |
""" | |
import sys | |
import os | |
from twisted.web import client | |
class HTTPProgressDownloader(client.HTTPDownloader):
    """HTTPDownloader that prints a running progress line while data arrives."""

    def gotHeaders(self, headers):
        """Reset the progress counters when the status is '200', then defer to the base class."""
        if self.status == '200':  # page data is on the way
            hasLength = 'content-length' in headers
            # 0 means "size unknown"; pagePart then reports a running total.
            self.totalLength = int(headers['content-length'][0]) if hasLength else 0
            self.currentLength = 0.0
            # Emit a blank line for the first progress update to overwrite.
            print('')
        return client.HTTPDownloader.gotHeaders(self, headers)

    def pagePart(self, data):
        """Add len(data) to the running total, print progress, then defer to the base class."""
        if self.status == '200':
            self.currentLength += len(data)
            if not self.totalLength:
                percent = '%dK' % (self.currentLength/1000)
            else:
                fraction = self.currentLength/self.totalLength
                percent = "%i%%" % (fraction*100)
            # The escape sequence is intended to rewrite the previous line.
            print("\033[1FProgress: " + percent)
        return client.HTTPDownloader.pagePart(self, data)
def downloadWithProgress(url, file, contextFactory=None, *args, **kwargs):
    """
    Download url to file through an HTTPProgressDownloader and return
    the downloader's Deferred.

    Extra arguments go to the HTTPProgressDownloader constructor; https
    URLs are connected over SSL, using contextFactory when one is supplied.
    """
    # Imported locally because the module only imports the reactor inside
    # its __main__ guard; without this, calling the function from another
    # module raised NameError.
    from twisted.internet import reactor

    scheme, host, port, path = client._parse(url)
    factory = HTTPProgressDownloader(url, file, *args, **kwargs)
    if scheme == 'https':
        from twisted.internet import ssl
        if contextFactory is None:
            contextFactory = ssl.ClientContextFactory()
        reactor.connectSSL(host, port, factory, contextFactory)
    else:
        reactor.connectTCP(host, port, factory)
    return factory.deferred
def main():
    """
    Download the URL given as the first argument to the file given as the
    second, printing progress while the download runs.

    Prints a usage message and returns unless exactly two arguments
    are supplied.
    """
    def downloadComplete(result):
        print("Download Complete.")
        reactor.stop()

    def downloadError(failure):
        print("Error: %s" % failure.getErrorMessage())
        reactor.stop()

    if len(sys.argv) != 3:
        # Previously a wrong argument count raised ValueError on unpacking.
        print("Usage: %s <URL> <output file>" % __file__)
        return
    url, outputFile = sys.argv[1:]
    download = downloadWithProgress(url, outputFile)
    download.addCallback(downloadComplete).addErrback(downloadError)
    reactor.run()


if __name__ == '__main__':
    import sys
    from twisted.internet import reactor
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment