Skip to content

Instantly share code, notes, and snippets.

@keuv-grvl
Last active October 26, 2021 03:01
Show Gist options
  • Save keuv-grvl/a5846eb2b6a122823185ccfb55aac12e to your computer and use it in GitHub Desktop.
Save keuv-grvl/a5846eb2b6a122823185ccfb55aac12e to your computer and use it in GitHub Desktop.
Convert UniRef XML to fasta. Mostly to rebuild previous UniRef databases
#!/usr/bin/env python3
import argparse
import gzip
import logging
import sys
from time import time
from lxml import etree
_VERSION = "20201019"
_AUTHORS = ["@keuv.grvl", "@nick-youngblut"]
def parse_args():
desc = "Convert UniRef XML to fasta. Mostly to rebuild previous UniRef databases"
epi = """DESCRIPTION: A simple script to convert UniRef XMLto fasta.
The previous UniRef releases to the current release do not include fasta files, but instead only XML
files for all UniRef sequences (+ all associated info). The script allows one to easily re-create
the fasta-formatted version of a prevoius UniRef release.
Use "-" to provide the input file STDIN. Example: gunzip -c uniref50.xml | unirefxml2fasta.py -
"""
parser = argparse.ArgumentParser(
description=desc, epilog=epi, formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"xml_file", metavar="xml_file", type=str, help='UniRef xml file. Use "-" if from STDIN'
)
parser.add_argument(
"-o",
"--output",
type=str,
default="uniref50.fasta",
help="Output file path (default: %(default)s)",
)
parser.add_argument(
"-l",
"--label",
type=str,
default="UniRef50",
help="XML label used for parsing (default: %(default)s)."
+ ' For UniRef90, use "UniRef90".',
)
parser.add_argument(
"-g",
"--gzip-output",
action="store_true",
default=False,
help="Gzip the output file (default: %(default)s).",
)
parser.add_argument("-v", "--version", action="version", version=_VERSION)
return parser.parse_args()
def each_chunk(stream, separator):
"""
Yield lines from `stream` until `separator`. Source: https://stackoverflow.com/a/47927374
"""
buffer = ""
while True: # until EOF
chunk = stream.read(65536) # read 2^16 bytes
if not chunk: # EOF?
yield buffer
break
buffer += chunk
while True: # until no separator is found
try:
part, buffer = buffer.split(separator, 1)
except ValueError:
break
else:
yield part
def main(args):
# I/O
if args.xml_file == "-":
inF = sys.stdin
else:
inF = open(args.xml_file)
if args.gzip_output:
if not args.output.endswith(".gz"):
args.output += ".gz"
outF = gzip.open(args.output, "wb")
else:
outF = open(args.output, "w")
# params
skipfirst = True
i = 0
sep = "<entry"
start = time()
# read the XML file one entry at a time
for chunk in each_chunk(sys.stdin, separator=sep):
if skipfirst:
skipfirst = False
continue
# -- get the XML entry as text
xml = sep + chunk # separator has been dropped, add it to get a valid xml
xml = xml.replace("</{}>\n".format(args.label), "")
# -- parse the XML
root = etree.fromstring(xml)
protid = root.get("id")
try:
protname = root.xpath("/entry/name")[0].text.rstring()
except AttributeError:
protname = root.xpath("/entry/name")[0].text
protseq = root.xpath("/entry/representativeMember/sequence")[0].text
seq = "\n".join([">" + protid + " " + protname, protseq]) + "\n"
if args.gzip_output:
seq = seq.encode("utf-8")
outF.write(seq)
# print status
# TODO better progress bar with tqdm
if i % 2500 == 0:
print("Entries processed: {}".format(i), end="\r")
i += 1
# finish up
outF.close()
if inF is not sys.stdin:
inF.close()
end = time()
print("{} entries converted in {:.2f} seconds".format(i, end - start))
if __name__ == "__main__":
args = parse_args()
main(args)
@nick-youngblut
Copy link

A version with a couple of new features (eg., arg parsing, STDIN input, gzip'ed output):

#!/usr/bin/env python
from __future__ import print_function
# batteries
import sys
import gzip
import argparse
import logging
from time import time
# 3rd party
from lxml import etree

# argparse
desc = 'Convert UniRef XML to fasta. Mostly to rebuild previous UniRef databases'
epi = """DESCRIPTION:
A simple script to convert UniRef XMLto fasta.

The previous UniRef releases to the current release do not include fasta files,
but instead only XML files for all UniRef sequences (+ all associated info).
The script allows one to easily re-create the fasta-formatted version of a 
prevoius UniRef release. 

Use "-" to provide the input file STDIN.
  Example: gunzip -c uniref50.xml | unirefxml2fasta.py -
"""
parser = argparse.ArgumentParser(description=desc,
                                 epilog=epi,
                                 formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('xml_file', metavar='xml_file', type=str,
                    help='UniRef xml file. Use "-" if from STDIN')
parser.add_argument('-o', '--output',  type=str, default='uniref50.fasta',
                    help='Output file path (default: %(default)s)')
parser.add_argument('-l', '--label',  type=str, default='UniRef50',
                    help='XML label used for parsing (default: %(default)s).' + \
                         ' For UniRef90, use "UniRef90".')
parser.add_argument('-g', '--gzip-output',  action='store_true', default=False,
                    help='Gzip the output file (default: %(default)s).')
parser.add_argument('--version', action='version', version='0.0.1')

def each_chunk(stream, separator):
    '''
    Yield lines from `stream` until `separator`.
    Source: https://stackoverflow.com/a/47927374
    '''
    buffer = ''
    while True:  # until EOF
        chunk = stream.read(65536)  # read 2^16 bytes
        if not chunk:  # EOF?
            yield buffer
            break
        buffer += chunk
        while True:  # until no separator is found
            try:
                part, buffer = buffer.split(separator, 1)
            except ValueError:
                break
            else:
                yield part

def main(args):
    # I/O
    if args.xml_file == '-':
        inF = sys.stdin
    else:
        inF = open(args.xml_file)
    if args.gzip_output:
        if not args.output.endswith('.gz'):
            args.output += '.gz'
        outF = gzip.open(args.output, 'wb')
    else:
        outF = open(args.output, 'w')
    
    # params
    skipfirst = True
    i = 0
    sep = '<entry'
    deb = time()
    
    # read the XML file one entry at a time
    for chunk in each_chunk(sys.stdin, separator=sep):
        if skipfirst:
            skipfirst = False
            continue
        #-- get the XML entry as text
        xml = sep + chunk  # separator has been dropped, add it to get a valid xml
        xml = xml.replace('</{}>\n'.format(args.label), '')  
        #-- parse the XML
        root = etree.fromstring(xml)
        protid = root.get('id')
        try:
            protname = root.xpath('/entry/name')[0].text.rstring()
        except AttributeError:
            protname = root.xpath('/entry/name')[0].text
        protseq = root.xpath('/entry/representativeMember/sequence')[0].text
        seq = '\n'.join(['>' + protid + ' ' + protname, protseq]) + '\n'
        if args.gzip_output:
            seq = seq.encode('utf-8')
        outF.write(seq)
        # status
        if i % 2500 == 0:
            print('Entries processed: {}'.format(i), end='\r')
        i += 1
        
    # finish up
    outF.close()    
    if inF is not sys.stdin:
        inF.close()
    fin = time()
    print('{} entrées converted in {:.2f} seconds'.format(i, fin-deb))

if __name__ == '__main__':
    args = parser.parse_args()
    main(args)

@keuv-grvl
Copy link
Author

Thanks @nick-youngblut, I integrated your changes to the gist

@yangkky
Copy link

yangkky commented May 7, 2021

Line 97 should be:

for chunk in each_chunk(inF, separator=sep):

Also, in UniRef2018_03 at least, the sequence entries seem to have a '\n' at the front, which means the output has an extra line between every header and sequence.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment