Add [tidal] relation enrichment with mitre

pull/941/head
niclas 2024-02-23 14:54:25 +01:00
parent a311ce6a1c
commit 5062c61620
2 changed files with 149 additions and 35 deletions

View File

@ -17,7 +17,7 @@ GALAXY_PATH = "../../galaxies"
CLUSTER_PATH = "../../clusters" CLUSTER_PATH = "../../clusters"
def create_galaxy(endpoint: str, version: int): def create_galaxy(endpoint: str, version: int, extended_relations: bool = False):
api = TidalAPI() api = TidalAPI()
data = api.get_data(endpoint) data = api.get_data(endpoint)
with open(f"{CONFIG}/{endpoint}.json", "r") as file: with open(f"{CONFIG}/{endpoint}.json", "r") as file:
@ -28,10 +28,10 @@ def create_galaxy(endpoint: str, version: int):
match endpoint: match endpoint:
case "groups": case "groups":
cluster = GroupCluster(**config["cluster"], uuid=galaxy.uuid) cluster = GroupCluster(**config["cluster"], uuid=galaxy.uuid, enrichment=extended_relations)
cluster.add_values(data) cluster.add_values(data)
case "software": case "software":
cluster = SoftwareCluster(**config["cluster"], uuid=galaxy.uuid) cluster = SoftwareCluster(**config["cluster"], uuid=galaxy.uuid, enrichment=extended_relations)
cluster.add_values(data) cluster.add_values(data)
case "campaigns": case "campaigns":
cluster = CampaignsCluster(**config["cluster"], uuid=galaxy.uuid) cluster = CampaignsCluster(**config["cluster"], uuid=galaxy.uuid)
@ -56,9 +56,9 @@ def create_galaxy(endpoint: str, version: int):
def main(args, galaxies): def main(args, galaxies):
if args.all: if args.all:
for galaxy in galaxies: for galaxy in galaxies:
create_galaxy(galaxy, args.version) create_galaxy(galaxy, args.version, args.extended_relations)
else: else:
create_galaxy(args.type, args.version) create_galaxy(args.type, args.version, args.extended_relations)
if __name__ == "__main__": if __name__ == "__main__":
@ -72,6 +72,7 @@ if __name__ == "__main__":
description="Create galaxy and cluster json files from Tidal API" description="Create galaxy and cluster json files from Tidal API"
) )
parser.add_argument( parser.add_argument(
"-a",
"--all", "--all",
action="store_true", action="store_true",
help="Create all galaxies and clusters", help="Create all galaxies and clusters",
@ -88,6 +89,11 @@ if __name__ == "__main__":
required=True, required=True,
help="The version of the galaxy", help="The version of the galaxy",
) )
parser.add_argument(
"--extended-relations",
action="store_true",
help="Create extended relations in the cluster",
)
parser.set_defaults(func=main) parser.set_defaults(func=main)
args = parser.parse_args() args = parser.parse_args()

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, field, asdict
from typing import Type
import json import json
@ -19,6 +20,13 @@ class GroupsMeta(Meta):
owner: str = None owner: str = None
@dataclass
class AssociatedGroupsMeta(Meta):
id: str = None
owner_id: str = None
owner: str = None
@dataclass @dataclass
class SoftwareMeta(Meta): class SoftwareMeta(Meta):
source: str = None source: str = None
@ -29,6 +37,13 @@ class SoftwareMeta(Meta):
owner: str = None owner: str = None
@dataclass
class AssociatedSoftwareMeta(Meta):
id: str = None
owner_id: str = None
owner: str = None
@dataclass @dataclass
class TechniqueMeta(Meta): class TechniqueMeta(Meta):
source: str = None source: str = None
@ -108,9 +123,10 @@ class Cluster:
self.type = type self.type = type
self.uuid = uuid self.uuid = uuid
self.values = [] self.values = []
self.CLUSTER_PATH = "../../clusters"
def add_values(self): def add_values(self, data: dict, meta_class: Type[Meta]):
print("This method should be implemented in the child class") pass
def save_to_file(self, path): def save_to_file(self, path):
with open(path, "w") as file: with open(path, "w") as file:
@ -131,6 +147,24 @@ class Cluster:
"values": self.values, "values": self.values,
} }
def _get_relation_from_mitre_id(
self, mitre_id: str, cluster: str, meta_key: str, array: bool = False
):
with open(f"{self.CLUSTER_PATH}/{cluster}.json", "r") as file:
mitre = json.load(file)
for entry in mitre["values"]:
try:
if array:
for id in entry["meta"][meta_key]:
if id == mitre_id:
return entry["uuid"]
else:
if entry["meta"][meta_key] == mitre_id:
return entry["uuid"]
except KeyError:
continue
return None
class GroupCluster(Cluster): class GroupCluster(Cluster):
def __init__( def __init__(
@ -142,8 +176,10 @@ class GroupCluster(Cluster):
source: str, source: str,
type: str, type: str,
uuid: str, uuid: str,
enrichment: bool = False,
): ):
super().__init__(authors, category, description, name, source, type, uuid) super().__init__(authors, category, description, name, source, type, uuid)
self.enrichment = enrichment
def add_values(self, data): def add_values(self, data):
for entry in data["data"]: for entry in data["data"]:
@ -166,13 +202,39 @@ class GroupCluster(Cluster):
owner=entry.get("owner_name"), owner=entry.get("owner_name"),
) )
related = [] related = []
for relation in entry.get("associated_groups"): if self.enrichment:
related_cluster = self._get_relation_from_mitre_id(
entry.get("group_attack_id"), "threat-actor", "synonyms", True
)
if related_cluster:
related.append(
{
"dest-uuid": related_cluster,
"type": "similar",
}
)
for associated_group in entry.get("associated_groups"):
meta = AssociatedGroupsMeta(
id=associated_group.get("id"),
owner_id=associated_group.get("owner_id"),
owner=associated_group.get("owner_name"),
)
value = ClusterValue(
description=associated_group.get("description"),
meta=meta,
related=[],
uuid=associated_group.get("associated_group_id"),
value=associated_group.get("name"),
)
self.values.append(value.return_value())
related.append( related.append(
{ {
"dest-uuid": relation.get("id"), "dest-uuid": associated_group.get("associated_group_id"),
"type": "related-to", "type": "similar",
} }
) )
value = ClusterValue( value = ClusterValue(
description=entry.get("description"), description=entry.get("description"),
meta=meta, meta=meta,
@ -193,8 +255,10 @@ class SoftwareCluster(Cluster):
source: str, source: str,
type: str, type: str,
uuid: str, uuid: str,
enrichment: bool = False,
): ):
super().__init__(authors, category, description, name, source, type, uuid) super().__init__(authors, category, description, name, source, type, uuid)
self.enrichment = enrichment
def add_values(self, data): def add_values(self, data):
for entry in data["data"]: for entry in data["data"]:
@ -214,13 +278,50 @@ class SoftwareCluster(Cluster):
"type": "used-by", "type": "used-by",
} }
) )
for relation in entry.get("associated_software"): if self.enrichment:
related_cluster = self._get_relation_from_mitre_id(
entry.get("software_attack_id"), "mitre-tool", "external_id"
)
if related_cluster:
related.append(
{
"dest-uuid": related_cluster,
"type": "similar",
}
)
related_cluster = self._get_relation_from_mitre_id(
entry.get("software_attack_id"), "mitre-malware", "external_id"
)
if related_cluster:
related.append(
{
"dest-uuid": related_cluster,
"type": "similar",
}
)
for associated_software in entry.get("associated_software"):
meta = AssociatedSoftwareMeta(
id=associated_software.get("id"),
owner_id=associated_software.get("owner_id"),
owner=associated_software.get("owner_name"),
)
value = ClusterValue(
description=associated_software.get("description"),
meta=meta,
related=[],
uuid=associated_software.get("associated_software_id"),
value=associated_software.get("name"),
)
self.values.append(value.return_value())
related.append( related.append(
{ {
"dest-uuid": relation.get("id"), "dest-uuid": associated_software.get("associated_software_id"),
"type": "related-to", "type": "similar",
} }
) )
value = ClusterValue( value = ClusterValue(
description=entry.get("description"), description=entry.get("description"),
meta=meta, meta=meta,
@ -260,6 +361,35 @@ class TechniqueCluster(Cluster):
"type": "uses", "type": "uses",
} }
) )
for sub_technique in entry.get("sub_technique"):
meta = SubTechniqueMeta(
source=sub_technique.get("source"),
technique_attack_id=sub_technique.get("technique_attack_id"),
)
sub_related = []
for relation in sub_technique.get("tactic"):
sub_related.append(
{
"dest-uuid": relation.get("tactic_id"),
"type": "uses",
}
)
sub_value = ClusterValue(
description=sub_technique.get("description"),
meta=meta,
related=sub_related,
uuid=sub_technique.get("id"),
value=sub_technique.get("name"),
)
self.values.append(sub_value.return_value())
related.append(
{
"dest-uuid": sub_technique.get("id"),
"type": "similar",
}
)
value = ClusterValue( value = ClusterValue(
description=entry.get("description"), description=entry.get("description"),
meta=meta, meta=meta,
@ -269,28 +399,6 @@ class TechniqueCluster(Cluster):
) )
self.values.append(value.return_value()) self.values.append(value.return_value())
for sub_technique in entry.get("sub_technique"):
meta = SubTechniqueMeta(
source=sub_technique.get("source"),
technique_attack_id=sub_technique.get("technique_attack_id"),
)
related = []
for relation in sub_technique.get("tactic"):
related.append(
{
"dest-uuid": relation.get("tactic_id"),
"type": "uses",
}
)
value = ClusterValue(
description=sub_technique.get("description"),
meta=meta,
related=related,
uuid=sub_technique.get("id"),
value=sub_technique.get("name"),
)
self.values.append(value.return_value())
class TacticCluster(Cluster): class TacticCluster(Cluster):
def __init__( def __init__(