Last active
July 3, 2018 08:14
-
-
Save WayneCui/284bbaa539f5490c6503f53df19dcff4 to your computer and use it in GitHub Desktop.
YOLOv3 (Darknet) to TensorFlow converter — work in progress, not yet finished.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
Reads Darknet config and weights and creates Tensorflow model. | |
""" | |
import argparse | |
import configparser | |
import io | |
import os | |
from collections import defaultdict | |
import numpy as np | |
import tensorflow as tf | |
# Command-line interface: the converter takes the Darknet .cfg (architecture),
# the binary .weights file (parameters), and an output path for the saved model.
parser = argparse.ArgumentParser(description='Darknet To Tensorflow Converter.')
parser.add_argument('config_path', help='Path to Darknet cfg file.')
parser.add_argument('weights_path', help='Path to Darknet weights file.')
parser.add_argument('output_path', help='Path to output Keras model file.')
def unique_config_sections(config_file):
    """Rewrite duplicate section headers in a Darknet .cfg file.

    configparser requires unique section names, but Darknet configs repeat
    headers such as ``[convolutional]``. Each ``[name]`` header is rewritten
    to ``[name_N]`` where N counts previous occurrences of that name.

    Returns an in-memory text stream (rewound to the start) suitable for
    ``configparser.ConfigParser.read_file``.
    """
    seen_counts = defaultdict(int)
    rewritten = io.StringIO()
    with open(config_file) as src:
        for raw_line in src:
            if raw_line.startswith('['):
                name = raw_line.strip().strip('[]')
                unique_name = '{}_{}'.format(name, seen_counts[name])
                seen_counts[name] += 1
                raw_line = raw_line.replace(name, unique_name)
            rewritten.write(raw_line)
    rewritten.seek(0)
    return rewritten
def _main(args):
    """Convert a Darknet (YOLOv3) model to a TensorFlow checkpoint.

    Reads the network architecture from a Darknet .cfg file and the binary
    parameters from a .weights file, rebuilds the graph with tf.keras layers,
    and saves the variables with ``tf.train.Saver``.

    Args:
        args: argparse namespace with ``config_path``, ``weights_path`` and
            ``output_path`` attributes.
    """
    config_path = os.path.expanduser(args.config_path)
    weights_path = os.path.expanduser(args.weights_path)
    assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(config_path)
    assert weights_path.endswith('.weights'), '{} is not a .weights file'.format(
        weights_path)
    output_path = os.path.expanduser(args.output_path)

    # Load weights and config. `with` guarantees the handle is closed
    # (the original opened the file and never closed it).
    print('Loading weights.')
    with open(weights_path, 'rb') as weights_file:
        # Darknet weight-file header: major/minor/revision as int32, then the
        # "images seen" counter — int64 for format version >= 0.2, int32 before.
        major, minor, revision = np.ndarray(
            shape=(3,), dtype='int32', buffer=weights_file.read(12))
        if (major * 10 + minor) >= 2 and major < 1000 and minor < 1000:
            seen = np.ndarray(shape=(1,), dtype='int64',
                              buffer=weights_file.read(8))
        else:
            seen = np.ndarray(shape=(1,), dtype='int32',
                              buffer=weights_file.read(4))
        print('Weights Header: ', major, minor, revision, seen)

        print('Parsing Darknet config.')
        unique_config_file = unique_config_sections(config_path)
        cfg_parser = configparser.ConfigParser()
        cfg_parser.read_file(unique_config_file)

        print('Creating TF model.')
        # NHWC input: unknown batch/height/width, 3 channels.
        input_layer = tf.placeholder(tf.float32, [None, None, None, 3])
        prev_layer = input_layer
        all_layers = []

        weight_decay = (float(cfg_parser['net_0']['decay'])
                        if 'net_0' in cfg_parser.sections() else 5e-4)
        count = 0  # number of float32 values consumed from the weights file
        out_index = []
        for section in cfg_parser.sections():
            print('Parsing section {}'.format(section))
            if section.startswith('convolutional'):
                filters = int(cfg_parser[section]['filters'])
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                pad = int(cfg_parser[section]['pad'])
                activation = cfg_parser[section]['activation']
                batch_normalize = 'batch_normalize' in cfg_parser[section]

                padding = 'same' if pad == 1 and stride == 1 else 'valid'

                # Darknet serializes convolutional weights as:
                # [bias/beta, [gamma, mean, variance], conv_weights]
                prev_layer_shape = prev_layer.get_shape().as_list()
                weights_shape = (size, size, prev_layer_shape[-1], filters)
                darknet_w_shape = (filters, weights_shape[2], size, size)
                # np.product is deprecated; np.prod is the supported spelling.
                weights_size = np.prod(weights_shape)

                print('conv2d', 'bn' if batch_normalize else ' ',
                      activation, weights_shape)

                # BUG FIX: the original had a trailing comma after this call,
                # which made conv_bias a 1-tuple instead of an ndarray and
                # corrupted both bn_weight_list and the Conv2D weights below.
                conv_bias = np.ndarray(
                    shape=(filters,),
                    dtype='float32',
                    buffer=weights_file.read(filters * 4))
                count += filters

                if batch_normalize:
                    bn_weights = np.ndarray(
                        shape=(3, filters),
                        dtype='float32',
                        buffer=weights_file.read(filters * 12))
                    count += 3 * filters
                    bn_weight_list = [
                        bn_weights[0],  # scale gamma
                        conv_bias,      # shift beta
                        bn_weights[1],  # running mean
                        bn_weights[2],  # running var
                    ]

                conv_weights = np.ndarray(
                    shape=darknet_w_shape,
                    dtype='float32',
                    buffer=weights_file.read(weights_size * 4))
                count += weights_size

                # DarkNet conv_weights are serialized Caffe-style
                # (out_dim, in_dim, height, width); Tensorflow wants
                # (height, width, in_dim, out_dim).
                conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
                conv_weights = ([conv_weights] if batch_normalize
                                else [conv_weights, conv_bias])

                # Handle activation.
                act_fn = None
                if activation == 'leaky':
                    pass  # LeakyReLU is applied as a separate layer below.
                elif activation != 'linear':
                    raise ValueError(
                        'Unknown activation function `{}` in section {}'.format(
                            activation, section))

                if stride > 1:
                    # Darknet uses left and top padding instead of 'same' mode.
                    prev_layer = tf.keras.layers.ZeroPadding2D(
                        ((1, 0), (1, 0)))(prev_layer)

                conv_layer = tf.keras.layers.Conv2D(
                    filters, (size, size),
                    strides=(stride, stride),
                    kernel_regularizer=tf.keras.regularizers.l2(weight_decay),
                    use_bias=not batch_normalize,
                    weights=conv_weights,
                    activation=act_fn,
                    padding=padding)(prev_layer)

                if batch_normalize:
                    conv_layer = tf.keras.layers.BatchNormalization(
                        weights=bn_weight_list)(conv_layer)
                prev_layer = conv_layer

                if activation == 'linear':
                    all_layers.append(prev_layer)
                elif activation == 'leaky':
                    # BUG FIX: the class is LeakyReLU; the original's
                    # `LeakyRelu` raises AttributeError.
                    act_layer = tf.keras.layers.LeakyReLU(alpha=0.1)(prev_layer)
                    prev_layer = act_layer
                    all_layers.append(act_layer)

            elif section.startswith('route'):
                # Route either concatenates several earlier layers or simply
                # re-points prev_layer at a single earlier layer.
                ids = [int(i) for i in cfg_parser[section]['layers'].split(',')]
                layers = [all_layers[i] for i in ids]
                if len(layers) > 1:
                    print('Concatenating route layers:', layers)
                    concatenate_layer = tf.keras.layers.Concatenate()(layers)
                    all_layers.append(concatenate_layer)
                    prev_layer = concatenate_layer
                else:
                    skip_layer = layers[0]
                    all_layers.append(skip_layer)
                    prev_layer = skip_layer

            elif section.startswith('maxpool'):
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                all_layers.append(
                    tf.keras.layers.MaxPooling2D(
                        pool_size=(size, size),
                        strides=(stride, stride),
                        padding='same')(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('shortcut'):
                # Residual connection: element-wise add with an earlier layer.
                index = int(cfg_parser[section]['from'])
                activation = cfg_parser[section]['activation']
                assert activation == 'linear', 'Only linear activation supported.'
                all_layers.append(
                    tf.keras.layers.Add()([all_layers[index], prev_layer]))
                prev_layer = all_layers[-1]

            elif section.startswith('upsample'):
                stride = int(cfg_parser[section]['stride'])
                # (typo 'strinde' fixed in the message below)
                assert stride == 2, 'Only stride=2 supported.'
                all_layers.append(
                    tf.keras.layers.UpSampling2D(stride)(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('yolo'):
                # Mark the preceding layer as one of the network outputs.
                out_index.append(len(all_layers) - 1)
                all_layers.append(None)
                prev_layer = all_layers[-1]

            elif section.startswith('net'):
                pass  # hyperparameter section; 'decay' was read above

            else:
                raise ValueError(
                    'Unsupported section header type: {}'.format(section))

    # Create and save model.
    if len(out_index) == 0:
        out_index.append(len(all_layers) - 1)
    sess = tf.InteractiveSession()
    tf.global_variables_initializer().run()
    saver = tf.train.Saver()
    saver.save(sess, output_path)
# Script entry point: parse CLI arguments and run the converter.
if __name__ == '__main__':
    _main(parser.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment