Created
July 26, 2023 04:37
-
-
Save Miking98/b888d57ecc8e3444e0d6c96302436a7a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""From: https://github.com/ermongroup/ddim/blob/main/models/diffusion.py""" | |
import math | |
import torch | |
import torch.nn as nn | |
def get_timestep_embedding(timesteps, embedding_dim):
    """
    Build sinusoidal timestep embeddings.

    This matches the implementation in Denoising Diffusion Probabilistic
    Models (taken from Fairseq / tensor2tensor), which differs slightly from
    the description in Section 3.5 of "Attention Is All You Need".

    Args:
        timesteps: 1-D tensor of timesteps, one entry per batch element.
        embedding_dim: width of the returned embedding.

    Returns:
        Float tensor of shape ``(len(timesteps), embedding_dim)``: the sine
        half followed by the cosine half, plus one trailing zero column when
        ``embedding_dim`` is odd.
    """
    assert len(timesteps.shape) == 1

    half_dim = embedding_dim // 2
    # Guard the denominator: for embedding_dim 2 or 3 half_dim is 1 and the
    # plain ``half_dim - 1`` would raise ZeroDivisionError. With a single
    # frequency the exponent is multiplied by arange(1) == [0], so the chosen
    # denominator does not affect the result (the frequency is exactly 1).
    log_step = math.log(10000) / max(half_dim - 1, 1)
    freqs = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -log_step)
    freqs = freqs.to(device=timesteps.device)

    # Outer product: one row per timestep, one column per frequency.
    angles = timesteps.float()[:, None] * freqs[None, :]
    emb = torch.cat([torch.sin(angles), torch.cos(angles)], dim=1)

    if embedding_dim % 2 == 1:  # zero pad the last column up to embedding_dim
        emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
    return emb
def nonlinearity(x):
    """Apply the swish activation, ``x * sigmoid(x)``, elementwise."""
    gate = torch.sigmoid(x)
    return x * gate
def Normalize(in_channels):
    """Return the normalization layer used by every block in this file.

    The layer is a 32-group ``GroupNorm`` over ``in_channels`` channels with
    ``eps=1e-6`` and learnable affine parameters.
    """
    group_norm_kwargs = {
        "num_groups": 32,
        "num_channels": in_channels,
        "eps": 1e-6,
        "affine": True,
    }
    return torch.nn.GroupNorm(**group_norm_kwargs)
class Upsample(nn.Module):
    """Double the spatial size of a feature map.

    Nearest-neighbour interpolation with a scale factor of 2 is always
    applied; when ``with_conv`` is truthy the result is additionally passed
    through a 3x3, stride-1, padding-1 convolution that keeps the channel
    count.
    """

    def __init__(self, in_channels, with_conv):
        super().__init__()
        self.with_conv = with_conv
        if not with_conv:
            # Pure interpolation: no learnable parameters are registered.
            return
        self.conv = torch.nn.Conv2d(
            in_channels, in_channels, kernel_size=3, stride=1, padding=1
        )

    def forward(self, x):
        upsampled = torch.nn.functional.interpolate(
            x, scale_factor=2.0, mode="nearest"
        )
        return self.conv(upsampled) if self.with_conv else upsampled
class Downsample(nn.Module):
    """Halve the spatial size of a feature map.

    With ``with_conv`` truthy a 3x3, stride-2 convolution is used after
    zero-padding one pixel on the right and bottom edges; otherwise a 2x2
    average pool with stride 2 is applied.
    """

    def __init__(self, in_channels, with_conv):
        super().__init__()
        self.with_conv = with_conv
        if not with_conv:
            return
        # torch's Conv2d cannot pad asymmetrically, so the convolution is
        # built with padding=0 and the padding is applied by hand in forward().
        self.conv = torch.nn.Conv2d(
            in_channels, in_channels, kernel_size=3, stride=2, padding=0
        )

    def forward(self, x):
        if not self.with_conv:
            return torch.nn.functional.avg_pool2d(x, kernel_size=2, stride=2)
        # (left, right, top, bottom): pad only the right and bottom edges.
        padded = torch.nn.functional.pad(
            x, (0, 1, 0, 1), mode="constant", value=0
        )
        return self.conv(padded)
class ResnetBlock(nn.Module):
    """Residual block conditioned on a timestep embedding.

    The main branch is norm -> swish -> 3x3 conv, plus a linear projection of
    the (swish-activated) timestep embedding broadcast over the spatial
    dimensions, followed by norm -> swish -> dropout -> 3x3 conv. The input is
    added back to the result; when the channel count changes, the input is
    first mapped with a 3x3 conv (``conv_shortcut=True``) or a 1x1 conv.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels of the output; defaults to ``in_channels``.
        conv_shortcut: use a 3x3 instead of a 1x1 conv on the skip path when
            the channel count changes.
        dropout: dropout probability applied before the second convolution.
        temb_channels: width of the timestep embedding passed to ``forward``.
    """

    def __init__(self, *, in_channels, out_channels=None, conv_shortcut=False,
                 dropout, temb_channels=512):
        super().__init__()
        if out_channels is None:
            out_channels = in_channels
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_conv_shortcut = conv_shortcut

        # First half of the main branch.
        self.norm1 = Normalize(in_channels)
        self.conv1 = torch.nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=1, padding=1
        )
        # Projects the timestep embedding to one bias value per output channel.
        self.temb_proj = torch.nn.Linear(temb_channels, out_channels)

        # Second half of the main branch.
        self.norm2 = Normalize(out_channels)
        self.dropout = torch.nn.Dropout(dropout)
        self.conv2 = torch.nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1
        )

        # A skip-path projection is only needed when the channel count changes.
        if in_channels != out_channels:
            if conv_shortcut:
                self.conv_shortcut = torch.nn.Conv2d(
                    in_channels, out_channels, kernel_size=3, stride=1, padding=1
                )
            else:
                self.nin_shortcut = torch.nn.Conv2d(
                    in_channels, out_channels, kernel_size=1, stride=1, padding=0
                )

    def forward(self, x, temb):
        h = self.conv1(nonlinearity(self.norm1(x)))

        # Add the timestep signal as a per-channel bias over all positions.
        h = h + self.temb_proj(nonlinearity(temb))[:, :, None, None]

        h = self.conv2(self.dropout(nonlinearity(self.norm2(h))))

        if self.in_channels == self.out_channels:
            shortcut = x
        elif self.use_conv_shortcut:
            shortcut = self.conv_shortcut(x)
        else:
            shortcut = self.nin_shortcut(x)
        return shortcut + h
class AttnBlock(nn.Module):
    """Single-head self-attention over the spatial positions of a feature map.

    Queries, keys and values are produced by 1x1 convolutions of the
    normalized input; every spatial position attends to every other one, and
    the attended values are projected by a final 1x1 convolution and added
    back to the input.
    """

    def __init__(self, in_channels):
        super().__init__()
        self.in_channels = in_channels

        def pointwise_conv():
            # Channel-preserving 1x1 convolution.
            return torch.nn.Conv2d(
                in_channels, in_channels, kernel_size=1, stride=1, padding=0
            )

        self.norm = Normalize(in_channels)
        self.q = pointwise_conv()
        self.k = pointwise_conv()
        self.v = pointwise_conv()
        self.proj_out = pointwise_conv()

    def forward(self, x):
        normed = self.norm(x)
        query = self.q(normed)
        key = self.k(normed)
        value = self.v(normed)

        b, c, h, w = query.shape
        n_pos = h * w

        # Attention logits: scores[b, i, j] = sum_c query[b, i, c] * key[b, c, j]
        query = query.reshape(b, c, n_pos).permute(0, 2, 1)  # b, hw, c
        key = key.reshape(b, c, n_pos)                       # b, c, hw
        scores = torch.bmm(query, key) * (int(c) ** (-0.5))  # b, hw, hw
        weights = torch.nn.functional.softmax(scores, dim=2)

        # Attend to values: out[b, c, j] = sum_i value[b, c, i] * weights_t[b, i, j],
        # where weights_t has the key position first and the query position second.
        value = value.reshape(b, c, n_pos)
        weights_t = weights.permute(0, 2, 1)
        attended = torch.bmm(value, weights_t).reshape(b, c, h, w)

        return x + self.proj_out(attended)
class Model(nn.Module):
    """U-Net style network conditioned on a diffusion timestep.

    The network consists of an input convolution, a downsampling path of
    residual blocks (with optional attention), a middle section
    (residual block, attention, residual block), and an upsampling path
    whose residual blocks consume the stored downsampling activations as
    skip connections, followed by a normalization, swish and output
    convolution.

    Configuration fields read from ``config``:
        model.ch: base channel count.
        model.out_ch: channels of the output convolution.
        model.ch_mult: per-resolution channel multipliers (one per level).
        model.num_res_blocks: residual blocks per level on the down path
            (the up path uses one more per level).
        model.attn_resolutions: spatial resolutions at which attention
            blocks are added after each residual block.
        model.dropout: dropout probability for the residual blocks.
        model.in_channels: channels of the input image.
        model.resamp_with_conv: whether up/downsampling uses a convolution.
        model.type: when equal to ``'bayesian'`` a learnable ``logvar``
            parameter with one entry per diffusion timestep is registered.
        data.image_size: expected height and width of the input.
        diffusion.num_diffusion_timesteps: length of ``logvar``.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        ch, out_ch, ch_mult = config.model.ch, config.model.out_ch, tuple(config.model.ch_mult)
        num_res_blocks = config.model.num_res_blocks
        attn_resolutions = config.model.attn_resolutions
        dropout = config.model.dropout
        in_channels = config.model.in_channels
        resolution = config.data.image_size
        resamp_with_conv = config.model.resamp_with_conv
        num_timesteps = config.diffusion.num_diffusion_timesteps

        if config.model.type == 'bayesian':
            self.logvar = nn.Parameter(torch.zeros(num_timesteps))

        self.ch = ch
        self.temb_ch = self.ch*4
        self.num_resolutions = len(ch_mult)
        self.num_res_blocks = num_res_blocks
        self.resolution = resolution
        self.in_channels = in_channels

        # timestep embedding: two linear layers mapping ch -> 4*ch -> 4*ch
        self.temb = nn.Module()
        self.temb.dense = nn.ModuleList([
            torch.nn.Linear(self.ch,
                            self.temb_ch),
            torch.nn.Linear(self.temb_ch,
                            self.temb_ch),
        ])

        # downsampling
        self.conv_in = torch.nn.Conv2d(in_channels,
                                       self.ch,
                                       kernel_size=3,
                                       stride=1,
                                       padding=1)

        curr_res = resolution
        # Input multiplier of level i is the output multiplier of level i-1
        # (1 for the first level, which follows conv_in).
        in_ch_mult = (1,)+ch_mult
        self.down = nn.ModuleList()
        block_in = None
        for i_level in range(self.num_resolutions):
            block = nn.ModuleList()
            attn = nn.ModuleList()
            block_in = ch*in_ch_mult[i_level]
            block_out = ch*ch_mult[i_level]
            for _ in range(self.num_res_blocks):
                block.append(ResnetBlock(in_channels=block_in,
                                         out_channels=block_out,
                                         temb_channels=self.temb_ch,
                                         dropout=dropout))
                block_in = block_out
                if curr_res in attn_resolutions:
                    attn.append(AttnBlock(block_in))
            down = nn.Module()
            down.block = block
            down.attn = attn
            # Every level except the last halves the resolution.
            if i_level != self.num_resolutions-1:
                down.downsample = Downsample(block_in, resamp_with_conv)
                curr_res = curr_res // 2
            self.down.append(down)

        # middle
        self.mid = nn.Module()
        self.mid.block_1 = ResnetBlock(in_channels=block_in,
                                       out_channels=block_in,
                                       temb_channels=self.temb_ch,
                                       dropout=dropout)
        self.mid.attn_1 = AttnBlock(block_in)
        self.mid.block_2 = ResnetBlock(in_channels=block_in,
                                       out_channels=block_in,
                                       temb_channels=self.temb_ch,
                                       dropout=dropout)

        # upsampling
        self.up = nn.ModuleList()
        for i_level in reversed(range(self.num_resolutions)):
            block = nn.ModuleList()
            attn = nn.ModuleList()
            block_out = ch*ch_mult[i_level]
            skip_in = ch*ch_mult[i_level]
            # One more block than on the down path: the extra block consumes
            # the activation that entered this level (conv_in or downsample).
            for i_block in range(self.num_res_blocks+1):
                if i_block == self.num_res_blocks:
                    skip_in = ch*in_ch_mult[i_level]
                # Input is the running activation concatenated with a skip.
                block.append(ResnetBlock(in_channels=block_in+skip_in,
                                         out_channels=block_out,
                                         temb_channels=self.temb_ch,
                                         dropout=dropout))
                block_in = block_out
                if curr_res in attn_resolutions:
                    attn.append(AttnBlock(block_in))
            up = nn.Module()
            up.block = block
            up.attn = attn
            # Every level except the first (full resolution) doubles the size.
            if i_level != 0:
                up.upsample = Upsample(block_in, resamp_with_conv)
                curr_res = curr_res * 2
            self.up.insert(0, up)  # prepend to get consistent order

        # end
        self.norm_out = Normalize(block_in)
        self.conv_out = torch.nn.Conv2d(block_in,
                                        out_ch,
                                        kernel_size=3,
                                        stride=1,
                                        padding=1)

    def forward(self, x, t):
        """Run the network on a batch of images at the given timesteps.

        Args:
            x: 4-D image batch whose height and width (dims 2 and 3) both
                equal the configured resolution.
            t: 1-D tensor of timesteps, one per batch element.

        Returns:
            The output of the final convolution (``out_ch`` channels).
        """
        # The input must be square and match the configured resolution.
        assert x.shape[2] == x.shape[3] == self.resolution

        # timestep embedding: sinusoidal features -> linear -> swish -> linear
        temb = get_timestep_embedding(t, self.ch)
        temb = self.temb.dense[0](temb)
        temb = nonlinearity(temb)
        temb = self.temb.dense[1](temb)

        # downsampling: every intermediate activation is pushed onto `hs` so
        # the upsampling path can use it as a skip connection.
        hs = [self.conv_in(x)]
        for i_level in range(self.num_resolutions):
            level = self.down[i_level]
            for i_block in range(self.num_res_blocks):
                h = level.block[i_block](hs[-1], temb)
                if len(level.attn) > 0:
                    h = level.attn[i_block](h)
                hs.append(h)
            if i_level != self.num_resolutions-1:
                hs.append(level.downsample(hs[-1]))

        # middle: residual block, attention, residual block at the lowest
        # resolution, using the same timestep embedding.
        h = hs[-1]
        h = self.mid.block_1(h, temb)
        h = self.mid.attn_1(h)
        h = self.mid.block_2(h, temb)

        # upsampling: walk the levels in reverse; before each residual block
        # the running activation is concatenated channel-wise with the most
        # recently stored skip activation popped from `hs`.
        for i_level in reversed(range(self.num_resolutions)):
            level = self.up[i_level]
            for i_block in range(self.num_res_blocks+1):
                h = level.block[i_block](
                    torch.cat([h, hs.pop()], dim=1), temb)
                if len(level.attn) > 0:
                    h = level.attn[i_block](h)
            if i_level != 0:
                h = level.upsample(h)

        # end: normalize, apply the nonlinearity, and project to out_ch.
        h = self.norm_out(h)
        h = nonlinearity(h)
        h = self.conv_out(h)
        return h

    @property
    def device(self) -> torch.device:
        """Device of the first registered parameter of the model."""
        return next(self.parameters()).device
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment