# Source code for grove.connectors.onepassword.events_signinattempts

# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

"""1Password Signin Attempt event log connector for Grove."""

import datetime

from grove.connectors.onepassword.api import Client
from grove.connectors import BaseConnector
from grove.constants import CHRONOLOGICAL
from grove.exceptions import NotFoundException


class Connector(BaseConnector):
    """Grove connector for the 1Password sign-in attempt event log."""

    # Identifier for this connector.
    NAME = "onepassword_events_signinattempts"
    # Field of each log entry used as the pointer; for the 1Password API this
    # is an ISO timestamp.
    POINTER_PATH = "timestamp"
    # Ordering of the returned log entries.
    LOG_ORDER = CHRONOLOGICAL

    def collect(self):
        """Collect sign-in attempt logs from the 1Password event log API.

        If reading the stored pointer raises ``NotFoundException`` (no earlier
        collection has recorded one), the pointer is seeded with a timestamp
        from seven days ago so that the last week of data is fetched.

        Pages are then requested and saved one at a time until the API
        response carries no cursor.
        """
        api = Client(token=self.key)

        # Accessing the pointer is only a probe for its existence; the value
        # itself is read again when each request is built below.
        try:
            _ = self.pointer
        except NotFoundException:
            # Seed the pointer: local-timezone-aware ISO timestamp, truncated
            # to whole seconds, one week in the past.
            lookback = datetime.timedelta(days=7)
            start = (datetime.datetime.now() - lookback).astimezone()
            self.pointer = start.replace(microsecond=0).isoformat()

        # The first request is made without a cursor; later requests pass the
        # cursor handed back by the previous page.
        next_cursor = None
        while True:
            page = api.get_signinattempts(
                start_time=self.pointer,
                cursor=next_cursor,
            )

            # Persist this batch before deciding whether to fetch another.
            self.save(page.entries)

            next_cursor = page.cursor
            if next_cursor is None:
                # No cursor means there are no further pages.
                return