Description
class SubscriberSourceActivityEvent(BaseKafkaModel):
    """Kafka model carrying activity counters for a subscriber source.

    ``_topic`` is set to ``'subscriber_sources_activities'``; how the base
    class uses it is defined outside this snippet.
    """

    _topic = 'subscriber_sources_activities'

    def __init__(self, key: str, value: str, channel_id: int, timestamp: int = None,
                 sent_count: int = None, delivered_count: int = None,
                 clicked_count: int = None, closed_count: int = None,
                 subscribed_count: int = None, unsubscribed_count: int = None,
                 page_view_count: int = None, **kwargs):
        """Store the event fields on the instance.

        Any counter that is ``None`` (or otherwise falsy) is stored as ``0``.
        A falsy ``timestamp`` is replaced with the current time in whole
        seconds. Extra keyword arguments are forwarded to the base class.
        """
        super().__init__(**kwargs)

        self.key = key
        self.value = value
        self.channel_id = channel_id

        # The "missing counter -> 0" rule is applied in one place; the tuple
        # order fixes the order in which the attributes are set.
        counters = (
            ('sent_count', sent_count),
            ('closed_count', closed_count),
            ('clicked_count', clicked_count),
            ('delivered_count', delivered_count),
            ('subscribed_count', subscribed_count),
            ('unsubscribed_count', unsubscribed_count),
            ('page_view_count', page_view_count),
        )
        for attr_name, count in counters:
            setattr(self, attr_name, count or 0)

        if not timestamp:
            timestamp = int(time.time())
        self.timestamp = timestamp
Cleaning
To clean up old source activity events, run the following Python script on AWS Kafka:
import gc
from webpush.lib.kafka.clients import Consumer
from webpush.lib.kafka.models.subscribers import SubscriberSourceActivityEvent
# Consume SubscriberSourceActivityEvent messages, printing each one followed
# by a running total. The outer loop has no exit condition, so the script
# runs until it is interrupted.
consumed_total = 0
with Consumer([SubscriberSourceActivityEvent], 'statistics', prepare_topics=True) as consumer:
    while True:
        # Force a garbage-collection pass before every batch of 2500 polls.
        print('garbage collector run')
        gc.collect()
        for _attempt in range(2500):
            try:
                event = consumer.get(timeout=1)
            except TimeoutError:
                # Nothing arrived within the timeout; the attempt still
                # counts toward the current batch.
                continue
            print(event)
            consumed_total += 1
            print(consumed_total)
Links
- producers:
- consumers: