ColossalAI/examples/images/diffusion/ldm/data/cifar10.py

200 lines
6.5 KiB
Python

import json
from pathlib import Path
from typing import Dict
import torch
from datasets import load_dataset
from einops import rearrange
from ldm.util import instantiate_from_config
from omegaconf import DictConfig, ListConfig
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
def make_multi_folder_data(paths, caption_files=None, **kwargs):
    """Build a single concatenated dataset out of several image folders.

    ``paths`` is either a sequence of folders or a mapping of
    ``folder -> number of repeats``; in the mapping form every folder is
    included that many times. Caption files cannot be combined with the
    mapping form. When ``caption_files`` is given it is paired with ``paths``
    position by position. All remaining keyword arguments are forwarded to
    each ``FolderData``.
    """
    if isinstance(paths, (Dict, DictConfig)):
        assert caption_files is None, "Caption files not yet supported for repeats"
        # Flatten {folder: count} into a list with each folder repeated.
        expanded = []
        for folder, count in paths.items():
            expanded += [folder] * count
        paths = expanded

    if caption_files is None:
        per_folder = [FolderData(folder, **kwargs) for folder in paths]
    else:
        per_folder = [
            FolderData(folder, caption_file=captions, **kwargs)
            for folder, captions in zip(paths, caption_files)
        ]
    return torch.utils.data.ConcatDataset(per_folder)
class FolderData(Dataset):
    """Dataset of images stored under a folder, optionally paired with captions.

    Each item is a dict holding the transformed image under ``"image"``, its
    caption under ``"txt"`` and, when ``return_paths`` is set, the image file
    path (as a string) under ``"path"``.
    """

    def __init__(
        self,
        root_dir,
        caption_file=None,
        image_transforms=None,
        ext="jpg",
        default_caption="",
        postprocess=None,
        return_paths=False,
    ) -> None:
        """Create a dataset from a folder of images.

        Args:
            root_dir: Folder that holds the images.
            caption_file: Optional ``.json`` file (mapping of file name to
                caption) or ``.jsonl`` file (one object per line with
                ``file_name`` and ``text`` keys). When given, the items of
                the dataset are the entries of this file, resolved relative
                to ``root_dir``.
            image_transforms: Optional transforms applied before the built-in
                tensor conversion. A ``ListConfig`` is instantiated entry by
                entry with ``instantiate_from_config``; any other sequence is
                used as given. The argument is copied, never modified.
            ext: Image extension, or list of extensions, searched recursively
                under ``root_dir``.
            default_caption: Caption used when there is no caption file or
                when an entry's caption is ``None``.
            postprocess: Optional callable applied to every item dict before
                it is returned; a ``DictConfig`` is instantiated first.
            return_paths: Whether to include the file path in each item.

        Raises:
            ValueError: If ``caption_file`` is neither ``.json`` nor ``.jsonl``.
        """
        self.root_dir = Path(root_dir)
        self.default_caption = default_caption
        self.return_paths = return_paths
        if isinstance(postprocess, DictConfig):
            postprocess = instantiate_from_config(postprocess)
        self.postprocess = postprocess

        if caption_file is None:
            self.captions = None
            self._caption_keys = None
        else:
            self.captions = self._load_captions(caption_file)
            # Fix the key order once so that item lookup does not rebuild the
            # whole key list on every access.
            self._caption_keys = list(self.captions.keys())

        if not isinstance(ext, (tuple, list, ListConfig)):
            ext = [ext]
        # Only used if there is no caption file
        self.paths = []
        for e in ext:
            self.paths.extend(self.root_dir.rglob(f"*.{e}"))

        if image_transforms is None:
            image_transforms = []
        elif isinstance(image_transforms, ListConfig):
            image_transforms = [instantiate_from_config(tt) for tt in image_transforms]
        else:
            # Copy so the caller's sequence is not extended in place.
            image_transforms = list(image_transforms)
        # Always finish with tensor conversion, the x * 2 - 1 rescale and a
        # channel-last layout.
        image_transforms.extend(
            [transforms.ToTensor(), transforms.Lambda(lambda x: rearrange(x * 2.0 - 1.0, "c h w -> h w c"))]
        )
        self.tform = transforms.Compose(image_transforms)

    @staticmethod
    def _load_captions(caption_file):
        """Read a ``.json`` or ``.jsonl`` caption file into a name -> caption mapping."""
        # Kept in its own variable so the image-extension argument of
        # ``__init__`` is never confused with the caption file's suffix.
        suffix = Path(caption_file).suffix.lower()
        with open(caption_file, "rt", encoding="utf-8") as f:
            if suffix == ".json":
                return json.load(f)
            if suffix == ".jsonl":
                # Blank lines (e.g. a trailing newline) are not JSON records.
                records = [json.loads(line) for line in f if line.strip()]
                return {r["file_name"]: r["text"].strip("\n") for r in records}
        raise ValueError(f"Unrecognised format: {suffix}")

    def __len__(self):
        """Return the number of captioned entries, or of found image files."""
        if self.captions is not None:
            return len(self.captions)
        return len(self.paths)

    def __getitem__(self, index):
        """Load, transform and return the item at ``index`` as a dict."""
        data = {}
        if self.captions is not None:
            chosen = self._caption_keys[index]
            caption = self.captions.get(chosen, None)
            if caption is None:
                caption = self.default_caption
            filename = self.root_dir / chosen
        else:
            caption = self.default_caption
            filename = self.paths[index]

        if self.return_paths:
            data["path"] = str(filename)

        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of leaving it to garbage collection.
        with Image.open(filename) as im:
            data["image"] = self.process_im(im)
        data["txt"] = caption

        if self.postprocess is not None:
            data = self.postprocess(data)
        return data

    def process_im(self, im):
        """Convert ``im`` to RGB and run it through the transform pipeline."""
        im = im.convert("RGB")
        return self.tform(im)
def hf_dataset(
    name,
    image_transforms=[],
    image_column="img",
    label_column="label",
    text_column="txt",
    split="train",
    image_key="image",
    caption_key="txt",
):
    """Load a Hugging Face dataset and attach an image/caption transform.

    The dataset ``name`` is loaded for the given ``split``. Every entry of
    ``image_transforms`` is instantiated with ``instantiate_from_config`` and
    followed by tensor conversion, the ``x * 2 - 1`` rescale and a
    channel-last rearrangement. Integer labels from ``label_column`` are
    mapped to the ten class names hard-coded below and emitted under
    ``caption_key``; transformed images are emitted under ``image_key``.
    ``text_column`` is accepted but not used by this function.

    Both ``image_column`` and ``label_column`` must exist in the dataset.
    """
    ds = load_dataset(name, split=split)

    pipeline = [instantiate_from_config(cfg) for cfg in image_transforms]
    pipeline += [
        transforms.ToTensor(),
        transforms.Lambda(lambda x: rearrange(x * 2.0 - 1.0, "c h w -> h w c")),
    ]
    tform = transforms.Compose(pipeline)

    assert image_column in ds.column_names, f"Didn't find column {image_column} in {ds.column_names}"
    assert label_column in ds.column_names, f"Didn't find column {label_column} in {ds.column_names}"

    # Label index -> class name, built once instead of once per batch.
    label_to_text = dict(
        enumerate(
            [
                "airplane",
                "automobile",
                "bird",
                "cat",
                "deer",
                "dog",
                "frog",
                "horse",
                "ship",
                "truck",
            ]
        )
    )

    def pre_process(examples):
        # Applied to each batch of examples when it is accessed.
        return {
            image_key: [tform(im) for im in examples[image_column]],
            caption_key: [label_to_text[label] for label in examples[label_column]],
        }

    ds.set_transform(pre_process)
    return ds
class TextOnly(Dataset):
    """Caption-only dataset that pairs every caption with a placeholder image."""

    def __init__(self, captions, output_size, image_key="image", caption_key="txt", n_gpus=1):
        """Store the captions and the side length of the placeholder image.

        ``captions`` is either a ``Path`` to a text file with one caption per
        line, or an already loaded collection of captions. With
        ``n_gpus > 1`` every caption is repeated ``n_gpus`` times in a row.
        """
        self.output_size = output_size
        self.image_key = image_key
        self.caption_key = caption_key

        from_file = isinstance(captions, Path)
        self.captions = self._load_caption_file(captions) if from_file else captions

        if n_gpus > 1:
            # hack to make sure that all the captions appear on each gpu
            self.captions = [caption for caption in self.captions for _ in range(n_gpus)]

    def __len__(self):
        """Return the number of captions (after any repetition)."""
        return len(self.captions)

    def __getitem__(self, index):
        """Return the caption at ``index`` together with the placeholder image."""
        side = self.output_size
        # Zeros pushed through the same x * 2 - 1 rescale and channel-last
        # rearrangement that real images receive.
        placeholder = rearrange(torch.zeros(3, side, side) * 2.0 - 1.0, "c h w -> h w c")
        return {self.image_key: placeholder, self.caption_key: self.captions[index]}

    def _load_caption_file(self, filename):
        """Read ``filename`` and return its lines without newline characters."""
        with open(filename, "rt") as handle:
            return [line.strip("\n") for line in handle]