Created
November 18, 2019 21:57
-
-
Save unixpickle/e5266818ea5c0f5d64bac0489e1b4d38 to your computer and use it in GitHub Desktop.
3D model generator model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Reading data produced by an external program. | |
""" | |
import math | |
import random | |
import numpy as np | |
import torch | |
DIST_SCALE = 100 | |
NUM_DISTANCE_BINS = DIST_SCALE * 2 - 1 | |
POINT_VECTOR_LEN = 6 + NUM_DISTANCE_BINS | |
def load_sequences(reader, device, batch_size): | |
""" | |
Load Tensors for (input, output) sequences. | |
""" | |
stream = read_point_distances(reader) | |
while True: | |
inputs = [] | |
outputs = [] | |
for _ in range(batch_size): | |
points = next(stream) | |
ins, outs = points.vectorize() | |
inputs.append(ins) | |
outputs.append(outs) | |
yield (torch.from_numpy(np.array(inputs)).to(device), | |
torch.from_numpy(np.array(outputs)).to(device).long()) | |
def read_point_distances(reader): | |
""" | |
Iteratively read PointDistances from the given input | |
stream (e.g. os.stdin). | |
""" | |
for line in reader: | |
yield PointDistances.from_string(line.rstrip()) | |
def discretize_signed_distance(d): | |
""" | |
Discretize the distance d into a bucket in the range | |
[0, NUM_DISTANCE_BINS). | |
""" | |
if d < 0: | |
return (DIST_SCALE*2 - 2) - discretize_signed_distance(-d) | |
x = int(round(math.sqrt(d) * (DIST_SCALE - 1))) | |
if x > DIST_SCALE - 1: | |
x = DIST_SCALE - 1 | |
return DIST_SCALE + x - 1 | |
def undiscretize_signed_distance(bucket): | |
""" | |
Perform a lossy inverse of discretize_signed_distance. | |
""" | |
x = (bucket - DIST_SCALE + 1) / (DIST_SCALE - 1) | |
return math.pow(x, 2) * (-1 if x < 0 else 1) | |
class PointDistance: | |
""" | |
A data point indicating how far away a 3D coordinate | |
is from the surface of a model. Negative distances | |
imply that the point is within the model. | |
""" | |
def __init__(self, cx, cy, cz, distance): | |
self.cx = cx | |
self.cy = cy | |
self.cz = cz | |
self.distance = distance | |
def coordinates(self): | |
return [self.cx, self.cy, self.cz] | |
def vectorize(self, previous=None): | |
input_arr = [0.0] * (3 + NUM_DISTANCE_BINS) | |
if previous is not None: | |
prev_one_hot = [0.0] * NUM_DISTANCE_BINS | |
prev_one_hot[discretize_signed_distance(previous.distance)] = 1 | |
input_arr = previous.coordinates() + prev_one_hot | |
input_arr += self.coordinates() | |
return input_arr | |
@classmethod | |
def from_string(cls, text): | |
return cls(*[float(x) for x in text.split(',')]) | |
@classmethod | |
def random(cls): | |
return cls(*[random.random() for _ in range(3)], random.random()*2 - 1) | |
class PointDistances: | |
""" | |
A collection of PointDistance instances for a single | |
3D model. | |
""" | |
def __init__(self, points): | |
self.points = points | |
def vectorize(self): | |
""" | |
Vectorize creates a sequence that can be fed to a | |
sequence model as a prediction task. | |
Each input is a vector containing info about the | |
previous point and the current point. Each output | |
is a one-hot vector of size NUM_DISTANCE_BINS. | |
Returns: | |
A tuple (inputs, outputs). | |
""" | |
inputs = [] | |
outputs = [] | |
previous = None | |
for previous, point in zip([None] + self.points, self.points): | |
input_arr = point.vectorize(previous=previous) | |
inputs.append(input_arr) | |
outputs.append(discretize_signed_distance(point.distance)) | |
return np.array(inputs, dtype=np.float32), np.array(outputs, dtype=np.float32) | |
@classmethod | |
def from_string(cls, text): | |
return cls([PointDistance.from_string(p) for p in text.split(' ')]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Full density models. | |
""" | |
import math | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from .attention import AttentionLayer | |
from .data import (NUM_DISTANCE_BINS, POINT_VECTOR_LEN, PointDistance, PointDistances, | |
undiscretize_signed_distance) | |
COORD_RESOLUTION = 1000 | |
COORD_EMBEDDING_SIZE = 64 | |
TRANSFORMER_SIZE = 512 | |
NUM_HEADS = 8 | |
HIDDEN_SIZE = 2048 | |
class DensityModel(nn.Module): | |
""" | |
A fully-fledged density model. | |
Takes input sequences of shape | |
[N x T x POINT_VECTOR_LEN] and outputs | |
logit sequences of shape [N x T x NUM_DISTANCE_BINS]. | |
""" | |
def __init__(self): | |
super().__init__() | |
self.dim_embedding = nn.Embedding(COORD_RESOLUTION, COORD_EMBEDDING_SIZE) | |
self.in_layer = nn.Linear(COORD_EMBEDDING_SIZE*6 + NUM_DISTANCE_BINS, TRANSFORMER_SIZE) | |
self.layers = nn.Sequential( | |
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE), | |
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE), | |
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE), | |
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE), | |
) | |
self.out_layer = nn.Linear(TRANSFORMER_SIZE, NUM_DISTANCE_BINS) | |
def forward(self, x): | |
batch = x.shape[0] | |
timesteps = x.shape[1] | |
x = x.view(-1, POINT_VECTOR_LEN) | |
x = self._embed_inputs(x) | |
x = self.in_layer(x) | |
x = F.relu(x) | |
x = x.view(batch, timesteps, -1) | |
x = self.layers(x) | |
x = x.view(-1, TRANSFORMER_SIZE) | |
x = self.out_layer(x) | |
x = x.view(batch, timesteps, NUM_DISTANCE_BINS) | |
return x | |
def _embed_inputs(self, x): | |
dims = torch.cat([x[:, :3], x[:, -3:]], dim=-1) | |
# Hack to prevent an index out of bounds by | |
# subtracting a tiny number. | |
discrete_dims = (dims * COORD_RESOLUTION - 1e-4).long() | |
embedded_dims = self.dim_embedding(discrete_dims).view(-1, COORD_EMBEDDING_SIZE * 6) | |
distances = x[:, 3:-3] * math.sqrt(x.shape[-1] - 6) | |
x = torch.cat([distances, embedded_dims], dim=-1) | |
return x | |
def tail_runner(self, x): | |
""" | |
Given a sequence x, create a function that takes a | |
batch of next inputs and produces a batch of next | |
outputs. | |
This can be used to run the model for | |
[prefix..., x1], [prefix..., x2], etc. at once. | |
Args: | |
x: a [T x C] sequence Tensor. | |
Returns: | |
An [N x C] Tensor. | |
""" | |
layer_in = F.relu(self.in_layer(self._embed_inputs(x))) | |
layer_ins = [] | |
for attention in self.layers: | |
layer_ins.append(layer_in) | |
layer_in = attention(layer_in[None])[0] | |
def result_func(nexts): | |
nexts = F.relu(self.in_layer(self._embed_inputs(nexts))) | |
for orig_input, layer in zip(layer_ins, self.layers): | |
nexts = layer.run_tail(orig_input, nexts) | |
return self.out_layer(nexts) | |
return result_func | |
def sample_point_distances(self, n): | |
""" | |
Sample the given number of PointDistance objects | |
autoregressively and return the result as a | |
PointDistances object. | |
""" | |
device = next(self.parameters()).device | |
distances = PointDistances([]) | |
for i in range(n): | |
distances.points.append(PointDistance.random()) | |
ins, _ = distances.vectorize() | |
outputs = self(torch.from_numpy(ins).to(device)[None]) | |
logits = outputs[0, -1].detach().cpu().numpy() | |
sample = sample_softmax(logits) | |
distances.points[-1].distance = undiscretize_signed_distance(sample) | |
return distances | |
def point_runner(self, known): | |
""" | |
Get a function that takes a PointDistance with an | |
unknown distance and returns the expected | |
prediction of the model. | |
Args: | |
known: a PointDistances that is already known. | |
Returns: | |
A function that takes a list of PointDistance | |
objects and returns a list of expected signed | |
distances. | |
""" | |
device = next(self.parameters()).device | |
runner = self.tail_runner(torch.from_numpy(known.vectorize()[0]).to(device)) | |
def evaluate_fn(points): | |
vecs = np.array([point.vectorize(previous=known.points[-1]) | |
for point in points], dtype=np.float32) | |
batch_out = runner(torch.from_numpy(vecs).to(device)) | |
all_logits = batch_out.detach().cpu().numpy() | |
results = [] | |
for logits in all_logits: | |
probs = np.exp(logits) | |
probs /= np.sum(probs) | |
result = 0 | |
for i, prob in enumerate(probs): | |
result += undiscretize_signed_distance(i) * prob | |
results.append(result) | |
return results | |
return evaluate_fn | |
def sample_softmax(x): | |
probs = np.exp(x) | |
probs /= np.sum(probs) | |
return int(np.random.choice(len(x), p=probs)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment