mirror of https://github.com/MISP/PyMISP
commit
c64fd79313
|
@ -11,7 +11,9 @@ import re
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from urllib.parse import urljoin
|
from urllib.parse import urljoin
|
||||||
|
from urllib.parse import quote
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
from urllib import quote
|
||||||
from urlparse import urljoin
|
from urlparse import urljoin
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import zipfile
|
import zipfile
|
||||||
|
@ -46,6 +48,8 @@ class NewEventError(PyMISPError):
|
||||||
class NewAttributeError(PyMISPError):
|
class NewAttributeError(PyMISPError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class SearchError(PyMISPError):
|
||||||
|
pass
|
||||||
|
|
||||||
class MissingDependency(PyMISPError):
|
class MissingDependency(PyMISPError):
|
||||||
pass
|
pass
|
||||||
|
@ -170,6 +174,8 @@ class PyMISP(object):
|
||||||
raise PyMISPError('Unknown error: {}'.format(response.text))
|
raise PyMISPError('Unknown error: {}'.format(response.text))
|
||||||
|
|
||||||
errors = []
|
errors = []
|
||||||
|
if type(to_return) is list:
|
||||||
|
to_return = {'response':to_return}
|
||||||
if to_return.get('error'):
|
if to_return.get('error'):
|
||||||
if not isinstance(to_return['error'], list):
|
if not isinstance(to_return['error'], list):
|
||||||
errors.append(to_return['error'])
|
errors.append(to_return['error'])
|
||||||
|
@ -700,6 +706,51 @@ class PyMISP(object):
|
||||||
response = session.post(url, data=json.dumps(query))
|
response = session.post(url, data=json.dumps(query))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def search_index(self, published=None, eventid = None, tag = None, datefrom = None,
|
||||||
|
dateto = None, eventinfo = None, threatlevel = None, distribution = None,
|
||||||
|
analysis = None, attribute = None, org=None):
|
||||||
|
"""
|
||||||
|
Search only at the index level. Use ! infront of value as NOT, default OR
|
||||||
|
|
||||||
|
:param published: Published (0,1)
|
||||||
|
:param eventid: Evend ID(s) | str or list
|
||||||
|
:param tag: Tag(s) | str or list
|
||||||
|
:param datefrom: First date, in format YYYY-MM-DD
|
||||||
|
:param datefrom: Last date, in format YYYY-MM-DD
|
||||||
|
:param eventinfo: Event info(s) to match | str or list
|
||||||
|
:param threatlevel: Threat level(s) (1,2,3,4) | str or list
|
||||||
|
:param distribution: Distribution level(s) (0,1,2,3) | str or list
|
||||||
|
:param analysis: Analysis level(s) (0,1,2) | str or list
|
||||||
|
:param org: Organisation(s) | str or list
|
||||||
|
|
||||||
|
"""
|
||||||
|
allowed = {'published':published, 'eventid':eventid, 'tag':tag, 'Dateto':dateto,
|
||||||
|
'Datefrom':datefrom, 'eventinfo':eventinfo, 'threatlevel':threatlevel,
|
||||||
|
'distribution':distribution, 'analysis':analysis, 'attribute':attribute,
|
||||||
|
'org':org }
|
||||||
|
rule_levels = {'distribution':["0","1","2","3","!0","!1","!2","!3"],
|
||||||
|
'threatlevel':["1","2","3","4","!1","!2","!3","!4"],
|
||||||
|
'analysis':["0","1","2","!0","!1","!2"]}
|
||||||
|
buildup_url = "events/index"
|
||||||
|
|
||||||
|
for rule in allowed.keys():
|
||||||
|
if allowed[rule] != None:
|
||||||
|
if type(allowed[rule])!=list:
|
||||||
|
allowed[rule]=[allowed[rule]]
|
||||||
|
allowed[rule] = map(str, allowed[rule])
|
||||||
|
if rule in rule_levels:
|
||||||
|
if not set(allowed[rule]).issubset(rule_levels[rule]):
|
||||||
|
raise SearchError('Values in your {} are invalid, has to be in {}'.format(rule, ', '.join(str(x) for x in rule_levels[rule])))
|
||||||
|
if type(allowed[rule]) == list:
|
||||||
|
joined = '|'.join(str(x) for x in allowed[rule])
|
||||||
|
buildup_url += '/search{}:{}'.format(rule, joined)
|
||||||
|
else:
|
||||||
|
buildup_url += '/search{}:{}'.format(rule, allowed[rule])
|
||||||
|
session = self.__prepare_session('json')
|
||||||
|
url = urljoin(self.root_url, buildup_url)
|
||||||
|
response = session.get(url)
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
def search_all(self, value):
|
def search_all(self, value):
|
||||||
query = {'value': value, 'searchall': 1}
|
query = {'value': value, 'searchall': 1}
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session('json')
|
||||||
|
|
Loading…
Reference in New Issue