PyMISPGalaxies/tests/tests.py

152 lines
6.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import unittest
from pymispgalaxies import Galaxies, Clusters, UnableToRevertMachinetag, Galaxy, Cluster
from glob import glob
import os
import json
from collections import Counter, defaultdict
import warnings
from uuid import UUID
import filecmp
import tempfile
class TestPyMISPGalaxies(unittest.TestCase):
def setUp(self):
self.galaxies = Galaxies()
self.clusters = Clusters(skip_duplicates=True)
self.maxDiff = None
def test_searchable(self):
for cluster in self.clusters.values():
all_searchable = []
for c_values in cluster.values():
all_searchable += c_values.searchable
count = Counter(all_searchable)
for k, v in count.items():
if v != 1:
warnings.warn('On search in {}: {} is present multiple times'.format(cluster.type, k))
def test_duplicates(self):
has_duplicates = False
for name, c in self.clusters.items():
if c.duplicates:
has_duplicates = True
to_print = Counter(c.duplicates)
for entry, counter in to_print.items():
print(counter + 1, entry)
self.assertFalse(has_duplicates, msg="Duplicates found")
def test_dump_galaxies(self):
galaxies_from_files = {}
for galaxy_file in glob(os.path.join(self.galaxies.root_dir_galaxies, '*.json')):
with open(galaxy_file, 'r') as f:
galaxy = json.load(f)
galaxies_from_files[galaxy['type']] = galaxy
for _, g in self.galaxies.items():
out = g.to_dict()
self.assertDictEqual(out, galaxies_from_files[g.type])
@unittest.skip("We don't want to enforce it.")
def test_save_galaxies(self):
for galaxy_file in glob(os.path.join(self.galaxies.root_dir_galaxies, '*.json')):
with open(galaxy_file, 'r') as f:
galaxy = Galaxy(json.load(f))
with tempfile.NamedTemporaryFile(suffix='.json') as temp_file:
temp_file_no_suffix = temp_file.name[:-5]
galaxy.save(temp_file_no_suffix)
self.assertTrue(filecmp.cmp(galaxy_file, temp_file.name), msg=f"{galaxy_file} different when saving using Galaxy.save(). Maybe an sorting issue?")
def test_dump_clusters(self):
clusters_from_files = {}
for cluster_file in glob(os.path.join(self.clusters.root_dir_clusters, '*.json')):
with open(cluster_file, 'r') as f:
cluster = json.load(f)
clusters_from_files[cluster['name']] = cluster
for name, c in self.clusters.items():
out = c.to_dict()
print(name, c.name)
self.assertCountEqual(out, clusters_from_files[c.name])
@unittest.skip("We don't want to enforce it.")
def test_save_clusters(self):
for cluster_file in glob(os.path.join(self.clusters.root_dir_clusters, '*.json')):
with open(cluster_file, 'r') as f:
cluster = Cluster(json.load(f))
with tempfile.NamedTemporaryFile(suffix='.json') as temp_file:
temp_file_no_suffix = temp_file.name[:-5]
cluster.save(temp_file_no_suffix)
self.assertTrue(filecmp.cmp(cluster_file, temp_file.name), msg=f"{cluster_file} different when saving using Cluster.save(). Maybe a sorting issue?")
def test_validate_schema_clusters(self):
self.clusters.validate_with_schema()
def test_validate_schema_galaxies(self):
self.galaxies.validate_with_schema()
def test_meta_additional_properties(self):
# All the properties in the meta key of the bundled-in clusters should be known
for c in self.clusters.values():
for cv in c.values():
if cv.meta:
self.assertIsNot(cv.meta.additional_properties, {})
for key, value in cv.meta.to_dict().items():
self.assertTrue(isinstance(value, (str, list)), value)
if isinstance(value, list):
for v in value:
self.assertTrue(isinstance(v, str), f'Error in {c.name}: {json.dumps(value, indent=2)}')
def test_machinetags(self):
self.clusters.all_machinetags()
def test_print(self):
print(self.clusters)
def test_search(self):
self.assertIsNot(len(self.clusters.search('apt')), 0)
def test_revert_machinetag(self):
self.assertEqual(len(self.clusters.revert_machinetag('misp-galaxy:tool="Babar"')), 2)
with self.assertRaises(UnableToRevertMachinetag):
self.clusters.revert_machinetag('blah')
def test_len(self):
self.assertIsNot(len(self.clusters), 0)
self.assertIsNot(len(self.galaxies), 0)
for c in self.clusters.values():
self.assertIsNot(len(c), 0)
def test_json(self):
for g in self.galaxies.values():
g.to_json()
for c in self.clusters.values():
c.to_json()
def test_uuids(self):
all_uuids = defaultdict(list)
for cluster in self.clusters.values():
# Skip deprecated
if self.galaxies[cluster.type].namespace == 'deprecated':
continue
try:
self.assertIsInstance(UUID(cluster.uuid), UUID, f'{cluster.name} - {cluster.uuid}')
except ValueError:
raise Exception(f'{cluster.name} - {cluster.uuid}')
all_uuids[cluster.uuid].append(cluster.name)
for value in cluster.values():
try:
self.assertIsInstance(UUID(value.uuid), UUID, f'{cluster.name} - {value.value} - {value.uuid}')
except ValueError:
raise Exception(f'{cluster.name} - {value.value} - {value.uuid}')
all_uuids[value.uuid].append(f'{cluster.name}|{value.value}')
errors = {}
for uuid, entries in all_uuids.items():
if len(entries) != 1:
errors[uuid] = entries
print(json.dumps(errors, indent=2))
self.assertFalse(errors)