-
-
Save xshill/a4a41b76bb980f766657a2828989c84a to your computer and use it in GitHub Desktop.
Sample script that was used to create a replay file from a pcap using PyRDP and scapy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
import logging
import os
from typing import Optional

from progressbar import progressbar
from scapy.all import *
from pyrdp.enum import ParserMode, PlayerMessageType
from pyrdp.layer import (
    SegmentationLayer,
    TPKTLayer,
    X224Layer,
    MCSLayer,
    TLSSecurityLayer,
    SlowPathLayer,
    FastPathLayer,
    VirtualChannelLayer,
    ClipboardLayer,
)
from pyrdp.mcs import MCSRouter
from pyrdp.parser import ClientInfoParser, BasicFastPathParser
from pyrdp.pdu import FormatDataResponsePDU
from pyrdp.recording import FileLayer, Recorder, RecordingFastPathObserver, RecordingSlowPathObserver
def bytesToIP(data: bytes) -> str:
    """
    Format raw bytes as a dotted-decimal string.

    Each byte is rendered as its decimal value and the values are joined
    with dots, so ``b"\\x0a\\x00\\x00\\x03"`` becomes ``"10.0.0.3"``. No length
    check is performed: empty input yields an empty string.

    :param data: the raw address bytes.
    :return: the dotted-decimal representation of ``data``.
    """
    return ".".join(str(b) for b in data)
def parseExportedPdu(packet: "packet.Raw"):
    """
    Split a raw exported-PDU packet into source, destination and payload.

    The addresses and the payload are read from fixed byte offsets in
    ``packet.load``. NOTE(review): the offsets are hard-coded for the capture
    this script was written for; confirm them before using another pcap.

    The annotation is quoted so it is not evaluated when the function is
    defined (the parameter name shadows any module-level ``packet``).

    :param packet: an object exposing the raw packet bytes as ``load``.
    :return: a ``(source, destination, data)`` tuple where the two addresses
             are dotted-decimal strings and ``data`` is the remaining bytes.
    """
    # Fixed positions of the fields inside the exported-PDU header.
    sourceStart, sourceEnd = 12, 16
    destinationStart, destinationEnd = 20, 24
    payloadStart = 60

    raw = packet.load
    source = bytesToIP(raw[sourceStart : sourceEnd])
    destination = bytesToIP(raw[destinationStart : destinationEnd])
    data = raw[payloadStart :]
    return source, destination, data
class CustomRecorder(Recorder):
    """
    Recorder whose timestamp is set explicitly by the caller.

    ``getCurrentTimeStamp`` returns whatever was last passed to
    ``setTimeStamp``, which lets the script stamp recorded messages with the
    capture time of each packet.
    """

    # None until setTimeStamp() has been called at least once.
    # NOTE(review): the script passes scapy's packet.time here; confirm that
    # its unit matches what the base Recorder expects.
    currentTimeStamp: Optional[float] = None

    def getCurrentTimeStamp(self) -> Optional[float]:
        """Return the most recently set timestamp (None if never set)."""
        return self.currentTimeStamp

    def setTimeStamp(self, timeStamp: float):
        """
        Set the timestamp returned by getCurrentTimeStamp().

        :param timeStamp: the timestamp to report from now on.
        """
        self.currentTimeStamp = timeStamp
class RDP(MCSRouter):
    """
    One direction of an RDP connection, wired up as a stack of PyRDP layers.

    Raw bytes are fed in through recv(). Slow-path, fast-path and clipboard
    traffic are observed and passed to the recorder given at construction.
    """

    def __init__(self, mode: ParserMode, recorder: Recorder):
        """
        :param mode: parser mode for the fast-path parser; it also selects
                     which fast-path message type is recorded.
        :param recorder: recorder that receives every recorded message.
        """
        self.recorder = recorder
        # Last clipboard payload recorded, used to skip consecutive duplicates.
        self.currentClipboardData = None

        self.segmentation = SegmentationLayer()
        self.tpkt = TPKTLayer()
        self.x224 = X224Layer()
        self.mcs = MCSLayer()
        self.security = TLSSecurityLayer()
        self.io = SlowPathLayer()
        self.fastPath = FastPathLayer(BasicFastPathParser(mode))
        self.clipboardSecurity = TLSSecurityLayer()
        self.virtualChannelLayer = VirtualChannelLayer()
        self.clipboardLayer = ClipboardLayer()

        # The segmentation layer hands data to either the fast-path layer or
        # the TPKT layer depending on the key it was attached with.
        self.segmentation.attachLayer(0, self.fastPath)
        self.segmentation.attachLayer(3, self.tpkt)

        # A security header is expected until licensing data has been seen
        # (see onLicensingDataReceived).
        self.security.securityHeaderExpected = True
        self.security.createObserver(
            onClientInfoReceived=self.onClientInfoReceived,
            onLicensingDataReceived=self.onLicensingDataReceived,
        )

        self.io.addObserver(RecordingSlowPathObserver(recorder))

        if mode == ParserMode.CLIENT:
            fastPathMessageType = PlayerMessageType.FAST_PATH_OUTPUT
        else:
            fastPathMessageType = PlayerMessageType.FAST_PATH_INPUT
        self.fastPath.addObserver(RecordingFastPathObserver(recorder, fastPathMessageType))

        self.clipboardLayer.createObserver(onPDUReceived=self.onClipboard)
        self.mcs.addObserver(self)

        # Slow path: TPKT -> X224 -> MCS, then per-channel stacks below MCS.
        self.tpkt.setNext(self.x224)
        self.x224.setNext(self.mcs)
        self.security.setNext(self.io)
        self.clipboardSecurity.setNext(self.virtualChannelLayer)
        self.virtualChannelLayer.setNext(self.clipboardLayer)

        MCSRouter.__init__(self, self.mcs)

    def recv(self, data: bytes):
        """
        Feed raw bytes into the layer stack.

        :param data: bytes taken from a captured packet's payload.
        """
        self.segmentation.recv(data)

    def recvChannel(self, pdu):
        """
        Dispatch an MCS data PDU to the stack that handles its channel.

        PDUs on any other channel are ignored.

        :param pdu: PDU exposing ``channelID`` and ``payload``.
        """
        if pdu.channelID == 1003:
            # I/O channel
            self.security.recv(pdu.payload)
        elif pdu.channelID == 1004:
            # Clipboard channel
            self.clipboardSecurity.recv(pdu.payload)

    def onSendDataRequest(self, pdu):
        """Handle a send-data request by routing it by channel."""
        self.recvChannel(pdu)

    def onSendDataIndication(self, pdu):
        """Handle a send-data indication by routing it by channel."""
        self.recvChannel(pdu)

    def onClientInfoReceived(self, data):
        """
        Parse the client info data and record it.

        :param data: raw client info bytes.
        """
        pdu = ClientInfoParser().parse(data)
        # Use this instance's recorder rather than relying on a module-level
        # global named "recorder" happening to exist.
        self.recorder.record(pdu, PlayerMessageType.CLIENT_INFO)

    def onLicensingDataReceived(self, pdu):
        """Stop expecting a security header once licensing data is seen."""
        self.security.securityHeaderExpected = False

    def onClipboard(self, pdu):
        """
        Record clipboard data responses, skipping consecutive duplicates.

        :param pdu: clipboard PDU; anything other than a
                    FormatDataResponsePDU is ignored.
        """
        if not isinstance(pdu, FormatDataResponsePDU):
            return

        if pdu.requestedFormatData != self.currentClipboardData:
            self.recorder.record(pdu, PlayerMessageType.CLIPBOARD_DATA)
            self.currentClipboardData = pdu.requestedFormatData
# Only show critical messages, and keep scapy quiet while loading the pcap.
logging.basicConfig(level=logging.CRITICAL)
logging.getLogger("scapy").setLevel(logging.ERROR)

# Addresses of the two endpoints in the capture.
CLIENT = "10.0.0.3"
SERVER = "10.0.0.2"

# The capture is expected to sit next to this script. os.path.join supplies
# the separator that plain string concatenation would leave out.
directory = os.path.dirname(os.path.realpath(__file__))
packets = rdpcap(os.path.join(directory, "capture.pcap"))

# The replay file is written to the current working directory. The with-block
# makes sure it is flushed and closed even if processing fails part-way.
with open("replay.pyrdp", "wb") as replayFile:
    fileLayer = FileLayer(replayFile)
    recorder = CustomRecorder([fileLayer])

    client = RDP(ParserMode.CLIENT, recorder)
    server = RDP(ParserMode.SERVER, recorder)

    # Each packet is fed to the stack keyed by its destination address.
    # A destination that is neither CLIENT nor SERVER raises KeyError.
    rdp = {
        CLIENT: client,
        SERVER: server
    }

    # The last packet of this pcap is incomplete
    for packet in progressbar(packets[: -1]):
        # The packets start with a Wireshark exported PDU structure
        source, destination, data = parseExportedPdu(packet)

        try:
            # Stamp whatever gets recorded with the packet's capture time.
            recorder.setTimeStamp(packet.time)
            rdp[destination].recv(data)
            # Keep the server stack's security-header flag in step with the client's.
            server.security.securityHeaderExpected = client.security.securityHeaderExpected
        except NotImplementedError:
            # Deliberate best effort: skip packets the parsers cannot handle.
            pass

    # Must happen before the file is closed.
    recorder.record(None, PlayerMessageType.CONNECTION_CLOSE)
Thanks, this is really helpful. Just knowing there is a lot more work to do is useful.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@robeving I assume this is you on Twitter?
The issue is with how PyRDP handles graphics vs. how RDP normally handles it. Basically there are two ways to handle output: either the server sends drawing orders that the client then renders locally, or it sends bitmaps of the screen. Normal RDP usually sends GDI drawing orders over the network that you have to render locally. To avoid implementing a bunch of drawing orders, we coded PyRDP so it only accepts bitmaps. This works fine for situations when PyRDP is running but, obviously, since we have no support for drawing orders, it means we can't parse output for "normal" RDP connections.
So, basically, we need to implement support for the drawing orders API before this becomes usable with generic RDP pcaps. Judging from this spec, I wouldn't expect that to happen soon because there are a lot of drawing order types to implement before it works properly.
It was a bit of an oversight on our part not to mention that in the blog post - guess we were too used to only handling bitmaps. I'll add this to the README in the pyrdp-replay branch and create an issue to track this.