Skip to content

Instantly share code, notes, and snippets.

@unixpickle
Created November 18, 2019 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save unixpickle/e5266818ea5c0f5d64bac0489e1b4d38 to your computer and use it in GitHub Desktop.
Save unixpickle/e5266818ea5c0f5d64bac0489e1b4d38 to your computer and use it in GitHub Desktop.
3D model generator model
"""
Reading data produced by an external program.
"""
import math
import random
import numpy as np
import torch
DIST_SCALE = 100
NUM_DISTANCE_BINS = DIST_SCALE * 2 - 1
POINT_VECTOR_LEN = 6 + NUM_DISTANCE_BINS
def load_sequences(reader, device, batch_size):
"""
Load Tensors for (input, output) sequences.
"""
stream = read_point_distances(reader)
while True:
inputs = []
outputs = []
for _ in range(batch_size):
points = next(stream)
ins, outs = points.vectorize()
inputs.append(ins)
outputs.append(outs)
yield (torch.from_numpy(np.array(inputs)).to(device),
torch.from_numpy(np.array(outputs)).to(device).long())
def read_point_distances(reader):
"""
Iteratively read PointDistances from the given input
stream (e.g. os.stdin).
"""
for line in reader:
yield PointDistances.from_string(line.rstrip())
def discretize_signed_distance(d):
"""
Discretize the distance d into a bucket in the range
[0, NUM_DISTANCE_BINS).
"""
if d < 0:
return (DIST_SCALE*2 - 2) - discretize_signed_distance(-d)
x = int(round(math.sqrt(d) * (DIST_SCALE - 1)))
if x > DIST_SCALE - 1:
x = DIST_SCALE - 1
return DIST_SCALE + x - 1
def undiscretize_signed_distance(bucket):
"""
Perform a lossy inverse of discretize_signed_distance.
"""
x = (bucket - DIST_SCALE + 1) / (DIST_SCALE - 1)
return math.pow(x, 2) * (-1 if x < 0 else 1)
class PointDistance:
"""
A data point indicating how far away a 3D coordinate
is from the surface of a model. Negative distances
imply that the point is within the model.
"""
def __init__(self, cx, cy, cz, distance):
self.cx = cx
self.cy = cy
self.cz = cz
self.distance = distance
def coordinates(self):
return [self.cx, self.cy, self.cz]
def vectorize(self, previous=None):
input_arr = [0.0] * (3 + NUM_DISTANCE_BINS)
if previous is not None:
prev_one_hot = [0.0] * NUM_DISTANCE_BINS
prev_one_hot[discretize_signed_distance(previous.distance)] = 1
input_arr = previous.coordinates() + prev_one_hot
input_arr += self.coordinates()
return input_arr
@classmethod
def from_string(cls, text):
return cls(*[float(x) for x in text.split(',')])
@classmethod
def random(cls):
return cls(*[random.random() for _ in range(3)], random.random()*2 - 1)
class PointDistances:
"""
A collection of PointDistance instances for a single
3D model.
"""
def __init__(self, points):
self.points = points
def vectorize(self):
"""
Vectorize creates a sequence that can be fed to a
sequence model as a prediction task.
Each input is a vector containing info about the
previous point and the current point. Each output
is a one-hot vector of size NUM_DISTANCE_BINS.
Returns:
A tuple (inputs, outputs).
"""
inputs = []
outputs = []
previous = None
for previous, point in zip([None] + self.points, self.points):
input_arr = point.vectorize(previous=previous)
inputs.append(input_arr)
outputs.append(discretize_signed_distance(point.distance))
return np.array(inputs, dtype=np.float32), np.array(outputs, dtype=np.float32)
@classmethod
def from_string(cls, text):
return cls([PointDistance.from_string(p) for p in text.split(' ')])
"""
Full density models.
"""
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from .attention import AttentionLayer
from .data import (NUM_DISTANCE_BINS, POINT_VECTOR_LEN, PointDistance, PointDistances,
undiscretize_signed_distance)
COORD_RESOLUTION = 1000
COORD_EMBEDDING_SIZE = 64
TRANSFORMER_SIZE = 512
NUM_HEADS = 8
HIDDEN_SIZE = 2048
class DensityModel(nn.Module):
"""
A fully-fledged density model.
Takes input sequences of shape
[N x T x POINT_VECTOR_LEN] and outputs
logit sequences of shape [N x T x NUM_DISTANCE_BINS].
"""
def __init__(self):
super().__init__()
self.dim_embedding = nn.Embedding(COORD_RESOLUTION, COORD_EMBEDDING_SIZE)
self.in_layer = nn.Linear(COORD_EMBEDDING_SIZE*6 + NUM_DISTANCE_BINS, TRANSFORMER_SIZE)
self.layers = nn.Sequential(
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE),
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE),
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE),
AttentionLayer(TRANSFORMER_SIZE, num_heads=NUM_HEADS, hidden=HIDDEN_SIZE),
)
self.out_layer = nn.Linear(TRANSFORMER_SIZE, NUM_DISTANCE_BINS)
def forward(self, x):
batch = x.shape[0]
timesteps = x.shape[1]
x = x.view(-1, POINT_VECTOR_LEN)
x = self._embed_inputs(x)
x = self.in_layer(x)
x = F.relu(x)
x = x.view(batch, timesteps, -1)
x = self.layers(x)
x = x.view(-1, TRANSFORMER_SIZE)
x = self.out_layer(x)
x = x.view(batch, timesteps, NUM_DISTANCE_BINS)
return x
def _embed_inputs(self, x):
dims = torch.cat([x[:, :3], x[:, -3:]], dim=-1)
# Hack to prevent an index out of bounds by
# subtracting a tiny number.
discrete_dims = (dims * COORD_RESOLUTION - 1e-4).long()
embedded_dims = self.dim_embedding(discrete_dims).view(-1, COORD_EMBEDDING_SIZE * 6)
distances = x[:, 3:-3] * math.sqrt(x.shape[-1] - 6)
x = torch.cat([distances, embedded_dims], dim=-1)
return x
def tail_runner(self, x):
"""
Given a sequence x, create a function that takes a
batch of next inputs and produces a batch of next
outputs.
This can be used to run the model for
[prefix..., x1], [prefix..., x2], etc. at once.
Args:
x: a [T x C] sequence Tensor.
Returns:
An [N x C] Tensor.
"""
layer_in = F.relu(self.in_layer(self._embed_inputs(x)))
layer_ins = []
for attention in self.layers:
layer_ins.append(layer_in)
layer_in = attention(layer_in[None])[0]
def result_func(nexts):
nexts = F.relu(self.in_layer(self._embed_inputs(nexts)))
for orig_input, layer in zip(layer_ins, self.layers):
nexts = layer.run_tail(orig_input, nexts)
return self.out_layer(nexts)
return result_func
def sample_point_distances(self, n):
"""
Sample the given number of PointDistance objects
autoregressively and return the result as a
PointDistances object.
"""
device = next(self.parameters()).device
distances = PointDistances([])
for i in range(n):
distances.points.append(PointDistance.random())
ins, _ = distances.vectorize()
outputs = self(torch.from_numpy(ins).to(device)[None])
logits = outputs[0, -1].detach().cpu().numpy()
sample = sample_softmax(logits)
distances.points[-1].distance = undiscretize_signed_distance(sample)
return distances
def point_runner(self, known):
"""
Get a function that takes a PointDistance with an
unknown distance and returns the expected
prediction of the model.
Args:
known: a PointDistances that is already known.
Returns:
A function that takes a list of PointDistance
objects and returns a list of expected signed
distances.
"""
device = next(self.parameters()).device
runner = self.tail_runner(torch.from_numpy(known.vectorize()[0]).to(device))
def evaluate_fn(points):
vecs = np.array([point.vectorize(previous=known.points[-1])
for point in points], dtype=np.float32)
batch_out = runner(torch.from_numpy(vecs).to(device))
all_logits = batch_out.detach().cpu().numpy()
results = []
for logits in all_logits:
probs = np.exp(logits)
probs /= np.sum(probs)
result = 0
for i, prob in enumerate(probs):
result += undiscretize_signed_distance(i) * prob
results.append(result)
return results
return evaluate_fn
def sample_softmax(x):
probs = np.exp(x)
probs /= np.sum(probs)
return int(np.random.choice(len(x), p=probs))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment