Skip to content

Instantly share code, notes, and snippets.

@mr-c
Last active October 7, 2022 08:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mr-c/3b87d8538817c023197b7ed4bb1f0210 to your computer and use it in GitHub Desktop.
Save mr-c/3b87d8538817c023197b7ed4bb1f0210 to your computer and use it in GitHub Desktop.
cwltool --print-input-schema examples for MGnify workflows & tools
#
# This file was autogenerated using schema-salad-tool --codegen=python
# The code itself is released under the Apache 2.0 license and the help text is
# subject to the license of the original schema.
import copy
import logging
import os
import pathlib
import re
import tempfile
import uuid as _uuid__ # pylint: disable=unused-import # noqa: F401
import xml.sax # nosec
from abc import ABC, abstractmethod
from io import StringIO
from typing import (
Any,
Dict,
List,
MutableMapping,
MutableSequence,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
from urllib.parse import quote, urldefrag, urlparse, urlsplit, urlunsplit
from urllib.request import pathname2url
from rdflib import Graph
from rdflib.plugins.parsers.notation3 import BadSyntax
from ruamel.yaml.comments import CommentedMap
from schema_salad.exceptions import SchemaSaladException, ValidationException
from schema_salad.fetcher import DefaultFetcher, Fetcher, MemoryCachingFetcher
from schema_salad.sourceline import SourceLine, add_lc_filename
from schema_salad.utils import CacheType, yaml_no_ts # requires schema-salad v8.2+
_vocab: Dict[str, str] = {}
_rvocab: Dict[str, str] = {}
_logger = logging.getLogger("salad")
IdxType = MutableMapping[str, Tuple[Any, "LoadingOptions"]]
class LoadingOptions:
idx: IdxType
fileuri: Optional[str]
baseuri: str
namespaces: MutableMapping[str, str]
schemas: MutableSequence[str]
original_doc: Optional[Any]
addl_metadata: MutableMapping[str, Any]
fetcher: Fetcher
vocab: Dict[str, str]
rvocab: Dict[str, str]
cache: CacheType
def __init__(
self,
fetcher: Optional[Fetcher] = None,
namespaces: Optional[Dict[str, str]] = None,
schemas: Optional[List[str]] = None,
fileuri: Optional[str] = None,
copyfrom: Optional["LoadingOptions"] = None,
original_doc: Optional[Any] = None,
addl_metadata: Optional[Dict[str, str]] = None,
baseuri: Optional[str] = None,
idx: Optional[IdxType] = None,
) -> None:
"""Create a LoadingOptions object."""
self.original_doc = original_doc
if idx is not None:
self.idx = idx
else:
self.idx = copyfrom.idx if copyfrom is not None else {}
if fileuri is not None:
self.fileuri = fileuri
else:
self.fileuri = copyfrom.fileuri if copyfrom is not None else None
if baseuri is not None:
self.baseuri = baseuri
else:
self.baseuri = copyfrom.baseuri if copyfrom is not None else ""
if namespaces is not None:
self.namespaces = namespaces
else:
self.namespaces = copyfrom.namespaces if copyfrom is not None else {}
if schemas is not None:
self.schemas = schemas
else:
self.schemas = copyfrom.schemas if copyfrom is not None else []
if addl_metadata is not None:
self.addl_metadata = addl_metadata
else:
self.addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {}
if fetcher is not None:
self.fetcher = fetcher
elif copyfrom is not None:
self.fetcher = copyfrom.fetcher
else:
import requests
from cachecontrol.caches import FileCache
from cachecontrol.wrapper import CacheControl
root = pathlib.Path(os.environ.get("HOME", tempfile.gettempdir()))
session = CacheControl(
requests.Session(),
cache=FileCache(root / ".cache" / "salad"),
)
self.fetcher: Fetcher = DefaultFetcher({}, session)
self.cache = (
self.fetcher.cache if isinstance(self.fetcher, MemoryCachingFetcher) else {}
)
self.vocab = _vocab
self.rvocab = _rvocab
if namespaces is not None:
self.vocab = self.vocab.copy()
self.rvocab = self.rvocab.copy()
for k, v in namespaces.items():
self.vocab[k] = v
self.rvocab[v] = k
@property
def graph(self) -> Graph:
"""Generate a merged rdflib.Graph from all entries in self.schemas."""
graph = Graph()
if not self.schemas:
return graph
key = str(hash(tuple(self.schemas)))
if key in self.cache:
return cast(Graph, self.cache[key])
for schema in self.schemas:
fetchurl = (
self.fetcher.urljoin(self.fileuri, schema)
if self.fileuri is not None
else pathlib.Path(schema).resolve().as_uri()
)
try:
if fetchurl not in self.cache or self.cache[fetchurl] is True:
_logger.debug("Getting external schema %s", fetchurl)
content = self.fetcher.fetch_text(fetchurl)
self.cache[fetchurl] = newGraph = Graph()
for fmt in ["xml", "turtle"]:
try:
newGraph.parse(
data=content, format=fmt, publicID=str(fetchurl)
)
break
except (xml.sax.SAXParseException, TypeError, BadSyntax):
pass
graph += self.cache[fetchurl]
except Exception as e:
_logger.warning(
"Could not load extension schema %s: %s", fetchurl, str(e)
)
self.cache[key] = graph
return graph
class Saveable(ABC):
"""Mark classes than have a save() and fromDoc() function."""
@classmethod
@abstractmethod
def fromDoc(
cls,
_doc: Any,
baseuri: str,
loadingOptions: LoadingOptions,
docRoot: Optional[str] = None,
) -> "Saveable":
"""Construct this object from the result of yaml.load()."""
@abstractmethod
def save(
self, top: bool = False, base_url: str = "", relative_uris: bool = True
) -> Dict[str, Any]:
"""Convert this object to a JSON/YAML friendly dictionary."""
def load_field(val, fieldtype, baseuri, loadingOptions):
# type: (Union[str, Dict[str, str]], _Loader, str, LoadingOptions) -> Any
if isinstance(val, MutableMapping):
if "$import" in val:
if loadingOptions.fileuri is None:
raise SchemaSaladException("Cannot load $import without fileuri")
result, metadata = _document_load_by_url(
fieldtype,
loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]),
loadingOptions,
)
return result
elif "$include" in val:
if loadingOptions.fileuri is None:
raise SchemaSaladException("Cannot load $import without fileuri")
val = loadingOptions.fetcher.fetch_text(
loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"])
)
return fieldtype.load(val, baseuri, loadingOptions)
save_type = Optional[
Union[MutableMapping[str, Any], MutableSequence[Any], int, float, bool, str]
]
def save(
val: Any,
top: bool = True,
base_url: str = "",
relative_uris: bool = True,
) -> save_type:
if isinstance(val, Saveable):
return val.save(top=top, base_url=base_url, relative_uris=relative_uris)
if isinstance(val, MutableSequence):
return [
save(v, top=False, base_url=base_url, relative_uris=relative_uris)
for v in val
]
if isinstance(val, MutableMapping):
newdict = {}
for key in val:
newdict[key] = save(
val[key], top=False, base_url=base_url, relative_uris=relative_uris
)
return newdict
if val is None or isinstance(val, (int, float, bool, str)):
return val
raise Exception("Not Saveable: %s" % type(val))
def save_with_metadata(
val: Any,
valLoadingOpts: LoadingOptions,
top: bool = True,
base_url: str = "",
relative_uris: bool = True,
) -> save_type:
"""Save and set $namespaces, $schemas, $base and any other metadata fields at the top level."""
saved_val = save(val, top, base_url, relative_uris)
newdict: MutableMapping[str, Any] = {}
if isinstance(saved_val, MutableSequence):
newdict = {"$graph": saved_val}
elif isinstance(saved_val, MutableMapping):
newdict = saved_val
if valLoadingOpts.namespaces:
newdict["$namespaces"] = valLoadingOpts.namespaces
if valLoadingOpts.schemas:
newdict["$schemas"] = valLoadingOpts.schemas
if valLoadingOpts.baseuri:
newdict["$base"] = valLoadingOpts.baseuri
for k, v in valLoadingOpts.addl_metadata.items():
if k not in newdict:
newdict[k] = v
return newdict
def expand_url(
url, # type: str
base_url, # type: str
loadingOptions, # type: LoadingOptions
scoped_id=False, # type: bool
vocab_term=False, # type: bool
scoped_ref=None, # type: Optional[int]
):
# type: (...) -> str
if url in ("@id", "@type"):
return url
if vocab_term and url in loadingOptions.vocab:
return url
if bool(loadingOptions.vocab) and ":" in url:
prefix = url.split(":")[0]
if prefix in loadingOptions.vocab:
url = loadingOptions.vocab[prefix] + url[len(prefix) + 1 :]
split = urlsplit(url)
if (
(bool(split.scheme) and split.scheme in ["http", "https", "file"])
or url.startswith("$(")
or url.startswith("${")
):
pass
elif scoped_id and not bool(split.fragment):
splitbase = urlsplit(base_url)
frg = ""
if bool(splitbase.fragment):
frg = splitbase.fragment + "/" + split.path
else:
frg = split.path
pt = splitbase.path if splitbase.path != "" else "/"
url = urlunsplit((splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg))
elif scoped_ref is not None and not bool(split.fragment):
splitbase = urlsplit(base_url)
sp = splitbase.fragment.split("/")
n = scoped_ref
while n > 0 and len(sp) > 0:
sp.pop()
n -= 1
sp.append(url)
url = urlunsplit(
(
splitbase.scheme,
splitbase.netloc,
splitbase.path,
splitbase.query,
"/".join(sp),
)
)
else:
url = loadingOptions.fetcher.urljoin(base_url, url)
if vocab_term:
split = urlsplit(url)
if bool(split.scheme):
if url in loadingOptions.rvocab:
return loadingOptions.rvocab[url]
else:
raise ValidationException(f"Term '{url}' not in vocabulary")
return url
class _Loader:
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
pass
class _AnyLoader(_Loader):
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if doc is not None:
return doc
raise ValidationException("Expected non-null")
class _PrimitiveLoader(_Loader):
def __init__(self, tp):
# type: (Union[type, Tuple[Type[str], Type[str]]]) -> None
self.tp = tp
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if not isinstance(doc, self.tp):
raise ValidationException(
"Expected a {} but got {}".format(
self.tp.__class__.__name__, doc.__class__.__name__
)
)
return doc
def __repr__(self): # type: () -> str
return str(self.tp)
class _ArrayLoader(_Loader):
def __init__(self, items):
# type: (_Loader) -> None
self.items = items
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if not isinstance(doc, MutableSequence):
raise ValidationException("Expected a list, was {}".format(type(doc)))
r = [] # type: List[Any]
errors = [] # type: List[SchemaSaladException]
for i in range(0, len(doc)):
try:
lf = load_field(
doc[i], _UnionLoader((self, self.items)), baseuri, loadingOptions
)
if isinstance(lf, MutableSequence):
r.extend(lf)
else:
r.append(lf)
except ValidationException as e:
errors.append(e.with_sourceline(SourceLine(doc, i, str)))
if errors:
raise ValidationException("", None, errors)
return r
def __repr__(self): # type: () -> str
return f"array<{self.items}>"
class _EnumLoader(_Loader):
def __init__(self, symbols, name):
# type: (Sequence[str], str) -> None
self.symbols = symbols
self.name = name
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if doc in self.symbols:
return doc
else:
raise ValidationException(f"Expected one of {self.symbols}")
def __repr__(self): # type: () -> str
return self.name
class _SecondaryDSLLoader(_Loader):
def __init__(self, inner):
# type: (_Loader) -> None
self.inner = inner
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
r: List[Dict[str, Any]] = []
if isinstance(doc, MutableSequence):
for d in doc:
if isinstance(d, str):
if d.endswith("?"):
r.append({"pattern": d[:-1], "required": False})
else:
r.append({"pattern": d})
elif isinstance(d, dict):
new_dict: Dict[str, Any] = {}
if "pattern" in d:
new_dict["pattern"] = d.pop("pattern")
else:
raise ValidationException(
"Missing pattern in secondaryFiles specification entry: {}".format(
d
)
)
new_dict["required"] = (
d.pop("required") if "required" in d else None
)
if len(d):
raise ValidationException(
"Unallowed values in secondaryFiles specification entry: {}".format(
d
)
)
r.append(new_dict)
else:
raise ValidationException(
"Expected a string or sequence of (strings or mappings)."
)
elif isinstance(doc, MutableMapping):
new_dict = {}
if "pattern" in doc:
new_dict["pattern"] = doc.pop("pattern")
else:
raise ValidationException(
"Missing pattern in secondaryFiles specification entry: {}".format(
doc
)
)
new_dict["required"] = doc.pop("required") if "required" in doc else None
if len(doc):
raise ValidationException(
"Unallowed values in secondaryFiles specification entry: {}".format(
doc
)
)
r.append(new_dict)
elif isinstance(doc, str):
if doc.endswith("?"):
r.append({"pattern": doc[:-1], "required": False})
else:
r.append({"pattern": doc})
else:
raise ValidationException("Expected str or sequence of str")
return self.inner.load(r, baseuri, loadingOptions, docRoot)
class _RecordLoader(_Loader):
def __init__(self, classtype):
# type: (Type[Saveable]) -> None
self.classtype = classtype
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if not isinstance(doc, MutableMapping):
raise ValidationException("Expected a dict, was {}".format(type(doc)))
return self.classtype.fromDoc(doc, baseuri, loadingOptions, docRoot=docRoot)
def __repr__(self): # type: () -> str
return str(self.classtype.__name__)
class _ExpressionLoader(_Loader):
def __init__(self, items: Type[str]) -> None:
self.items = items
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if not isinstance(doc, str):
raise ValidationException("Expected a str, was {}".format(type(doc)))
return doc
class _UnionLoader(_Loader):
def __init__(self, alternates):
# type: (Sequence[_Loader]) -> None
self.alternates = alternates
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
errors = []
for t in self.alternates:
try:
return t.load(doc, baseuri, loadingOptions, docRoot=docRoot)
except ValidationException as e:
errors.append(ValidationException(f"tried {t} but", None, [e]))
raise ValidationException("", None, errors, "-")
def __repr__(self): # type: () -> str
return " | ".join(str(a) for a in self.alternates)
class _URILoader(_Loader):
def __init__(self, inner, scoped_id, vocab_term, scoped_ref):
# type: (_Loader, bool, bool, Union[int, None]) -> None
self.inner = inner
self.scoped_id = scoped_id
self.vocab_term = vocab_term
self.scoped_ref = scoped_ref
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if isinstance(doc, MutableSequence):
newdoc = []
for i in doc:
if isinstance(i, str):
newdoc.append(
expand_url(
i,
baseuri,
loadingOptions,
self.scoped_id,
self.vocab_term,
self.scoped_ref,
)
)
else:
newdoc.append(i)
doc = newdoc
elif isinstance(doc, str):
doc = expand_url(
doc,
baseuri,
loadingOptions,
self.scoped_id,
self.vocab_term,
self.scoped_ref,
)
return self.inner.load(doc, baseuri, loadingOptions)
class _TypeDSLLoader(_Loader):
typeDSLregex = re.compile(r"^([^[?]+)(\[\])?(\?)?$")
def __init__(self, inner, refScope):
# type: (_Loader, Union[int, None]) -> None
self.inner = inner
self.refScope = refScope
def resolve(
self,
doc, # type: str
baseuri, # type: str
loadingOptions, # type: LoadingOptions
):
# type: (...) -> Union[List[Union[Dict[str, str], str]], Dict[str, str], str]
m = self.typeDSLregex.match(doc)
if m:
group1 = m.group(1)
assert group1 is not None # nosec
first = expand_url(
group1, baseuri, loadingOptions, False, True, self.refScope
)
second = third = None
if bool(m.group(2)):
second = {"type": "array", "items": first}
# second = CommentedMap((("type", "array"),
# ("items", first)))
# second.lc.add_kv_line_col("type", lc)
# second.lc.add_kv_line_col("items", lc)
# second.lc.filename = filename
if bool(m.group(3)):
third = ["null", second or first]
# third = CommentedSeq(["null", second or first])
# third.lc.add_kv_line_col(0, lc)
# third.lc.add_kv_line_col(1, lc)
# third.lc.filename = filename
return third or second or first
return doc
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if isinstance(doc, MutableSequence):
r = [] # type: List[Any]
for d in doc:
if isinstance(d, str):
resolved = self.resolve(d, baseuri, loadingOptions)
if isinstance(resolved, MutableSequence):
for i in resolved:
if i not in r:
r.append(i)
else:
if resolved not in r:
r.append(resolved)
else:
r.append(d)
doc = r
elif isinstance(doc, str):
doc = self.resolve(doc, baseuri, loadingOptions)
return self.inner.load(doc, baseuri, loadingOptions)
class _IdMapLoader(_Loader):
def __init__(self, inner, mapSubject, mapPredicate):
# type: (_Loader, str, Union[str, None]) -> None
self.inner = inner
self.mapSubject = mapSubject
self.mapPredicate = mapPredicate
def load(self, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, str, LoadingOptions, Optional[str]) -> Any
if isinstance(doc, MutableMapping):
r = [] # type: List[Any]
for k in sorted(doc.keys()):
val = doc[k]
if isinstance(val, CommentedMap):
v = copy.copy(val)
v.lc.data = val.lc.data
v.lc.filename = val.lc.filename
v[self.mapSubject] = k
r.append(v)
elif isinstance(val, MutableMapping):
v2 = copy.copy(val)
v2[self.mapSubject] = k
r.append(v2)
else:
if self.mapPredicate:
v3 = {self.mapPredicate: val}
v3[self.mapSubject] = k
r.append(v3)
else:
raise ValidationException("No mapPredicate")
doc = r
return self.inner.load(doc, baseuri, loadingOptions)
def _document_load(
loader: _Loader,
doc: Union[str, MutableMapping[str, Any], MutableSequence[Any]],
baseuri: str,
loadingOptions: LoadingOptions,
addl_metadata_fields: Optional[MutableSequence[str]] = None,
) -> Tuple[Any, LoadingOptions]:
if isinstance(doc, str):
return _document_load_by_url(
loader,
loadingOptions.fetcher.urljoin(baseuri, doc),
loadingOptions,
addl_metadata_fields=addl_metadata_fields,
)
if isinstance(doc, MutableMapping):
addl_metadata = {}
if addl_metadata_fields is not None:
for mf in addl_metadata_fields:
if mf in doc:
addl_metadata[mf] = doc[mf]
docuri = baseuri
if "$base" in doc:
baseuri = doc["$base"]
loadingOptions = LoadingOptions(
copyfrom=loadingOptions,
namespaces=doc.get("$namespaces", None),
schemas=doc.get("$schemas", None),
baseuri=doc.get("$base", None),
addl_metadata=addl_metadata,
)
doc = {
k: v
for k, v in doc.items()
if k not in ("$namespaces", "$schemas", "$base")
}
if "$graph" in doc:
loadingOptions.idx[baseuri] = (
loader.load(doc["$graph"], baseuri, loadingOptions),
loadingOptions,
)
else:
loadingOptions.idx[baseuri] = (
loader.load(doc, baseuri, loadingOptions, docRoot=baseuri),
loadingOptions,
)
if docuri != baseuri:
loadingOptions.idx[docuri] = loadingOptions.idx[baseuri]
return loadingOptions.idx[baseuri]
if isinstance(doc, MutableSequence):
loadingOptions.idx[baseuri] = (
loader.load(doc, baseuri, loadingOptions),
loadingOptions,
)
return loadingOptions.idx[baseuri]
raise ValidationException(
"Expected URI string, MutableMapping or MutableSequence, got %s" % type(doc)
)
def _document_load_by_url(
loader: _Loader,
url: str,
loadingOptions: LoadingOptions,
addl_metadata_fields: Optional[MutableSequence[str]] = None,
) -> Tuple[Any, LoadingOptions]:
if url in loadingOptions.idx:
return loadingOptions.idx[url]
doc_url, frg = urldefrag(url)
text = loadingOptions.fetcher.fetch_text(doc_url)
if isinstance(text, bytes):
textIO = StringIO(text.decode("utf-8"))
else:
textIO = StringIO(text)
textIO.name = str(doc_url)
yaml = yaml_no_ts()
result = yaml.load(textIO)
add_lc_filename(result, doc_url)
loadingOptions = LoadingOptions(copyfrom=loadingOptions, fileuri=doc_url)
_document_load(
loader,
result,
doc_url,
loadingOptions,
addl_metadata_fields=addl_metadata_fields,
)
return loadingOptions.idx[url]
def file_uri(path, split_frag=False): # type: (str, bool) -> str
if path.startswith("file://"):
return path
if split_frag:
pathsp = path.split("#", 2)
frag = "#" + quote(str(pathsp[1])) if len(pathsp) == 2 else ""
urlpath = pathname2url(str(pathsp[0]))
else:
urlpath = pathname2url(path)
frag = ""
if urlpath.startswith("//"):
return f"file:{urlpath}{frag}"
else:
return f"file://{urlpath}{frag}"
def prefix_url(url: str, namespaces: Dict[str, str]) -> str:
"""Expand short forms into full URLs using the given namespace dictionary."""
for k, v in namespaces.items():
if url.startswith(v):
return k + ":" + url[len(v) :]
return url
def save_relative_uri(
uri: Any,
base_url: str,
scoped_id: bool,
ref_scope: Optional[int],
relative_uris: bool,
) -> Any:
"""Convert any URI to a relative one, obeying the scoping rules."""
if isinstance(uri, MutableSequence):
return [
save_relative_uri(u, base_url, scoped_id, ref_scope, relative_uris)
for u in uri
]
elif isinstance(uri, str):
if not relative_uris or uri == base_url:
return uri
urisplit = urlsplit(uri)
basesplit = urlsplit(base_url)
if urisplit.scheme == basesplit.scheme and urisplit.netloc == basesplit.netloc:
if urisplit.path != basesplit.path:
p = os.path.relpath(urisplit.path, os.path.dirname(basesplit.path))
if urisplit.fragment:
p = p + "#" + urisplit.fragment
return p
basefrag = basesplit.fragment + "/"
if ref_scope:
sp = basefrag.split("/")
i = 0
while i < ref_scope:
sp.pop()
i += 1
basefrag = "/".join(sp)
if urisplit.fragment.startswith(basefrag):
return urisplit.fragment[len(basefrag) :]
else:
return urisplit.fragment
return uri
else:
return save(uri, top=False, base_url=base_url, relative_uris=relative_uris)
def shortname(inputid: str) -> str:
"""
Compute the shortname of a fully qualified identifier.
See https://w3id.org/cwl/v1.2/SchemaSalad.html#Short_names.
"""
parsed_id = urlparse(inputid)
if parsed_id.fragment:
return parsed_id.fragment.split("/")[-1]
return parsed_id.path.split("/")[-1]
def parser_info() -> str:
return "uk.ac.ebi.metagenomics.mgnify.pipeline-v5.workflows.raw-reads-wf--v.5-cond.inputs"
class File(Saveable):
def __init__(
self,
location: Any,
path: Any,
basename: Any,
dirname: Any,
nameroot: Any,
nameext: Any,
checksum: Any,
size: Any,
secondaryFiles: Any,
format: Any,
contents: Any,
extension_fields: Optional[Dict[str, Any]] = None,
loadingOptions: Optional[LoadingOptions] = None,
) -> None:
if extension_fields:
self.extension_fields = extension_fields
else:
self.extension_fields = CommentedMap()
if loadingOptions:
self.loadingOptions = loadingOptions
else:
self.loadingOptions = LoadingOptions()
self.class_ = "File"
self.location = location
self.path = path
self.basename = basename
self.dirname = dirname
self.nameroot = nameroot
self.nameext = nameext
self.checksum = checksum
self.size = size
self.secondaryFiles = secondaryFiles
self.format = format
self.contents = contents
def __eq__(self, other: Any) -> bool:
if isinstance(other, File):
return bool(
self.class_ == other.class_
and self.location == other.location
and self.path == other.path
and self.basename == other.basename
and self.dirname == other.dirname
and self.nameroot == other.nameroot
and self.nameext == other.nameext
and self.checksum == other.checksum
and self.size == other.size
and self.secondaryFiles == other.secondaryFiles
and self.format == other.format
and self.contents == other.contents
)
return False
def __hash__(self) -> int:
return hash(
(
self.class_,
self.location,
self.path,
self.basename,
self.dirname,
self.nameroot,
self.nameext,
self.checksum,
self.size,
self.secondaryFiles,
self.format,
self.contents,
)
)
@classmethod
def fromDoc(
cls,
doc: Any,
baseuri: str,
loadingOptions: LoadingOptions,
docRoot: Optional[str] = None,
) -> "File":
_doc = copy.copy(doc)
if hasattr(doc, "lc"):
_doc.lc.data = doc.lc.data
_doc.lc.filename = doc.lc.filename
_errors__ = []
if _doc.get("class") != "File":
raise ValidationException("Not a File")
try:
location = load_field(
_doc.get("location"),
uri_union_of_None_type_or_strtype_False_True_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `location` field is not valid because:",
SourceLine(_doc, "location", str),
[e],
)
)
try:
path = load_field(
_doc.get("path"),
uri_union_of_None_type_or_strtype_False_False_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `path` field is not valid because:",
SourceLine(_doc, "path", str),
[e],
)
)
try:
basename = load_field(
_doc.get("basename"),
uri_union_of_None_type_or_strtype_False_False_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `basename` field is not valid because:",
SourceLine(_doc, "basename", str),
[e],
)
)
try:
dirname = load_field(
_doc.get("dirname"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `dirname` field is not valid because:",
SourceLine(_doc, "dirname", str),
[e],
)
)
try:
nameroot = load_field(
_doc.get("nameroot"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `nameroot` field is not valid because:",
SourceLine(_doc, "nameroot", str),
[e],
)
)
try:
nameext = load_field(
_doc.get("nameext"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `nameext` field is not valid because:",
SourceLine(_doc, "nameext", str),
[e],
)
)
try:
checksum = load_field(
_doc.get("checksum"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `checksum` field is not valid because:",
SourceLine(_doc, "checksum", str),
[e],
)
)
try:
size = load_field(
_doc.get("size"),
union_of_None_type_or_inttype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `size` field is not valid because:",
SourceLine(_doc, "size", str),
[e],
)
)
try:
secondaryFiles = load_field(
_doc.get("secondaryFiles"),
secondaryfilesdsl_union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `secondaryFiles` field is not valid because:",
SourceLine(_doc, "secondaryFiles", str),
[e],
)
)
try:
format = load_field(
_doc.get("format"),
uri_union_of_None_type_or_strtype_True_False_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `format` field is not valid because:",
SourceLine(_doc, "format", str),
[e],
)
)
try:
contents = load_field(
_doc.get("contents"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `contents` field is not valid because:",
SourceLine(_doc, "contents", str),
[e],
)
)
extension_fields: Dict[str, Any] = {}
for k in _doc.keys():
if k not in cls.attrs:
if ":" in k:
ex = expand_url(
k, "", loadingOptions, scoped_id=False, vocab_term=False
)
extension_fields[ex] = _doc[k]
else:
_errors__.append(
ValidationException(
"invalid field `{}`, expected one of: `class`, `location`, `path`, `basename`, `dirname`, `nameroot`, `nameext`, `checksum`, `size`, `secondaryFiles`, `format`, `contents`".format(
k
),
SourceLine(_doc, k, str),
)
)
break
if _errors__:
raise ValidationException("Trying 'File'", None, _errors__)
_constructed = cls(
location=location,
path=path,
basename=basename,
dirname=dirname,
nameroot=nameroot,
nameext=nameext,
checksum=checksum,
size=size,
secondaryFiles=secondaryFiles,
format=format,
contents=contents,
extension_fields=extension_fields,
loadingOptions=loadingOptions,
)
return _constructed
def save(
self, top: bool = False, base_url: str = "", relative_uris: bool = True
) -> Dict[str, Any]:
r: Dict[str, Any] = {}
if relative_uris:
for ef in self.extension_fields:
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef]
else:
for ef in self.extension_fields:
r[ef] = self.extension_fields[ef]
r["class"] = "File"
if self.location is not None:
u = save_relative_uri(self.location, base_url, False, None, relative_uris)
r["location"] = u
if self.path is not None:
u = save_relative_uri(self.path, base_url, False, None, relative_uris)
r["path"] = u
if self.basename is not None:
u = save_relative_uri(self.basename, base_url, False, None, relative_uris)
r["basename"] = u
if self.dirname is not None:
r["dirname"] = save(
self.dirname, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.nameroot is not None:
r["nameroot"] = save(
self.nameroot, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.nameext is not None:
r["nameext"] = save(
self.nameext, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.checksum is not None:
r["checksum"] = save(
self.checksum, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.size is not None:
r["size"] = save(
self.size, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.secondaryFiles is not None:
r["secondaryFiles"] = save(
self.secondaryFiles,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.format is not None:
u = save_relative_uri(self.format, base_url, True, None, relative_uris)
r["format"] = u
if self.contents is not None:
r["contents"] = save(
self.contents, top=False, base_url=base_url, relative_uris=relative_uris
)
# top refers to the directory level
if top:
if self.loadingOptions.namespaces:
r["$namespaces"] = self.loadingOptions.namespaces
if self.loadingOptions.schemas:
r["$schemas"] = self.loadingOptions.schemas
return r
attrs = frozenset(
[
"class",
"location",
"path",
"basename",
"dirname",
"nameroot",
"nameext",
"checksum",
"size",
"secondaryFiles",
"format",
"contents",
]
)
class Directory(Saveable):
def __init__(
self,
location: Any,
path: Any,
basename: Any,
listing: Any,
extension_fields: Optional[Dict[str, Any]] = None,
loadingOptions: Optional[LoadingOptions] = None,
) -> None:
if extension_fields:
self.extension_fields = extension_fields
else:
self.extension_fields = CommentedMap()
if loadingOptions:
self.loadingOptions = loadingOptions
else:
self.loadingOptions = LoadingOptions()
self.class_ = "Directory"
self.location = location
self.path = path
self.basename = basename
self.listing = listing
def __eq__(self, other: Any) -> bool:
if isinstance(other, Directory):
return bool(
self.class_ == other.class_
and self.location == other.location
and self.path == other.path
and self.basename == other.basename
and self.listing == other.listing
)
return False
def __hash__(self) -> int:
return hash(
(self.class_, self.location, self.path, self.basename, self.listing)
)
@classmethod
def fromDoc(
cls,
doc: Any,
baseuri: str,
loadingOptions: LoadingOptions,
docRoot: Optional[str] = None,
) -> "Directory":
_doc = copy.copy(doc)
if hasattr(doc, "lc"):
_doc.lc.data = doc.lc.data
_doc.lc.filename = doc.lc.filename
_errors__ = []
if _doc.get("class") != "Directory":
raise ValidationException("Not a Directory")
try:
location = load_field(
_doc.get("location"),
uri_union_of_None_type_or_strtype_False_True_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `location` field is not valid because:",
SourceLine(_doc, "location", str),
[e],
)
)
try:
path = load_field(
_doc.get("path"),
uri_union_of_None_type_or_strtype_False_False_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `path` field is not valid because:",
SourceLine(_doc, "path", str),
[e],
)
)
try:
basename = load_field(
_doc.get("basename"),
uri_union_of_None_type_or_strtype_False_False_None,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `basename` field is not valid because:",
SourceLine(_doc, "basename", str),
[e],
)
)
try:
listing = load_field(
_doc.get("listing"),
union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `listing` field is not valid because:",
SourceLine(_doc, "listing", str),
[e],
)
)
extension_fields: Dict[str, Any] = {}
for k in _doc.keys():
if k not in cls.attrs:
if ":" in k:
ex = expand_url(
k, "", loadingOptions, scoped_id=False, vocab_term=False
)
extension_fields[ex] = _doc[k]
else:
_errors__.append(
ValidationException(
"invalid field `{}`, expected one of: `class`, `location`, `path`, `basename`, `listing`".format(
k
),
SourceLine(_doc, k, str),
)
)
break
if _errors__:
raise ValidationException("Trying 'Directory'", None, _errors__)
_constructed = cls(
location=location,
path=path,
basename=basename,
listing=listing,
extension_fields=extension_fields,
loadingOptions=loadingOptions,
)
return _constructed
def save(
self, top: bool = False, base_url: str = "", relative_uris: bool = True
) -> Dict[str, Any]:
r: Dict[str, Any] = {}
if relative_uris:
for ef in self.extension_fields:
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef]
else:
for ef in self.extension_fields:
r[ef] = self.extension_fields[ef]
r["class"] = "Directory"
if self.location is not None:
u = save_relative_uri(self.location, base_url, False, None, relative_uris)
r["location"] = u
if self.path is not None:
u = save_relative_uri(self.path, base_url, False, None, relative_uris)
r["path"] = u
if self.basename is not None:
u = save_relative_uri(self.basename, base_url, False, None, relative_uris)
r["basename"] = u
if self.listing is not None:
r["listing"] = save(
self.listing, top=False, base_url=base_url, relative_uris=relative_uris
)
# top refers to the directory level
if top:
if self.loadingOptions.namespaces:
r["$namespaces"] = self.loadingOptions.namespaces
if self.loadingOptions.schemas:
r["$schemas"] = self.loadingOptions.schemas
return r
attrs = frozenset(["class", "location", "path", "basename", "listing"])
class inputs_record_schema(Saveable):
def __init__(
self,
_5_8s_pattern: Any,
_5s_pattern: Any,
CGC_config: Any,
CGC_postfixes: Any,
EggNOG_data_dir: Any,
EggNOG_db: Any,
EggNOG_diamond_db: Any,
HMM_gathering_bit_score: Any,
HMM_name_database: Any,
HMM_omit_alignment: Any,
InterProScan_applications: Any,
InterProScan_databases: Any,
InterProScan_outputFormat: Any,
cgc_chunk_size: Any,
forward_reads: Any,
func_ann_names_hmmer: Any,
func_ann_names_ips: Any,
go_config: Any,
hmmsearch_header: Any,
ips_header: Any,
ko_file: Any,
lsu_db: Any,
lsu_label: Any,
lsu_otus: Any,
lsu_tax: Any,
other_ncRNA_models: Any,
protein_chunk_size_IPS: Any,
protein_chunk_size_hmm: Any,
qc_min_length: Any,
reverse_reads: Any,
rfam_model_clans: Any,
rfam_models: Any,
single_reads: Any,
ssu_db: Any,
ssu_label: Any,
ssu_otus: Any,
ssu_tax: Any,
extension_fields: Optional[Dict[str, Any]] = None,
loadingOptions: Optional[LoadingOptions] = None,
) -> None:
if extension_fields:
self.extension_fields = extension_fields
else:
self.extension_fields = CommentedMap()
if loadingOptions:
self.loadingOptions = loadingOptions
else:
self.loadingOptions = LoadingOptions()
self._5_8s_pattern = _5_8s_pattern
self._5s_pattern = _5s_pattern
self.CGC_config = CGC_config
self.CGC_postfixes = CGC_postfixes
self.EggNOG_data_dir = EggNOG_data_dir
self.EggNOG_db = EggNOG_db
self.EggNOG_diamond_db = EggNOG_diamond_db
self.HMM_gathering_bit_score = HMM_gathering_bit_score
self.HMM_name_database = HMM_name_database
self.HMM_omit_alignment = HMM_omit_alignment
self.InterProScan_applications = InterProScan_applications
self.InterProScan_databases = InterProScan_databases
self.InterProScan_outputFormat = InterProScan_outputFormat
self.cgc_chunk_size = cgc_chunk_size
self.forward_reads = forward_reads
self.func_ann_names_hmmer = func_ann_names_hmmer
self.func_ann_names_ips = func_ann_names_ips
self.go_config = go_config
self.hmmsearch_header = hmmsearch_header
self.ips_header = ips_header
self.ko_file = ko_file
self.lsu_db = lsu_db
self.lsu_label = lsu_label
self.lsu_otus = lsu_otus
self.lsu_tax = lsu_tax
self.other_ncRNA_models = other_ncRNA_models
self.protein_chunk_size_IPS = protein_chunk_size_IPS
self.protein_chunk_size_hmm = protein_chunk_size_hmm
self.qc_min_length = qc_min_length
self.reverse_reads = reverse_reads
self.rfam_model_clans = rfam_model_clans
self.rfam_models = rfam_models
self.single_reads = single_reads
self.ssu_db = ssu_db
self.ssu_label = ssu_label
self.ssu_otus = ssu_otus
self.ssu_tax = ssu_tax
def __eq__(self, other: Any) -> bool:
if isinstance(other, inputs_record_schema):
return bool(
self._5_8s_pattern == other._5_8s_pattern
and self._5s_pattern == other._5s_pattern
and self.CGC_config == other.CGC_config
and self.CGC_postfixes == other.CGC_postfixes
and self.EggNOG_data_dir == other.EggNOG_data_dir
and self.EggNOG_db == other.EggNOG_db
and self.EggNOG_diamond_db == other.EggNOG_diamond_db
and self.HMM_gathering_bit_score == other.HMM_gathering_bit_score
and self.HMM_name_database == other.HMM_name_database
and self.HMM_omit_alignment == other.HMM_omit_alignment
and self.InterProScan_applications == other.InterProScan_applications
and self.InterProScan_databases == other.InterProScan_databases
and self.InterProScan_outputFormat == other.InterProScan_outputFormat
and self.cgc_chunk_size == other.cgc_chunk_size
and self.forward_reads == other.forward_reads
and self.func_ann_names_hmmer == other.func_ann_names_hmmer
and self.func_ann_names_ips == other.func_ann_names_ips
and self.go_config == other.go_config
and self.hmmsearch_header == other.hmmsearch_header
and self.ips_header == other.ips_header
and self.ko_file == other.ko_file
and self.lsu_db == other.lsu_db
and self.lsu_label == other.lsu_label
and self.lsu_otus == other.lsu_otus
and self.lsu_tax == other.lsu_tax
and self.other_ncRNA_models == other.other_ncRNA_models
and self.protein_chunk_size_IPS == other.protein_chunk_size_IPS
and self.protein_chunk_size_hmm == other.protein_chunk_size_hmm
and self.qc_min_length == other.qc_min_length
and self.reverse_reads == other.reverse_reads
and self.rfam_model_clans == other.rfam_model_clans
and self.rfam_models == other.rfam_models
and self.single_reads == other.single_reads
and self.ssu_db == other.ssu_db
and self.ssu_label == other.ssu_label
and self.ssu_otus == other.ssu_otus
and self.ssu_tax == other.ssu_tax
)
return False
def __hash__(self) -> int:
return hash(
(
self._5_8s_pattern,
self._5s_pattern,
self.CGC_config,
self.CGC_postfixes,
self.EggNOG_data_dir,
self.EggNOG_db,
self.EggNOG_diamond_db,
self.HMM_gathering_bit_score,
self.HMM_name_database,
self.HMM_omit_alignment,
self.InterProScan_applications,
self.InterProScan_databases,
self.InterProScan_outputFormat,
self.cgc_chunk_size,
self.forward_reads,
self.func_ann_names_hmmer,
self.func_ann_names_ips,
self.go_config,
self.hmmsearch_header,
self.ips_header,
self.ko_file,
self.lsu_db,
self.lsu_label,
self.lsu_otus,
self.lsu_tax,
self.other_ncRNA_models,
self.protein_chunk_size_IPS,
self.protein_chunk_size_hmm,
self.qc_min_length,
self.reverse_reads,
self.rfam_model_clans,
self.rfam_models,
self.single_reads,
self.ssu_db,
self.ssu_label,
self.ssu_otus,
self.ssu_tax,
)
)
@classmethod
def fromDoc(
cls,
doc: Any,
baseuri: str,
loadingOptions: LoadingOptions,
docRoot: Optional[str] = None,
) -> "inputs_record_schema":
_doc = copy.copy(doc)
if hasattr(doc, "lc"):
_doc.lc.data = doc.lc.data
_doc.lc.filename = doc.lc.filename
_errors__ = []
try:
_5_8s_pattern = load_field(
_doc.get("5.8s_pattern"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `5.8s_pattern` field is not valid because:",
SourceLine(_doc, "5.8s_pattern", str),
[e],
)
)
try:
_5s_pattern = load_field(
_doc.get("5s_pattern"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `5s_pattern` field is not valid because:",
SourceLine(_doc, "5s_pattern", str),
[e],
)
)
try:
CGC_config = load_field(
_doc.get("CGC_config"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `CGC_config` field is not valid because:",
SourceLine(_doc, "CGC_config", str),
[e],
)
)
try:
CGC_postfixes = load_field(
_doc.get("CGC_postfixes"),
array_of_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `CGC_postfixes` field is not valid because:",
SourceLine(_doc, "CGC_postfixes", str),
[e],
)
)
try:
EggNOG_data_dir = load_field(
_doc.get("EggNOG_data_dir"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `EggNOG_data_dir` field is not valid because:",
SourceLine(_doc, "EggNOG_data_dir", str),
[e],
)
)
try:
EggNOG_db = load_field(
_doc.get("EggNOG_db"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `EggNOG_db` field is not valid because:",
SourceLine(_doc, "EggNOG_db", str),
[e],
)
)
try:
EggNOG_diamond_db = load_field(
_doc.get("EggNOG_diamond_db"),
union_of_None_type_or_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `EggNOG_diamond_db` field is not valid because:",
SourceLine(_doc, "EggNOG_diamond_db", str),
[e],
)
)
try:
HMM_gathering_bit_score = load_field(
_doc.get("HMM_gathering_bit_score"),
booltype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `HMM_gathering_bit_score` field is not valid because:",
SourceLine(_doc, "HMM_gathering_bit_score", str),
[e],
)
)
try:
HMM_name_database = load_field(
_doc.get("HMM_name_database"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `HMM_name_database` field is not valid because:",
SourceLine(_doc, "HMM_name_database", str),
[e],
)
)
try:
HMM_omit_alignment = load_field(
_doc.get("HMM_omit_alignment"),
booltype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `HMM_omit_alignment` field is not valid because:",
SourceLine(_doc, "HMM_omit_alignment", str),
[e],
)
)
try:
InterProScan_applications = load_field(
_doc.get("InterProScan_applications"),
array_of_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `InterProScan_applications` field is not valid because:",
SourceLine(_doc, "InterProScan_applications", str),
[e],
)
)
try:
InterProScan_databases = load_field(
_doc.get("InterProScan_databases"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `InterProScan_databases` field is not valid because:",
SourceLine(_doc, "InterProScan_databases", str),
[e],
)
)
try:
InterProScan_outputFormat = load_field(
_doc.get("InterProScan_outputFormat"),
array_of_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `InterProScan_outputFormat` field is not valid because:",
SourceLine(_doc, "InterProScan_outputFormat", str),
[e],
)
)
try:
cgc_chunk_size = load_field(
_doc.get("cgc_chunk_size"),
inttype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `cgc_chunk_size` field is not valid because:",
SourceLine(_doc, "cgc_chunk_size", str),
[e],
)
)
try:
forward_reads = load_field(
_doc.get("forward_reads"),
union_of_None_type_or_FileLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `forward_reads` field is not valid because:",
SourceLine(_doc, "forward_reads", str),
[e],
)
)
try:
func_ann_names_hmmer = load_field(
_doc.get("func_ann_names_hmmer"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `func_ann_names_hmmer` field is not valid because:",
SourceLine(_doc, "func_ann_names_hmmer", str),
[e],
)
)
try:
func_ann_names_ips = load_field(
_doc.get("func_ann_names_ips"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `func_ann_names_ips` field is not valid because:",
SourceLine(_doc, "func_ann_names_ips", str),
[e],
)
)
try:
go_config = load_field(
_doc.get("go_config"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `go_config` field is not valid because:",
SourceLine(_doc, "go_config", str),
[e],
)
)
try:
hmmsearch_header = load_field(
_doc.get("hmmsearch_header"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `hmmsearch_header` field is not valid because:",
SourceLine(_doc, "hmmsearch_header", str),
[e],
)
)
try:
ips_header = load_field(
_doc.get("ips_header"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `ips_header` field is not valid because:",
SourceLine(_doc, "ips_header", str),
[e],
)
)
try:
ko_file = load_field(
_doc.get("ko_file"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `ko_file` field is not valid because:",
SourceLine(_doc, "ko_file", str),
[e],
)
)
try:
lsu_db = load_field(
_doc.get("lsu_db"),
FileLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `lsu_db` field is not valid because:",
SourceLine(_doc, "lsu_db", str),
[e],
)
)
try:
lsu_label = load_field(
_doc.get("lsu_label"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `lsu_label` field is not valid because:",
SourceLine(_doc, "lsu_label", str),
[e],
)
)
try:
lsu_otus = load_field(
_doc.get("lsu_otus"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `lsu_otus` field is not valid because:",
SourceLine(_doc, "lsu_otus", str),
[e],
)
)
try:
lsu_tax = load_field(
_doc.get("lsu_tax"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `lsu_tax` field is not valid because:",
SourceLine(_doc, "lsu_tax", str),
[e],
)
)
try:
other_ncRNA_models = load_field(
_doc.get("other_ncRNA_models"),
array_of_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `other_ncRNA_models` field is not valid because:",
SourceLine(_doc, "other_ncRNA_models", str),
[e],
)
)
try:
protein_chunk_size_IPS = load_field(
_doc.get("protein_chunk_size_IPS"),
inttype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `protein_chunk_size_IPS` field is not valid because:",
SourceLine(_doc, "protein_chunk_size_IPS", str),
[e],
)
)
try:
protein_chunk_size_hmm = load_field(
_doc.get("protein_chunk_size_hmm"),
inttype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `protein_chunk_size_hmm` field is not valid because:",
SourceLine(_doc, "protein_chunk_size_hmm", str),
[e],
)
)
try:
qc_min_length = load_field(
_doc.get("qc_min_length"),
inttype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `qc_min_length` field is not valid because:",
SourceLine(_doc, "qc_min_length", str),
[e],
)
)
try:
reverse_reads = load_field(
_doc.get("reverse_reads"),
union_of_None_type_or_FileLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `reverse_reads` field is not valid because:",
SourceLine(_doc, "reverse_reads", str),
[e],
)
)
try:
rfam_model_clans = load_field(
_doc.get("rfam_model_clans"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `rfam_model_clans` field is not valid because:",
SourceLine(_doc, "rfam_model_clans", str),
[e],
)
)
try:
rfam_models = load_field(
_doc.get("rfam_models"),
array_of_strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `rfam_models` field is not valid because:",
SourceLine(_doc, "rfam_models", str),
[e],
)
)
try:
single_reads = load_field(
_doc.get("single_reads"),
union_of_None_type_or_FileLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `single_reads` field is not valid because:",
SourceLine(_doc, "single_reads", str),
[e],
)
)
try:
ssu_db = load_field(
_doc.get("ssu_db"),
FileLoader,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `ssu_db` field is not valid because:",
SourceLine(_doc, "ssu_db", str),
[e],
)
)
try:
ssu_label = load_field(
_doc.get("ssu_label"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `ssu_label` field is not valid because:",
SourceLine(_doc, "ssu_label", str),
[e],
)
)
try:
ssu_otus = load_field(
_doc.get("ssu_otus"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `ssu_otus` field is not valid because:",
SourceLine(_doc, "ssu_otus", str),
[e],
)
)
try:
ssu_tax = load_field(
_doc.get("ssu_tax"),
strtype,
baseuri,
loadingOptions,
)
except ValidationException as e:
_errors__.append(
ValidationException(
"the `ssu_tax` field is not valid because:",
SourceLine(_doc, "ssu_tax", str),
[e],
)
)
extension_fields: Dict[str, Any] = {}
for k in _doc.keys():
if k not in cls.attrs:
if ":" in k:
ex = expand_url(
k, "", loadingOptions, scoped_id=False, vocab_term=False
)
extension_fields[ex] = _doc[k]
else:
_errors__.append(
ValidationException(
"invalid field `{}`, expected one of: `5.8s_pattern`, `5s_pattern`, `CGC_config`, `CGC_postfixes`, `EggNOG_data_dir`, `EggNOG_db`, `EggNOG_diamond_db`, `HMM_gathering_bit_score`, `HMM_name_database`, `HMM_omit_alignment`, `InterProScan_applications`, `InterProScan_databases`, `InterProScan_outputFormat`, `cgc_chunk_size`, `forward_reads`, `func_ann_names_hmmer`, `func_ann_names_ips`, `go_config`, `hmmsearch_header`, `ips_header`, `ko_file`, `lsu_db`, `lsu_label`, `lsu_otus`, `lsu_tax`, `other_ncRNA_models`, `protein_chunk_size_IPS`, `protein_chunk_size_hmm`, `qc_min_length`, `reverse_reads`, `rfam_model_clans`, `rfam_models`, `single_reads`, `ssu_db`, `ssu_label`, `ssu_otus`, `ssu_tax`".format(
k
),
SourceLine(_doc, k, str),
)
)
break
if _errors__:
raise ValidationException("Trying 'inputs_record_schema'", None, _errors__)
_constructed = cls(
_5_8s_pattern=_5_8s_pattern,
_5s_pattern=_5s_pattern,
CGC_config=CGC_config,
CGC_postfixes=CGC_postfixes,
EggNOG_data_dir=EggNOG_data_dir,
EggNOG_db=EggNOG_db,
EggNOG_diamond_db=EggNOG_diamond_db,
HMM_gathering_bit_score=HMM_gathering_bit_score,
HMM_name_database=HMM_name_database,
HMM_omit_alignment=HMM_omit_alignment,
InterProScan_applications=InterProScan_applications,
InterProScan_databases=InterProScan_databases,
InterProScan_outputFormat=InterProScan_outputFormat,
cgc_chunk_size=cgc_chunk_size,
forward_reads=forward_reads,
func_ann_names_hmmer=func_ann_names_hmmer,
func_ann_names_ips=func_ann_names_ips,
go_config=go_config,
hmmsearch_header=hmmsearch_header,
ips_header=ips_header,
ko_file=ko_file,
lsu_db=lsu_db,
lsu_label=lsu_label,
lsu_otus=lsu_otus,
lsu_tax=lsu_tax,
other_ncRNA_models=other_ncRNA_models,
protein_chunk_size_IPS=protein_chunk_size_IPS,
protein_chunk_size_hmm=protein_chunk_size_hmm,
qc_min_length=qc_min_length,
reverse_reads=reverse_reads,
rfam_model_clans=rfam_model_clans,
rfam_models=rfam_models,
single_reads=single_reads,
ssu_db=ssu_db,
ssu_label=ssu_label,
ssu_otus=ssu_otus,
ssu_tax=ssu_tax,
extension_fields=extension_fields,
loadingOptions=loadingOptions,
)
return _constructed
def save(
self, top: bool = False, base_url: str = "", relative_uris: bool = True
) -> Dict[str, Any]:
r: Dict[str, Any] = {}
if relative_uris:
for ef in self.extension_fields:
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef]
else:
for ef in self.extension_fields:
r[ef] = self.extension_fields[ef]
if self._5_8s_pattern is not None:
r["5.8s_pattern"] = save(
self._5_8s_pattern,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self._5s_pattern is not None:
r["5s_pattern"] = save(
self._5s_pattern,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.CGC_config is not None:
r["CGC_config"] = save(
self.CGC_config,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.CGC_postfixes is not None:
r["CGC_postfixes"] = save(
self.CGC_postfixes,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.EggNOG_data_dir is not None:
r["EggNOG_data_dir"] = save(
self.EggNOG_data_dir,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.EggNOG_db is not None:
r["EggNOG_db"] = save(
self.EggNOG_db,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.EggNOG_diamond_db is not None:
r["EggNOG_diamond_db"] = save(
self.EggNOG_diamond_db,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.HMM_gathering_bit_score is not None:
r["HMM_gathering_bit_score"] = save(
self.HMM_gathering_bit_score,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.HMM_name_database is not None:
r["HMM_name_database"] = save(
self.HMM_name_database,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.HMM_omit_alignment is not None:
r["HMM_omit_alignment"] = save(
self.HMM_omit_alignment,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.InterProScan_applications is not None:
r["InterProScan_applications"] = save(
self.InterProScan_applications,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.InterProScan_databases is not None:
r["InterProScan_databases"] = save(
self.InterProScan_databases,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.InterProScan_outputFormat is not None:
r["InterProScan_outputFormat"] = save(
self.InterProScan_outputFormat,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.cgc_chunk_size is not None:
r["cgc_chunk_size"] = save(
self.cgc_chunk_size,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.forward_reads is not None:
r["forward_reads"] = save(
self.forward_reads,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.func_ann_names_hmmer is not None:
r["func_ann_names_hmmer"] = save(
self.func_ann_names_hmmer,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.func_ann_names_ips is not None:
r["func_ann_names_ips"] = save(
self.func_ann_names_ips,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.go_config is not None:
r["go_config"] = save(
self.go_config,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.hmmsearch_header is not None:
r["hmmsearch_header"] = save(
self.hmmsearch_header,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.ips_header is not None:
r["ips_header"] = save(
self.ips_header,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.ko_file is not None:
r["ko_file"] = save(
self.ko_file, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.lsu_db is not None:
r["lsu_db"] = save(
self.lsu_db, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.lsu_label is not None:
r["lsu_label"] = save(
self.lsu_label,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.lsu_otus is not None:
r["lsu_otus"] = save(
self.lsu_otus, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.lsu_tax is not None:
r["lsu_tax"] = save(
self.lsu_tax, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.other_ncRNA_models is not None:
r["other_ncRNA_models"] = save(
self.other_ncRNA_models,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.protein_chunk_size_IPS is not None:
r["protein_chunk_size_IPS"] = save(
self.protein_chunk_size_IPS,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.protein_chunk_size_hmm is not None:
r["protein_chunk_size_hmm"] = save(
self.protein_chunk_size_hmm,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.qc_min_length is not None:
r["qc_min_length"] = save(
self.qc_min_length,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.reverse_reads is not None:
r["reverse_reads"] = save(
self.reverse_reads,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.rfam_model_clans is not None:
r["rfam_model_clans"] = save(
self.rfam_model_clans,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.rfam_models is not None:
r["rfam_models"] = save(
self.rfam_models,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.single_reads is not None:
r["single_reads"] = save(
self.single_reads,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.ssu_db is not None:
r["ssu_db"] = save(
self.ssu_db, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.ssu_label is not None:
r["ssu_label"] = save(
self.ssu_label,
top=False,
base_url=base_url,
relative_uris=relative_uris,
)
if self.ssu_otus is not None:
r["ssu_otus"] = save(
self.ssu_otus, top=False, base_url=base_url, relative_uris=relative_uris
)
if self.ssu_tax is not None:
r["ssu_tax"] = save(
self.ssu_tax, top=False, base_url=base_url, relative_uris=relative_uris
)
# top refers to the directory level
if top:
if self.loadingOptions.namespaces:
r["$namespaces"] = self.loadingOptions.namespaces
if self.loadingOptions.schemas:
r["$schemas"] = self.loadingOptions.schemas
return r
attrs = frozenset(
[
"5.8s_pattern",
"5s_pattern",
"CGC_config",
"CGC_postfixes",
"EggNOG_data_dir",
"EggNOG_db",
"EggNOG_diamond_db",
"HMM_gathering_bit_score",
"HMM_name_database",
"HMM_omit_alignment",
"InterProScan_applications",
"InterProScan_databases",
"InterProScan_outputFormat",
"cgc_chunk_size",
"forward_reads",
"func_ann_names_hmmer",
"func_ann_names_ips",
"go_config",
"hmmsearch_header",
"ips_header",
"ko_file",
"lsu_db",
"lsu_label",
"lsu_otus",
"lsu_tax",
"other_ncRNA_models",
"protein_chunk_size_IPS",
"protein_chunk_size_hmm",
"qc_min_length",
"reverse_reads",
"rfam_model_clans",
"rfam_models",
"single_reads",
"ssu_db",
"ssu_label",
"ssu_otus",
"ssu_tax",
]
)
_vocab = {
"Directory": "https://w3id.org/cwl/cwl#Directory",
"File": "https://w3id.org/cwl/cwl#File",
"inputs_record_schema": "https://w3id.org/cwl/cwl#inputs_record_schema",
}
_rvocab = {
"https://w3id.org/cwl/cwl#Directory": "Directory",
"https://w3id.org/cwl/cwl#File": "File",
"https://w3id.org/cwl/cwl#inputs_record_schema": "inputs_record_schema",
}
strtype = _PrimitiveLoader(str)
inttype = _PrimitiveLoader(int)
floattype = _PrimitiveLoader(float)
booltype = _PrimitiveLoader(bool)
None_type = _PrimitiveLoader(type(None))
Any_type = _AnyLoader()
FileLoader = _RecordLoader(File)
DirectoryLoader = _RecordLoader(Directory)
inputs_record_schemaLoader = _RecordLoader(inputs_record_schema)
File_classLoader = _EnumLoader(("File",), "File_class")
uri_File_classLoader_False_True_None = _URILoader(File_classLoader, False, True, None)
union_of_None_type_or_strtype = _UnionLoader(
(
None_type,
strtype,
)
)
uri_union_of_None_type_or_strtype_False_True_None = _URILoader(
union_of_None_type_or_strtype, False, True, None
)
uri_union_of_None_type_or_strtype_False_False_None = _URILoader(
union_of_None_type_or_strtype, False, False, None
)
union_of_None_type_or_inttype = _UnionLoader(
(
None_type,
inttype,
)
)
union_of_FileLoader_or_DirectoryLoader = _UnionLoader(
(
FileLoader,
DirectoryLoader,
)
)
array_of_union_of_FileLoader_or_DirectoryLoader = _ArrayLoader(
union_of_FileLoader_or_DirectoryLoader
)
union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader = _UnionLoader(
(
None_type,
array_of_union_of_FileLoader_or_DirectoryLoader,
)
)
secondaryfilesdsl_union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader = _SecondaryDSLLoader(
union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader
)
uri_union_of_None_type_or_strtype_True_False_None = _URILoader(
union_of_None_type_or_strtype, True, False, None
)
Directory_classLoader = _EnumLoader(("Directory",), "Directory_class")
uri_Directory_classLoader_False_True_None = _URILoader(
Directory_classLoader, False, True, None
)
array_of_strtype = _ArrayLoader(strtype)
union_of_None_type_or_FileLoader = _UnionLoader(
(
None_type,
FileLoader,
)
)
union_of_inputs_record_schemaLoader = _UnionLoader((inputs_record_schemaLoader,))
array_of_union_of_inputs_record_schemaLoader = _ArrayLoader(
union_of_inputs_record_schemaLoader
)
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader = (
_UnionLoader(
(
inputs_record_schemaLoader,
array_of_union_of_inputs_record_schemaLoader,
)
)
)
def load_document(
doc: Any,
baseuri: Optional[str] = None,
loadingOptions: Optional[LoadingOptions] = None,
) -> Any:
if baseuri is None:
baseuri = file_uri(os.getcwd()) + "/"
if loadingOptions is None:
loadingOptions = LoadingOptions()
result, metadata = _document_load(
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader,
doc,
baseuri,
loadingOptions,
)
return result
def load_document_with_metadata(
doc: Any,
baseuri: Optional[str] = None,
loadingOptions: Optional[LoadingOptions] = None,
addl_metadata_fields: Optional[MutableSequence[str]] = None,
) -> Any:
if baseuri is None:
baseuri = file_uri(os.getcwd()) + "/"
if loadingOptions is None:
loadingOptions = LoadingOptions(fileuri=baseuri)
return _document_load(
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader,
doc,
baseuri,
loadingOptions,
addl_metadata_fields=addl_metadata_fields,
)
def load_document_by_string(
string: Any,
uri: str,
loadingOptions: Optional[LoadingOptions] = None,
) -> Any:
yaml = yaml_no_ts()
result = yaml.load(string)
add_lc_filename(result, uri)
if loadingOptions is None:
loadingOptions = LoadingOptions(fileuri=uri)
result, metadata = _document_load(
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader,
result,
uri,
loadingOptions,
)
return result
def load_document_by_yaml(
yaml: Any,
uri: str,
loadingOptions: Optional[LoadingOptions] = None,
) -> Any:
"""
Shortcut to load via a YAML object.
yaml: must be from ruamel.yaml.main.YAML.load with preserve_quotes=True
"""
add_lc_filename(yaml, uri)
if loadingOptions is None:
loadingOptions = LoadingOptions(fileuri=uri)
result, metadata = _document_load(
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader,
yaml,
uri,
loadingOptions,
)
return result
saladVersion: v1.1
$base: https://w3id.org/cwl/cwl#
$namespaces:
cwl: https://w3id.org/cwl/cwl#
$graph:
- name: File
type: record
fields:
- name: class
type:
type: enum
name: File_class
symbols:
- cwl:File
jsonldPredicate:
_id: '@type'
_type: '@vocab'
- name: location
type: string?
jsonldPredicate:
_id: '@type'
_type: '@vocab'
- name: path
type: string?
jsonldPredicate:
_id: cwl:path
_type: '@id'
- name: basename
type: string?
jsonldPredicate:
_id: cwl:basename
_type: '@id'
- name: dirname
type: string?
- name: nameroot
type: string?
- name: nameext
type: string?
- name: checksum
type: string?
- name: size
type: long?
- name: secondaryFiles
type:
- 'null'
- type: array
items:
- File
- Directory
jsonldPredicate:
_id: cwl:secondaryFiles
secondaryFilesDSL: true
- name: format
type: string?
jsonldPredicate:
_id: cwl:format
_type: '@id'
identity: true
- name: contents
type: string?
- name: Directory
type: record
fields:
- name: class
type:
type: enum
name: Directory_class
symbols:
- cwl:Directory
jsonldPredicate:
_id: '@type'
_type: '@vocab'
- name: location
type: string?
jsonldPredicate:
_id: '@type'
_type: '@vocab'
- name: path
type: string?
jsonldPredicate:
_id: cwl:path
_type: '@id'
- name: basename
type: string?
jsonldPredicate:
_id: cwl:basename
_type: '@id'
- name: listing
type:
- 'null'
- type: array
items:
- File
- Directory
jsonldPredicate:
_id: cwl:listing
- name: inputs_record_schema
type: record
fields:
- name: 5.8s_pattern
type: string
- name: 5s_pattern
type: string
- name: CGC_config
type: string
- name: CGC_postfixes
type:
type: array
items: string
- name: EggNOG_data_dir
type:
- 'null'
- string
- name: EggNOG_db
type:
- 'null'
- string
- name: EggNOG_diamond_db
type:
- 'null'
- string
- name: HMM_gathering_bit_score
type: boolean
- name: HMM_name_database
type: string
- name: HMM_omit_alignment
type: boolean
- name: InterProScan_applications
type:
type: array
items: string
- name: InterProScan_databases
type: string
- name: InterProScan_outputFormat
type:
type: array
items: string
- name: cgc_chunk_size
type: int
- name: forward_reads
type:
- 'null'
- cwl:File
- name: func_ann_names_hmmer
type: string
- name: func_ann_names_ips
type: string
- name: go_config
type: string
- name: hmmsearch_header
type: string
- name: ips_header
type: string
- name: ko_file
type: string
- name: lsu_db
type: cwl:File
- name: lsu_label
type: string
- name: lsu_otus
type: string
- name: lsu_tax
type: string
- name: other_ncRNA_models
type:
type: array
items: string
- name: protein_chunk_size_IPS
type: int
- name: protein_chunk_size_hmm
type: int
- name: qc_min_length
type: int
- name: reverse_reads
type:
- 'null'
- cwl:File
- name: rfam_model_clans
type: string
- name: rfam_models
type:
type: array
items: string
- name: single_reads
type:
- 'null'
- cwl:File
- name: ssu_db
type: cwl:File
- name: ssu_label
type: string
- name: ssu_otus
type: string
- name: ssu_tax
type: string
documentRoot: true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment