mirror of https://github.com/MISP/PyMISP
Merge branch 'master' of https://github.com/MISP/PyMISP
commit
03089ea7da
|
@ -31,6 +31,6 @@ if __name__ == '__main__':
|
|||
attribute = temp
|
||||
break
|
||||
|
||||
misp.add_tag(attribute, args.tag, True)
|
||||
misp.add_tag(attribute, args.tag, attribute=True)
|
||||
else:
|
||||
misp.add_tag(event['Event'], args.tag)
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
YARA dumper for MISP
|
||||
by Christophe Vandeplas
|
||||
'''
|
||||
|
||||
import keys
|
||||
from pymisp import PyMISP
|
||||
import yara
|
||||
import re
|
||||
|
||||
|
||||
def dirty_cleanup(value):
|
||||
changed = False
|
||||
substitutions = (('”', '"'),
|
||||
('“', '"'),
|
||||
('″', '"'),
|
||||
('`', "'"),
|
||||
('\r', '')
|
||||
# ('$ ', '$'), # this breaks rules
|
||||
# ('\t\t', '\n'), # this breaks rules
|
||||
)
|
||||
for substitution in substitutions:
|
||||
if substitution[0] in value:
|
||||
changed = True
|
||||
value = value.replace(substitution[0], substitution[1])
|
||||
return value, changed
|
||||
|
||||
|
||||
misp = PyMISP(keys.misp_url, keys.misp_key, keys.misp_verify, 'json')
|
||||
result = misp.search(controller='attributes', type_attribute='yara')
|
||||
|
||||
attr_cnt = 0
|
||||
attr_cnt_invalid = 0
|
||||
attr_cnt_duplicate = 0
|
||||
attr_cnt_changed = 0
|
||||
yara_rules = []
|
||||
yara_rule_names = []
|
||||
if 'response' in result and 'Attribute' in result['response']:
|
||||
for attribute in result['response']['Attribute']:
|
||||
value = attribute['value']
|
||||
event_id = attribute['event_id']
|
||||
attribute_id = attribute['id']
|
||||
|
||||
value = re.sub('^[ \t]*rule ', 'rule misp_e{}_'.format(event_id), value, flags=re.MULTILINE)
|
||||
value, changed = dirty_cleanup(value)
|
||||
if changed:
|
||||
attr_cnt_changed += 1
|
||||
if 'global rule' in value: # refuse any global rules as they might disable everything
|
||||
continue
|
||||
|
||||
# compile the yara rule to confirm it's validity
|
||||
# if valid, ignore duplicate rules
|
||||
try:
|
||||
attr_cnt += 1
|
||||
yara.compile(source=value)
|
||||
yara_rules.append(value)
|
||||
# print("Rule e{} a{} OK".format(event_id, attribute_id))
|
||||
except yara.SyntaxError as e:
|
||||
attr_cnt_invalid += 1
|
||||
# print("Rule e{} a{} NOK - {}".format(event_id, attribute_id, e))
|
||||
except yara.Error as e:
|
||||
attr_cnt_invalid += 1
|
||||
print(e)
|
||||
import traceback
|
||||
print(traceback.format_exc())
|
||||
|
||||
# remove duplicates - process the full yara rule list and process errors to eliminate duplicate rule names
|
||||
all_yara_rules = '\n'.join(yara_rules)
|
||||
while True:
|
||||
try:
|
||||
yara.compile(source=all_yara_rules)
|
||||
except yara.SyntaxError as e:
|
||||
if 'duplicated identifier' in e.args[0]:
|
||||
duplicate_rule_names = re.findall('duplicated identifier "(.*)"', e.args[0])
|
||||
for item in duplicate_rule_names:
|
||||
all_yara_rules = all_yara_rules.replace('rule {}'.format(item), 'rule duplicate_{}'.format(item), 1)
|
||||
attr_cnt_duplicate += 1
|
||||
continue
|
||||
else:
|
||||
# This should never happen as all rules were processed before separately. So logically we should only have duplicates.
|
||||
exit("ERROR SyntaxError in rules: {}".format(e.args))
|
||||
break
|
||||
|
||||
# save to a file
|
||||
fname = 'misp.yara'
|
||||
with open(fname, 'w') as f_out:
|
||||
f_out.write(all_yara_rules)
|
||||
|
||||
print("")
|
||||
print("MISP attributes with YARA rules: total={} valid={} invalid={} duplicate={} changed={}.".format(attr_cnt, attr_cnt - attr_cnt_invalid, attr_cnt_invalid, attr_cnt_duplicate, attr_cnt_changed))
|
||||
print("Valid YARA rule file save to file '{}'. Invalid rules/attributes were ignored.".format(fname))
|
|
@ -1,4 +1,4 @@
|
|||
__version__ = '2.4.56'
|
||||
__version__ = '2.4.63'
|
||||
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey
|
||||
from .api import PyMISP
|
||||
|
|
252
pymisp/api.py
252
pymisp/api.py
|
@ -100,18 +100,19 @@ class PyMISP(object):
|
|||
|
||||
try:
|
||||
# Make sure the MISP instance is working and the URL is valid
|
||||
response = self.get_version()
|
||||
misp_version = response['version'].split('.')
|
||||
pymisp_version = __version__.split('.')
|
||||
for a, b in zip(misp_version, pymisp_version):
|
||||
if a == b:
|
||||
continue
|
||||
elif a < b:
|
||||
warnings.warn("Remote MISP instance (v{}) older than PyMISP (v{}). You should update your MISP instance, or install an older PyMISP version.".format(response['version'], __version__))
|
||||
else: # a > b
|
||||
# NOTE: That can happen and should not be blocking
|
||||
warnings.warn("Remote MISP instance (v{}) newer than PyMISP (v{}). Please check if a newer version of PyMISP is available.".format(response['version'], __version__))
|
||||
continue
|
||||
response = self.get_recommended_api_version()
|
||||
if not response.get('version'):
|
||||
warnings.warn("Unable to check the recommended PyMISP version (MISP <2.4.60), please upgrade.")
|
||||
else:
|
||||
recommended_pymisp_version = response['version'].split('.')
|
||||
for a, b in zip(pymisp_version, recommended_pymisp_version):
|
||||
if a == b:
|
||||
continue
|
||||
elif a > b:
|
||||
warnings.warn("The version of PyMISP recommended by the MISP instance ({}) is older than the one you're using now ({}). Please upgrade the MISP instance or use an older PyMISP version.".format(response['version'], __version__))
|
||||
else: # a < b
|
||||
warnings.warn("The version of PyMISP recommended by the MISP instance ({}) is newer than the one you're using now ({}). Please upgrade PyMISP.".format(response['version'], __version__))
|
||||
|
||||
except Exception as e:
|
||||
raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e))
|
||||
|
@ -127,7 +128,8 @@ class PyMISP(object):
|
|||
if not self.describe_types.get('sane_defaults'):
|
||||
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
|
||||
except:
|
||||
describe_types = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r'))
|
||||
with open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r') as f:
|
||||
describe_types = json.load(f)
|
||||
self.describe_types = describe_types['result']
|
||||
|
||||
self.categories = self.describe_types['categories']
|
||||
|
@ -173,7 +175,7 @@ class PyMISP(object):
|
|||
for e in errors:
|
||||
if not e:
|
||||
continue
|
||||
if isinstance(e, str):
|
||||
if isinstance(e, basestring):
|
||||
messages.append(e)
|
||||
continue
|
||||
for type_e, msgs in e.items():
|
||||
|
@ -348,18 +350,18 @@ class PyMISP(object):
|
|||
if e.published:
|
||||
return {'error': 'Already published'}
|
||||
e.publish()
|
||||
return self.update(event)
|
||||
return self.update(e)
|
||||
|
||||
def change_threat_level(self, event, threat_level_id):
|
||||
e = self._make_mispevent(event)
|
||||
e.threat_level_id = threat_level_id
|
||||
return self.update(event)
|
||||
return self.update(e)
|
||||
|
||||
def change_sharing_group(self, event, sharing_group_id):
|
||||
e = self._make_mispevent(event)
|
||||
e.distribution = 4 # Needs to be 'Sharing group'
|
||||
e.sharing_group_id = sharing_group_id
|
||||
return self.update(event)
|
||||
return self.update(e)
|
||||
|
||||
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
|
||||
misp_event = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published, orgc_id, org_id, sharing_group_id)
|
||||
|
@ -372,6 +374,9 @@ class PyMISP(object):
|
|||
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
||||
path = 'attributes/addTag'
|
||||
else:
|
||||
# Allow for backwards-compat with old style
|
||||
if "Event" in event:
|
||||
event = event["Event"]
|
||||
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
|
||||
path = 'events/addTag'
|
||||
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
||||
|
@ -400,6 +405,7 @@ class PyMISP(object):
|
|||
regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I)
|
||||
match = regex.match(uuid)
|
||||
return bool(match)
|
||||
|
||||
# ##### File attributes #####
|
||||
|
||||
def _send_attributes(self, event, attributes, proposal=False):
|
||||
|
@ -409,7 +415,7 @@ class PyMISP(object):
|
|||
e = MISPEvent(self.describe_types)
|
||||
e.load(event)
|
||||
e.attributes += attributes
|
||||
response = self.update(event)
|
||||
response = self.update(e)
|
||||
return response
|
||||
|
||||
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
||||
|
@ -462,7 +468,7 @@ class PyMISP(object):
|
|||
# It's a file handle - we can read it
|
||||
fileData = attachment.read()
|
||||
|
||||
elif isinstance(attachment, str):
|
||||
elif isinstance(attachment, basestring):
|
||||
# It can either be the b64 encoded data or a file path
|
||||
if os.path.exists(attachment):
|
||||
# It's a path!
|
||||
|
@ -1040,6 +1046,13 @@ class PyMISP(object):
|
|||
else:
|
||||
return {'error': 'Impossible to retrieve the version of the master branch.'}
|
||||
|
||||
def get_recommended_api_version(self):
|
||||
"""Returns the recommended API version from the server"""
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
|
||||
response = session.get(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def get_version(self):
|
||||
"""Returns the version of the instance."""
|
||||
session = self.__prepare_session()
|
||||
|
@ -1058,10 +1071,10 @@ class PyMISP(object):
|
|||
|
||||
# ############## Export Attributes in text ####################################
|
||||
|
||||
def get_all_attributes_txt(self, type_attr):
|
||||
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported."""
|
||||
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
|
||||
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
|
||||
session = self.__prepare_session('txt')
|
||||
url = urljoin(self.root_url, 'attributes/text/download/%s' % type_attr)
|
||||
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
|
||||
response = session.get(url)
|
||||
return response
|
||||
|
||||
|
@ -1110,7 +1123,8 @@ class PyMISP(object):
|
|||
|
||||
def sighting_per_json(self, json_file):
|
||||
session = self.__prepare_session()
|
||||
jdata = json.load(open(json_file))
|
||||
with open(json_file) as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'sightings/add/')
|
||||
response = session.post(url, data=json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
|
@ -1125,45 +1139,42 @@ class PyMISP(object):
|
|||
|
||||
# ############## Users ##################
|
||||
|
||||
def _set_user_parameters(self, email, org_id, role_id, password, external_auth_required,
|
||||
external_auth_key, enable_password, nids_sid, server_id,
|
||||
gpgkey, certif_public, autoalert, contactalert, disabled,
|
||||
change_pw, termsaccepted, newsread):
|
||||
def _set_user_parameters(self, **kwargs):
|
||||
user = {}
|
||||
if email is not None:
|
||||
user['email'] = email
|
||||
if org_id is not None:
|
||||
user['org_id'] = org_id
|
||||
if role_id is not None:
|
||||
user['role_id'] = role_id
|
||||
if password is not None:
|
||||
user['password'] = password
|
||||
if external_auth_required is not None:
|
||||
user['external_auth_required'] = external_auth_required
|
||||
if external_auth_key is not None:
|
||||
user['external_auth_key'] = external_auth_key
|
||||
if enable_password is not None:
|
||||
user['enable_password'] = enable_password
|
||||
if nids_sid is not None:
|
||||
user['nids_sid'] = nids_sid
|
||||
if server_id is not None:
|
||||
user['server_id'] = server_id
|
||||
if gpgkey is not None:
|
||||
user['gpgkey'] = gpgkey
|
||||
if certif_public is not None:
|
||||
user['certif_public'] = certif_public
|
||||
if autoalert is not None:
|
||||
user['autoalert'] = autoalert
|
||||
if contactalert is not None:
|
||||
user['contactalert'] = contactalert
|
||||
if disabled is not None:
|
||||
user['disabled'] = disabled
|
||||
if change_pw is not None:
|
||||
user['change_pw'] = change_pw
|
||||
if termsaccepted is not None:
|
||||
user['termsaccepted'] = termsaccepted
|
||||
if newsread is not None:
|
||||
user['newsread'] = newsread
|
||||
if kwargs.get('email'):
|
||||
user['email'] = kwargs.get('email')
|
||||
if kwargs.get('org_id'):
|
||||
user['org_id'] = kwargs.get('org_id')
|
||||
if kwargs.get('role_id'):
|
||||
user['role_id'] = kwargs.get('role_id')
|
||||
if kwargs.get('password'):
|
||||
user['password'] = kwargs.get('password')
|
||||
if kwargs.get('external_auth_required'):
|
||||
user['external_auth_required'] = kwargs.get('external_auth_required')
|
||||
if kwargs.get('external_auth_key'):
|
||||
user['external_auth_key'] = kwargs.get('external_auth_key')
|
||||
if kwargs.get('enable_password'):
|
||||
user['enable_password'] = kwargs.get('enable_password')
|
||||
if kwargs.get('nids_sid'):
|
||||
user['nids_sid'] = kwargs.get('nids_sid')
|
||||
if kwargs.get('server_id'):
|
||||
user['server_id'] = kwargs.get('server_id')
|
||||
if kwargs.get('gpgkey'):
|
||||
user['gpgkey'] = kwargs.get('gpgkey')
|
||||
if kwargs.get('certif_public'):
|
||||
user['certif_public'] = kwargs.get('certif_public')
|
||||
if kwargs.get('autoalert'):
|
||||
user['autoalert'] = kwargs.get('autoalert')
|
||||
if kwargs.get('contactalert'):
|
||||
user['contactalert'] = kwargs.get('contactalert')
|
||||
if kwargs.get('disabled'):
|
||||
user['disabled'] = kwargs.get('disabled')
|
||||
if kwargs.get('change_pw'):
|
||||
user['change_pw'] = kwargs.get('change_pw')
|
||||
if kwargs.get('termsaccepted'):
|
||||
user['termsaccepted'] = kwargs.get('termsaccepted')
|
||||
if kwargs.get('newsread'):
|
||||
user['newsread'] = kwargs.get('newsread')
|
||||
return user
|
||||
|
||||
def get_users_list(self):
|
||||
|
@ -1178,18 +1189,8 @@ class PyMISP(object):
|
|||
response = session.get(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def add_user(self, email, org_id, role_id, password=None,
|
||||
external_auth_required=None, external_auth_key=None,
|
||||
enable_password=None, nids_sid=None, server_id=None,
|
||||
gpgkey=None, certif_public=None, autoalert=None,
|
||||
contactalert=None, disabled=None, change_pw=None,
|
||||
termsaccepted=None, newsread=None):
|
||||
new_user = self._set_user_parameters(email, org_id, role_id, password,
|
||||
external_auth_required, external_auth_key,
|
||||
enable_password, nids_sid, server_id,
|
||||
gpgkey, certif_public, autoalert,
|
||||
contactalert, disabled, change_pw,
|
||||
termsaccepted, newsread)
|
||||
def add_user(self, email, org_id, role_id, **kwargs):
|
||||
new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs))
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/users/add/')
|
||||
response = session.post(url, data=json.dumps(new_user))
|
||||
|
@ -1197,30 +1198,20 @@ class PyMISP(object):
|
|||
|
||||
def add_user_json(self, json_file):
|
||||
session = self.__prepare_session()
|
||||
jdata = json.load(open(json_file))
|
||||
with open(json_file) as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/users/add/')
|
||||
response = session.post(url, data=json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
|
||||
def get_add_user_fields_list(self):
|
||||
def get_user_fields_list(self):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/users/add/')
|
||||
response = session.get(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def edit_user(self, user_id, email=None, org_id=None, role_id=None,
|
||||
password=None, external_auth_required=None,
|
||||
external_auth_key=None, enable_password=None, nids_sid=None,
|
||||
server_id=None, gpgkey=None, certif_public=None,
|
||||
autoalert=None, contactalert=None, disabled=None,
|
||||
change_pw=None, termsaccepted=None, newsread=None):
|
||||
edit_user = self._set_user_parameters(email, org_id, role_id, password,
|
||||
external_auth_required, external_auth_key,
|
||||
enable_password, nids_sid, server_id,
|
||||
gpgkey, certif_public, autoalert,
|
||||
contactalert, disabled, change_pw,
|
||||
termsaccepted, newsread)
|
||||
|
||||
def edit_user(self, user_id, **kwargs):
|
||||
edit_user = self._set_user_parameters(**kwargs)
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||
response = session.post(url, data=json.dumps(edit_user))
|
||||
|
@ -1228,19 +1219,92 @@ class PyMISP(object):
|
|||
|
||||
def edit_user_json(self, json_file, user_id):
|
||||
session = self.__prepare_session()
|
||||
jdata = json.load(open(json_file))
|
||||
with open(json_file) as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||
response = session.post(url, data=json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
|
||||
def get_edit_user_fields_list(self, user_id):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||
response = session.get(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def delete_user(self, user_id):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id))
|
||||
response = session.post(url)
|
||||
return self._check_response(response)
|
||||
|
||||
# ############## Organisations ##################
|
||||
|
||||
def _set_organisation_parameters(self, **kwargs):
|
||||
organisation = {}
|
||||
if kwargs.get('name'):
|
||||
organisation['name'] = kwargs.get('name')
|
||||
if kwargs.get('anonymise'):
|
||||
organisation['anonymise'] = kwargs.get('anonymise')
|
||||
if kwargs.get('description'):
|
||||
organisation['description'] = kwargs.get('description')
|
||||
if kwargs.get('type'):
|
||||
organisation['type'] = kwargs.get('type')
|
||||
if kwargs.get('nationality'):
|
||||
organisation['nationality'] = kwargs.get('nationality')
|
||||
if kwargs.get('sector'):
|
||||
organisation['sector'] = kwargs.get('sector')
|
||||
if kwargs.get('uuid'):
|
||||
organisation['uuid'] = kwargs.get('uuid')
|
||||
if kwargs.get('contacts'):
|
||||
organisation['contacts'] = kwargs.get('contacts')
|
||||
if kwargs.get('local'):
|
||||
organisation['local'] = kwargs.get('local')
|
||||
return organisation
|
||||
|
||||
def get_organisations_list(self):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'organisations')
|
||||
response = session.get(url)
|
||||
return self._check_response(response)['response']
|
||||
|
||||
def get_organisation(self, organisation_id):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id))
|
||||
response = session.get(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def add_organisation(self, name, **kwargs):
|
||||
new_org = self._set_organisation_parameters(**dict(name=name, **kwargs))
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||
response = session.post(url, data=json.dumps(new_org))
|
||||
return self._check_response(response)
|
||||
|
||||
def add_organisation_json(self, json_file):
|
||||
session = self.__prepare_session()
|
||||
with open(json_file) as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||
response = session.post(url, data=json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
|
||||
def get_organisation_fields_list(self):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||
response = session.get(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def edit_organisation(self, org_id, **kwargs):
|
||||
edit_org = self._set_organisation_parameters(**kwargs)
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||
response = session.post(url, data=json.dumps(edit_org))
|
||||
return self._check_response(response)
|
||||
|
||||
def edit_organisation_json(self, json_file, org_id):
|
||||
session = self.__prepare_session()
|
||||
with open(json_file) as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||
response = session.post(url, data=json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
|
||||
def delete_organisation(self, org_id):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id))
|
||||
response = session.post(url)
|
||||
return self._check_response(response)
|
||||
|
|
|
@ -101,6 +101,9 @@ class MISPAttribute(object):
|
|||
def delete(self):
|
||||
self.deleted = True
|
||||
|
||||
def add_tag(self, tag):
|
||||
self.Tag.append({'name': tag})
|
||||
|
||||
def verify(self, gpg_uid):
|
||||
if not has_pyme:
|
||||
raise Exception('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
|
||||
|
@ -174,7 +177,7 @@ class MISPAttribute(object):
|
|||
if kwargs.get('sig'):
|
||||
self.sig = kwargs['sig']
|
||||
if kwargs.get('Tag'):
|
||||
self.Tag = kwargs['Tag']
|
||||
self.Tag = [t for t in kwargs['Tag'] if t]
|
||||
|
||||
# If the user wants to disable correlation, let them. Defaults to False.
|
||||
self.disable_correlation = kwargs.get("disable_correlation", False)
|
||||
|
@ -214,6 +217,8 @@ class MISPAttribute(object):
|
|||
to_return = {'type': self.type, 'category': self.category, 'to_ids': self.to_ids,
|
||||
'distribution': self.distribution, 'value': self.value,
|
||||
'comment': self.comment, 'disable_correlation': self.disable_correlation}
|
||||
if self.uuid:
|
||||
to_return['uuid'] = self.uuid
|
||||
if self.sig:
|
||||
to_return['sig'] = self.sig
|
||||
if self.sharing_group_id:
|
||||
|
@ -231,9 +236,8 @@ class MISPAttribute(object):
|
|||
to_return = self._json()
|
||||
if self.id:
|
||||
to_return['id'] = self.id
|
||||
if self.uuid:
|
||||
to_return['uuid'] = self.uuid
|
||||
if self.timestamp:
|
||||
# Should never be set on an update, MISP will automatically set it to now
|
||||
to_return['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
|
||||
if self.deleted is not None:
|
||||
to_return['deleted'] = self.deleted
|
||||
|
@ -481,7 +485,7 @@ class MISPEvent(object):
|
|||
if kwargs.get('Galaxy'):
|
||||
self.Galaxy = kwargs['Galaxy']
|
||||
if kwargs.get('Tag'):
|
||||
self.Tag = kwargs['Tag']
|
||||
self.Tag = [t for t in kwargs['Tag'] if t]
|
||||
if kwargs.get('sig'):
|
||||
self.sig = kwargs['sig']
|
||||
if kwargs.get('global_sig'):
|
||||
|
@ -542,6 +546,7 @@ class MISPEvent(object):
|
|||
if self.publish_timestamp:
|
||||
to_return['Event']['publish_timestamp'] = int(time.mktime(self.publish_timestamp.timetuple()))
|
||||
if self.timestamp:
|
||||
# Should never be set on an update, MISP will automatically set it to now
|
||||
to_return['Event']['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
|
||||
to_return['Event'] = _int_to_str(to_return['Event'])
|
||||
if self.attributes:
|
||||
|
@ -549,6 +554,19 @@ class MISPEvent(object):
|
|||
jsonschema.validate(to_return, self.json_schema)
|
||||
return to_return
|
||||
|
||||
def add_tag(self, tag):
|
||||
self.Tag.append({'name': tag})
|
||||
|
||||
def add_attribute_tag(self, tag, attribute_identifier):
|
||||
attribute = None
|
||||
for a in self.attributes:
|
||||
if a.id == attribute_identifier or a.uuid == attribute_identifier or attribute_identifier in a.value:
|
||||
a.add_tag(tag)
|
||||
attribute = a
|
||||
if not attribute:
|
||||
raise Exception('No attribute with identifier {} found.'.format(attribute_identifier))
|
||||
return attribute
|
||||
|
||||
def publish(self):
|
||||
self.published = True
|
||||
|
||||
|
|
|
@ -38,7 +38,8 @@ class TestOffline(unittest.TestCase):
|
|||
|
||||
def initURI(self, m):
|
||||
m.register_uri('GET', self.domain + 'events/1', json=self.auth_error_msg, status_code=403)
|
||||
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.56"})
|
||||
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.62"})
|
||||
m.register_uri('GET', self.domain + 'servers/getPyMISPVersion.json', json={"version": "2.4.62"})
|
||||
m.register_uri('GET', self.domain + 'sharing_groups.json', json=self.sharing_groups)
|
||||
m.register_uri('GET', self.domain + 'attributes/describeTypes.json', json=self.types)
|
||||
m.register_uri('GET', self.domain + 'events/2', json=self.event)
|
||||
|
@ -97,7 +98,7 @@ class TestOffline(unittest.TestCase):
|
|||
api_version = pymisp.get_api_version()
|
||||
self.assertEqual(api_version, {'version': pm.__version__})
|
||||
server_version = pymisp.get_version()
|
||||
self.assertEqual(server_version, {"version": "2.4.56"})
|
||||
self.assertEqual(server_version, {"version": "2.4.62"})
|
||||
|
||||
def test_getSharingGroups(self, m):
|
||||
self.initURI(m)
|
||||
|
|
Loading…
Reference in New Issue