Source code for grove.connectors.torq.api

# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

"""Torq API client."""

import base64
import logging
from typing import Any, Dict, Optional

import requests

from grove.exceptions import RequestFailedException
from grove.types import AuditLogEntries, HTTPResponse

API_BASE_URI = "https://api.torq.io/public"
API_PAGE_SIZE = 100


[docs] class Client: def __init__( self, base_url: Optional[str] = API_BASE_URI, identity: Optional[str] = None, key: Optional[str] = None, bearer_token: Optional[str] = None, retry: Optional[bool] = True, ) -> None: """Setup a new client. :param base_url: The Torq api url - default is https://api.tor.io/public :param identity: The Torq integration client id :param key: The Torq integration client secret :param bearer_token: The allocated bearer token provided by Torq given our client id and client secret :param retry: Whether to automatically retry if recoverable errors are encountered, such as rate-limiting. """ self.base_url = base_url self.key = key self.identity = identity self.bearer_token = bearer_token self.retry = retry self.logger = logging.getLogger(__name__) self._set_default_headers() if self.bearer_token is None: self.refresh_bearer_token() def _set_default_headers(self): self.headers = { "Accept": "application/json", "Content-Type": "application/json", } def _get( self, url: str, params: Optional[Dict[str, Optional[str]]] = None ) -> HTTPResponse: """A GET wrapper to handle retries for the caller. :param url: A URL to perform the HTTP GET against. :param parameters: An optional set of HTTP parameters to add to the request. :raises RateLimitException: A rate limit was encountered. :raises RequestFailedException: An HTTP request failed. :return: HTTP Response object containing the headers and body of a response. """ try: response = requests.get(url, headers=self.headers, params=params) response.raise_for_status() except requests.exceptions.HTTPError as err: raise RequestFailedException(err) return HTTPResponse(headers=response.headers, body=response.json()) def _post( self, url: str, headers: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Optional[str]]] = None, ) -> Dict[str, Any]: """A POST wrapper to handle retries for the caller. :param url: URL to perform the HTTP POST against. :param headers: Dictionary of headers to add to the request. :param data: HTTP parameters to add to the request. :raises RequestFailedException: An HTTP request failed. :return: The response to the request in JSON. """ try: response = requests.post( url, headers=headers, data=data, ) response.raise_for_status() except requests.exceptions.HTTPError as err: raise RequestFailedException(err) return response.json()
[docs] def refresh_bearer_token(self): """Use Basic Auth to get Bearer Token :returns: If the request is successful, the bearer token is returned to the Client class header. """ auth_url = "https://auth.torq.io/v1/auth/token" # set basic auth value for authorisation header basic_auth = str( base64.b64encode(bytes(f"{self.identity}:{self.key}", "utf-8")), "utf-8", ) bearer_response = self._post( auth_url, data={ "grant_type": "client_credentials", }, headers={ "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "Authorization": f"Basic {basic_auth}", }, ) self.bearer_token = bearer_response.get("access_token") if self.headers is None or self.headers == {}: self._set_default_headers() self.headers["Authorization"] = f"Bearer {self.bearer_token}"
[docs] def get_logs( self, endpoint: str, result_field: str, start_time: Optional[str] = None, limit: Optional[int] = API_PAGE_SIZE, cursor: Optional[str] = None, ) -> AuditLogEntries: """Fetches a list of logs from Torq which match the provided filters. :param start_time: The required date and time of the earliest log entry. Start times are in RFC3339 format, for example, 2022-03-09T08:40:18.490771179Z. :param result_field: The key name for the list of logs in the returned json. :param to_date: The required date and time in UTC of the latest log entry. :param limit: The maximum number of items to include in a single response. :param cursor: The cursor to use when paging. :return: AuditLogEntries object containing a pagination cursor, and log entries. """ result = self._get( f"{self.base_url}/{endpoint}", params={ "start_time": start_time, "page_size": str(limit), "page_token": cursor, }, ) # get the list of logs from the result_field parameters value data = result.body.get(result_field, []) # keep paging until we meet the total number of results cursor = result.body.get("next_page_token", None) if cursor == "": cursor = None # Return the cursor and the results to allow the caller to page as required. return AuditLogEntries(cursor=cursor, entries=data)
[docs] def get_audit_logs( self, start_time: Optional[str] = None, cursor: Optional[str] = None ) -> AuditLogEntries: """Fetches a list of audit logs from Torq which match the provided filters. :param start_time: The required date and time of the earliest log entry. Start times are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. :param to_date: The required date and time in UTC of the latest log entry. :param cursor: The cursor to use when paging. :return: AuditLogEntries object containing a pagination cursor, and log entries. """ return self.get_logs( endpoint="v1alpha/audit_logs", result_field="audit_logs", start_time=start_time, cursor=cursor, )
[docs] def get_activity_logs( self, start_time: Optional[str] = None, cursor: Optional[str] = None ) -> AuditLogEntries: """Fetches a list of activity logs from Torq which match the provided filters. :param start_time: The required date and time of the earliest log entry. Start times are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. :param to_date: The required date and time in UTC of the latest log entry. :param cursor: The cursor to use when paging. :return: AuditLogEntries object containing a pagination cursor, and log entries. """ return self.get_logs( endpoint="v1alpha/activity_logs", result_field="activity_logs", start_time=start_time, cursor=cursor, )