edgeserve.util

  1import ftplib
  2import os
  3import socket
  4import sys
  5import time
  6from urllib.error import URLError
  7from urllib.parse import unquote, urlparse
  8
  9
 10def local_to_global_path(local_file_path, local_ftp_path):
 11    return 'ftp://' + socket.gethostname() + '/' + local_file_path.replace(local_ftp_path, '')
 12
 13
 14def ftp_fetch(url, local_ftp_path='/srv/ftp/', memory=True, delete=False):
 15    scheme, host, path, _, _, _ = urlparse(url)
 16    if scheme != 'ftp':
 17        raise OSError('ftp error: wrong URL, expect ftp://')
 18    if not host:
 19        raise OSError('ftp error: no host given')
 20    if not path:
 21        raise OSError('ftp error: no file path given')
 22
 23    dirs = path.split('/')
 24    dirs = list(map(unquote, dirs))
 25    dirs, file = dirs[:-1], dirs[-1]
 26    if dirs and not dirs[0]:
 27        dirs = dirs[1:]
 28    dir = '/'.join(dirs)
 29    os.chdir(local_ftp_path + dir)
 30    handler = CacheFTPHandler()
 31    if memory:
 32        return handler.ftp_open(host, dir, file, memory, delete)
 33
 34    handler.ftp_open(host, dir, file, memory, delete)
 35    return local_ftp_path + dir + '/' + file
 36
 37
 38def splituser(host):
 39    """splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'."""
 40    user, delim, host = host.rpartition('@')
 41    return (user if delim else None), host
 42
 43
 44def splitpasswd(user):
 45    """splitpasswd('user:passwd') -> 'user', 'passwd'."""
 46    user, delim, passwd = user.partition(':')
 47    return user, (passwd if delim else None)
 48
 49
 50# Borrowed from urllib.request
 51class FTPHandler:
 52    def ftp_open(self, host, dir, file, memory=True, delete=False):
 53        # username/password handling
 54        user, host = splituser(host)
 55        if user:
 56            user, passwd = splitpasswd(user)
 57        else:
 58            passwd = None
 59        host = unquote(host)
 60        user = user or ''
 61        passwd = passwd or ''
 62
 63        try:
 64            host = socket.gethostbyname(host)
 65        except OSError as msg:
 66            raise URLError(msg)
 67
 68        try:
 69            fw = self.connect_ftp(user, passwd, host, dir, socket._GLOBAL_DEFAULT_TIMEOUT)
 70            if memory:
 71                return fw.retrmemory(file, delete)
 72            fw.retrfile(file, delete)
 73        except ftplib.all_errors as exp:
 74            exc = URLError('ftp error: %r' % exp)
 75            raise exc.with_traceback(sys.exc_info()[2])
 76
 77    def connect_ftp(self, user, passwd, host, dir, timeout):
 78        return ftpwrapper(user, passwd, host, dir, timeout)
 79
 80
 81class CacheFTPHandler(FTPHandler):
 82    # XXX would be nice to have pluggable cache strategies
 83    # XXX this stuff is definitely not thread safe
 84    def __init__(self):
 85        self.cache = {}
 86        self.timeout = {}
 87        self.soonest = 0
 88        self.delay = 60
 89        self.max_conns = 16
 90
 91    def setTimeout(self, t):
 92        self.delay = t
 93
 94    def setMaxConns(self, m):
 95        self.max_conns = m
 96
 97    def connect_ftp(self, user, passwd, host, dirs, timeout):
 98        key = user, host, '/'.join(dirs), timeout
 99        if key in self.cache:
100            self.timeout[key] = time.time() + self.delay
101        else:
102            self.cache[key] = ftpwrapper(user, passwd, host,
103                                         dirs, timeout)
104            self.timeout[key] = time.time() + self.delay
105        self.check_cache()
106        return self.cache[key]
107
108    def check_cache(self):
109        # first check for old ones
110        t = time.time()
111        if self.soonest <= t:
112            for k, v in list(self.timeout.items()):
113                if v < t:
114                    self.cache[k].close()
115                    del self.cache[k]
116                    del self.timeout[k]
117        self.soonest = min(list(self.timeout.values()))
118
119        # then check the size
120        if len(self.cache) == self.max_conns:
121            for k, v in list(self.timeout.items()):
122                if v == self.soonest:
123                    del self.cache[k]
124                    del self.timeout[k]
125                    break
126            self.soonest = min(list(self.timeout.values()))
127
128    def clear_cache(self):
129        for conn in self.cache.values():
130            conn.close()
131        self.cache.clear()
132        self.timeout.clear()
133
134
class ftpwrapper:
    """One logged-in FTP connection positioned in a fixed remote directory."""

    def __init__(self, user, passwd, host, dir, timeout=None):
        """Connect to ``host``, log in and change to ``dir``.

        Passive mode is switched off before logging in.

        Raises:
            URLError: If the server rejects the login or the directory
                change with a permanent error. Other connection errors
                propagate unchanged.
        """
        self.user = user
        self.passwd = passwd
        self.host = host
        self.dir = dir
        self.timeout = timeout
        self.ftp = None
        try:
            self.ftp = ftplib.FTP(self.host, timeout=self.timeout)
            self.ftp.set_pasv(False)
            self.ftp.login(self.user, self.passwd)
            self.ftp.cwd(self.dir)
        except ftplib.error_perm as reason:
            # Do not leave a half-initialised connection open.
            self.close()
            raise URLError('ftp error: %r' % reason).with_traceback(
                sys.exc_info()[2])
        except BaseException:
            self.close()
            raise

    def retrmemory(self, file, delete=False):
        """Download ``file`` in binary mode and return its content as bytes.

        When ``delete`` is true the remote file is removed after the
        transfer. A permanent FTP error is re-raised as ``URLError``.
        """
        data = []
        try:
            self.ftp.voidcmd('TYPE I')
            self.ftp.retrbinary('RETR ' + file, data.append)
            if delete:
                self.ftp.delete(file)
            return b''.join(data)
        except ftplib.error_perm as reason:
            raise URLError('ftp error: %r' % reason).with_traceback(
                sys.exc_info()[2])

    def retrfile(self, file, delete=False):
        """Download ``file`` in binary mode to a local file of the same name.

        The data is written to ``file + '.tmp'`` and moved into place only
        after the transfer has finished. When ``delete`` is true the remote
        file is removed once the local copy is in place. A permanent FTP
        error is re-raised as ``URLError``.
        """
        tmp_path = file + '.tmp'
        try:
            self.ftp.voidcmd('TYPE I')
            try:
                with open(tmp_path, 'wb') as fp:
                    self.ftp.retrbinary('RETR ' + file, fp.write)
                os.replace(tmp_path, file)
            except BaseException:
                # Never leave a partial download behind.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise
            # Delete the remote copy only after the local one is safely in
            # place, so a failed rename cannot lose the data.
            if delete:
                self.ftp.delete(file)
        except ftplib.error_perm as reason:
            raise URLError('ftp error: %r' % reason).with_traceback(
                sys.exc_info()[2])

    def close(self):
        """Close the connection, ignoring errors raised while doing so."""
        if self.ftp is None:
            return
        try:
            self.ftp.close()
        except ftplib.all_errors:
            # Best effort: the connection is being discarded anyway.
            pass
def local_to_global_path(local_file_path, local_ftp_path):
11def local_to_global_path(local_file_path, local_ftp_path):
12    return 'ftp://' + socket.gethostname() + '/' + local_file_path.replace(local_ftp_path, '')
def ftp_fetch(url, local_ftp_path='/srv/ftp/', memory=True, delete=False):
15def ftp_fetch(url, local_ftp_path='/srv/ftp/', memory=True, delete=False):
16    scheme, host, path, _, _, _ = urlparse(url)
17    if scheme != 'ftp':
18        raise OSError('ftp error: wrong URL, expect ftp://')
19    if not host:
20        raise OSError('ftp error: no host given')
21    if not path:
22        raise OSError('ftp error: no file path given')
23
24    dirs = path.split('/')
25    dirs = list(map(unquote, dirs))
26    dirs, file = dirs[:-1], dirs[-1]
27    if dirs and not dirs[0]:
28        dirs = dirs[1:]
29    dir = '/'.join(dirs)
30    os.chdir(local_ftp_path + dir)
31    handler = CacheFTPHandler()
32    if memory:
33        return handler.ftp_open(host, dir, file, memory, delete)
34
35    handler.ftp_open(host, dir, file, memory, delete)
36    return local_ftp_path + dir + '/' + file
def splituser(host):
39def splituser(host):
40    """splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'."""
41    user, delim, host = host.rpartition('@')
42    return (user if delim else None), host

splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'.

def splitpasswd(user):
45def splitpasswd(user):
46    """splitpasswd('user:passwd') -> 'user', 'passwd'."""
47    user, delim, passwd = user.partition(':')
48    return user, (passwd if delim else None)

splitpasswd('user:passwd') -> 'user', 'passwd'.

class FTPHandler:
52class FTPHandler:
53    def ftp_open(self, host, dir, file, memory=True, delete=False):
54        # username/password handling
55        user, host = splituser(host)
56        if user:
57            user, passwd = splitpasswd(user)
58        else:
59            passwd = None
60        host = unquote(host)
61        user = user or ''
62        passwd = passwd or ''
63
64        try:
65            host = socket.gethostbyname(host)
66        except OSError as msg:
67            raise URLError(msg)
68
69        try:
70            fw = self.connect_ftp(user, passwd, host, dir, socket._GLOBAL_DEFAULT_TIMEOUT)
71            if memory:
72                return fw.retrmemory(file, delete)
73            fw.retrfile(file, delete)
74        except ftplib.all_errors as exp:
75            exc = URLError('ftp error: %r' % exp)
76            raise exc.with_traceback(sys.exc_info()[2])
77
78    def connect_ftp(self, user, passwd, host, dir, timeout):
79        return ftpwrapper(user, passwd, host, dir, timeout)
def ftp_open(self, host, dir, file, memory=True, delete=False):
53    def ftp_open(self, host, dir, file, memory=True, delete=False):
54        # username/password handling
55        user, host = splituser(host)
56        if user:
57            user, passwd = splitpasswd(user)
58        else:
59            passwd = None
60        host = unquote(host)
61        user = user or ''
62        passwd = passwd or ''
63
64        try:
65            host = socket.gethostbyname(host)
66        except OSError as msg:
67            raise URLError(msg)
68
69        try:
70            fw = self.connect_ftp(user, passwd, host, dir, socket._GLOBAL_DEFAULT_TIMEOUT)
71            if memory:
72                return fw.retrmemory(file, delete)
73            fw.retrfile(file, delete)
74        except ftplib.all_errors as exp:
75            exc = URLError('ftp error: %r' % exp)
76            raise exc.with_traceback(sys.exc_info()[2])
def connect_ftp(self, user, passwd, host, dir, timeout):
78    def connect_ftp(self, user, passwd, host, dir, timeout):
79        return ftpwrapper(user, passwd, host, dir, timeout)
class CacheFTPHandler(FTPHandler):
 82class CacheFTPHandler(FTPHandler):
 83    # XXX would be nice to have pluggable cache strategies
 84    # XXX this stuff is definitely not thread safe
 85    def __init__(self):
 86        self.cache = {}
 87        self.timeout = {}
 88        self.soonest = 0
 89        self.delay = 60
 90        self.max_conns = 16
 91
 92    def setTimeout(self, t):
 93        self.delay = t
 94
 95    def setMaxConns(self, m):
 96        self.max_conns = m
 97
 98    def connect_ftp(self, user, passwd, host, dirs, timeout):
 99        key = user, host, '/'.join(dirs), timeout
100        if key in self.cache:
101            self.timeout[key] = time.time() + self.delay
102        else:
103            self.cache[key] = ftpwrapper(user, passwd, host,
104                                         dirs, timeout)
105            self.timeout[key] = time.time() + self.delay
106        self.check_cache()
107        return self.cache[key]
108
109    def check_cache(self):
110        # first check for old ones
111        t = time.time()
112        if self.soonest <= t:
113            for k, v in list(self.timeout.items()):
114                if v < t:
115                    self.cache[k].close()
116                    del self.cache[k]
117                    del self.timeout[k]
118        self.soonest = min(list(self.timeout.values()))
119
120        # then check the size
121        if len(self.cache) == self.max_conns:
122            for k, v in list(self.timeout.items()):
123                if v == self.soonest:
124                    del self.cache[k]
125                    del self.timeout[k]
126                    break
127            self.soonest = min(list(self.timeout.values()))
128
129    def clear_cache(self):
130        for conn in self.cache.values():
131            conn.close()
132        self.cache.clear()
133        self.timeout.clear()
cache
timeout
soonest
delay
max_conns
def setTimeout(self, t):
92    def setTimeout(self, t):
93        self.delay = t
def setMaxConns(self, m):
95    def setMaxConns(self, m):
96        self.max_conns = m
def connect_ftp(self, user, passwd, host, dirs, timeout):
 98    def connect_ftp(self, user, passwd, host, dirs, timeout):
 99        key = user, host, '/'.join(dirs), timeout
