2018-08-09 18:11:45 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2019-04-04 10:42:37 +02:00
from . exceptions import MISPServerError , PyMISPUnexpectedResponse , PyMISPNotImplementedYet
2018-11-22 14:29:07 +01:00
from . api import PyMISP , everything_broken
2019-04-04 14:37:13 +02:00
from . mispevent import MISPEvent , MISPAttribute , MISPSighting , MISPLog , MISPObject , MISPUser , MISPOrganisation
2019-01-22 11:49:01 +01:00
from typing import TypeVar , Optional , Tuple , List , Dict , Union
2018-08-09 18:11:45 +02:00
from datetime import date , datetime
2018-09-18 03:37:07 +02:00
import csv
2018-08-09 18:11:45 +02:00
import logging
from urllib . parse import urljoin
# Value accepted for a single search term.
SearchType = TypeVar('SearchType', str, int)
# str: string to search / list: values to search (OR) / dict: {'OR': [list], 'NOT': [list], 'AND': [list]}
SearchParameterTypes = TypeVar('SearchParameterTypes', str, List[SearchType], Dict[str, SearchType])
# A point in time: datetime/date object, numeric timestamp, or a string passed as-is to the server ('1d', '10h', ...)
DateTypes = TypeVar('DateTypes', datetime, date, SearchType, float)
# Either a single point in time, or a pair of them
DateInterval = TypeVar('DateInterval', DateTypes, Tuple[DateTypes, DateTypes])

ToIDSType = TypeVar('ToIDSType', str, int, bool)

logger = logging.getLogger('pymisp')
class ExpandedPyMISP ( PyMISP ) :
def build_complex_query(self, or_parameters: Optional[List['SearchType']] = None,
                        and_parameters: Optional[List['SearchType']] = None,
                        not_parameters: Optional[List['SearchType']] = None):
    '''Build a complex query dictionary out of lists of values.

    :param or_parameters: Values stored under the 'OR' key
    :param and_parameters: Values stored under the 'AND' key
    :param not_parameters: Values stored under the 'NOT' key
    :return: A dict containing only the keys whose list is non-empty (empty dict if none is given)
    '''
    to_return = {}
    # Empty lists and None are skipped, so the query never carries an empty operator.
    if and_parameters:
        to_return['AND'] = and_parameters
    if not_parameters:
        to_return['NOT'] = not_parameters
    if or_parameters:
        to_return['OR'] = or_parameters
    return to_return
2018-09-25 15:32:17 +02:00
def toggle_warninglist(self, warninglist_id: List[int] = None, warninglist_name: List[str] = None, force_enable: bool = None):
    '''Toggle (enable/disable) the status of a warninglist.

    All arguments are forwarded unchanged to the parent implementation.

    :param warninglist_id: ID of the WarningList
    :param warninglist_name: presumably the name of the WarningList, as an alternative to the ID (confirm against the parent class)
    :param force_enable: Force the warning list in the enabled state (does nothing if already enabled)
    '''
    parent = super()
    return parent.toggle_warninglist(warninglist_id, warninglist_name, force_enable)
2018-09-21 22:02:15 +02:00
2018-08-09 18:11:45 +02:00
def make_timestamp(self, value: 'DateTypes'):
    '''Normalise a date-like value into something usable in a query.

    :param value: datetime, date, or any other value (string, number, None)
    :return: a POSIX timestamp (float) for datetime and date objects, the
        value itself, untouched, for everything else. Strings are never
        converted because they can be timestamps ('1554367357') as well as
        relative values ('1d', '10h', ...).
    '''
    # datetime is a subclass of date: it has to be tested first.
    if isinstance(value, datetime):
        return value.timestamp()
    if isinstance(value, date):
        # A plain date means "until the end of that day".
        return datetime.combine(value, datetime.max.time()).timestamp()
    return value
