edgeserve.worker

 1import sys
 2import pulsar
 3from _pulsar import InitialPosition
 4
 5from edgeserve.util import ftp_fetch
 6from edgeserve.message_format import GraphCodec
 7
 8
 9class Worker:
10    def __init__(self, pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/'):
11        self.client = pulsar.Client(pulsar_node)
12        self.consumer = self.client.subscribe(topic, subscription_name='worker-sub',
13                                              schema=pulsar.schema.BytesSchema(),
14                                              initial_position=InitialPosition.Earliest)
15        self.gate = lambda x: x.decode('utf-8')
16        self.ftp = ftp
17        self.local_ftp_path = local_ftp_path
18        self.ftp_memory = ftp_memory
19        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
20
21    def __enter__(self):
22        return self
23
24    def __exit__(self, exc_type, exc_val, exc_tb):
25        self.client.close()
26
27    def __iter__(self):
28        return self
29
30    def __next__(self):
31        msg = self.consumer.receive()
32        msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg.value())
33        data = self.gate(payload)
34
35        if self.ftp and not self.ftp_memory:  # FTP file mode
36            # download the file from FTP server and then delete the file from server
37            local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=False)
38            data = open(local_file_path).read()
39        else:
40            if self.ftp:  # FTP memory mode
41                data = ftp_fetch(data, self.local_ftp_path, memory=True, delete=False)
42
43        exec(data)
44        self.consumer.acknowledge(msg)
45
46
if __name__ == "__main__":
    # Example: python worker.py pulsar://localhost:6650 code-ftp true false /srv/ftp/
    # Positional arguments: <pulsar_node> [topic] [ftp] [ftp_memory] [local_ftp_path]
    if len(sys.argv) < 2:
        # The broker address is the only mandatory argument; without this
        # guard its absence surfaced as a bare IndexError traceback.
        sys.exit("usage: {} <pulsar_node> [topic] [ftp] [ftp_memory] [local_ftp_path]".format(sys.argv[0]))
    node = sys.argv[1]
    topic = sys.argv[2] if len(sys.argv) >= 3 else 'code'
    # Boolean flags are recognised only when spelled 'true' or 'True';
    # any other value counts as false.
    ftp = sys.argv[3] in ('true', 'True') if len(sys.argv) >= 4 else False
    ftp_memory = sys.argv[4] in ('true', 'True') if len(sys.argv) >= 5 else True
    local_ftp_path = sys.argv[5] if len(sys.argv) >= 6 else '/srv/ftp/'
    # The context manager closes the Pulsar client when the loop is left,
    # which only happens through an exception or interrupt.
    with Worker(node, topic, ftp, ftp_memory, local_ftp_path) as worker:
        while True:
            next(worker)
class Worker:
class Worker:
    """Pulsar consumer that executes the Python code delivered in each message.

    Each ``next()`` call receives one message from the subscribed topic,
    decodes it with ``GraphCodec``, optionally resolves the payload through
    ``ftp_fetch``, runs the resulting source with ``exec`` and finally
    acknowledges the message.  The object is its own iterator and a context
    manager; leaving the ``with`` block closes the Pulsar client.
    """

    def __init__(self, pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/'):
        """Connect to Pulsar and subscribe to the code topic.

        Args:
            pulsar_node: Value handed to ``pulsar.Client`` (the example in
                this module uses ``pulsar://localhost:6650``).
            topic: Topic to subscribe to under the ``worker-sub`` subscription.
            ftp: When true, the decoded payload is passed to ``ftp_fetch``
                instead of being executed directly.
            ftp_memory: Only consulted when ``ftp`` is true.  True requests
                ``ftp_fetch(..., memory=True)``; false requests
                ``memory=False`` and reads the code from the returned path.
            local_ftp_path: Second argument forwarded to ``ftp_fetch``.
        """
        self.client = pulsar.Client(pulsar_node)
        self.consumer = self.client.subscribe(topic, subscription_name='worker-sub',
                                              schema=pulsar.schema.BytesSchema(),
                                              initial_position=InitialPosition.Earliest)
        # Converts the raw payload bytes into text before further processing.
        self.gate = lambda x: x.decode('utf-8')
        self.ftp = ftp
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        """Return the worker itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client; exceptions from the block are not suppressed."""
        self.client.close()

    def __iter__(self):
        """Return the worker itself; it acts as its own iterator."""
        return self

    def __next__(self):
        """Receive one message, execute the code it carries, then acknowledge it.

        The acknowledgement is sent only after ``exec`` returns.  If fetching
        or executing the code raises, the exception propagates and the
        message is left unacknowledged.  This method never raises
        ``StopIteration`` itself, so iteration does not end on its own.

        Returns:
            None.
        """
        msg = self.consumer.receive()
        # The unpacked names stay as locals on purpose: the bare ``exec``
        # below runs with this frame's locals, so received code can read
        # ``msg``, ``msg_in_uuid``, ``op_from`` and ``payload``.
        msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg.value())
        data = self.gate(payload)

        if self.ftp:
            if self.ftp_memory:  # FTP memory mode
                data = ftp_fetch(data, self.local_ftp_path, memory=True, delete=False)
            else:  # FTP file mode
                # Fetch to a local file; delete=False is passed, so the copy
                # on the server is presumably left in place.
                local_file_path = ftp_fetch(data, self.local_ftp_path, memory=False, delete=False)
                # Context manager guarantees the handle is closed even if
                # reading fails (the previous bare open().read() leaked it).
                with open(local_file_path) as source_file:
                    data = source_file.read()

        # SECURITY: this executes whatever arrives on the topic with the full
        # privileges of this process.  Only subscribe to trusted topics.
        exec(data)
        self.consumer.acknowledge(msg)
Worker(pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/')
11    def __init__(self, pulsar_node, topic='code', ftp=False, ftp_memory=True, local_ftp_path='/srv/ftp/'):
12        self.client = pulsar.Client(pulsar_node)
13        self.consumer = self.client.subscribe(topic, subscription_name='worker-sub',
14                                              schema=pulsar.schema.BytesSchema(),
15                                              initial_position=InitialPosition.Earliest)
16        self.gate = lambda x: x.decode('utf-8')
17        self.ftp = ftp
18        self.local_ftp_path = local_ftp_path
19        self.ftp_memory = ftp_memory
20        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
client
consumer
gate
ftp
local_ftp_path
ftp_memory
graph_codec