Description

class SubscriberSourceActivityEvent(BaseKafkaModel):
    """Activity counters for one subscriber source on one channel.

    Instances carry a ``key``/``value`` pair, the owning ``channel_id``,
    seven integer counters and a ``timestamp``.

    NOTE(review): ``_topic`` is presumably read by ``BaseKafkaModel`` to
    pick the Kafka topic -- confirm against the base class.
    """

    _topic = 'subscriber_sources_activities'

    def __init__(self, key: str, value: str, channel_id: int, timestamp: int = None,
                 sent_count: int = None, delivered_count: int = None,
                 clicked_count: int = None, closed_count: int = None,
                 subscribed_count: int = None, unsubscribed_count: int = None,
                 page_view_count: int = None, **kwargs):
        """Populate the event, substituting defaults for missing values.

        Args:
            key: Stored unchanged as ``self.key``.
            value: Stored unchanged as ``self.value``.
            channel_id: Stored unchanged as ``self.channel_id``.
            timestamp: Stored as ``self.timestamp``; any falsy value
                (``None`` or ``0``) is replaced by ``int(time.time())``.
            sent_count, delivered_count, clicked_count, closed_count,
            subscribed_count, unsubscribed_count, page_view_count:
                Counters; any falsy value is stored as ``0``.
            **kwargs: Forwarded untouched to the parent initializer.
        """
        super().__init__(**kwargs)

        self.key = key
        self.value = value
        self.channel_id = channel_id

        # Attributes are assigned in this exact order; a missing counter
        # becomes 0 so the attribute always holds a number.
        counters = (
            ('sent_count', sent_count),
            ('closed_count', closed_count),
            ('clicked_count', clicked_count),
            ('delivered_count', delivered_count),
            ('subscribed_count', subscribed_count),
            ('unsubscribed_count', unsubscribed_count),
            ('page_view_count', page_view_count),
        )
        for attr_name, supplied in counters:
            setattr(self, attr_name, supplied if supplied else 0)

        # Fall back to the current Unix time (whole seconds) when the
        # caller gave no usable timestamp.
        if not timestamp:
            timestamp = int(time.time())
        self.timestamp = timestamp
 

Cleaning

To clean up old source activity events, run the following Python script in the AWS Kafka environment:

import gc
 
from webpush.lib.kafka.clients import Consumer
from webpush.lib.kafka.models.subscribers import SubscriberSourceActivityEvent
 
# Running total of events received since the script started.
consumed_total = 0

# NOTE(review): 'statistics' is presumably the consumer group name and
# prepare_topics=True presumably creates/validates the topic -- confirm
# against webpush.lib.kafka.clients.Consumer.
with Consumer([SubscriberSourceActivityEvent], 'statistics', prepare_topics=True) as consumer:
    # There is no exit condition: the script runs until it is interrupted.
    while True:
        # Force a garbage-collection pass before each batch of 2500 polls.
        print('garbage collector run')
        gc.collect()

        for _ in range(2500):
            try:
                event = consumer.get(timeout=1)
            except TimeoutError:
                # Nothing arrived in time; the attempt still counts
                # toward the current batch of 2500.
                continue

            # Success path: echo the event and the running total.
            print(event)
            consumed_total += 1
            print(consumed_total)