def _check_response(self, response):
    """Check if the response from the server is not an unexpected error

    :param response: Response object; status_code, request, text, content and json() are used
    :return: - {'errors': (status_code, details)} for a 4xx status,
             - the decoded JSON (unwrapped from its 'response' key if there is one),
             - {'errors': 'The response is empty.'} if the body is empty,
             - the raw text if the body is not JSON.
    :raises MISPServerError: for any status code >= 500
    """
    if response.status_code >= 500:
        logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text))
        raise MISPServerError('Error code {}:\n{}'.format(response.status_code, response.text))
    elif 400 <= response.status_code < 500:
        # The server normally returns a json message with the error details,
        # but a proxy (or a misconfigured server) may answer with plain text.
        try:
            error_message = response.json()
        except ValueError:
            error_message = response.text
        logger.error(f'Something went wrong ({response.status_code}): {error_message}')
        return {'errors': (response.status_code, error_message)}

    # At this point, we had no error.
    try:
        decoded = response.json()
    except ValueError:
        # Not JSON: either nothing at all, or some other format (csv, xml, ...)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(response.text)
        if not len(response.content):
            # Empty response
            logger.error('Got an empty response.')
            return {'errors': 'The response is empty.'}
        return response.text

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(decoded)
    if isinstance(decoded, dict) and decoded.get('response') is not None:
        # Cleanup.
        decoded = decoded['response']
    return decoded
2018-09-29 21:11:42 +02:00
def get_event(self, event_id: int):
    """Get an event and return it as a MISPEvent.

    :param event_id: ID of the event, forwarded to the parent implementation
    :return: the loaded MISPEvent, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    event = super().get_event(event_id)
    # Same error handling as add_event / update_event.
    if isinstance(event, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {event}')
    elif 'errors' in event:
        return event
    e = MISPEvent()
    e.load(event)
    return e
2019-04-03 16:28:26 +02:00
def add_object(self, event_id: int, misp_object: MISPObject):
    """Add an object to an event and return it as a MISPObject.

    :param event_id: ID of the event, forwarded to the parent implementation
    :param misp_object: Object to add
    :return: a MISPObject built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    created_object = super().add_object(event_id, misp_object)
    if isinstance(created_object, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {created_object}')
    elif 'errors' in created_object:
        return created_object
    o = MISPObject(misp_object.name)
    o.from_dict(**created_object)
    return o
2019-04-03 17:46:52 +02:00
def update_object(self, misp_object: MISPObject):
    """Update an object (through the parent's edit_object) and return it as a MISPObject.

    :param misp_object: Object to update
    :return: a MISPObject built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    updated_object = super().edit_object(misp_object)
    if isinstance(updated_object, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {updated_object}')
    elif 'errors' in updated_object:
        return updated_object
    o = MISPObject(misp_object.name)
    o.from_dict(**updated_object)
    return o
2019-04-04 10:42:37 +02:00
def get_object(self, object_id: int):
    """Get an object

    :param object_id: Object id to get
    :return: a MISPObject built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    misp_object = super().get_object(object_id)
    if isinstance(misp_object, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {misp_object}')
    elif 'errors' in misp_object:
        return misp_object
    # The object name comes from the response, under the 'Object' key.
    o = MISPObject(misp_object['Object']['name'])
    o.from_dict(**misp_object)
    return o
2018-08-20 18:27:06 +02:00
def add_event(self, event: MISPEvent):
    """Add an event and return it as a MISPEvent.

    :param event: Event to add
    :return: the loaded MISPEvent, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    created_event = super().add_event(event)
    if isinstance(created_event, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {created_event}')
    elif 'errors' in created_event:
        return created_event
    e = MISPEvent()
    e.load(created_event)
    return e
2018-08-21 00:32:27 +02:00
def update_event(self, event: MISPEvent):
    """Update an event (identified by its uuid) and return it as a MISPEvent.

    :param event: Event to update; its uuid attribute is passed to the parent implementation
    :return: the loaded MISPEvent, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    updated_event = super().update_event(event.uuid, event)
    if isinstance(updated_event, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {updated_event}')
    elif 'errors' in updated_event:
        return updated_event
    e = MISPEvent()
    e.load(updated_event)
    return e
def update_attribute(self, attribute: MISPAttribute):
    """Update an attribute (identified by its uuid) and return it as a MISPAttribute.

    :param attribute: Attribute to update; its uuid attribute is passed to the parent implementation
    :return: a MISPAttribute built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    updated_attribute = super().update_attribute(attribute.uuid, attribute)
    if isinstance(updated_attribute, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {updated_attribute}')
    elif 'errors' in updated_attribute:
        return updated_attribute
    a = MISPAttribute()
    a.from_dict(**updated_attribute)
    return a
2019-04-04 14:37:13 +02:00
def add_user(self, user: MISPUser):
    """Add a user and return it as a MISPUser.

    :param user: User to add
    :return: a MISPUser built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    # Keep the response in its own name instead of overwriting the parameter.
    created_user = super().add_user(user)
    if isinstance(created_user, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {created_user}')
    elif 'errors' in created_user:
        return created_user
    u = MISPUser()
    u.from_dict(**created_user)
    return u
def get_user(self, userid='me'):
    """Get a user and return it as a MISPUser.

    :param userid: Identifier forwarded to the parent implementation (defaults to 'me')
    :return: a MISPUser built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    user = super().get_user(userid)
    if isinstance(user, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {user}')
    elif 'errors' in user:
        return user
    u = MISPUser()
    u.from_dict(**user)
    return u
def add_organisation(self, organisation: MISPOrganisation):
    """Add an organisation and return it as a MISPOrganisation.

    :param organisation: Organisation to add
    :return: a MISPOrganisation built from the response, or the response itself if it contains an 'errors' key
    :raises PyMISPUnexpectedResponse: if the response is a plain string
    """
    # Keep the response in its own name instead of overwriting the parameter.
    created_organisation = super().add_organisation(organisation)
    if isinstance(created_organisation, str):
        raise PyMISPUnexpectedResponse(f'Unexpected response from server: {created_organisation}')
    elif 'errors' in created_organisation:
        return created_organisation
    o = MISPOrganisation()
    o.from_dict(**created_organisation)
    return o
2018-10-31 16:42:01 +01:00
def search_sightings(self, context: Optional[str] = None,
                     context_id: Optional[SearchType] = None,
                     type_sighting: Optional[str] = None,
                     date_from: Optional[DateTypes] = None,
                     date_to: Optional[DateTypes] = None,
                     publish_timestamp: Optional[DateInterval] = None, last: Optional[DateInterval] = None,
                     org: Optional[SearchType] = None,
                     source: Optional[str] = None,
                     include_attribute: Optional[bool] = None,
                     include_event_meta: Optional[bool] = None,
                     pythonify: Optional[bool] = False
                     ):
    '''Search sightings

    :param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes).
    :param context_id: Only relevant if context is either "attribute" or "event". Then it is the relevant ID.
    :param type_sighting: Type of sighting
    :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
    :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
    :param publish_timestamp: Restrict the results by the last publish timestamp (newer than).
    :param org: Search by the creator organisation by supplying the organisation identifier.
    :param source: Source of the sighting
    :param include_attribute: Include the attribute.
    :param include_event_meta: Include the meta information of the event.
    :param pythonify: Returns a list of dicts of PyMISP objects (keys: 'sighting', plus 'event' and/or 'attribute' when requested) instead of the plain json output.

    Deprecated:

    :param last: synonym for publish_timestamp

    :raises ValueError: if context is neither None, "attribute" nor "event"

    :Example:

    >>> misp.search_sightings(publish_timestamp='30d') # search sightings for the last 30 days on the instance
    [ ... ]
    >>> misp.search_sightings(context='attribute', context_id=6, include_attribute=True) # return list of sighting for attribute 6 along with the attribute itself
    [ ... ]
    >>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2
    '''
    query = {'returnFormat': 'json'}
    if context is not None:
        if context not in ['attribute', 'event']:
            raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event'])))
        url_path = f'sightings/restSearch/{context}'
    else:
        url_path = 'sightings/restSearch'

    # Deprecated synonym, handled the same way as in search()
    if last is not None:
        publish_timestamp = last

    query['id'] = context_id
    query['type'] = type_sighting
    # datetime/date objects are turned into timestamps, anything else is passed as-is
    query['from'] = self.make_timestamp(date_from)
    query['to'] = self.make_timestamp(date_to)
    if isinstance(publish_timestamp, (list, tuple)):
        query['last'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
    else:
        query['last'] = self.make_timestamp(publish_timestamp)
    query['org_id'] = org
    query['source'] = source
    query['includeAttribute'] = include_attribute
    query['includeEvent'] = include_event_meta

    url = urljoin(self.root_url, url_path)
    response = self._prepare_request('POST', url, data=query)
    normalized_response = self._check_response(response)
    if isinstance(normalized_response, str) or (isinstance(normalized_response, dict)
                                                and normalized_response.get('errors')):
        return normalized_response
    elif pythonify:
        to_return = []
        for s in normalized_response:
            entries = {}
            s_data = s['Sighting']
            # The event / attribute are nested in the sighting: extract them first
            if include_event_meta:
                e = s_data.pop('Event')
                me = MISPEvent()
                me.from_dict(**e)
                entries['event'] = me
            if include_attribute:
                a = s_data.pop('Attribute')
                ma = MISPAttribute()
                ma.from_dict(**a)
                entries['attribute'] = ma
            ms = MISPSighting()
            ms.from_dict(**s_data)
            entries['sighting'] = ms
            to_return.append(entries)
        return to_return
    else:
        return normalized_response
2018-08-09 18:11:45 +02:00
def search(self, controller: str = 'events', return_format: str = 'json',
           limit: Optional[int] = None, page: Optional[int] = None,
           value: Optional[SearchParameterTypes] = None,
           type_attribute: Optional[SearchParameterTypes] = None,
           category: Optional[SearchParameterTypes] = None,
           org: Optional[SearchParameterTypes] = None,
           tags: Optional[SearchParameterTypes] = None,
           quick_filter: Optional[str] = None, quickFilter: Optional[str] = None,
           date_from: Optional[DateTypes] = None,
           date_to: Optional[DateTypes] = None,
           eventid: Optional[SearchType] = None,
           with_attachments: Optional[bool] = None, withAttachments: Optional[bool] = None,
           metadata: Optional[bool] = None,
           uuid: Optional[str] = None,
           publish_timestamp: Optional[DateInterval] = None, last: Optional[DateInterval] = None,
           timestamp: Optional[DateInterval] = None,
           published: Optional[bool] = None,
           enforce_warninglist: Optional[bool] = None, enforceWarninglist: Optional[bool] = None,
           to_ids: Optional[Union[ToIDSType, List[ToIDSType]]] = None,
           deleted: Optional[str] = None,
           include_event_uuid: Optional[str] = None, includeEventUuid: Optional[str] = None,
           event_timestamp: Optional[DateTypes] = None,
           sg_reference_only: Optional[bool] = None,
           eventinfo: Optional[str] = None,
           searchall: Optional[bool] = None,
           requested_attributes: Optional[str] = None,
           include_context: Optional[bool] = None, includeContext: Optional[bool] = None,
           headerless: Optional[bool] = None,
           pythonify: Optional[bool] = False,
           **kwargs):
    '''Search in the MISP instance

    :param controller: Controller to search on: events, attributes, objects or sightings.
    :param return_format: Set the return format of the search (accepted here: openioc, json, xml, suricata, snort, text, rpz, csv, cache, stix, stix2).
    :param limit: Limit the number of results returned, depending on the scope (for example 10 attributes or 10 full events).
    :param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
    :param value: Search for the given value in the attributes' value field.
    :param type_attribute: The attribute type, any valid MISP attribute type is accepted.
    :param category: The attribute category, any valid MISP attribute category is accepted.
    :param org: Search by the creator organisation by supplying the organisation identifier.
    :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
    :param quick_filter: The string passed to this field will ignore all of the other arguments. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment.
    :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
    :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
    :param eventid: The events that should be included / excluded from the search
    :param with_attachments: If set, encodes the attachments / zipped malware samples as base64 in the data field within each attribute
    :param metadata: Only the metadata (event, tags, relations) is returned, attributes and proposals are omitted.
    :param uuid: Restrict the results by uuid.
    :param publish_timestamp: Restrict the results by the last publish timestamp (newer than).
    :param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup.
    :param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both.
    :param enforce_warninglist: Remove any attributes from the result that would cause a hit on a warninglist entry.
    :param to_ids: By default all attributes are returned that match the other filter parameters, regardless of their to_ids setting. To restrict the returned data set to to_ids only attributes set this parameter to 1. 0 for the ones with to_ids set to False. A list of such values is accepted too.
    :param deleted: If this parameter is set to 1, it will return soft-deleted attributes along with active ones. By using "only" as a parameter it will limit the returned data set to soft-deleted data only.
    :param include_event_uuid: Instead of just including the event ID, also include the event UUID in each of the attributes.
    :param event_timestamp: Only return attributes from events that have received a modification after the given timestamp.
    :param sg_reference_only: If this flag is set, sharing group objects will not be included, instead only the sharing group ID is set.
    :param eventinfo: Filter on the event's info field.
    :param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields.
    :param requested_attributes: [CSV only] Select the fields that you wish to include in the CSV export. By setting event level fields additionally, includeContext is not required to get event metadata.
    :param include_context: [CSV Only] Include the event data with each attribute.
    :param headerless: [CSV Only] The CSV created when this setting is set to true will not contain the header row.
    :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM

    Deprecated:

    :param quickFilter: synonym for quick_filter
    :param withAttachments: synonym for with_attachments
    :param last: synonym for publish_timestamp
    :param enforceWarninglist: synonym for enforce_warninglist
    :param includeEventUuid: synonym for include_event_uuid
    :param includeContext: synonym for include_context

    :raises ValueError: on an invalid controller, return_format or to_ids value
    :raises PyMISPNotImplementedYet: if pythonify is requested for a controller other than events and attributes
    '''

    return_formats = ['openioc', 'json', 'xml', 'suricata', 'snort', 'text', 'rpz', 'csv', 'cache', 'stix', 'stix2']
    controllers = ['events', 'attributes', 'objects', 'sightings']

    if controller not in controllers:
        raise ValueError('controller has to be in {}'.format(', '.join(controllers)))

    def _make_interval(raw):
        # A pair is converted element by element, a single value directly.
        if isinstance(raw, (list, tuple)):
            return (self.make_timestamp(raw[0]), self.make_timestamp(raw[1]))
        return self.make_timestamp(raw)

    # Deprecated stuff / synonyms
    if quickFilter is not None:
        quick_filter = quickFilter
    if withAttachments is not None:
        with_attachments = withAttachments
    if last is not None:
        publish_timestamp = last
    if enforceWarninglist is not None:
        enforce_warninglist = enforceWarninglist
    if includeEventUuid is not None:
        include_event_uuid = includeEventUuid
    if includeContext is not None:
        include_context = includeContext

    # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
    # They are passed as-is.
    query = kwargs
    if return_format not in return_formats:
        raise ValueError('return_format has to be in {}'.format(', '.join(return_formats)))
    query['returnFormat'] = return_format
    query['page'] = page
    query['limit'] = limit
    query['value'] = value
    query['type'] = type_attribute
    query['category'] = category
    query['org'] = org
    query['tags'] = tags
    query['quickFilter'] = quick_filter
    query['from'] = self.make_timestamp(date_from)
    query['to'] = self.make_timestamp(date_to)
    query['eventid'] = eventid
    query['withAttachments'] = with_attachments
    query['metadata'] = metadata
    query['uuid'] = uuid
    if publish_timestamp is not None:
        query['publish_timestamp'] = _make_interval(publish_timestamp)
    if timestamp is not None:
        query['timestamp'] = _make_interval(timestamp)
    query['published'] = published
    query['enforceWarninglist'] = enforce_warninglist
    if to_ids is not None:
        # to_ids is either a single value or a list of values: validate each of them.
        candidates = to_ids if isinstance(to_ids, (list, tuple)) else [to_ids]
        for candidate in candidates:
            try:
                is_valid = int(candidate) in (0, 1)
            except (TypeError, ValueError):
                is_valid = False
            if not is_valid:
                raise ValueError('to_ids has to be in {}'.format(', '.join(['0', '1'])))
        query['to_ids'] = to_ids
    query['deleted'] = deleted
    query['includeEventUuid'] = include_event_uuid
    if event_timestamp is not None:
        query['event_timestamp'] = _make_interval(event_timestamp)
    query['sgReferenceOnly'] = sg_reference_only
    query['eventinfo'] = eventinfo
    query['searchall'] = searchall
    query['requested_attributes'] = requested_attributes
    query['includeContext'] = include_context
    query['headerless'] = headerless
    url = urljoin(self.root_url, f'{controller}/restSearch')
    response = self._prepare_request('POST', url, data=query)
    normalized_response = self._check_response(response)

    # Errors are returned untouched, whatever the requested format is.
    if isinstance(normalized_response, dict) and normalized_response.get('errors'):
        return normalized_response
    if isinstance(normalized_response, str):
        if return_format == 'csv' and pythonify and not headerless:
            return self._csv_to_dict(normalized_response)
        return normalized_response
    if return_format == 'json' and pythonify:
        # The response is in json, we can convert it to a list of pythonic MISP objects
        to_return = []
        if controller == 'events':
            for e in normalized_response:
                me = MISPEvent()
                me.load(e)
                to_return.append(me)
        elif controller == 'attributes':
            for a in normalized_response.get('Attribute', []):
                ma = MISPAttribute()
                ma.from_dict(**a)
                to_return.append(ma)
        else:
            # objects and sightings: failing loudly is better than silently returning an empty list
            raise PyMISPNotImplementedYet('Not implemented yet')
        return to_return
    return normalized_response
2018-09-18 03:37:07 +02:00
2018-10-22 04:58:07 +02:00
def _csv_to_dict ( self , csv_content ) :
''' Makes a list of dict out of a csv file (requires headers) '''
fieldnames , lines = csv_content . split ( ' \n ' , 1 )
fieldnames = fieldnames . split ( ' , ' )
to_return = [ ]
for line in csv . reader ( lines . split ( ' \n ' ) ) :
if line :
to_return . append ( { fname : value for fname , value in zip ( fieldnames , line ) } )
return to_return
2018-11-20 01:21:06 +01:00
def search_logs(self, limit: Optional[int] = None, page: Optional[int] = None,
                log_id: Optional[int] = None, title: Optional[str] = None,
                created: Optional[DateTypes] = None, model: Optional[str] = None,
                action: Optional[str] = None, user_id: Optional[int] = None,
                change: Optional[str] = None, email: Optional[str] = None,
                org: Optional[str] = None, description: Optional[str] = None,
                ip: Optional[str] = None, pythonify: Optional[bool] = False):
    '''Search in logs

    Note: to run substring queries simply append/prepend/encapsulate the search term with %

    :param limit: Limit the number of results returned, depending on the scope (for example 10 attributes or 10 full events).
    :param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
    :param log_id: Log ID
    :param title: Log Title
    :param created: Creation timestamp
    :param model: Model name that generated the log entry
    :param action: The thing that was done
    :param user_id: ID of the user doing the action
    :param change: Change that occurred
    :param email: Email of the user
    :param org: Organisation of the User doing the action
    :param description: Description of the action
    :param ip: Origination IP of the User doing the action
    :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
    '''
    # The query is built explicitly: one key per search parameter.
    query = {'limit': limit, 'page': page, 'title': title, 'created': created,
             'model': model, 'action': action, 'user_id': user_id, 'change': change,
             'email': email, 'org': org, 'description': description, 'ip': ip}
    if log_id is not None:
        # The key sent to the server is 'id', not 'log_id'
        query['id'] = log_id

    url = urljoin(self.root_url, 'admin/logs/index')
    response = self._prepare_request('POST', url, data=query)
    normalized_response = self._check_response(response)
    if not pythonify:
        return normalized_response
    # Errors and raw text cannot be turned into MISPLog objects: return them as they are.
    if isinstance(normalized_response, str) or (isinstance(normalized_response, dict)
                                                and normalized_response.get('errors')):
        return normalized_response

    to_return = []
    for entry in normalized_response:
        ml = MISPLog()
        ml.from_dict(**entry['Log'])
        to_return.append(ml)
    return to_return
def search_index(self, published: Optional[bool] = None, eventid: Optional[SearchType] = None,
                 tags: Optional[SearchParameterTypes] = None,
                 date_from: Optional[DateTypes] = None,
                 date_to: Optional[DateTypes] = None,
                 eventinfo: Optional[str] = None,
                 threatlevel: Optional[List[SearchType]] = None,
                 distribution: Optional[List[SearchType]] = None,
                 analysis: Optional[List[SearchType]] = None,
                 org: Optional[SearchParameterTypes] = None,
                 timestamp: Optional[DateInterval] = None,
                 pythonify: Optional[bool] = None):
    """Search only at the index level. Using ! in front of a value means NOT (default is OR)

    :param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both.
    :param eventid: The events that should be included / excluded from the search
    :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
    :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
    :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
    :param eventinfo: Filter on the event's info field.
    :param threatlevel: Threat level(s) (1,2,3,4) | list
    :param distribution: Distribution level(s) (0,1,2,3) | list
    :param analysis: Analysis level(s) (0,1,2) | list
    :param org: Search by the creator organisation by supplying the organisation identifier.
    :param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup.
    :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
    """
    # The query is built explicitly: one key per search parameter.
    query = {'published': published, 'eventid': eventid, 'tags': tags,
             'eventinfo': eventinfo, 'threatlevel': threatlevel,
             'distribution': distribution, 'analysis': analysis, 'org': org}
    # The index uses datefrom / dateuntil instead of date_from / date_to
    if date_from:
        query['datefrom'] = self.make_timestamp(date_from)
    if date_to:
        query['dateuntil'] = self.make_timestamp(date_to)
    if timestamp is not None:
        if isinstance(timestamp, (list, tuple)):
            query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1]))
        else:
            query['timestamp'] = self.make_timestamp(timestamp)

    url = urljoin(self.root_url, 'events/index')
    response = self._prepare_request('POST', url, data=query)
    normalized_response = self._check_response(response)
    if not pythonify:
        return normalized_response
    # Errors and raw text cannot be turned into MISPEvent objects: return them as they are.
    if isinstance(normalized_response, str) or (isinstance(normalized_response, dict)
                                                and normalized_response.get('errors')):
        return normalized_response

    to_return = []
    for e_meta in normalized_response:
        me = MISPEvent()
        me.from_dict(**e_meta)
        to_return.append(me)
    return to_return
2019-04-04 16:39:17 +02:00
def set_default_role(self, role_id: int):
    """Set the default role.

    :param role_id: ID of the role, appended to the admin/roles/set_default/ path
    :return: the server response, as normalised by _check_response
    """
    # Relative path (no leading slash), as in every other call of this class:
    # with an absolute path, urljoin drops any sub-path present in root_url.
    url = urljoin(self.root_url, f'admin/roles/set_default/{role_id}')
    response = self._prepare_request('POST', url)
    return self._check_response(response)