mirror of https://github.com/MISP/PyMISP
Merge remote-tracking branch 'upstream/master'
commit
4eaf958ac2
|
@ -44,13 +44,16 @@ vim keys.py
|
|||
The API key of MISP is available in the Automation section of the MISP web interface.
|
||||
|
||||
To test if your URL and API keys are correct, you can test with examples/last.py to
|
||||
fetch the last 10 events published.
|
||||
|
||||
fetch the events published in the last x amount of time (supported time indicators: days (d), hours (h) and minutes (m)).
|
||||
last.py
|
||||
```
|
||||
cd examples
|
||||
python3 last.py -l 10
|
||||
python3 last.py -l 10h # 10 hours
|
||||
python3 last.py -l 5d # 5 days
|
||||
python3 last.py -l 45m # 45 minutes
|
||||
```
|
||||
|
||||
|
||||
## Debugging
|
||||
|
||||
You have two options there:
|
||||
|
|
|
@ -787,7 +787,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.6.5"
|
||||
"version": "3.6.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
|
|
@ -22,10 +22,10 @@
|
|||
"outputs": [],
|
||||
"source": [
|
||||
"# The URL of the MISP instance to connect to\n",
|
||||
"misp_url = 'http://127.0.0.1:9090/'\n",
|
||||
"misp_url = 'http://127.0.0.1:8080/'\n",
|
||||
"# Can be found in the MISP web interface under \n",
|
||||
"# http://+MISP_URL+/users/view/me -> Authkey\n",
|
||||
"misp_key = 'btm3o1j6SzKUEsHiNz0vTMYzPfcc5eIKpfaWFADj'\n",
|
||||
"misp_key = 'BSip0zVadeFDeolkX2g7MHx8mrlr0uE04hh6CQj0'\n",
|
||||
"# Should PyMISP verify the MISP certificate\n",
|
||||
"misp_verifycert = False"
|
||||
]
|
||||
|
@ -67,9 +67,10 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from pymisp import PyMISP\n",
|
||||
"from pymisp import ExpandedPyMISP, PyMISP\n",
|
||||
"\n",
|
||||
"misp = PyMISP(misp_url, misp_key, misp_verifycert)"
|
||||
"misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)\n",
|
||||
"misp_old = PyMISP(misp_url, misp_key, misp_verifycert)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -96,7 +97,7 @@
|
|||
" threat_level_id=1,\n",
|
||||
" analysis=1,\n",
|
||||
" info=\"Event from notebook\")\n",
|
||||
"print(\"Event id: %s\" % event['Event']['id'])"
|
||||
"print(\"Event id: %s\" % event.id)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -120,7 +121,7 @@
|
|||
"event_obj.analysis = 1\n",
|
||||
"event_obj.info = \"Event from notebook 2\"\n",
|
||||
"event = misp.add_event(event_obj)\n",
|
||||
"event_id = event['Event']['id']\n",
|
||||
"event_id = event.id\n",
|
||||
"print(\"Event id: %s\" % event_id)"
|
||||
]
|
||||
},
|
||||
|
@ -237,7 +238,7 @@
|
|||
"source": [
|
||||
"# Add the attribute to the event\n",
|
||||
"## Fetch the event from MISP\n",
|
||||
"event_dict = misp.get(event_id)['Event']\n",
|
||||
"event_dict = misp_old.get(event_id)['Event']\n",
|
||||
"\n",
|
||||
"## Convert it to a PyMISP Event\n",
|
||||
"event = MISPEvent()\n",
|
||||
|
@ -248,7 +249,25 @@
|
|||
"event.add_attribute(type='domain', value='circl.lu', disable_correlation=True)\n",
|
||||
"\n",
|
||||
"## Push the updated event to MISP\n",
|
||||
"event_dict = misp.update(event)\n",
|
||||
"event_dict = misp.update_event(event)\n",
|
||||
"print(event_dict)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# New Python 3.6 API\n",
|
||||
"event = misp.get(event_id)\n",
|
||||
"\n",
|
||||
"## Add the attribute to the event\n",
|
||||
"event.add_attribute(**attribute)\n",
|
||||
"event.add_attribute(type='domain', value='circl.lu', disable_correlation=True)\n",
|
||||
"\n",
|
||||
"## Push the updated event to MISP\n",
|
||||
"event_dict = misp.update_event(event)\n",
|
||||
"print(event_dict)"
|
||||
]
|
||||
},
|
||||
|
@ -273,8 +292,6 @@
|
|||
"outputs": [],
|
||||
"source": [
|
||||
"results = misp.search_index(eventinfo='notebook')\n",
|
||||
"# The data is stored in the field 'response'\n",
|
||||
"results = results['response']\n",
|
||||
"\n",
|
||||
"for event in results:\n",
|
||||
" print(event['id'], ':', event['info'])"
|
||||
|
@ -304,12 +321,8 @@
|
|||
"source": [
|
||||
"# Search attributes (specified in controller) where the attribute type is 'ip-src'\n",
|
||||
"# And the to_ids flag is set\n",
|
||||
"response = misp.search(controller='attributes', type_attribute='ip-src', to_ids=False)\n",
|
||||
"# The data is stored in the field 'response'\n",
|
||||
"results = response['response']\n",
|
||||
"attributes = misp.search(controller='attributes', type_attribute='ip-src', to_ids=0, pythonify=True)\n",
|
||||
"\n",
|
||||
"# Get all related event\n",
|
||||
"attributes = results['Attribute']\n",
|
||||
"event_ids = set()\n",
|
||||
"for attr in attributes:\n",
|
||||
" event_ids.add(event_id)\n",
|
||||
|
@ -317,7 +330,7 @@
|
|||
"# Fetch all related events\n",
|
||||
"for event_id in event_ids:\n",
|
||||
" event = misp.get_event(event_id)\n",
|
||||
" print(event['Event']['info'])"
|
||||
" print(event.info)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -451,7 +464,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.6.5"
|
||||
"version": "3.6.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
"misp_url = 'http://127.0.0.1:8080'\n",
|
||||
"# Can be found in the MISP web interface under \n",
|
||||
"# http://+MISP_URL+/users/view/me -> Authkey\n",
|
||||
"misp_key = 'xe5okWNY2OB3O9ljR6t2cJPNsv4u1VZB0C1mKwtB'\n",
|
||||
"misp_key = 'BSip0zVadeFDeolkX2g7MHx8mrlr0uE04hh6CQj0'\n",
|
||||
"# Should PyMISP verify the MISP certificate\n",
|
||||
"misp_verifycert = False"
|
||||
]
|
||||
|
@ -52,9 +52,9 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from pymisp import PyMISP\n",
|
||||
"from pymisp import ExpandedPyMISP\n",
|
||||
"\n",
|
||||
"misp = PyMISP(misp_url, misp_key, misp_verifycert, debug=False)"
|
||||
"misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert, debug=False)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -368,12 +368,53 @@
|
|||
"r"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Because reason"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
"source": [
|
||||
"tag_to_remove = 'foo'\n",
|
||||
"\n",
|
||||
"events = misp.search(tags=tag_to_remove, pythonify=True)\n",
|
||||
"\n",
|
||||
"for event in events:\n",
|
||||
" for tag in event.tags:\n",
|
||||
" if tag.name == tag_to_remove:\n",
|
||||
" print(f'Got {tag_to_remove} in {event.info}')\n",
|
||||
" misp.untag(event.uuid, tag_to_remove)\n",
|
||||
" break\n",
|
||||
" for attribute in event.attributes:\n",
|
||||
" for tag in attribute.tags:\n",
|
||||
" if tag.name == tag_to_remove:\n",
|
||||
" print(f'Got {tag_to_remove} in {attribute.value}')\n",
|
||||
" misp.untag(attribute.uuid, tag_to_remove)\n",
|
||||
" break"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"log = misp.search_logs(model='Tag', title=tag_to_remove)[0]\n",
|
||||
"roles = misp.get_roles_list()\n",
|
||||
"for r in roles:\n",
|
||||
" if r['Role']['name'] == 'User':\n",
|
||||
" new_role = r['Role']['id']\n",
|
||||
" break\n",
|
||||
"user = misp.get_user(log['Log']['user_id'])\n",
|
||||
"user['User']['role_id'] = new_role\n",
|
||||
"misp.edit_user(user['User']['id'], **user['User'])"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
|
@ -392,7 +433,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.6.5"
|
||||
"version": "3.6.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
|
|
@ -498,7 +498,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.6.5"
|
||||
"version": "3.6.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
|
|
@ -1730,7 +1730,7 @@ class PyMISP(object):
|
|||
"""Get the list of existing roles"""
|
||||
url = urljoin(self.root_url, '/roles')
|
||||
response = self._prepare_request('GET', url)
|
||||
return self._check_response(response)['response']
|
||||
return self._check_response(response)
|
||||
|
||||
# ############## Tags ##################
|
||||
|
||||
|
@ -2258,7 +2258,7 @@ class PyMISP(object):
|
|||
"""Returns the list of Object templates available on the MISP instance"""
|
||||
url = urljoin(self.root_url, 'objectTemplates')
|
||||
response = self._prepare_request('GET', url)
|
||||
return self._check_response(response)['response']
|
||||
return self._check_response(response)
|
||||
|
||||
def get_object_template_id(self, object_uuid):
|
||||
"""Gets the template ID corresponting the UUID passed as parameter"""
|
||||
|
|
|
@ -209,7 +209,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
category: Optional[SearchParameterTypes]=None,
|
||||
org: Optional[SearchParameterTypes]=None,
|
||||
tags: Optional[SearchParameterTypes]=None,
|
||||
quickfilter: Optional[bool]=None,
|
||||
quick_filter: Optional[str]=None, quickFilter: Optional[str]=None,
|
||||
date_from: Optional[DateTypes]=None,
|
||||
date_to: Optional[DateTypes]=None,
|
||||
eventid: Optional[SearchType]=None,
|
||||
|
@ -242,7 +242,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
:param category: The attribute category, any valid MISP attribute category is accepted.
|
||||
:param org: Search by the creator organisation by supplying the organisation identifier.
|
||||
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
|
||||
:param quickfilter: Enabling this (by passing "1" as the argument) will make the search ignore all of the other arguments, except for the auth key and value. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment.
|
||||
:param quick_filter: The string passed to this field will ignore all of the other arguments. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment.
|
||||
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
|
||||
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
|
||||
:param eventid: The events that should be included / excluded from the search
|
||||
|
@ -267,6 +267,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
|
||||
Deprecated:
|
||||
|
||||
:param quickFilter: synponym for quick_filter
|
||||
:param withAttachments: synonym for with_attachments
|
||||
:param last: synonym for publish_timestamp
|
||||
:param enforceWarninglist: synonym for enforce_warninglist
|
||||
|
@ -281,6 +282,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects'])))
|
||||
|
||||
# Deprecated stuff / synonyms
|
||||
if quickFilter is not None:
|
||||
quick_filter = quickFilter
|
||||
if withAttachments is not None:
|
||||
with_attachments = withAttachments
|
||||
if last is not None:
|
||||
|
@ -307,7 +310,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
query['category'] = category
|
||||
query['org'] = org
|
||||
query['tags'] = tags
|
||||
query['quickfilter'] = quickfilter
|
||||
query['quickFilter'] = quick_filter
|
||||
query['from'] = self.make_timestamp(date_from)
|
||||
query['to'] = self.make_timestamp(date_to)
|
||||
query['eventid'] = eventid
|
||||
|
@ -383,3 +386,41 @@ class ExpandedPyMISP(PyMISP):
|
|||
if line:
|
||||
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
|
||||
return to_return
|
||||
|
||||
def search_logs(self, limit: Optional[int]=None, page: Optional[int]=None,
|
||||
log_id: Optional[int]=None, title: Optional[str]=None,
|
||||
created: Optional[DateTypes]=None, model: Optional[str]=None,
|
||||
action: Optional[str]=None, user_id: Optional[int]=None,
|
||||
change: Optional[str]=None, email: Optional[str]=None,
|
||||
org: Optional[str]=None, description: Optional[str]=None,
|
||||
ip: Optional[str]=None):
|
||||
'''Search in logs
|
||||
|
||||
Note: to run substring queries simply append/prepend/encapsulate the search term with %
|
||||
|
||||
:param limit: Limit the number of results returned, depending on the scope (for example 10 attributes or 10 full events).
|
||||
:param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
|
||||
:param log_id: Log ID
|
||||
:param title: Log Title
|
||||
:param created: Creation timestamp
|
||||
:param model: Model name that generated the log entry
|
||||
:param action: The thing that was done
|
||||
:param user_id: ID of the user doing the action
|
||||
:param change: Change that occured
|
||||
:param email: Email of the user
|
||||
:param org: Organisation of the User doing the action
|
||||
:param description: Description of the action
|
||||
:param ip: Origination IP of the User doing the action
|
||||
'''
|
||||
query = locals()
|
||||
query.pop('self')
|
||||
if log_id is not None:
|
||||
query['id'] = query.pop('log_id')
|
||||
|
||||
url = urljoin(self.root_url, 'admin/logs/index')
|
||||
# Remove None values.
|
||||
# TODO: put that in self._prepare_request
|
||||
query = {k: v for k, v in query.items() if v is not None}
|
||||
response = self._prepare_request('POST', url, data=json.dumps(query))
|
||||
normalized_response = self._check_response(response)
|
||||
return normalized_response
|
||||
|
|
Loading…
Reference in New Issue