2021-06-07 22:12:23 +02:00
#!/usr/bin/env python3
2024-01-12 17:15:41 +01:00
from __future__ import annotations
2021-06-07 22:12:23 +02:00
import base64
2021-12-02 17:55:02 +01:00
import hashlib
2021-09-07 12:59:31 +02:00
import json
2023-09-27 12:09:20 +02:00
from io import BytesIO
2024-01-13 01:24:32 +01:00
from typing import Any
2023-09-27 12:09:20 +02:00
from zipfile import ZipFile
2021-06-07 22:12:23 +02:00
2024-01-26 15:03:36 +01:00
import flask_login # type: ignore[import-untyped]
2024-01-12 17:15:41 +01:00
from flask import request , send_file , Response
2024-01-26 15:03:36 +01:00
from flask_restx import Namespace , Resource , abort , fields # type: ignore[import-untyped]
2021-06-07 22:12:23 +02:00
from werkzeug . security import check_password_hash
2022-11-29 15:24:35 +01:00
from lacuscore import CaptureStatus as CaptureStatusCore
2024-01-16 00:27:43 +01:00
from pylacus import CaptureStatus as CaptureStatusPy
2024-01-13 01:24:32 +01:00
from lookyloo import CaptureSettings , Lookyloo
2023-01-18 16:31:12 +01:00
from lookyloo . comparator import Comparator
2023-08-30 12:45:06 +02:00
from lookyloo . exceptions import MissingUUID , NoValidHarFile
2021-06-07 22:12:23 +02:00
2023-11-07 16:10:47 +01:00
from . helpers import build_users_table , load_user_from_request , src_request_ip , get_lookyloo_instance
2021-06-07 22:12:23 +02:00
# Flask-RESTX namespace on which the resources and models of this module are registered.
api = Namespace(
    ' GenericAPI ',
    path=' / ',
    description=' Generic Lookyloo API ',
)

# Module-wide instances used by the resources below.
lookyloo: Lookyloo = get_lookyloo_instance()
comparator: Comparator = Comparator()
2021-06-07 22:12:23 +02:00
2024-01-26 15:03:36 +01:00
def api_auth_check(method):  # type: ignore[no-untyped-def]
    """Hand back *method* unchanged for an authenticated caller, abort with a 403 otherwise.

    The caller is accepted when the flask_login current user is authenticated,
    or when load_user_from_request() returns a truthy value for the request.
    """
    session_authenticated = flask_login.current_user.is_authenticated
    if session_authenticated or load_user_from_request(request):
        return method
    abort(403, ' Authentication required. ')
# Model describing the credentials expected in the body of a token request.
token_request_fields = api.model(
    ' AuthTokenFields ',
    {
        ' username ': fields.String(required=True, description=" Your username "),
        ' password ': fields.String(required=True, description=" Your password "),
    },
)
2024-01-12 17:15:41 +01:00
@api.errorhandler(NoValidHarFile)  # type: ignore[misc]
def handle_no_HAR_file_exception(error: Any) -> tuple[dict[str, str], int]:
    """Convert a NoValidHarFile exception into a 400 response carrying its message."""
    payload = {' message ': str(error)}
    return payload, 400
2021-06-07 22:12:23 +02:00
@api.route(' /json/get_token ')
@api.doc(description=' Get the API token required for authenticated calls ')
class AuthToken(Resource):  # type: ignore[misc]
    """Exchange a username/password pair for the matching API key.

    The credentials are read from the query string (GET) or from the JSON
    body (POST); both verbs share the same verification helper.
    """

    users_table = build_users_table()

    def _authkey_for(self, username: Any, password: Any) -> dict[str, str] | tuple[dict[str, str], int]:
        """Return the authkey payload for valid credentials, the 401 error payload otherwise.

        Both values are supplied by the client, so anything that is not a string
        is rejected before being used as a lookup key or passed to the hash check
        (an unhashable or non-string value would otherwise raise).
        """
        if (isinstance(username, str) and isinstance(password, str)
                and username in self.users_table
                and check_password_hash(self.users_table[username][' password '], password)):
            return {' authkey ': self.users_table[username][' authkey ']}
        return {' error ': ' User/Password invalid. '}, 401

    @api.param(' username ', ' Your username ')  # type: ignore[misc]
    @api.param(' password ', ' Your password ')  # type: ignore[misc]
    def get(self) -> dict[str, str] | tuple[dict[str, str], int]:
        """Check the credentials passed as query parameters."""
        # A missing or empty parameter counts as "not provided" and is always refused.
        username: str | None = request.args.get(' username ') or None
        password: str | None = request.args.get(' password ') or None
        return self._authkey_for(username, password)

    @api.doc(body=token_request_fields)  # type: ignore[misc]
    def post(self) -> dict[str, str] | tuple[dict[str, str], int]:
        """Check the credentials passed in the JSON body."""
        auth: Any = request.get_json(force=True)
        if not isinstance(auth, dict):
            # The body is valid JSON but not an object: there are no credentials to check.
            return {' error ': ' User/Password invalid. '}, 401
        return self._authkey_for(auth.get(' username '), auth.get(' password '))
2021-06-07 22:12:23 +02:00
2021-06-08 00:37:11 +02:00
@api.route(' /json/<string:capture_uuid>/status ')
@api.doc(description=' Get the status of a capture ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureStatusQuery(Resource):  # type: ignore[misc]
    """Report the status code of a capture, optionally with its error message."""

    @api.param(' with_error ', ' Add the error message of the capture (if there is one) ')  # type: ignore[misc]
    def get(self, capture_uuid: str) -> dict[str, Any]:
        """Return the status code, plus the cached error when requested and the capture is done."""
        with_error: bool = bool(request.args.get(' with_error '))
        status_code = lookyloo.get_capture_status(capture_uuid)
        to_return: dict[str, Any] = {' status_code ': status_code}

        done_states = [CaptureStatusCore.DONE, CaptureStatusPy.DONE]
        if status_code not in done_states or not with_error:
            return to_return

        # The error message is only looked up for a finished capture.
        cache = lookyloo.capture_cache(capture_uuid)
        if cache and cache.error:
            to_return[' error '] = cache.error
        return to_return
2021-06-07 22:12:23 +02:00
2021-06-16 23:57:14 +02:00
@api.route(' /json/<string:capture_uuid>/hostnames ')
@api.doc(description=' Get all the hostnames of all the resources of a capture ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureHostnames(Resource):  # type: ignore[misc]
    """List the hostnames of the resources of a capture."""

    def get(self, capture_uuid: str) -> dict[str, Any] | tuple[dict[str, Any], int]:
        """Return the hostnames of the capture, or a 400 error when it is not in the cache."""
        if not lookyloo.capture_cache(capture_uuid):
            return {' error ': ' UUID missing in cache, try again later and check the status first. '}, 400
        hostnames = list(lookyloo.get_hostnames(capture_uuid))
        return {' response ': {' hostnames ': hostnames}}
@api.route(' /json/<string:capture_uuid>/urls ')
@api.doc(description=' Get all the URLs of all the resources of a capture ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureURLs(Resource):  # type: ignore[misc]
    """List the URLs of the resources of a capture."""

    def get(self, capture_uuid: str) -> dict[str, Any] | tuple[dict[str, Any], int]:
        """Return the URLs of the capture, or a 400 error when it is not in the cache."""
        if not lookyloo.capture_cache(capture_uuid):
            return {' error ': ' UUID missing in cache, try again later and check the status first. '}, 400
        urls = list(lookyloo.get_urls(capture_uuid))
        return {' response ': {' urls ': urls}}
2021-06-16 03:26:41 +02:00
@api.route(' /json/<string:capture_uuid>/hashes ')
@api.doc(description=' Get all the hashes of all the resources of a capture ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureHashes(Resource):  # type: ignore[misc]
    """List the hashes of the resources of a capture, optionally with their URLs."""

    # Note: shake algos require a length for the digest, discarding them.
    supported_hash_algos = [name for name in hashlib.algorithms_available if not name.startswith(' shake ')]

    # NOTE: the SHA512 hashes are pre-computed in the tree, anything else must be computed on the spot
    #       so we return the SHA512 hashes by default
    @api.param(' algorithm ', default=' sha512 ', description=f' Algorithm of the hashes (default: sha512). Supported options: { " , ".join(supported_hash_algos) } ')  # type: ignore[misc]
    @api.param(' hashes_only ', default=1, description=' If 1 (default), only returns a list hashes instead of a dictionary of hashes with their respective URLs.. ')  # type: ignore[misc]
    def get(self, capture_uuid: str) -> dict[str, Any] | tuple[dict[str, Any], int]:
        """Return the hashes of the capture, or a 400 error when it is not in the cache."""
        if not lookyloo.capture_cache(capture_uuid):
            return {' error ': ' UUID missing in cache, try again later and check the status first. '}, 400

        requested_algorithm = request.args.get(' algorithm ')
        algorithm = requested_algorithm.lower() if requested_algorithm else ' sha512 '
        # Only an explicit zero turns the "hashes only" mode off.
        hashes_only = request.args.get(' hashes_only ') not in [0, ' 0 ']

        if hashes_only and algorithm == ' sha512 ':
            return {' response ': {' hashes ': list(lookyloo.get_hashes(capture_uuid))}}

        hashes = lookyloo.get_hashes_with_context(capture_uuid, algorithm=algorithm, urls_only=True)
        to_return: dict[str, Any] = {' response ': {' hashes ': list(hashes.keys())}}
        if not hashes_only:
            to_return[' response '][' hashes_with_urls '] = {h: list(urls) for h, urls in hashes.items()}
        return to_return
2021-06-08 00:37:11 +02:00
@api.route(' /json/<string:capture_uuid>/redirects ')
@api.doc(description=' Get all the redirects of a capture ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureRedirects(Resource):  # type: ignore[misc]
    """Expose the URL and the redirects stored in the cache of a capture."""

    def get(self, capture_uuid: str) -> dict[str, Any] | tuple[dict[str, Any], int]:
        """Return the redirects of the capture, or a 400 error when it is not in the cache.

        Any exception raised while reading the cache entry is reported in the
        'error' key of the response instead of being propagated.
        """
        cache = lookyloo.capture_cache(capture_uuid)
        if not cache:
            return {' error ': ' UUID missing in cache, try again later and check the status first. '}, 400

        to_return: dict[str, Any] = {}
        try:
            to_return = {' response ': {' url ': cache.url,
                                        ' redirects ': cache.redirects or []}}
            if not cache.redirects:
                to_return[' response '][' info '] = ' No redirects '
        except Exception as e:
            # Prefer the error stored on the cache entry over the raw exception text.
            use_cache_error = cache and hasattr(cache, ' error ')
            to_return[' error '] = cache.error if use_cache_error else str(e)
        return to_return
2021-06-08 00:37:11 +02:00
@api.route(' /json/<string:capture_uuid>/misp_export ')
@api.doc(description=' Get an export of the capture in MISP format ',
         params={' capture_uuid ': ' The UUID of the capture '})
class MISPExport(Resource):  # type: ignore[misc]
    """Export a capture through lookyloo.misp_export()."""

    def get(self, capture_uuid: str) -> dict[str, Any] | list[dict[str, Any]]:
        """Return the export of the capture.

        A dict coming back from misp_export() is returned untouched; otherwise
        every event is serialised with to_json() and loaded back as plain JSON data.
        """
        include_parents = bool(request.args.get(' with_parents '))
        event = lookyloo.misp_export(capture_uuid, include_parents)
        if isinstance(event, dict):
            return event
        return [json.loads(e.to_json()) for e in event]
# Model describing the options accepted in the body of a MISP push request.
misp_push_fields = api.model(
    ' MISPPushFields ',
    {
        ' allow_duplicates ': fields.Integer(
            min=0, max=1, example=0,
            description=" Push the event even if it is already present on the MISP instance ",
        ),
        ' with_parents ': fields.Integer(
            min=0, max=1, example=0,
            description=" Also push the parents of the capture (if any) ",
        ),
    },
)
2021-06-08 00:37:11 +02:00
@api.route(' /json/<string:capture_uuid>/misp_push ')
@api.route(' /json/<string:capture_uuid>/misp_push/<string:instance_name> ')
@api.doc(description=' Push an event to a pre-configured MISP instance ',
         params={' capture_uuid ': ' The UUID of the capture '},
         security=' apikey ')
class MISPPush(Resource):  # type: ignore[misc]
    """Push the MISP export of a capture to a configured MISP instance.

    GET reads the options from the query string, POST from the JSON body;
    both delegate the actual work to the same private helper.
    """

    method_decorators = [api_auth_check]

    def _push(self, capture_uuid: str, instance_name: str | None,
              with_parents: bool, allow_duplicates: bool) -> dict[str, Any] | list[dict[str, Any]]:
        """Export the capture and push it, returning the new events or an error payload.

        When *instance_name* is None the default MISP instance is used. Every
        failure is reported as a dict with an 'error' key rather than raised.
        """
        if instance_name is None:
            misp = lookyloo.misps.default_misp
        elif lookyloo.misps.get(instance_name) is not None:
            misp = lookyloo.misps[instance_name]
        else:
            return {' error ': f' MISP instance " { instance_name } " does not exists. '}

        if not misp.available:
            return {' error ': ' MISP module not available. '}
        if not misp.enable_push:
            return {' error ': ' Push not enabled in MISP module. '}

        event = lookyloo.misp_export(capture_uuid, with_parents)
        if isinstance(event, dict):
            # A dict coming back from the export is reported as the error.
            return {' error ': event}

        new_events = misp.push(event, allow_duplicates)
        if isinstance(new_events, dict):
            # Same convention for the push itself.
            return {' error ': new_events}
        return [json.loads(e.to_json()) for e in new_events]

    @api.param(' with_parents ', ' Also push the parents of the capture (if any) ')  # type: ignore[misc]
    @api.param(' allow_duplicates ', ' Push the event even if it is already present on the MISP instance ')  # type: ignore[misc]
    def get(self, capture_uuid: str, instance_name: str | None = None) -> dict[str, Any] | list[dict[str, Any]]:
        """Push the capture, with the options taken from the query string."""
        with_parents = bool(request.args.get(' with_parents '))
        allow_duplicates = bool(request.args.get(' allow_duplicates '))
        return self._push(capture_uuid, instance_name, with_parents, allow_duplicates)

    @api.doc(body=misp_push_fields)  # type: ignore[misc]
    def post(self, capture_uuid: str, instance_name: str | None = None) -> dict[str, Any] | list[dict[str, Any]]:
        """Push the capture, with the options taken from the JSON body."""
        parameters: dict[str, Any] = request.get_json(force=True)
        with_parents = bool(parameters.get(' with_parents '))
        allow_duplicates = bool(parameters.get(' allow_duplicates '))
        return self._push(capture_uuid, instance_name, with_parents, allow_duplicates)
2021-08-13 13:50:26 +02:00
# Model describing the body accepted when triggering the modules.
trigger_modules_fields = api.model(
    ' TriggerModulesFields ',
    {
        ' force ': fields.Boolean(
            required=False, default=False,
            description=" Force trigger the modules, even if the results are already cached. ",
        ),
    },
)
@api.route(' /json/<string:capture_uuid>/trigger_modules ')
@api.doc(description=' Trigger all the available 3rd party modules on the given capture ',
         params={' capture_uuid ': ' The UUID of the capture '})
class TriggerModules(Resource):  # type: ignore[misc]
    """Trigger the modules on a capture through lookyloo.trigger_modules()."""

    @api.doc(body=trigger_modules_fields)  # type: ignore[misc]
    def post(self, capture_uuid: str) -> dict[str, Any]:
        """Trigger the modules, passing the truthiness of the 'force' body value along."""
        parameters: dict[str, Any] = request.get_json(force=True)
        force = bool(parameters.get(' force '))
        return lookyloo.trigger_modules(capture_uuid, force=force)
2024-03-26 17:10:10 +01:00
2024-03-26 14:08:14 +01:00
@api.route(' /json/<string:tree_uuid>/modules ')
@api.doc(description=' Get responses from the 3rd party modules ',
         params={' tree_uuid ': ' The UUID of the capture '})
class ModulesResponse(Resource):  # type: ignore[misc]
    """Expose lookyloo.get_modules_responses() for a capture."""

    def get(self, tree_uuid: str) -> dict[str, Any]:
        """Return whatever get_modules_responses() yields for *tree_uuid*."""
        responses = lookyloo.get_modules_responses(tree_uuid)
        return responses
2021-08-13 13:50:26 +02:00
2021-06-07 22:12:23 +02:00
@api.route(' /json/hash_info/<h> ')
@api.doc(description=' Search for a ressource with a specific hash (sha512) ',
         params={' h ': ' The hash (sha512) '})
class HashInfo(Resource):  # type: ignore[misc]
    """Look up the details and the body attached to a hash."""

    def get(self, h: str) -> dict[str, Any] | tuple[dict[str, Any], int]:
        """Return the details and the base64-encoded body for *h*, or a 400 error when there are no details."""
        from . import get_body_hash_full

        details, body = get_body_hash_full(h)
        if not details:
            return {' error ': ' Unknown Hash. '}, 400

        # The raw body is base64-encoded before being placed in the response.
        encoded_body = base64.b64encode(body.getvalue()).decode()
        return {' response ': {' hash ': h, ' details ': details, ' body ': encoded_body}}
# Model describing the body of a URL search request.
url_info_fields = api.model(
    ' URLInfoFields ',
    {
        ' url ': fields.String(required=True, description=" The URL to search "),
        ' limit ': fields.Integer(example=20, description=" The maximal amount of captures to return "),
        ' cached_captures_only ': fields.Boolean(
            default=True,
            description=" If false, re-cache the missing captures (can take a while) ",
        ),
    },
)
@api.route(' /json/url_info ')
@api.doc(description=' Search for a URL ')
class URLInfo(Resource):  # type: ignore[misc]
    """Search the occurrences of a URL."""

    @api.doc(body=url_info_fields)  # type: ignore[misc]
    def post(self) -> list[dict[str, Any]]:
        """Call get_url_occurrences() with the URL and the remaining body keys as keyword arguments."""
        from . import get_url_occurrences

        to_query: dict[str, Any] = request.get_json(force=True)
        # The URL is the positional argument, everything left in the body is forwarded as is.
        url = to_query.pop(' url ')
        return get_url_occurrences(url, **to_query)
# Model describing the body of a hostname search request.
hostname_info_fields = api.model(
    ' HostnameInfoFields ',
    {
        ' hostname ': fields.String(required=True, description=" The hostname to search "),
        ' limit ': fields.Integer(example=20, description=" The maximal amount of captures to return "),
        ' cached_captures_only ': fields.Boolean(
            default=True,
            description=" If false, re-cache the missing captures (can take a while) ",
        ),
    },
)
@api.route(' /json/hostname_info ')
@api.doc(description=' Search for a hostname ')
class HostnameInfo(Resource):  # type: ignore[misc]
    """Search the occurrences of a hostname."""

    @api.doc(body=hostname_info_fields)  # type: ignore[misc]
    def post(self) -> list[dict[str, Any]]:
        """Call get_hostname_occurrences() with the hostname and the remaining body keys as keyword arguments."""
        from . import get_hostname_occurrences

        to_query: dict[str, Any] = request.get_json(force=True)
        # The hostname is the positional argument, everything left in the body is forwarded as is.
        hostname = to_query.pop(' hostname ')
        occurrences = get_hostname_occurrences(hostname, **to_query)
        return occurrences
2021-06-07 22:12:23 +02:00
@api.route(' /json/stats ')
@api.doc(description=' Get the statistics of the lookyloo instance. ')
class InstanceStats(Resource):  # type: ignore[misc]
    """Expose lookyloo.get_stats()."""

    def get(self) -> dict[str, Any]:
        """Return the result of get_stats() unchanged."""
        stats = lookyloo.get_stats()
        return stats
2022-08-18 11:19:32 +02:00
@api.route(' /json/devices ')
@api.doc(description=' Get the list of devices pre-configured on the platform ')
class Devices(Resource):  # type: ignore[misc]
    """Expose lookyloo.get_playwright_devices()."""

    def get(self) -> dict[str, Any]:
        """Return the result of get_playwright_devices() unchanged."""
        devices = lookyloo.get_playwright_devices()
        return devices
2021-06-08 00:37:11 +02:00
@api.route(' /json/<string:capture_uuid>/stats ')
@api.doc(description=' Get the statistics of the capture. ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureStats(Resource):  # type: ignore[misc]
    """Expose lookyloo.get_statistics() for a capture."""

    def get(self, capture_uuid: str) -> dict[str, Any]:
        """Return the result of get_statistics() for *capture_uuid* unchanged."""
        statistics = lookyloo.get_statistics(capture_uuid)
        return statistics
2021-06-10 02:59:24 +02:00
@api.route(' /json/<string:capture_uuid>/info ')
@api.doc(description=' Get basic information about the capture. ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureInfo(Resource):  # type: ignore[misc]
    """Expose lookyloo.get_info() for a capture."""

    def get(self, capture_uuid: str) -> dict[str, Any]:
        """Return the result of get_info() for *capture_uuid* unchanged."""
        info = lookyloo.get_info(capture_uuid)
        return info
2021-06-08 00:37:11 +02:00
@api.route(' /json/<string:capture_uuid>/cookies ')
@api.doc(description=' Get the complete cookie jar created during the capture. ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureCookies(Resource):  # type: ignore[misc]
    """Return the cookies of a capture as parsed JSON."""

    def get(self, capture_uuid: str) -> dict[str, Any]:
        """Read what get_cookies() returns and parse its content as JSON."""
        raw_cookies = lookyloo.get_cookies(capture_uuid).read()
        return json.loads(raw_cookies)
2024-03-27 13:44:35 +01:00
2024-03-27 10:29:52 +01:00
@api.route(' /json/<string:capture_uuid>/report ')
@api.doc(description=' Reports the url by sending an email to the investigation team ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureReport(Resource):  # type: ignore[misc]
    """Report a capture through lookyloo.send_mail()."""

    @api.param(' email ', ' Email of the reporter, used by the analyst to get in touch. ')  # type: ignore[misc]
    @api.param(' comment ', ' Description of the URL, will be given to the analyst. ')  # type: ignore[misc]
    def post(self, capture_uuid: str) -> bool | dict[str, Any]:
        """Pass the email and comment found in the JSON body to send_mail() and return its result."""
        parameters: dict[str, Any] = request.get_json(force=True)
        email = parameters.get(' email ', ' ')
        comment = parameters.get(' comment ')
        return lookyloo.send_mail(capture_uuid, email, comment)
2021-06-08 00:37:11 +02:00
2023-04-28 17:19:49 +02:00
# Model describing the automatic reporting settings nested in a capture submission.
auto_report_model = api.model(
    ' AutoReportModel ',
    {
        ' email ': fields.String(
            example=' ',
            description=" Email of the reporter, used by the analyst to get in touch. ",
        ),
        ' comment ': fields.String(
            example=' ',
            description=" Description of the URL, will be given to the analyst. ",
        ),
    },
)
2021-08-17 12:12:10 +02:00
# Model describing the JSON body accepted when submitting a capture with POST.
submit_fields_post = api.model(
    ' SubmitFieldsPost ',
    {
        ' url ': fields.Url(example=' ', description=" The URL to capture "),
        ' document ': fields.String(
            example=' ',
            description=" A base64 encoded document, it can be anything a browser can display. ",
        ),
        ' document_name ': fields.String(example=' ', description=" The name of the document. "),
        ' listing ': fields.Integer(
            min=0, max=1, example=1,
            description=" Display the capture on the index ",
        ),
        ' allow_tracking ': fields.Integer(
            min=0, max=1, example=0,
            description=" Attempt to let the website violate your privacy ",
        ),
        ' user_agent ': fields.String(example=' ', description=" User agent to use for the capture "),
        ' browser_name ': fields.String(
            example=' ',
            description=" Use this browser. Must be chromium, firefox or webkit. ",
        ),
        ' device_name ': fields.String(
            example=' ',
            description=" Use the pre-configured settings for this device. Get a list from /json/devices. ",
        ),
        ' referer ': fields.String(example=' ', description=" Referer to pass to the capture "),
        ' headers ': fields.String(
            example=' Accept-Language: en-US;q=0.5, fr-FR;q=0.4 ',
            description=" Headers to pass to the capture ",
        ),
        ' proxy ': fields.Url(
            example=' ',
            description=" Proxy to use for the capture. Format: [scheme]://[username]:[password]@[hostname]:[port] ",
        ),
        ' cookies ': fields.String(
            example=' ',
            description=" JSON export of a list of cookies as exported from an other capture ",
        ),
        ' auto_report ': fields.Nested(
            auto_report_model,
            description=" The settings for the automatic reporting. ",
        ),
    },
)
2021-06-07 22:12:23 +02:00
@api.route(' /submit ')
class SubmitCapture(Resource):  # type: ignore[misc]
    """Enqueue a capture, from query parameters (GET) or from a JSON body (POST)."""

    @api.param(' url ', ' The URL to capture ', required=True)  # type: ignore[misc]
    @api.param(' listing ', ' Display the capture on the index ', default=1)  # type: ignore[misc]
    @api.param(' allow_tracking ', ' Attempt to let the website violate your privacy ', default=1)  # type: ignore[misc]
    @api.param(' user_agent ', ' User agent to use for the capture ')  # type: ignore[misc]
    @api.param(' browser_name ', ' Use this browser. Must be chromium, firefox or webkit. ')  # type: ignore[misc]
    @api.param(' device_name ', ' Use the pre-configured settings for this device ')  # type: ignore[misc]
    @api.param(' referer ', ' Referer to pass to the capture ')  # type: ignore[misc]
    @api.param(' proxy ', ' Proxy to use for the the capture ')  # type: ignore[misc]
    @api.produces([' text/text '])  # type: ignore[misc]
    def get(self) -> str | tuple[str, int]:
        """Build the capture settings from the query string and enqueue the capture.

        Returns what enqueue_capture() returns, or a 400 error message when
        no URL was provided.
        """
        authenticated = flask_login.current_user.is_authenticated
        # The submitter is the logged-in user when there is one, the source IP of the request otherwise.
        user = flask_login.current_user.get_id() if authenticated else src_request_ip(request)

        if not request.args.get(' url '):
            return ' No " url " in the URL params, nothting to capture. ', 400

        # Both flags stay enabled unless the parameter is explicitly set to zero.
        to_query: CaptureSettings = {
            ' url ': request.args[' url '],
            ' listing ': request.args.get(' listing ') not in [0, ' 0 '],
            ' allow_tracking ': request.args.get(' allow_tracking ') not in [0, ' 0 '],
        }
        # Optional settings are only forwarded when they carry a non-empty value.
        if user_agent := request.args.get(' user_agent '):
            to_query[' user_agent '] = user_agent
        if browser_name := request.args.get(' browser_name '):
            to_query[' browser_name '] = browser_name
        if device_name := request.args.get(' device_name '):
            to_query[' device_name '] = device_name
        if referer := request.args.get(' referer '):
            to_query[' referer '] = referer
        if headers := request.args.get(' headers '):
            to_query[' headers '] = headers
        if proxy := request.args.get(' proxy '):
            to_query[' proxy '] = proxy

        return lookyloo.enqueue_capture(to_query, source=' api ', user=user, authenticated=authenticated)

    @api.doc(body=submit_fields_post)  # type: ignore[misc]
    @api.produces([' text/text '])  # type: ignore[misc]
    def post(self) -> str:
        """Enqueue a capture using the JSON body as the capture settings."""
        authenticated = flask_login.current_user.is_authenticated
        user = flask_login.current_user.get_id() if authenticated else src_request_ip(request)
        to_query: CaptureSettings = request.get_json(force=True)
        return lookyloo.enqueue_capture(to_query, source=' api ', user=user, authenticated=authenticated)
2021-06-07 22:12:23 +02:00
2021-06-08 00:37:11 +02:00
# Binary stuff
@api.route(' /bin/<string:capture_uuid>/screenshot ')
@api.doc(description=' Get the screenshot associated to the capture. ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureScreenshot(Resource):  # type: ignore[misc]
    """Serve what lookyloo.get_screenshot() returns for a capture."""

    @api.produces([' image/png '])  # type: ignore[misc]
    def get(self, capture_uuid: str) -> Response:
        """Send the screenshot of *capture_uuid* with the mimetype used by the original call."""
        screenshot = lookyloo.get_screenshot(capture_uuid)
        return send_file(screenshot, mimetype=' image/png ')
@api.route(' /bin/<string:capture_uuid>/export ')
@api.doc(description=' Get all the files generated by the capture, except the pickle. ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureExport(Resource):  # type: ignore[misc]
    """Serve what lookyloo.get_capture() returns for a capture."""

    @api.produces([' application/zip '])  # type: ignore[misc]
    def get(self, capture_uuid: str) -> Response:
        """Send the export of *capture_uuid* with the mimetype used by the original call."""
        export = lookyloo.get_capture(capture_uuid)
        return send_file(export, mimetype=' application/zip ')
2023-09-27 12:09:20 +02:00
@api.route(' /bin/<string:capture_uuid>/data ')
@api.doc(description=' Get the file downloaded by the capture. ',
         params={' capture_uuid ': ' The UUID of the capture '})
class CaptureData(Resource):  # type: ignore[misc]
    """Serve the data returned by lookyloo.get_data(), wrapped in a zip archive."""

    @api.produces([' application/zip '])  # type: ignore[misc]
    def get(self, capture_uuid: str) -> Response:
        """Zip the data of the capture in memory and send the archive.

        When get_data() returns no filename, a placeholder entry is archived instead.
        """
        filename, data = lookyloo.get_data(capture_uuid)
        if not filename:
            # This capture didn't trigger a download.
            filename, data = ' no_download ', BytesIO(b" This capture didn ' t trigger a download ")

        archive = BytesIO()
        with ZipFile(archive, ' w ') as z:
            z.writestr(filename, data.getvalue())
        # Rewind so the archive is sent from its first byte.
        archive.seek(0)
        return send_file(archive, mimetype=' application/zip ')
2023-01-18 16:31:12 +01:00
# Compare captures (WiP)
2023-03-24 15:47:41 +01:00
# Model describing the optional settings of a capture comparison.
compare_settings_mapping = api.model(
    ' CompareSettings ',
    {
        ' ressources_ignore_domains ': fields.List(
            fields.String(description=" A domain to ignore "),
        ),
        ' ressources_ignore_regexes ': fields.List(
            fields.String(description=" A regex to match anything in a URL "),
        ),
    },
)
2023-01-18 16:31:12 +01:00
# Model describing the body of a capture comparison request.
compare_captures_fields = api.model(
    ' CompareCapturesFields ',
    {
        ' capture_left ': fields.String(required=True, description=" Left capture to compare. "),
        ' capture_right ': fields.String(required=True, description=" Right capture to compare. "),
        ' compare_settings ': fields.Nested(
            compare_settings_mapping,
            description=" The settings to compare captures. ",
        ),
    },
)
@api.route(' /json/compare_captures ')
@api.doc(description=' Compare two captures ')
class CompareCaptures(Resource):  # type: ignore[misc]
    """Compare two captures with the module-level comparator."""

    @api.doc(body=compare_captures_fields)  # type: ignore[misc]
    def post(self) -> dict[str, Any]:
        """Compare the two captures named in the JSON body.

        Returns the comparison result with an extra 'different' key, or a dict
        with 'error' and 'details' keys when a UUID is missing from the body or
        when the comparator raises MissingUUID.
        """
        parameters: dict[str, Any] = request.get_json(force=True)
        left_uuid = parameters.get(' capture_left ')
        right_uuid = parameters.get(' capture_right ')
        if not left_uuid or not right_uuid:
            return {' error ': ' UUIDs of captures to compare missing ', ' details ': f' Left: { left_uuid } / Right: { right_uuid } '}

        try:
            different, result = comparator.compare_captures(left_uuid, right_uuid, settings=parameters.get(' compare_settings '))
        except MissingUUID as e:
            # UUID non-existent, or capture still ongoing.
            # Both UUIDs are known to be set at this point (checked above), so the
            # status of each capture can always be attached to the error.
            status_left = lookyloo.get_capture_status(left_uuid)
            status_right = lookyloo.get_capture_status(right_uuid)
            return {' error ': str(e), ' details ': {left_uuid: status_left, right_uuid: status_right}}

        result[' different '] = different
        return result
2023-04-24 16:25:29 +02:00
# One node of a redirect chain.
comparables_nodes_model = api.model(
    'ComparablesNodeModel',
    {
        'url': fields.String,
        'hostname': fields.String,
        'ip_address': fields.String,
    },
)

# The redirect chain: its length and the list of nodes.
redirects_model = api.model(
    'RedirectsModel',
    {
        'length': fields.Integer,
        'nodes': fields.List(fields.Nested(comparables_nodes_model)),
    },
)

# Response model used to marshal the comparable data of a capture.
comparables_model = api.model(
    'ComparablesModel',
    {
        'root_url': fields.String,
        'final_url': fields.String,
        'final_hostname': fields.String,
        'final_status_code': fields.Integer,
        'redirects': fields.Nested(redirects_model),
        'ressources': fields.List(fields.List(fields.String)),
    },
)
2023-04-24 18:10:16 +02:00
@api.route('/json/<string:capture_uuid>/comparables')
@api.doc(description='Get the data we can compare across captures')
class Comparables(Resource):  # type: ignore[misc]
    # Endpoint exposing the comparable data of a single capture.

    @api.marshal_with(comparables_model)  # type: ignore[misc]
    def get(self, capture_uuid: str) -> dict[str, Any]:
        # The comparator's output is marshalled through comparables_model
        # by the decorator above.
        comparables = comparator.get_comparables_capture(capture_uuid)
        return comparables
2023-02-02 15:18:26 +01:00
# Get information for takedown
# Request body of the takedown endpoint: only the capture UUID, which is required.
takedown_fields = api.model(
    'TakedownFields',
    {
        'capture_uuid': fields.String(description="The UUID of the capture.", required=True),
    },
)
@api.route('/json/takedown')
@api.doc(description='Get information for triggering a takedown request')
class Takedown(Resource):  # type: ignore[misc]
    # Endpoint returning the contacts lookyloo knows for a capture.

    @api.doc(body=takedown_fields)  # type: ignore[misc]
    def post(self) -> list[dict[str, Any]] | dict[str, str]:
        # Reads 'capture_uuid' from the JSON body; without it, the request is
        # echoed back inside an error message.
        parameters: dict[str, Any] = request.get_json(force=True)
        capture_uuid = parameters.get('capture_uuid')
        if capture_uuid:
            return lookyloo.contacts(capture_uuid)
        return {'error': f'Invalid request: {parameters}'}
2023-02-02 15:18:26 +01:00
2021-06-08 00:37:11 +02:00
# Admin stuff
@api.route('/admin/rebuild_all')
@api.doc(description='Rebuild all the trees. WARNING: IT IS GOING TO TAKE A VERY LONG TIME.',
         security='apikey')
class RebuildAll(Resource):  # type: ignore[misc]
    # Admin endpoint: every method goes through api_auth_check.
    method_decorators = [api_auth_check]

    def post(self) -> dict[str, str] | tuple[dict[str, str], int]:
        # Any failure of the rebuild is reported to the client as a 400 with
        # the exception text; success yields an 'info' message.
        try:
            lookyloo.rebuild_all()
        except Exception as e:
            return {'error': f'Unable to rebuild all captures: {e}.'}, 400
        return {'info': 'Captures successfully rebuilt.'}
@api.route('/admin/rebuild_all_cache')
@api.doc(description='Rebuild all the caches. It will take a while, but less that rebuild all.',
         security='apikey')
class RebuildAllCache(Resource):  # type: ignore[misc]
    # Admin endpoint: every method goes through api_auth_check.
    method_decorators = [api_auth_check]

    def post(self) -> dict[str, str] | tuple[dict[str, str], int]:
        # Any failure of the cache rebuild is reported to the client as a 400
        # with the exception text; success yields an 'info' message.
        try:
            lookyloo.rebuild_cache()
        except Exception as e:
            return {'error': f'Unable to rebuild all the caches: {e}.'}, 400
        return {'info': 'All caches successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/rebuild')
@api.doc(description='Rebuild the tree.',
         params={'capture_uuid': 'The UUID of the capture'},
         security='apikey')
class CaptureRebuildTree(Resource):  # type: ignore[misc]
    # Admin endpoint: every method goes through api_auth_check.
    method_decorators = [api_auth_check]

    def post(self, capture_uuid: str) -> dict[str, str] | tuple[dict[str, str], int]:
        # Drop the pickle of the capture, then request its crawled tree again.
        # A failure in either step is reported to the client as a 400.
        try:
            lookyloo.remove_pickle(capture_uuid)
            lookyloo.get_crawled_tree(capture_uuid)
        except Exception as e:
            return {'error': f'Unable to rebuild tree: {e}.'}, 400
        return {'info': f'Tree {capture_uuid} successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/hide')
@api.doc(description='Hide the capture from the index.',
         params={'capture_uuid': 'The UUID of the capture'},
         security='apikey')
class CaptureHide(Resource):  # type: ignore[misc]
    # Admin endpoint: every method goes through api_auth_check.
    method_decorators = [api_auth_check]

    def post(self, capture_uuid: str) -> dict[str, str] | tuple[dict[str, str], int]:
        # Any failure while hiding the capture is reported to the client as a
        # 400 with the exception text; success yields an 'info' message.
        try:
            lookyloo.hide_capture(capture_uuid)
        except Exception as e:
            return {'error': f'Unable to hide the tree: {e}.'}, 400
        return {'info': f'Capture {capture_uuid} successfully hidden.'}