new: Support X-MISP-AUTH Header

Also, improve HTTP headers init

Fix #1179
pull/1182/head
Raphaël Vinot 2024-03-22 11:39:19 +01:00
parent df9c20c86e
commit 2b66a3eabd
1 changed files with 17 additions and 9 deletions

View File

@ -158,6 +158,7 @@ class PyMISP:
:param tool: The software using PyMISP (string), used to set a unique user-agent
:param http_headers: Arbitrary headers to pass to all the requests.
:param https_adapter: Arbitrary HTTPS adapter for the requests session.
:param http_auth_header_name: The name of the HTTP header to use for the API key. Can be either "Authorization" or "X-MISP-AUTH".
:param timeout: Timeout, as described here: https://requests.readthedocs.io/en/master/user/advanced/#timeouts
"""
@ -165,7 +166,8 @@ class PyMISP:
cert: str | tuple[str, str] | None = None, auth: AuthBase | None = None, tool: str = '',
timeout: float | tuple[float, float] | None = None,
http_headers: dict[str, str] | None = None,
https_adapter: requests.adapters.BaseAdapter | None = None
https_adapter: requests.adapters.BaseAdapter | None = None,
http_auth_header_name: str = 'Authorization'
):
if not url:
@ -179,16 +181,25 @@ class PyMISP:
self.proxies: MutableMapping[str, str] | None = proxies
self.cert: str | tuple[str, str] | None = cert
self.auth: AuthBase | None = auth
self.tool: str = tool
self.timeout: float | tuple[float, float] | None = timeout
self.__session = requests.Session() # use one session to keep connection between requests
if https_adapter is not None:
self.__session.mount('https://', https_adapter)
if brotli_supported():
self.__session.headers['Accept-Encoding'] = ', '.join(('br', 'gzip', 'deflate'))
if http_auth_header_name in ['Authorization', 'X-MISP-AUTH']:
self.__session.headers[http_auth_header_name] = self.key
else:
raise PyMISPError('http_auth_header_name should be either "Authorization" or "X-MISP-AUTH"')
user_agent = f'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}'
if tool:
user_agent = f'{user_agent} - {tool}'
self.__session.headers['User-Agent'] = user_agent
if http_headers:
self.__session.headers.update(http_headers)
self._user_agent = f'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}'
self.global_pythonify = False
@ -3708,7 +3719,7 @@ class PyMISP:
def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> dict[str, Any] | str:
"""Check if the response from the server is not an unexpected error"""
if response.status_code >= 500:
headers_without_auth = {i: response.request.headers[i] for i in response.request.headers if i != 'Authorization'}
headers_without_auth = {h_name: h_value for h_name, h_value in response.request.headers.items() if h_value != self.key}
logger.critical(everything_broken.format(headers_without_auth, response.request.body, response.text))
raise MISPServerError(f'Error code 500:\n{response.text}')
@ -3778,14 +3789,11 @@ class PyMISP:
url = f'{url}/{to_append_url}'
req = requests.Request(request_type, url, data=d, params=params)
user_agent = f'{self._user_agent} - {self.tool}' if self.tool else self._user_agent
req.auth = self.auth
prepped = self.__session.prepare_request(req)
prepped.headers.update(
{'Authorization': self.key,
'Accept': f'application/{output_type}',
'content-type': f'application/{content_type}',
'User-Agent': user_agent})
{'Accept': f'application/{output_type}',
'content-type': f'application/{content_type}'})
logger.debug(prepped.headers)
settings = self.__session.merge_environment_settings(req.url, proxies=self.proxies or {}, stream=None,
verify=self.ssl, cert=self.cert)