edgeserve.materialize

 1import os
 2import pickle
 3import time
 4import pulsar
 5from _pulsar import InitialPosition
 6
 7from edgeserve.util import ftp_fetch, local_to_global_path
 8from edgeserve.message_format import GraphCodec
 9
10
class Materialize:
    """Sink operator: consume a Pulsar topic and hand each payload to a callback.

    The object is an endless iterator: every ``next()`` receives one message
    from ``topic``, decodes it with ``GraphCodec``, applies ``gate`` to the
    payload, calls ``materialize`` on the result and acknowledges the message
    only after that succeeded. ``StopIteration`` is never raised. Use it as a
    context manager so the Pulsar client is closed on exit.
    """

    def __init__(self, materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False,
                 local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None,
                 subscription_name='my-sub'):
        """Connect to Pulsar and subscribe to ``topic``.

        Args:
            materialize: Callable applied to each gated payload, or (when
                ``ftp`` is true) to the local path of the fetched file.
            pulsar_node: Service URL handed to ``pulsar.Client``.
            gate: Optional callable applied to the decoded payload before
                ``materialize``; defaults to the identity function.
            ftp: If true, the gated payload is passed to ``ftp_fetch`` and
                ``materialize`` receives the resulting local file path.
            ftp_delete: Forwarded to ``ftp_fetch`` as ``delete=``.
            local_ftp_path: Local directory used by ``ftp_fetch`` and
                ``local_to_global_path``.
            topic: Pulsar topic to consume.
            log_path: Directory for the replay log. Records are written only
                when this is truthy AND names an existing directory.
            log_filename: Base name of the replay log (``.destination`` is
                appended); defaults to the construction time in milliseconds.
            subscription_name: Pulsar subscription name. Defaults to the
                previously hard-coded ``'my-sub'``; give independent sinks on
                the same topic distinct names so they do not share (and, with
                Pulsar's default exclusive subscription type, contend for) one
                subscription.
        """
        self.client = pulsar.Client(pulsar_node)
        # A brand-new subscription starts from the earliest retained message
        # instead of only seeing messages published after subscribing.
        self.consumer = self.client.subscribe(topic, subscription_name=subscription_name,
                                              schema=pulsar.schema.BytesSchema(),
                                              initial_position=InitialPosition.Earliest)
        self.materialize = materialize
        self.gate = (lambda x: x) if gate is None else gate
        self.ftp = ftp
        self.local_ftp_path = local_ftp_path
        self.ftp_delete = ftp_delete
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        # NOTE(review): header_size=0 suggests the third decoded field is an
        # (empty) header, which __next__ ignores — confirm against GraphCodec.
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client; exceptions from the ``with`` body propagate."""
        self.client.close()

    def __iter__(self):
        return self

    def __next__(self):
        """Receive, materialize and acknowledge one message.

        Returns:
            In FTP mode, the global path derived from the fetched local file
            (the return value of ``materialize`` is discarded); otherwise the
            value returned by ``materialize``.

        Raises:
            Anything raised while decoding, gating, fetching, materializing or
            logging. The message is negatively acknowledged first so Pulsar
            redelivers it instead of leaving it unacknowledged indefinitely.
        """
        msg = self.consumer.receive()
        try:
            msg_uuid, op_from, _, payload = self.graph_codec.decode(msg.value())
            data = self.gate(payload)
            if self.ftp:
                result = self._materialize_ftp(data)
            else:
                result = self._materialize_local(msg_uuid, op_from, payload, data)
        except Exception:
            # Without this the message would stay neither acked nor nacked.
            self.consumer.negative_acknowledge(msg)
            raise
        self.consumer.acknowledge(msg)
        return result

    def _materialize_ftp(self, data):
        """Fetch the file referenced by ``data``, materialize it, return its global path."""
        # Download the file to local_ftp_path; the server copy is deleted
        # only when ftp_delete is true.
        local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=self.ftp_delete)
        self.materialize(local_file_path)
        return local_to_global_path(local_file_path, self.local_ftp_path)

    def _materialize_local(self, msg_uuid, op_from, payload, data):
        """Run ``materialize`` on ``data`` and append a timing record if logging is enabled."""
        data_collection_time_ms = time.time() * 1000
        output = self.materialize(data)
        task_finish_time_ms = time.time() * 1000

        # Logging is opt-in and silent: skipped when log_path is unset OR is
        # not an existing directory. Records are appended as consecutive pickles.
        if self.log_path and os.path.isdir(self.log_path):
            replay_log = {'msg_uuid': msg_uuid,
                          'op_from': op_from,
                          'payload': payload,
                          'data_collection_time_ms': data_collection_time_ms,
                          'task_finish_time_ms': task_finish_time_ms}
            with open(os.path.join(self.log_path, self.log_filename + '.destination'), 'ab') as f:
                pickle.dump(replay_log, f)
        return output
class Materialize:
class Materialize:
    """Sink operator: consume a Pulsar topic and hand each payload to a callback.

    The object is an endless iterator: every ``next()`` receives one message
    from ``topic``, decodes it with ``GraphCodec``, applies ``gate`` to the
    payload, calls ``materialize`` on the result and acknowledges the message
    only after that succeeded. ``StopIteration`` is never raised. Use it as a
    context manager so the Pulsar client is closed on exit.
    """

    def __init__(self, materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False,
                 local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None,
                 subscription_name='my-sub'):
        """Connect to Pulsar and subscribe to ``topic``.

        Args:
            materialize: Callable applied to each gated payload, or (when
                ``ftp`` is true) to the local path of the fetched file.
            pulsar_node: Service URL handed to ``pulsar.Client``.
            gate: Optional callable applied to the decoded payload before
                ``materialize``; defaults to the identity function.
            ftp: If true, the gated payload is passed to ``ftp_fetch`` and
                ``materialize`` receives the resulting local file path.
            ftp_delete: Forwarded to ``ftp_fetch`` as ``delete=``.
            local_ftp_path: Local directory used by ``ftp_fetch`` and
                ``local_to_global_path``.
            topic: Pulsar topic to consume.
            log_path: Directory for the replay log. Records are written only
                when this is truthy AND names an existing directory.
            log_filename: Base name of the replay log (``.destination`` is
                appended); defaults to the construction time in milliseconds.
            subscription_name: Pulsar subscription name. Defaults to the
                previously hard-coded ``'my-sub'``; give independent sinks on
                the same topic distinct names so they do not share (and, with
                Pulsar's default exclusive subscription type, contend for) one
                subscription.
        """
        self.client = pulsar.Client(pulsar_node)
        # A brand-new subscription starts from the earliest retained message
        # instead of only seeing messages published after subscribing.
        self.consumer = self.client.subscribe(topic, subscription_name=subscription_name,
                                              schema=pulsar.schema.BytesSchema(),
                                              initial_position=InitialPosition.Earliest)
        self.materialize = materialize
        self.gate = (lambda x: x) if gate is None else gate
        self.ftp = ftp
        self.local_ftp_path = local_ftp_path
        self.ftp_delete = ftp_delete
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        # NOTE(review): header_size=0 suggests the third decoded field is an
        # (empty) header, which __next__ ignores — confirm against GraphCodec.
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client; exceptions from the ``with`` body propagate."""
        self.client.close()

    def __iter__(self):
        return self

    def __next__(self):
        """Receive, materialize and acknowledge one message.

        Returns:
            In FTP mode, the global path derived from the fetched local file
            (the return value of ``materialize`` is discarded); otherwise the
            value returned by ``materialize``.

        Raises:
            Anything raised while decoding, gating, fetching, materializing or
            logging. The message is negatively acknowledged first so Pulsar
            redelivers it instead of leaving it unacknowledged indefinitely.
        """
        msg = self.consumer.receive()
        try:
            msg_uuid, op_from, _, payload = self.graph_codec.decode(msg.value())
            data = self.gate(payload)
            if self.ftp:
                result = self._materialize_ftp(data)
            else:
                result = self._materialize_local(msg_uuid, op_from, payload, data)
        except Exception:
            # Without this the message would stay neither acked nor nacked.
            self.consumer.negative_acknowledge(msg)
            raise
        self.consumer.acknowledge(msg)
        return result

    def _materialize_ftp(self, data):
        """Fetch the file referenced by ``data``, materialize it, return its global path."""
        # Download the file to local_ftp_path; the server copy is deleted
        # only when ftp_delete is true.
        local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=self.ftp_delete)
        self.materialize(local_file_path)
        return local_to_global_path(local_file_path, self.local_ftp_path)

    def _materialize_local(self, msg_uuid, op_from, payload, data):
        """Run ``materialize`` on ``data`` and append a timing record if logging is enabled."""
        data_collection_time_ms = time.time() * 1000
        output = self.materialize(data)
        task_finish_time_ms = time.time() * 1000

        # Logging is opt-in and silent: skipped when log_path is unset OR is
        # not an existing directory. Records are appended as consecutive pickles.
        if self.log_path and os.path.isdir(self.log_path):
            replay_log = {'msg_uuid': msg_uuid,
                          'op_from': op_from,
                          'payload': payload,
                          'data_collection_time_ms': data_collection_time_ms,
                          'task_finish_time_ms': task_finish_time_ms}
            with open(os.path.join(self.log_path, self.log_filename + '.destination'), 'ab') as f:
                pickle.dump(replay_log, f)
        return output
Materialize( materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False, local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None)
13    def __init__(self, materialize, pulsar_node, gate=None, ftp=False, ftp_delete=False,
14                 local_ftp_path='/srv/ftp/', topic='dst', log_path=None, log_filename=None):
15        self.client = pulsar.Client(pulsar_node)
16        self.consumer = self.client.subscribe(topic, subscription_name='my-sub',
17                                              schema=pulsar.schema.BytesSchema(),
18                                              initial_position=InitialPosition.Earliest)
19        self.materialize = materialize
20        self.gate = (lambda x: x) if gate is None else gate
21        self.ftp = ftp
22        self.local_ftp_path = local_ftp_path
23        self.ftp_delete = ftp_delete
24        self.log_path = log_path
25        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
26        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
client
consumer
materialize
gate
ftp
local_ftp_path
ftp_delete
log_path
log_filename
graph_codec