AIL-framework/bin/lib/telegram.py

85 lines
2.5 KiB
Python
Executable File

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import os
import re
import sys
from urllib.parse import urlparse
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
import Username
config_loader = ConfigLoader.ConfigLoader()
r_serv_crawler = config_loader.get_redis_conn("ARDB_Onion")
config_loader = None
REGEX_USERNAME = re.compile(r'[0-9a-zA-z_]+')
REGEX_JOIN_HASH = re.compile(r'[0-9a-zA-z-]+')
## ##
def save_item_correlation(username, item_id, item_date):
Username.save_item_correlation('telegram', username, item_id, item_date)
def save_telegram_invite_hash(invite_hash, item_id):
r_serv_crawler.sadd('telegram:invite_code', '{};{}'.format(invite_hash, item_id))
def get_data_from_telegram_url(base_url, url_path):
dict_url = {}
url_path = url_path.split('/')
# username len > 5, a-z A-Z _
if len(url_path) == 1:
username = url_path[0].lower()
username = REGEX_USERNAME.search(username)
if username:
username = username[0].replace('\\', '')
if len(username) > 5:
dict_url['username'] = username
elif url_path[0] == 'joinchat':
invite_hash = REGEX_JOIN_HASH.search(url_path[1])
if invite_hash:
invite_hash = invite_hash[0]
dict_url['invite_hash'] = invite_hash
return dict_url
# # TODO:
# Add openmessafe
# Add passport ?
# Add confirmphone
# Add user
def get_data_from_tg_url(tg_link):
dict_url = {}
url = urlparse(tg_link)
# username len > 5, a-z A-Z _
if url.netloc == 'resolve' and len(url.query) > 7:
if url.query[:7] == 'domain=':
# remove domain=
username = url.query[7:]
username = REGEX_USERNAME.search(username)
if username:
username = username[0].replace('\\', '')
if len(username) > 5:
dict_url['username'] = username
elif url.netloc == 'join' and len(url.query) > 7:
if url.query[:7] == 'invite=':
invite_hash = url.query[7:]
invite_hash = REGEX_JOIN_HASH.search(invite_hash)
if invite_hash:
invite_hash = invite_hash[0]
dict_url['invite_hash'] = invite_hash
elif url.netloc == 'login' and len(url.query) > 5:
login_code = url.query[5:]
if login_code:
dict_url['login_code'] = login_code
else:
# # TODO: log invalid URL ???????
print(url)
return dict_url