Skip to content

Instantly share code, notes, and snippets.

@WayneCui
Last active July 3, 2018 08:14
Show Gist options
  • Save WayneCui/284bbaa539f5490c6503f53df19dcff4 to your computer and use it in GitHub Desktop.
Save WayneCui/284bbaa539f5490c6503f53df19dcff4 to your computer and use it in GitHub Desktop.
YOLOv3-to-TensorFlow converter (work in progress).
#! /usr/bin/env python
"""
Reads Darknet config and weights and creates Tensorflow model.
"""
import argparse
import configparser
import io
import os
from collections import defaultdict
import numpy as np
import tensorflow as tf
# Command-line interface: three required positional paths.
parser = argparse.ArgumentParser(description='Darknet To Tensorflow Converter.')
parser.add_argument('config_path', help='Path to Darknet cfg file.')
parser.add_argument('weights_path', help='Path to Darknet weights file.')
# BUG FIX: the script saves a TensorFlow checkpoint via tf.train.Saver,
# not a Keras model file, so the help text said the wrong thing.
parser.add_argument('output_path', help='Path to output TensorFlow checkpoint.')
def unique_config_sections(config_file):
    """Return a StringIO copy of *config_file* with duplicate section
    names made unique.

    Darknet .cfg files repeat section headers (e.g. many ``[convolutional]``
    blocks), which :mod:`configparser` rejects.  Each header gets an
    occurrence index appended: ``[convolutional]`` becomes
    ``[convolutional_0]``, ``[convolutional_1]``, and so on.

    Returns a seekable :class:`io.StringIO` positioned at the start,
    ready to pass to ``ConfigParser.read_file``.
    """
    occurrence = defaultdict(int)
    rewritten = io.StringIO()
    with open(config_file) as src:
        for raw_line in src:
            if raw_line.startswith('['):
                name = raw_line.strip().strip('[]')
                suffixed = '{}_{}'.format(name, occurrence[name])
                occurrence[name] += 1
                raw_line = raw_line.replace(name, suffixed)
            rewritten.write(raw_line)
    rewritten.seek(0)
    return rewritten
def _main(args):
    """Convert a Darknet YOLOv3 .cfg/.weights pair into a TF checkpoint.

    Parameters
    ----------
    args : argparse.Namespace
        ``config_path`` — Darknet .cfg file; ``weights_path`` — Darknet
        .weights file; ``output_path`` — prefix for the checkpoint
        written by ``tf.train.Saver``.

    Raises
    ------
    ValueError
        On an unknown activation or an unsupported section header.
    """
    config_path = os.path.expanduser(args.config_path)
    weights_path = os.path.expanduser(args.weights_path)
    assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(config_path)
    assert weights_path.endswith('.weights'), '{} is not a .weights file'.format(weights_path)
    output_path = os.path.expanduser(args.output_path)

    # Load weights and config.  The file is closed automatically even if
    # parsing fails partway through (the original leaked the handle).
    print('Loading weights.')
    with open(weights_path, 'rb') as weights_file:
        # Darknet header: three int32 (major, minor, revision) followed by
        # the "images seen" counter, which is int64 from version 0.2 on.
        major, minor, revision = np.ndarray(
            shape=(3,), dtype='int32', buffer=weights_file.read(12))
        if (major * 10 + minor) >= 2 and major < 1000 and minor < 1000:
            seen = np.ndarray(shape=(1,), dtype='int64', buffer=weights_file.read(8))
        else:
            seen = np.ndarray(shape=(1,), dtype='int32', buffer=weights_file.read(4))
        print('Weights Header: ', major, minor, revision, seen)

        print('Parsing Darknet config.')
        unique_config_file = unique_config_sections(config_path)
        cfg_parser = configparser.ConfigParser()
        cfg_parser.read_file(unique_config_file)

        print('Creating TF model.')
        # NHWC input with dynamic spatial size; Darknet images are 3-channel.
        input_layer = tf.placeholder(tf.float32, [None, None, None, 3])
        prev_layer = input_layer
        all_layers = []

        weight_decay = (float(cfg_parser['net_0']['decay'])
                        if 'net_0' in cfg_parser.sections() else 5e-4)
        count = 0       # number of float32 weights consumed so far
        out_index = []  # indices of layers feeding [yolo] heads
        for section in cfg_parser.sections():
            print('Parsing section {}'.format(section))
            if section.startswith('convolutional'):
                filters = int(cfg_parser[section]['filters'])
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                pad = int(cfg_parser[section]['pad'])
                activation = cfg_parser[section]['activation']
                batch_normalize = 'batch_normalize' in cfg_parser[section]

                padding = 'same' if pad == 1 and stride == 1 else 'valid'

                # Darknet serializes convolutional weights as:
                # [bias/beta, [gamma, mean, variance], conv_weights]
                prev_layer_shape = prev_layer.get_shape().as_list()
                weights_shape = (size, size, prev_layer_shape[-1], filters)
                darknet_w_shape = (filters, weights_shape[2], size, size)
                # np.prod: np.product is deprecated (removed in NumPy 2.0).
                weights_size = np.prod(weights_shape)
                print('conv2d', 'bn' if batch_normalize else '  ',
                      activation, weights_shape)

                # BUG FIX: the original had a stray trailing comma after this
                # call, turning conv_bias into a 1-tuple instead of an ndarray.
                conv_bias = np.ndarray(
                    shape=(filters,),
                    dtype='float32',
                    buffer=weights_file.read(filters * 4))
                count += filters

                if batch_normalize:
                    bn_weights = np.ndarray(
                        shape=(3, filters),
                        dtype='float32',
                        buffer=weights_file.read(filters * 12))
                    count += 3 * filters
                    bn_weight_list = [
                        bn_weights[0],  # scale gamma
                        conv_bias,      # shift beta
                        bn_weights[1],  # running mean
                        bn_weights[2],  # running var
                    ]

                conv_weights = np.ndarray(
                    shape=darknet_w_shape,
                    dtype='float32',
                    buffer=weights_file.read(weights_size * 4))
                count += weights_size

                # DarkNet conv_weights are serialized Caffe-style:
                #   (out_dim, in_dim, height, width)
                # Tensorflow order is:
                #   (height, width, in_dim, out_dim)
                conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
                conv_weights = ([conv_weights] if batch_normalize
                                else [conv_weights, conv_bias])

                # Handle activation.
                act_fn = None
                if activation == 'leaky':
                    pass  # applied as a separate LeakyReLU layer below
                elif activation != 'linear':
                    raise ValueError(
                        'Unknown activation function `{}` in section {}'.format(
                            activation, section))

                if stride > 1:
                    # Darknet uses left and top padding instead of 'same' mode.
                    prev_layer = tf.keras.layers.ZeroPadding2D(
                        ((1, 0), (1, 0)))(prev_layer)

                # Create Conv2D layer.
                conv_layer = tf.keras.layers.Conv2D(
                    filters, (size, size),
                    strides=(stride, stride),
                    kernel_regularizer=tf.keras.regularizers.l2(weight_decay),
                    use_bias=not batch_normalize,
                    weights=conv_weights,
                    activation=act_fn,
                    padding=padding)(prev_layer)

                if batch_normalize:
                    conv_layer = tf.keras.layers.BatchNormalization(
                        weights=bn_weight_list)(conv_layer)
                prev_layer = conv_layer

                if activation == 'linear':
                    all_layers.append(prev_layer)
                elif activation == 'leaky':
                    # BUG FIX: the class is LeakyReLU; the original's
                    # `LeakyRelu` raised AttributeError for any leaky layer.
                    act_layer = tf.keras.layers.LeakyReLU(alpha=0.1)(prev_layer)
                    prev_layer = act_layer
                    all_layers.append(act_layer)

            elif section.startswith('route'):
                # Darknet route indices may be negative (relative), which
                # works unchanged with Python list indexing.
                ids = [int(i) for i in cfg_parser[section]['layers'].split(',')]
                layers = [all_layers[i] for i in ids]
                if len(layers) > 1:
                    print('Concatenating route layers:', layers)
                    concatenate_layer = tf.keras.layers.Concatenate()(layers)
                    all_layers.append(concatenate_layer)
                    prev_layer = concatenate_layer
                else:
                    skip_layer = layers[0]  # single-input route is a no-op skip
                    all_layers.append(skip_layer)
                    prev_layer = skip_layer

            elif section.startswith('maxpool'):
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                all_layers.append(
                    tf.keras.layers.MaxPooling2D(
                        pool_size=(size, size),
                        strides=(stride, stride),
                        padding='same')(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('shortcut'):
                index = int(cfg_parser[section]['from'])
                activation = cfg_parser[section]['activation']
                assert activation == 'linear', 'Only linear activation supported.'
                all_layers.append(
                    tf.keras.layers.Add()([all_layers[index], prev_layer]))
                prev_layer = all_layers[-1]

            elif section.startswith('upsample'):
                stride = int(cfg_parser[section]['stride'])
                assert stride == 2, 'Only stride=2 supported.'
                all_layers.append(tf.keras.layers.UpSampling2D(stride)(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('yolo'):
                # Record the layer feeding this detection head; the None
                # placeholder keeps all_layers aligned with cfg sections.
                out_index.append(len(all_layers) - 1)
                all_layers.append(None)
                prev_layer = all_layers[-1]

            elif section.startswith('net'):
                pass  # global training hyperparameters; nothing to build

            else:
                raise ValueError(
                    'Unsupported section header type: {}'.format(section))

    # Fall back to the last layer when the config defines no [yolo] heads.
    if not out_index:
        out_index.append(len(all_layers) - 1)

    # Create and save model as a TF checkpoint.
    sess = tf.InteractiveSession()
    tf.global_variables_initializer().run()
    saver = tf.train.Saver()
    saver.save(sess, output_path)
# Script entry point: parse CLI arguments and run the converter.
if __name__ == '__main__':
    _main(parser.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment