From 3efabf1da371a27de04fcd8f4a67ae51afd6562b Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Mon, 22 Mar 2021 18:39:16 -0700 Subject: [PATCH] Creates activity stream class --- bookwyrm/activitystreams.py | 176 ++++++++++++++++++++++++++++++++++++ bookwyrm/models/status.py | 77 +--------------- bookwyrm/settings.py | 1 + bookwyrm/views/feed.py | 25 ++--- 4 files changed, 189 insertions(+), 90 deletions(-) create mode 100644 bookwyrm/activitystreams.py diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py new file mode 100644 index 00000000..bceb1cad --- /dev/null +++ b/bookwyrm/activitystreams.py @@ -0,0 +1,176 @@ +""" access the activity streams stored in redis """ +from abc import ABC +from django.dispatch import receiver +from django.db.models import signals +from django.db.models import Q +import redis + +from bookwyrm import models, settings +from bookwyrm.views.helpers import privacy_filter + +r = redis.Redis( + host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0 +) + + +class ActivityStream(ABC): + """ a category of activity stream (like home, local, federated) """ + + def stream_id(self, user): + """ the redis key for this user's instance of this stream """ + return '{}-{}'.format(user.id, self.key) + + def unread_id(self, user): + """ the redis key for this user's unread count for this stream """ + return '{}-unread'.format(self.stream_id(user)) + + def add_status(self, status): + """ add a status to users' feeds """ + value = self.get_value(status) + # we want to do this as a bulk operation, hence "pipeline" + pipeline = r.pipeline() + for user in self.stream_users(status): + # add the status to the feed + pipeline.zadd(self.stream_id(user), value) + + # add to the unread status count + pipeline.incr(self.unread_id(user)) + # and go! + pipeline.execute() + + + def get_value(self, status): # pylint: disable=no-self-use + """ the status id and the rank (ie, published date) """ + return {status.id: status.published_date.timestamp()} + + + def get_activity_stream(self, user): + """ load the ids for statuses to be displayed """ + # clear unreads for this feed + r.set(self.unread_id(user), 0) + + statuses = r.zrevrange(self.stream_id(user), 0, -1) + return models.Status.objects.select_subclasses().filter( + id__in=statuses + ).order_by('-published_date') + + + def populate_stream(self, user): + ''' go from zero to a timeline ''' + pipeline = r.pipeline() + statuses = self.stream_statuses(user) + + stream_id = self.stream_id(user) + for status in statuses.all()[:settings.MAX_STREAM_LENGTH]: + pipeline.zadd(stream_id, self.get_value(status)) + pipeline.execute() + + + def stream_users(self, status): # pylint: disable=no-self-use + """ given a status, what users should see it """ + # direct messages don't appeard in feeds. + if status.privacy == 'direct': + return None + + # everybody who could plausibly see this status + audience = models.User.objects.filter( + is_active=True, + local=True # we only create feeds for users of this instance + ).exclude( + Q(id__in=status.user.blocks.all()) | Q(blocks=status.user) # not blocked + ) + + # only visible to the poster's followers and tagged users + if status.privacy == 'followers': + audience = audience.filter( + Q(id=status.user.id) # if the user is the post's author + | Q(following=status.user) # if the user is following the author + ) + return audience + + + def stream_statuses(self, user): # pylint: disable=no-self-use + """ given a user, what statuses should they see on this stream """ + return privacy_filter( + user, + models.Status.objects.select_subclasses(), + privacy_levels=["public", 'unlisted', 'followers'], + ) + + +class HomeStream(ActivityStream): + """ users you follow """ + key = 'home' + + def stream_users(self, status): + audience = super().stream_users(status) + return audience.filter( + Q(id=status.user.id) # if the user is the post's author + | Q(following=status.user) # if the user is following the author + | Q(id__in=status.mention_users.all()) # or the user is mentioned + ) + + def stream_statuses(self, user): + return privacy_filter( + user, + models.Status.objects.select_subclasses(), + privacy_levels=["public", 'unlisted', 'followers'], + following_only=True + ) + + +class LocalStream(ActivityStream): + """ users you follow """ + key = 'local' + + def stream_users(self, status): + # this stream wants no part in non-public statuses + if status.privacy != 'public': + return None + return super().stream_users(status) + + def stream_statuses(self, user): + # all public statuses by a local user + return privacy_filter( + user, + models.Status.objects.select_subclasses().filter(user__local=True), + privacy_levels=["public"], + ) + + +class FederatedStream(ActivityStream): + """ users you follow """ + key = 'federated' + + def stream_users(self, status): + # this stream wants no part in non-public statuses + if status.privacy != 'public': + return None + return super().stream_users(status) + + def stream_statuses(self, user): + return privacy_filter( + user, + models.Status.objects.select_subclasses(), + privacy_levels=["public"], + ) + + + +streams = { + 'home': HomeStream(), + 'local': LocalStream(), + 'federated': FederatedStream(), +} + +@receiver(signals.post_save) +# pylint: disable=unused-argument +def update_feeds(sender, instance, created, *args, **kwargs): + """ add statuses to activity feeds """ + # we're only interested in new statuses that aren't dms + if not created or not issubclass(sender, models.Status) or \ + instance.privacy == 'direct': + return + + for stream in streams.values(): + stream.add_status(instance) diff --git a/bookwyrm/models/status.py b/bookwyrm/models/status.py index 6d93ff0a..b313ac86 100644 --- a/bookwyrm/models/status.py +++ b/bookwyrm/models/status.py @@ -10,9 +10,8 @@ from django.dispatch import receiver from django.template.loader import get_template from django.utils import timezone from model_utils.managers import InheritanceManager -import redis -from bookwyrm import activitypub, settings +from bookwyrm import activitypub from .activitypub_mixin import ActivitypubMixin, ActivityMixin from .activitypub_mixin import OrderedCollectionPageMixin from .base_model import BookWyrmModel @@ -20,10 +19,6 @@ from .fields import image_serializer from .readthrough import ProgressMode from . import fields -r = redis.Redis( - host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0 -) - class Status(OrderedCollectionPageMixin, BookWyrmModel): """ any post, like a reply to a review, etc """ @@ -121,7 +116,7 @@ class Status(OrderedCollectionPageMixin, BookWyrmModel): return list(set(mentions)) @classmethod - def ignore_activity(cls, activity): + def ignore_activity(cls, activity): # pylint: disable=too-many-return-statements """ keep notes if they are replies to existing statuses """ if activity.type == "Announce": try: @@ -390,71 +385,3 @@ class Boost(ActivityMixin, Status): # This constraint can't work as it would cross tables. # class Meta: # unique_together = ('user', 'boosted_status') - - -@receiver(models.signals.post_save) -# pylint: disable=unused-argument -def update_feeds(sender, instance, created, *args, **kwargs): - """ add statuses to activity feeds """ - # we're only interested in new statuses that aren't dms - if not created or not issubclass(sender, Status) or instance.privacy == 'direct': - return - - user = instance.user - - community = user.__class__.objects.filter( - local=True # we only manage timelines for local users - ).exclude( - Q(id__in=user.blocks.all()) | Q(blocks=user) # not blocked - ) - - # ------ home timeline: users you follow and yourself - friends = community.filter( - Q(id=user.id) | Q(following=user) - ) - add_status(friends, instance, 'home') - - # local and federated timelines only get public statuses - if instance.privacy != 'public': - return - - # ------ federated timeline: to anyone, anywhere - add_status(community, instance, 'federated') - - # if the author is a remote user, it doesn't go on the local timeline - if not user.local: - return - - # ------ local timeline: to anyone, anywhere - add_status(community, instance, 'local') - - -def add_status(users, status, feed_name): - """ add a status to users' feeds """ - # we want to do this as a bulk operation - pipeline = r.pipeline() - value = {status.id: status.published_date.timestamp()} - for user in users: - feed_id = '{}-{}'.format(user.id, feed_name) - unread_feed_id = '{}-unread'.format(feed_id) - - # add the status to the feed - pipeline.zadd(feed_id, value) - - # add to the unread status count - pipeline.incr(unread_feed_id) - pipeline.execute() - - -def get_activity_stream(user, feed_name, start, end): - """ load the ids for statuses to be displayed """ - feed_id = '{}-{}'.format(user.id, feed_name) - unread_feed_id = '{}-unread'.format(feed_id) - - # clear unreads for this feed - r.set(unread_feed_id, 0) - - statuses = r.zrange(feed_id, start, end) - return Status.objects.select_subclasses().filter( - id__in=statuses - ).order_by('-published_date') diff --git a/bookwyrm/settings.py b/bookwyrm/settings.py index bc210dac..36806435 100644 --- a/bookwyrm/settings.py +++ b/bookwyrm/settings.py @@ -97,6 +97,7 @@ REDIS_ACTIVITY_HOST = env("REDIS_ACTIVITY_HOST", "localhost") REDIS_ACTIVITY_PORT = env("REDIS_ACTIVITY_PORT", 6379) MAX_STREAM_LENGTH = env("MAX_STREAM_LENGTH", 200) +STREAMS = ["home", "local", "federated"] # Database # https://docs.djangoproject.com/en/2.0/ref/settings/#databases diff --git a/bookwyrm/views/feed.py b/bookwyrm/views/feed.py index a19c2738..4c003f39 100644 --- a/bookwyrm/views/feed.py +++ b/bookwyrm/views/feed.py @@ -9,9 +9,9 @@ from django.utils.decorators import method_decorator from django.utils.translation import gettext as _ from django.views import View -from bookwyrm import forms, models +from bookwyrm import activitystreams, forms, models from bookwyrm.activitypub import ActivitypubResponse -from bookwyrm.settings import PAGE_LENGTH +from bookwyrm.settings import PAGE_LENGTH, STREAMS from .helpers import get_activity_feed, get_user_from_username from .helpers import is_api_request, is_bookwyrm_request, object_visible_to_user @@ -28,21 +28,16 @@ class Feed(View): except ValueError: page = 1 - try: - tab_title = { - 'home': _("Home"), - "local": _("Local"), - "federated": _("Federated") - }[tab] - except KeyError: + if not tab in STREAMS: tab = 'home' - tab_title = _("Home") - activities = models.status.get_activity_stream( - request.user, tab, - (1 - page) * PAGE_LENGTH, - page * PAGE_LENGTH - ) + tab_title = { + 'home': _("Home"), + "local": _("Local"), + "federated": _("Federated") + }[tab] + + activities = activitystreams.streams[tab].get_activity_stream(request.user) paginated = Paginator(activities, PAGE_LENGTH)