# Copyright (c) HashiCorp, Inc.# SPDX-License-Identifier: MPL-2.0"""Atlassian API client.As Atlassian does not currently support the Events API, this client has been created inthe interim."""importloggingimporttimefromtypingimportDict,Optionalfromdatetimeimportdatetime,timezoneimportrequestsfromgrove.exceptionsimportRateLimitException,RequestFailedExceptionfromgrove.typesimportAuditLogEntries,HTTPResponseAPI_BASE_URI="https://api.atlassian.com/admin/v1/orgs/{identity}"API_DATE_FORMAT="%Y-%m-%dT%H:%M%z"
[docs]classClient:def__init__(self,identity:Optional[str]=None,token:Optional[str]=None,retry:Optional[bool]=True,):"""Setup a new client. :param identity: The name of the Atlassian organisation. :param token: The Atlassian API token. :param retry: Automatically retry if recoverable errors are encountered, such as rate-limiting. """self.logger=logging.getLogger(__name__)self.retry=retryself.headers={"Accept":"application/json","Authorization":f"Bearer {token}",}# We need to push the identity into the URI, so we'll keep track of this.self._api_base_uri=API_BASE_URI.format(identity=identity)def_get(self,url:str,params:Optional[Dict[str,Optional[str]]]=None,)->HTTPResponse:"""A GET wrapper to handle retries for the caller. :param url: URL to perform the HTTP GET against. :param params: HTTP parameters to add to the request. :raises RateLimitException: A rate limit was encountered. :raises RequestFailedException: An HTTP request failed. :return: HTTP Response object containing the headers and body of a response. """whileTrue:try:response=requests.get(url,headers=self.headers,params=params)response.raise_for_status()breakexceptrequests.exceptions.RequestExceptionaserr:ifint(getattr(err.response,"status_code",0))!=429:raiseRequestFailedException(err)ifnotself.retry:raiseRateLimitException(err)# Retry on rate-limit, but only if requested or if the API does not# return a reset at time.reset_at=err.response.headers.get("X-Ratelimit-Reset")ifnotreset_at:raiseRateLimitException(err)self.logger.warning("Rate-limit was exceeded during collection.",extra={"X-Ratelimit-Reset":reset_at,})# Work out how long we need to wait, and if too long, just bail early.time_current=int(datetime.now(timezone.utc).strftime("%s"))time_reset=int(datetime.strptime(reset_at,API_DATE_FORMAT).strftime("%s"))# noqa: E501time_wait=time_reset-time_currentiftime_wait>=180:raiseRateLimitException(err)iftime_wait<=0:continuetime.sleep(time_wait)returnHTTPResponse(headers=response.headers,body=response.json())
[docs]defget_audit(self,cursor:Optional[str]=None,from_date:Optional[str]=None,)->AuditLogEntries:"""Fetches a list of signing attempt logs. :param cursor: Cursor to use when fetching results. Supersedes other parameters. :param from_date: The earliest date an event represented as a UNIX epoch time. :return: AuditLogEntries object containing a pagination cursor, and log entries. """url=f"{self._api_base_uri}/events"# Use the cursor if set, otherwise construct the initial query.ifcursorisnotNone:self.logger.debug("Collecting next page with provided cursor.",extra={"cursor":cursor})result=self._get(url,params={"from":from_date,"cursor":cursor})else:self.logger.debug("Collecting first page with provided",extra={"from_date":from_date})result=self._get(url,params={"from":from_date})cursor=result.body.get("meta",{}).get("next",None)# Return the cursor and the results to allow the caller to page as required.returnAuditLogEntries(cursor=cursor,entries=result.body.get("data",[]))