Label Studio S3 to YOLOv5
#!/usr/bin/env python
# coding: utf-8
import argparse
import json
import os
import shutil
import signal
import sys
import textwrap
from pathlib import Path
from typing import Union

import ray
import requests
from dotenv import load_dotenv
from tqdm import tqdm

from split_dataset import autosplit


class PrepareDataset:

    def __init__(self,
                 project_id: int,
                 dataset_path: str = './dataset',
                 weights: Union[list, tuple] = (0.8, 0.2, 0.0)):
        self.project_id = project_id
        self.dataset_path = Path(dataset_path)
        self.weights = weights

    @staticmethod
    def keyboard_interrupt_handler(sig: int, _) -> None:
        print(f'\nKeyboardInterrupt (id: {sig}) has been caught...')
        print('Terminating the session gracefully...')
        sys.exit(1)

    def get_project_data(self) -> None:

        @ray.remote
        def download(task: dict):
            # Rewrite the s3:// URI to a plain HTTP(S) URL on the S3 endpoint
            img_url = task['data']['image'].replace('s3://', s3_endpoint)
            fname = self.dataset_path / 'images' / Path(img_url).name
            if fname.exists():
                return
            res = requests.get(img_url)
            res.raise_for_status()
            with open(fname, 'wb') as fp:
                fp.write(res.content)

        load_dotenv()
        s3_endpoint = os.environ['S3_ENDPOINT'].rstrip('/') + '/'
        headers = {
            'Authorization': f'Token {os.environ["LABEL_STUDIO_TOKEN"]}'
        }
        label_studio_host = os.environ['LABEL_STUDIO_HOST'].rstrip('/')

        # Export the project in YOLO format (labels/ + classes.txt)
        r = requests.get(
            f'{label_studio_host}/api/projects/{self.project_id}/export?exportType=YOLO',  # noqa: E501
            headers=headers)
        r.raise_for_status()
        with open(f'{self.dataset_path}.zip', 'wb') as f:
            f.write(r.content)
        shutil.unpack_archive(f'{self.dataset_path}.zip', self.dataset_path)
        Path(f'{self.dataset_path}.zip').unlink()

        # Export the annotated tasks as JSON to get the source image URIs
        r = requests.get(
            f'{label_studio_host}/api/projects/{self.project_id}/export?exportType=JSON',  # noqa: E501
            headers=headers)
        r.raise_for_status()
        data = r.json()
        with open(f'{self.dataset_path}/annotated_tasks.json', 'w') as j:
            json.dump(data, j)

        # Download the images from S3 in parallel with ray
        (self.dataset_path / 'images').mkdir(parents=True, exist_ok=True)
        futures = [download.remote(task) for task in tqdm(data)]
        _ = [ray.get(future) for future in tqdm(futures)]  # wait for downloads

    def create_dataset_config(self):
        with open(self.dataset_path / 'classes.txt') as f:
            classes = f.read().splitlines()
        num_classes = len(classes)
        content = f'''\
            path: {self.dataset_path.absolute()}
            train: autosplit_train.txt
            val: autosplit_val.txt
            test:
            nc: {num_classes}
            names: {classes}\n'''
        with open(self.dataset_path / 'dataset_config.yml', 'w') as f:
            f.write(textwrap.dedent(content))

    def run_pipeline(self):
        signal.signal(signal.SIGINT, self.keyboard_interrupt_handler)
        self.get_project_data()
        splits = [
            'autosplit_train.txt', 'autosplit_val.txt', 'autosplit_test.txt'
        ]
        autosplit(self.dataset_path, self.weights)
        for split in splits:
            if Path(split).exists():
                if (self.dataset_path / split).exists():
                    Path(self.dataset_path / split).unlink()
                shutil.move(split, self.dataset_path)
        self.create_dataset_config()


def _opts() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('-p',
                        '--project-id',
                        help='Label Studio project id',
                        type=int,
                        required=True)
    parser.add_argument('-d',
                        '--dataset-path',
                        help='Path to the output dataset '
                        '(if it exists, the dataset will be updated)',
                        type=str,
                        default='./dataset')
    parser.add_argument('-w',
                        '--weights',
                        help='Split weights: train val test '
                        '(default: 0.8 0.2 0.0)',
                        type=float,
                        default=[0.8, 0.2, 0.0],
                        nargs=3)
    return parser.parse_args()


if __name__ == '__main__':
    args = _opts()
    pd = PrepareDataset(project_id=args.project_id,
                        dataset_path=args.dataset_path,
                        weights=args.weights)
    pd.run_pipeline()
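
For reference, the pipeline can also be driven from Python rather than the CLI. A minimal sketch, assuming the script above is saved as prepare_dataset.py (a hypothetical filename) and that S3_ENDPOINT, LABEL_STUDIO_TOKEN, and LABEL_STUDIO_HOST are set in the environment or in a .env file next to it:

# Minimal sketch: programmatic use of the class defined above.
# Assumes the script is saved as prepare_dataset.py (hypothetical filename)
# and that S3_ENDPOINT, LABEL_STUDIO_TOKEN, and LABEL_STUDIO_HOST are
# available via the environment or a .env file.
from prepare_dataset import PrepareDataset

pd = PrepareDataset(project_id=1,              # Label Studio project to export
                    dataset_path='./dataset',  # output directory
                    weights=(0.8, 0.2, 0.0))   # train/val/test split
pd.run_pipeline()

The equivalent CLI call, under the same filename assumption, would be: python prepare_dataset.py -p 1 -d ./dataset -w 0.8 0.2 0.0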
requirements.txt
ray>=1.13.0
requests>=2.28.1
python-dotenv>=0.20.0
tqdm>=4.64.0
split_dataset.py
#!/usr/bin/env python
# coding: utf-8
"""
Source: https://github.com/ultralytics/yolov5/blob/master/utils/dataloaders.py
"""
import os
import random
from pathlib import Path

from tqdm import tqdm


def img2label_paths(img_paths) -> list:
    # Define label paths as a function of image paths
    sa = os.sep + 'images' + os.sep  # /images/ substring
    sb = os.sep + 'labels' + os.sep  # /labels/ substring
    return [
        sb.join(x.rsplit(sa, 1)).rsplit('.', 1)[0] + '.txt' for x in img_paths
    ]


def autosplit(path: str,
              weights: tuple = (0.9, 0.1, 0.0),
              annotated_only: bool = False) -> None:
    """Autosplit a dataset.

    Split a dataset into train/val/test splits and save the
    autosplit_*.txt files next to the given path.

    Args:
        path (str): Path to the images directory
        weights (tuple): Train, val, test weights
        annotated_only (bool): Only use images with an annotated txt file
    """
    IMG_FORMATS = [
        'bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng', 'webp', 'mpo'
    ]  # acceptable image suffixes
    path = Path(path)  # images dir
    files = sorted(x for x in path.rglob('*.*')
                   if x.suffix[1:].lower() in IMG_FORMATS)  # image files only
    n = len(files)  # number of files
    random.seed(0)  # for reproducibility
    indices = random.choices([0, 1, 2], weights=weights,
                             k=n)  # assign each image to a split
    txt = ['autosplit_train.txt', 'autosplit_val.txt',
           'autosplit_test.txt']  # 3 txt files
    for x in txt:  # remove existing split files
        if (path.parent / x).exists():
            (path.parent / x).unlink()
    print(f'Autosplitting images from {path}' +
          ', using *.txt labeled images only' * annotated_only)
    for i, img in tqdm(zip(indices, files), total=n):
        if not annotated_only or Path(img2label_paths(
                [str(img)])[0]).exists():  # check label
            with open(path.parent / txt[i], 'a') as f:
                f.write(f'./{Path(*img.parts[1:])}' +
                        '\n')  # add image to txt file
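
If the split ever needs to be regenerated without re-exporting from Label Studio, autosplit can also be called on its own. A minimal sketch mirroring how run_pipeline() invokes it (passing the dataset root and relying on rglob to find the images; the autosplit_*.txt files land in the parent of the given path, here the current directory):

# Minimal sketch: re-running the split on an already-downloaded dataset.
# run_pipeline() passes the dataset root and rglob() picks up the files
# under ./dataset/images; set annotated_only=True to keep only images
# that have a matching label .txt file under ./dataset/labels.
from split_dataset import autosplit

autosplit('./dataset', weights=(0.8, 0.2, 0.0), annotated_only=False)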