diff --git a/bin/Global.py b/bin/Global.py index c9daeef8..5cd0edf1 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -76,5 +76,6 @@ if __name__ == '__main__': with open(filename, 'wb') as f: f.write(base64.standard_b64decode(gzip64encoded)) + print(filename) p.populate_set_out(filename.encode('utf8')) processed_paste+=1 diff --git a/bin/Helper.py b/bin/Helper.py index 38e1b85a..315962bb 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -57,7 +57,8 @@ class PubSub(object): for address in addresses.split(','): new_sub = context.socket(zmq.SUB) new_sub.connect(address) - new_sub.setsockopt_string(zmq.SUBSCRIBE, channel) + # bytes64 encode bytes to ascii only bytes + new_sub.setsockopt(zmq.SUBSCRIBE, channel.encode('ascii')) self.subscribers.append(new_sub) def setup_publish(self, conn_name): @@ -77,14 +78,16 @@ class PubSub(object): self.publishers['ZMQ'].append((p, channel)) def publish(self, message): - m = json.loads(message.decode('utf8')) + m = json.loads(message.decode('ascii')) channel_message = m.get('channel') for p, channel in self.publishers['Redis']: if channel_message is None or channel_message == channel: - p.publish(channel, m['message']) + p.publish(channel, ( m['message']).encode('ascii') ) for p, channel in self.publishers['ZMQ']: if channel_message is None or channel_message == channel: - p.send('{} {}'.format(channel, m['message'])) + mess = ( m['message'] ).encode('ascii') + p.send(b' '.join( [channel, mess] ) ) + def subscribe(self): if self.redis_sub: @@ -158,7 +161,7 @@ class Process(object): if b'.gz' in message: path = message.split(b".")[-2].split(b"/")[-1] #find start of path with AIL_HOME - index_s = (message.decode('utf8')).find(os.environ['AIL_HOME']) + index_s = (message.decode('ascii')).find(os.environ['AIL_HOME']) #Stop when .gz index_e = message.find(b".gz")+3 if(index_s == -1): @@ -167,9 +170,10 @@ class Process(object): complete_path = message[index_s:index_e] else: - path = "?" + path = "-" + complete_path = "?" - value = str(timestamp) + ", " + path.decode('utf8') + value = str(timestamp) + ", " + path.decode('ascii') self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path) self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) @@ -186,15 +190,14 @@ class Process(object): def populate_set_out(self, msg, channel=None): # multiproc - msg = msg.decode('utf8') + msg = msg.decode('ascii') msg = {'message': msg} if channel is not None: msg.update({'channel': channel}) - # TODO use bytes here ? - #j = (json.dumps(msg)).encode('utf8') - j = json.dumps(msg) - self.r_temp.sadd(self.subscriber_name + 'out', json.dumps(msg)) + # bytes64 encode bytes to ascii only bytes + j = (json.dumps(msg)).encode('ascii') + self.r_temp.sadd(self.subscriber_name + 'out', j) def publish(self): # monoproc