mirror of https://github.com/MISP/misp-modules
make flake8 happier
commit
85864dad2e
|
@ -89,8 +89,8 @@ if __name__ == "__main__":
|
||||||
{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}"""
|
{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}"""
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
for l in ndjson.loads(mock_input):
|
for entry in ndjson.loads(mock_input):
|
||||||
retval = validate_cof(l, strict=False)
|
retval = validate_cof(entry, strict=False)
|
||||||
assert retval
|
assert retval
|
||||||
print("line %d is valid: %s" % (i, retval))
|
print("line %d is valid: %s" % (i, retval))
|
||||||
i += 1
|
i += 1
|
||||||
|
|
|
@ -9,10 +9,11 @@ License: see LICENSE
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
import json
|
import json
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
import pprint
|
|
||||||
import ndjson
|
import ndjson
|
||||||
|
|
||||||
# from pymisp import MISPObject, MISPEvent, PyMISP
|
# from pymisp import MISPObject, MISPEvent, PyMISP
|
||||||
|
@ -59,22 +60,22 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
objects = []
|
objects = []
|
||||||
try:
|
try:
|
||||||
entries = ndjson.loads(data)
|
entries = ndjson.loads(data)
|
||||||
for l in entries: # iterate over all ndjson lines
|
for entry in entries: # iterate over all ndjson lines
|
||||||
|
|
||||||
# validate here (simple validation or full JSON Schema validation)
|
# validate here (simple validation or full JSON Schema validation)
|
||||||
if not validate_cof(l):
|
if not validate_cof(entry):
|
||||||
return {"error": "Could not validate the COF input '%r'" % l}
|
return {"error": "Could not validate the COF input '%r'" % entry}
|
||||||
|
|
||||||
# Next, extract some fields
|
# Next, extract some fields
|
||||||
rrtype = l['rrtype'].upper()
|
rrtype = entry['rrtype'].upper()
|
||||||
rrname = l['rrname'].rstrip('.')
|
rrname = entry['rrname'].rstrip('.')
|
||||||
rdata = [x.rstrip('.') for x in l['rdata']]
|
rdata = [x.rstrip('.') for x in entry['rdata']]
|
||||||
|
|
||||||
# create a new MISP object, based on the passive-dns object for each nd-JSON line
|
# create a new MISP object, based on the passive-dns object for each nd-JSON line
|
||||||
o = MISPObject(name='passive-dns', standalone=False, comment='created by cof2misp')
|
o = MISPObject(name='passive-dns', standalone=False, comment='created by cof2misp')
|
||||||
|
|
||||||
# o.add_tag('tlp:amber') # FIXME: we'll want to add a tlp: tag to the object
|
# o.add_tag('tlp:amber') # FIXME: we'll want to add a tlp: tag to the object
|
||||||
o.add_attribute('bailiwick', value=l['bailiwick'].rstrip('.'))
|
o.add_attribute('bailiwick', value=entry['bailiwick'].rstrip('.'))
|
||||||
|
|
||||||
#
|
#
|
||||||
# handle the combinations of rrtype (domain, ip) on both left and right side
|
# handle the combinations of rrtype (domain, ip) on both left and right side
|
||||||
|
@ -100,17 +101,17 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
for r in rdata:
|
for r in rdata:
|
||||||
o.add_attribute('rdata', value=r)
|
o.add_attribute('rdata', value=r)
|
||||||
o.add_attribute('raw_rdata', value=json.dumps(rdata)) # FIXME: do we need to hex encode it?
|
o.add_attribute('raw_rdata', value=json.dumps(rdata)) # FIXME: do we need to hex encode it?
|
||||||
o.add_attribute('time_first', value=l['time_first'])
|
o.add_attribute('time_first', value=entry['time_first'])
|
||||||
o.add_attribute('time_last', value=l['time_last'])
|
o.add_attribute('time_last', value=entry['time_last'])
|
||||||
o.first_seen = l['time_first'] # is this redundant?
|
o.first_seen = entry['time_first'] # is this redundant?
|
||||||
o.last_seen = l['time_last']
|
o.last_seen = entry['time_last']
|
||||||
|
|
||||||
#
|
#
|
||||||
# Now add the other optional values. # FIXME: how about a map() other function. DNRY
|
# Now add the other optional values. # FIXME: how about a map() other function. DNRY
|
||||||
#
|
#
|
||||||
for k in ['count', 'sensor_id', 'origin', 'text', 'time_first_ms', 'time_last_ms', 'zone_time_first', 'zone_time_last']:
|
for k in ['count', 'sensor_id', 'origin', 'text', 'time_first_ms', 'time_last_ms', 'zone_time_first', 'zone_time_last']:
|
||||||
if k in l and l[k]:
|
if k in entry and entry[k]:
|
||||||
o.add_attribute(k, value=l[k])
|
o.add_attribute(k, value=entry[k])
|
||||||
|
|
||||||
#
|
#
|
||||||
# add COF entry to MISP object
|
# add COF entry to MISP object
|
||||||
|
@ -164,7 +165,8 @@ def is_dnsdbflex(data: str) -> bool:
|
||||||
if not set(l.keys()) == {'rrname', 'rrtype'}:
|
if not set(l.keys()) == {'rrname', 'rrtype'}:
|
||||||
return False # shortcut
|
return False # shortcut
|
||||||
return True
|
return True
|
||||||
except Exception as _ex:
|
except Exception as ex:
|
||||||
|
print("oops, this should not have happened. Maybe not an ndjson file?", file=sys.sterr)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@ -183,7 +185,7 @@ def handler(q=False):
|
||||||
# push to MISP
|
# push to MISP
|
||||||
event_id = request['event_id']
|
event_id = request['event_id']
|
||||||
# event = misp.get_event(event_id)
|
# event = misp.get_event(event_id)
|
||||||
pprint.pprint("event_id = %s" % event_id)
|
print("event_id = %s" % event_id, file=sys.stderr)
|
||||||
try:
|
try:
|
||||||
data = base64.b64decode(request["data"]).decode('utf-8')
|
data = base64.b64decode(request["data"]).decode('utf-8')
|
||||||
if not data:
|
if not data:
|
||||||
|
@ -197,7 +199,7 @@ def handler(q=False):
|
||||||
else:
|
else:
|
||||||
return {'error': 'Could not find any valid COF input nor dnsdbflex input. Please have a loot at: https://datatracker.ietf.org/doc/draft-dulaunoy-dnsop-passive-dns-cof/'}
|
return {'error': 'Could not find any valid COF input nor dnsdbflex input. Please have a loot at: https://datatracker.ietf.org/doc/draft-dulaunoy-dnsop-passive-dns-cof/'}
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print("oops, got exception %s" % str(ex))
|
print("oops, got exception %s" % str(ex), file=sys.stderr)
|
||||||
return {'error': "Got exception %s" % str(ex)}
|
return {'error': "Got exception %s" % str(ex)}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue