2018-07-06 15:28:53 +02:00
|
|
|
#!/usr/bin/env python3
|
2017-07-25 18:04:15 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
import unittest
|
2017-11-01 19:26:45 +01:00
|
|
|
from pymispgalaxies import Galaxies, Clusters, UnableToRevertMachinetag
|
2017-07-25 18:04:15 +02:00
|
|
|
from glob import glob
|
|
|
|
import os
|
|
|
|
import json
|
2019-05-06 17:18:29 +02:00
|
|
|
from collections import Counter, defaultdict
|
2018-07-06 15:28:53 +02:00
|
|
|
import warnings
|
2019-05-07 12:11:16 +02:00
|
|
|
from uuid import UUID
|
2017-07-25 18:04:15 +02:00
|
|
|
|
|
|
|
|
|
|
|
class TestPyMISPGalaxies(unittest.TestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
self.galaxies = Galaxies()
|
2019-05-07 12:11:16 +02:00
|
|
|
self.clusters = Clusters(skip_duplicates=False)
|
2017-07-25 18:04:15 +02:00
|
|
|
self.maxDiff = None
|
|
|
|
|
2018-07-06 15:28:53 +02:00
|
|
|
def test_searchable(self):
|
|
|
|
for cluster in self.clusters.values():
|
|
|
|
all_searchable = []
|
|
|
|
for c_values in cluster.values():
|
|
|
|
all_searchable += c_values.searchable
|
|
|
|
count = Counter(all_searchable)
|
|
|
|
for k, v in count.items():
|
|
|
|
if v != 1:
|
2019-05-07 12:11:16 +02:00
|
|
|
warnings.warn('On search in {}: {} is present multiple times'.format(cluster.type, k))
|
2018-07-06 15:28:53 +02:00
|
|
|
|
2018-03-22 16:20:10 +01:00
|
|
|
def test_duplicates(self):
|
|
|
|
has_duplicates = False
|
|
|
|
for name, c in self.clusters.items():
|
|
|
|
if c.duplicates:
|
|
|
|
has_duplicates = True
|
|
|
|
to_print = Counter(c.duplicates)
|
|
|
|
for entry, counter in to_print.items():
|
|
|
|
print(counter + 1, entry)
|
|
|
|
self.assertFalse(has_duplicates)
|
|
|
|
|
2017-07-25 18:04:15 +02:00
|
|
|
def test_dump_galaxies(self):
|
|
|
|
galaxies_from_files = {}
|
|
|
|
for galaxy_file in glob(os.path.join(self.galaxies.root_dir_galaxies, '*.json')):
|
|
|
|
with open(galaxy_file, 'r') as f:
|
|
|
|
galaxy = json.load(f)
|
|
|
|
galaxies_from_files[galaxy['name']] = galaxy
|
|
|
|
for name, g in self.galaxies.items():
|
2017-11-01 19:20:09 +01:00
|
|
|
out = g.to_dict()
|
2017-07-25 18:04:15 +02:00
|
|
|
self.assertDictEqual(out, galaxies_from_files[g.name])
|
|
|
|
|
|
|
|
def test_dump_clusters(self):
|
|
|
|
clusters_from_files = {}
|
|
|
|
for cluster_file in glob(os.path.join(self.clusters.root_dir_clusters, '*.json')):
|
|
|
|
with open(cluster_file, 'r') as f:
|
|
|
|
cluster = json.load(f)
|
|
|
|
clusters_from_files[cluster['name']] = cluster
|
|
|
|
for name, c in self.clusters.items():
|
2017-11-01 19:20:09 +01:00
|
|
|
out = c.to_dict()
|
2019-02-05 14:23:48 +01:00
|
|
|
print(name, c.name)
|
2017-07-26 15:12:35 +02:00
|
|
|
self.assertCountEqual(out, clusters_from_files[c.name])
|
2017-07-25 18:04:15 +02:00
|
|
|
|
|
|
|
def test_validate_schema_clusters(self):
|
|
|
|
self.clusters.validate_with_schema()
|
|
|
|
|
|
|
|
def test_validate_schema_galaxies(self):
|
|
|
|
self.galaxies.validate_with_schema()
|
|
|
|
|
|
|
|
def test_meta_additional_properties(self):
|
|
|
|
# All the properties in the meta key of the bundled-in clusters should be known
|
|
|
|
for c in self.clusters.values():
|
2017-07-26 16:37:13 +02:00
|
|
|
for cv in c.values():
|
2017-07-25 18:04:15 +02:00
|
|
|
if cv.meta:
|
|
|
|
self.assertIsNot(cv.meta.additional_properties, {})
|
2021-04-26 13:29:43 +02:00
|
|
|
for key, value in cv.meta.to_dict().items():
|
|
|
|
self.assertTrue(isinstance(value, (str, list)), value)
|
|
|
|
if isinstance(value, list):
|
|
|
|
for v in value:
|
|
|
|
self.assertTrue(isinstance(v, str), f'Error in {c.name}: {json.dumps(value, indent=2)}')
|
2017-07-25 18:43:49 +02:00
|
|
|
|
|
|
|
def test_machinetags(self):
|
|
|
|
self.clusters.all_machinetags()
|
|
|
|
|
|
|
|
def test_print(self):
|
|
|
|
print(self.clusters)
|
2017-07-26 15:29:57 +02:00
|
|
|
|
|
|
|
def test_search(self):
|
|
|
|
self.assertIsNot(len(self.clusters.search('apt')), 0)
|
2017-07-26 15:52:39 +02:00
|
|
|
|
|
|
|
def test_revert_machinetag(self):
|
|
|
|
self.assertEqual(len(self.clusters.revert_machinetag('misp-galaxy:tool="Babar"')), 2)
|
2017-07-26 17:06:44 +02:00
|
|
|
with self.assertRaises(UnableToRevertMachinetag):
|
|
|
|
self.clusters.revert_machinetag('blah')
|
2017-07-26 17:01:26 +02:00
|
|
|
|
|
|
|
def test_len(self):
|
|
|
|
self.assertIsNot(len(self.clusters), 0)
|
|
|
|
self.assertIsNot(len(self.galaxies), 0)
|
|
|
|
for c in self.clusters.values():
|
|
|
|
self.assertIsNot(len(c), 0)
|
2017-07-26 17:11:17 +02:00
|
|
|
|
|
|
|
def test_json(self):
|
2017-11-01 19:26:45 +01:00
|
|
|
for g in self.galaxies.values():
|
|
|
|
g.to_json()
|
2017-07-26 17:11:17 +02:00
|
|
|
for c in self.clusters.values():
|
2017-11-01 19:26:45 +01:00
|
|
|
c.to_json()
|
2019-05-06 17:18:29 +02:00
|
|
|
|
|
|
|
def test_uuids(self):
|
|
|
|
all_uuids = defaultdict(list)
|
|
|
|
for cluster in self.clusters.values():
|
|
|
|
# Skip deprecated
|
|
|
|
if self.galaxies[cluster.name].namespace == 'deprecated':
|
|
|
|
continue
|
2019-05-07 12:11:16 +02:00
|
|
|
try:
|
|
|
|
self.assertIsInstance(UUID(cluster.uuid), UUID, f'{cluster.name} - {cluster.uuid}')
|
|
|
|
except ValueError:
|
|
|
|
raise Exception(f'{cluster.name} - {cluster.uuid}')
|
2019-05-06 17:18:29 +02:00
|
|
|
all_uuids[cluster.uuid].append(cluster.name)
|
|
|
|
for value in cluster.values():
|
2019-05-07 12:11:16 +02:00
|
|
|
try:
|
|
|
|
self.assertIsInstance(UUID(value.uuid), UUID, f'{cluster.name} - {value.value} - {value.uuid}')
|
|
|
|
except ValueError:
|
|
|
|
raise Exception(f'{cluster.name} - {value.value} - {value.uuid}')
|
2019-05-06 17:18:29 +02:00
|
|
|
all_uuids[value.uuid].append(f'{cluster.name}|{value.value}')
|
|
|
|
|
|
|
|
errors = {}
|
|
|
|
for uuid, entries in all_uuids.items():
|
|
|
|
if len(entries) != 1:
|
|
|
|
errors[uuid] = entries
|
|
|
|
print(json.dumps(errors, indent=2))
|
|
|
|
self.assertFalse(errors)
|