#!/usr/bin/env python3
import csv
import subprocess
import sys
from pathlib import Path
from multiprocessing import Pool


def md5sum(path):
    '''Compute the MD5 message digest of a file and return the hex digest.'''
    res = subprocess.run(['md5sum', path], capture_output=True, check=True)
    sums = str(res.stdout, encoding='ascii')
    if sums != '':
        return sums.split()[0]


def flatten_filename(ftp_url):
    '''Transform a URL into a unique filename:
       ftp://domain.ext/some/path/read_file.gz -> some__path__read_file.gz
    '''
    _, ext = ftp_url.split('//', 1)
    _, *path = ext.split('/')  # drop the domain, keep the path components
    return '__'.join(path)


def wget_and_verify(ftp_url, md5, output_dir):
    '''Download ftp_url into output_dir (via curl) and verify its MD5,
       retrying up to three times on failure or checksum mismatch.
    '''
    basename = flatten_filename(ftp_url)
    filename = Path(output_dir) / basename
    # Skip files that already exist and verify correctly.
    if filename.exists():
        sums = md5sum(filename)
        if sums == md5:
            print(f'[{filename}] skipping: already exists')
            return
    print(f'[{filename}] : starting download')
    retry = 0
    while retry < 3:
        out = subprocess.run(['curl', ftp_url, '--output', filename],
                             capture_output=True)
        if out.returncode != 0:
            # Don't try to checksum a download that never completed.
            retry += 1
            print(f'[{filename}] RETRY {retry}: ERR: curl failed',
                  file=sys.stderr)
            continue
        sums = md5sum(filename)
        if sums != md5:
            retry += 1
            print(f'[{filename}] RETRY {retry}: ERR: checksum mismatch '
                  f'{sums} != {md5}', file=sys.stderr)
        else:
            print(f'[{filename}] : completed download')
            return
    print(f'[{filename}] ERR: MAXIMUM RETRIES EXCEEDED', file=sys.stderr)


def download_all_from_index(index, output_dir, threads):
    '''Read a tab-separated index and download every FASTQ/PAIRED_FASTQ
       pair in parallel, verifying each file against its recorded MD5.
    '''
    reader = csv.DictReader(index, delimiter='\t')
    with Pool(threads) as pool:
        res = []
        for row in reader:
            res.append(pool.apply_async(
                wget_and_verify,
                (row['FASTQ'], row['FASTQ_MD5'], output_dir)))
            res.append(pool.apply_async(
                wget_and_verify,
                (row['PAIRED_FASTQ'], row['PAIRED_FASTQ_MD5'], output_dir)))
        # Propagate any exception raised inside a worker process.
        for r in res:
            r.get()
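
# ---------------------------------------------------------------------------
# Portable checksum alternative (a minimal sketch, not used above): the
# md5sum() helper shells out to the GNU md5sum binary, which is not
# available everywhere (macOS ships `md5` instead). hashlib produces the
# same hex digest in pure Python, reading the file in chunks so that
# large FASTQ files are never loaded into memory at once.
# ---------------------------------------------------------------------------
import hashlib


def md5sum_py(path, chunk_size=1 << 20):
    '''Pure-Python equivalent of md5sum(): MD5 hex digest of a file.'''
    digest = hashlib.md5()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


# ---------------------------------------------------------------------------
# Example entry point (a sketch; the original script defines no CLI, so
# the argument layout below is an assumption). Invoked as
#   ./download_fastq.py index.tsv output_dir [threads]
# where index.tsv is a tab-separated table with FASTQ, FASTQ_MD5,
# PAIRED_FASTQ and PAIRED_FASTQ_MD5 columns, matching the DictReader
# usage in download_all_from_index(). The __main__ guard also keeps the
# multiprocessing Pool safe on platforms that spawn rather than fork.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    index_path = Path(sys.argv[1])
    output_dir = Path(sys.argv[2])
    threads = int(sys.argv[3]) if len(sys.argv) > 3 else 4
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(index_path, newline='') as index:
        download_all_from_index(index, output_dir, threads)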