# Created
# October 20, 2018 14:10
# -
# -
# Save dragon788/ad30bce386e549990dbcb0a118b446bc to your computer and use it in GitHub Desktop.
# Powerfile C200 Python from Google groups
# This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
import glob
import os
import sys
import time

import pywintypes
import win32api
import win32con
import win32file
import win32ui
import win32uiole
from pywin.mfc import activex, dialog, window
from win32com.client import gencache

import discinfo
# c200
# GUIDs of the C200 type library and of the control class.
c200_typelib = '{E9A61682-5978-11D3-B313-00C04F7654DB}'
c200_clsid = '{61A3F5A5-594F-11D3-B313-00C04F7654DB}'
# EnsureModule(typelib, lcid=0, major=1, minor=0) loads the generated wrapper
# module for the type library; C200 is the control class taken from it.
C200 = gencache.EnsureModule(c200_typelib, 0, 1, 0).C200
class ChangerDisc:
    """Record describing one disc known to a Changer.

    The Changer that owns the disc fills in and updates these attributes as
    the disc is discovered, mounted and unmounted.
    """

    def __init__(self, changer):
        """Create a disc record belonging to *changer*."""
        self.changer = changer
        self.mounted = 0  # 1 while the disc is in the drive, else 0
        self.slot = 0     # storage slot index; Changer sets -1 while mounted
        # The attributes below are assigned later by Changer.  They are
        # defined here so that reading them is safe on every disc, however
        # it was discovered.
        self.tag = None
        self.volname = None
        self.fstype = None
        self.type = None
        self.disc = None
class Changer:
    """Bookkeeping wrapper around the C200 changer OCX control.

    Tracks which disc (if any) is in the drive and which storage slots hold
    discs, and updates that state as mount / unmount / export requests are
    sent to the control.
    """

    def __init__(self, ocx=None):
        """Store the OCX control; call init_juke() before using the changer."""
        self.ocx = ocx
        self.drive = -1
        self.driveletter = None
        self.mounted = None  # ChangerDisc currently in the drive, or None
        self.slots = None    # list of ChangerDisc/None, built by init_juke()
        # Filled in by init_juke(); defined here so that methods called
        # before it see an empty changer instead of raising AttributeError.
        self.changerid = None
        self.nslots = 0
        self.slot0 = 0

    @staticmethod
    def _storage_ioctl(code):
        """Return the storage-class IOCTL control code for function *code*."""
        return 0x2d4000 | (code << 2)

    def init_juke(self):
        """Select the last changer and read its drive and slot layout.

        Also records the disc currently in the drive, if the control reports
        the drive element as full.
        """
        ocx = self.ocx
        self.changerid = ocx.NumberOfChangers - 1
        ocx.CurrentChangerNumber = self.changerid
        self.drive = ocx.FirstDriveElement
        self.driveletter = ocx.DriveLetterOfDriveElement(self.drive)
        self.nslots = ocx.NumberOfStorageElements
        self.slot0 = ocx.FirstStorageElement
        self.slots = [None] * self.nslots
        (rc, tag1, seq1, tag2, seq2, full, selt) = ocx.GetVolumeTag(self.drive)
        if full:
            disc = ChangerDisc(self)
            disc.mounted = 1
            disc.slot = -1
            disc.tag = tag1
            self.mounted = disc
            # NOTE(review): mounted_disc_type() always returns None, so
            # disc.type ends up None; the data read lands in disc.disc.
            disc.type = self.mounted_disc_type()
        else:
            self.mounted = None

    def read_slots(self):
        """Refresh self.slots from the changer's storage element map."""
        self.ocx.ReserveChanger(1)
        try:
            # getting the complete element map is much faster than
            # checking the empty slots with getvolumetag
            # type 2 is storage
            (rc, mapstr) = self.ocx.GetElementMap(2, self.slot0, self.nslots, 0)
            for i in range(self.nslots):
                if mapstr[i] == "0":
                    # Drop any stale record for a slot now reported empty.
                    self.slots[i] = None
                    continue
                disc = ChangerDisc(self)
                self.slots[i] = disc
                (rc, tag1, seq1, tag2, seq2, full, selt) = \
                    self.ocx.GetVolumeTag(self.slot0 + i)
                disc.mounted = 0
                disc.slot = i
                disc.tag = tag1
                disc.volname = None
                disc.fstype = None
        finally:
            # Always release the reservation, even if an OCX call raised.
            self.ocx.ReserveChanger(0)

    def dump_changer(self):
        """Print the tag of the mounted disc and of every occupied slot."""
        if self.mounted:
            print("mounted: %s" % self.mounted.tag)
        for i in range(self.nslots):
            disc = self.slots[i]
            if not disc:
                continue
            print("slot %3d: %s" % (i, disc.tag))

    def read_volume_labels(self):
        """Mount each disc found in a slot in turn and read its disc info."""
        if self.mounted:
            print("mounted: %s" % self.mounted.tag)
        for i in range(self.nslots):
            disc = self.slots[i]
            if not disc:
                continue
            print("mounting slot %3d: %s" % (i, disc.tag))
            self.mount(disc)
            # The disc may still be spinning up here; a retry loop would be
            # the proper way to wait for it.
            self.mounted_disc_type()

    def ejectall(self):
        """Unmount the drive, then export every disc held in a slot.

        A failed export is reported and the remaining discs are still tried.
        """
        if self.mounted:
            self.unmount()
        # Iterate over a copy: entries are cleared as discs leave the changer.
        for disc in list(self.slots or []):
            if disc is None:
                continue
            ioslot = self.ocx.FirstImportExportElement
            print("export(%d, %d)" % (disc.slot + self.slot0, ioslot))
            rc = self.ocx.ExportElement(disc.slot + self.slot0, ioslot)
            if not rc:
                print("OCX error ejecting disc %s:" % disc.tag)
                print(self.ocx.GetLastError())
                continue
            # The slot is empty now; keep the bookkeeping in step.
            self.slots[disc.slot] = None

    def mounted_disc_type(self):
        """Read disc info for the mounted disc and store it on the disc.

        The result of discinfo.read_disc() is saved as self.mounted.disc.
        Always returns None.
        """
        if not self.mounted:
            return None
        info = discinfo.read_disc(self.driveletter)
        self.mounted.disc = info

    def unmount(self):
        """Move the mounted disc into the first free storage slot.

        On failure (OCX error or no free slot) the error is printed and
        self.mounted is left unchanged, so callers can detect it.
        """
        if self.mounted is None:
            return
        # find an empty slot
        for i in range(self.nslots):
            if self.slots[i]:
                continue
            disc = self.mounted
            rc = self.ocx.UnMountElement(self.drive, i + self.slot0, 1)
            if not rc:
                print("OCX error:")
                print(self.ocx.GetLastError())
                return
            self.mounted = None
            disc.mounted = 0
            disc.slot = i
            self.slots[i] = disc
            return
        print("error: no free slot to unmount disc %s"
              % getattr(self.mounted, "tag", None))

    def mount(self, disc):
        """Move *disc* from its storage slot into the drive.

        Any disc already in the drive is unmounted first.  If that fails the
        mount is abandoned, because the drive is still occupied.
        """
        if self.mounted is disc:
            return
        if self.mounted:
            self.unmount()
            if self.mounted:
                return
        rc = self.ocx.MountElement(disc.slot + self.slot0, self.drive)
        if not rc:
            print("OCX error:")
            print(self.ocx.GetLastError())
            return
        disc.mounted = 1
        self.slots[disc.slot] = None
        disc.slot = -1
        self.mounted = disc
        self.try_flush()

    # the c200 driver doesn't update windows properly when the
    # disc is changed. try to tell windows to re-read the drive...
    def try_flush(self):
        """Send IOCTL_STORAGE_LOAD_MEDIA to the changer's drive letter."""
        devpath = "\\\\.\\%s:" % self.driveletter
        dev = win32file.CreateFile(
            devpath, win32file.GENERIC_READ,
            win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
            None, win32file.OPEN_EXISTING, 0, None)
        try:
            # Related function codes: MEDIA_REMOVAL is 0x0201 and
            # EJECT_MEDIA is 0x0202.
            IOCTL_STORAGE_LOAD_MEDIA = self._storage_ioctl(0x0203)
            win32file.DeviceIoControl(dev, IOCTL_STORAGE_LOAD_MEDIA, "", 0)
        finally:
            # Close the handle even if the ioctl raises.
            win32file.CloseHandle(dev)

    def testjuke(self):
        """Manual smoke test: print changer layout, tags and drive status."""
        # The original body used an undefined name "juke"; every call it makes
        # is a member of the OCX control, so bind it to self.ocx.
        juke = self.ocx
        print("changer id %s" % juke.ChangerID)
        juke.ReserveChanger(1)
        try:
            print("i/o elt %s" % juke.FirstImportExportElement)
            print("drive elt %s" % juke.FirstDriveElement)
            print("storage elt %s x %d"
                  % (juke.FirstStorageElement, juke.NumberOfStorageElements))
            elt0 = juke.FirstStorageElement
            nelts = juke.NumberOfStorageElements
            # getting the complete element map is much faster than
            # checking the empty slots with getvolumetag
            # type 2 is storage
            (rc, mapstr) = juke.GetElementMap(2, elt0, nelts, 0)
            for i in range(nelts):
                if mapstr[i] == "0":
                    continue
                (rc, tag1, seq1, tag2, seq2, full, selt) = \
                    juke.GetVolumeTag(elt0 + i)
                print("%3d %d %s" % (i, full, tag1))
            status = 0
            foo = juke.DiscInElement(juke.FirstDriveElement, status)
            print("foo = %s status = %s" % (foo, status))
            foo = juke.UnMountElement(juke.FirstDriveElement, 1, 1)
            print("foo = %s" % foo)
            foo = juke.DiscInElement(juke.FirstDriveElement, status)
            print("foo = %s status = %s" % (foo, status))
        finally:
            juke.ReserveChanger(0)
def check_license_file():
    """Make sure c200.lic is present in the Windows system directory.

    If it is missing, copy it from the "doc" directory under sys.path[0].

    Raises:
        IOError: if the source license file cannot be found.
    """
    sysdir = win32api.GetSystemDirectory()
    licfile = os.path.join(sysdir, "c200.lic")
    if os.path.exists(licfile):
        return
    licsrc = os.path.join(sys.path[0], "doc\\c200.lic")
    if not os.path.exists(licsrc):
        # A plain string is not a valid exception; raise a real one that
        # carries the same message.
        raise IOError("can't find license file %s" % licsrc)
    # Last argument 1: fail rather than overwrite an existing file.
    win32api.CopyFile(licsrc, licfile, 1)
def test():
    """Placeholder self-test: only makes sure the license file is installed."""
    print("no test here")
    check_license_file()
# Run the placeholder self-test when the module is executed as a script.
if __name__ == "__main__":
    test()
# C200 OCX method signatures, for reference:
# DriveLetterOfDriveElement(self, Element)
# GetLastError(self, ErrorNum, Description, extendedInfo)
# ReserveChanger(self, Reserve)
# DiscInElement(self, Element, Status)
# GetElementMap(self, ElementType, FirstElement, NumberOfElements, VolTag, ElementMap)
# GetVacantStorageElements(self, Count)
# GetVolumeTag(self, Element, PrimaryTagName, pSequence, AlternateTagName, aSequence, Full, StorageElement)
# PutVolumeTag(self, Element, Sequence, TagName, AlternateFlag)
# ImportElement(self, ImportExportElement, StorageElement)
# ExportElement(self, StorageElement, ImportExportElement)
# MountElement(self, StorageElement, DriveElement)
# MountElementAvailableDrive(self, StorageElement, DriveElement)
# UnMountElement(self, DriveElement, StorageElement, ForceUnload)
# Sign up for free
# to join this conversation on GitHub.
# Already have an account?
# Sign in to comment
# https://groups.google.com/forum/m/#!topic/heatsynclabs/8XkfpQHSG7g