Creates activity stream class

This commit is contained in:
Mouse Reeve 2021-03-22 18:39:16 -07:00
parent 459479db43
commit 3efabf1da3
4 changed files with 189 additions and 90 deletions

176
bookwyrm/activitystreams.py Normal file
View File

@ -0,0 +1,176 @@
""" access the activity streams stored in redis """
from abc import ABC
from django.dispatch import receiver
from django.db.models import signals
from django.db.models import Q
import redis
from bookwyrm import models, settings
from bookwyrm.views.helpers import privacy_filter
r = redis.Redis(
host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0
)
class ActivityStream(ABC):
""" a category of activity stream (like home, local, federated) """
def stream_id(self, user):
""" the redis key for this user's instance of this stream """
return '{}-{}'.format(user.id, self.key)
def unread_id(self, user):
""" the redis key for this user's unread count for this stream """
return '{}-unread'.format(self.stream_id(user))
def add_status(self, status):
""" add a status to users' feeds """
value = self.get_value(status)
# we want to do this as a bulk operation, hence "pipeline"
pipeline = r.pipeline()
for user in self.stream_users(status):
# add the status to the feed
pipeline.zadd(self.stream_id(user), value)
# add to the unread status count
pipeline.incr(self.unread_id(user))
# and go!
pipeline.execute()
def get_value(self, status): # pylint: disable=no-self-use
""" the status id and the rank (ie, published date) """
return {status.id: status.published_date.timestamp()}
def get_activity_stream(self, user):
""" load the ids for statuses to be displayed """
# clear unreads for this feed
r.set(self.unread_id(user), 0)
statuses = r.zrevrange(self.stream_id(user), 0, -1)
return models.Status.objects.select_subclasses().filter(
id__in=statuses
).order_by('-published_date')
def populate_stream(self, user):
''' go from zero to a timeline '''
pipeline = r.pipeline()
statuses = self.stream_statuses(user)
stream_id = self.stream_id(user)
for status in statuses.all()[:settings.MAX_STREAM_LENGTH]:
pipeline.zadd(stream_id, self.get_value(status))
pipeline.execute()
def stream_users(self, status): # pylint: disable=no-self-use
""" given a status, what users should see it """
# direct messages don't appeard in feeds.
if status.privacy == 'direct':
return None
# everybody who could plausibly see this status
audience = models.User.objects.filter(
is_active=True,
local=True # we only create feeds for users of this instance
).exclude(
Q(id__in=status.user.blocks.all()) | Q(blocks=status.user) # not blocked
)
# only visible to the poster's followers and tagged users
if status.privacy == 'followers':
audience = audience.filter(
Q(id=status.user.id) # if the user is the post's author
| Q(following=status.user) # if the user is following the author
)
return audience
def stream_statuses(self, user): # pylint: disable=no-self-use
""" given a user, what statuses should they see on this stream """
return privacy_filter(
user,
models.Status.objects.select_subclasses(),
privacy_levels=["public", 'unlisted', 'followers'],
)
class HomeStream(ActivityStream):
""" users you follow """
key = 'home'
def stream_users(self, status):
audience = super().stream_users(status)
return audience.filter(
Q(id=status.user.id) # if the user is the post's author
| Q(following=status.user) # if the user is following the author
| Q(id__in=status.mention_users.all()) # or the user is mentioned
)
def stream_statuses(self, user):
return privacy_filter(
user,
models.Status.objects.select_subclasses(),
privacy_levels=["public", 'unlisted', 'followers'],
following_only=True
)
class LocalStream(ActivityStream):
""" users you follow """
key = 'local'
def stream_users(self, status):
# this stream wants no part in non-public statuses
if status.privacy != 'public':
return None
return super().stream_users(status)
def stream_statuses(self, user):
# all public statuses by a local user
return privacy_filter(
user,
models.Status.objects.select_subclasses().filter(user__local=True),
privacy_levels=["public"],
)
class FederatedStream(ActivityStream):
""" users you follow """
key = 'federated'
def stream_users(self, status):
# this stream wants no part in non-public statuses
if status.privacy != 'public':
return None
return super().stream_users(status)
def stream_statuses(self, user):
return privacy_filter(
user,
models.Status.objects.select_subclasses(),
privacy_levels=["public"],
)
streams = {
'home': HomeStream(),
'local': LocalStream(),
'federated': FederatedStream(),
}
@receiver(signals.post_save)
# pylint: disable=unused-argument
def update_feeds(sender, instance, created, *args, **kwargs):
""" add statuses to activity feeds """
# we're only interested in new statuses that aren't dms
if not created or not issubclass(sender, models.Status) or \
instance.privacy == 'direct':
return
for stream in streams.values():
stream.add_status(instance)

View File

@ -10,9 +10,8 @@ from django.dispatch import receiver
from django.template.loader import get_template from django.template.loader import get_template
from django.utils import timezone from django.utils import timezone
from model_utils.managers import InheritanceManager from model_utils.managers import InheritanceManager
import redis
from bookwyrm import activitypub, settings from bookwyrm import activitypub
from .activitypub_mixin import ActivitypubMixin, ActivityMixin from .activitypub_mixin import ActivitypubMixin, ActivityMixin
from .activitypub_mixin import OrderedCollectionPageMixin from .activitypub_mixin import OrderedCollectionPageMixin
from .base_model import BookWyrmModel from .base_model import BookWyrmModel
@ -20,10 +19,6 @@ from .fields import image_serializer
from .readthrough import ProgressMode from .readthrough import ProgressMode
from . import fields from . import fields
r = redis.Redis(
host=settings.REDIS_ACTIVITY_HOST, port=settings.REDIS_ACTIVITY_PORT, db=0
)
class Status(OrderedCollectionPageMixin, BookWyrmModel): class Status(OrderedCollectionPageMixin, BookWyrmModel):
""" any post, like a reply to a review, etc """ """ any post, like a reply to a review, etc """
@ -121,7 +116,7 @@ class Status(OrderedCollectionPageMixin, BookWyrmModel):
return list(set(mentions)) return list(set(mentions))
@classmethod @classmethod
def ignore_activity(cls, activity): def ignore_activity(cls, activity): # pylint: disable=too-many-return-statements
""" keep notes if they are replies to existing statuses """ """ keep notes if they are replies to existing statuses """
if activity.type == "Announce": if activity.type == "Announce":
try: try:
@ -390,71 +385,3 @@ class Boost(ActivityMixin, Status):
# This constraint can't work as it would cross tables. # This constraint can't work as it would cross tables.
# class Meta: # class Meta:
# unique_together = ('user', 'boosted_status') # unique_together = ('user', 'boosted_status')
@receiver(models.signals.post_save)
# pylint: disable=unused-argument
def update_feeds(sender, instance, created, *args, **kwargs):
""" add statuses to activity feeds """
# we're only interested in new statuses that aren't dms
if not created or not issubclass(sender, Status) or instance.privacy == 'direct':
return
user = instance.user
community = user.__class__.objects.filter(
local=True # we only manage timelines for local users
).exclude(
Q(id__in=user.blocks.all()) | Q(blocks=user) # not blocked
)
# ------ home timeline: users you follow and yourself
friends = community.filter(
Q(id=user.id) | Q(following=user)
)
add_status(friends, instance, 'home')
# local and federated timelines only get public statuses
if instance.privacy != 'public':
return
# ------ federated timeline: to anyone, anywhere
add_status(community, instance, 'federated')
# if the author is a remote user, it doesn't go on the local timeline
if not user.local:
return
# ------ local timeline: to anyone, anywhere
add_status(community, instance, 'local')
def add_status(users, status, feed_name):
""" add a status to users' feeds """
# we want to do this as a bulk operation
pipeline = r.pipeline()
value = {status.id: status.published_date.timestamp()}
for user in users:
feed_id = '{}-{}'.format(user.id, feed_name)
unread_feed_id = '{}-unread'.format(feed_id)
# add the status to the feed
pipeline.zadd(feed_id, value)
# add to the unread status count
pipeline.incr(unread_feed_id)
pipeline.execute()
def get_activity_stream(user, feed_name, start, end):
""" load the ids for statuses to be displayed """
feed_id = '{}-{}'.format(user.id, feed_name)
unread_feed_id = '{}-unread'.format(feed_id)
# clear unreads for this feed
r.set(unread_feed_id, 0)
statuses = r.zrange(feed_id, start, end)
return Status.objects.select_subclasses().filter(
id__in=statuses
).order_by('-published_date')

View File

@ -97,6 +97,7 @@ REDIS_ACTIVITY_HOST = env("REDIS_ACTIVITY_HOST", "localhost")
REDIS_ACTIVITY_PORT = env("REDIS_ACTIVITY_PORT", 6379) REDIS_ACTIVITY_PORT = env("REDIS_ACTIVITY_PORT", 6379)
MAX_STREAM_LENGTH = env("MAX_STREAM_LENGTH", 200) MAX_STREAM_LENGTH = env("MAX_STREAM_LENGTH", 200)
STREAMS = ["home", "local", "federated"]
# Database # Database
# https://docs.djangoproject.com/en/2.0/ref/settings/#databases # https://docs.djangoproject.com/en/2.0/ref/settings/#databases

View File

@ -9,9 +9,9 @@ from django.utils.decorators import method_decorator
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from django.views import View from django.views import View
from bookwyrm import forms, models from bookwyrm import activitystreams, forms, models
from bookwyrm.activitypub import ActivitypubResponse from bookwyrm.activitypub import ActivitypubResponse
from bookwyrm.settings import PAGE_LENGTH from bookwyrm.settings import PAGE_LENGTH, STREAMS
from .helpers import get_activity_feed, get_user_from_username from .helpers import get_activity_feed, get_user_from_username
from .helpers import is_api_request, is_bookwyrm_request, object_visible_to_user from .helpers import is_api_request, is_bookwyrm_request, object_visible_to_user
@ -28,21 +28,16 @@ class Feed(View):
except ValueError: except ValueError:
page = 1 page = 1
try: if not tab in STREAMS:
tab_title = {
'home': _("Home"),
"local": _("Local"),
"federated": _("Federated")
}[tab]
except KeyError:
tab = 'home' tab = 'home'
tab_title = _("Home")
activities = models.status.get_activity_stream( tab_title = {
request.user, tab, 'home': _("Home"),
(1 - page) * PAGE_LENGTH, "local": _("Local"),
page * PAGE_LENGTH "federated": _("Federated")
) }[tab]
activities = activitystreams.streams[tab].get_activity_stream(request.user)
paginated = Paginator(activities, PAGE_LENGTH) paginated = Paginator(activities, PAGE_LENGTH)