edgeserve.util
"""FTP helpers: fetch a file named by an ``ftp://`` URL into memory or onto disk.

The handler and wrapper classes are adapted from ``urllib.request``.
"""
import ftplib
import os
import socket
import sys
import time
from urllib.error import URLError
from urllib.parse import unquote, urlparse


def local_to_global_path(local_file_path, local_ftp_path):
    """Turn a path under the local FTP root into an ``ftp://`` URL for this host.

    Only a leading ``local_ftp_path`` prefix is stripped; occurrences of the
    same text further inside the path are part of the file's location and are
    kept.
    """
    if local_ftp_path and local_file_path.startswith(local_ftp_path):
        relative = local_file_path[len(local_ftp_path):]
    else:
        relative = local_file_path
    return 'ftp://' + socket.gethostname() + '/' + relative


def ftp_fetch(url, local_ftp_path='/srv/ftp/', memory=True, delete=False):
    """Fetch the file named by an ``ftp://`` URL.

    Args:
        url: ``ftp://[user[:passwd]@]host/dir/.../file``.
        local_ftp_path: local directory prefix under which the remote
            directory layout is mirrored (used only when ``memory`` is false).
        memory: if true, return the file contents as ``bytes``; otherwise
            write the file below ``local_ftp_path`` and return its local path.
        delete: if true, delete the remote file after a successful download.

    Raises:
        OSError: the URL is not an ftp URL, or lacks a host or a file name.
        URLError: name resolution or the FTP transfer failed.
    """
    scheme, host, path, _, _, _ = urlparse(url)
    if scheme != 'ftp':
        raise OSError('ftp error: wrong URL, expect ftp://')
    if not host:
        raise OSError('ftp error: no host given')
    if not path:
        raise OSError('ftp error: no file path given')

    parts = [unquote(part) for part in path.split('/')]
    dirs, file = parts[:-1], parts[-1]
    if dirs and not dirs[0]:
        dirs = dirs[1:]
    if not file:
        # A URL ending in '/' names a directory, not a file.
        raise OSError('ftp error: no file path given')
    remote_dir = '/'.join(dirs)
    local_dir = local_ftp_path + remote_dir

    handler = CacheFTPHandler()
    try:
        if memory:
            return handler.ftp_open(host, remote_dir, file, memory, delete)

        # retrfile() writes relative to the current directory, so switch to
        # the mirror directory for the transfer and restore the caller's
        # working directory afterwards.
        previous_cwd = os.getcwd()
        os.chdir(local_dir)
        try:
            handler.ftp_open(host, remote_dir, file, memory, delete)
        finally:
            os.chdir(previous_cwd)
        return local_dir + '/' + file
    finally:
        # The handler lives only for this call; close its connection(s).
        handler.clear_cache()


def splituser(host):
    """splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'."""
    user, delim, host = host.rpartition('@')
    return (user if delim else None), host


def splitpasswd(user):
    """splitpasswd('user:passwd') -> 'user', 'passwd'."""
    user, delim, passwd = user.partition(':')
    return user, (passwd if delim else None)


# Borrowed from urllib.request
class FTPHandler:
    """Open an FTP connection and retrieve one file over it."""

    def ftp_open(self, host, dir, file, memory=True, delete=False):
        """Retrieve ``file`` from directory ``dir`` on ``host``.

        ``host`` may carry ``user[:passwd]@`` credentials. Returns the file
        contents as bytes when ``memory`` is true; otherwise the file is
        written locally by ``retrfile`` and ``None`` is returned.

        Raises:
            URLError: the host cannot be resolved or the transfer fails.
        """
        # username/password handling
        user, host = splituser(host)
        if user:
            user, passwd = splitpasswd(user)
        else:
            passwd = None
        host = unquote(host)
        user = user or ''
        passwd = passwd or ''

        # NOTE(review): a ':port' suffix is not split off before resolution.
        try:
            host = socket.gethostbyname(host)
        except OSError as msg:
            raise URLError(msg) from msg

        try:
            fw = self.connect_ftp(user, passwd, host, dir, socket._GLOBAL_DEFAULT_TIMEOUT)
            if memory:
                return fw.retrmemory(file, delete)
            fw.retrfile(file, delete)
        except URLError:
            # URLError is an OSError and would otherwise be caught and
            # wrapped a second time by the clause below.
            raise
        except ftplib.all_errors as exp:
            raise URLError('ftp error: %r' % exp) from exp

    def connect_ftp(self, user, passwd, host, dir, timeout):
        """Return a new ``ftpwrapper`` logged in to ``host`` and positioned in ``dir``."""
        return ftpwrapper(user, passwd, host, dir, timeout)


class CacheFTPHandler(FTPHandler):
    """FTPHandler that reuses connections, expiring them by age and count."""

    # XXX would be nice to have pluggable cache strategies
    # XXX this stuff is definitely not thread safe
    def __init__(self):
        self.cache = {}      # key -> ftpwrapper
        self.timeout = {}    # key -> expiry time (seconds since the epoch)
        self.soonest = 0     # earliest expiry currently recorded
        self.delay = 60      # seconds an entry stays valid after last use
        self.max_conns = 16  # upper bound on cached connections

    def setTimeout(self, t):
        """Set how many seconds a cached connection stays valid after use."""
        self.delay = t

    def setMaxConns(self, m):
        """Set the maximum number of cached connections."""
        self.max_conns = m

    def connect_ftp(self, user, passwd, host, dirs, timeout):
        """Return a cached connection for the target, creating one if needed.

        ``dirs`` may be a '/'-separated string or a sequence of components.
        """
        # Joining a plain string would insert '/' between every character.
        remote_dir = dirs if isinstance(dirs, str) else '/'.join(dirs)
        key = user, host, remote_dir, timeout
        if key not in self.cache:
            self.cache[key] = ftpwrapper(user, passwd, host, remote_dir, timeout)
        self.timeout[key] = time.time() + self.delay
        self.check_cache()
        return self.cache[key]

    def check_cache(self):
        """Close and drop expired connections, then enforce ``max_conns``."""
        now = time.time()
        # first check for old ones
        if self.soonest <= now:
            for key, expiry in list(self.timeout.items()):
                if expiry < now:
                    del self.timeout[key]
                    conn = self.cache.pop(key, None)
                    if conn is not None:
                        conn.close()

        # then check the size; always keep at least one entry so that the
        # connection just added by connect_ftp() survives.
        limit = max(self.max_conns, 1)
        while len(self.cache) > limit and self.timeout:
            oldest = min(self.timeout, key=self.timeout.get)
            del self.timeout[oldest]
            conn = self.cache.pop(oldest, None)
            if conn is not None:
                conn.close()

        self.soonest = min(self.timeout.values(), default=0)

    def clear_cache(self):
        """Close every cached connection and empty the cache."""
        for conn in self.cache.values():
            conn.close()
        self.cache.clear()
        self.timeout.clear()


class ftpwrapper:
    """One logged-in FTP connection, as created by ftp_open() and cached by CacheFTPHandler."""

    def __init__(self, user, passwd, host, dir, timeout=None):
        """Connect to ``host``, log in and change to ``dir``.

        Raises:
            URLError: the server refused the login or directory change.
        """
        self.user = user
        self.passwd = passwd
        self.host = host
        self.dir = dir
        self.timeout = timeout
        self.ftp = None
        try:
            self.ftp = ftplib.FTP(self.host, timeout=self.timeout)
            self.ftp.set_pasv(False)
            self.ftp.login(self.user, self.passwd)
            self.ftp.cwd(self.dir)
        except ftplib.error_perm as reason:
            self.close()  # do not leak the half-initialised connection
            raise URLError('ftp error: %r' % reason) from reason
        except BaseException:
            self.close()
            raise

    def retrmemory(self, file, delete=False):
        """Download ``file`` in binary mode and return its contents as bytes.

        If ``delete`` is true the remote file is deleted after the download.
        """
        chunks = []
        try:
            self.ftp.voidcmd('TYPE I')
            self.ftp.retrbinary('RETR ' + file, chunks.append)
            if delete:
                self.ftp.delete(file)
        except ftplib.error_perm as reason:
            raise URLError('ftp error: %r' % reason) from reason
        return b''.join(chunks)

    def retrfile(self, file, delete=False):
        """Download ``file`` in binary mode to a local file of the same name.

        Data goes to ``file + '.tmp'`` first and is moved into place only
        after a complete transfer. If ``delete`` is true the remote file is
        deleted once the local copy is in place.
        """
        tmp_path = file + '.tmp'
        try:
            self.ftp.voidcmd('TYPE I')
            try:
                with open(tmp_path, 'wb') as fp:
                    self.ftp.retrbinary('RETR ' + file, fp.write)
            except BaseException:
                # Do not leave a partial download behind.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise
            os.replace(tmp_path, file)
            if delete:
                self.ftp.delete(file)
        except ftplib.error_perm as reason:
            raise URLError('ftp error: %r' % reason) from reason

    def close(self):
        """Close the connection, ignoring errors (best effort)."""
        if self.ftp is None:
            return
        try:
            self.ftp.close()
        except ftplib.all_errors:
            pass
def
local_to_global_path(local_file_path, local_ftp_path):
def
ftp_fetch(url, local_ftp_path='/srv/ftp/', memory=True, delete=False):
def ftp_fetch(url, local_ftp_path='/srv/ftp/', memory=True, delete=False):
    """Fetch the file named by an ``ftp://`` URL.

    Args:
        url: ``ftp://[user[:passwd]@]host/dir/.../file``.
        local_ftp_path: local directory prefix under which the remote
            directory layout is mirrored (used only when ``memory`` is false).
        memory: if true, return the file contents as ``bytes``; otherwise
            write the file below ``local_ftp_path`` and return its local path.
        delete: if true, delete the remote file after a successful download.

    Raises:
        OSError: the URL is not an ftp URL, or lacks a host or a file name.
        URLError: name resolution or the FTP transfer failed.
    """
    scheme, host, path, _, _, _ = urlparse(url)
    if scheme != 'ftp':
        raise OSError('ftp error: wrong URL, expect ftp://')
    if not host:
        raise OSError('ftp error: no host given')
    if not path:
        raise OSError('ftp error: no file path given')

    parts = [unquote(part) for part in path.split('/')]
    dirs, file = parts[:-1], parts[-1]
    if dirs and not dirs[0]:
        dirs = dirs[1:]
    if not file:
        # A URL ending in '/' names a directory, not a file.
        raise OSError('ftp error: no file path given')
    remote_dir = '/'.join(dirs)
    local_dir = local_ftp_path + remote_dir

    handler = CacheFTPHandler()
    try:
        if memory:
            # Nothing is written locally, so the working directory is left alone.
            return handler.ftp_open(host, remote_dir, file, memory, delete)

        # The file is written relative to the current directory, so switch to
        # the mirror directory for the transfer and restore the caller's
        # working directory afterwards.
        previous_cwd = os.getcwd()
        os.chdir(local_dir)
        try:
            handler.ftp_open(host, remote_dir, file, memory, delete)
        finally:
            os.chdir(previous_cwd)
        return local_dir + '/' + file
    finally:
        # The handler lives only for this call; close its connection(s)
        # instead of leaking them.
        handler.clear_cache()
def
splituser(host):
def splituser(host):
    """Split 'user[:passwd]@host[:port]' into ('user[:passwd]', 'host[:port]').

    The split happens at the last '@'. When there is no '@' at all, the user
    part is None and the input is returned unchanged as the host part.
    """
    at = host.rfind('@')
    if at < 0:
        return None, host
    return host[:at], host[at + 1:]
splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'.
def
splitpasswd(user):
def splitpasswd(user):
    """Split 'user:passwd' into ('user', 'passwd').

    The split happens at the first ':'. When there is no ':', the password
    part is None.
    """
    if ':' not in user:
        return user, None
    name, secret = user.split(':', 1)
    return name, secret
splitpasswd('user:passwd') -> 'user', 'passwd'.
class
FTPHandler:
class FTPHandler:
    """Open an FTP connection and retrieve one file over it."""

    def ftp_open(self, host, dir, file, memory=True, delete=False):
        """Retrieve ``file`` from directory ``dir`` on ``host``.

        ``host`` may carry ``user[:passwd]@`` credentials. Returns the file
        contents as bytes when ``memory`` is true; otherwise the connection's
        ``retrfile`` stores the file locally and ``None`` is returned.

        Raises:
            URLError: the host cannot be resolved or the transfer fails.
        """
        # username/password handling
        user, host = splituser(host)
        if user:
            user, passwd = splitpasswd(user)
        else:
            passwd = None
        host = unquote(host)
        user = user or ''
        passwd = passwd or ''

        # NOTE(review): a ':port' suffix is not split off before resolution.
        try:
            host = socket.gethostbyname(host)
        except OSError as msg:
            raise URLError(msg) from msg

        try:
            fw = self.connect_ftp(user, passwd, host, dir, socket._GLOBAL_DEFAULT_TIMEOUT)
            if memory:
                return fw.retrmemory(file, delete)
            fw.retrfile(file, delete)
        except URLError:
            # URLError is an OSError and would otherwise be caught and
            # wrapped a second time by the clause below.
            raise
        except ftplib.all_errors as exp:
            raise URLError('ftp error: %r' % exp) from exp

    def connect_ftp(self, user, passwd, host, dir, timeout):
        """Return a new ``ftpwrapper`` logged in to ``host`` and positioned in ``dir``."""
        return ftpwrapper(user, passwd, host, dir, timeout)
def
ftp_open(self, host, dir, file, memory=True, delete=False):
def ftp_open(self, host, dir, file, memory=True, delete=False):
    """Retrieve ``file`` from directory ``dir`` on ``host``.

    ``host`` may carry ``user[:passwd]@`` credentials. Returns the file
    contents as bytes when ``memory`` is true; otherwise the connection's
    ``retrfile`` stores the file locally and ``None`` is returned.

    Raises:
        URLError: the host cannot be resolved or the transfer fails.
    """
    # username/password handling
    user, host = splituser(host)
    if user:
        user, passwd = splitpasswd(user)
    else:
        passwd = None
    host = unquote(host)
    user = user or ''
    passwd = passwd or ''

    # NOTE(review): a ':port' suffix is not split off before resolution.
    try:
        host = socket.gethostbyname(host)
    except OSError as msg:
        raise URLError(msg) from msg

    try:
        fw = self.connect_ftp(user, passwd, host, dir, socket._GLOBAL_DEFAULT_TIMEOUT)
        if memory:
            return fw.retrmemory(file, delete)
        fw.retrfile(file, delete)
    except URLError:
        # URLError is an OSError and would otherwise be caught and wrapped
        # a second time by the clause below.
        raise
    except ftplib.all_errors as exp:
        raise URLError('ftp error: %r' % exp) from exp
class CacheFTPHandler(FTPHandler):
    """FTPHandler that reuses connections, expiring them by age and count."""

    # XXX would be nice to have pluggable cache strategies
    # XXX this stuff is definitely not thread safe
    def __init__(self):
        self.cache = {}      # key -> ftpwrapper
        self.timeout = {}    # key -> expiry time (seconds since the epoch)
        self.soonest = 0     # earliest expiry currently recorded
        self.delay = 60      # seconds an entry stays valid after last use
        self.max_conns = 16  # upper bound on cached connections

    def setTimeout(self, t):
        """Set how many seconds a cached connection stays valid after use."""
        self.delay = t

    def setMaxConns(self, m):
        """Set the maximum number of cached connections."""
        self.max_conns = m

    def connect_ftp(self, user, passwd, host, dirs, timeout):
        """Return a cached connection for the target, creating one if needed.

        ``dirs`` may be a '/'-separated string or a sequence of components.
        """
        # Joining a plain string would insert '/' between every character.
        remote_dir = dirs if isinstance(dirs, str) else '/'.join(dirs)
        key = user, host, remote_dir, timeout
        if key not in self.cache:
            self.cache[key] = ftpwrapper(user, passwd, host, remote_dir, timeout)
        self.timeout[key] = time.time() + self.delay
        self.check_cache()
        return self.cache[key]

    def check_cache(self):
        """Close and drop expired connections, then enforce ``max_conns``."""
        now = time.time()
        # first check for old ones
        if self.soonest <= now:
            for key, expiry in list(self.timeout.items()):
                if expiry < now:
                    del self.timeout[key]
                    conn = self.cache.pop(key, None)
                    if conn is not None:
                        conn.close()

        # then check the size; always keep at least one entry so that the
        # connection just added by connect_ftp() survives.
        limit = max(self.max_conns, 1)
        while len(self.cache) > limit and self.timeout:
            # Pick the oldest entry directly rather than trusting
            # self.soonest, which can be stale after a cache hit.
            oldest = min(self.timeout, key=self.timeout.get)
            del self.timeout[oldest]
            conn = self.cache.pop(oldest, None)
            if conn is not None:
                conn.close()

        # default=0 keeps an empty cache from raising ValueError.
        self.soonest = min(self.timeout.values(), default=0)

    def clear_cache(self):
        """Close every cached connection and empty the cache."""
        for conn in self.cache.values():
            conn.close()
        self.cache.clear()
        self.timeout.clear()
def
connect_ftp(self, user, passwd, host, dirs, timeout):
def connect_ftp(self, user, passwd, host, dirs, timeout):
    """Return a cached connection for the target, creating one if needed.

    ``dirs`` may be a '/'-separated string or a sequence of path components.
    Using the connection refreshes its expiry to ``now + self.delay``.
    """
    # Joining a plain string would insert '/' between every character and
    # produce cache keys such as 'a///b'.
    remote_dir = dirs if isinstance(dirs, str) else '/'.join(dirs)
    key = user, host, remote_dir, timeout
    if key not in self.cache:
        self.cache[key] = ftpwrapper(user, passwd, host, remote_dir, timeout)
    self.timeout[key] = time.time() + self.delay
    self.check_cache()
    return self.cache[key]
def
check_cache(self):
def check_cache(self):
    """Close and drop expired connections, then enforce ``self.max_conns``.

    Updates ``self.soonest`` to the earliest remaining expiry, or 0 when the
    cache is empty.
    """
    now = time.time()
    # first check for old ones
    if self.soonest <= now:
        for key, expiry in list(self.timeout.items()):
            if expiry < now:
                del self.timeout[key]
                conn = self.cache.pop(key, None)
                if conn is not None:
                    conn.close()

    # then check the size; always keep at least one entry so that a
    # connection that was just added is not evicted straight away.
    limit = max(self.max_conns, 1)
    while len(self.cache) > limit and self.timeout:
        # Pick the oldest entry directly rather than trusting self.soonest,
        # which can be stale when the scan above was skipped.
        oldest = min(self.timeout, key=self.timeout.get)
        del self.timeout[oldest]
        conn = self.cache.pop(oldest, None)
        if conn is not None:
            conn.close()  # evicted connections are closed, not just dropped

    # default=0 keeps an empty cache from raising ValueError.
    self.soonest = min(self.timeout.values(), default=0)
Inherited Members
class
ftpwrapper:
class ftpwrapper:
    """One logged-in FTP connection, as created by ftp_open() and cached by CacheFTPHandler."""

    def __init__(self, user, passwd, host, dir, timeout=None):
        """Connect to ``host``, log in and change to ``dir``.

        Raises:
            URLError: the server refused the login or directory change.
        """
        self.user = user
        self.passwd = passwd
        self.host = host
        self.dir = dir
        self.timeout = timeout
        self.ftp = None
        try:
            self.ftp = ftplib.FTP(self.host, timeout=self.timeout)
            self.ftp.set_pasv(False)
            self.ftp.login(self.user, self.passwd)
            self.ftp.cwd(self.dir)
        except ftplib.error_perm as reason:
            self.close()  # do not leak the half-initialised connection
            raise URLError('ftp error: %r' % reason) from reason
        except BaseException:
            self.close()
            raise

    def retrmemory(self, file, delete=False):
        """Download ``file`` in binary mode and return its contents as bytes.

        If ``delete`` is true the remote file is deleted after the download.

        Raises:
            URLError: the server rejected a command with a permanent error.
        """
        chunks = []
        try:
            self.ftp.voidcmd('TYPE I')
            self.ftp.retrbinary('RETR ' + file, chunks.append)
            if delete:
                self.ftp.delete(file)
        except ftplib.error_perm as reason:
            raise URLError('ftp error: %r' % reason) from reason
        return b''.join(chunks)

    def retrfile(self, file, delete=False):
        """Download ``file`` in binary mode to a local file of the same name.

        Data goes to ``file + '.tmp'`` first and is moved into place only
        after a complete transfer. If ``delete`` is true the remote file is
        deleted once the local copy is in place.

        Raises:
            URLError: the server rejected a command with a permanent error.
        """
        tmp_path = file + '.tmp'
        try:
            self.ftp.voidcmd('TYPE I')
            try:
                with open(tmp_path, 'wb') as fp:
                    self.ftp.retrbinary('RETR ' + file, fp.write)
            except BaseException:
                # Do not leave a partial download behind.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise
            # os.replace overwrites an existing target on every platform.
            os.replace(tmp_path, file)
            if delete:
                self.ftp.delete(file)
        except ftplib.error_perm as reason:
            raise URLError('ftp error: %r' % reason) from reason

    def close(self):
        """Close the connection, ignoring errors (best effort)."""
        if self.ftp is None:
            return
        try:
            self.ftp.close()
        except ftplib.all_errors:
            pass
Class used by FTPHandler.ftp_open() to hold (and, via CacheFTPHandler, cache) open FTP connections.
ftpwrapper(user, passwd, host, dir, timeout=None)
def __init__(self, user, passwd, host, dir, timeout=None):
    """Connect to ``host``, log in as ``user`` and change to ``dir``.

    The connection uses active mode (passive mode is switched off).

    Raises:
        URLError: the server answered with a permanent error while
            connecting, logging in or changing directory.
    """
    self.user = user
    self.passwd = passwd
    self.host = host
    self.dir = dir
    self.timeout = timeout
    try:
        self.ftp = ftplib.FTP(self.host, timeout=self.timeout)
    except ftplib.error_perm as reason:
        raise URLError('ftp error: %r' % reason) from reason
    try:
        self.ftp.set_pasv(False)
        self.ftp.login(self.user, self.passwd)
        self.ftp.cwd(self.dir)
    except BaseException as exc:
        # The control connection is already open here; close it so a
        # rejected login or directory change does not leak a socket.
        try:
            self.ftp.close()
        except ftplib.all_errors:
            pass
        if isinstance(exc, ftplib.error_perm):
            raise URLError('ftp error: %r' % exc) from exc
        raise
def
retrmemory(self, file, delete=False):
def retrmemory(self, file, delete=False):
    """Download ``file`` in binary mode and return its contents as bytes.

    When ``delete`` is true the remote file is deleted after the download.
    A permanent FTP error is re-raised as URLError carrying the original
    traceback.
    """
    chunks = []
    try:
        self.ftp.voidcmd('TYPE I')
        # Each received block is appended as-is and joined at the end.
        self.ftp.retrbinary('RETR ' + file, chunks.append)
        if delete:
            self.ftp.delete(file)
    except ftplib.error_perm as reason:
        wrapped = URLError('ftp error: %r' % reason)
        raise wrapped.with_traceback(reason.__traceback__)
    return b''.join(chunks)
def
retrfile(self, file, delete=False):
def retrfile(self, file, delete=False):
    """Download ``file`` in binary mode to a local file of the same name.

    Data goes to ``file + '.tmp'`` first and is moved into place only after
    a complete transfer, so a failed download never leaves a truncated
    ``file`` or a stray ``.tmp`` behind. When ``delete`` is true the remote
    file is deleted once the local copy is in place.

    Raises:
        URLError: the server rejected a command with a permanent error.
    """
    tmp_path = file + '.tmp'
    try:
        self.ftp.voidcmd('TYPE I')
        try:
            with open(tmp_path, 'wb') as fp:
                self.ftp.retrbinary('RETR ' + file, fp.write)
        except BaseException:
            # Do not leave a partial download behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        # Finalise the local copy before touching the remote file;
        # os.replace overwrites an existing target on every platform.
        os.replace(tmp_path, file)
        if delete:
            self.ftp.delete(file)
    except ftplib.error_perm as reason:
        raise URLError('ftp error: %r' % reason) from reason