From 2c491b237a92a6431121ea937007e405cc76ee7e Mon Sep 17 00:00:00 2001
From: Manabu Niseki
Date: Thu, 30 Jan 2020 07:35:30 +0900
Subject: [PATCH] chore: delete old examples

Delete examples which use deprecated/deleted methods
---
 examples/add_sbsignature.py | 16 ---
 examples/add_user_json.py | 28 ----
 examples/edit_organisation_json.py | 29 ----
 examples/edit_user_json.py | 29 ----
 examples/et2misp.py | 126 -----------------
 examples/get_attachment.py | 26 ----
 examples/sighting.py | 25 ----
 examples/stats.py | 16 ---
 examples/suricata.py | 28 ----
 examples/tagstatistics.py | 18 ---
 examples/vmray_automation.py | 213 -----------------------------
 11 files changed, 554 deletions(-)
 delete mode 100644 examples/add_sbsignature.py
 delete mode 100755 examples/add_user_json.py
 delete mode 100755 examples/edit_organisation_json.py
 delete mode 100755 examples/edit_user_json.py
 delete mode 100755 examples/et2misp.py
 delete mode 100755 examples/get_attachment.py
 delete mode 100755 examples/sighting.py
 delete mode 100755 examples/stats.py
 delete mode 100755 examples/suricata.py
 delete mode 100755 examples/tagstatistics.py
 delete mode 100644 examples/vmray_automation.py

diff --git a/examples/add_sbsignature.py b/examples/add_sbsignature.py
deleted file mode 100644
index 5a03068..0000000
--- a/examples/add_sbsignature.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import json
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-from pymisp.tools import SBSignatureObject
-
-pymisp = PyMISP(misp_url, misp_key, misp_verifycert)
-a = json.loads('{"signatures":[{"new_data":[],"confidence":100,"families":[],"severity":1,"weight":0,"description":"AttemptstoconnecttoadeadIP:Port(2uniquetimes)","alert":false,"references":[],"data":[{"IP":"95.101.39.58:80(Europe)"},{"IP":"192.35.177.64:80(UnitedStates)"}],"name":"dead_connect"},{"new_data":[],"confidence":30,"families":[],"severity":2,"weight":1,"description":"PerformssomeHTTPrequests","alert":false,"references":[],"data":[{"url":"http://cert.int-x3.letsencrypt.org/"},{"url":"http://apps.identrust.com/roots/dstrootcax3.p7c"}],"name":"network_http"},{"new_data":[],"confidence":100,"families":[],"severity":2,"weight":1,"description":"Theofficefilehasaunconventionalcodepage:ANSICyrillic;Cyrillic(Windows)","alert":false,"references":[],"data":[],"name":"office_code_page"}]}')
-a = [(x['name'], x['description']) for x in a["signatures"]]
-
-
-b = SBSignatureObject(a)
-
-
-template_id = [x['ObjectTemplate']['id'] for x in pymisp.get_object_templates_list() if x['ObjectTemplate']['name'] == 'sb-signature'][0]
-
-pymisp.add_object(234111, template_id, b)
diff --git a/examples/add_user_json.py b/examples/add_user_json.py
deleted file mode 100755
index 759b26f..0000000
--- a/examples/add_user_json.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
-try:
- input = raw_input
-except NameError:
- pass
-
-
-def init(url, key):
- return PyMISP(url, key, misp_verifycert, 'json')
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Add the user described in the given json. If no file is provided, returns a json listing all the fields used to describe a user.')
- parser.add_argument("-f", "--json_file", help="The name of the json file describing the user you want to create.")
- args = parser.parse_args()
-
- misp = init(misp_url, misp_key)
-
- if args.json_file is None:
- print (misp.get_add_user_fields_list())
- else:
- print(misp.add_user_json(args.json_file))
diff --git a/examples/edit_organisation_json.py b/examples/edit_organisation_json.py
deleted file mode 100755
index 50aa3f5..0000000
--- a/examples/edit_organisation_json.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
-try:
- input = raw_input
-except NameError:
- pass
-
-
-def init(url, key):
- return PyMISP(url, key, misp_verifycert, 'json')
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Edit the organisation designed by the organisation_id. If no file is provided, returns a json listing all the fields used to describe an organisation.')
- parser.add_argument("-i", "--organisation_id", required=True, help="The name of the json file describing the organisation you want to modify.")
- parser.add_argument("-f", "--json_file", help="The name of the json file describing your modifications.")
- args = parser.parse_args()
-
- misp = init(misp_url, misp_key)
-
- if args.json_file is None:
- print (misp.get_edit_organisation_fields_list(args.organisation_id))
- else:
- print(misp.edit_organisation_json(args.json_file, args.organisation_id))
diff --git a/examples/edit_user_json.py b/examples/edit_user_json.py
deleted file mode 100755
index 6e1d276..0000000
--- a/examples/edit_user_json.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
-try:
- input = raw_input
-except NameError:
- pass
-
-
-def init(url, key):
- return PyMISP(url, key, misp_verifycert, 'json')
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Edit the user designed by the user_id. If no file is provided, returns a json listing all the fields used to describe a user.')
- parser.add_argument("-i", "--user_id", required=True, help="The name of the json file describing the user you want to modify.")
- parser.add_argument("-f", "--json_file", help="The name of the json file describing your modifications.")
- args = parser.parse_args()
-
- misp = init(misp_url, misp_key)
-
- if args.json_file is None:
- print (misp.get_edit_user_fields_list(args.user_id))
- else:
- print(misp.edit_user_json(args.json_file, args.user_id))
diff --git a/examples/et2misp.py b/examples/et2misp.py
deleted file mode 100755
index 3631ce4..0000000
--- a/examples/et2misp.py
+++ /dev/null
@@ -1,126 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copy Emerging Threats Block IPs list to several MISP events -# Because of the large size of the list the first run will take a minute -# Running it again will update the MISP events if changes are detected -# -# This script requires PyMISP 2.4.50 or later - -import sys, json, time, requests -from pymisp import PyMISP -from keys import misp_url, misp_key, misp_verifycert - -et_url = 'https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt' -et_str = 'Emerging Threats ' - -def init_misp(): - global mymisp - mymisp = PyMISP(misp_url, misp_key, misp_verifycert) - -def load_misp_event(eid): - global et_attr - global et_drev - global et_event - et_attr = {} - et_drev = {} - - et_event = mymisp.get(eid) - echeck(et_event) - for a in et_event['Event']['Attribute']: - if a['category'] == 'Network activity': - et_attr[a['value']] = a['id'] - continue - if a['category'] == 'Internal reference': - et_drev = a; - -def init_et(): - global et_data - global et_rev - requests.packages.urllib3.disable_warnings() - s = requests.Session() - r = s.get(et_url) - if r.status_code != 200: - raise Exception('Error getting ET data: {}'.format(r.text)) - name = '' - et_data = {} - et_rev = 0 - for line in r.text.splitlines(): - if line.startswith('# Rev '): - et_rev = int(line[6:]) - continue - if line.startswith('#'): - name = line[1:].strip() - if et_rev and not et_data.get(name): - et_data[name] = {} - continue - l = line.rstrip() - if l: - et_data[name][l] = name - -def update_et_event(name): - if et_drev and et_rev and int(et_drev['value']) < et_rev: - # Copy MISP attributes to new dict - et_ips = dict.fromkeys(et_attr.keys()) - - # Weed out attributes still in ET data - for k,v in et_data[name].items(): - et_attr.pop(k, None) - - # Delete the leftover attributes from MISP - for k,v in et_attr.items(): - r = mymisp.delete_attribute(v) - if r.get('errors'): - print("Error deleting attribute {} ({}): 
{}\n".format(v,k,r['errors'])) - - # Weed out ips already in the MISP event - for k,v in et_ips.items(): - et_data[name].pop(k, None) - - # Add new attributes to MISP event - ipdst = [] - for i,k in enumerate(et_data[name].items(), 1-len(et_data[name])): - ipdst.append(k[0]) - if i % 100 == 0: - r = mymisp.add_ipdst(et_event, ipdst) - echeck(r, et_event['Event']['id']) - ipdst = [] - - # Update revision number - et_drev['value'] = et_rev - et_drev.pop('timestamp', None) - attr = [] - attr.append(et_drev) - - # Publish updated MISP event - et_event['Event']['Attribute'] = attr - et_event['Event']['published'] = False - et_event['Event']['date'] = time.strftime('%Y-%m-%d') - r = mymisp.publish(et_event) - echeck(r, et_event['Event']['id']) - -def echeck(r, eid=None): - if r.get('errors'): - if eid: - print("Processing event {} failed: {}".format(eid, r['errors'])) - else: - print(r['errors']) - sys.exit(1) - -if __name__ == '__main__': - init_misp() - init_et() - - for et_type in set(et_data.keys()): - info = et_str + et_type - r = mymisp.search_index(eventinfo=info) - if r['response']: - eid=r['response'][0]['id'] - else: # event not found, create it - new_event = mymisp.new_event(info=info, distribution=3, threat_level_id=4, analysis=1) - echeck(new_event) - eid=new_event['Event']['id'] - r = mymisp.add_internal_text(new_event, 1, comment='Emerging Threats revision number') - echeck(r, eid) - load_misp_event(eid) - update_et_event(et_type) diff --git a/examples/get_attachment.py b/examples/get_attachment.py deleted file mode 100755 index f40f38d..0000000 --- a/examples/get_attachment.py +++ /dev/null 
@@ -1,26 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-
-def init(url, key):
- return PyMISP(url, key, misp_verifycert, 'json')
-
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Get an attachment.')
- parser.add_argument("-a", "--attribute", type=int, help="Attribute ID to download.")
- args = parser.parse_args()
-
- misp = init(misp_url, misp_key)
-
- with open('foo', 'wb') as f:
- out = misp.get_attachment(args.attribute)
- if isinstance(out, dict):
- # Fails
- print(out)
- else:
- f.write(out)
diff --git a/examples/sighting.py b/examples/sighting.py
deleted file mode 100755
index d6c8323..0000000
--- a/examples/sighting.py
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
-try:
- input = raw_input
-except NameError:
- pass
-
-
-def init(url, key):
- return PyMISP(url, key, misp_verifycert, 'json')
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Add sighting.')
- parser.add_argument("-f", "--json_file", required=True, help="The name of the json file describing the attribute you want to add sighting to.")
- args = parser.parse_args()
-
- misp = init(misp_url, misp_key)
-
- misp.sighting_per_json(args.json_file)
diff --git a/examples/stats.py b/examples/stats.py
deleted file mode 100755
index 8f09263..0000000
--- a/examples/stats.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import ExpandedPyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Output attributes statistics from a MISP instance.')
- args = parser.parse_args()
-
- misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)
-
- print(misp.get_attributes_statistics(misp, percentage=True))
- print(misp.get_attributes_statistics(context='category', percentage=True))
diff --git a/examples/suricata.py b/examples/suricata.py
deleted file mode 100755
index 2526033..0000000
--- a/examples/suricata.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import PyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-
-
-def init(url, key):
- return PyMISP(url, key, misp_verifycert)
-
-
-def fetch(m, all_events, event):
- if all_events:
- print(misp.download_all_suricata().text)
- else:
- print(misp.download_suricata_rule_event(event).text)
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Download Suricata events.')
- parser.add_argument("-a", "--all", action='store_true', help="Download all suricata rules available.")
- parser.add_argument("-e", "--event", help="Download suricata rules from one event.")
-
- args = parser.parse_args()
-
- misp = init(misp_url, misp_key)
-
- fetch(misp, args.all, args.event)
diff --git a/examples/tagstatistics.py b/examples/tagstatistics.py
deleted file mode 100755
index f0bc29c..0000000
--- a/examples/tagstatistics.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from pymisp import ExpandedPyMISP
-from keys import misp_url, misp_key, misp_verifycert
-import argparse
-import json
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Get statistics from tags.')
- parser.add_argument("-p", "--percentage", action='store_true', default=None, help="An optional field, if set, it will return the results in percentages, otherwise it returns exact count.")
- parser.add_argument("-n", "--namesort", action='store_true', default=None, help="An optional field, if set, values are sort by the namespace, otherwise the sorting will happen on the value.")
- args = parser.parse_args()
-
- misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)
-
- stats = misp.get_tags_statistics(args.percentage, args.namesort)
- print(json.dumps(stats))
diff --git a/examples/vmray_automation.py b/examples/vmray_automation.py
deleted file mode 100644
index 17ed328..0000000
--- a/examples/vmray_automation.py
+++ /dev/null
@@ -1,213 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -Koen Van Impe - -VMRay automatic import -Put this script in crontab to run every /15 or /60 - */5 * * * * mispuser /usr/bin/python3 /home/mispuser/PyMISP/examples/vmray_automation.py - -Calls "vmray_import" for all events that have an 'incomplete' VMray analysis - -Do inline config in "main" - -''' - -from pymisp import PyMISP -from keys import misp_url, misp_key, misp_verifycert -import argparse -import os -import json -import datetime -import time - -import requests -import sys - -# Suppress those "Unverified HTTPS request is being made" -import urllib3 -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - -def init(url, key): - ''' - Template to get MISP module started - ''' - return PyMISP(url, key, misp_verifycert, 'json') - - -def get_vmray_config(url, key, misp_verifycert, default_wait_period): - ''' - Fetch configuration settings from MISP - Includes VMRay API and modules URL - ''' - - try: - misp_headers = {'Content-Type': 'application/json', 'Accept': 'application/json', 'Authorization': key} - req = requests.get(url + 'servers/serverSettings.json', verify=misp_verifycert, headers=misp_headers) - - if req.status_code == 200: - req_json = req.json() - if 'finalSettings' in req_json: - finalSettings = req_json['finalSettings'] - vmray_api = '' - vmray_url = '' - vmray_wait_period = 0 - - for el in finalSettings: - # Is the vmray import module enabled? - if el['setting'] == 'Plugin.Import_vmray_import_enabled': - vmray_import_enabled = el['value'] - if vmray_import_enabled is False: - break - # Get the VMRay API key from the MISP settings - elif el['setting'] == 'Plugin.Import_vmray_import_apikey': - vmray_api = el['value'] - # The VMRay URL to query - elif el['setting'] == 'Plugin.Import_vmray_import_url': - vmray_url = el['value'].replace('/', '\\/') - # MISP modules - Port? 
- elif el['setting'] == 'Plugin.Import_services_port': - module_import_port = el['value'] - # MISP modules - URL - elif el['setting'] == 'Plugin.Import_services_url': - module_import_url = el['value'].replace('\/\/', '//') - # Wait period - elif el['setting'] == 'Plugin.Import_vmray_import_wait_period': - vmray_wait_period = abs(int(el['value'])) - - if vmray_wait_period < 1: - vmray_wait_period = default_wait_period - else: - sys.exit('Did not receive a 200 code from MISP') - - if vmray_import_enabled and vmray_api and vmray_url and module_import_port and module_import_url: - return {'vmray_wait_period': vmray_wait_period, 'vmray_api': vmray_api, 'vmray_url': vmray_url, 'module_import_port': module_import_port, 'module_import_url': module_import_url} - else: - sys.exit('Did not receive all the necessary configuration information from MISP') - - except Exception as e: - sys.exit('Unable to get VMRay config from MISP') - - -def search_vmray_incomplete(m, url, wait_period, module_import_url, module_import_port, vmray_url, vmray_api, vmray_attribute_category, vmray_include_analysisid, vmray_include_imphash_ssdeep, vmray_include_extracted_files, vmray_include_analysisdetails, vmray_include_vtidetails, custom_tags_incomplete, custom_tags_complete): - ''' - Search for the events with VMRay samples that are marked incomplete - and then update these events - ''' - - controller = 'attributes' - vmray_value = 'VMRay Sample ID:' # How sample IDs are stored in MISP - req = None - - # Search for the events - try: - result = m.search(controller, tags=custom_tags_incomplete) - response = result['response'] - - if len(response) == 0: - sys.exit("No VMRay attributes found that match %s" % custom_tags_incomplete) - - attribute = response['Attribute'] - - if len(attribute) == 0: - sys.exit("No VMRay attributes found that match %s" % custom_tags_incomplete) - - timestamp = int(attribute[0]["timestamp"]) - # Not enough time has gone by to lookup the analysis jobs - if int((time.time() 
- timestamp) / 60) < int(wait_period): - if module_DEBUG: - r_timestamp = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') - print("Attribute to recent for wait_period (%s minutes) - timestamp attribute: %s (%s minutes old)" % (wait_period, r_timestamp, round((int(time.time() - timestamp) / 60), 2))) - return False - - if module_DEBUG: - print("All attributes older than %s" % int(wait_period)) - - for att in attribute: - value = att['value'] - - if vmray_value in value: # We found a sample ID - att_id = att['id'] - att_uuid = att['uuid'] - - # VMRay Sample IDs are stored as VMRay Sample ID: 2796577 - vmray_sample_id = value.split(vmray_value)[1].strip() - if vmray_sample_id.isdigit(): - event_id = att['event_id'] - if module_DEBUG: - print("Found event %s with matching tags %s for sample id %s " % (event_id, custom_tags_incomplete, vmray_sample_id)) - - # Prepare request to send to vmray_import via misp modules - misp_modules_url = module_import_url + ':' + module_import_port + '/query' - misp_modules_headers = {'Content-Type': 'application/json'} - misp_modules_body = '{ "sample_id":"' + vmray_sample_id + '","module":"vmray_import","event_id":"' + event_id + '","config":{"apikey":"' + vmray_api + '","url":"' + vmray_url + '","include_analysisid":"' + vmray_include_analysisid + '","include_analysisdetails":"' + vmray_include_analysisdetails + '","include_extracted_files":"' + vmray_include_extracted_files + '","include_imphash_ssdeep":"' + vmray_include_imphash_ssdeep + '","include_vtidetails":"' + vmray_include_vtidetails + '","sample_id":"' + vmray_sample_id + '"},"data":""}' - req = requests.post(misp_modules_url, data=misp_modules_body, headers=misp_modules_headers) - if module_DEBUG and req is not None: - print("Response code from submitting to MISP modules %s" % (req.status_code)) - - # Succesful response from the misp modules? 
- if req.status_code == 200: - req_json = req.json() - if "error" in req_json: - print("Error code in reply %s " % req_json["error"]) - continue - else: - results = req_json["results"] - - # Walk through all results in the misp-module reply - for el in results: - to_ids = True - values = el['values'] - types = el['types'] - if "to_ids" in el: - to_ids = el['to_ids'] - if "text" in types: - to_ids = False - comment = el['comment'] - if len(comment) < 1: - comment = "Enriched via the vmray_import module" - - # Attribute can belong in different types - for type in types: - try: - r = m.add_named_attribute(event_id, type, values, vmray_attribute_category, to_ids, comment) - if module_DEBUG: - print("Add event %s: %s as %s (%s) (toids: %s)" % (event_id, values, type, comment, to_ids)) - except Exception as e: - continue - if module_DEBUG: - print("Unable to add attribute %s as type %s for event %s" % (values, type, event_id)) - - # Remove 'incomplete' state tags - m.untag(att_uuid, custom_tags_incomplete) - # Update tags to 'complete' state - m.tag(att_uuid, custom_tags_complete) - if module_DEBUG: - print("Updated event %s" % event_id) - - else: - sys.exit('MISP modules did not return HTTP 200 code (event %s ; sampleid %s)' % (event_id, vmray_sample_id)) - - except Exception as e: - sys.exit("Invalid response received from MISP : %s", e) - - -if __name__ == '__main__': - - module_DEBUG = True - - # Set some defaults to be used in this module - vmray_attribute_category = 'External analysis' - vmray_include_analysisid = '0' - vmray_include_imphash_ssdeep = '0' - vmray_include_extracted_files = '0' - vmray_include_analysisdetails = '0' - vmray_include_vtidetails = '0' - custom_tags_incomplete = 'workflow:state="incomplete"' - custom_tags_complete = 'workflow:state="complete"' - default_wait_period = 30 - - misp = init(misp_url, misp_key) - vmray_config = get_vmray_config(misp_url, misp_key, misp_verifycert, default_wait_period) - search_vmray_incomplete(misp, misp_url, 
vmray_config['vmray_wait_period'], vmray_config['module_import_url'], vmray_config['module_import_port'], vmray_config['vmray_url'], vmray_config['vmray_api'], vmray_attribute_category, vmray_include_analysisid, vmray_include_imphash_ssdeep, vmray_include_extracted_files, vmray_include_analysisdetails, vmray_include_vtidetails, custom_tags_incomplete, custom_tags_complete)