Fix [config] metadata mapping

pull/941/head
niclas 2024-02-22 10:52:23 +01:00
parent b9746f2b41
commit 9d2dfba0b9
3 changed files with 66 additions and 26 deletions

View File

@ -45,9 +45,9 @@ The configuration file is located in `config.json` and maps the fields of the Ti
The extraction configuration is a dictionary that maps the fields of the Tidal Cyber API to the fields of the MISP galaxy. It can be used to extract data stored in a array or object in the API response. The extraction configuration looks like this: The extraction configuration is a dictionary that maps the fields of the Tidal Cyber API to the fields of the MISP galaxy. It can be used to extract data stored in a array or object in the API response. The extraction configuration looks like this:
```json ```json
{ {
"extract": <mode>, "extract": "<mode>",
"key": <key>, "key": "<key>",
"subkey": <subkey> "subkey": "<subkey>"
} }
``` ```
**Extract modes**: **Extract modes**:
@ -64,4 +64,4 @@ To build all galaxies and clusters, run the following command:
```bash ```bash
python3 main.py create-galaxy -v <version> --all python3 main.py create-galaxy -v <version> --all
``` ```

View File

@ -120,8 +120,16 @@
"source": "source", "source": "source",
"type": "type", "type": "type",
"software-attack-id": "software_attack_id", "software-attack-id": "software_attack_id",
"platforms": "platforms", "platforms": {
"tags": "tags", "extract": "multiple",
"key": "platforms",
"subkey": "name"
},
"tags": {
"extract": "multiple",
"key": "tags",
"subkey": "tag"
},
"owner": "owner_name" "owner": "owner_name"
}, },
"related": { "related": {
@ -192,8 +200,16 @@
"description": "description", "description": "description",
"meta": { "meta": {
"source": "source", "source": "source",
"platforms": "platforms", "platforms": {
"tags": "tags", "extract": "multiple",
"key": "platforms",
"subkey": "name"
},
"tags": {
"extract": "multiple",
"key": "tags",
"subkey": "tag"
},
"owner": "owner_name" "owner": "owner_name"
}, },
"related": { "related": {

View File

@ -8,12 +8,13 @@ import argparse
CLUSTER_PATH = "../../clusters/" CLUSTER_PATH = "../../clusters/"
GALAXY_PATH = "../../galaxies/" GALAXY_PATH = "../../galaxies/"
config = load_config('./config.json') config = load_config("./config.json")
UUIDS = config["UUIDS"]
GALAXY_CONFIGS = config["GALAXY_CONFIGS"]
CLUSTER_CONFIGS = config["CLUSTER_CONFIGS"]
VALUE_FIELDS = config["VALUE_FIELDS"]
UUIDS = config['UUIDS']
GALAXY_CONFIGS = config['GALAXY_CONFIGS']
CLUSTER_CONFIGS = config['CLUSTER_CONFIGS']
VALUE_FIELDS = config['VALUE_FIELDS']
def create_cluster_values(data, cluster): def create_cluster_values(data, cluster):
value_fields = VALUE_FIELDS[cluster.internal_type] value_fields = VALUE_FIELDS[cluster.internal_type]
@ -34,23 +35,33 @@ def create_cluster_values(data, cluster):
case "value": case "value":
values[key] = entry.get(value) values[key] = entry.get(value)
case _: case _:
print(f"Error: Invalid configuration for {key} in {cluster.internal_type} value fields.") print(
f"Error: Invalid configuration for {key} in {cluster.internal_type} value fields."
)
cluster.add_value(values) cluster.add_value(values)
def create_metadata(data, format): def create_metadata(data, format):
metadata = {} metadata = {}
for meta_key, meta_value in format.items(): for meta_key, meta_value in format.items():
if isinstance(meta_value, dict): if isinstance(meta_value, dict):
if meta_value.get("extract") == "single" and data.get(meta_value["key"]): if meta_value.get("extract") == "single" and data.get(meta_value["key"]):
metadata[meta_key] = data.get(meta_value["key"])[0].get(meta_value["subkey"]) metadata[meta_key] = data.get(meta_value["key"])[0].get(
elif meta_value.get("extract") == "multiple" and data.get(meta_value["key"]): meta_value["subkey"]
metadata[meta_key] = [entry.get(meta_value["subkey"]) for entry in data.get(meta_value["key"])] )
elif meta_value.get("extract") == "multiple" and data.get(
meta_value["key"]
):
metadata[meta_key] = [
entry.get(meta_value["subkey"])
for entry in data.get(meta_value["key"])
]
elif meta_value.get("extract") == "reverse" and data.get(meta_value["key"]): elif meta_value.get("extract") == "reverse" and data.get(meta_value["key"]):
metadata[meta_key] = [data.get(meta_value["key"])] metadata[meta_key] = [data.get(meta_value["key"])]
elif data.get(meta_value): elif data.get(meta_value):
metadata[meta_key] = data.get(meta_value) metadata[meta_key] = data.get(meta_value)
return metadata return metadata
def create_relations(data, format): def create_relations(data, format):
relations = [] relations = []
@ -64,7 +75,7 @@ def create_relations(data, format):
relation_entry[relation_key] = relation_value relation_entry[relation_key] = relation_value
relations.append(relation_entry) relations.append(relation_entry)
return relations return relations
def create_galaxy_and_cluster(galaxy_type, version): def create_galaxy_and_cluster(galaxy_type, version):
api = TidalAPI() api = TidalAPI()
@ -78,6 +89,7 @@ def create_galaxy_and_cluster(galaxy_type, version):
print(f"Galaxy tidal-{galaxy_type} created") print(f"Galaxy tidal-{galaxy_type} created")
def create_galaxy(args): def create_galaxy(args):
if args.all: if args.all:
for galaxy_type in GALAXY_CONFIGS: for galaxy_type in GALAXY_CONFIGS:
@ -85,19 +97,31 @@ def create_galaxy(args):
else: else:
create_galaxy_and_cluster(args.type, args.version) create_galaxy_and_cluster(args.type, args.version)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Create a galaxy and cluster for Tidal API") parser = argparse.ArgumentParser(
description="Create a galaxy and cluster for Tidal API"
)
subparsers = parser.add_subparsers(dest="command") subparsers = parser.add_subparsers(dest="command")
galaxy_parser = subparsers.add_parser("create_galaxy", help="Create a galaxy from the Tidal API") galaxy_parser = subparsers.add_parser(
galaxy_parser.add_argument("--type", choices=list(GALAXY_CONFIGS.keys()) + ['all'], help="The type of the galaxy") "create_galaxy", help="Create a galaxy from the Tidal API"
galaxy_parser.add_argument("-v", "--version", type=int, required=True, help="The version of the galaxy") )
galaxy_parser.add_argument("--all", action="store_true", help="Flag to create all predefined galaxy types") galaxy_parser.add_argument(
"--type",
choices=list(GALAXY_CONFIGS.keys()) + ["all"],
help="The type of the galaxy",
)
galaxy_parser.add_argument(
"-v", "--version", type=int, required=True, help="The version of the galaxy"
)
galaxy_parser.add_argument(
"--all", action="store_true", help="Flag to create all predefined galaxy types"
)
galaxy_parser.set_defaults(func=create_galaxy) galaxy_parser.set_defaults(func=create_galaxy)
args = parser.parse_args() args = parser.parse_args()
if hasattr(args, 'func'): if hasattr(args, "func"):
args.func(args) args.func(args)
else: else:
parser.print_help() parser.print_help()