mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			445 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			445 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python3
 | |
| # -*-coding:UTF-8 -*
 | |
| """
 | |
| Abstract Chat JSON Feeder Importer Module
 | |
| ================
 | |
| 
 | |
| Process Feeder Json (example: Twitter feeder)
 | |
| 
 | |
| """
 | |
| import datetime
 | |
| import os
 | |
| import sys
 | |
| 
 | |
| from abc import ABC
 | |
| 
 | |
| sys.path.append(os.environ['AIL_BIN'])
 | |
| ##################################
 | |
| # Import Project packages
 | |
| ##################################
 | |
| from importer.feeders.Default import DefaultFeeder
 | |
| from lib.objects.Chats import Chat
 | |
| from lib.objects import ChatSubChannels
 | |
| from lib.objects import ChatThreads
 | |
| from lib.objects import Images
 | |
| from lib.objects import Messages
 | |
| from lib.objects import FilesNames
 | |
| # from lib.objects import Files
 | |
| from lib.objects import UsersAccount
 | |
| from lib.objects.Usernames import Username
 | |
| from lib import chats_viewer
 | |
| 
 | |
| import base64
 | |
| import io
 | |
| import gzip
 | |
| 
 | |
| # TODO remove compression ???
 | |
| def _gunzip_bytes_obj(bytes_obj):
 | |
|     gunzipped_bytes_obj = None
 | |
|     try:
 | |
|         in_ = io.BytesIO()
 | |
|         in_.write(bytes_obj)
 | |
|         in_.seek(0)
 | |
| 
 | |
|         with gzip.GzipFile(fileobj=in_, mode='rb') as fo:
 | |
|             gunzipped_bytes_obj = fo.read()
 | |
|     except Exception as e:
 | |
|         print(f'Global; Invalid Gzip file: {e}')
 | |
| 
 | |
|     return gunzipped_bytes_obj
 | |
| 
 | |
| class AbstractChatFeeder(DefaultFeeder, ABC):
 | |
| 
 | |
|     def __init__(self, name, json_data):
 | |
|         super().__init__(json_data)
 | |
|         self.obj = None
 | |
|         self.name = name
 | |
| 
 | |
|     def get_chat_protocol(self):  # TODO # # # # # # # # # # # # #
 | |
|         return self.name
 | |
| 
 | |
|     def get_chat_network(self):
 | |
|         self.json_data['meta'].get('network', None)
 | |
| 
 | |
|     def get_chat_address(self):
 | |
|         self.json_data['meta'].get('address', None)
 | |
| 
 | |
|     def get_chat_instance_uuid(self):
 | |
|         chat_instance_uuid = chats_viewer.create_chat_service_instance(self.get_chat_protocol(),
 | |
|                                                                        network=self.get_chat_network(),
 | |
|                                                                        address=self.get_chat_address())
 | |
|         # TODO SET
 | |
|         return chat_instance_uuid
 | |
| 
 | |
|     def get_chat_id(self):  # TODO RAISE ERROR IF NONE
 | |
|         return self.json_data['meta']['chat']['id']
 | |
| 
 | |
|     def get_subchannel_id(self):
 | |
|         return self.json_data['meta']['chat'].get('subchannel', {}).get('id')
 | |
| 
 | |
|     def get_subchannels(self):
 | |
|         pass
 | |
| 
 | |
|     def get_thread_id(self):
 | |
|         return self.json_data['meta'].get('thread', {}).get('id')
 | |
| 
 | |
|     def get_message_id(self):
 | |
|         return self.json_data['meta']['id']
 | |
| 
 | |
|     def get_media_name(self):
 | |
|         return self.json_data['meta'].get('media', {}).get('name')
 | |
| 
 | |
|     def get_reactions(self):
 | |
|         return self.json_data['meta'].get('reactions', [])
 | |
| 
 | |
|     def get_message_timestamp(self):
 | |
|         if not self.json_data['meta'].get('date'):
 | |
|             return None
 | |
|         else:
 | |
|             return self.json_data['meta']['date']['timestamp']
 | |
|         # if self.json_data['meta'].get('date'):
 | |
|         #     date = datetime.datetime.fromtimestamp( self.json_data['meta']['date']['timestamp'])
 | |
|         #     date = date.strftime('%Y/%m/%d')
 | |
|         # else:
 | |
|         #     date = datetime.date.today().strftime("%Y/%m/%d")
 | |
| 
 | |
|     def get_message_date_timestamp(self):
 | |
|         timestamp = self.get_message_timestamp()
 | |
|         date = datetime.datetime.fromtimestamp(timestamp)
 | |
|         date = date.strftime('%Y%m%d')
 | |
|         return date, timestamp
 | |
| 
 | |
|     def get_message_sender_id(self):
 | |
|         return self.json_data['meta']['sender']['id']
 | |
| 
 | |
|     def get_message_reply(self):
 | |
|         return self.json_data['meta'].get('reply_to')  # TODO change to reply ???
 | |
| 
 | |
|     def get_message_reply_id(self):
 | |
|         return self.json_data['meta'].get('reply_to', {}).get('message_id')
 | |
| 
 | |
|     def get_message_forward(self):
 | |
|         return self.json_data['meta'].get('forward')
 | |
| 
 | |
|     def get_message_content(self):
 | |
|         decoded = base64.standard_b64decode(self.json_data['data'])
 | |
|         return _gunzip_bytes_obj(decoded)
 | |
| 
 | |
|     def get_obj(self):
 | |
|         #### TIMESTAMP ####
 | |
|         timestamp = self.get_message_timestamp()
 | |
| 
 | |
|         #### Create Object ID ####
 | |
|         chat_id = self.get_chat_id()
 | |
|         try:
 | |
|             message_id = self.get_message_id()
 | |
|         except KeyError:
 | |
|             if chat_id:
 | |
|                 self.obj = Chat(chat_id, self.get_chat_instance_uuid())
 | |
|                 return self.obj
 | |
|             else:
 | |
|                 self.obj = None
 | |
|                 return None
 | |
| 
 | |
|         thread_id = self.get_thread_id()
 | |
|         # channel id
 | |
|         # thread id
 | |
| 
 | |
|         # TODO sanitize obj type
 | |
|         obj_type = self.get_obj_type()
 | |
| 
 | |
|         if obj_type == 'image':
 | |
|             self.obj = Images.Image(self.json_data['data-sha256'])
 | |
| 
 | |
|         else:
 | |
|             obj_id = Messages.create_obj_id(self.get_chat_instance_uuid(), chat_id, message_id, timestamp, thread_id=thread_id)
 | |
|             self.obj = Messages.Message(obj_id)
 | |
|         return self.obj
 | |
| 
 | |
|     def process_chat(self, new_objs, obj, date, timestamp, reply_id=None):
 | |
|         meta = self.json_data['meta']['chat'] # todo replace me by function
 | |
|         chat = Chat(self.get_chat_id(), self.get_chat_instance_uuid())
 | |
|         subchannel = None
 | |
|         thread = None
 | |
| 
 | |
|         # date stat + correlation
 | |
|         chat.add(date, obj)
 | |
| 
 | |
|         if meta.get('name'):
 | |
|             chat.set_name(meta['name'])
 | |
| 
 | |
|         if meta.get('info'):
 | |
|             chat.set_info(meta['info'])
 | |
| 
 | |
|         if meta.get('date'): # TODO check if already exists
 | |
|             chat.set_created_at(int(meta['date']['timestamp']))
 | |
| 
 | |
|         if meta.get('icon'):
 | |
|             img = Images.create(meta['icon'], b64=True)
 | |
|             img.add(date, chat)
 | |
|             chat.set_icon(img.get_global_id())
 | |
|             new_objs.add(img)
 | |
| 
 | |
|         if meta.get('username'):
 | |
|             username = Username(meta['username'], self.get_chat_protocol())
 | |
|             chat.update_username_timeline(username.get_global_id(), timestamp)
 | |
| 
 | |
|         if meta.get('subchannel'):
 | |
|             subchannel, thread = self.process_subchannel(obj, date, timestamp, reply_id=reply_id)
 | |
|             chat.add_children(obj_global_id=subchannel.get_global_id())
 | |
|         else:
 | |
|             if obj.type == 'message':
 | |
|                 if self.get_thread_id():
 | |
|                     thread = self.process_thread(obj, chat, date, timestamp, reply_id=reply_id)
 | |
|                 else:
 | |
|                     chat.add_message(obj.get_global_id(), self.get_message_id(), timestamp, reply_id=reply_id)
 | |
| 
 | |
|         chats_obj = [chat]
 | |
|         if subchannel:
 | |
|             chats_obj.append(subchannel)
 | |
|         if thread:
 | |
|             chats_obj.append(thread)
 | |
|         return chats_obj
 | |
| 
 | |
|     def process_subchannel(self, obj, date, timestamp, reply_id=None):  # TODO CREATE DATE
 | |
|         meta = self.json_data['meta']['chat']['subchannel']
 | |
|         subchannel = ChatSubChannels.ChatSubChannel(f'{self.get_chat_id()}/{meta["id"]}', self.get_chat_instance_uuid())
 | |
|         thread = None
 | |
| 
 | |
|         subchannel.add(date, obj)
 | |
| 
 | |
|         if meta.get('date'): # TODO check if already exists
 | |
|             subchannel.set_created_at(int(meta['date']['timestamp']))
 | |
| 
 | |
|         if meta.get('name'):
 | |
|             subchannel.set_name(meta['name'])
 | |
|             # subchannel.update_name(meta['name'], timestamp) # TODO #################
 | |
| 
 | |
|         if meta.get('info'):
 | |
|             subchannel.set_info(meta['info'])
 | |
| 
 | |
|         if obj.type == 'message':
 | |
|             if self.get_thread_id():
 | |
|                 thread = self.process_thread(obj, subchannel, date, timestamp, reply_id=reply_id)
 | |
|             else:
 | |
|                 subchannel.add_message(obj.get_global_id(), self.get_message_id(), timestamp, reply_id=reply_id)
 | |
|         return subchannel, thread
 | |
| 
 | |
|     def process_thread(self, obj, obj_chat, date, timestamp, reply_id=None):
 | |
|         meta = self.json_data['meta']['thread']
 | |
|         thread_id = self.get_thread_id()
 | |
|         p_chat_id = meta['parent'].get('chat')
 | |
|         p_subchannel_id = meta['parent'].get('subchannel')
 | |
|         p_message_id = meta['parent'].get('message')
 | |
| 
 | |
|         # print(thread_id, p_chat_id, p_subchannel_id, p_message_id)
 | |
| 
 | |
|         if p_chat_id == self.get_chat_id() and p_subchannel_id == self.get_subchannel_id():
 | |
|             thread = ChatThreads.create(thread_id, self.get_chat_instance_uuid(), p_chat_id, p_subchannel_id, p_message_id, obj_chat)
 | |
|             thread.add(date, obj)
 | |
|             thread.add_message(obj.get_global_id(), self.get_message_id(), timestamp, reply_id=reply_id)
 | |
|             # TODO OTHERS CORRELATIONS TO ADD
 | |
| 
 | |
|             if meta.get('name'):
 | |
|                 thread.set_name(meta['name'])
 | |
| 
 | |
|             return thread
 | |
| 
 | |
|         # TODO
 | |
|         # else:
 | |
|         #   # ADD NEW MESSAGE REF (used by discord)
 | |
| 
 | |
|     def process_sender(self, new_objs, obj, date, timestamp):
 | |
|         meta = self.json_data['meta'].get('sender')
 | |
|         if not meta:
 | |
|             return None
 | |
| 
 | |
|         user_account = UsersAccount.UserAccount(meta['id'], self.get_chat_instance_uuid())
 | |
| 
 | |
|         # date stat + correlation
 | |
|         user_account.add(date, obj)
 | |
| 
 | |
|         if meta.get('username'):
 | |
|             username = Username(meta['username'], self.get_chat_protocol())
 | |
|             # TODO timeline or/and correlation ????
 | |
|             user_account.add_correlation(username.type, username.get_subtype(r_str=True), username.id)
 | |
|             user_account.update_username_timeline(username.get_global_id(), timestamp)
 | |
| 
 | |
|             # Username---Message
 | |
|             username.add(date)  # TODO # correlation message ???
 | |
| 
 | |
|         # ADDITIONAL METAS
 | |
|         if meta.get('firstname'):
 | |
|             user_account.set_first_name(meta['firstname'])
 | |
|         if meta.get('lastname'):
 | |
|             user_account.set_last_name(meta['lastname'])
 | |
|         if meta.get('phone'):
 | |
|             user_account.set_phone(meta['phone'])
 | |
| 
 | |
|         if meta.get('icon'):
 | |
|             img = Images.create(meta['icon'], b64=True)
 | |
|             img.add(date, user_account)
 | |
|             user_account.set_icon(img.get_global_id())
 | |
|             new_objs.add(img)
 | |
| 
 | |
|         if meta.get('info'):
 | |
|             user_account.set_info(meta['info'])
 | |
| 
 | |
|         return user_account
 | |
| 
 | |
|     def process_meta(self):  # TODO CHECK MANDATORY FIELDS
 | |
|         """
 | |
|         Process JSON meta filed.
 | |
|         """
 | |
|         # meta = self.get_json_meta()
 | |
| 
 | |
|         objs = set()
 | |
|         if self.obj:
 | |
|             objs.add(self.obj)
 | |
|         new_objs = set()
 | |
| 
 | |
|         date, timestamp = self.get_message_date_timestamp()
 | |
| 
 | |
|         # REPLY
 | |
|         reply_id = self.get_message_reply_id()
 | |
| 
 | |
|         print(self.obj.type)
 | |
| 
 | |
|         # TODO FILES + FILES REF
 | |
| 
 | |
|         # get object by meta object type
 | |
|         if self.obj.type == 'message':
 | |
|             # Content
 | |
|             obj = Messages.create(self.obj.id, self.get_message_content())
 | |
| 
 | |
|             # FILENAME
 | |
|             media_name = self.get_media_name()
 | |
|             if media_name:
 | |
|                 print(media_name)
 | |
|                 FilesNames.FilesNames().create(media_name, date, obj)
 | |
| 
 | |
|             for reaction in self.get_reactions():
 | |
|                 obj.add_reaction(reaction['reaction'], int(reaction['count']))
 | |
|         elif self.obj.type == 'chat':
 | |
|             pass
 | |
|         else:
 | |
|             chat_id = self.get_chat_id()
 | |
|             thread_id = self.get_thread_id()
 | |
|             channel_id = self.get_subchannel_id()
 | |
|             message_id = self.get_message_id()
 | |
|             message_id = Messages.create_obj_id(self.get_chat_instance_uuid(), chat_id, message_id, timestamp, channel_id=channel_id, thread_id=thread_id)
 | |
|             message = Messages.Message(message_id)
 | |
|             # create empty message if message don't exist
 | |
|             if not message.exists():
 | |
|                 message.create('')
 | |
|                 objs.add(message)
 | |
| 
 | |
|             if message.exists(): # TODO Correlation user-account image/filename ????
 | |
|                 obj = Images.create(self.get_message_content())
 | |
|                 obj.add(date, message)
 | |
|                 obj.set_parent(obj_global_id=message.get_global_id())
 | |
| 
 | |
|                 # FILENAME
 | |
|                 media_name = self.get_media_name()
 | |
|                 if media_name:
 | |
|                     FilesNames.FilesNames().create(media_name, date, message, file_obj=obj)
 | |
| 
 | |
|                 for reaction in self.get_reactions():
 | |
|                     message.add_reaction(reaction['reaction'], int(reaction['count']))
 | |
| 
 | |
|         for obj in objs:  # TODO PERF avoid parsing metas multiple times
 | |
| 
 | |
|             # TODO get created subchannel + thread
 | |
|             #    => create correlation user-account with object
 | |
| 
 | |
|             print(obj.id)
 | |
| 
 | |
|             # CHAT
 | |
|             chat_objs = self.process_chat(new_objs, obj, date, timestamp, reply_id=reply_id)
 | |
| 
 | |
|             # # TODO HANDLE OTHERS OBJECT TYPE
 | |
|             # # TODO MAKE IT GENERIC FOR OTHERS CHATS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 | |
|             # # Message forward + Discussion
 | |
|             # if self.get_json_meta().get('forward'):
 | |
|             #     discussion_id = self.get_json_meta().get('discussion')
 | |
|             #     forward_from = self.get_message_forward()
 | |
|             #
 | |
|             #     if discussion_id:       # TODO HANDLE FORWARDED MESSAGES FROM EXTERNAL CHANNELS
 | |
|             #         chat_forward_id = forward_from['from']['id']
 | |
|             #         message_forward_id = forward_from['from']['channel_post']
 | |
|             #
 | |
|             #         # if chat_forward_id == discussion_id:
 | |
|             #         #     linked_chat = Chat(chat_forward_id, self.get_chat_instance_uuid())
 | |
|             #         #     if linked_chat.exists():
 | |
|             #         #         # create thread
 | |
|             #         #         # add message replies for each childrens
 | |
|             #
 | |
|             # # TODO HANDLE THREAD
 | |
|             # # TODO Change FORWARD META FIELDS
 | |
|             # # meta['forward'] = {}
 | |
|             # #       # CHAT ID
 | |
|             # #       # SUBCHANNEL ID     -> can be None
 | |
|             # #       # Message ID
 | |
|             #
 | |
|             # # meta['forward']['origin']
 | |
|             # #       # same as 'forward'
 | |
|             #
 | |
|             # if self.get_json_meta().get('forward'):
 | |
|             #     forward = self.get_message_forward()
 | |
|             #     f_chat = forward['chat']
 | |
|             #     f_subchannel = forward.get('subchannel')
 | |
|             #     f_id = forward.get('id')
 | |
|             #     if not f_subchannel:
 | |
|             #         chat_forward = Chat(f_chat, self.get_chat_instance_uuid())
 | |
|             #         if chat_forward.exists():
 | |
|             #             for chat_obj in chat_objs:
 | |
|             #                 if chat_obj.type == 'chat':
 | |
|             #                     chat_forward.add_relationship(chat_obj.get_global_id(), 'forward')
 | |
|             #             # TODO LIST FORWARDED MESSAGES
 | |
|             #
 | |
|             #
 | |
|             # # Discord -> serverID + subchannel ID + message ID
 | |
|             # # Telegram -> chat ID + Message ID
 | |
|             # #                 + ORIGIN IDs
 | |
|             #
 | |
|             #
 | |
|             #
 | |
|             # # TODO create relationships graph
 | |
|             #
 | |
|             #
 | |
|             # # TODO REMOVE ME
 | |
|             # # Message forward  # TODO handle subchannel + message ID
 | |
|             # if self.get_json_meta().get('forward'):
 | |
|             #     forward_from = self.get_message_forward()
 | |
|             #     print('-----------------------------------------------------------')
 | |
|             #     print(forward_from)
 | |
|             #     if forward_from:
 | |
|             #         forward_from_type = forward_from['from']['type']
 | |
|             #         if forward_from_type == 'channel' or forward_from_type == 'chat':
 | |
|             #             chat_forward_id = forward_from['from']['id']
 | |
|             #             chat_forward = Chat(chat_forward_id, self.get_chat_instance_uuid())
 | |
|             #             if chat_forward.exists():
 | |
|             #                 for chat_obj in chat_objs:
 | |
|             #                     if chat_obj.type == 'chat':
 | |
|             #                         chat_forward.add_relationship(chat_obj.get_global_id(), 'forward')
 | |
|             #                         # chat_forward.add_relationship(obj.get_global_id(), 'forward')
 | |
| 
 | |
|             # SENDER # TODO HANDLE NULL SENDER
 | |
|             user_account = self.process_sender(new_objs, obj, date, timestamp)
 | |
| 
 | |
|             if user_account:
 | |
|             # UserAccount---ChatObjects
 | |
|                 for obj_chat in chat_objs:
 | |
|                     user_account.add_correlation(obj_chat.type, obj_chat.get_subtype(r_str=True), obj_chat.id)
 | |
| 
 | |
|             # if chat: # TODO Chat---Username correlation ???
 | |
|             #     # Chat---Username    => need to handle members and participants
 | |
|             #     chat.add_correlation(username.type, username.get_subtype(r_str=True), username.id)
 | |
| 
 | |
|             # TODO Sender image -> correlation
 | |
|                 # image
 | |
|                 #       -> subchannel ?
 | |
|                 #       -> thread id ?
 | |
| 
 | |
|         return new_objs | objs
 |