Skip to content

Instantly share code, notes, and snippets.

@kfsone
Created May 12, 2024 22:50
Show Gist options
  • Save kfsone/68ae786cd3fe1e4fca36bfc222934900 to your computer and use it in GitHub Desktop.
Save kfsone/68ae786cd3fe1e4fca36bfc222934900 to your computer and use it in GitHub Desktop.
Read a listings.csv along with Station.csv and Item.csv and emit a binary file
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from struct import pack, unpack
import os
import time
# File identification written at the very start of the output file.
MAGIC = "SILS"  # Station Item Listing
FORMAT = 1

# Timestamps are stored as an offset from this value so they fit in a u32.
EPOCH = 1500000000

# struct layout of one item record; adjacent literals concatenate into
# a single format string.
ITEM_FMT = (
    "!"    # Use network endianness
    "I"    # item_id: u32
    "I"    # fdev_id: u32
    "I"    # avg_price: u32
    "H"    # category_id: u16
    "H"    # ui_order: u16
    "40s"  # name
)

# Size in bytes of one packed item record, one term per field above.
ITEM_LEN = sum((4, 4, 4, 2, 2, 40))  # 56
print("-- Item Struct Len:", ITEM_LEN)
# At time of writing, there are 400 items in the game, so assuming a
# maximum of 512 for the time being lets a station's item availability
# be held in a 512-bit mask: 64 bytes, i.e. eight u64 words.
ITEM_LIMIT = 512
# Floor division keeps these values as ints. True division would give
# 64.0 and 8.0, and bytearray(ITEM_BYTES) rejects a float size.
ITEM_BYTES = ITEM_LIMIT // 8
ITEM_U64S = ITEM_BYTES // 8  # 8 = 8 bytes per u64
@dataclass(slots=True, frozen=True)
class Item:
item_id: int # u32
name: bytes # [40; u8]
category_id: int # u16
ui_order: int # u16
avg_price: int # u32
fdev_id: int # u32
def pack(self):
return pack(ITEM_FMT, self.item_id, self.fdev_id, self.avg_price, self.category_id, self.ui_order, self.name)
@staticmethod
def unpack(source: bytes):
item_id, fdev_id, avg_price, category_id, ui_order, name = unpack(
ITEM_FMT, source)
return Item(item_id, name, category_id, ui_order, avg_price, fdev_id)
class StationFlags(Enum):
    """Bit positions for the per-station service flags.

    The values match the bit order read_station_table uses when it builds
    the ``known_flags`` and ``flags`` bytes (bit N is ``1 << N``). The enum
    itself is not referenced by the rest of the visible code.
    """
    MARKET = 0
    BLACK_MARKET = 1
    SHIPYARD = 2
    OUTFITTING = 3
    REARM = 4
    REFUEL = 5
    REPAIR = 6
    PLANET = 7
STATION_FMT = "".join(
"!" # Use network endianess
"Q" # station_id: u64
"I" # system_id: u32
"I" # modified: u32 (minus epoch)
"f" # jump_ls: float
"h" # type_id: i16
"B" # pads: u8
"B" # known_flags: u8
"B" # flags: u8
"40s" # name
"x" # pad byte
)
STATION_LEN = 8 + 4 + 4 + 4 + 2 + 1 + 1 + 1 + 40 + 1
print("-- Station Struct Len:", STATION_LEN)
@dataclass(slots=True, frozen=True)
class Station:
station_id: int # u32
name: bytes # [40; u8]
system_id: int # u64
jump_ls: float # f32
known_flags: int # u8
flags: int # u8
pads: int # u8
type_id: int # u16
modified: int # u32 (minus epoch)
supply: bytes # [ITEM_BYTES; u8]
demand: bytes # [ITEM_BYTES; u8]
def pack(self):
return pack(STATION_FMT, self.system_id, self.station_id, self.modified - EPOCH, self.jump_ls, self.type_id, self.pads, self.known_flags, self.flags, self.name)
def populate_availability(self, item_lids, supply, demand):
self.supply = bytearray(ITEM_BYTES) # one byte per 8 max items
for item_lid, listing in supply:
# the entries aren't sequential, calculate the byte and bit
# this will live in - I actually think it should probably be
# u64s rather than bytes but bytes are easier in python
byte_no, bit_no = item_lid >> 3, item_lid & 0x07
supply_mask[byte_no] |= (1 << bit_no)
@staticmethod
def unpack(source: bytes):
station_id, system_id, modified, jump_ls, type_id, pads, known_flags, flags, name = unpack(
STATION_FMT, source)
modified += EPOCH if modified else 0
return Station(station_id, name, system_id, jump_ls, known_flags, flags, pads, type_id, modified, bytearray(ITEM_BYTES), bytearray(ITEM_BYTES))
LISTING_FMT = "".join(
"!"
"I" # u32: price
"I" # u32: units
"I" # u32: modified (minus epoch)
"I" # u32: (actually a u8 with padding)
)
LISTING_LEN = 4 * 4
@dataclass(slots=True, frozen=True)
class Listing:
price: int # u32
units: int # u32
level: int # u8 padd to u32 for convenience
modified: int # u32 (minus epoch)
def pack(self):
modified = self.modified - EPOCH if self.modified else 0
return pack(LISTING_FMT, self.price, self.units, self.level, modified)
@staticmethod
def unpack(source: bytes):
price, units, level, modified = unpack(ITEM_FMT, source)
modified = modified + EPOCH if modified else 0
return Listing(price, units, level, modified)
# Locate the input CSVs: use the copy in the working directory when it
# exists, otherwise fall back to the data/ tree.
item_csv = "Item.csv" if os.path.exists("Item.csv") else "data/Item.csv"

# listings.csv is searched for in ./, then eddb/, then data/eddb/.
if os.path.exists("listings.csv"):
    listings_csv = "listings.csv"
elif os.path.exists("eddb/listings.csv"):
    listings_csv = "eddb/listings.csv"
else:
    listings_csv = "data/eddb/listings.csv"

station_csv = "Station.csv" if os.path.exists("Station.csv") else "data/Station.csv"
def read_item_table():
    """Load the item table from the CSV file named by ``item_csv``.

    Each data row is split on commas into six columns: id, name, category,
    ui_order, avg_price, fdev_id. The first and last byte of the name
    column are dropped (presumably the surrounding quotes - verify against
    the data). Blank lines are ignored.

    Returns:
        A tuple ``(items, item_ids, item_lids)`` where ``items`` maps the
        real item id to its Item, ``item_ids`` maps the real item id to its
        local id (0-based row position), and ``item_lids`` lists real item
        ids indexed by local id.

    Raises:
        ValueError: If a row does not have exactly six columns or a
            numeric column cannot be parsed.
    """
    print("++ Reading Item.csv")
    items = {}      # real id -> Item
    item_ids = {}   # real id -> local id
    item_lids = []  # local id -> real id
    with open(item_csv, "rb") as item_fh:
        next(item_fh, None)  # skip header; tolerate a completely empty file
        for line in item_fh:
            if not line.strip():
                continue  # e.g. a trailing blank line
            (item_id_text, name, category, ui_order,
             avg_price, fdev_id) = line.split(b',')
            item_id = int(item_id_text)
            items[item_id] = Item(item_id, name[1:-1], int(category),
                                  int(ui_order), int(avg_price), int(fdev_id))
            item_ids[item_id] = len(item_lids)
            item_lids.append(item_id)
    if items:
        print(f"-- {len(item_lids):,} items, {min(items):,}-{max(items):,}")
    else:
        # min()/max() raise on an empty table, so report the count alone.
        print("-- 0 items")
    return items, item_ids, item_lids
def read_station_table():
    """Load the station table from the CSV file named by ``station_csv``.

    Each data row is split on commas into fifteen columns. The eight
    service columns are folded into two bitmasks: ``known_flags`` has a bit
    set when the column is Y or N, and ``flags`` has the bit set only for
    Y. Bit order is market, blackmarket, shipyard, outfitting, rearm,
    refuel, repair, planetary. Blank lines are ignored.

    Returns:
        A tuple ``(stations, station_ids, station_lids)`` where
        ``stations`` maps the real station id to its Station (with empty
        supply/demand masks), ``station_ids`` maps the real station id to
        its local id (0-based row position), and ``station_lids`` lists
        real station ids indexed by local id.

    Raises:
        ValueError: If a row does not have exactly fifteen columns or a
            numeric/timestamp column cannot be parsed.
    """
    print("++ Reading Station.csv")
    stations = {}      # real id -> Station
    station_ids = {}   # real id -> local id
    station_lids = []  # local id -> real id
    # Exact-match lookup: anything other than S/M/L maps to 0 (unknown).
    pad_sizes = {b"S": 1, b"M": 2, b"L": 3}
    with open(station_csv, "rb") as station_fh:
        next(station_fh, None)  # skip header; tolerate a completely empty file
        for line in station_fh:
            if not line.strip():
                continue  # e.g. a trailing blank line
            (station_id_text, name, system, jump_ls, blackmarket, max_pad, market, shipyard,
             modified, outfitting, rearm, refuel, repair, planetary, type_id) = line.split(b',')
            station_id = int(station_id_text)
            known_flags, flags = 0, 0
            services = (market, blackmarket, shipyard, outfitting,
                        rearm, refuel, repair, planetary)
            for bit, value in enumerate(services):
                # Accept both bare and single-quoted values; name and
                # modified are already handled as quoted columns.
                value = value.strip(b"'")
                if value == b'Y':
                    known_flags |= (1 << bit)
                    flags |= (1 << bit)
                elif value == b'N':
                    known_flags |= (1 << bit)
            max_pad_size = pad_sizes.get(max_pad.strip(b"'"), 0)
            if modified.startswith(b"'"):  # string format, need to parse it.
                # NOTE(review): fromisoformat() gives a naive datetime here, so
                # timestamp() interprets it as local time - confirm whether
                # the CSV timestamps are meant to be UTC.
                modified = datetime.fromisoformat(
                    modified[1:-1].decode('ascii')).timestamp()
            modified = int(modified)
            stations[station_id] = Station(
                station_id, name[1:-1], int(system), float(jump_ls),
                known_flags, flags, max_pad_size, int(type_id), modified, b"", b"")
            station_ids[station_id] = len(station_lids)
            station_lids.append(station_id)
    if stations:
        print(f"-- {len(station_lids):,} stations, {min(stations):,}-{max(stations):,}")
    else:
        # min()/max() raise on an empty table, so report the count alone.
        print("-- 0 stations")
    return stations, station_ids, station_lids
def read_station_listings(station_ids, item_ids):
    """Load per-station supply and demand listings from ``listings_csv``.

    Rows are grouped by consecutive runs of the same station column; each
    run is collected into one dict for supply and one for demand. A side
    (supply or demand) is recorded for an item only when its price column
    is not ``0``. Levels are stored as the CSV value plus one. Blank lines
    are ignored.

    Args:
        station_ids: Mapping of real station id -> local id; only used to
            check that every station in the listings is known.
        item_ids: Mapping of real item id -> local item id.

    Returns:
        A tuple ``(supplies, demands)``; each maps a real station id to a
        dict of local item id -> Listing. Stations with no entries on a
        side are absent from that side's mapping.

    Raises:
        KeyError: If a row names a station or item missing from the
            supplied mappings.
        ValueError: If a row does not have exactly ten columns or a
            numeric column cannot be parsed.
    """
    print("++ Reading listings.csv")
    supplies = {}
    demands = {}
    mismatch = 0
    stn_count = 0
    rec_no = 0  # stays 0 when the file has no data rows
    start = time.time()
    with open(listings_csv, "rb") as list_fh:
        next(list_fh, None)  # skip header; tolerate a completely empty file
        cur_station = None
        station_id = None
        cur_sup = {}
        cur_dem = {}
        cur_mod = None
        for rec_no, line in enumerate(list_fh, 1):
            if rec_no % 50000 == 1:
                print(f"~~ {rec_no:,}\r", end='')
            if not line.strip():
                continue  # e.g. a trailing blank line
            # id, station, item, s_units, s_level, s_price, d_price, d_units, d_level, modified
            (_, station_id_text, item_id_text, s_units, s_level, s_price,
             d_price, d_units, d_level, modified) = line.split(b',')
            if station_id_text != cur_station:
                # New station: file what was gathered for the previous one.
                if cur_sup:
                    supplies[station_id] = cur_sup
                if cur_dem:
                    demands[station_id] = cur_dem
                cur_sup, cur_dem = {}, {}
                cur_station = station_id_text
                station_id = int(cur_station)
                if station_id not in station_ids:
                    raise KeyError(station_id)
                cur_mod = modified
                stn_count += 1
            elif cur_mod != modified:
                # Timestamp differs from the first row seen for this station.
                mismatch += 1
            item_lid = item_ids[int(item_id_text)]
            if s_price != b'0':
                cur_sup[item_lid] = Listing(int(s_price), int(
                    s_units), int(s_level) + 1, int(modified))
            if d_price != b'0':
                cur_dem[item_lid] = Listing(int(d_price), int(
                    d_units), int(d_level) + 1, int(modified))
    # File the final station's entries; both dicts are empty if no rows were read.
    if cur_sup:
        supplies[station_id] = cur_sup
    if cur_dem:
        demands[station_id] = cur_dem
    print(f"-- {len(supplies):,} supply entries, {len(demands):,} demand entries; {rec_no:,} listings over {stn_count:,} stations in {time.time()-start:.2f}s")
    if mismatch:
        print(f".. {mismatch} mismatched dates")
    return supplies, demands
