grove.connectors package

Grove connectors.

class grove.connectors.BaseConnector(config: ConnectorConfig, context: Dict[str, str])

Bases: object

LOG_ORDER = 'REVERSE_CHRONOLOGICAL'
NAME = 'base'
POINTER_PATH = 'NOT_SET'
cache_key(prefix: str = 'pointer') → str

Generates a cache key which uniquely identifies this connector.

The key includes the connector's name and identity, which allows multiple instances of the same connector to be used concurrently.

Parameters:

prefix – A prefix for the cache key.

Returns:

The constructed cache key.
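As a minimal sketch, assume the key is simply the prefix, the connector name, and the identity joined with dots. The `cache_key` helper below, its `name` and `identity` arguments, and the dot separator are illustrative assumptions, not Grove's actual key format:

```python
def cache_key(name: str, identity: str, prefix: str = "pointer") -> str:
    """Hypothetical helper: build a cache key unique to one connector instance."""
    # The prefix separates kinds of cached data (for example pointers or hashes),
    # while name plus identity separates connector instances from each other.
    return ".".join([prefix, name, identity])


# Two instances of the same connector type, configured for different accounts.
print(cache_key("example", "account-a"))  # pointer.example.account-a
print(cache_key("example", "account-b", prefix="hashes"))  # hashes.example.account-b
```

Because the identity is part of the key, two instances of the same connector configured for different accounts keep separate cache entries.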

abstract collect()

Provides a stub which each connector must implement to initiate a collection.
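A rough sketch of the subclassing pattern, using the standard `abc` module. `SketchConnector` and `ExampleConnector` are hypothetical stand-ins rather than Grove classes, and `save()` here only records entries in memory:

```python
import abc
from typing import Any, Dict, List


class SketchConnector(abc.ABC):
    """Hypothetical stand-in for a base connector with an abstract collect()."""

    def __init__(self) -> None:
        self.saved: List[Dict[str, Any]] = []

    @abc.abstractmethod
    def collect(self) -> None:
        """Subclasses fetch log entries here and hand them to save()."""

    def save(self, entries: List[Dict[str, Any]]) -> None:
        # Stand-in for the real save(), which also updates the pointer.
        self.saved.extend(entries)


class ExampleConnector(SketchConnector):
    def collect(self) -> None:
        # A real connector would page through a provider API here.
        self.save([{"id": 1, "timestamp": "2023-01-01T00:00:00Z"}])


connector = ExampleConnector()
connector.collect()
```

Instantiating `SketchConnector` directly raises `TypeError`, which is how `abc` enforces that every concrete connector supplies its own `collect()`.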

deduplicate_by_hash(candidates: List[Any])

Deduplicate log entries by their hash.

This is performed by hashing each log entry and comparing the resulting hashes against those of recently seen events in the cache that share the same pointer value. If a hash is already present in the cache, the log entry is discarded.

Please note: This only applies to events whose pointer value matches the most recently saved pointer. This avoids having to store large numbers of log entry hashes.

Parameters:

candidates – A list of log entries to deduplicate.

Returns:

A deduplicated list of log entries.
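The hash comparison can be sketched as follows, assuming entries are dictionaries, the pointer lives in a field named by `pointer_field`, and `seen` stands in for the cached hashes keyed by pointer value. SHA-256 over key-sorted JSON is an assumed choice of hash, and all names are hypothetical:

```python
import hashlib
import json
from typing import Any, Dict, List, Set


def hash_entry(entry: Any) -> str:
    # Serialise with sorted keys so equal dicts always produce the same hash.
    serialised = json.dumps(entry, sort_keys=True, default=str)
    return hashlib.sha256(serialised.encode("utf-8")).hexdigest()


def deduplicate_by_hash(
    candidates: List[Dict[str, Any]],
    pointer_field: str,
    current_pointer: str,
    seen: Dict[str, Set[str]],
) -> List[Dict[str, Any]]:
    """Drop entries already seen at the most recently saved pointer value."""
    known = seen.get(current_pointer, set())
    kept = []
    for entry in candidates:
        # Only entries sharing the saved pointer value are checked, since
        # hashes for older pointer values are not retained.
        same_pointer = str(entry.get(pointer_field)) == current_pointer
        if same_pointer and hash_entry(entry) in known:
            continue
        kept.append(entry)
    return kept
```

An entry at the saved pointer whose hash is cached is dropped; a different entry at the same pointer, or any entry at a newer pointer, passes through.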

deduplicate_by_pointer(entries: List[Any])

Deduplicate log entries by pointer values.

Deduplicates records which occur before or after the pointer on the current page, depending on whether log entries are in chronological or reverse chronological order. This enables deduplication of log events within the same page of results, and is intended to solve for cases where a provider’s filters are less granular than the pointer value.

For example, some providers only allow filtering on a date (YYYY-MM-DD) while returning log entries with timestamps that have millisecond precision.

Parameters:

entries – A list of log entries to deduplicate.

Returns:

A deduplicated list of log entries.
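A position-based sketch of this idea, assuming pointer values are ISO 8601 timestamps in a uniform format (so string comparison matches time order) and that each page is sorted in the connector's log order. The function and its parameters are hypothetical. This sketch keeps entries equal to the pointer, leaving them to hash-based deduplication:

```python
from typing import Any, Dict, List


def deduplicate_by_pointer(
    entries: List[Dict[str, Any]],
    pointer_field: str,
    pointer: str,
    log_order: str = "CHRONOLOGICAL",
) -> List[Dict[str, Any]]:
    """Drop entries on a page that fall on the already-collected side of pointer."""
    reverse = log_order == "REVERSE_CHRONOLOGICAL"
    # Newest first: older, already-collected entries trail the pointer.
    # Oldest first: they precede it. Normalise to oldest-first to handle both.
    ordered = list(reversed(entries)) if reverse else list(entries)

    # Skip the leading run of entries strictly older than the saved pointer.
    start = 0
    while start < len(ordered) and str(ordered[start][pointer_field]) < pointer:
        start += 1
    kept = ordered[start:]

    # Restore the page's original ordering before returning.
    return kept[::-1] if reverse else kept
```

With a date-only provider filter, a page may start with millisecond-precision entries older than the saved pointer; the sketch trims exactly that run.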

finalize()

Performs final steps after each save operation has completed.

hash_entries(entries: List[Any]) → Dict[str, set[str]]

Hashes a list of log entries.

Parameters:

entries – List of log entries to hash.

Returns:

A dictionary of sets of log entry hashes, keyed by the pointer value of each entry.

hash_entry(entry: Any) → str

Serialise and hash the provided log entry.

This is intended to produce a unique identifier for this event which may be used for deduplication.

Parameters:

entry – The log entry to hash.

Returns:

A hash of a log entry.
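One way to implement both hashing methods, assuming entries are JSON-serialisable dictionaries and that SHA-256 is an acceptable hash. The exact serialisation Grove uses is not documented here, and `pointer_field` is a hypothetical parameter:

```python
import hashlib
import json
from typing import Any, Dict, List, Set


def hash_entry(entry: Any) -> str:
    """Serialise an entry deterministically and return its SHA-256 hex digest."""
    # sort_keys makes the output independent of dict insertion order;
    # default=str lets non-JSON types such as datetimes serialise.
    serialised = json.dumps(entry, sort_keys=True, default=str)
    return hashlib.sha256(serialised.encode("utf-8")).hexdigest()


def hash_entries(entries: List[Dict[str, Any]], pointer_field: str) -> Dict[str, Set[str]]:
    """Return sets of entry hashes keyed by each entry's pointer value."""
    hashes: Dict[str, Set[str]] = {}
    for entry in entries:
        pointer = str(entry[pointer_field])
        hashes.setdefault(pointer, set()).add(hash_entry(entry))
    return hashes
```

Sorting keys matters because two providers' responses, or two pages of the same response, may emit identical events with fields in a different order.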

property hashes: Dict[str, set[str]]

Return hashes for the most recently seen log entries.

Returns:

A dictionary of log entry hashes, keyed by their pointer.

lock()

Attempts to acquire an execution lock for the current connector.

This will raise a ConcurrencyException if a valid lock is already present.

Raises:

ConcurrencyException – A valid lock is already held, likely the result of a concurrent execution of Grove.
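A minimal sketch of lock acquisition, assuming a cache backend that can add a key only if it is absent or expired. `InMemoryLockCache`, `add_if_absent`, the TTL, and the token format are hypothetical; only `ConcurrencyException` mirrors a name from this page, and it is redefined locally so the example runs on its own. The in-memory cache is single-process only, whereas a shared backend would need to perform the add atomically:

```python
import time
import uuid
from typing import Dict, Tuple


class ConcurrencyException(Exception):
    """Raised when another execution already holds the lock."""


class InMemoryLockCache:
    """Hypothetical stand-in for a cache backend with expiring keys."""

    def __init__(self) -> None:
        self._items: Dict[str, Tuple[str, float]] = {}

    def add_if_absent(self, key: str, value: str, ttl: float) -> bool:
        now = time.monotonic()
        current = self._items.get(key)
        # A lock counts as valid only until its expiry time passes.
        if current is not None and current[1] > now:
            return False
        self._items[key] = (value, now + ttl)
        return True


def lock(cache: InMemoryLockCache, key: str, ttl: float = 300.0) -> str:
    """Acquire an execution lock, returning the token that identifies it."""
    token = str(uuid.uuid4())
    if not cache.add_if_absent(key, token, ttl):
        raise ConcurrencyException(f"A valid lock is already held for {key}")
    return token
```

Expiry lets a crashed run's lock lapse instead of blocking the connector forever; the returned token is what a matching release step would compare against.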

metadata() → Dict[str, Any]

Returns contextual metadata associated with this collection.

Returns:

A dictionary of metadata for storing with log entries.

property pointer: str

Return the currently known pointer, fetching from cache if needed.

Returns:

Pointer associated with this configured connector.

property pointer_next: str

Return the currently known next pointer, fetching from cache if needed.

Returns:

The next pointer associated with this configured connector.

property pointer_previous: str

Return the previous pointer, fetching from cache if needed.

Returns:

The previous pointer, which will be an empty string if no previous pointer was found.
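"Fetching from cache if needed" amounts to a lazily populated property. In the sketch below a plain dictionary stands in for the cache, and `PointerState`, the `.previous` key suffix, and the `default` argument are hypothetical:

```python
from typing import Dict, Optional


class PointerState:
    """Hypothetical sketch of pointer properties backed by a cache."""

    def __init__(self, cache: Dict[str, str], key: str, default: str = "") -> None:
        self._cache = cache
        self._key = key
        self._default = default
        self._pointer: Optional[str] = None
        self._pointer_previous: Optional[str] = None
        self.cache_reads = 0  # counts lookups, to show memoisation

    @property
    def pointer(self) -> str:
        # Hit the cache only on first access; later reads reuse the local copy.
        if self._pointer is None:
            self.cache_reads += 1
            self._pointer = self._cache.get(self._key, self._default)
        return self._pointer

    @property
    def pointer_previous(self) -> str:
        # As documented, an empty string signals that no previous pointer exists.
        if self._pointer_previous is None:
            self.cache_reads += 1
            self._pointer_previous = self._cache.get(f"{self._key}.previous", "")
        return self._pointer_previous
```

Memoising keeps repeated property reads during a collection from turning into repeated round trips to a remote cache.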

process(entries: List[Dict[str, Any]]) → List[Dict[str, Any]]

Process log entries prior to saving.

Parameters:

entries – A list of log entries to process.

Returns:

A processed list of log entries.

process_and_write(entries: List[Any])

Process log entries and write them to the configured output handler.

Parameters:

entries – List of log entries to process.

run()

Connector entrypoint, called by the scheduler.

Wraps collect() to handle exceptions and logging for collection. This method should NOT be overridden by connectors, as it is only intended to provide a consistent calling and error handling mechanism when connectors are executed.
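A sketch of this wrapper pattern. `SketchConnector` and its subclasses are hypothetical stand-ins, and calling `lock()` and `unlock()` around `collect()` is an assumption made for illustration; the documented behaviour is only that `run()` handles exceptions and logging:

```python
import logging
from typing import List

logger = logging.getLogger("connector-sketch")


class SketchConnector:
    """Hypothetical stand-in showing run() as a fixed wrapper around collect()."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def collect(self) -> None:
        raise NotImplementedError  # subclasses provide the collection logic

    def lock(self) -> None:
        self.events.append("lock")

    def unlock(self) -> None:
        self.events.append("unlock")

    def run(self) -> None:
        # Subclasses override collect(), never run(), so every connector gets
        # the same locking, logging, and error handling.
        self.lock()
        try:
            self.collect()
        except Exception:
            # Log and swallow, so one failing connector does not halt the caller.
            logger.exception("Collection failed for %s", type(self).__name__)
            self.events.append("failed")
        finally:
            self.unlock()


class WorkingConnector(SketchConnector):
    def collect(self) -> None:
        self.events.append("collected")


class FailingConnector(SketchConnector):
    def collect(self) -> None:
        raise RuntimeError("provider returned an error")
```

The `finally` clause guarantees the release step runs whether `collect()` succeeds or raises.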

save(entries: List[Any])

Saves log entries and updates the pointer in the cache.

Parameters:

entries – List of log entries to save.
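A simplified sketch of the save step, assuming pointers are ISO 8601 timestamp strings and that the pointer should only ever move forward. `SaveSketch` and its `pointer_field` attribute are hypothetical, and the sketch omits the hash and window bookkeeping that other methods on this page deal with:

```python
from typing import Any, Dict, List


class SaveSketch:
    """Hypothetical sketch: write entries, then advance the cached pointer."""

    def __init__(self, pointer_field: str) -> None:
        self.pointer_field = pointer_field
        self.cache: Dict[str, str] = {}
        self.written: List[Dict[str, Any]] = []

    def save(self, entries: List[Dict[str, Any]]) -> None:
        if not entries:
            return  # nothing to write, so leave the pointer untouched
        # Write before updating the pointer, so a failed write cannot cause
        # the pointer to skip entries that were never stored.
        self.written.extend(entries)
        newest = max(str(entry[self.pointer_field]) for entry in entries)
        # Never move the pointer backwards, whatever order the page arrived in.
        self.cache["pointer"] = max(self.cache.get("pointer", ""), newest)
```

Ordering the write before the pointer update means a crash between the two steps produces duplicates on the next run rather than gaps.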

save_hashes()

Saves the log entry hashes to cache.

save_window_end()

Saves the window end location to cache.

save_window_start()

Saves the window start location to cache.

unlock()

Releases an execution lock for the current connector.

Raises:
  • AccessException – An unexpected error occurred while releasing the lock.

  • ConcurrencyException – The lock does not match the expected value. This may indicate that a concurrent execution of Grove has since taken the lock.
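A minimal sketch of the release check, with a plain dictionary standing in for the cache and local copies of the two exception names from this page. The `unlock` signature and the token comparison are assumptions:

```python
from typing import Dict


class AccessException(Exception):
    """Raised when the cache backend fails unexpectedly."""


class ConcurrencyException(Exception):
    """Raised when the stored lock no longer matches our token."""


def unlock(cache: Dict[str, str], key: str, token: str) -> None:
    """Release the lock at key, but only if it is still held by token."""
    try:
        current = cache.get(key)
    except Exception as err:  # a real backend might raise on network errors
        raise AccessException(f"Failed to read lock {key}") from err

    # A different (or missing) token means another execution took the lock.
    if current != token:
        raise ConcurrencyException(f"Lock {key} is not held by this execution")

    del cache[key]
```

The check-then-delete in this sketch is not atomic; a shared backend would need a conditional delete to avoid releasing a lock another execution has just taken.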

property window_end: str | None

Return the window end location from cache, if set.

Returns:

The window end location.

property window_start: str

Return the window start location from cache, if set.

Returns:

The window start location, or an empty string if not found.

Subpackages