edgeserve.compute
```python
import os
import time
import uuid

import pulsar
import pickle
import pathlib
from _pulsar import InitialPosition, ConsumerType
from inspect import signature

from edgeserve.util import ftp_fetch, local_to_global_path
from edgeserve.message_format import GraphCodec


class Compute:
    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
                 drop_if_older_than_ms=None, log_verbose=False):
        """Initializes a compute operator.

        Args:
            task: The actual task to be performed.
            pulsar_node: Address of Apache Pulsar server.
            worker_id: The unique identifier of this operator.
            gate_in: The gating function applied to input stream.
            gate_out: The gating function applied to output stream.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            topic_in: Pulsar topic of the input data stream.
            topic_out: Pulsar topic of the output data stream.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            drop_if_older_than_ms: When set to a value, messages older than this value relative to current time are dropped.
            log_verbose: When set to `False`, we only log in the replay log when a join operation is performed.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
        self.consumer = self.client.subscribe(topic_in, subscription_name='compute-sub',
                                              consumer_type=ConsumerType.Shared,
                                              schema=pulsar.schema.BytesSchema(),
                                              initial_position=InitialPosition.Earliest)
        self.task = task
        self.worker_id = worker_id
        self.gate_in = (lambda x: x) if gate_in is None else gate_in
        self.gate_out = (lambda x: x) if gate_out is None else gate_out
        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete
        self.latest_msg = dict()
        self.latest_msg_in_uuid = dict()
        self.latest_msg_publish_time_ms = dict()
        self.latest_msg_consumed_time_ms = dict()
        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0
        self.last_run_finish_ms = 0
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        self.drop_if_older_than_ms = drop_if_older_than_ms
        self.log_verbose = log_verbose
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    def _try_task(self):
        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None

        if len(self.latest_msg) < len(signature(self.task).parameters):
            return False, None, None

        earliest = None
        latest = None
        for source_id in self.latest_msg.keys():
            if earliest is None or self.latest_msg_publish_time_ms[source_id] < earliest:
                earliest = self.latest_msg_publish_time_ms[source_id]
            if latest is None or self.latest_msg_publish_time_ms[source_id] > latest:
                latest = self.latest_msg_publish_time_ms[source_id]
        if latest - earliest > self.max_time_diff_ms:
            return False, None, None
        self.last_run_start_ms = time.time() * 1000

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        if self.ftp:
            for source_id in self.latest_msg.keys():
                if 'ftp://' in self.latest_msg[source_id]:
                    local_file_path = ftp_fetch(self.latest_msg[source_id], self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
                    self.latest_msg[source_id] = local_file_path

        output = self.task(**self.latest_msg)
        self.last_run_finish_ms = time.time() * 1000
        msg_out_uuid = uuid.uuid4()

        # For now, use the completion timestamp as the filename of output FTP file
        if self.ftp and not self.ftp_memory and output:
            ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
            pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
            with open(os.path.join(ftp_output_dir, str(self.last_run_finish_ms)) + '.ftp', 'w') as f:
                f.write(output)
            output = os.path.join(ftp_output_dir, str(self.last_run_finish_ms)) + '.ftp'

        # If no_overlap, reset latest_msg and latest_msg_time_ms so a message won't be processed twice.
        if self.no_overlap:
            self.latest_msg = dict()
            self.latest_msg_in_uuid = dict()
            self.latest_msg_publish_time_ms = dict()
            self.latest_msg_consumed_time_ms = dict()

        return True, msg_out_uuid, output

    def __iter__(self):
        return self

    def __next__(self):
        msg_in = self.consumer.receive()

        if self.drop_if_older_than_ms is not None:
            assert isinstance(self.drop_if_older_than_ms, int)
            if msg_in.publish_timestamp() + self.drop_if_older_than_ms < time.time() * 1000:
                # incoming message is too old, skip it.
                self.consumer.acknowledge(msg_in)
                return None

        msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg_in.value())

        data = self.gate_in(payload)  # path to file if ftp, raw data in bytes otherwise
        if data is not None:
            self.latest_msg_publish_time_ms[op_from] = msg_in.publish_timestamp()
            self.latest_msg_consumed_time_ms[op_from] = time.time() * 1000
            self.latest_msg[op_from] = data
            self.latest_msg_in_uuid[op_from] = msg_in_uuid

        if self.ftp and not self.ftp_memory:  # FTP file mode
            # download the file from FTP server and then delete the file from server
            if not data.startswith('ftp://'):
                return None
            self.latest_msg[op_from] = data
            ret, msg_out_uuid, output = self._try_task()
            if ret and output:
                global_file_path = local_to_global_path(output, self.local_ftp_path)
                output = self.gate_out(global_file_path)
        else:
            if self.ftp:  # FTP memory mode
                self.latest_msg[op_from] = data
            # memory mode
            ret, msg_out_uuid, output = self._try_task()
            if ret and output:
                output = self.gate_out(output)

        # If log_path is not None, we write aggregation decisions to a log file.
        if self.log_path and os.path.isdir(self.log_path):
            if ret or self.log_verbose:
                replay_log = {'msg_in_uuid': self.latest_msg_in_uuid,
                              'msg_in_payload': self.latest_msg,
                              'msg_out_uuid': msg_out_uuid,
                              'msg_out_payload': output,
                              'msg_publish_time_ms': self.latest_msg_publish_time_ms,
                              'msg_consumed_time_ms': self.latest_msg_consumed_time_ms,
                              'start_compute_time_ms': self.last_run_start_ms,
                              'finish_compute_time_ms': self.last_run_finish_ms,
                              'worker_id': self.worker_id,
                              'is_join_performed': ret}
                with open(os.path.join(self.log_path, self.log_filename + '.compute'), 'ab') as f:
                    pickle.dump(replay_log, f)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if type(output) == str:
                output = output.encode('utf-8')
            msg_out = self.graph_codec.encode(msg_uuid=msg_out_uuid, op_from=self.worker_id, payload=output)
            self.producer.send(msg_out)
            self.consumer.acknowledge(msg_in)
            return output
        else:
            # No output is given, no need to materialize
            self.consumer.acknowledge(msg_in)
            return None
```
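The replay log that `__next__` writes when `log_path` is set to an existing directory is simply a sequence of pickled dictionaries appended to `<log_filename>.compute` (each emitted output is likewise pickled to `<log_filename>.output`). The sketch below shows one way to read such a log back; the log path used here is hypothetical.

```python
import pickle


def read_replay_log(path):
    """Return the replay-log records that Compute appended with pickle.dump."""
    records = []
    with open(path, 'rb') as f:
        while True:
            try:
                records.append(pickle.load(f))  # one dict per logged decision
            except EOFError:
                break
    return records


# Hypothetical path: <log_path>/<log_filename>.compute
for record in read_replay_log('/tmp/replay/1700000000000.compute'):
    print(record['worker_id'], record['is_join_performed'], record['msg_out_uuid'])
```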
class Compute
Compute(task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False, ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst', max_time_diff_ms=10000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None, drop_if_older_than_ms=None, log_verbose=False)
Initializes a compute operator.
Arguments:
- task: The actual task to be performed.
- pulsar_node: Address of Apache Pulsar server.
- worker_id: The unique identifier of this operator.
- gate_in: The gating function applied to input stream.
- gate_out: The gating function applied to output stream.
- ftp: When set to `True`, lazy routing mode is enabled.
- ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
- ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
- local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
- topic_in: Pulsar topic of the input data stream.
- topic_out: Pulsar topic of the output data stream.
- max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
- no_overlap: When set to `True`, we ensure that every message is processed at most once.
- min_interval_ms: The minimum time interval between two consecutive runs.
- log_path: Path to store the replay log. When set to `None`, logging is disabled.
- log_filename: File name of the replay log. When set to `None`, the current timestamp is used as the file name.
- drop_if_older_than_ms: When set, messages older than this value relative to the current time are dropped.
- log_verbose: When set to `False`, we only write to the replay log when a join operation is performed.
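The example below is a minimal usage sketch rather than part of the EdgeServe API reference: the broker address, topic names, and the upstream operator ids `camera` and `lidar` are placeholders. The task's keyword parameter names must match the ids (`op_from`) of the upstream operators, because `Compute` caches the latest message per upstream id and calls `task(**latest_msg)` only once every parameter has a cached message and the publish timestamps lie within `max_time_diff_ms` of each other.

```python
# Minimal usage sketch. The broker address, topic names, and the upstream
# operator ids ('camera', 'lidar') are assumptions for illustration only.
from edgeserve.compute import Compute


def fuse(camera, lidar):
    # Parameter names must match the upstream operators' ids (the `op_from`
    # field of incoming messages); values are raw payload bytes, or local
    # file paths when lazy (FTP) routing is enabled.
    return b'fused:' + camera + lidar


with Compute(fuse,
             'pulsar://localhost:6650',     # assumed Pulsar broker address
             worker_id='fusion-worker',
             topic_in='src',                # topic the upstream operators publish to
             topic_out='dst',               # topic downstream operators read from
             max_time_diff_ms=5000,         # join only messages published within 5 s of each other
             no_overlap=True) as compute:   # clear the cache after every successful join
    for output in compute:                  # each iteration consumes one Pulsar message
        if output is not None:              # None means no join fired for this message
            print(f'published {len(output)} bytes')
```

`gate_in` and `gate_out` default to identity functions; a `gate_in` that returns `None` keeps the incoming message out of the join cache, and `gate_out` is applied to the task output before it is published to `topic_out`.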