100        if key in self.cache:
101            self.timeout[key] = time.time() + self.delay
102        else:
103            self.cache[key] = ftpwrapper(user, passwd, host,
104                                         dirs, timeout)
105            self.timeout[key] = time.time() + self.delay
106        self.check_cache()
107        return self.cache[key]
def check_cache(self):
109    def check_cache(self):
110        # first check for old ones
111        t = time.time()
112        if self.soonest <= t:
113            for k, v in list(self.timeout.items()):
114                if v < t:
115                    self.cache[k].close()
116                    del self.cache[k]
117                    del self.timeout[k]
118        self.soonest = min(list(self.timeout.values()))
119
120        # then check the size
121        if len(self.cache) == self.max_conns:
122            for k, v in list(self.timeout.items()):
123                if v == self.soonest:
124                    del self.cache[k]
125                    del self.timeout[k]
126                    break
127            self.soonest = min(list(self.timeout.values()))
def clear_cache(self):
129    def clear_cache(self):
130        for conn in self.cache.values():
131            conn.close()
132        self.cache.clear()
133        self.timeout.clear()
Inherited Members
FTPHandler
ftp_open
class ftpwrapper:
136class ftpwrapper:
137    """Class used by open_ftp() for cache of open FTP connections."""
138
139    def __init__(self, user, passwd, host, dir, timeout=None):
140        self.user = user
141        self.passwd = passwd
142        self.host = host
143        self.dir = dir
144        self.timeout = timeout
145        try:
146            self.ftp = ftplib.FTP(self.host, timeout=self.timeout)
147            self.ftp.set_pasv(False)
148            self.ftp.login(self.user, self.passwd)
149            self.ftp.cwd(self.dir)
150        except ftplib.error_perm as reason:
151            raise URLError('ftp error: %r' % reason).with_traceback(
152                sys.exc_info()[2])
153
154    def retrmemory(self, file, delete=False):
155        data = []
156        def read_block(block):
157            data.append(block)
158        try:
159            self.ftp.voidcmd('TYPE I')
160            cmd = 'RETR ' + file
161            self.ftp.retrbinary(cmd, read_block)
162            if delete:
163                self.ftp.delete(file)
164            return b''.join(data)
165        except ftplib.error_perm as reason:
166            raise URLError('ftp error: %r' % reason).with_traceback(
167                sys.exc_info()[2])
168
169    def retrfile(self, file, delete=False):
170        try:
171            self.ftp.voidcmd('TYPE I')
172            with open(file + '.tmp', 'wb') as fp:
173                cmd = 'RETR ' + file
174                self.ftp.retrbinary(cmd, fp.write)
175            if delete:
176                self.ftp.delete(file)
177            os.rename(file + '.tmp', file)
178        except ftplib.error_perm as reason:
179            raise URLError('ftp error: %r' % reason).with_traceback(
180                sys.exc_info()[2])
181
182    def close(self):
183        try:
184            self.ftp.close()
185        except ftplib.all_errors:
186            pass

Class used by FTPHandler.connect_ftp() and CacheFTPHandler.connect_ftp() to hold one open FTP connection.

ftpwrapper(user, passwd, host, dir, timeout=None)
139    def __init__(self, user, passwd, host, dir, timeout=None):
140        self.user = user
141        self.passwd = passwd
142        self.host = host
143        self.dir = dir
144        self.timeout = timeout
145        try:
146            self.ftp = ftplib.FTP(self.host, timeout=self.timeout)
147            self.ftp.set_pasv(False)
148            self.ftp.login(self.user, self.passwd)
149            self.ftp.cwd(self.dir)
150        except ftplib.error_perm as reason:
151            raise URLError('ftp error: %r' % reason).with_traceback(
152                sys.exc_info()[2])
user
passwd
host
dir
timeout
def retrmemory(self, file, delete=False):
154    def retrmemory(self, file, delete=False):
155        data = []
156        def read_block(block):
157            data.append(block)
158        try:
159            self.ftp.voidcmd('TYPE I')
160            cmd = 'RETR ' + file
161            self.ftp.retrbinary(cmd, read_block)
162            if delete:
163                self.ftp.delete(file)
164            return b''.join(data)
165        except ftplib.error_perm as reason:
166            raise URLError('ftp error: %r' % reason).with_traceback(
167                sys.exc_info()[2])
def retrfile(self, file, delete=False):
169    def retrfile(self, file, delete=False):
170        try:
171            self.ftp.voidcmd('TYPE I')
172            with open(file + '.tmp', 'wb') as fp:
173                cmd = 'RETR ' + file
174                self.ftp.retrbinary(cmd, fp.write)
175            if delete:
176                self.ftp.delete(file)
177            os.rename(file + '.tmp', file)
178        except ftplib.error_perm as reason:
179            raise URLError('ftp error: %r' % reason).with_traceback(
180                sys.exc_info()[2])
def close(self):
182    def close(self):
183        try:
184            self.ftp.close()
185        except ftplib.all_errors:
186            pass