mirror of https://github.com/MISP/PyMISP
parent
1e060f669f
commit
e5a42b812f
|
@ -0,0 +1,84 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from pymisp.tools import CSVLoader
|
||||||
|
|
||||||
|
from pymisp import MISPEvent
|
||||||
|
|
||||||
|
try:
|
||||||
|
from keys import misp_url, misp_key, misp_verifycert
|
||||||
|
from pymisp import ExpandedPyMISP
|
||||||
|
offline = False
|
||||||
|
except ImportError as e:
|
||||||
|
offline = True
|
||||||
|
print(f'Unable to import MISP parameters, unable to POST on MISP: {e}')
|
||||||
|
|
||||||
|
'''
|
||||||
|
Example:
|
||||||
|
* If the CSV file has fieldnames matching the object-relation:
|
||||||
|
|
||||||
|
load_csv.py -n file -p /tmp/foo.csv
|
||||||
|
|
||||||
|
* If you want to force the fieldnames:
|
||||||
|
|
||||||
|
load_csv.py -n file -p /tmp/foo.csv -f SHA1 fileName size-in-bytes
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser(description='Load a CSV file as MISP objects')
|
||||||
|
parser.add_argument("-n", "--object_name", type=str, required=True, help="Template name of the objects in the CSV.")
|
||||||
|
parser.add_argument("-p", "--path", required=True, type=Path, help="Path to the CSV file.")
|
||||||
|
parser.add_argument("-f", "--fieldnames", nargs='*', default=[], help="Fieldnames of the CSV, have to match the object-relation allowed in the template. If empty, the fieldnames of the CSV have to match the template.")
|
||||||
|
parser.add_argument("-s", "--skip_fieldnames", action='store_true', help="Skip fieldnames in the CSV.")
|
||||||
|
parser.add_argument("-d", "--dump", action='store_true', help="(Debug) Dump the object in the terminal.")
|
||||||
|
|
||||||
|
# Interact with MISP
|
||||||
|
misp_group = parser.add_mutually_exclusive_group()
|
||||||
|
misp_group.add_argument('-i', '--new_event', type=str, help="Info field of the new event")
|
||||||
|
misp_group.add_argument('-u', '--update_event', type=int, help="ID of the existing event to update")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not args.fieldnames:
|
||||||
|
has_fieldnames = True
|
||||||
|
else:
|
||||||
|
has_fieldnames = args.skip_fieldnames
|
||||||
|
csv_loader = CSVLoader(template_name=args.object_name, csv_path=args.path,
|
||||||
|
fieldnames=args.fieldnames, has_fieldnames=has_fieldnames)
|
||||||
|
|
||||||
|
objects = csv_loader.load()
|
||||||
|
if args.dump:
|
||||||
|
for o in objects:
|
||||||
|
print(o.to_json())
|
||||||
|
else:
|
||||||
|
if offline:
|
||||||
|
print('You are in offline mode, quitting.')
|
||||||
|
else:
|
||||||
|
misp = ExpandedPyMISP(url=misp_url, key=misp_key, ssl=misp_verifycert)
|
||||||
|
if args.new_event:
|
||||||
|
event = MISPEvent()
|
||||||
|
event.info = args.new_event
|
||||||
|
for o in objects:
|
||||||
|
event.add_object(**o)
|
||||||
|
new_event = misp.add_event(event)
|
||||||
|
if isinstance(new_event, str):
|
||||||
|
print(new_event)
|
||||||
|
elif 'id' in new_event:
|
||||||
|
print(f'Created new event {new_event.id}')
|
||||||
|
else:
|
||||||
|
print('Something went wrong:')
|
||||||
|
print(new_event)
|
||||||
|
else:
|
||||||
|
for o in objects:
|
||||||
|
new_object = misp.add_object(args.update_event, o)
|
||||||
|
if isinstance(new_object, str):
|
||||||
|
print(new_object)
|
||||||
|
elif new_object.attributes:
|
||||||
|
print(f'New {new_object.name} object added to {args.update_event}')
|
||||||
|
else:
|
||||||
|
print('Something went wrong:')
|
||||||
|
print(new_event)
|
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet
|
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet
|
||||||
from .api import PyMISP, everything_broken
|
from .api import PyMISP, everything_broken
|
||||||
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog
|
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject
|
||||||
from typing import TypeVar, Optional, Tuple, List, Dict, Union
|
from typing import TypeVar, Optional, Tuple, List, Dict, Union
|
||||||
from datetime import date, datetime
|
from datetime import date, datetime
|
||||||
import csv
|
import csv
|
||||||
|
@ -97,6 +97,16 @@ class ExpandedPyMISP(PyMISP):
|
||||||
e.load(event)
|
e.load(event)
|
||||||
return e
|
return e
|
||||||
|
|
||||||
|
def add_object(self, event_id: int, misp_object: MISPObject):
|
||||||
|
created_object = super().add_object(event_id, misp_object)
|
||||||
|
if isinstance(created_object, str):
|
||||||
|
raise NewEventError(f'Unexpected response from server: {created_object}')
|
||||||
|
elif 'errors' in created_object:
|
||||||
|
return created_object
|
||||||
|
o = MISPObject(misp_object.name)
|
||||||
|
o.from_dict(**created_object)
|
||||||
|
return o
|
||||||
|
|
||||||
def add_event(self, event: MISPEvent):
|
def add_event(self, event: MISPEvent):
|
||||||
created_event = super().add_event(event)
|
created_event = super().add_event(event)
|
||||||
if isinstance(created_event, str):
|
if isinstance(created_event, str):
|
||||||
|
|
|
@ -19,3 +19,4 @@ from .geolocationobject import GeolocationObject # noqa
|
||||||
if sys.version_info >= (3, 6):
|
if sys.version_info >= (3, 6):
|
||||||
from .emailobject import EMailObject # noqa
|
from .emailobject import EMailObject # noqa
|
||||||
from .vehicleobject import VehicleObject # noqa
|
from .vehicleobject import VehicleObject # noqa
|
||||||
|
from .csvloader import CSVLoader # noqa
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
import csv
|
||||||
|
from pymisp import MISPObject
|
||||||
|
|
||||||
|
|
||||||
|
class CSVLoader():
|
||||||
|
|
||||||
|
def __init__(self, template_name: str, csv_path: Path, fieldnames: list=[], has_fieldnames=False):
|
||||||
|
self.template_name = template_name
|
||||||
|
self.csv_path = csv_path
|
||||||
|
self.fieldnames = [f.strip().lower() for f in fieldnames]
|
||||||
|
if not self.fieldnames:
|
||||||
|
# If the user doesn't pass fieldnames, we assume the CSV has them.
|
||||||
|
self.has_fieldnames = True
|
||||||
|
else:
|
||||||
|
self.has_fieldnames = has_fieldnames
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
|
||||||
|
objects = []
|
||||||
|
|
||||||
|
with open(self.csv_path, newline='') as csvfile:
|
||||||
|
reader = csv.reader(csvfile)
|
||||||
|
if self.has_fieldnames:
|
||||||
|
# The file has fieldnames, we either ignore it, or validate its validity
|
||||||
|
fieldnames = [f.strip().lower() for f in reader.__next__()]
|
||||||
|
if not self.fieldnames:
|
||||||
|
self.fieldnames = fieldnames
|
||||||
|
|
||||||
|
if not self.fieldnames:
|
||||||
|
raise Exception(f'No fieldnames, impossible to create objects.')
|
||||||
|
else:
|
||||||
|
# Check if the CSV file has a header, and if it matches with the object template
|
||||||
|
tmp_object = MISPObject(self.template_name)
|
||||||
|
allowed_fieldnames = list(tmp_object._definition['attributes'].keys())
|
||||||
|
for fieldname in self.fieldnames:
|
||||||
|
if fieldname not in allowed_fieldnames:
|
||||||
|
raise Exception(f'{fieldname} is not a valid object relation for {self.template_name}: {allowed_fieldnames}')
|
||||||
|
|
||||||
|
for row in reader:
|
||||||
|
tmp_object = MISPObject(self.template_name)
|
||||||
|
for object_relation, value in zip(self.fieldnames, row):
|
||||||
|
tmp_object.add_attribute(object_relation, value=value)
|
||||||
|
objects.append(tmp_object)
|
||||||
|
return objects
|
Loading…
Reference in New Issue