Merge branch 'restx'

pull/231/head
Raphaël Vinot 2021-07-21 11:05:25 +02:00
commit 28e3162187
8 changed files with 755 additions and 327 deletions

View File

@ -324,6 +324,14 @@ class Lookyloo():
ct = self.get_crawled_tree(capture_uuid)
return ct.root_hartree.stats
def get_info(self, capture_uuid: str, /) -> Dict[str, Any]:
'''Get basic information about the capture.'''
ct = self.get_crawled_tree(capture_uuid)
to_return = {'url': ct.root_url, 'title': ct.root_hartree.har.initial_title,
'capture_time': ct.start_time.isoformat(), 'user_agent': ct.user_agent,
'referer': ct.referer}
return to_return
def get_meta(self, capture_uuid: str, /) -> Dict[str, str]:
'''Get the meta informations from a capture (mostly, details about the User Agent used.)'''
capture_dir = self._get_capture_dir(capture_uuid)
@ -1202,6 +1210,38 @@ class Lookyloo():
container = self.get_crawled_tree(tree_uuid)
return get_resources_hashes(container)
def get_hostnames(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
"""Return all the unique hostnames:
* of a complete tree if no hostnode_uuid and urlnode_uuid are given
* of a HostNode if hostnode_uuid is given
* of a URLNode if urlnode_uuid is given
"""
if urlnode_uuid:
node = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid)
return {node.hostname}
elif hostnode_uuid:
node = self.get_hostnode_from_tree(tree_uuid, hostnode_uuid)
return {node.name}
else:
ct = self.get_crawled_tree(tree_uuid)
return {node.name for node in ct.root_hartree.hostname_tree.traverse()}
def get_urls(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
"""Return all the unique URLs:
* of a complete tree if no hostnode_uuid and urlnode_uuid are given
* of a HostNode if hostnode_uuid is given
* of a URLNode if urlnode_uuid is given
"""
if urlnode_uuid:
node = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid)
return {node.name}
elif hostnode_uuid:
node = self.get_hostnode_from_tree(tree_uuid, hostnode_uuid)
return {urlnode.name for urlnode in node.urls}
else:
ct = self.get_crawled_tree(tree_uuid)
return {node.name for node in ct.root_hartree.url_tree.traverse()}
def get_hostnode_investigator(self, capture_uuid: str, /, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
'''Gather all the informations needed to display the Hostnode investigator popup.'''
ct = self.get_crawled_tree(capture_uuid)

136
poetry.lock generated
View File

@ -17,6 +17,17 @@ yarl = ">=1.0,<2.0"
[package.extras]
speedups = ["aiodns", "brotlipy", "cchardet"]
[[package]]
name = "aniso8601"
version = "9.0.1"
description = "A library for parsing ISO 8601 strings."
category = "main"
optional = false
python-versions = "*"
[package.extras]
dev = ["black", "coverage", "isort", "pre-commit", "pyenchant", "pylint"]
[[package]]
name = "appnope"
version = "0.1.2"
@ -137,7 +148,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "charset-normalizer"
version = "2.0.1"
version = "2.0.3"
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
category = "main"
optional = false
@ -290,6 +301,27 @@ python-versions = "*"
[package.dependencies]
Flask = "*"
[[package]]
name = "flask-restx"
version = "0.5.0"
description = "Fully featured framework for fast, easy and documented API development with Flask"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
aniso8601 = {version = ">=0.82", markers = "python_version >= \"3.5\""}
Flask = ">=0.8,<2.0.0 || >2.0.0"
jsonschema = "*"
pytz = "*"
six = ">=1.3.0"
werkzeug = "!=2.0.0"
[package.extras]
dev = ["blinker", "Faker (==2.0.0)", "mock (==3.0.5)", "pytest-benchmark (==3.2.2)", "pytest-cov (==2.7.1)", "pytest-flask (==0.15.1)", "pytest-mock (==1.10.4)", "pytest-profiling (==1.7.0)", "tzlocal", "invoke (==1.3.0)", "readme-renderer (==24.0)", "twine (==1.15.0)", "tox", "pytest (==4.6.5)", "pytest (==5.4.1)", "ossaudit", "black"]
doc = ["alabaster (==0.7.12)", "Sphinx (==2.1.2)", "sphinx-issues (==1.2.0)"]
test = ["blinker", "Faker (==2.0.0)", "mock (==3.0.5)", "pytest-benchmark (==3.2.2)", "pytest-cov (==2.7.1)", "pytest-flask (==0.15.1)", "pytest-mock (==1.10.4)", "pytest-profiling (==1.7.0)", "tzlocal", "invoke (==1.3.0)", "readme-renderer (==24.0)", "twine (==1.15.0)", "pytest (==4.6.5)", "pytest (==5.4.1)", "ossaudit"]
[[package]]
name = "gunicorn"
version = "20.1.0"
@ -426,7 +458,7 @@ python-versions = "*"
[[package]]
name = "itemadapter"
version = "0.2.0"
version = "0.3.0"
description = "Common interface for data container classes"
category = "main"
optional = false
@ -584,7 +616,7 @@ python-versions = "*"
[[package]]
name = "numpy"
version = "1.21.0"
version = "1.21.1"
description = "NumPy is the fundamental package for array computing with Python."
category = "main"
optional = false
@ -872,6 +904,14 @@ category = "main"
optional = true
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "pytz"
version = "2021.1"
description = "World timezone definitions, modern and historical"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "queuelib"
version = "1.6.1"
@ -1101,6 +1141,14 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-pkg-resources"
version = "0.1.3"
description = "Typing stubs for pkg_resources"
category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-redis"
version = "3.5.4"
@ -1226,7 +1274,7 @@ misp = ["python-magic", "pydeep"]
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "86e29ee9755ab5aa5afe3fdd605a3ef6ade0bdbad4d765f337b273ddb8a42f9f"
content-hash = "ed4fe4ee2136c515db868d13ef7308e572863d3d1f050b983b3ac123670b2b90"
[metadata.files]
aiohttp = [
@ -1268,6 +1316,10 @@ aiohttp = [
{file = "aiohttp-3.7.4.post0-cp39-cp39-win_amd64.whl", hash = "sha256:02f46fc0e3c5ac58b80d4d56eb0a7c7d97fcef69ace9326289fb9f1955e65cfe"},
{file = "aiohttp-3.7.4.post0.tar.gz", hash = "sha256:493d3299ebe5f5a7c66b9819eacdcfbbaaf1a8e84911ddffcdc48888497afecf"},
]
aniso8601 = [
{file = "aniso8601-9.0.1-py2.py3-none-any.whl", hash = "sha256:1d2b7ef82963909e93c4f24ce48d4de9e66009a21bf1c1e1c85bdd0812fe412f"},
{file = "aniso8601-9.0.1.tar.gz", hash = "sha256:72e3117667eedf66951bb2d93f4296a56b94b078a8a95905a052611fb3f1b973"},
]
appnope = [
{file = "appnope-0.1.2-py2.py3-none-any.whl", hash = "sha256:93aa393e9d6c54c5cd570ccadd8edad61ea0c4b9ea7a01409020c9aa019eb442"},
{file = "appnope-0.1.2.tar.gz", hash = "sha256:dd83cd4b5b460958838f6eb3000c660b1f9caf2a5b1de4264e941512f603258a"},
@ -1379,8 +1431,8 @@ chardet = [
{file = "chardet-4.0.0.tar.gz", hash = "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa"},
]
charset-normalizer = [
{file = "charset-normalizer-2.0.1.tar.gz", hash = "sha256:ad0da505736fc7e716a8da15bf19a985db21ac6415c26b34d2fafd3beb3d927e"},
{file = "charset_normalizer-2.0.1-py3-none-any.whl", hash = "sha256:b68b38179052975093d71c1b5361bf64afd80484697c1f27056e50593e695ceb"},
{file = "charset-normalizer-2.0.3.tar.gz", hash = "sha256:c46c3ace2d744cfbdebceaa3c19ae691f53ae621b39fd7570f59d14fb7f2fd12"},
{file = "charset_normalizer-2.0.3-py3-none-any.whl", hash = "sha256:88fce3fa5b1a84fdcb3f603d889f723d1dd89b26059d0123ca435570e848d5e1"},
]
click = [
{file = "click-8.0.1-py3-none-any.whl", hash = "sha256:fba402a4a47334742d782209a7c79bc448911afe1149d07bdabdf480b3e2f4b6"},
@ -1441,6 +1493,10 @@ flask-login = [
{file = "Flask-Login-0.5.0.tar.gz", hash = "sha256:6d33aef15b5bcead780acc339464aae8a6e28f13c90d8b1cf9de8b549d1c0b4b"},
{file = "Flask_Login-0.5.0-py2.py3-none-any.whl", hash = "sha256:7451b5001e17837ba58945aead261ba425fdf7b4f0448777e597ddab39f4fba0"},
]
flask-restx = [
{file = "flask-restx-0.5.0.tar.gz", hash = "sha256:7e9f7cd5e843dd653a71fafb7c8ce9d7b4fef29f982a2254b1e0ebb3fac1fe12"},
{file = "flask_restx-0.5.0-py2.py3-none-any.whl", hash = "sha256:c3c2b724e688c0a50ee5e78f2a508b7f0c34644f00f64170fa8a3d0cdc34f67a"},
]
gunicorn = [
{file = "gunicorn-20.1.0.tar.gz", hash = "sha256:e0a968b5ba15f8a328fdfd7ab1fcb5af4470c28aaf7e55df02a99bc13138e6e8"},
]
@ -1481,8 +1537,8 @@ ipython-genutils = [
{file = "ipython_genutils-0.2.0.tar.gz", hash = "sha256:eb2e116e75ecef9d4d228fdc66af54269afa26ab4463042e33785b887c628ba8"},
]
itemadapter = [
{file = "itemadapter-0.2.0-py3-none-any.whl", hash = "sha256:5327c2136353cb965b6b4ba564af002fd458691b8e30d3bd6b14c474d92c6b25"},
{file = "itemadapter-0.2.0.tar.gz", hash = "sha256:cb7aaa577fefe2aa6f229ccf4d058e05f44e0178a98c8fb70ee4d95acfabb423"},
{file = "itemadapter-0.3.0-py3-none-any.whl", hash = "sha256:cfc7964518016412dfa23ade9d094ec3b5b3009f200117d4ce773aceff6efe5a"},
{file = "itemadapter-0.3.0.tar.gz", hash = "sha256:ab2651ba20f5f6d0e15f041deba4c13ffc59270def2bd01518d13e94c4cd27d1"},
]
itemloaders = [
{file = "itemloaders-1.0.4-py3-none-any.whl", hash = "sha256:4cb46a0f8915e910c770242ae3b60b1149913ed37162804f1e40e8535d6ec497"},
@ -1680,34 +1736,34 @@ mypy-extensions = [
{file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"},
]
numpy = [
{file = "numpy-1.21.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:d5caa946a9f55511e76446e170bdad1d12d6b54e17a2afe7b189112ed4412bb8"},
{file = "numpy-1.21.0-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:ac4fd578322842dbda8d968e3962e9f22e862b6ec6e3378e7415625915e2da4d"},
{file = "numpy-1.21.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:598fe100b2948465cf3ed64b1a326424b5e4be2670552066e17dfaa67246011d"},
{file = "numpy-1.21.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7c55407f739f0bfcec67d0df49103f9333edc870061358ac8a8c9e37ea02fcd2"},
{file = "numpy-1.21.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:75579acbadbf74e3afd1153da6177f846212ea2a0cc77de53523ae02c9256513"},
{file = "numpy-1.21.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:cc367c86eb87e5b7c9592935620f22d13b090c609f1b27e49600cd033b529f54"},
{file = "numpy-1.21.0-cp37-cp37m-win32.whl", hash = "sha256:d89b0dc7f005090e32bb4f9bf796e1dcca6b52243caf1803fdd2b748d8561f63"},
{file = "numpy-1.21.0-cp37-cp37m-win_amd64.whl", hash = "sha256:eda2829af498946c59d8585a9fd74da3f810866e05f8df03a86f70079c7531dd"},
{file = "numpy-1.21.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:1a784e8ff7ea2a32e393cc53eb0003eca1597c7ca628227e34ce34eb11645a0e"},
{file = "numpy-1.21.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bba474a87496d96e61461f7306fba2ebba127bed7836212c360f144d1e72ac54"},
{file = "numpy-1.21.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:fd0a359c1c17f00cb37de2969984a74320970e0ceef4808c32e00773b06649d9"},
{file = "numpy-1.21.0-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:e4d5a86a5257843a18fb1220c5f1c199532bc5d24e849ed4b0289fb59fbd4d8f"},
{file = "numpy-1.21.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:620732f42259eb2c4642761bd324462a01cdd13dd111740ce3d344992dd8492f"},
{file = "numpy-1.21.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b9205711e5440954f861ceeea8f1b415d7dd15214add2e878b4d1cf2bcb1a914"},
{file = "numpy-1.21.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:ad09f55cc95ed8d80d8ab2052f78cc21cb231764de73e229140d81ff49d8145e"},
{file = "numpy-1.21.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:a1f2fb2da242568af0271455b89aee0f71e4e032086ee2b4c5098945d0e11cf6"},
{file = "numpy-1.21.0-cp38-cp38-win32.whl", hash = "sha256:e58ddb53a7b4959932f5582ac455ff90dcb05fac3f8dcc8079498d43afbbde6c"},
{file = "numpy-1.21.0-cp38-cp38-win_amd64.whl", hash = "sha256:d2910d0a075caed95de1a605df00ee03b599de5419d0b95d55342e9a33ad1fb3"},
{file = "numpy-1.21.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:a290989cd671cd0605e9c91a70e6df660f73ae87484218e8285c6522d29f6e38"},
{file = "numpy-1.21.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3537b967b350ad17633b35c2f4b1a1bbd258c018910b518c30b48c8e41272717"},
{file = "numpy-1.21.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ccc6c650f8700ce1e3a77668bb7c43e45c20ac06ae00d22bdf6760b38958c883"},
{file = "numpy-1.21.0-cp39-cp39-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:709884863def34d72b183d074d8ba5cfe042bc3ff8898f1ffad0209161caaa99"},
{file = "numpy-1.21.0-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:bebab3eaf0641bba26039fb0b2c5bf9b99407924b53b1ea86e03c32c64ef5aef"},
{file = "numpy-1.21.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cf680682ad0a3bef56dae200dbcbac2d57294a73e5b0f9864955e7dd7c2c2491"},
{file = "numpy-1.21.0-cp39-cp39-win32.whl", hash = "sha256:d95d16204cd51ff1a1c8d5f9958ce90ae190be81d348b514f9be39f878b8044a"},
{file = "numpy-1.21.0-cp39-cp39-win_amd64.whl", hash = "sha256:2ba579dde0563f47021dcd652253103d6fd66165b18011dce1a0609215b2791e"},
{file = "numpy-1.21.0-pp37-pypy37_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:3c40e6b860220ed862e8097b8f81c9af6d7405b723f4a7af24a267b46f90e461"},
{file = "numpy-1.21.0.zip", hash = "sha256:e80fe25cba41c124d04c662f33f6364909b985f2eb5998aaa5ae4b9587242cce"},
{file = "numpy-1.21.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:38e8648f9449a549a7dfe8d8755a5979b45b3538520d1e735637ef28e8c2dc50"},
{file = "numpy-1.21.1-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:fd7d7409fa643a91d0a05c7554dd68aa9c9bb16e186f6ccfe40d6e003156e33a"},
{file = "numpy-1.21.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:a75b4498b1e93d8b700282dc8e655b8bd559c0904b3910b144646dbbbc03e062"},
{file = "numpy-1.21.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1412aa0aec3e00bc23fbb8664d76552b4efde98fb71f60737c83efbac24112f1"},
{file = "numpy-1.21.1-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:e46ceaff65609b5399163de5893d8f2a82d3c77d5e56d976c8b5fb01faa6b671"},
{file = "numpy-1.21.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c6a2324085dd52f96498419ba95b5777e40b6bcbc20088fddb9e8cbb58885e8e"},
{file = "numpy-1.21.1-cp37-cp37m-win32.whl", hash = "sha256:73101b2a1fef16602696d133db402a7e7586654682244344b8329cdcbbb82172"},
{file = "numpy-1.21.1-cp37-cp37m-win_amd64.whl", hash = "sha256:7a708a79c9a9d26904d1cca8d383bf869edf6f8e7650d85dbc77b041e8c5a0f8"},
{file = "numpy-1.21.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:95b995d0c413f5d0428b3f880e8fe1660ff9396dcd1f9eedbc311f37b5652e16"},
{file = "numpy-1.21.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:635e6bd31c9fb3d475c8f44a089569070d10a9ef18ed13738b03049280281267"},
{file = "numpy-1.21.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4a3d5fb89bfe21be2ef47c0614b9c9c707b7362386c9a3ff1feae63e0267ccb6"},
{file = "numpy-1.21.1-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:8a326af80e86d0e9ce92bcc1e65c8ff88297de4fa14ee936cb2293d414c9ec63"},
{file = "numpy-1.21.1-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:791492091744b0fe390a6ce85cc1bf5149968ac7d5f0477288f78c89b385d9af"},
{file = "numpy-1.21.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0318c465786c1f63ac05d7c4dbcecd4d2d7e13f0959b01b534ea1e92202235c5"},
{file = "numpy-1.21.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:9a513bd9c1551894ee3d31369f9b07460ef223694098cf27d399513415855b68"},
{file = "numpy-1.21.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:91c6f5fc58df1e0a3cc0c3a717bb3308ff850abdaa6d2d802573ee2b11f674a8"},
{file = "numpy-1.21.1-cp38-cp38-win32.whl", hash = "sha256:978010b68e17150db8765355d1ccdd450f9fc916824e8c4e35ee620590e234cd"},
{file = "numpy-1.21.1-cp38-cp38-win_amd64.whl", hash = "sha256:9749a40a5b22333467f02fe11edc98f022133ee1bfa8ab99bda5e5437b831214"},
{file = "numpy-1.21.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:d7a4aeac3b94af92a9373d6e77b37691b86411f9745190d2c351f410ab3a791f"},
{file = "numpy-1.21.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d9e7912a56108aba9b31df688a4c4f5cb0d9d3787386b87d504762b6754fbb1b"},
{file = "numpy-1.21.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:25b40b98ebdd272bc3020935427a4530b7d60dfbe1ab9381a39147834e985eac"},
{file = "numpy-1.21.1-cp39-cp39-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:8a92c5aea763d14ba9d6475803fc7904bda7decc2a0a68153f587ad82941fec1"},
{file = "numpy-1.21.1-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:05a0f648eb28bae4bcb204e6fd14603de2908de982e761a2fc78efe0f19e96e1"},
{file = "numpy-1.21.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f01f28075a92eede918b965e86e8f0ba7b7797a95aa8d35e1cc8821f5fc3ad6a"},
{file = "numpy-1.21.1-cp39-cp39-win32.whl", hash = "sha256:88c0b89ad1cc24a5efbb99ff9ab5db0f9a86e9cc50240177a571fbe9c2860ac2"},
{file = "numpy-1.21.1-cp39-cp39-win_amd64.whl", hash = "sha256:01721eefe70544d548425a07c80be8377096a54118070b8a62476866d5208e33"},
{file = "numpy-1.21.1-pp37-pypy37_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:2d4d1de6e6fb3d28781c73fbde702ac97f03d79e4ffd6598b880b2d95d62ead4"},
{file = "numpy-1.21.1.zip", hash = "sha256:dff4af63638afcc57a3dfb9e4b26d434a7a602d225b42d746ea7fe2edf1342fd"},
]
parsel = [
{file = "parsel-1.6.0-py2.py3-none-any.whl", hash = "sha256:9e1fa8db1c0b4a878bf34b35c043d89c9d1cbebc23b4d34dbc3c0ec33f2e087d"},
@ -1887,6 +1943,10 @@ python-magic = [
{file = "python-magic-0.4.24.tar.gz", hash = "sha256:de800df9fb50f8ec5974761054a708af6e4246b03b4bdaee993f948947b0ebcf"},
{file = "python_magic-0.4.24-py2.py3-none-any.whl", hash = "sha256:4fec8ee805fea30c07afccd1592c0f17977089895bdfaae5fec870a84e997626"},
]
pytz = [
{file = "pytz-2021.1-py2.py3-none-any.whl", hash = "sha256:eb10ce3e7736052ed3623d49975ce333bcd712c7bb19a58b9e2089d4057d0798"},
{file = "pytz-2021.1.tar.gz", hash = "sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da"},
]
queuelib = [
{file = "queuelib-1.6.1-py2.py3-none-any.whl", hash = "sha256:90ee30ebb0b57112606358b63c09a681bbb9a7dd1120af09c836b475504cea85"},
{file = "queuelib-1.6.1.tar.gz", hash = "sha256:631d067c9be57e395c382d680d3653ca1452cd29e8da25c5e8d94b5c0c528c31"},
@ -1969,6 +2029,10 @@ types-markupsafe = [
{file = "types-MarkupSafe-1.1.4.tar.gz", hash = "sha256:4fd2cc858fb4aea38555850f4ac2ecafae3543c88abb056669a3346c5c1b202c"},
{file = "types_MarkupSafe-1.1.4-py3-none-any.whl", hash = "sha256:2539a9e9b1b5a1bf1c10fdf2cb1dcb89e6f360759196883f4d5d103c53624375"},
]
types-pkg-resources = [
{file = "types-pkg_resources-0.1.3.tar.gz", hash = "sha256:834a9b8d3dbea343562fd99d5d3359a726f6bf9d3733bccd2b4f3096fbab9dae"},
{file = "types_pkg_resources-0.1.3-py2.py3-none-any.whl", hash = "sha256:0cb9972cee992249f93fff1a491bf2dc3ce674e5a1926e27d4f0866f7d9b6d9c"},
]
types-redis = [
{file = "types-redis-3.5.4.tar.gz", hash = "sha256:936e98f9090c11610f4f5171d2ca8fa5c5eab842422b3cc2f9355f57d01e1a6b"},
{file = "types_redis-3.5.4-py3-none-any.whl", hash = "sha256:954feb1f573216b215c1d564c1b27091a7ce8b7fd3af9474d9e88d4081881aff"},

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "lookyloo"
version = "1.6.0"
version = "1.7.0-dev"
description = "Web interface to track the trackers."
authors = ["Raphaël Vinot <raphael.vinot@circl.lu>"]
license = "BSD-3-Clause"
@ -36,7 +36,7 @@ background_indexer = "bin.background_indexer:main"
[tool.poetry.dependencies]
python = "^3.8"
requests = "^2.26"
flask = "^2.0.0"
flask = "^2.0.1"
gunicorn = "^20.1.0"
cchardet = "^2.1.7"
redis = "^3.5.3"
@ -58,6 +58,7 @@ pydeep = {version = "^0.4", optional = true}
Pillow = "^8.3.1"
lief = "^0.11.4"
Flask-Login = "^0.5.0"
flask-restx = "^0.5.0"
[tool.poetry.extras]
misp = ['python-magic', 'pydeep']
@ -68,6 +69,7 @@ ipython = "^7.25.0"
types-redis = "^3.5.4"
types-requests = "^2.25.0"
types-Flask = "^1.1.0"
types-pkg-resources = "^0.1.2"
[build-system]
requires = ["poetry_core>=1.0", "setuptools"]

View File

@ -5,7 +5,7 @@ from setuptools import setup # type: ignore
setup(
name='lookyloo',
version='1.0-dev',
version='1.7-dev',
author='Raphaël Vinot',
author_email='raphael.vinot@circl.lu',
maintainer='Raphaël Vinot',

View File

@ -1,46 +1,38 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
from io import BytesIO, StringIO
import os
from pathlib import Path
from datetime import datetime, timedelta, timezone
import json
import http
import calendar
from typing import Optional, Dict, Any, Union, List
import logging
import hashlib
from urllib.parse import quote_plus, unquote_plus, urlparse
import time
import pkg_resources
from flask import Flask, render_template, request, send_file, redirect, url_for, Response, flash, jsonify
from flask_bootstrap import Bootstrap # type: ignore
import flask_login # type: ignore
from werkzeug.security import generate_password_hash, check_password_hash
from flask_restx import Api # type: ignore
from .genericapi import api as generic_api
from werkzeug.security import check_password_hash
from pymisp import MISPEvent, MISPServerError
from lookyloo.helpers import (get_homedir, update_user_agents, get_user_agents, get_config,
from lookyloo.helpers import (update_user_agents, get_user_agents, get_config,
get_taxonomies, load_cookies, CaptureStatus)
from lookyloo.lookyloo import Lookyloo, Indexing
from lookyloo.exceptions import NoValidHarFile, MissingUUID
from .proxied import ReverseProxied
from .helpers import sri_load
from .helpers import src_request_ip, User, load_user_from_request, build_users_table, get_secret_key, sri_load
app: Flask = Flask(__name__)
app.wsgi_app = ReverseProxied(app.wsgi_app) # type: ignore
secret_file_path: Path = get_homedir() / 'secret_key'
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
with secret_file_path.open('wb') as f:
f.write(os.urandom(64))
with secret_file_path.open('rb') as f:
app.config['SECRET_KEY'] = f.read()
app.config['SECRET_KEY'] = get_secret_key()
Bootstrap(app)
app.config['BOOTSTRAP_SERVE_LOCAL'] = True
@ -51,45 +43,11 @@ app.debug = False
# Auth stuff
login_manager = flask_login.LoginManager()
login_manager.init_app(app)
try:
# Use legacy user mgmt, no need to print a warning, and it will fail on new install.
users = get_config('generic', 'cache_clean_user', quiet=True)
except Exception:
users = get_config('generic', 'users')
users_table: Dict[str, Dict[str, str]] = {}
for username, authstuff in users.items():
if isinstance(authstuff, str):
# just a password, make a key
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff)
users_table[username]['authkey'] = hashlib.pbkdf2_hmac('sha256',
app.config['SECRET_KEY'],
authstuff.encode(),
100000).hex()
elif isinstance(authstuff, list) and len(authstuff) == 2:
if isinstance(authstuff[0], str) and isinstance(authstuff[1], str) and len(authstuff[1]) == 64:
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff[0])
users_table[username]['authkey'] = authstuff[1]
if username not in users_table:
raise Exception('User setup invalid. Must be "username": "password" or "username": ["password", "token 64 chars (sha256)"]')
keys_table = {}
for username, authstuff in users_table.items():
if 'authkey' in authstuff:
keys_table[authstuff['authkey']] = username
class User(flask_login.UserMixin):
pass
@login_manager.user_loader
def user_loader(username):
if username not in users_table:
if username not in build_users_table():
return None
user = User()
user.id = username
@ -97,16 +55,8 @@ def user_loader(username):
@login_manager.request_loader
def load_user_from_request(request):
api_key = request.headers.get('Authorization')
if not api_key:
return None
user = User()
api_key = api_key.strip()
if api_key in keys_table:
user.id = keys_table[api_key]
return user
return None
def _load_user_from_request(request):
return load_user_from_request(request)
@app.route('/login', methods=['GET', 'POST'])
@ -121,6 +71,7 @@ def login():
'''
username = request.form['username']
users_table = build_users_table()
if username in users_table and check_password_hash(users_table[username]['password'], request.form['password']):
user = User()
user.id = username
@ -204,14 +155,6 @@ app.jinja_env.globals.update(get_sri=get_sri)
# ##### Generic/configuration methods #####
def src_request_ip(request) -> str:
# NOTE: X-Real-IP is the IP passed by the reverse proxy in the headers.
real_ip = request.headers.get('X-Real-IP')
if not real_ip:
real_ip = request.remote_addr
return real_ip
@app.after_request
def after_request(response):
# We keep a list user agents in order to build a list to use in the capture
@ -288,16 +231,6 @@ def hostnode_popup(tree_uuid: str, node_uuid: str):
# ##### Tree level Methods #####
@app.route('/tree/<string:tree_uuid>/rebuild')
@flask_login.login_required
def rebuild_tree(tree_uuid: str):
try:
lookyloo.remove_pickle(tree_uuid)
return redirect(url_for('tree', tree_uuid=tree_uuid))
except Exception:
return redirect(url_for('index'))
@app.route('/tree/<string:tree_uuid>/trigger_modules', methods=['GET'])
def trigger_modules(tree_uuid: str):
force = True if request.args.get('force') else False
@ -359,6 +292,76 @@ def web_misp_lookup_view(tree_uuid: str):
return render_template('misp_lookup.html', uuid=tree_uuid, hits=hits, misp_root_url=misp_root_url)
@app.route('/tree/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
@flask_login.login_required
def web_misp_push_view(tree_uuid: str):
error = False
if not lookyloo.misp.available:
flash('MISP module not available.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
elif not lookyloo.misp.enable_push:
flash('Push not enabled in MISP module.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
else:
event = lookyloo.misp_export(tree_uuid)
if isinstance(event, dict):
flash(f'Unable to generate the MISP export: {event}', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
if request.method == 'POST':
# event is a MISPEvent at this point
# Submit the event
tags = request.form.getlist('tags')
error = False
events: List[MISPEvent] = []
with_parents = request.form.get('with_parents')
if with_parents:
exports = lookyloo.misp_export(tree_uuid, True)
if isinstance(exports, dict):
flash(f'Unable to create event: {exports}', 'error')
error = True
else:
events = exports
else:
events = event
if error:
return redirect(url_for('tree', tree_uuid=tree_uuid))
for e in events:
for tag in tags:
e.add_tag(tag)
# Change the event info field of the last event in the chain
events[-1].info = request.form.get('event_info')
try:
new_events = lookyloo.misp.push(events, True if request.form.get('force_push') else False,
True if request.form.get('auto_publish') else False)
except MISPServerError:
flash(f'MISP returned an error, the event(s) might still have been created on {lookyloo.misp.client.root_url}', 'error')
else:
if isinstance(new_events, dict):
flash(f'Unable to create event(s): {new_events}', 'error')
else:
for e in new_events:
flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid))
else:
# the 1st attribute in the event is the link to lookyloo
existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value)
fav_tags = lookyloo.misp.get_fav_tags()
cache = lookyloo.capture_cache(tree_uuid)
return render_template('misp_push_view.html', tree_uuid=tree_uuid,
event=event[0], fav_tags=fav_tags,
existing_event=existing_misp_url,
auto_publish=lookyloo.misp.auto_publish,
has_parent=True if cache and cache.parent else False,
default_tags=lookyloo.misp.default_tags)
@app.route('/tree/<string:tree_uuid>/modules', methods=['GET'])
def modules(tree_uuid: str):
modules_responses = lookyloo.get_modules_responses(tree_uuid)
@ -490,6 +493,16 @@ def hide_capture(tree_uuid: str):
return redirect(url_for('tree', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>/rebuild')
@flask_login.login_required
def rebuild_tree(tree_uuid: str):
try:
lookyloo.remove_pickle(tree_uuid)
return redirect(url_for('tree', tree_uuid=tree_uuid))
except Exception:
return redirect(url_for('index'))
@app.route('/tree/<string:tree_uuid>/cache', methods=['GET'])
def cache_tree(tree_uuid: str):
lookyloo.capture_cache(tree_uuid)
@ -545,6 +558,7 @@ def tree(tree_uuid: str, node_uuid: Optional[str]=None):
ct = lookyloo.get_crawled_tree(tree_uuid)
b64_thumbnail = lookyloo.get_screenshot_thumbnail(tree_uuid, for_datauri=True)
screenshot_size = lookyloo.get_screenshot(tree_uuid).getbuffer().nbytes
info = lookyloo.get_info(tree_uuid)
meta = lookyloo.get_meta(tree_uuid)
hostnode_to_highlight = None
if node_uuid:
@ -562,9 +576,7 @@ def tree(tree_uuid: str, node_uuid: Optional[str]=None):
print(e)
pass
return render_template('tree.html', tree_json=ct.to_json(),
start_time=ct.start_time.isoformat(),
user_agent=ct.user_agent, root_url=ct.root_url,
referer=ct.referer,
info=info,
tree_uuid=tree_uuid, public_domain=lookyloo.public_domain,
screenshot_thumbnail=b64_thumbnail, page_title=cache.title,
screenshot_size=screenshot_size,
@ -707,17 +719,6 @@ def rebuild_cache():
return redirect(url_for('index'))
@app.route('/submit', methods=['POST'])
def submit():
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
to_query: Dict = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return Response(perma_uuid, mimetype='text/text')
@app.route('/search', methods=['GET', 'POST'])
def search():
if request.form.get('url'):
@ -824,6 +825,13 @@ def statsfull():
return render_template('stats.html', stats=stats)
@app.route('/whois/<string:query>', methods=['GET'])
def whois(query: str):
to_return = lookyloo.uwhois.whois(query)
return send_file(BytesIO(to_return.encode()),
mimetype='test/plain', as_attachment=True, attachment_filename=f'whois.{query}.txt')
# ##### Methods related to a specific URLNode #####
@app.route('/tree/<string:tree_uuid>/url/<string:node_uuid>/request_cookies', methods=['GET'])
@ -975,192 +983,20 @@ def add_context(tree_uuid: str, node_uuid: str):
return redirect(url_for('ressources'))
@app.route('/tree/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
@flask_login.login_required
def web_misp_push_view(tree_uuid: str):
error = False
if not lookyloo.misp.available:
flash('MISP module not available.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
elif not lookyloo.misp.enable_push:
flash('Push not enabled in MISP module.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
else:
event = lookyloo.misp_export(tree_uuid)
if isinstance(event, dict):
flash(f'Unable to generate the MISP export: {event}', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
if request.method == 'POST':
# event is a MISPEvent at this point
# Submit the event
tags = request.form.getlist('tags')
error = False
events: List[MISPEvent] = []
with_parents = request.form.get('with_parents')
if with_parents:
exports = lookyloo.misp_export(tree_uuid, True)
if isinstance(exports, dict):
flash(f'Unable to create event: {exports}', 'error')
error = True
else:
events = exports
else:
events = event
if error:
return redirect(url_for('tree', tree_uuid=tree_uuid))
for e in events:
for tag in tags:
e.add_tag(tag)
# Change the event info field of the last event in the chain
events[-1].info = request.form.get('event_info')
try:
new_events = lookyloo.misp.push(events, True if request.form.get('force_push') else False,
True if request.form.get('auto_publish') else False)
except MISPServerError:
flash(f'MISP returned an error, the event(s) might still have been created on {lookyloo.misp.client.root_url}', 'error')
else:
if isinstance(new_events, dict):
flash(f'Unable to create event(s): {new_events}', 'error')
else:
for e in new_events:
flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid))
else:
# the 1st attribute in the event is the link to lookyloo
existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value)
fav_tags = lookyloo.misp.get_fav_tags()
cache = lookyloo.capture_cache(tree_uuid)
return render_template('misp_push_view.html', tree_uuid=tree_uuid,
event=event[0], fav_tags=fav_tags,
existing_event=existing_misp_url,
auto_publish=lookyloo.misp.auto_publish,
has_parent=True if cache and cache.parent else False,
default_tags=lookyloo.misp.default_tags)
# Query API
@app.route('/json/get_token', methods=['POST'])
def json_get_token():
auth: Dict = request.get_json(force=True)
if 'username' in auth and 'password' in auth: # Expected keys in json
if (auth['username'] in users_table
and check_password_hash(users_table[auth['username']]['password'], auth['password'])):
return jsonify({'authkey': users_table[auth['username']]['authkey']})
return jsonify({'error': 'User/Password invalid.'})
authorizations = {
'apikey': {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization'
}
}
@app.route('/json/<string:tree_uuid>/status', methods=['GET'])
def get_capture_status(tree_uuid: str):
return jsonify({'status_code': lookyloo.get_capture_status(tree_uuid)})
api = Api(app, title='Lookyloo API',
description='API to submit captures and query a lookyloo instance.',
doc='/doc/',
authorizations=authorizations,
version=pkg_resources.get_distribution('lookyloo').version)
@app.route('/json/<string:tree_uuid>/redirects', methods=['GET'])
def json_redirects(tree_uuid: str):
cache = lookyloo.capture_cache(tree_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}}
if not cache.redirects:
to_return['response']['info'] = 'No redirects'
return to_return
if cache.incomplete_redirects:
# Trigger tree build, get all redirects
lookyloo.get_crawled_tree(tree_uuid)
cache = lookyloo.capture_cache(tree_uuid)
if cache:
to_return['response']['redirects'] = cache.redirects
else:
to_return['response']['redirects'] = cache.redirects
return jsonify(to_return)
@app.route('/json/<string:tree_uuid>/misp_export', methods=['GET'])
def misp_export(tree_uuid: str):
with_parents = request.args.get('with_parents')
event = lookyloo.misp_export(tree_uuid, True if with_parents else False)
if isinstance(event, dict):
return jsonify(event)
to_return = []
for e in event:
to_return.append(e.to_json(indent=2))
return jsonify(to_return)
@app.route('/json/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
@flask_login.login_required
def misp_push(tree_uuid: str):
if request.method == 'POST':
parameters: Dict = request.get_json(force=True)
with_parents = True if 'with_parents' in parameters else False
allow_duplicates = True if 'allow_duplicates' in parameters else False
else:
with_parents = False
allow_duplicates = False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
else:
event = lookyloo.misp_export(tree_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
else:
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
else:
events_to_return = []
for e in new_events:
events_to_return.append(e.to_json(indent=2))
jsonify(events_to_return)
return jsonify(to_return)
@app.route('/json/hash_info/<h>', methods=['GET'])
def json_hash_info(h: str):
details, body = lookyloo.get_body_hash_full(h)
if not details:
return {'error': 'Unknown Hash.'}
to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details,
'body': base64.b64encode(body.getvalue()).decode()}}
return jsonify(to_return)
@app.route('/json/url_info', methods=['POST'])
def json_url_info():
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query)
return jsonify(occurrences)
@app.route('/json/hostname_info', methods=['POST'])
def json_hostname_info():
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query)
return jsonify(occurrences)
@app.route('/json/stats', methods=['GET'])
def json_stats():
to_return = lookyloo.get_stats()
return Response(json.dumps(to_return), mimetype='application/json')
@app.route('/whois/<string:query>', methods=['GET'])
def whois(query: str):
to_return = lookyloo.uwhois.whois(query)
return send_file(BytesIO(to_return.encode()),
mimetype='test/plain', as_attachment=True, attachment_filename=f'whois.{query}.txt')
api.add_namespace(generic_api)

402
website/web/genericapi.py Normal file
View File

@ -0,0 +1,402 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import base64
from typing import Dict, Any
from flask import request, send_file
import flask_login # type: ignore
from flask_restx import Namespace, Resource, fields, abort # type: ignore
from werkzeug.security import check_password_hash
from lookyloo.lookyloo import Lookyloo
from .helpers import src_request_ip, load_user_from_request, build_users_table
api = Namespace('GenericAPI', description='Generic Lookyloo API', path='/')
lookyloo: Lookyloo = Lookyloo()
def api_auth_check(method):
if flask_login.current_user.is_authenticated or load_user_from_request(request):
return method
abort(403, 'Authentication required.')
token_request_fields = api.model('AuthTokenFields', {
'username': fields.String(description="Your username", required=True),
'password': fields.String(description="Your password", required=True),
})
@api.route('/json/get_token')
@api.doc(description='Get the API token required for authenticated calls')
class AuthToken(Resource):
users_table = build_users_table()
@api.param('username', 'Your username')
@api.param('password', 'Your password')
def get(self):
username = request.args['username'] if request.args.get('username') else False
password = request.args['password'] if request.args.get('password') else False
if username in self.users_table and check_password_hash(self.users_table[username]['password'], password):
return {'authkey': self.users_table[username]['authkey']}
return {'error': 'User/Password invalid.'}
@api.doc(body=token_request_fields)
def post(self):
auth: Dict = request.get_json(force=True)
if 'username' in auth and 'password' in auth: # Expected keys in json
if (auth['username'] in self.users_table
and check_password_hash(self.users_table[auth['username']]['password'], auth['password'])):
return {'authkey': self.users_table[auth['username']]['authkey']}
return {'error': 'User/Password invalid.'}
@api.route('/json/<string:capture_uuid>/status')
@api.doc(description='Get the status of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureStatusQuery(Resource):
def get(self, capture_uuid: str):
return {'status_code': lookyloo.get_capture_status(capture_uuid)}
@api.route('/json/<string:capture_uuid>/hostnames')
@api.doc(description='Get all the hostnames of all the resources of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureHostnames(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'hostnames': list(lookyloo.get_hostnames(capture_uuid))}}
return to_return
@api.route('/json/<string:capture_uuid>/urls')
@api.doc(description='Get all the URLs of all the resources of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureURLs(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'urls': list(lookyloo.get_urls(capture_uuid))}}
return to_return
@api.route('/json/<string:capture_uuid>/hashes')
@api.doc(description='Get all the hashes of all the resources of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureHashes(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'hashes': list(lookyloo.get_hashes(capture_uuid))}}
return to_return
@api.route('/json/<string:capture_uuid>/redirects')
@api.doc(description='Get all the redirects of a capture',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureRedirects(Resource):
def get(self, capture_uuid: str):
cache = lookyloo.capture_cache(capture_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}}
if not cache.redirects:
to_return['response']['info'] = 'No redirects'
return to_return
if cache.incomplete_redirects:
# Trigger tree build, get all redirects
lookyloo.get_crawled_tree(capture_uuid)
cache = lookyloo.capture_cache(capture_uuid)
if cache:
to_return['response']['redirects'] = cache.redirects
else:
to_return['response']['redirects'] = cache.redirects
return to_return
@api.route('/json/<string:capture_uuid>/misp_export')
@api.doc(description='Get an export of the capture in MISP format',
params={'capture_uuid': 'The UUID of the capture'})
class MISPExport(Resource):
def get(self, capture_uuid: str):
with_parents = request.args.get('with_parents')
event = lookyloo.misp_export(capture_uuid, True if with_parents else False)
if isinstance(event, dict):
return event
to_return = []
for e in event:
to_return.append(e.to_json(indent=2))
return to_return
misp_push_fields = api.model('MISPPushFields', {
'allow_duplicates': fields.Integer(description="Push the event even if it is already present on the MISP instance",
example=0, min=0, max=1),
'with_parents': fields.Integer(description="Also push the parents of the capture (if any)",
example=0, min=0, max=1),
})
@api.route('/json/<string:capture_uuid>/misp_push')
@api.doc(description='Push an event to a pre-configured MISP instance',
params={'capture_uuid': 'The UUID of the capture'},
security='apikey')
class MISPPush(Resource):
method_decorators = [api_auth_check]
@api.param('with_parents', 'Also push the parents of the capture (if any)')
@api.param('allow_duplicates', 'Push the event even if it is already present on the MISP instance')
def get(self, capture_uuid: str):
with_parents = True if request.args.get('with_parents') else False
allow_duplicates = True if request.args.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
else:
event = lookyloo.misp_export(capture_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
else:
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
else:
events_to_return = []
for e in new_events:
events_to_return.append(e.to_json(indent=2))
return events_to_return
return to_return
@api.doc(body=misp_push_fields)
def post(self, capture_uuid: str):
parameters: Dict = request.get_json(force=True)
with_parents = True if parameters.get('with_parents') else False
allow_duplicates = True if parameters.get('allow_duplicates') else False
to_return: Dict = {}
if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.'
else:
event = lookyloo.misp_export(capture_uuid, with_parents)
if isinstance(event, dict):
to_return['error'] = event
else:
new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
else:
events_to_return = []
for e in new_events:
events_to_return.append(e.to_json(indent=2))
return events_to_return
return to_return
@api.route('/json/hash_info/<h>')
@api.doc(description='Search for a ressource with a specific hash (sha512)',
params={'h': 'The hash (sha512)'})
class HashInfo(Resource):
def get(self, h: str):
details, body = lookyloo.get_body_hash_full(h)
if not details:
return {'error': 'Unknown Hash.'}
to_return: Dict[str, Any] = {'response': {'hash': h, 'details': details,
'body': base64.b64encode(body.getvalue()).decode()}}
return to_return
url_info_fields = api.model('URLInfoFields', {
'url': fields.String(description="The URL to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
})
@api.route('/json/url_info')
@api.doc(description='Search for a URL')
class URLInfo(Resource):
@api.doc(body=url_info_fields)
def post(self):
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_url_occurrences(to_query.pop('url'), **to_query)
return occurrences
hostname_info_fields = api.model('HostnameInfoFields', {
'hostname': fields.String(description="The hostname to search", required=True),
'limit': fields.Integer(description="The maximal amount of captures to return", example=20),
})
@api.route('/json/hostname_info')
@api.doc(description='Search for a hostname')
class HostnameInfo(Resource):
@api.doc(body=hostname_info_fields)
def post(self):
to_query: Dict = request.get_json(force=True)
occurrences = lookyloo.get_hostname_occurrences(to_query.pop('hostname'), **to_query)
return occurrences
@api.route('/json/stats')
@api.doc(description='Get the statistics of the lookyloo instance.')
class InstanceStats(Resource):
def get(self):
return lookyloo.get_stats()
submit_fields = api.model('SubmitFields', {
'url': fields.String(description="The URL to capture", required=True),
'listing': fields.Integer(description="Display the capture on the index", min=0, max=1, example=1),
'user_agent': fields.String(description="User agent to use for the capture", example=''),
'referer': fields.String(description="Referer to pass to the capture", example=''),
'cookies': fields.String(description="JSON export of a list of cookies as exported from an other capture", example='')
})
@api.route('/json/<string:capture_uuid>/stats')
@api.doc(description='Get the statistics of the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureStats(Resource):
def get(self, capture_uuid: str):
return lookyloo.get_statistics(capture_uuid)
@api.route('/json/<string:capture_uuid>/info')
@api.doc(description='Get basic information about the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureInfo(Resource):
def get(self, capture_uuid: str):
return lookyloo.get_info(capture_uuid)
@api.route('/json/<string:capture_uuid>/cookies')
@api.doc(description='Get the complete cookie jar created during the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureCookies(Resource):
def get(self, capture_uuid: str):
return json.loads(lookyloo.get_cookies(capture_uuid).read())
# Just text
@api.route('/submit')
class SubmitCapture(Resource):
@api.doc(body=submit_fields)
@api.produces(['text/text'])
def post(self):
if flask_login.current_user.is_authenticated:
user = flask_login.current_user.get_id()
else:
user = src_request_ip(request)
to_query: Dict = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_capture(to_query, source='api', user=user, authenticated=flask_login.current_user.is_authenticated)
return perma_uuid
# Binary stuff
@api.route('/bin/<string:capture_uuid>/screenshot')
@api.doc(description='Get the screenshot associated to the capture.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureScreenshot(Resource):
@api.produces(['image/png'])
def get(self, capture_uuid: str):
return send_file(lookyloo.get_screenshot(capture_uuid), mimetype='image/png')
@api.route('/bin/<string:capture_uuid>/export')
@api.doc(description='Get all the files generated by the capture, except the pickle.',
params={'capture_uuid': 'The UUID of the capture'})
class CaptureExport(Resource):
@api.produces(['application/zip'])
def get(self, capture_uuid: str):
return send_file(lookyloo.get_capture(capture_uuid), mimetype='application/zip')
# Admin stuff
@api.route('/admin/rebuild_all')
@api.doc(description='Rebuild all the trees. WARNING: IT IS GOING TO TAKE A VERY LONG TIME.',
security='apikey')
class RebuildAll(Resource):
method_decorators = [api_auth_check]
def post(self):
try:
lookyloo.rebuild_all()
except Exception as e:
return {'error': f'Unable to rebuild all captures: {e}.'}
else:
return {'info': 'Captures successfully rebuilt.'}
@api.route('/admin/rebuild_all_cache')
@api.doc(description='Rebuild all the caches. It will take a while, but less that rebuild all.',
security='apikey')
class RebuildAllCache(Resource):
method_decorators = [api_auth_check]
def post(self):
try:
lookyloo.rebuild_cache()
except Exception as e:
return {'error': f'Unable to rebuild all the caches: {e}.'}
else:
return {'info': 'All caches successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/rebuild')
@api.doc(description='Rebuild the tree.',
params={'capture_uuid': 'The UUID of the capture'},
security='apikey')
class CaptureRebuildTree(Resource):
method_decorators = [api_auth_check]
def post(self, capture_uuid):
try:
lookyloo.remove_pickle(capture_uuid)
lookyloo.get_crawled_tree(capture_uuid)
except Exception as e:
return {'error': f'Unable to rebuild tree: {e}.'}
else:
return {'info': f'Tree {capture_uuid} successfully rebuilt.'}
@api.route('/admin/<string:capture_uuid>/hide')
@api.doc(description='Hide the capture from the index.',
params={'capture_uuid': 'The UUID of the capture'},
security='apikey')
class CaptureHide(Resource):
method_decorators = [api_auth_check]
def post(self, capture_uuid):
try:
lookyloo.hide_capture(capture_uuid)
except Exception as e:
return {'error': f'Unable to hide the tree: {e}.'}
else:
return {'info': f'Capture {capture_uuid} successfully hidden.'}

View File

@ -1,11 +1,95 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hashlib
import json
from functools import lru_cache
from typing import Dict
import os
from lookyloo.helpers import get_homedir
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Union
import flask_login # type: ignore
from werkzeug.security import generate_password_hash
from lookyloo.helpers import get_homedir, get_config
def src_request_ip(request) -> str:
# NOTE: X-Real-IP is the IP passed by the reverse proxy in the headers.
real_ip = request.headers.get('X-Real-IP')
if not real_ip:
real_ip = request.remote_addr
return real_ip
class User(flask_login.UserMixin):
pass
def load_user_from_request(request):
api_key = request.headers.get('Authorization')
if not api_key:
return None
user = User()
api_key = api_key.strip()
keys_table = build_keys_table()
if api_key in keys_table:
user.id = keys_table[api_key]
return user
return None
@lru_cache(64)
def build_keys_table() -> Dict[str, str]:
keys_table = {}
for username, authstuff in build_users_table().items():
if 'authkey' in authstuff:
keys_table[authstuff['authkey']] = username
return keys_table
@lru_cache(64)
def get_users() -> Dict[str, Union[str, List[str]]]:
try:
# Use legacy user mgmt, no need to print a warning, and it will fail on new install.
return get_config('generic', 'cache_clean_user', quiet=True)
except Exception:
return get_config('generic', 'users')
@lru_cache(64)
def build_users_table() -> Dict[str, Dict[str, str]]:
users_table: Dict[str, Dict[str, str]] = {}
for username, authstuff in get_users().items():
if isinstance(authstuff, str):
# just a password, make a key
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff)
users_table[username]['authkey'] = hashlib.pbkdf2_hmac('sha256', get_secret_key(),
authstuff.encode(),
100000).hex()
elif isinstance(authstuff, list) and len(authstuff) == 2:
if isinstance(authstuff[0], str) and isinstance(authstuff[1], str) and len(authstuff[1]) == 64:
users_table[username] = {}
users_table[username]['password'] = generate_password_hash(authstuff[0])
users_table[username]['authkey'] = authstuff[1]
else:
raise Exception('User setup invalid. Must be "username": "password" or "username": ["password", "token 64 chars (sha256)"]')
return users_table
@lru_cache(64)
def get_secret_key() -> bytes:
secret_file_path: Path = get_homedir() / 'secret_key'
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
with secret_file_path.open('wb') as f:
f.write(os.urandom(64))
with secret_file_path.open('rb') as f:
return f.read()
@lru_cache(64)

View File

@ -3,14 +3,14 @@
{% from 'bootstrap/utils.html' import render_messages %}
{% from "macros.html" import shorten_string %}
{% block title %}Capture of {{root_url}}{% endblock %}
{% block title %}Capture of {{info['url']}}{% endblock %}
{% block card %}
<meta property="og:title" content="Lookyloo capture" />
<meta property="og:type" content="website"/>
<meta
property="og:description"
content="URL captured: {{root_url}}"
content="URL captured: {{info['url']}}"
/>
<meta
property="og:image"
@ -128,7 +128,7 @@
var enable_bookmark = {{ enable_bookmark|tojson }};
var treeData = {{ tree_json | safe }};
var parent_uuid = {{ parent_uuid|tojson }};
var capture_starttime = new Date(Date.parse("{{ start_time }}"));
var capture_starttime = new Date(Date.parse("{{ info['capture_time'] }}"));
window.addEventListener('DOMContentLoaded', (event) => {
document.getElementById("start_time").innerHTML =
`${capture_starttime.getFullYear()}-${("0" + (capture_starttime.getMonth() + 1)).slice(-2)}-${("0" + capture_starttime.getDate()).slice(-2)} ${capture_starttime.toLocaleTimeString()}`;
@ -346,20 +346,20 @@
<div class="modal-body">
<dl class="row">
<dt class="col-sm-2">URL captured</dt>
<dd class="col-sm-10">{{ shorten_string(root_url, 1000) }}</dd>
<dd class="col-sm-10">{{ shorten_string(info['url'], 1000) }}</dd>
<dt class="col-sm-2">Page title</dt>
<dd class="col-sm-10">{{ page_title }}</dd>
<dd class="col-sm-10">{{ info['title'] }}</dd>
<dt class="col-sm-2">Capture time</dt>
<dd class="col-sm-10" id="start_time"></dd>
<dt class="col-sm-2">User Agent</dt>
<dd class="col-sm-10">{{ user_agent }}</dd>
<dd class="col-sm-10">{{ info['user_agent'] }}</dd>
{% if referer %}
{% if 'referer' in info and info['referer'] %}
<dt class="col-sm-2">Referer</dt>
<dd class="col-sm-10">{{ referer }}</dd>
<dd class="col-sm-10">{{ info['referer'] }}</dd>
{%endif%}