#!/usr/bin/env python3 # -*-coding:UTF-8 -* """ Abstract Chat JSON Feeder Importer Module ================ Process Feeder Json (example: Twitter feeder) """ import datetime import os import sys from abc import ABC sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## from importer.feeders.Default import DefaultFeeder from lib.objects.Chats import Chat from lib.objects import ChatSubChannels from lib.objects import ChatThreads from lib.objects import Images from lib.objects import Messages from lib.objects import FilesNames # from lib.objects import Files from lib.objects import UsersAccount from lib.objects.Usernames import Username from lib import chats_viewer import base64 import io import gzip # TODO remove compression ??? def _gunzip_bytes_obj(bytes_obj): gunzipped_bytes_obj = None try: in_ = io.BytesIO() in_.write(bytes_obj) in_.seek(0) with gzip.GzipFile(fileobj=in_, mode='rb') as fo: gunzipped_bytes_obj = fo.read() except Exception as e: print(f'Global; Invalid Gzip file: {e}') return gunzipped_bytes_obj class AbstractChatFeeder(DefaultFeeder, ABC): def __init__(self, name, json_data): super().__init__(json_data) self.obj = None self.name = name def get_chat_protocol(self): # TODO # # # # # # # # # # # # # return self.name def get_chat_network(self): self.json_data['meta'].get('network', None) def get_chat_address(self): self.json_data['meta'].get('address', None) def get_chat_instance_uuid(self): chat_instance_uuid = chats_viewer.create_chat_service_instance(self.get_chat_protocol(), network=self.get_chat_network(), address=self.get_chat_address()) # TODO SET return chat_instance_uuid def get_chat_id(self): # TODO RAISE ERROR IF NONE return self.json_data['meta']['chat']['id'] def get_subchannel_id(self): return self.json_data['meta']['chat'].get('subchannel', {}).get('id') def get_subchannels(self): pass def get_thread_id(self): return self.json_data['meta'].get('thread', {}).get('id') def get_message_id(self): return self.json_data['meta']['id'] def get_media_name(self): return self.json_data['meta'].get('media', {}).get('name') def get_reactions(self): return self.json_data['meta'].get('reactions', []) def get_message_timestamp(self): if not self.json_data['meta'].get('date'): return None else: return self.json_data['meta']['date']['timestamp'] # if self.json_data['meta'].get('date'): # date = datetime.datetime.fromtimestamp( self.json_data['meta']['date']['timestamp']) # date = date.strftime('%Y/%m/%d') # else: # date = datetime.date.today().strftime("%Y/%m/%d") def get_message_date_timestamp(self): timestamp = self.get_message_timestamp() date = datetime.datetime.fromtimestamp(timestamp) date = date.strftime('%Y%m%d') return date, timestamp def get_message_sender_id(self): return self.json_data['meta']['sender']['id'] def get_message_reply(self): return self.json_data['meta'].get('reply_to') # TODO change to reply ??? def get_message_reply_id(self): return self.json_data['meta'].get('reply_to', {}).get('message_id') def get_message_forward(self): return self.json_data['meta'].get('forward') def get_message_content(self): decoded = base64.standard_b64decode(self.json_data['data']) return _gunzip_bytes_obj(decoded) def get_obj(self): #### TIMESTAMP #### timestamp = self.get_message_timestamp() #### Create Object ID #### chat_id = self.get_chat_id() try: message_id = self.get_message_id() except KeyError: if chat_id: self.obj = Chat(chat_id, self.get_chat_instance_uuid()) return self.obj else: self.obj = None return None thread_id = self.get_thread_id() # channel id # thread id # TODO sanitize obj type obj_type = self.get_obj_type() if obj_type == 'image': self.obj = Images.Image(self.json_data['data-sha256']) else: obj_id = Messages.create_obj_id(self.get_chat_instance_uuid(), chat_id, message_id, timestamp, thread_id=thread_id) self.obj = Messages.Message(obj_id) return self.obj def process_chat(self, new_objs, obj, date, timestamp, reply_id=None): meta = self.json_data['meta']['chat'] # todo replace me by function chat = Chat(self.get_chat_id(), self.get_chat_instance_uuid()) subchannel = None thread = None # date stat + correlation chat.add(date, obj) if meta.get('name'): chat.set_name(meta['name']) if meta.get('info'): chat.set_info(meta['info']) if meta.get('date'): # TODO check if already exists chat.set_created_at(int(meta['date']['timestamp'])) if meta.get('icon'): img = Images.create(meta['icon'], b64=True) img.add(date, chat) chat.set_icon(img.get_global_id()) new_objs.add(img) if meta.get('username'): username = Username(meta['username'], self.get_chat_protocol()) chat.update_username_timeline(username.get_global_id(), timestamp) if meta.get('subchannel'): subchannel, thread = self.process_subchannel(obj, date, timestamp, reply_id=reply_id) chat.add_children(obj_global_id=subchannel.get_global_id()) else: if obj.type == 'message': if self.get_thread_id(): thread = self.process_thread(obj, chat, date, timestamp, reply_id=reply_id) else: chat.add_message(obj.get_global_id(), self.get_message_id(), timestamp, reply_id=reply_id) chats_obj = [chat] if subchannel: chats_obj.append(subchannel) if thread: chats_obj.append(thread) return chats_obj def process_subchannel(self, obj, date, timestamp, reply_id=None): # TODO CREATE DATE meta = self.json_data['meta']['chat']['subchannel'] subchannel = ChatSubChannels.ChatSubChannel(f'{self.get_chat_id()}/{meta["id"]}', self.get_chat_instance_uuid()) thread = None subchannel.add(date, obj) if meta.get('date'): # TODO check if already exists subchannel.set_created_at(int(meta['date']['timestamp'])) if meta.get('name'): subchannel.set_name(meta['name']) # subchannel.update_name(meta['name'], timestamp) # TODO ################# if meta.get('info'): subchannel.set_info(meta['info']) if obj.type == 'message': if self.get_thread_id(): thread = self.process_thread(obj, subchannel, date, timestamp, reply_id=reply_id) else: subchannel.add_message(obj.get_global_id(), self.get_message_id(), timestamp, reply_id=reply_id) return subchannel, thread def process_thread(self, obj, obj_chat, date, timestamp, reply_id=None): meta = self.json_data['meta']['thread'] thread_id = self.get_thread_id() p_chat_id = meta['parent'].get('chat') p_subchannel_id = meta['parent'].get('subchannel') p_message_id = meta['parent'].get('message') # print(thread_id, p_chat_id, p_subchannel_id, p_message_id) if p_chat_id == self.get_chat_id() and p_subchannel_id == self.get_subchannel_id(): thread = ChatThreads.create(thread_id, self.get_chat_instance_uuid(), p_chat_id, p_subchannel_id, p_message_id, obj_chat) thread.add(date, obj) thread.add_message(obj.get_global_id(), self.get_message_id(), timestamp, reply_id=reply_id) # TODO OTHERS CORRELATIONS TO ADD if meta.get('name'): thread.set_name(meta['name']) return thread # TODO # else: # # ADD NEW MESSAGE REF (used by discord) def process_sender(self, new_objs, obj, date, timestamp): meta = self.json_data['meta'].get('sender') if not meta: return None user_account = UsersAccount.UserAccount(meta['id'], self.get_chat_instance_uuid()) # date stat + correlation user_account.add(date, obj) if meta.get('username'): username = Username(meta['username'], self.get_chat_protocol()) # TODO timeline or/and correlation ???? user_account.add_correlation(username.type, username.get_subtype(r_str=True), username.id) user_account.update_username_timeline(username.get_global_id(), timestamp) # Username---Message username.add(date) # TODO # correlation message ??? # ADDITIONAL METAS if meta.get('firstname'): user_account.set_first_name(meta['firstname']) if meta.get('lastname'): user_account.set_last_name(meta['lastname']) if meta.get('phone'): user_account.set_phone(meta['phone']) if meta.get('icon'): img = Images.create(meta['icon'], b64=True) img.add(date, user_account) user_account.set_icon(img.get_global_id()) new_objs.add(img) if meta.get('info'): user_account.set_info(meta['info']) return user_account def process_meta(self): # TODO CHECK MANDATORY FIELDS """ Process JSON meta filed. """ # meta = self.get_json_meta() objs = set() if self.obj: objs.add(self.obj) new_objs = set() date, timestamp = self.get_message_date_timestamp() # REPLY reply_id = self.get_message_reply_id() print(self.obj.type) # TODO FILES + FILES REF # get object by meta object type if self.obj.type == 'message': # Content obj = Messages.create(self.obj.id, self.get_message_content()) # FILENAME media_name = self.get_media_name() if media_name: print(media_name) FilesNames.FilesNames().create(media_name, date, obj) for reaction in self.get_reactions(): obj.add_reaction(reaction['reaction'], int(reaction['count'])) elif self.obj.type == 'chat': pass else: chat_id = self.get_chat_id() thread_id = self.get_thread_id() channel_id = self.get_subchannel_id() message_id = self.get_message_id() message_id = Messages.create_obj_id(self.get_chat_instance_uuid(), chat_id, message_id, timestamp, channel_id=channel_id, thread_id=thread_id) message = Messages.Message(message_id) # create empty message if message don't exist if not message.exists(): message.create('') objs.add(message) if message.exists(): # TODO Correlation user-account image/filename ???? obj = Images.create(self.get_message_content()) obj.add(date, message) obj.set_parent(obj_global_id=message.get_global_id()) # FILENAME media_name = self.get_media_name() if media_name: FilesNames.FilesNames().create(media_name, date, message, file_obj=obj) for reaction in self.get_reactions(): message.add_reaction(reaction['reaction'], int(reaction['count'])) for obj in objs: # TODO PERF avoid parsing metas multiple times # TODO get created subchannel + thread # => create correlation user-account with object print(obj.id) # CHAT chat_objs = self.process_chat(new_objs, obj, date, timestamp, reply_id=reply_id) # # TODO HANDLE OTHERS OBJECT TYPE # # TODO MAKE IT GENERIC FOR OTHERS CHATS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! # # Message forward + Discussion # if self.get_json_meta().get('forward'): # discussion_id = self.get_json_meta().get('discussion') # forward_from = self.get_message_forward() # # if discussion_id: # TODO HANDLE FORWARDED MESSAGES FROM EXTERNAL CHANNELS # chat_forward_id = forward_from['from']['id'] # message_forward_id = forward_from['from']['channel_post'] # # # if chat_forward_id == discussion_id: # # linked_chat = Chat(chat_forward_id, self.get_chat_instance_uuid()) # # if linked_chat.exists(): # # # create thread # # # add message replies for each childrens # # # TODO HANDLE THREAD # # TODO Change FORWARD META FIELDS # # meta['forward'] = {} # # # CHAT ID # # # SUBCHANNEL ID -> can be None # # # Message ID # # # meta['forward']['origin'] # # # same as 'forward' # # if self.get_json_meta().get('forward'): # forward = self.get_message_forward() # f_chat = forward['chat'] # f_subchannel = forward.get('subchannel') # f_id = forward.get('id') # if not f_subchannel: # chat_forward = Chat(f_chat, self.get_chat_instance_uuid()) # if chat_forward.exists(): # for chat_obj in chat_objs: # if chat_obj.type == 'chat': # chat_forward.add_relationship(chat_obj.get_global_id(), 'forward') # # TODO LIST FORWARDED MESSAGES # # # # Discord -> serverID + subchannel ID + message ID # # Telegram -> chat ID + Message ID # # + ORIGIN IDs # # # # # TODO create relationships graph # # # # TODO REMOVE ME # # Message forward # TODO handle subchannel + message ID # if self.get_json_meta().get('forward'): # forward_from = self.get_message_forward() # print('-----------------------------------------------------------') # print(forward_from) # if forward_from: # forward_from_type = forward_from['from']['type'] # if forward_from_type == 'channel' or forward_from_type == 'chat': # chat_forward_id = forward_from['from']['id'] # chat_forward = Chat(chat_forward_id, self.get_chat_instance_uuid()) # if chat_forward.exists(): # for chat_obj in chat_objs: # if chat_obj.type == 'chat': # chat_forward.add_relationship(chat_obj.get_global_id(), 'forward') # # chat_forward.add_relationship(obj.get_global_id(), 'forward') # SENDER # TODO HANDLE NULL SENDER user_account = self.process_sender(new_objs, obj, date, timestamp) if user_account: # UserAccount---ChatObjects for obj_chat in chat_objs: user_account.add_correlation(obj_chat.type, obj_chat.get_subtype(r_str=True), obj_chat.id) # if chat: # TODO Chat---Username correlation ??? # # Chat---Username => need to handle members and participants # chat.add_correlation(username.type, username.get_subtype(r_str=True), username.id) # TODO Sender image -> correlation # image # -> subchannel ? # -> thread id ? return new_objs | objs