mirror of https://github.com/MISP/misp-modules
Make the special attributes *_ip and _domain not needed.
See the discussion in https://github.com/MISP/misp-objects/pull/314
pull/491/head
commit
0c6a12ea60
|
@ -22,6 +22,9 @@ from pymisp import MISPObject
|
||||||
from cof2misp.cof import validate_cof
|
from cof2misp.cof import validate_cof
|
||||||
|
|
||||||
|
|
||||||
|
create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314
|
||||||
|
|
||||||
|
|
||||||
misperrors = {'error': 'Error'}
|
misperrors = {'error': 'Error'}
|
||||||
userConfig = {}
|
userConfig = {}
|
||||||
|
|
||||||
|
@ -64,7 +67,7 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
|
|
||||||
# validate here (simple validation or full JSON Schema validation)
|
# validate here (simple validation or full JSON Schema validation)
|
||||||
if not validate_cof(entry):
|
if not validate_cof(entry):
|
||||||
return {"error": "Could not validate the COF input '%r'" % entry}
|
return {"error": "Could not validate the COF input '%s'" % entry}
|
||||||
|
|
||||||
# Next, extract some fields
|
# Next, extract some fields
|
||||||
rrtype = entry['rrtype'].upper()
|
rrtype = entry['rrtype'].upper()
|
||||||
|
@ -81,17 +84,18 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
# handle the combinations of rrtype (domain, ip) on both left and right side
|
# handle the combinations of rrtype (domain, ip) on both left and right side
|
||||||
#
|
#
|
||||||
|
|
||||||
if rrtype in ['A', 'AAAA', 'A6']: # address type
|
if create_specific_attributes:
|
||||||
# address type
|
if rrtype in ['A', 'AAAA', 'A6']: # address type
|
||||||
o.add_attribute('rrname_domain', value=rrname)
|
# address type
|
||||||
for r in rdata:
|
o.add_attribute('rrname_domain', value=rrname)
|
||||||
o.add_attribute('rdata_ip', value=r)
|
for r in rdata:
|
||||||
elif rrtype in ['CNAME', 'DNAME', 'NS']: # both sides are domains
|
o.add_attribute('rdata_ip', value=r)
|
||||||
o.add_attribute('rrname_domain', value=rrname)
|
elif rrtype in ['CNAME', 'DNAME', 'NS']: # both sides are domains
|
||||||
for r in rdata:
|
o.add_attribute('rrname_domain', value=rrname)
|
||||||
o.add_attribute('rdata_domain', value=r)
|
for r in rdata:
|
||||||
elif rrtype in ['SOA']: # left side is a domain, right side is text
|
o.add_attribute('rdata_domain', value=r)
|
||||||
o.add_attribute('rrname_domain', value=rrname)
|
elif rrtype in ['SOA']: # left side is a domain, right side is text
|
||||||
|
o.add_attribute('rrname_domain', value=rrname)
|
||||||
|
|
||||||
#
|
#
|
||||||
# now do the regular filling up of rrname, rrtype, time_first, etc.
|
# now do the regular filling up of rrname, rrtype, time_first, etc.
|
||||||
|
@ -140,7 +144,7 @@ def parse_and_insert_dnsdbflex(data: str):
|
||||||
--------
|
--------
|
||||||
none
|
none
|
||||||
"""
|
"""
|
||||||
pass # XXX FIXME: need a MISP object for dnsdbflex
|
return {"error": "NOT IMPLEMENTED YET"} # XXX FIXME: need a MISP object for dnsdbflex
|
||||||
|
|
||||||
|
|
||||||
def is_dnsdbflex(data: str) -> bool:
|
def is_dnsdbflex(data: str) -> bool:
|
||||||
|
@ -190,7 +194,6 @@ def handler(q=False):
|
||||||
data = base64.b64decode(request["data"]).decode('utf-8')
|
data = base64.b64decode(request["data"]).decode('utf-8')
|
||||||
if not data:
|
if not data:
|
||||||
return json.dumps({'success': 0}) # empty file is ok
|
return json.dumps({'success': 0}) # empty file is ok
|
||||||
|
|
||||||
if is_dnsdbflex(data):
|
if is_dnsdbflex(data):
|
||||||
return parse_and_insert_dnsdbflex(data)
|
return parse_and_insert_dnsdbflex(data)
|
||||||
elif is_cof(data):
|
elif is_cof(data):
|
||||||
|
|
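Review bot: In the parse_and_insert_dnsdbflex hunk, the new `return {"error": "NOT IMPLEMENTED YET"}` hands handler() a plain dict, while the empty-input branch of handler() in the last hunk returns `json.dumps({'success': 0})`, so the response type now differs by branch. The docstring just above the changed line still documents the return value as `none`; it should say that the function returns an error dict until a MISP object for dnsdbflex exists. The signature could also be annotated as `def parse_and_insert_dnsdbflex(data: str) -> dict:` to match parse_and_insert_cof. Both function bodies extend beyond the hunks shown, so whether the caller normalises dict and str responses cannot be confirmed from this page.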