AIL-framework/bin/lib/Screenshot.py

159 lines
4.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import os
import sys
import redis
from io import BytesIO
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
import Item
import Date
import Tag
import Correlate_object
import ConfigLoader
config_loader = ConfigLoader.ConfigLoader()
r_serv_onion = config_loader.get_redis_conn("ARDB_Onion")
r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata")
SCREENSHOT_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "crawled_screenshot"), 'screenshot')
config_loader = None
# get screenshot relative path
def get_screenshot_rel_path(sha256_string, add_extension=False):
screenshot_path = os.path.join(sha256_string[0:2], sha256_string[2:4], sha256_string[4:6], sha256_string[6:8], sha256_string[8:10], sha256_string[10:12], sha256_string[12:])
if add_extension:
screenshot_path = screenshot_path + '.png'
return screenshot_path
def get_screenshot_filepath(sha256_string):
filename = os.path.join(SCREENSHOT_FOLDER, get_screenshot_rel_path(sha256_string, add_extension=True))
return os.path.realpath(filename)
def exist_screenshot(sha256_string):
screenshot_path = get_screenshot_filepath(sha256_string)
return os.path.isfile(screenshot_path)
def get_metadata(sha256_string):
metadata_dict = {}
metadata_dict['img'] = get_screenshot_rel_path(sha256_string)
metadata_dict['tags'] = get_screenshot_tags(sha256_string)
metadata_dict['is_tags_safe'] = Tag.is_tags_safe(metadata_dict['tags'])
return metadata_dict
def get_screenshot_tags(sha256_string):
return Tag.get_obj_tag(sha256_string)
def get_screenshot_items_list(sha256_string):
res = r_serv_onion.smembers('screenshot:{}'.format(sha256_string))
if res:
return list(res)
else:
return []
def get_item_screenshot(item_id):
return r_serv_metadata.hget('paste_metadata:{}'.format(item_id), 'screenshot')
def get_item_screenshot_list(item_id):
'''
Retun all decoded item of a given item id.
:param item_id: item id
'''
screenshot = get_item_screenshot(item_id)
if screenshot:
return [screenshot]
else:
return []
def get_domain_screenshot(domain):
'''
Retun all screenshot of a given domain.
:param domain: crawled domain
'''
res = r_serv_onion.smembers('domain_screenshot:{}'.format(domain))
if res:
return list(res)
else:
return []
def get_randon_domain_screenshot(domain, r_path=True):
'''
Retun all screenshot of a given domain.
:param domain: crawled domain
'''
res = r_serv_onion.srandmember('domain_screenshot:{}'.format(domain))
if res and r_path:
return get_screenshot_rel_path(res)
return res
def get_screenshot_domain(sha256_string):
'''
Retun all domain of a given screenshot.
:param sha256_string: sha256_string
'''
res = r_serv_onion.smembers('screenshot_domain:{}'.format(sha256_string))
if res:
return list(res)
else:
return []
def get_screenshot_correlated_object(sha256_string, correlation_objects=[]):
'''
Retun all correlation of a given sha256.
:param sha1_string: sha256
:type sha1_string: str
:return: a dict of all correlation for a given sha256
:rtype: dict
'''
if correlation_objects is None:
correlation_objects = Correlate_object.get_all_correlation_objects()
decoded_correlation = {}
for correlation_object in correlation_objects:
if correlation_object == 'paste':
res = get_screenshot_items_list(sha256_string)
elif correlation_object == 'domain':
res = get_screenshot_domain(sha256_string)
else:
res = None
if res:
decoded_correlation[correlation_object] = res
return decoded_correlation
def get_screenshot_file_content(sha256_string):
filepath = get_screenshot_filepath(sha256_string)
with open(filepath, 'rb') as f:
file_content = BytesIO(f.read())
return file_content
def save_screenshot_file(sha256_string, io_content):
filepath = get_screenshot_filepath(sha256_string)
if os.path.isfile(filepath):
print('File already exist')
return False
# # TODO: check if is IO file
with open(filepath, 'wb') as f:
f.write(io_content.getvalue())
return True
def create_screenshot(sha256_string, io_content):
# check if sha256
res = save_screenshot_file(sha256_string, io_content)
if res:
# creata tags
if 'tags' in obj_metadata:
# # TODO: handle mixed tags: taxonomies and Galaxies
Tag.api_add_obj_tags(tags=obj_metadata['tags'], object_id=obj_id, object_type="image")
return True
return False