class Connector(BaseConnector):
    """Connector that collects Okta System Log entries."""

    NAME = "okta_system_log"
    POINTER_PATH = "published"
    LOG_ORDER = CHRONOLOGICAL

    @property
    def domain(self):
        """Return the Okta base domain from the connector configuration.

        The setting exists so that logs can be collected from Okta domains
        other than the default, including okta-emea.com and oktapreview.com.
        It must hold only the base domain, never the customer / organisation
        name.

        :return: The "domain" portion of the connector's configuration, or
            'okta.com' when the configuration has no such field.
        """
        # NOTE(review): collect() hands this value to Client together with the
        # API token, so it presumably decides which host receives the token.
        # Treat the connector configuration as trusted input, and confirm
        # whether Client restricts the value to known Okta domains.
        try:
            configured = self.configuration.domain
        except AttributeError:
            return "okta.com"
        return configured

    def collect(self):
        """Collect all logs from the Okta Audit API.

        When no pointer from an earlier collection is stored, collection
        starts from one week before the current UTC time. Results are fetched
        page by page, and each page is saved before the next one is requested.
        """
        client = Client(identity=self.identity, token=self.key, domain=self.domain)

        # A missing pointer means no earlier run has completed, so seed it with
        # a timestamp one week in the past. utcnow() is naive, so isoformat()
        # emits no offset and the trailing "Z" marks the value as UTC.
        started = datetime.utcnow()
        try:
            _ = self.pointer
        except NotFoundException:
            week_ago = started - timedelta(days=7)
            self.pointer = week_ago.isoformat(sep="T", timespec="milliseconds") + "Z"

        # Page through the upstream API until it stops returning a cursor.
        # NOTE(review): self.pointer is re-read for every request and nothing
        # in this method advances it; any update presumably happens inside
        # save() via POINTER_PATH. Confirm.
        next_cursor = None
        more_pages = True
        while more_pages:
            page = client.get_audit_logs(since=self.pointer, cursor=next_cursor)

            # Persist this batch before asking for the next one.
            self.save(page.entries)

            next_cursor = page.cursor
            more_pages = next_cursor is not None