mirror of https://github.com/MISP/MISP
new: [tools/misp-delegation] Added misp-delegation tool
MISP-Delegation is a customisable tool to help sending events on a remote MISP instance and create a delegation request.pull/9432/head
parent
2fd4aa702b
commit
9d15554ba8
|
@ -0,0 +1,116 @@
|
|||
# MISP Delegation
|
||||
|
||||
## Overview
|
||||
|
||||
This script facilitates synchronization between two MISP instances using the PUSH strategy and supports delegation requests on the remote MISP. It provides flexible options for customization, including event filters, delegation request parameters, and incremental synchronization.
|
||||
|
||||
## Features
|
||||
|
||||
- **Push Strategy:** Utilizes the PUSH strategy for synchronization between a source and a destination MISP instance.
|
||||
- **Delegation Request:** Requests delegation on the remote MISP after pushing events.
|
||||
- **Event Filters:** Supports filters from the event/index endpoint on the source MISP instance.
|
||||
- **Customization:**
|
||||
- Specify desired distribution level for delegation requests.
|
||||
- Customize delegation request messages.
|
||||
- **Incremental Synchronization:**
|
||||
- Syncs events that were created or modified since the last execution.
|
||||
- **Hooks:**
|
||||
- Attach or detach tags on the source MISP instance using hooks.
|
||||
|
||||
|
||||
## Installation
|
||||
|
||||
### Prerequisites
|
||||
- Python 3.x
|
||||
|
||||
## Installation
|
||||
```bash
|
||||
cd tools/misp-delegation/src
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip3 install -rU requirements.txt
|
||||
cp config.json.dist config.json
|
||||
```
|
||||
|
||||
### Configuration
|
||||
|
||||
1. Open the `config.json` file.
|
||||
2. Customize the following parameters:
|
||||
- `misp_instance.source`: URL and API key for the source MISP instance.
|
||||
- `misp_instance.destination`: URL and API key for the destination MISP instance.
|
||||
- `filters`: List of filters support by /event/index
|
||||
- Other customization options as needed.
|
||||
|
||||
### Help
|
||||
```text
|
||||
usage: misp-delegation.py [-h] [-c CONFIG]
|
||||
|
||||
optional arguments:
|
||||
-h, --help show this help message and exit
|
||||
-c CONFIG, --config CONFIG
|
||||
The JSON config file to use
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### Running the Script
|
||||
|
||||
```bash
|
||||
python3 misp-delegation.py
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
The following configuration file instructs the script to execute the following tasks:
|
||||
1. Retrieve events for pushing to the destination MISP instance based on the defined filters
|
||||
- Specifically, events tagged with `to_delegate` will be fetched.
|
||||
- With `incremental_sync` enabled, only events modified or created since the last execution will be considered.
|
||||
- As `unpublish_event_on_remote` is enabled, the event pushed on the remote will be in an unpublished state, regardless of its state on the source instance.
|
||||
2. Push the collected events to the destination MISP
|
||||
3. Generate a delegation request on the destination MISP using the configuration specified in the `delegation` key.
|
||||
4. For all successfully delegated events execute two tag actions on the source instance:
|
||||
- Attach the local tag `delegation:successful` on all delegated on events
|
||||
- Detach the `to_delegate` tag from all delegated on events
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A(Source) --Push and delegate--> B(Destination)
|
||||
```
|
||||
|
||||
```json
|
||||
{
|
||||
"misp_instance": {
|
||||
"source": {
|
||||
"url": "https://misp1.admin.test/",
|
||||
"api_key": "rQooTtK0IRtG7YlL5IPbXoVFzCKJPAHMBv3jMGHT",
|
||||
"verify_ssl": true
|
||||
},
|
||||
"destination": {
|
||||
"url": "https://misp2.admin.test/",
|
||||
"api_key": "IwikG4g8Cak1Vk5wTZpskcOp225IlnCw2mx9KhoA",
|
||||
"verify_ssl": true
|
||||
}
|
||||
},
|
||||
"filters": {
|
||||
"tags": [
|
||||
"to_delegate"
|
||||
]
|
||||
},
|
||||
"delegation": {
|
||||
"target_org_uuid": "591c3fb6-4abe-4d47-b2ea-22e2299819e9",
|
||||
"desired_distribution": "3",
|
||||
"sharing_group_uuid": "d1492e7a-748f-4615-a1d6-893abdffa5db",
|
||||
"message": "We kindly request you to take over this event"
|
||||
},
|
||||
"incremental_sync": true,
|
||||
"unpublish_event_on_remote": true,
|
||||
"tag_actions": {
|
||||
"attach": [
|
||||
"delegation:successful"
|
||||
],
|
||||
"detach": [
|
||||
"to_delegate"
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
|
@ -0,0 +1,33 @@
|
|||
{
|
||||
"misp_instance": {
|
||||
"source": {
|
||||
"url": "https://misp_src_url/",
|
||||
"api_key": "API_KEY",
|
||||
"verify_ssl": true
|
||||
},
|
||||
"destination": {
|
||||
"url": "https://misp_dst_url/",
|
||||
"api_key": "API_KEY",
|
||||
"verify_ssl": true
|
||||
}
|
||||
},
|
||||
"filters": {
|
||||
"tags": [
|
||||
"tlp:clear"
|
||||
]
|
||||
},
|
||||
"delegation": {
|
||||
"target_org_uuid": "ORG_UUID",
|
||||
"desired_distribution": "DISTRIBUTION_LEVEL",
|
||||
"sharing_group_uuid": "SHARINGGROUP_UUID",
|
||||
"message": "DELEGATION_MESSAGE"
|
||||
},
|
||||
"incremental_sync": true,
|
||||
"unpublish_event_on_remote": true,
|
||||
"tag_actions": {
|
||||
"attach": [
|
||||
],
|
||||
"detach": [
|
||||
]
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
version: 1
|
||||
formatters:
|
||||
simple:
|
||||
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
verbose:
|
||||
format: "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
|
||||
handlers:
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
level: DEBUG
|
||||
formatter: verbose
|
||||
stream: ext://sys.stdout
|
||||
file:
|
||||
class: logging.handlers.TimedRotatingFileHandler
|
||||
level: INFO
|
||||
formatter: simple
|
||||
when: D
|
||||
filename: 'misp-delegation.log'
|
||||
loggers:
|
||||
misp-delegation:
|
||||
level: DEBUG
|
||||
handlers: [file]
|
||||
root:
|
||||
level: DEBUG
|
||||
handlers: [console]
|
|
@ -0,0 +1,343 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
import json
|
||||
import logging
|
||||
import logging.config
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import time
|
||||
from typing import List, Optional, Union
|
||||
import yaml
|
||||
|
||||
from utils import MISPInstance
|
||||
|
||||
|
||||
logging_file_path = Path(__file__).parent / 'logging.yml'
|
||||
with open(logging_file_path, 'r') as f:
|
||||
yaml_config = yaml.safe_load(f.read())
|
||||
try:
|
||||
yaml_config['handlers']['file']['filename'] = Path(__file__).parent / 'logs' / yaml_config['handlers']['file']['filename']
|
||||
except KeyError:
|
||||
pass
|
||||
logging.config.dictConfig(yaml_config)
|
||||
|
||||
logger = logging.getLogger('misp-delegation')
|
||||
last_timestamp_path = Path(__file__).parent / 'last_sync_timestamp.txt'
|
||||
unpublish_event_on_remote = False
|
||||
sourceEventUUIDToID = {}
|
||||
|
||||
def main():
|
||||
global unpublish_event_on_remote
|
||||
parser = argparse.ArgumentParser(Path(__file__).name)
|
||||
parser.add_argument('-c', '--config', default='config.json', help='The JSON config file to use')
|
||||
options = parser.parse_args()
|
||||
|
||||
config = {}
|
||||
config_file_path = Path(__file__).parent / options.config
|
||||
with open(config_file_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
source_instance = MISPInstance(config['misp_instance']['source'])
|
||||
destination_instance = MISPInstance(config['misp_instance']['destination'])
|
||||
filters = config['filters']
|
||||
delegation_config = config['delegation']
|
||||
unpublish_event_on_remote = config['unpublish_event_on_remote']
|
||||
incremental_sync = config['incremental_sync']
|
||||
tag_actions = config['tag_actions']
|
||||
|
||||
# Check connectivity
|
||||
if test_connectivity(source_instance):
|
||||
logger.debug(f'Connection to MISP source<{source_instance.base_url}> successfull')
|
||||
else:
|
||||
logger.error('Could not connect to source MISP instance')
|
||||
return 1
|
||||
if test_connectivity(destination_instance):
|
||||
logger.debug(f'Connection to MISP remote<{destination_instance.base_url}> successfull')
|
||||
else:
|
||||
logger.error('Could not connect to remote MISP instance')
|
||||
return 1
|
||||
|
||||
|
||||
# Get target sharing group
|
||||
if delegation_config['desired_distribution'] == 4:
|
||||
logger.debug(f'Fetching requested sharing group...')
|
||||
sharinggroup_uuid = delegation_config['sharing_group_uuid']
|
||||
try:
|
||||
sharinggroup_id = get_sharing_group_id(destination_instance, sharinggroup_uuid)
|
||||
delegation_config['sharinggroup_id'] = sharinggroup_id
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.error(f'Could not fetch sharing group with UUID {sharinggroup_uuid} on remote')
|
||||
return 1
|
||||
|
||||
# Collect events from source
|
||||
logger.debug('Collecting events from source...')
|
||||
try:
|
||||
events_on_source = collect_events_from_source(source_instance, filters, incremental_sync)
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
events_on_source = None
|
||||
|
||||
if events_on_source is None:
|
||||
logger.error(f'Could not collect events from source<{source_instance.base_url}>')
|
||||
return 1
|
||||
logger.debug(f'Collected {len(events_on_source)} events from source')
|
||||
for event in events_on_source:
|
||||
sourceEventUUIDToID[event['uuid']] = event['id']
|
||||
|
||||
# Collect events from remote
|
||||
logger.debug('Collecting events from remote...')
|
||||
try:
|
||||
events_on_remote = collect_existing_events_on_remote(destination_instance, incremental_sync)
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
events_on_remote = None
|
||||
|
||||
if events_on_remote is None:
|
||||
logger.error(f'Could not collect events from remote<{destination_instance.base_url}>')
|
||||
return 1
|
||||
logger.debug(f'Collected {len(events_on_remote)} events from remote')
|
||||
|
||||
# Peform event diff for source and remote
|
||||
logger.debug('Finding events missing on the remote by diffing with the source...')
|
||||
events_to_push = get_outdated_or_non_existing_events(events_on_source, events_on_remote)
|
||||
if not events_to_push:
|
||||
logger.info(f'All {len(events_on_source)} events exist and are up-to-date on the remote<{destination_instance.base_url}>')
|
||||
if incremental_sync:
|
||||
save_current_sync_timestamp()
|
||||
return 0
|
||||
logger.debug(f'Found {len(events_to_push)} missing/outdated on the remote')
|
||||
|
||||
# Push events
|
||||
logger.debug(f'Pushing the {len(events_to_push)} missing events on the remote...')
|
||||
pushed_event_uuids = push_eligible_events_to_remote(source_instance, destination_instance, events_to_push)
|
||||
rejected_push_count = len(events_to_push) - len(pushed_event_uuids)
|
||||
if rejected_push_count > 0:
|
||||
logger.warning(f'Could not push all events. {rejected_push_count} events were rejected by the remote<{destination_instance.base_url}>')
|
||||
logger.debug(f'Successfully pushed {len(pushed_event_uuids)} events')
|
||||
|
||||
# Delegate events
|
||||
logger.debug('Requesting delegation on the remote...')
|
||||
delegated_event_uuids = request_delegation_for_pushed_events(destination_instance, pushed_event_uuids, delegation_config)
|
||||
rejected_delegation_count = len(pushed_event_uuids) - len(delegated_event_uuids)
|
||||
if rejected_delegation_count > 0:
|
||||
logger.warning(f'Could not delegate all events. {rejected_delegation_count} events were not delegated')
|
||||
logger.debug(f'Successfully delegated {len(delegated_event_uuids)} events')
|
||||
|
||||
# Attach tags on delegated events
|
||||
if tag_actions['attach'] or tag_actions['detach']:
|
||||
all_tag_ids = get_tag_ids_from_name(source_instance, tag_actions)
|
||||
if all_tag_ids:
|
||||
if all_tag_ids['attach']:
|
||||
attach_tags_on_events(source_instance, all_tag_ids['attach'], delegated_event_uuids)
|
||||
if all_tag_ids['detach']:
|
||||
detach_tags_from_events(source_instance, all_tag_ids['detach'], delegated_event_uuids)
|
||||
|
||||
if incremental_sync:
|
||||
save_current_sync_timestamp()
|
||||
|
||||
logger.info(f'Pushed {len(pushed_event_uuids)}/{len(events_to_push)} events and delegated {len(delegated_event_uuids)}/{len(pushed_event_uuids)}')
|
||||
return 0
|
||||
|
||||
|
||||
def test_connectivity(instance: MISPInstance) -> bool:
|
||||
response = instance.GET('/servers/getVersion')
|
||||
return 'version' in response
|
||||
|
||||
|
||||
def get_sharing_group_id(destination_instance: MISPInstance, sharinggroup_uuid: str) -> int:
|
||||
sharinggroup = destination_instance.GET(f'/sharing_groups/view/{sharinggroup_uuid}.json')
|
||||
return sharinggroup['SharingGroup']['id'] # type: ignore
|
||||
|
||||
|
||||
def save_current_sync_timestamp() -> None:
|
||||
last_timestamp = int(time.time())
|
||||
with open(last_timestamp_path, 'w') as f:
|
||||
f.write(str(last_timestamp))
|
||||
|
||||
|
||||
def get_last_sync_timestamp() -> Union[int, None]:
|
||||
try:
|
||||
with open(last_timestamp_path, 'r') as f:
|
||||
last_timestamp = int(f.readline())
|
||||
except Exception:
|
||||
return None
|
||||
return last_timestamp
|
||||
|
||||
|
||||
def update_event_for_push(event: dict) -> dict:
|
||||
logger.debug('Downgrading distribution levels and removing local tags')
|
||||
|
||||
if unpublish_event_on_remote:
|
||||
event['published'] = False
|
||||
|
||||
for t, tag in enumerate(event['Tag'][:]):
|
||||
if tag['local']:
|
||||
event['Tag'].pop(t)
|
||||
|
||||
# Downgrade distribution for Attribute
|
||||
for i, attribute in enumerate(event['Attribute'][:]):
|
||||
if int(attribute['distribution']) < 2:
|
||||
event['Attribute'].pop(i)
|
||||
elif attribute['distribution'] == 2:
|
||||
event['Attribute'][i]['distribution'] = 1
|
||||
|
||||
for t, tag in enumerate(attribute['Tag']):
|
||||
if tag['local']:
|
||||
event['Attribute'][i]['Tag'].pop(t)
|
||||
|
||||
# Downgrade distribution for Objects and their Attributes
|
||||
for i, object in enumerate(event['Object'][:]):
|
||||
if int(object['distribution']) < 2:
|
||||
event['Object'].pop(i)
|
||||
elif object['distribution'] == 2:
|
||||
event['Object'][i]['distribution'] = 1
|
||||
for j, attribute in enumerate(object['Attribute']):
|
||||
if int(attribute['distribution']) < 2:
|
||||
event['Object'][i]['Attribute'].pop(j)
|
||||
elif attribute['distribution'] == 2:
|
||||
event['Object'][i]['Attribute'][j]['distribution'] = 1
|
||||
|
||||
for t, tag in enumerate(attribute['Tag']):
|
||||
if tag['local']:
|
||||
event['Object'][i]['Attribute'][j]['Tag'].pop(t)
|
||||
|
||||
# Downgrade distribution for EventReport
|
||||
for i, report in enumerate(event['EventReport'][:]):
|
||||
if int(report['EventReport']) < 2:
|
||||
event['EventReport'].pop(i)
|
||||
elif report['distribution'] == 2:
|
||||
event['EventReport'][i]['distribution'] = 1
|
||||
|
||||
return event
|
||||
|
||||
|
||||
def collect_events_from_source(source_instance: MISPInstance, filters: dict, incremental_sync: bool = False) -> List[dict]:
|
||||
sync_filters = {
|
||||
'minimal': True,
|
||||
'published': True,
|
||||
}
|
||||
last_timestamp = get_last_sync_timestamp()
|
||||
if incremental_sync and last_timestamp is not None:
|
||||
logger.debug('Using timestamp from last synchronisation %s (%s)', last_timestamp, datetime.fromtimestamp(last_timestamp))
|
||||
sync_filters['timestamp'] = last_timestamp # type: ignore
|
||||
sync_filters.update(filters)
|
||||
events = source_instance.POST('/events/index', payload=sync_filters)
|
||||
return events # type: ignore
|
||||
|
||||
|
||||
def collect_existing_events_on_remote(destination_instance: MISPInstance, incremental_sync: bool = False) -> Optional[List[dict]]:
|
||||
sync_filters = {
|
||||
'minimal': True,
|
||||
'published': True,
|
||||
}
|
||||
last_timestamp = get_last_sync_timestamp()
|
||||
if incremental_sync and last_timestamp is not None:
|
||||
logger.debug('Using timestamp from last synchronisation %s (%s)', last_timestamp, datetime.fromtimestamp(last_timestamp))
|
||||
sync_filters['timestamp'] = last_timestamp # type: ignore
|
||||
events = destination_instance.POST('/events/index', payload=sync_filters)
|
||||
return events # type: ignore
|
||||
|
||||
|
||||
def get_outdated_or_non_existing_events(events_on_source: List[dict], events_on_remote: List[dict]) -> List[dict]:
|
||||
non_existing_e = []
|
||||
lookup_dest_events = {event['uuid']: event for event in events_on_remote}
|
||||
for src_e in events_on_source:
|
||||
if src_e['uuid'] not in lookup_dest_events or src_e['timestamp'] > lookup_dest_events[src_e['uuid']]['timestamp']:
|
||||
non_existing_e.append(src_e)
|
||||
return non_existing_e
|
||||
|
||||
|
||||
def push_eligible_events_to_remote(source_instance: MISPInstance, destination_instance: MISPInstance, eligible_events: List[dict]) -> List[str]:
|
||||
pushed_uuids = []
|
||||
for eligible_event in eligible_events:
|
||||
uuid = eligible_event['uuid']
|
||||
try:
|
||||
event_on_src = source_instance.GET(f'/events/view/{uuid}.json')
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.warning(f'Event {uuid} could not be retrieved from source<{source_instance.base_url}>. {err=}')
|
||||
continue
|
||||
|
||||
event_on_src['Event']['distribution'] = 0 # type: ignore # Downgrade distribution level to `org_only` to prevent data leak and allow delegation
|
||||
event_on_src['Event'] = update_event_for_push(event_on_src['Event']) # type: ignore
|
||||
try:
|
||||
pushed_event = destination_instance.POST('/events/add', payload=event_on_src) # type: ignore
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.warning(f'Event {uuid} was not pushed. {err=}')
|
||||
continue
|
||||
pushed_uuids.append(pushed_event['Event']['uuid']) # type: ignore
|
||||
return pushed_uuids
|
||||
|
||||
|
||||
def request_delegation_for_pushed_events(destination_instance: MISPInstance, pushed_events_uuids: List[str], delegation_config: dict) -> List[str]:
|
||||
delegated_events = []
|
||||
payload = {
|
||||
'EventDelegation': {
|
||||
'distribution': delegation_config['desired_distribution'],
|
||||
'sharing_group_id': 0,
|
||||
'org_id': delegation_config['target_org_uuid'],
|
||||
'message': delegation_config['message'],
|
||||
}
|
||||
}
|
||||
if delegation_config['desired_distribution'] == 4:
|
||||
payload['EventDelegation']['sharing_group_id'] = delegation_config['sharinggroup_id']
|
||||
|
||||
for uuid in pushed_events_uuids:
|
||||
try:
|
||||
delegated_event = destination_instance.POST(f'/event_delegations/delegateEvent/{uuid}', payload)
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.warning(f'Event {uuid} could not be delegated. {err=}')
|
||||
continue
|
||||
delegated_events.append(uuid)
|
||||
return delegated_events
|
||||
|
||||
|
||||
def get_tag_ids_from_name(source_instance: MISPInstance, tag_actions: dict) -> Union[dict, None]:
|
||||
tag_ids = defaultdict(list)
|
||||
try:
|
||||
all_tags = source_instance.GET(f'/tags/index')['Tag'] # type: ignore
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.warning(f'Could not fetch tags on source<{source_instance.base_url}>. {err=}')
|
||||
return None
|
||||
|
||||
for tag in all_tags:
|
||||
for action, action_tags in tag_actions.items():
|
||||
if tag['name'] in action_tags:
|
||||
tag_ids[action].append(tag['id'])
|
||||
return tag_ids
|
||||
|
||||
|
||||
def attach_tags_on_events(source_instance: MISPInstance, tag_ids: List[int], event_uuids: List[str]) -> None:
|
||||
logger.debug('Attaching local tags on delegated events')
|
||||
for event_uuid in event_uuids:
|
||||
payload = {
|
||||
'tag': json.dumps(tag_ids),
|
||||
}
|
||||
try:
|
||||
source_instance.POST(f'/events/addTag/{sourceEventUUIDToID[event_uuid]}/local:1', payload=payload)
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.warning(f'Could not attach tags on event {event_uuid}. {err=}')
|
||||
|
||||
|
||||
def detach_tags_from_events(source_instance: MISPInstance, tag_ids: List[int], event_uuids: List[str]) -> None:
|
||||
logger.debug('Detaching tags on delegated events')
|
||||
for event_uuid in event_uuids:
|
||||
payload = {}
|
||||
for tag_id in tag_ids:
|
||||
try:
|
||||
source_instance.POST(f'/events/removeTag/{sourceEventUUIDToID[event_uuid]}/{tag_id}', payload=payload)
|
||||
except Exception as err:
|
||||
logger.debug(f'Unexpected {err=}, {type(err)=}')
|
||||
logger.warning(f'Could not attach tag {tag_id} on event {event_uuid}. {err=}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
|
@ -0,0 +1,3 @@
|
|||
PyYAML==6.0.1
|
||||
requests==2.31.0
|
||||
urllib3==2.1.0
|
|
@ -0,0 +1,49 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
|
||||
import copy
|
||||
import json
|
||||
from typing import Optional, List, Union
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class MISPInstance():
|
||||
|
||||
headers = {
|
||||
'Accept': 'application/json',
|
||||
'content-type': 'application/json',
|
||||
'User-Agent': 'misp-delegation'
|
||||
}
|
||||
|
||||
def __init__(self, config: dict) -> None:
|
||||
self.base_url = config['url'][:-1] if config['url'][-1] == '/' else config['url']
|
||||
self.api_key = config['api_key']
|
||||
self.verify_ssl = config.get('verify_ssl', True)
|
||||
|
||||
if not self.verify_ssl:
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
|
||||
def GET(self, url: str) -> Union[List, dict]:
|
||||
url = self.genURL(url)
|
||||
r = requests.get(url, headers=self.getHeaders(), verify=(self.verify_ssl))
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def POST(self, url: str, payload: Optional[Union[List, dict]] = {}) -> Union[List, dict]:
|
||||
url = self.genURL(url)
|
||||
r = requests.post(url, data=json.dumps(payload), headers=self.getHeaders(), verify=(self.verify_ssl))
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def genURL(self, url: str) -> str:
|
||||
return f'{self.base_url}{url}'
|
||||
|
||||
def getHeaders(self) -> dict:
|
||||
headers = copy.copy(self.headers)
|
||||
headers['Authorization'] = self.api_key
|
||||
return headers
|
Loading…
Reference in New Issue