# Load the item and station tables first: the listings reader needs their
# real-id -> local-id mappings to resolve each row.
items, item_ids, item_lids = read_item_table()
stations, station_ids, station_lids = read_station_table()
# supply/demand map real station id -> {local item id -> Listing}.
supply, demand = read_station_listings(station_ids, item_ids)
def write_listings(ident, listings, station_lids, items, out_fh):
    """Write one fixed-size block of listing records per station.

    After the section tag, every station in ``station_lids`` gets
    ``len(items) * LISTING_LEN`` bytes: one LISTING_LEN slot per item,
    placed at ``item_lid * LISTING_LEN``. Slots without a listing, and
    whole stations without listings, are left zero-filled.

    Args:
        ident: Section tag written before the data (bytes-like).
        listings: Mapping of station id -> {local item id -> listing}; each
            listing must provide ``pack()`` returning LISTING_LEN bytes.
        station_lids: Station ids in the order their blocks are written.
        items: Item table; only its length is used.
        out_fh: Writable binary file object.
    """
    start = time.time()
    out_fh.write(bytes(ident))
    station_bytes = len(items) * LISTING_LEN
    empty_station = bytes(station_bytes)  # shared all-zero block
    item_no = 0
    stn_no = 0  # stays 0 when station_lids is empty
    empty_stations = 0
    pop_stations = 0
    for stn_no, stn in enumerate(station_lids, 1):
        if stn_no % 5000 == 1:
            print(f"~~ stations: {stn_no:,}, listings:{item_no:,}\r", end='')
        stn_listings = listings.get(stn)
        if not stn_listings:
            out_fh.write(empty_station)
            empty_stations += 1
            continue
        # Start from zeros and overwrite only the slots that have a listing.
        data = bytearray(station_bytes)
        for item_lid, listing in stn_listings.items():
            offset = item_lid * LISTING_LEN
            data[offset:offset + LISTING_LEN] = listing.pack()
        item_no += len(stn_listings)
        pop_stations += 1
        out_fh.write(data)
    print(f"-- Saved {item_no:,} items across {stn_no:,} stations ({pop_stations:,} populated, {empty_stations:,} empty) in {time.time()-start:.2f}s")
# Emit the binary data file: ident line, header, item table, station
# table, then the supply and demand sections.
with open("tradedangerous.data", "wb") as out_fh:
    # File type and version ident
    out_fh.write(f"{MAGIC}{FORMAT:04X}\n".encode())

    # Header: record count and record size for items, then for stations.
    print("<< Headers")
    out_fh.write(b"HDRS")
    for table, record_len in ((items, ITEM_LEN), (stations, STATION_LEN)):
        out_fh.write(pack("!QI", len(table), record_len))

    # Item records, in table (insertion) order.
    print("<< Item List")
    out_fh.write(b"ITLS")
    out_fh.writelines(item.pack() for item in items.values())

    # Station records, in table (insertion) order.
    print("<< Station List")
    out_fh.write(b"STLS")
    out_fh.writelines(stn.pack() for stn in stations.values())

    print("<< Supply")
    write_listings(b"STSU", supply, station_lids, items, out_fh)

    print("<< Demand")
    write_listings(b"STDE", demand, station_lids, items, out_fh)

    # Report the final size, switching to GB above 750MB.
    size = out_fh.tell()
    if size > 750 * 1024 * 1024:
        sized = f"{size / 1024 / 1024 / 1024:,.2f}GB"
    else:
        sized = f"{size / 1024 / 1024:,.2f}MB"
    print(f"-- Data file is {size:,} bytes ({sized})")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment