# Source code for grove.connectors.gsuite.activities

# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

"""Google GSuite Activities connector for Grove."""

import json
from datetime import datetime, timedelta, timezone

import google_auth_httplib2
import httplib2
from google.auth.exceptions import GoogleAuthError
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import Error as GACError

from grove.connectors import BaseConnector
from grove.constants import REVERSE_CHRONOLOGICAL
from grove.exceptions import (
    ConfigurationException,
    NotFoundException,
    RequestFailedException,
)

# OAuth scopes requested for the delegated credentials. Only read-only access
# to the Admin SDK Reports (audit) API is needed by this connector.
SCOPES = [
    "https://www.googleapis.com/auth/admin.reports.audit.readonly",
]


def as_rfc3339(date: datetime) -> str:
    """Formats an input date as an RFC 3339 UTC timestamp with millisecond precision.

    Naive datetimes are treated as already being in UTC. Timezone-aware datetimes
    are converted to UTC first, so the result never carries both a numeric offset
    and the trailing "Z" designator.

    :param date: The input datetime object to convert to RFC 3339 format.
    :return: An RFC 3339 format date as a string, e.g. "2024-01-02T03:04:05.678Z".
    """
    offset = date.utcoffset()
    if offset is not None:
        # Shift the wall-clock time to UTC and drop tzinfo. Otherwise isoformat()
        # would emit "+HH:MM" in front of the "Z" appended below.
        date = (date - offset).replace(tzinfo=None)

    return date.isoformat(sep="T", timespec="milliseconds") + "Z"
class Connector(BaseConnector):
    """Grove connector for activities from the Google GSuite Reports API."""

    NAME = "gsuite_activities"
    POINTER_PATH = "id.time"
    LOG_ORDER = REVERSE_CHRONOLOGICAL

    @property
    def delay(self):
        """Defines the amount of time to delay collection of logs (in minutes).

        This gives logs time to become 'consistent' before they are collected.
        Google backfills log entries based on its published lag guidelines, so
        collecting events inside this lag window may miss events that arrive later.
        The value is configurable so operators can prefer consistency over speed of
        delivery, or vice versa.

        For example, a delay of 20 instructs Grove to only collect logs that are at
        least 20 minutes old. This defaults to 0 (no delay).

        :return: The "delay" component of the connector configuration, as an int.
        :raises ConfigurationException: The configured value is not an integer.
        """
        try:
            candidate = self.configuration.delay
        except AttributeError:
            return 0

        try:
            return int(candidate)
        except (TypeError, ValueError) as err:
            # TypeError covers non-numeric types such as None, which int() rejects
            # with TypeError rather than ValueError.
            raise ConfigurationException(
                f"Configured 'delay' is not valid. Value must be an integer. {err}"
            ) from err

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive datetime.

        The value is kept naive because it is compared against a naive datetime
        parsed from the pointer (with its "Z" stripped). Comparing naive and
        aware datetimes raises TypeError.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def collect(self):
        """Collects activities from the Google GSuite Reports API.

        The Google APIs use OAuth 2.0 2LO ('two-legged OAuth'), whose credentials
        are a JSON 'service account file' containing several fields. As a result,
        the key and identity are treated a little differently in this connector.
        Rather than being a single authentication token, the key must contain the
        entire 'service account file' in JSON format, as generated by the Google
        API console. The identity is passed as the ``subject`` of the credentials
        for domain-wide delegation.

        The collection window (pointer to "now minus delay") is computed once per
        run and reused for every page, so page tokens are followed against a
        consistent query.

        Please see the following guides for more information:

        https://developers.google.com/admin-sdk/directory/v1/guides/delegation
        https://developers.google.com/admin-sdk/reports/reference/rest/v1/activities

        :raises RequestFailedException: An HTTP request failed.
        """
        http = google_auth_httplib2.AuthorizedHttp(
            self.get_credentials(),
            http=self.get_http_transport(),
        )

        # If no pointer is stored, a previous run hasn't been performed, so set the
        # pointer to a week ago. For the GSuite reports API the pointer is the value
        # of the id.time field from the latest record retrieved from the API.
        try:
            _ = self.pointer
        except NotFoundException:
            self.pointer = as_rfc3339(self._utcnow() - timedelta(days=7))

        # Transform the dates back to native datetimes to allow timedelta arithmetic
        # and comparison. The window is fixed here rather than per page, so the
        # query parameters stay identical while following page tokens.
        start = datetime.fromisoformat(self.pointer.replace("Z", ""))
        end = self._utcnow() - timedelta(minutes=self.delay)
        if end <= start:
            self.logger.debug(
                "Collection end time is prior to start, skipping.",
                extra={
                    "start": as_rfc3339(start),
                    "end": as_rfc3339(end),
                    **self.log_context,
                },
            )
            return

        query = {
            "userKey": "all",
            "applicationName": self.operation,
            "startTime": as_rfc3339(start),
            "endTime": as_rfc3339(end),
        }

        # Page over all activities in the window.
        cursor = None
        with build("admin", "reports_v1", http=http) as service:
            while True:
                if cursor:
                    self.logger.debug(
                        "Using pageToken as cursor to request next page of activities.",
                        extra={"cursor": cursor, **self.log_context},
                    )
                    request = service.activities().list(**query, pageToken=cursor)
                else:
                    self.logger.debug(
                        "Using startTime from pointer to request activity list.",
                        extra={
                            "start": query["startTime"],
                            "end": query["endTime"],
                            **self.log_context,
                        },
                    )
                    request = service.activities().list(**query)

                # Only the API call itself is wrapped, so errors raised while
                # saving are not misreported as request failures.
                try:
                    results = request.execute()
                except (GoogleAuthError, GACError) as err:
                    raise RequestFailedException(
                        f"Request to GSuite API failed: {err}"
                    ) from err

                activities = results.get("items", [])
                self.logger.debug(
                    "Got activities from the GSuite API.",
                    extra={
                        "count": len(activities),
                        **self.log_context,
                    },
                )
                self.save(activities)

                # An absent or empty token both end paging. An empty token would
                # otherwise re-request the first page indefinitely.
                cursor = results.get("nextPageToken")
                if not cursor:
                    self.logger.debug(
                        "No nextPageToken, finishing collection.",
                        extra=self.log_context,
                    )
                    break

                self.logger.debug(
                    "nextPageToken is present, paging.", extra=self.log_context
                )

    def get_http_transport(self):
        """Creates an HTTP object for use by the Google API Client.

        :return: An httplib2.Http object for use with the Google API client.
        """
        return httplib2.Http()

    def get_credentials(self):
        """Generates a credentials instance from the configured service account info.

        The credentials are required to perform operations using the Google API
        client. They are scoped to ``SCOPES`` and use the connector identity as the
        delegation subject.

        :return: A credentials instance built from configured service account info.
        :raises ConfigurationException: There is an issue with the configuration for
            this connector.
        """
        try:
            service_account_info = json.loads(self.key)
        except (TypeError, json.JSONDecodeError) as err:
            # TypeError is raised when the key is not str/bytes (e.g. unset / None).
            raise ConfigurationException(
                f"Unable to load service account JSON for {self.identity}: {err}"
            ) from err

        # The service account file is a JSON object. Anything else (a list, a bare
        # string) cannot be turned into credentials.
        if not isinstance(service_account_info, dict):
            raise ConfigurationException(
                f"Unable to load service account JSON for {self.identity}: "
                "expected a JSON object"
            )

        # Construct the credentials, including scopes and delegation.
        try:
            return service_account.Credentials.from_service_account_info(
                service_account_info,
                scopes=SCOPES,
                subject=self.identity,
            )
        except ValueError as err:
            raise ConfigurationException(
                "Unable to generate credentials from service account info for "
                f"{self.identity}: {err}"
            ) from err