edgeserve.materialize
import os
import pickle
import time
import pulsar
from _pulsar import InitialPosition

from edgeserve.util import ftp_fetch, local_to_global_path
from edgeserve.message_format import GraphCodec


class Materialize:
    """Consume messages from a Pulsar topic and hand each payload to a user callback.

    Instances are (never-ending) iterators: every ``next()`` blocks on
    ``consumer.receive()``, decodes the message with ``GraphCodec``, filters the
    payload through ``gate`` and passes the result to ``materialize``. A message is
    acknowledged only after that work succeeds.

    Use as a context manager so the Pulsar client is closed on exit.
    """

    def __init__(self, materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False,
                 local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None,
                 subscription_name='my-sub'):
        """Connect to Pulsar and subscribe to ``topic``.

        Args:
            materialize: Callable invoked once per message. In FTP mode it receives
                the local file path returned by ``ftp_fetch``; otherwise it receives
                the gated payload and its return value is returned by ``next()``.
            pulsar_node: Service URL passed to ``pulsar.Client``.
            gate: Callable applied to the decoded payload before ``materialize``.
                Defaults to the identity function.
            ftp: If true, the gated payload is passed to ``ftp_fetch`` (presumably a
                reference to a remote file; verify against producers).
            ftp_delete: Forwarded to ``ftp_fetch`` as ``delete``.
            local_ftp_path: Local directory used by ``ftp_fetch`` and
                ``local_to_global_path``.
            topic: Pulsar topic to subscribe to.
            log_path: Directory for the replay log. Logging is skipped unless this
                names an existing directory.
            log_filename: Base name of the replay log file. Defaults to the current
                wall-clock time in milliseconds, as a string.
            subscription_name: Pulsar subscription name. Defaults to ``'my-sub'``,
                the value this class always used previously.
        """
        self.client = pulsar.Client(pulsar_node)
        try:
            # A brand-new subscription starts from the earliest retained message.
            self.consumer = self.client.subscribe(topic, subscription_name=subscription_name,
                                                  schema=pulsar.schema.BytesSchema(),
                                                  initial_position=InitialPosition.Earliest)
        except Exception:
            # The instance is never handed back to the caller, so __exit__ will not
            # run; close the client here instead of leaking it.
            self.client.close()
            raise
        self.materialize = materialize
        self.gate = (lambda x: x) if gate is None else gate
        self.ftp = ftp
        self.local_ftp_path = local_ftp_path
        self.ftp_delete = ftp_delete
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        # Fixed framing: 16-byte message UUID, 16-byte origin operator id, no header.
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closing the client also tears down the consumer created from it.
        self.client.close()

    def __iter__(self):
        return self

    def __next__(self):
        """Block for the next message, materialize it, acknowledge it and return the result.

        Returns:
            In FTP mode, the global path that ``local_to_global_path`` produces for
            the fetched file (the return value of ``materialize`` is discarded).
            Otherwise, whatever ``materialize`` returned.

        Any exception raised before acknowledgement (decode, ``gate``, ``ftp_fetch``,
        ``materialize``) propagates and leaves the message unacknowledged. This method
        never raises ``StopIteration``.
        """
        msg = self.consumer.receive()
        msg_uuid, op_from, _, payload = self.graph_codec.decode(msg.value())
        data = self.gate(payload)

        if self.ftp:
            # Fetch the referenced file into local_ftp_path; the copy on the FTP
            # server is removed only when ftp_delete is set.
            local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=self.ftp_delete)
            self.materialize(local_file_path)
            global_file_path = local_to_global_path(local_file_path, self.local_ftp_path)
            self.consumer.acknowledge(msg)
            return global_file_path

        # Wall-clock timestamps around the callback, recorded for the replay log.
        data_collection_time_ms = time.time() * 1000
        output = self.materialize(data)
        task_finish_time_ms = time.time() * 1000

        # Replay logging happens only when log_path is set AND is an existing
        # directory. Each call appends one pickled record to the same file, so a
        # reader must call pickle.load repeatedly until EOF.
        if self.log_path and os.path.isdir(self.log_path):
            replay_log = {'msg_uuid': msg_uuid,
                          'op_from': op_from,
                          'payload': payload,
                          'data_collection_time_ms': data_collection_time_ms,
                          'task_finish_time_ms': task_finish_time_ms}
            with open(os.path.join(self.log_path, self.log_filename + '.destination'), 'ab') as f:
                pickle.dump(replay_log, f)

        self.consumer.acknowledge(msg)
        return output
class
Materialize:
class Materialize:
    """Consume messages from a Pulsar topic and hand each payload to a user callback.

    Instances are (never-ending) iterators: every ``next()`` blocks on
    ``consumer.receive()``, decodes the message with ``GraphCodec``, filters the
    payload through ``gate`` and passes the result to ``materialize``. A message is
    acknowledged only after that work succeeds.

    Use as a context manager so the Pulsar client is closed on exit.
    """

    def __init__(self, materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False,
                 local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None,
                 subscription_name='my-sub'):
        """Connect to Pulsar and subscribe to ``topic``.

        Args:
            materialize: Callable invoked once per message. In FTP mode it receives
                the local file path returned by ``ftp_fetch``; otherwise it receives
                the gated payload and its return value is returned by ``next()``.
            pulsar_node: Service URL passed to ``pulsar.Client``.
            gate: Callable applied to the decoded payload before ``materialize``.
                Defaults to the identity function.
            ftp: If true, the gated payload is passed to ``ftp_fetch`` (presumably a
                reference to a remote file; verify against producers).
            ftp_delete: Forwarded to ``ftp_fetch`` as ``delete``.
            local_ftp_path: Local directory used by ``ftp_fetch`` and
                ``local_to_global_path``.
            topic: Pulsar topic to subscribe to.
            log_path: Directory for the replay log. Logging is skipped unless this
                names an existing directory.
            log_filename: Base name of the replay log file. Defaults to the current
                wall-clock time in milliseconds, as a string.
            subscription_name: Pulsar subscription name. Defaults to ``'my-sub'``,
                the value this class always used previously.
        """
        self.client = pulsar.Client(pulsar_node)
        try:
            # A brand-new subscription starts from the earliest retained message.
            self.consumer = self.client.subscribe(topic, subscription_name=subscription_name,
                                                  schema=pulsar.schema.BytesSchema(),
                                                  initial_position=InitialPosition.Earliest)
        except Exception:
            # The instance is never handed back to the caller, so __exit__ will not
            # run; close the client here instead of leaking it.
            self.client.close()
            raise
        self.materialize = materialize
        self.gate = (lambda x: x) if gate is None else gate
        self.ftp = ftp
        self.local_ftp_path = local_ftp_path
        self.ftp_delete = ftp_delete
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        # Fixed framing: 16-byte message UUID, 16-byte origin operator id, no header.
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closing the client also tears down the consumer created from it.
        self.client.close()

    def __iter__(self):
        return self

    def __next__(self):
        """Block for the next message, materialize it, acknowledge it and return the result.

        Returns:
            In FTP mode, the global path that ``local_to_global_path`` produces for
            the fetched file (the return value of ``materialize`` is discarded).
            Otherwise, whatever ``materialize`` returned.

        Any exception raised before acknowledgement (decode, ``gate``, ``ftp_fetch``,
        ``materialize``) propagates and leaves the message unacknowledged. This method
        never raises ``StopIteration``.
        """
        msg = self.consumer.receive()
        msg_uuid, op_from, _, payload = self.graph_codec.decode(msg.value())
        data = self.gate(payload)

        if self.ftp:
            # Fetch the referenced file into local_ftp_path; the copy on the FTP
            # server is removed only when ftp_delete is set.
            local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=self.ftp_delete)
            self.materialize(local_file_path)
            global_file_path = local_to_global_path(local_file_path, self.local_ftp_path)
            self.consumer.acknowledge(msg)
            return global_file_path

        # Wall-clock timestamps around the callback, recorded for the replay log.
        data_collection_time_ms = time.time() * 1000
        output = self.materialize(data)
        task_finish_time_ms = time.time() * 1000

        # Replay logging happens only when log_path is set AND is an existing
        # directory. Each call appends one pickled record to the same file, so a
        # reader must call pickle.load repeatedly until EOF.
        if self.log_path and os.path.isdir(self.log_path):
            replay_log = {'msg_uuid': msg_uuid,
                          'op_from': op_from,
                          'payload': payload,
                          'data_collection_time_ms': data_collection_time_ms,
                          'task_finish_time_ms': task_finish_time_ms}
            with open(os.path.join(self.log_path, self.log_filename + '.destination'), 'ab') as f:
                pickle.dump(replay_log, f)

        self.consumer.acknowledge(msg)
        return output
Materialize( materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False, local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None)
def __init__(self, materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False,
             local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None,
             subscription_name='my-sub'):
    """Connect to Pulsar and subscribe to ``topic``.

    Args:
        materialize: Callable invoked once per message. In FTP mode it receives
            the local file path returned by ``ftp_fetch``; otherwise it receives
            the gated payload.
        pulsar_node: Service URL passed to ``pulsar.Client``.
        gate: Callable applied to the decoded payload before ``materialize``.
            Defaults to the identity function.
        ftp: If true, the gated payload is passed to ``ftp_fetch`` (presumably a
            reference to a remote file; verify against producers).
        ftp_delete: Forwarded to ``ftp_fetch`` as ``delete``.
        local_ftp_path: Local directory used for fetched files.
        topic: Pulsar topic to subscribe to.
        log_path: Directory for the replay log. Logging is skipped unless this
            names an existing directory.
        log_filename: Base name of the replay log file. Defaults to the current
            wall-clock time in milliseconds, as a string.
        subscription_name: Pulsar subscription name. Defaults to ``'my-sub'``,
            the value this class always used previously.
    """
    self.client = pulsar.Client(pulsar_node)
    try:
        # A brand-new subscription starts from the earliest retained message.
        self.consumer = self.client.subscribe(topic, subscription_name=subscription_name,
                                              schema=pulsar.schema.BytesSchema(),
                                              initial_position=InitialPosition.Earliest)
    except Exception:
        # The instance is never handed back to the caller, so __exit__ will not
        # run; close the client here instead of leaking it.
        self.client.close()
        raise
    self.materialize = materialize
    self.gate = (lambda x: x) if gate is None else gate
    self.ftp = ftp
    self.local_ftp_path = local_ftp_path
    self.ftp_delete = ftp_delete
    self.log_path = log_path
    self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
    # Fixed framing: 16-byte message UUID, 16-byte origin operator id, no header.
    self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)