diff --git a/bookwyrm/lists_stream.py b/bookwyrm/lists_stream.py new file mode 100644 index 00000000..82cef5e4 --- /dev/null +++ b/bookwyrm/lists_stream.py @@ -0,0 +1,238 @@ +""" access the activity streams stored in redis """ +from django.dispatch import receiver +from django.db import transaction +from django.db.models import signals, Q + +from bookwyrm import models +from bookwyrm.redis_store import RedisStore +from bookwyrm.tasks import app, LOW, MEDIUM, HIGH + + +class ListsStream(RedisStore): + """a category of activity stream (like home, local, books)""" + + def stream_id(self, user): # pylint: disable=no-self-use + """the redis key for this user's instance of this stream""" + return f"{user.id}-lists" + + def get_rank(self, obj): # pylint: disable=no-self-use + """lists are sorted by date published""" + return obj.updated_date.timestamp() + + def add_list(self, book_list): + """add a list to users' feeds""" + # the pipeline contains all the add-to-stream activities + self.add_object_to_related_stores(book_list) + + def add_user_lists(self, viewer, user): + """add a user's listes to another user's feed""" + # only add the listes that the viewer should be able to see + lists = models.List.filter(user=user).privacy_filter(viewer) + self.bulk_add_objects_to_store(lists, self.stream_id(viewer)) + + def remove_user_lists(self, viewer, user): + """remove a user's list from another user's feed""" + # remove all so that followers only lists are removed + lists = user.list_set.all() + self.bulk_remove_objects_from_store(lists, self.stream_id(viewer)) + + def get_activity_stream(self, user): + """load the lists to be displayed""" + lists = self.get_store(self.stream_id(user)) + return ( + models.List.objects.filter(id__in=lists) + .select_related( + "user", + ) + .prefetch_related("listitem_set") + .order_by("-updated_date") + ) + + def populate_streams(self, user): + """go from zero to a timeline""" + self.populate_store(self.stream_id(user)) + + def get_audience(self, book_list): # pylint: disable=no-self-use + """given a list, what users should see it""" + # everybody who could plausibly see this list + audience = models.User.objects.filter( + is_active=True, + local=True, # we only create feeds for users of this instance + ).exclude( # not blocked + Q(id__in=book_list.user.blocks.all()) | Q(blocks=book_list.user) + ) + + # TODO: groups + + # only visible to the poster and mentioned users + if book_list.privacy == "direct": + audience = audience.filter( + Q(id=list.user.id) # if the user is the post's author + ) + # only visible to the poster's followers and tagged users + elif book_list.privacy == "followers": + audience = audience.filter( + Q(id=book_list.user.id) # if the user is the post's author + | Q(following=book_list.user) # if the user is following the author + ) + return audience.distinct() + + def get_stores_for_object(self, obj): + return [self.stream_id(u) for u in self.get_audience(obj)] + + def get_lists_for_user(self, user): # pylint: disable=no-self-use + """given a user, what lists should they see on this stream""" + return models.List.privacy_filter( + user, + privacy_levels=["public", "followers"], + ) + + def get_objects_for_store(self, store): + user = models.User.objects.get(id=store.split("-")[0]) + return self.get_lists_for_user(user) + + +@receiver(signals.post_save, sender=models.List) +# pylint: disable=unused-argument +def add_list_on_create(sender, instance, created, *args, **kwargs): + """add newly created lists to activity feeds""" + # we're only interested in new lists + if not issubclass(sender, models.List): + return + + if instance.deleted: + remove_list_task.delay(instance.id) + return + + # when creating new things, gotta wait on the transaction + transaction.on_commit(lambda: add_list_on_create_command(instance, created)) + + +def add_list_on_create_command(instance, created): + """runs this code only after the database commit completes""" + priority = HIGH + # check if this is an old list, de-prioritize if so + # (this will happen if federation is very slow, or, more expectedly, on csv import) + one_day = 60 * 60 * 24 + if (instance.created_date - instance.published_date).seconds > one_day: + priority = LOW + + add_list_task.apply_async( + args=(instance.id,), + kwargs={"increment_unread": created}, + queue=priority, + ) + + +@receiver(signals.post_save, sender=models.UserFollows) +# pylint: disable=unused-argument +def add_lists_on_follow(sender, instance, created, *args, **kwargs): + """add a newly followed user's lists to feeds""" + if not created or not instance.user_subject.local: + return + add_user_lists_task.delay( + instance.user_subject.id, instance.user_object.id, stream_list=["home"] + ) + + +@receiver(signals.post_delete, sender=models.UserFollows) +# pylint: disable=unused-argument +def remove_lists_on_unfollow(sender, instance, *args, **kwargs): + """remove lists from a feed on unfollow""" + if not instance.user_subject.local: + return + remove_user_lists_task.delay( + instance.user_subject.id, instance.user_object.id, stream_list=["home"] + ) + + +@receiver(signals.post_save, sender=models.UserBlocks) +# pylint: disable=unused-argument +def remove_lists_on_block(sender, instance, *args, **kwargs): + """remove lists from all feeds on block""" + # blocks apply ot all feeds + if instance.user_subject.local: + remove_user_lists_task.delay(instance.user_subject.id, instance.user_object.id) + + # and in both directions + if instance.user_object.local: + remove_user_lists_task.delay(instance.user_object.id, instance.user_subject.id) + + +@receiver(signals.post_delete, sender=models.UserBlocks) +# pylint: disable=unused-argument +def add_lists_on_unblock(sender, instance, *args, **kwargs): + """add lists back to all feeds on unblock""" + # make sure there isn't a block in the other direction + if models.UserBlocks.objects.filter( + user_subject=instance.user_object, + user_object=instance.user_subject, + ).exists(): + return + + # add lists back to streams with lists from anyone + if instance.user_subject.local: + add_user_lists_task.delay( + instance.user_subject.id, + instance.user_object.id, + ) + + # add lists back to streams with lists from anyone + if instance.user_object.local: + add_user_lists_task.delay( + instance.user_object.id, + instance.user_subject.id, + ) + + +@receiver(signals.post_save, sender=models.User) +# pylint: disable=unused-argument +def populate_streams_on_account_create(sender, instance, created, *args, **kwargs): + """build a user's feeds when they join""" + if not created or not instance.local: + return + + populate_lists_stream_task.delay(instance.id) + + +# ---- TASKS +@app.task(queue=MEDIUM) +def populate_lists_stream_task(user_id): + """background task for populating an empty activitystream""" + user = models.User.objects.get(id=user_id) + ListsStream().populate_streams(user) + + +@app.task(queue=MEDIUM) +def remove_list_task(list_ids): + """remove a list from any stream it might be in""" + # this can take an id or a list of ids + if not isinstance(list_ids, list): + list_ids = [list_ids] + lists = models.List.objects.filter(id__in=list_ids) + + for book_list in lists: + ListsStream().remove_object_from_related_stores(book_list) + + +@app.task(queue=HIGH) +def add_list_task(list_id): + """add a list to any stream it should be in""" + book_list = models.List.objects.get(id=list_id) + ListsStream().add_list(book_list) + + +@app.task(queue=MEDIUM) +def remove_user_lists_task(viewer_id, user_id): + """remove all lists by a user from a viewer's stream""" + viewer = models.User.objects.get(id=viewer_id) + user = models.User.objects.get(id=user_id) + ListsStream().remove_user_lists(viewer, user) + + +@app.task(queue=MEDIUM) +def add_user_lists_task(viewer_id, user_id): + """add all lists by a user to a viewer's stream""" + viewer = models.User.objects.get(id=viewer_id) + user = models.User.objects.get(id=user_id) + ListsStream().add_user_lists(viewer, user)