mirror of https://github.com/MISP/PyMISP
Add an attributes to an event without sending the full existing event
parent
a7e66ca5db
commit
38f2dbe94d
|
@ -365,7 +365,7 @@ class PyMISP(object):
|
||||||
if not self._valid_uuid(uuid):
|
if not self._valid_uuid(uuid):
|
||||||
raise PyMISPError('Invalid UUID')
|
raise PyMISPError('Invalid UUID')
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
to_post = {'uuid':uuid, 'tag':tag}
|
to_post = {'uuid': uuid, 'tag': tag}
|
||||||
path = 'tags/attachTagToObject'
|
path = 'tags/attachTagToObject'
|
||||||
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -374,7 +374,7 @@ class PyMISP(object):
|
||||||
if not self._valid_uuid(uuid):
|
if not self._valid_uuid(uuid):
|
||||||
raise PyMISPError('Invalid UUID')
|
raise PyMISPError('Invalid UUID')
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
to_post = {'uuid':uuid, 'tag':tag}
|
to_post = {'uuid': uuid, 'tag': tag}
|
||||||
path = 'tags/removeTagFromObject'
|
path = 'tags/removeTagFromObject'
|
||||||
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -382,24 +382,30 @@ class PyMISP(object):
|
||||||
# ##### File attributes #####
|
# ##### File attributes #####
|
||||||
|
|
||||||
def _send_attributes(self, event, attributes, proposal=False):
|
def _send_attributes(self, event, attributes, proposal=False):
|
||||||
# FIXME: unable to send a proposal if we have a full event.
|
eventID_to_update = None
|
||||||
if isinstance(event, MISPEvent):
|
if isinstance(event, MISPEvent):
|
||||||
event.attributes += attributes
|
if hasattr(event, 'id'):
|
||||||
response = self.update(event)
|
eventID_to_update = event.id
|
||||||
|
elif hasattr(event, 'uuid'):
|
||||||
|
eventID_to_update = event.uuid
|
||||||
elif isinstance(event, int) or (isinstance(event, str) and (event.isdigit() or self._valid_uuid(event))):
|
elif isinstance(event, int) or (isinstance(event, str) and (event.isdigit() or self._valid_uuid(event))):
|
||||||
# No full event, just an ID
|
eventID_to_update = event
|
||||||
session = self.__prepare_session()
|
|
||||||
url = urljoin(self.root_url, 'attributes/add/{}'.format(event))
|
|
||||||
for a in attributes:
|
|
||||||
if proposal:
|
|
||||||
response = self.proposal_add(event, json.dumps(a, cls=EncodeUpdate))
|
|
||||||
else:
|
|
||||||
response = session.post(url, data=json.dumps(a, cls=EncodeUpdate))
|
|
||||||
else:
|
else:
|
||||||
e = MISPEvent(self.describe_types)
|
e = MISPEvent(self.describe_types)
|
||||||
e.load(event)
|
e.load(event)
|
||||||
e.attributes += attributes
|
if hasattr(e, 'id'):
|
||||||
response = self.update(e)
|
eventID_to_update = e.id
|
||||||
|
elif hasattr(e, 'uuid'):
|
||||||
|
eventID_to_update = e.uuid
|
||||||
|
if eventID_to_update is None:
|
||||||
|
raise PyMISPError("Unable to find the ID of the event to update")
|
||||||
|
for a in attributes:
|
||||||
|
if proposal:
|
||||||
|
response = self.proposal_add(eventID_to_update, json.dumps(a, cls=EncodeUpdate))
|
||||||
|
else:
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'attributes/add/{}'.format(eventID_to_update))
|
||||||
|
response = session.post(url, data=json.dumps(a, cls=EncodeUpdate))
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
||||||
|
|
|
@ -155,11 +155,11 @@ class TestOffline(unittest.TestCase):
|
||||||
self.initURI(m)
|
self.initURI(m)
|
||||||
p = MockPyMISP(self.domain, self.key)
|
p = MockPyMISP(self.domain, self.key)
|
||||||
evt = p.get(1)
|
evt = p.get(1)
|
||||||
self.assertEquals(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940',
|
self.assertEqual(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940',
|
||||||
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
|
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
|
||||||
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b',
|
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b',
|
||||||
filename='foobar.exe'))
|
filename='foobar.exe'))
|
||||||
self.assertEquals(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940',
|
self.assertEqual(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940',
|
||||||
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
|
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
|
||||||
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b'))
|
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b'))
|
||||||
p.av_detection_link(evt, 'https://foocorp.com')
|
p.av_detection_link(evt, 'https://foocorp.com')
|
||||||
|
@ -178,29 +178,29 @@ class TestOffline(unittest.TestCase):
|
||||||
self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False)
|
self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False)
|
||||||
p.add_pipe(evt, 'foo')
|
p.add_pipe(evt, 'foo')
|
||||||
p.add_pipe(evt, '\\.\\pipe\\foo')
|
p.add_pipe(evt, '\\.\\pipe\\foo')
|
||||||
self.assertEquals(3, p.add_pipe(evt, ['foo', 'bar', 'baz']))
|
self.assertEqual(3, p.add_pipe(evt, ['foo', 'bar', 'baz']))
|
||||||
self.assertEquals(3, p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz']))
|
self.assertEqual(3, p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz']))
|
||||||
p.add_mutex(evt, 'foo')
|
p.add_mutex(evt, 'foo')
|
||||||
self.assertEquals(1, p.add_mutex(evt, '\\BaseNamedObjects\\foo'))
|
self.assertEqual(1, p.add_mutex(evt, '\\BaseNamedObjects\\foo'))
|
||||||
self.assertEquals(3, p.add_mutex(evt, ['foo', 'bar', 'baz']))
|
self.assertEqual(3, p.add_mutex(evt, ['foo', 'bar', 'baz']))
|
||||||
self.assertEquals(3, p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz']))
|
self.assertEqual(3, p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz']))
|
||||||
p.add_yara(evt, 'rule Foo {}')
|
p.add_yara(evt, 'rule Foo {}')
|
||||||
self.assertEquals(2, p.add_yara(evt, ['rule Foo {}', 'rule Bar {}']))
|
self.assertEqual(2, p.add_yara(evt, ['rule Foo {}', 'rule Bar {}']))
|
||||||
p.add_ipdst(evt, '1.2.3.4')
|
p.add_ipdst(evt, '1.2.3.4')
|
||||||
self.assertEquals(2, p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8']))
|
self.assertEqual(2, p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8']))
|
||||||
p.add_ipsrc(evt, '1.2.3.4')
|
p.add_ipsrc(evt, '1.2.3.4')
|
||||||
self.assertEquals(2, p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8']))
|
self.assertEqual(2, p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8']))
|
||||||
p.add_hostname(evt, 'a.foobar.com')
|
p.add_hostname(evt, 'a.foobar.com')
|
||||||
self.assertEquals(2, p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com']))
|
self.assertEqual(2, p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com']))
|
||||||
p.add_domain(evt, 'foobar.com')
|
p.add_domain(evt, 'foobar.com')
|
||||||
self.assertEquals(2, p.add_domain(evt, ['foobar.com', 'foobaz.com']))
|
self.assertEqual(2, p.add_domain(evt, ['foobar.com', 'foobaz.com']))
|
||||||
p.add_domain_ip(evt, 'foo.com', '1.2.3.4')
|
p.add_domain_ip(evt, 'foo.com', '1.2.3.4')
|
||||||
self.assertEquals(2, p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8']))
|
self.assertEqual(2, p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8']))
|
||||||
self.assertEquals(2, p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'}))
|
self.assertEqual(2, p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'}))
|
||||||
p.add_url(evt, 'https://example.com')
|
p.add_url(evt, 'https://example.com')
|
||||||
self.assertEquals(2, p.add_url(evt, ['https://example.com', 'http://foo.com']))
|
self.assertEqual(2, p.add_url(evt, ['https://example.com', 'http://foo.com']))
|
||||||
p.add_useragent(evt, 'Mozilla')
|
p.add_useragent(evt, 'Mozilla')
|
||||||
self.assertEquals(2, p.add_useragent(evt, ['Mozilla', 'Godzilla']))
|
self.assertEqual(2, p.add_useragent(evt, ['Mozilla', 'Godzilla']))
|
||||||
p.add_traffic_pattern(evt, 'blabla')
|
p.add_traffic_pattern(evt, 'blabla')
|
||||||
p.add_snort(evt, 'blaba')
|
p.add_snort(evt, 'blaba')
|
||||||
p.add_net_other(evt, 'blabla')
|
p.add_net_other(evt, 'blabla')
|
||||||
|
|
Loading…
Reference in New Issue