[docs]classConnector(BaseConnector):NAME="twilio_messages"POINTER_PATH="date_sent"LOG_ORDER=REVERSE_CHRONOLOGICAL@propertydefsecret(self):"""Fetches the API secret from the configuration. This is required as this is a third authentication element required by Twilio when using API authentication. :return: The value of the 'secret' field from the configuration. """try:returnself.configuration.secretexceptAttributeError:returnNone@lru_cache(maxsize=512)# noqa: B019def_carrier_lookup(self,number:str):"""Performs a Carrier lookup via the Twilio API. This is wrapped with an lru_cache to try and reduce lookup costs for retries of the same number between collections - assuming the runtime supports long lived LRU caches! :return: A dictionary of carrier information from Twilio. """carrier=self.client.lookups.v1.phone_numbers(number).fetch(type=["carrier"])# Extract only carrier and country fields, as we don't want to track PII here.return{"country_code":carrier.country_code,"carrier":carrier.carrier,}
[docs]defcollect(self):# noqa: C901"""Collects logs of all messages from the Twilio Messages API. This will first check whether there are any pointers cached to indicate previous collections. If not, the 24-hours of data will be collected. """# Construct the client based on whether "API key" or "auth token" authentication# is configured. The client is defined as an instance variable so convenience# methods can access the same client.ifself.secret:self.client=Client(self.key,self.secret,self.identity)else:self.client=Client(self.identity,self.key)# If no pointer is stored then a previous run hasn't been performed, so set the# pointer to 24-hours ago.try:_=self.pointerexceptNotFoundException:self.pointer=(datetime.now(timezone.utc)-timedelta(days=1)).strftime("%a, %d %b %Y %H:%M:%S %z")# Our LRU cache only has 512 slots, so during a back fill this may be exhausted.# To try and alleviate we'll track results in memory during a single execution,# too.carrier_information:Dict[str,Any]={}# Stream the data from the API, paging as required.entries=[]batch_size=1000try:events=self.client.messages.list(date_sent=self.pointer)exceptTwilioExceptionaserr:raiseRequestFailedException(err)foreventinevents:# Redaction and lookup of numbers will be performed based on the direction# of the message.incoming=Falseifevent.status=="received":number=event.from_incoming=Trueelse:number=event.to# Check results cache for carrier information before performing a lookup,# as these are expensive in terms of both time and cost (currently $0.005# per lookup).ifnumberincarrier_information:carrier=carrier_information[number]else:carrier=self._carrier_lookup(number)carrier_information[number]=carrier# Fields need to be removed from the logs to protect message contents and# PII, so we'll construct the log messages ourselves.## The use of RFC2822 format dates is horrible, but this is the format that# this API expects :(message={"account_sid":event.account_sid,"date_sent":event.date_sent.strftime("%a, %d %b %Y %H:%M:%S %z"),"status":event.status,"sid":event.sid,}# Sanitise the message for PII based on direction.ifincoming:message["to"]=event.totry:message["from"]=f"{event.from_[0:4]}..."exceptIndexError:message["from"]="..."else:message["from"]=event.from_try:message["to"]=f"{event.to[0:4]}..."exceptIndexError:message["to"]="..."# Track the message.entries.append({**message,**carrier})# Save batches, and clear the list for the next page.iflen(entries)==batch_size:self.save(entries)entries=[]# Save any records not previously saved, before completion.self.save(entries)