mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			85 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			85 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python3
 | |
| # -*-coding:UTF-8 -*
 | |
| 
 | |
| import os
 | |
| import re
 | |
| import sys
 | |
| 
 | |
| from urllib.parse import urlparse
 | |
| 
 | |
| sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
 | |
| import ConfigLoader
 | |
| import Username
 | |
| 
 | |
# Open the "ARDB_Onion" Redis connection used by the helpers below.
r_serv_crawler = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Onion")
# The loader is only needed during set-up; the module-level name stays bound to None.
config_loader = None
 | |
| 
 | |
# Character classes used to pull identifiers out of Telegram links.
#
# The previous patterns used the range ``A-z``, which spans ASCII 65-122 and
# therefore also accepted ``[``, ``\``, ``]``, ``^``, ``_`` and the backtick.
# The ranges are now spelled ``A-Z`` and the extra characters that are really
# wanted are listed explicitly.

# Username: digits, ASCII letters and underscore. The backslash is still
# accepted because the URL parsers in this file remove backslashes from the
# matched text afterwards.
REGEX_USERNAME = re.compile(r'[0-9a-zA-Z_\\]+')

# Invite hash: URL-safe base64 alphabet (digits, ASCII letters, '_' and '-').
REGEX_JOIN_HASH = re.compile(r'[0-9a-zA-Z_-]+')
 | |
| 
 | |
| ##  ##
 | |
| 
 | |
def save_item_correlation(username, item_id, item_date):
    """Record a 'telegram' username correlation for an item.

    Delegates to ``Username.save_item_correlation`` with the subtype fixed to
    'telegram'; the three arguments are forwarded unchanged.
    """
    subtype = 'telegram'
    Username.save_item_correlation(subtype, username, item_id, item_date)
 | |
| 
 | |
def save_telegram_invite_hash(invite_hash, item_id):
    """Add an invite hash / item id pair to the 'telegram:invite_code' set.

    The stored member is the text ``<invite_hash>;<item_id>``.
    """
    member = '{0};{1}'.format(invite_hash, item_id)
    r_serv_crawler.sadd('telegram:invite_code', member)
 | |
| 
 | |
def get_data_from_telegram_url(base_url, url_path):
    """Extract a Telegram username or invite hash from a URL path.

    ``url_path`` is split on '/':

    * a single-segment path is treated as a username: it is lower-cased,
      matched with ``REGEX_USERNAME``, stripped of backslashes and kept only
      when the result is longer than 5 characters;
    * a path whose first segment is 'joinchat' yields the
      ``REGEX_JOIN_HASH`` match found in its second segment.

    ``base_url`` is accepted but not used.

    Returns a dict holding at most one of the keys 'username' or
    'invite_hash'; the dict is empty when nothing could be extracted.
    """
    extracted = {}
    segments = url_path.split('/')

    if len(segments) == 1:
        match = REGEX_USERNAME.search(segments[0].lower())
        if match is None:
            return extracted
        candidate = match[0].replace('\\', '')
        # username len > 5
        if len(candidate) > 5:
            extracted['username'] = candidate
        return extracted

    if segments[0] != 'joinchat':
        return extracted

    # str.split always returns at least one element and the single-segment
    # case returned above, so segments[1] exists here.
    match = REGEX_JOIN_HASH.search(segments[1])
    if match is not None:
        extracted['invite_hash'] = match[0]
    return extracted
 | |
| 
 | |
| # # TODO:
 | |
| #   Add openmessage
 | |
| #   Add passport ?
 | |
| #   Add confirmphone
 | |
| #   Add user
 | |
def get_data_from_tg_url(tg_link):
    """Extract a username, invite hash or login code from a ``tg://`` link.

    The link is parsed with ``urlparse``; the network-location part selects
    the action and the query string carries the value:

    * ``resolve`` with a query starting with ``domain=`` -> 'username'
      (``REGEX_USERNAME`` match, backslashes removed, kept only when longer
      than 5 characters);
    * ``join`` with a query starting with ``invite=`` -> 'invite_hash'
      (``REGEX_JOIN_HASH`` match);
    * ``login`` with a query starting with ``code=`` -> 'login_code'
      (everything after ``code=``).

    A link whose action is not one of the above, or whose query is too short
    to hold a value, is printed to stdout.

    Returns a dict holding at most one of the keys listed above; the dict is
    empty when nothing could be extracted.
    """
    dict_url = {}

    url = urlparse(tg_link)
    query = url.query

    # username len > 5, a-z A-Z _
    if url.netloc == 'resolve' and len(query) > 7:
        if query.startswith('domain='):
            # remove domain=
            username = REGEX_USERNAME.search(query[7:])
            if username:
                username = username[0].replace('\\', '')
                if len(username) > 5:
                    dict_url['username'] = username

    elif url.netloc == 'join' and len(query) > 7:
        if query.startswith('invite='):
            # remove invite=
            invite_hash = REGEX_JOIN_HASH.search(query[7:])
            if invite_hash:
                dict_url['invite_hash'] = invite_hash[0]

    elif url.netloc == 'login' and len(query) > 5:
        # Require the "code=" prefix, like the "domain=" / "invite=" checks
        # above. Without it any other parameter (e.g. "token=...") was sliced
        # blindly and stored as a bogus login code.
        if query.startswith('code='):
            # remove code=
            dict_url['login_code'] = query[5:]
    else:
        # # TODO: log invalid URL ???????
        print(url)

    return dict_url
 |