edgeserve.worker
1import sys 2import pulsar 3from _pulsar import InitialPosition 4 5from edgeserve.util import ftp_fetch 6from edgeserve.message_format import GraphCodec 7 8 9class Worker: 10 def __init__(self, pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/'): 11 self.client = pulsar.Client(pulsar_node) 12 self.consumer = self.client.subscribe(topic, subscription_name='worker-sub', 13 schema=pulsar.schema.BytesSchema(), 14 initial_position=InitialPosition.Earliest) 15 self.gate = lambda x: x.decode('utf-8') 16 self.ftp = ftp 17 self.local_ftp_path = local_ftp_path 18 self.ftp_memory = ftp_memory 19 self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0) 20 21 def __enter__(self): 22 return self 23 24 def __exit__(self, exc_type, exc_val, exc_tb): 25 self.client.close() 26 27 def __iter__(self): 28 return self 29 30 def __next__(self): 31 msg = self.consumer.receive() 32 msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg.value()) 33 data = self.gate(payload) 34 35 if self.ftp and not self.ftp_memory: # FTP file mode 36 # download the file from FTP server and then delete the file from server 37 local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=False) 38 data = open(local_file_path).read() 39 else: 40 if self.ftp: # FTP memory mode 41 data = ftp_fetch(data, self.local_ftp_path, memory=True, delete=False) 42 43 exec(data) 44 self.consumer.acknowledge(msg) 45 46 47if __name__ == "__main__": 48 # Example: python worker.py pulsar://localhost:6650 code-ftp true false /srv/ftp/ 49 node = sys.argv[1] 50 topic = sys.argv[2] if len(sys.argv) >= 3 else 'code' 51 ftp = sys.argv[3] in ['true', 'True'] if len(sys.argv) >= 4 else False 52 ftp_memory = sys.argv[4] in ['true', 'True'] if len(sys.argv) >= 5 else True 53 local_ftp_path = sys.argv[5] if len(sys.argv) >= 6 else '/srv/ftp/' 54 with Worker(node, topic, ftp, ftp_memory, local_ftp_path) as worker: 55 while True: 56 next(worker)
class
Worker:
10class Worker: 11 def __init__(self, pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/'): 12 self.client = pulsar.Client(pulsar_node) 13 self.consumer = self.client.subscribe(topic, subscription_name='worker-sub', 14 schema=pulsar.schema.BytesSchema(), 15 initial_position=InitialPosition.Earliest) 16 self.gate = lambda x: x.decode('utf-8') 17 self.ftp = ftp 18 self.local_ftp_path = local_ftp_path 19 self.ftp_memory = ftp_memory 20 self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0) 21 22 def __enter__(self): 23 return self 24 25 def __exit__(self, exc_type, exc_val, exc_tb): 26 self.client.close() 27 28 def __iter__(self): 29 return self 30 31 def __next__(self): 32 msg = self.consumer.receive() 33 msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg.value()) 34 data = self.gate(payload) 35 36 if self.ftp and not self.ftp_memory: # FTP file mode 37 # download the file from FTP server and then delete the file from server 38 local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=False) 39 data = open(local_file_path).read() 40 else: 41 if self.ftp: # FTP memory mode 42 data = ftp_fetch(data, self.local_ftp_path, memory=True, delete=False) 43 44 exec(data) 45 self.consumer.acknowledge(msg)
Worker( pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/')
11 def __init__(self, pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/'): 12 self.client = pulsar.Client(pulsar_node) 13 self.consumer = self.client.subscribe(topic, subscription_name='worker-sub', 14 schema=pulsar.schema.BytesSchema(), 15 initial_position=InitialPosition.Earliest) 16 self.gate = lambda x: x.decode('utf-8') 17 self.ftp = ftp 18 self.local_ftp_path = local_ftp_path 19 self.ftp_memory = ftp_memory 20 self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)