#!/usr/bin/env python3
import csv
import subprocess
import sys
from pathlib import Path
from multiprocessing import Pool


def md5sum(path):
    '''
    Compute the MD5 message digest of a file and return it as a hex string.
    '''
    # check=True raises CalledProcessError on failure, so stdout is
    # guaranteed to hold a "<digest>  <path>" line here.
    res = subprocess.run(['md5sum', str(path)], capture_output=True, check=True)
    return res.stdout.decode('ascii').split()[0]


def flatten_filename(ftp_url):
    '''
    Transform a URL into a unique filename:
    ftp://domain.ext/some/path/read_file.gz -> some__path__read_file.gz
    '''
    _, rest = ftp_url.split('//')
    _, *path = rest.split('/')  # drop the domain, keep the path components
    return '__'.join(path)


def wget_and_verify(ftp_url, md5, output_dir):
    '''
    Download ftp_url into output_dir, verify it against the expected MD5,
    and retry up to three times on failure or checksum mismatch.
    '''
    filename = Path(output_dir) / flatten_filename(ftp_url)
    # Skip files that already exist and verify correctly.
    if filename.exists() and md5sum(filename) == md5:
        print(f'[{filename}] skipping: already exists')
        return
    print(f'[{filename}] starting download')
    retry = 0
    while retry < 3:
        out = subprocess.run(['curl', ftp_url, '--output', str(filename)],
                             capture_output=True)
        if out.returncode != 0:
            retry += 1
            print(f'[{filename}] RETRY {retry}: ERR: curl failed', file=sys.stderr)
            continue  # nothing to checksum after a failed transfer
        sums = md5sum(filename)
        if sums == md5:
            print(f'[{filename}] completed download')
            return
        retry += 1
        print(f'[{filename}] RETRY {retry}: ERR: checksum mismatch {sums} != {md5}',
              file=sys.stderr)
    print(f'[{filename}] ERR: maximum retries exceeded', file=sys.stderr)


def download_all_from_index(index, output_dir, threads):
    '''
    Download every FASTQ/PAIRED_FASTQ pair listed in a tab-separated index.
    '''
    reader = csv.DictReader(index, delimiter='\t')
    with Pool(threads) as pool:
        res = []
        for files in reader:
            # Queue both mates of each read pair for parallel download.
            res.append(pool.apply_async(
                wget_and_verify,
                (files['FASTQ'], files['FASTQ_MD5'], output_dir)))
            res.append(pool.apply_async(
                wget_and_verify,
                (files['PAIRED_FASTQ'], files['PAIRED_FASTQ_MD5'], output_dir)))
        # Block on every result so worker exceptions propagate here.
        for r in res:
            r.get()
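

# The functions above never run on import, so the script needs an entry point
# to match its shebang. The CLI below is a sketch, not part of the original
# code: the argument order (index path, output directory, optional thread
# count) and the default of 4 workers are assumptions. The __main__ guard is
# required for multiprocessing.Pool on platforms that spawn workers.
if __name__ == '__main__':
    index_path = Path(sys.argv[1])  # tab-separated index with FASTQ/MD5 columns
    output_dir = Path(sys.argv[2])  # directory to download into
    threads = int(sys.argv[3]) if len(sys.argv) > 3 else 4  # assumed default
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(index_path, newline='') as index:
        download_all_from_index(index, output_dir, threads)

# Example invocation and index layout (hypothetical names, for illustration):
#   ./download_fastq.py index.tsv ./reads 8
# where index.tsv carries the tab-separated columns read above:
#   FASTQ    FASTQ_MD5    PAIRED_FASTQ    PAIRED_FASTQ_MD5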