From bb04a400441b097088d0e4c84a344fd3a31fafc3 Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Tue, 31 Mar 2020 16:31:33 -0700 Subject: [PATCH 1/6] Move to redis and fix a bunch of things --- .env.example | 7 +- Dockerfile | 2 + docker-compose.yml | 18 ++-- fedireads/__init__.py | 9 -- fedireads/incoming.py | 26 ++--- fedireads/settings.py | 13 +-- fedireads/tasks.py | 42 +++++++++ fr_celery/__init__.py | 10 ++ fr_celery/asgi.py | 16 ++++ {fedireads => fr_celery}/celery.py | 8 +- fr_celery/settings.py | 146 +++++++++++++++++++++++++++++ fr_celery/urls.py | 21 +++++ fr_celery/wsgi.py | 16 ++++ requirements.txt | 2 + 14 files changed, 286 insertions(+), 50 deletions(-) create mode 100644 fedireads/tasks.py create mode 100644 fr_celery/__init__.py create mode 100644 fr_celery/asgi.py rename {fedireads => fr_celery}/celery.py (74%) create mode 100644 fr_celery/settings.py create mode 100644 fr_celery/urls.py create mode 100644 fr_celery/wsgi.py diff --git a/.env.example b/.env.example index a7a37c81..dbb5e0bf 100644 --- a/.env.example +++ b/.env.example @@ -22,6 +22,7 @@ POSTGRES_USER=fedireads POSTGRES_DB=fedireads POSTGRES_HOST=db -RABBITMQ_DEFAULT_USER=rabbit -RABBITMQ_DEFAULT_PASS=changeme -CELERY_BROKER=amqp://rabbit:changeme@rabbitmq:5672 +CELERY_BROKER=redis://redis:6379/0 +CELERY_RESULT_BACKEND=redis://redis:6379/0 + +FLOWER_PORT=5555 diff --git a/Dockerfile b/Dockerfile index c845035f..f6dbb740 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,3 +5,5 @@ WORKDIR /app COPY requirements.txt /app/ RUN pip install -r requirements.txt COPY ./fedireads /app +COPY ./fr_celery /app +EXPOSE 5555 diff --git a/docker-compose.yml b/docker-compose.yml index a119f25d..cb20794e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,25 +20,31 @@ services: - celery_worker networks: - main - rabbitmq: + redis: + image: redis env_file: .env - image: rabbitmq:latest + ports: + - "6379:6379" networks: - main - ports: - - "5672:5672" restart: on-failure celery_worker: env_file: .env build: . networks: - main - command: celery -A fedireads worker -l info + command: celery -A fr_celery worker -l info volumes: - .:/app depends_on: - db - - rabbitmq + - redis + restart: on-failure + flower: + image: mher/flower + env_file: .env + ports: + - "5555:5555" restart: on-failure volumes: pgdata: diff --git a/fedireads/__init__.py b/fedireads/__init__.py index 18666a18..e69de29b 100644 --- a/fedireads/__init__.py +++ b/fedireads/__init__.py @@ -1,9 +0,0 @@ -''' we need this file to initialize celery ''' -from __future__ import absolute_import, unicode_literals - -# This will make sure the app is always imported when -# Django starts so that shared_task will use this app. -from .celery import app as celery_app - -__all__ = ('celery_app',) - diff --git a/fedireads/incoming.py b/fedireads/incoming.py index a766cb68..252a1ecf 100644 --- a/fedireads/incoming.py +++ b/fedireads/incoming.py @@ -13,6 +13,7 @@ import requests from fedireads import models, outgoing from fedireads import status as status_builder from fedireads.remote_user import get_or_create_remote_user +from fedireads import tasks @csrf_exempt @@ -256,31 +257,16 @@ def handle_create(activity): def handle_favorite(activity): ''' approval of your good good post ''' - try: - status_id = activity['object'].split('/')[-1] - status = models.Status.objects.get(id=status_id) - liker = get_or_create_remote_user(activity['actor']) - except (models.Status.DoesNotExist, models.User.DoesNotExist): - return HttpResponseNotFound() - - if not liker.local: - status_builder.create_favorite_from_activity(liker, activity) - - status_builder.create_notification( - status.user, - 'FAVORITE', - related_user=liker, - related_status=status, - ) + print('hiii!') + tasks.handle_incoming_favorite.delay(activity) return HttpResponse() def handle_unfavorite(activity): ''' approval of your good good post ''' - try: - favorite_id = activity['object']['id'] - fav = status_builder.get_favorite(favorite_id) - except models.Favorite.DoesNotExist: + favorite_id = activity['object']['id'] + fav = status_builder.get_favorite(favorite_id) + if not fav: return HttpResponseNotFound() fav.delete() diff --git a/fedireads/settings.py b/fedireads/settings.py index 3c951496..ff93ad14 100644 --- a/fedireads/settings.py +++ b/fedireads/settings.py @@ -5,6 +5,13 @@ from environs import Env env = Env() +# celery +CELERY_BROKER = env('CELERY_BROKER') +CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND') +CELERY_ACCEPT_CONTENT = ['application/json'] +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' + # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -21,12 +28,6 @@ DOMAIN = env('DOMAIN') ALLOWED_HOSTS = env.list('ALLOWED_HOSTS', ['*']) OL_URL = env('OL_URL') -# celery/rebbitmq -CELERY_BROKER_URL = env('CELERY_BROKER') -CELERY_ACCEPT_CONTENT = ['json'] -CELERY_TASK_SERIALIZER = 'json' -CELERY_RESULT_BACKEND = 'amqp' - # Application definition INSTALLED_APPS = [ diff --git a/fedireads/tasks.py b/fedireads/tasks.py new file mode 100644 index 00000000..760dffef --- /dev/null +++ b/fedireads/tasks.py @@ -0,0 +1,42 @@ +''' background tasks ''' +from celery import Celery +import os + +from fedireads import models +from fedireads import status as status_builder +from fedireads.outgoing import get_or_create_remote_user +from fedireads import settings + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fr_celery.settings') +app = Celery( + 'tasks', + broker=settings.CELERY_BROKER, +) + + +@app.task +def handle_incoming_favorite(activity): + ''' ugh ''' + print('here we go') + try: + status_id = activity['object'].split('/')[-1] + print(status_id) + status = models.Status.objects.get(id=status_id) + liker = get_or_create_remote_user(activity['actor']) + except (models.Status.DoesNotExist, models.User.DoesNotExist): + print('gonna return') + return + + print('got the status okay') + if not liker.local: + status_builder.create_favorite_from_activity(liker, activity) + + status_builder.create_notification( + status.user, + 'FAVORITE', + related_user=liker, + related_status=status, + ) + print('done') + diff --git a/fr_celery/__init__.py b/fr_celery/__init__.py new file mode 100644 index 00000000..3e6ab9e5 --- /dev/null +++ b/fr_celery/__init__.py @@ -0,0 +1,10 @@ +''' we need this file to initialize celery ''' +from __future__ import absolute_import, unicode_literals + +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app + +__all__ = ('celery_app',) + + diff --git a/fr_celery/asgi.py b/fr_celery/asgi.py new file mode 100644 index 00000000..f66a43b9 --- /dev/null +++ b/fr_celery/asgi.py @@ -0,0 +1,16 @@ +""" +ASGI config for fr_celery project. + +It exposes the ASGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/3.0/howto/deployment/asgi/ +""" + +import os + +from django.core.asgi import get_asgi_application + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fr_celery.settings') + +application = get_asgi_application() diff --git a/fedireads/celery.py b/fr_celery/celery.py similarity index 74% rename from fedireads/celery.py rename to fr_celery/celery.py index 7c26dc06..3368a8fb 100644 --- a/fedireads/celery.py +++ b/fr_celery/celery.py @@ -5,9 +5,9 @@ import os from celery import Celery # set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fedireads.settings') +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fr_celery.settings') -app = Celery('fedireads') +app = Celery('fr_celery') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. @@ -18,7 +18,3 @@ app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() - -@app.task(bind=True) -def debug_task(self): - print('Request: {0!r}'.format(self.request)) diff --git a/fr_celery/settings.py b/fr_celery/settings.py new file mode 100644 index 00000000..48aa1643 --- /dev/null +++ b/fr_celery/settings.py @@ -0,0 +1,146 @@ +""" +Django settings for fr_celery project. + +Generated by 'django-admin startproject' using Django 3.0.3. + +For more information on this file, see +https://docs.djangoproject.com/en/3.0/topics/settings/ + +For the full list of settings and their values, see +https://docs.djangoproject.com/en/3.0/ref/settings/ +""" + +import os +from environs import Env + +env = Env() + +# Build paths inside the project like this: os.path.join(BASE_DIR, ...) +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + +# celery/rebbitmq +CELERY_BROKER_URL = env('CELERY_BROKER') +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_BACKEND = 'redis' + + +# Quick-start development settings - unsuitable for production +# See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/ + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = '0a^0gpwjc1ap+lb$dinin=efc@e&_0%102$o3(>9e7lndiaw' + +# SECURITY WARNING: don't run with debug turned on in production! +DEBUG = True + +ALLOWED_HOSTS = [] + + +# Application definition + +INSTALLED_APPS = [ + 'django.contrib.admin', + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.messages', + 'django.contrib.staticfiles', + 'fr_celery', + 'fedireads', + 'celery', +] + +MIDDLEWARE = [ + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django.contrib.messages.middleware.MessageMiddleware', + 'django.middleware.clickjacking.XFrameOptionsMiddleware', +] + +ROOT_URLCONF = 'fr_celery.urls' + +TEMPLATES = [ + { + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [], + 'APP_DIRS': True, + 'OPTIONS': { + 'context_processors': [ + 'django.template.context_processors.debug', + 'django.template.context_processors.request', + 'django.contrib.auth.context_processors.auth', + 'django.contrib.messages.context_processors.messages', + ], + }, + }, +] + +WSGI_APPLICATION = 'fr_celery.wsgi.application' + + +# Database +# https://docs.djangoproject.com/en/3.0/ref/settings/#databases + +FEDIREADS_DATABASE_BACKEND = env('FEDIREADS_DATABASE_BACKEND', 'postgres') + +FEDIREADS_DBS = { + 'postgres': { + 'ENGINE': 'django.db.backends.postgresql_psycopg2', + 'NAME': env('POSTGRES_DB', 'fedireads'), + 'USER': env('POSTGRES_USER', 'fedireads'), + 'PASSWORD': env('POSTGRES_PASSWORD', 'fedireads'), + 'HOST': env('POSTGRES_HOST', ''), + 'PORT': 5432 + }, + 'sqlite': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': os.path.join(BASE_DIR, 'fedireads.db') + } +} + +DATABASES = { + 'default': FEDIREADS_DBS[FEDIREADS_DATABASE_BACKEND] +} + + +# Password validation +# https://docs.djangoproject.com/en/3.0/ref/settings/#auth-password-validators + +AUTH_PASSWORD_VALIDATORS = [ + { + 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', + }, +] + + +# Internationalization +# https://docs.djangoproject.com/en/3.0/topics/i18n/ + +LANGUAGE_CODE = 'en-us' + +TIME_ZONE = 'UTC' + +USE_I18N = True + +USE_L10N = True + +USE_TZ = True + + +# Static files (CSS, JavaScript, Images) +# https://docs.djangoproject.com/en/3.0/howto/static-files/ + +STATIC_URL = '/static/' diff --git a/fr_celery/urls.py b/fr_celery/urls.py new file mode 100644 index 00000000..c8cc543b --- /dev/null +++ b/fr_celery/urls.py @@ -0,0 +1,21 @@ +"""fr_celery URL Configuration + +The `urlpatterns` list routes URLs to views. For more information please see: + https://docs.djangoproject.com/en/3.0/topics/http/urls/ +Examples: +Function views + 1. Add an import: from my_app import views + 2. Add a URL to urlpatterns: path('', views.home, name='home') +Class-based views + 1. Add an import: from other_app.views import Home + 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') +Including another URLconf + 1. Import the include() function: from django.urls import include, path + 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) +""" +from django.contrib import admin +from django.urls import path + +urlpatterns = [ + path('admin/', admin.site.urls), +] diff --git a/fr_celery/wsgi.py b/fr_celery/wsgi.py new file mode 100644 index 00000000..38126556 --- /dev/null +++ b/fr_celery/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for fr_celery project. + +It exposes the WSGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/3.0/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fr_celery.settings') + +application = get_wsgi_application() diff --git a/requirements.txt b/requirements.txt index b8836226..13baa225 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,8 +2,10 @@ celery==4.4.2 Django==3.0.3 django-model-utils==4.0.0 environs==7.2.0 +flower==0.9.4 Pillow==7.0.0 psycopg2==2.8.4 pycryptodome==3.9.4 python-dateutil==2.8.1 +redis==3.4.1 requests==2.22.0 From b0790f3356d1619396a0dde76abef90b9637b435 Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Tue, 31 Mar 2020 16:50:58 -0700 Subject: [PATCH 2/6] Set flower broker it still doesn't load but we're getting closer --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index cb20794e..a8adde91 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,7 @@ services: restart: on-failure flower: image: mher/flower + command: ["flower", "--broker=redis://redis:6379/0", "--port=5555"] env_file: .env ports: - "5555:5555" From aeb648a8d539e766d3bd54c9d2383820c58b9876 Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Tue, 31 Mar 2020 17:00:01 -0700 Subject: [PATCH 3/6] Turn incoming activities into tasks --- fedireads/incoming.py | 92 ++++++++++++++++++++++++------------------- fedireads/tasks.py | 28 ------------- fr_celery/celery.py | 2 + 3 files changed, 54 insertions(+), 68 deletions(-) diff --git a/fedireads/incoming.py b/fedireads/incoming.py index 252a1ecf..773e4fbe 100644 --- a/fedireads/incoming.py +++ b/fedireads/incoming.py @@ -13,7 +13,21 @@ import requests from fedireads import models, outgoing from fedireads import status as status_builder from fedireads.remote_user import get_or_create_remote_user -from fedireads import tasks +from fedireads.tasks import app + + +@csrf_exempt +def inbox(request, username): + ''' incoming activitypub events ''' + # TODO: should do some kind of checking if the user accepts + # this action from the sender probably? idk + # but this will just throw a 404 if the user doesn't exist + try: + models.User.objects.get(localname=username) + except models.User.DoesNotExist: + return HttpResponseNotFound() + + return shared_inbox(request) @csrf_exempt @@ -41,7 +55,7 @@ def shared_inbox(request): 'Like': handle_favorite, 'Announce': handle_boost, 'Add': { - 'Tag': handle_add, + 'Tag': handle_tag, }, 'Undo': { 'Follow': handle_unfollow, @@ -58,10 +72,11 @@ def shared_inbox(request): if isinstance(handler, dict): handler = handler.get(activity['object']['type'], None) - if handler: - return handler(activity) + if not handler: + return HttpResponseNotFound() - return HttpResponseNotFound() + handler.delay(activity) + return HttpResponse() def verify_signature(request): @@ -110,20 +125,7 @@ def verify_signature(request): return True -@csrf_exempt -def inbox(request, username): - ''' incoming activitypub events ''' - # TODO: should do some kind of checking if the user accepts - # this action from the sender probably? idk - # but this will just throw a 404 if the user doesn't exist - try: - models.User.objects.get(localname=username) - except models.User.DoesNotExist: - return HttpResponseNotFound() - - return shared_inbox(request) - - +@app.task def handle_follow(activity): ''' someone wants to follow a local user ''' # figure out who they want to follow @@ -142,7 +144,7 @@ def handle_follow(activity): # Duplicate follow request. Not sure what the correct behaviour is, but # just dropping it works for now. We should perhaps generate the # Accept, but then do we need to match the activity id? - return HttpResponse() + return if not to_follow.manually_approves_followers: status_builder.create_notification( @@ -157,9 +159,9 @@ def handle_follow(activity): 'FOLLOW_REQUEST', related_user=user ) - return HttpResponse() +@app.task def handle_unfollow(activity): ''' unfollow a local user ''' obj = activity['object'] @@ -173,9 +175,9 @@ def handle_unfollow(activity): return HttpResponseNotFound() to_unfollow.followers.remove(requester) - return HttpResponse() +@app.task def handle_follow_accept(activity): ''' hurray, someone remote accepted a follow request ''' # figure out who they want to follow @@ -192,9 +194,9 @@ def handle_follow_accept(activity): except models.UserFollowRequest.DoesNotExist: pass accepter.followers.add(requester) - return HttpResponse() +@app.task def handle_follow_reject(activity): ''' someone is rejecting a follow request ''' requester = models.User.objects.get(actor=activity['object']['actor']) @@ -209,8 +211,8 @@ def handle_follow_reject(activity): except models.UserFollowRequest.DoesNotExist: pass - return HttpResponse() +@app.task def handle_create(activity): ''' someone did something, good on them ''' user = get_or_create_remote_user(activity['actor']) @@ -220,7 +222,7 @@ def handle_create(activity): if user.local: # we really oughtn't even be sending in this case - return HttpResponse() + return if activity['object'].get('fedireadsType') in ['Review', 'Comment'] and \ 'inReplyToBook' in activity['object']: @@ -252,16 +254,30 @@ def handle_create(activity): except ValueError: return HttpResponseBadRequest() - return HttpResponse() +@app.task def handle_favorite(activity): ''' approval of your good good post ''' - print('hiii!') - tasks.handle_incoming_favorite.delay(activity) - return HttpResponse() + try: + status_id = activity['object'].split('/')[-1] + status = models.Status.objects.get(id=status_id) + liker = get_or_create_remote_user(activity['actor']) + except (models.Status.DoesNotExist, models.User.DoesNotExist): + return + + if not liker.local: + status_builder.create_favorite_from_activity(liker, activity) + + status_builder.create_notification( + status.user, + 'FAVORITE', + related_user=liker, + related_status=status, + ) +@app.task def handle_unfavorite(activity): ''' approval of your good good post ''' favorite_id = activity['object']['id'] @@ -270,9 +286,9 @@ def handle_unfavorite(activity): return HttpResponseNotFound() fav.delete() - return HttpResponse() +@app.task def handle_boost(activity): ''' someone gave us a boost! ''' try: @@ -292,16 +308,12 @@ def handle_boost(activity): related_status=status, ) - return HttpResponse() -def handle_add(activity): +@app.task +def handle_tag(activity): ''' someone is tagging or shelving a book ''' - if activity['object']['type'] == 'Tag': - user = get_or_create_remote_user(activity['actor']) - if not user.local: - book = activity['target']['id'].split('/')[-1] - status_builder.create_tag(user, book, activity['object']['name']) - return HttpResponse() - return HttpResponse() - return HttpResponseNotFound() + user = get_or_create_remote_user(activity['actor']) + if not user.local: + book = activity['target']['id'].split('/')[-1] + status_builder.create_tag(user, book, activity['object']['name']) diff --git a/fedireads/tasks.py b/fedireads/tasks.py index 760dffef..9492fa69 100644 --- a/fedireads/tasks.py +++ b/fedireads/tasks.py @@ -2,9 +2,6 @@ from celery import Celery import os -from fedireads import models -from fedireads import status as status_builder -from fedireads.outgoing import get_or_create_remote_user from fedireads import settings # set the default Django settings module for the 'celery' program. @@ -15,28 +12,3 @@ app = Celery( ) -@app.task -def handle_incoming_favorite(activity): - ''' ugh ''' - print('here we go') - try: - status_id = activity['object'].split('/')[-1] - print(status_id) - status = models.Status.objects.get(id=status_id) - liker = get_or_create_remote_user(activity['actor']) - except (models.Status.DoesNotExist, models.User.DoesNotExist): - print('gonna return') - return - - print('got the status okay') - if not liker.local: - status_builder.create_favorite_from_activity(liker, activity) - - status_builder.create_notification( - status.user, - 'FAVORITE', - related_user=liker, - related_status=status, - ) - print('done') - diff --git a/fr_celery/celery.py b/fr_celery/celery.py index 3368a8fb..a2427e94 100644 --- a/fr_celery/celery.py +++ b/fr_celery/celery.py @@ -1,4 +1,5 @@ from __future__ import absolute_import, unicode_literals +from . import settings import os @@ -17,4 +18,5 @@ app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() +app.autodiscover_tasks(['fedireads'], related_name="incoming") From c969e5550ee963fbc1d114cbacf154f81fc4197f Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Tue, 31 Mar 2020 17:07:35 -0700 Subject: [PATCH 4/6] Don't broadcast yourself --- fedireads/broadcast.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fedireads/broadcast.py b/fedireads/broadcast.py index 3a62afeb..df01c8ea 100644 --- a/fedireads/broadcast.py +++ b/fedireads/broadcast.py @@ -39,6 +39,9 @@ def get_recipients(user, post_privacy, direct_recipients=None, limit=False): fedireads_user = limit == 'fedireads' followers = user.followers.filter(fedireads_user=fedireads_user).all() + # we don't need to broadcast to ourself + followers = followers.filter(local=False) + # TODO I don't think this is actually accomplishing pubic/followers only? if post_privacy == 'public': # post to public shared inboxes From 1970682c9c579b75c87dd495e203b5ab71bc3cf3 Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Tue, 31 Mar 2020 18:03:58 -0700 Subject: [PATCH 5/6] Move broadcast to celery --- fedireads/broadcast.py | 3 +++ fedireads/outgoing.py | 43 +++++++++++++++++++++--------------------- fr_celery/celery.py | 3 ++- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/fedireads/broadcast.py b/fedireads/broadcast.py index df01c8ea..d36cc193 100644 --- a/fedireads/broadcast.py +++ b/fedireads/broadcast.py @@ -9,6 +9,7 @@ import requests from urllib.parse import urlparse from fedireads import models +from fedireads.tasks import app def get_recipients(user, post_privacy, direct_recipients=None, limit=False): @@ -59,8 +60,10 @@ def get_recipients(user, post_privacy, direct_recipients=None, limit=False): return recipients +@app.task def broadcast(sender, activity, recipients): ''' send out an event ''' + sender = models.User.objects.get(id=sender) errors = [] for recipient in recipients: try: diff --git a/fedireads/outgoing.py b/fedireads/outgoing.py index dc60b2c0..b628a19e 100644 --- a/fedireads/outgoing.py +++ b/fedireads/outgoing.py @@ -81,7 +81,7 @@ def handle_account_search(query): def handle_follow(user, to_follow): ''' someone local wants to follow someone ''' activity = activitypub.get_follow_request(user, to_follow) - errors = broadcast(user, activity, [to_follow.inbox]) + errors = broadcast.delay(user.id, activity, [to_follow.inbox]) for error in errors: raise(error['error']) @@ -93,7 +93,7 @@ def handle_unfollow(user, to_unfollow): user_object=to_unfollow ) activity = activitypub.get_unfollow(relationship) - errors = broadcast(user, activity, [to_unfollow.inbox]) + errors = broadcast.delay(user.id, activity, [to_unfollow.inbox]) to_unfollow.followers.remove(user) for error in errors: raise(error['error']) @@ -108,7 +108,7 @@ def handle_accept(user, to_follow, follow_request): activity = activitypub.get_accept(to_follow, follow_request) recipient = get_recipients(to_follow, 'direct', direct_recipients=[user]) - broadcast(to_follow, activity, recipient) + broadcast.delay(to_follow.id, activity, recipient) def handle_reject(user, to_follow, relationship): @@ -117,7 +117,7 @@ def handle_reject(user, to_follow, relationship): activity = activitypub.get_reject(to_follow, relationship) recipient = get_recipients(to_follow, 'direct', direct_recipients=[user]) - broadcast(to_follow, activity, recipient) + broadcast.delay(to_follow.id, activity, recipient) def handle_shelve(user, book, shelf): @@ -127,7 +127,7 @@ def handle_shelve(user, book, shelf): activity = activitypub.get_add(user, book, shelf) recipients = get_recipients(user, 'public') - broadcast(user, activity, recipients) + broadcast.delay(user.id, activity, recipients) # tell the world about this cool thing that happened verb = { @@ -143,7 +143,7 @@ def handle_shelve(user, book, shelf): activity = activitypub.get_status(status) create_activity = activitypub.get_create(user, activity) - broadcast(user, create_activity, recipients) + broadcast.delay(user.id, create_activity, recipients) def handle_unshelve(user, book, shelf): @@ -155,7 +155,7 @@ def handle_unshelve(user, book, shelf): activity = activitypub.get_remove(user, book, shelf) recipients = get_recipients(user, 'public') - broadcast(user, activity, recipients) + broadcast.delay(user.id, activity, recipients) def handle_import_books(user, items): @@ -173,7 +173,7 @@ def handle_import_books(user, items): new_books.append(item.book) activity = activitypub.get_add(user, item.book, desired_shelf) recipients = get_recipients(user, 'public') - broadcast(user, activity, recipients) + broadcast.delay(user.id, activity, recipients) if new_books: message = 'imported {} books'.format(len(new_books)) @@ -183,7 +183,8 @@ def handle_import_books(user, items): create_activity = activitypub.get_create( user, activitypub.get_status(status)) - broadcast(user, create_activity, get_recipients(user, 'public')) + recipients = get_recipients(user, 'public') + broadcast.delay(user.id, create_activity, recipients) def handle_review(user, book, name, content, rating): @@ -194,14 +195,14 @@ def handle_review(user, book, name, content, rating): review_activity = activitypub.get_review(review) review_create_activity = activitypub.get_create(user, review_activity) fr_recipients = get_recipients(user, 'public', limit='fedireads') - broadcast(user, review_create_activity, fr_recipients) + broadcast.delay(user.id, review_create_activity, fr_recipients) # re-format the activity for non-fedireads servers article_activity = activitypub.get_review_article(review) article_create_activity = activitypub.get_create(user, article_activity) other_recipients = get_recipients(user, 'public', limit='other') - broadcast(user, article_create_activity, other_recipients) + broadcast.delay(user.id, article_create_activity, other_recipients) def handle_comment(user, book, name, content): @@ -212,14 +213,14 @@ def handle_comment(user, book, name, content): comment_activity = activitypub.get_comment(comment) comment_create_activity = activitypub.get_create(user, comment_activity) fr_recipients = get_recipients(user, 'public', limit='fedireads') - broadcast(user, comment_create_activity, fr_recipients) + broadcast.delay(user.id, comment_create_activity, fr_recipients) # re-format the activity for non-fedireads servers article_activity = activitypub.get_comment_article(comment) article_create_activity = activitypub.get_create(user, article_activity) other_recipients = get_recipients(user, 'public', limit='other') - broadcast(user, article_create_activity, other_recipients) + broadcast.delay(user.id, article_create_activity, other_recipients) def handle_tag(user, book, name): @@ -228,7 +229,7 @@ def handle_tag(user, book, name): tag_activity = activitypub.get_add_tag(tag) recipients = get_recipients(user, 'public') - broadcast(user, tag_activity, recipients) + broadcast.delay(user.id, tag_activity, recipients) def handle_untag(user, book, name): @@ -239,7 +240,7 @@ def handle_untag(user, book, name): tag.delete() recipients = get_recipients(user, 'public') - broadcast(user, tag_activity, recipients) + broadcast.delay(user.id, tag_activity, recipients) def handle_reply(user, review, content): @@ -257,7 +258,7 @@ def handle_reply(user, review, content): create_activity = activitypub.get_create(user, reply_activity) recipients = get_recipients(user, 'public') - broadcast(user, create_activity, recipients) + broadcast.delay(user.id, create_activity, recipients) def handle_favorite(user, status): @@ -273,7 +274,7 @@ def handle_favorite(user, status): fav_activity = activitypub.get_favorite(favorite) recipients = get_recipients(user, 'direct', [status.user]) - broadcast(user, fav_activity, recipients) + broadcast.delay(user.id, fav_activity, recipients) def handle_unfavorite(user, status): @@ -289,7 +290,7 @@ def handle_unfavorite(user, status): fav_activity = activitypub.get_unfavorite(favorite) recipients = get_recipients(user, 'direct', [status.user]) - broadcast(user, fav_activity, recipients) + broadcast.delay(user.id, fav_activity, recipients) def handle_boost(user, status): ''' a user wishes to boost a status ''' @@ -305,14 +306,14 @@ def handle_boost(user, status): boost_activity = activitypub.get_boost(boost) recipients = get_recipients(user, 'public') - broadcast(user, boost_activity, recipients) + broadcast.delay(user.id, boost_activity, recipients) def handle_update_book(user, book): ''' broadcast the news about our book ''' book_activity = activitypub.get_book(book) update_activity = activitypub.get_update(user, book_activity) recipients = get_recipients(None, 'public') - broadcast(user, update_activity, recipients) + broadcast.delay(user.id, update_activity, recipients) def handle_update_user(user): @@ -320,5 +321,5 @@ def handle_update_user(user): actor = activitypub.get_actor(user) update_activity = activitypub.get_update(user, actor) recipients = get_recipients(user, 'public') - broadcast(user, update_activity, recipients) + broadcast.delay(user.id, update_activity, recipients) diff --git a/fr_celery/celery.py b/fr_celery/celery.py index a2427e94..45c130d9 100644 --- a/fr_celery/celery.py +++ b/fr_celery/celery.py @@ -18,5 +18,6 @@ app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() -app.autodiscover_tasks(['fedireads'], related_name="incoming") +app.autodiscover_tasks(['fedireads'], related_name='incoming') +app.autodiscover_tasks(['fedireads'], related_name='broadcast') From 1caf19863e1ec8d9c6c8b6a9f42381d294ef518b Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Wed, 1 Apr 2020 10:16:20 -0700 Subject: [PATCH 6/6] use broadcast task --- fedireads/broadcast.py | 9 +++++++-- fedireads/outgoing.py | 46 +++++++++++++++++++----------------------- fedireads/views.py | 2 +- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/fedireads/broadcast.py b/fedireads/broadcast.py index d36cc193..0b83ab89 100644 --- a/fedireads/broadcast.py +++ b/fedireads/broadcast.py @@ -60,10 +60,15 @@ def get_recipients(user, post_privacy, direct_recipients=None, limit=False): return recipients -@app.task def broadcast(sender, activity, recipients): ''' send out an event ''' - sender = models.User.objects.get(id=sender) + broadcast_task.delay(sender.id, activity, recipients) + + +@app.task +def broadcast_task(sender_id, activity, recipients): + ''' the celery task for broadcast ''' + sender = models.User.objects.get(id=sender_id) errors = [] for recipient in recipients: try: diff --git a/fedireads/outgoing.py b/fedireads/outgoing.py index b628a19e..098b0784 100644 --- a/fedireads/outgoing.py +++ b/fedireads/outgoing.py @@ -81,9 +81,7 @@ def handle_account_search(query): def handle_follow(user, to_follow): ''' someone local wants to follow someone ''' activity = activitypub.get_follow_request(user, to_follow) - errors = broadcast.delay(user.id, activity, [to_follow.inbox]) - for error in errors: - raise(error['error']) + broadcast(user, activity, [to_follow.inbox]) def handle_unfollow(user, to_unfollow): @@ -93,10 +91,8 @@ def handle_unfollow(user, to_unfollow): user_object=to_unfollow ) activity = activitypub.get_unfollow(relationship) - errors = broadcast.delay(user.id, activity, [to_unfollow.inbox]) + broadcast(user, activity, [to_unfollow.inbox]) to_unfollow.followers.remove(user) - for error in errors: - raise(error['error']) def handle_accept(user, to_follow, follow_request): @@ -108,7 +104,7 @@ def handle_accept(user, to_follow, follow_request): activity = activitypub.get_accept(to_follow, follow_request) recipient = get_recipients(to_follow, 'direct', direct_recipients=[user]) - broadcast.delay(to_follow.id, activity, recipient) + broadcast(to_follow, activity, recipient) def handle_reject(user, to_follow, relationship): @@ -117,7 +113,7 @@ def handle_reject(user, to_follow, relationship): activity = activitypub.get_reject(to_follow, relationship) recipient = get_recipients(to_follow, 'direct', direct_recipients=[user]) - broadcast.delay(to_follow.id, activity, recipient) + broadcast(to_follow, activity, recipient) def handle_shelve(user, book, shelf): @@ -127,7 +123,7 @@ def handle_shelve(user, book, shelf): activity = activitypub.get_add(user, book, shelf) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, activity, recipients) + broadcast(user, activity, recipients) # tell the world about this cool thing that happened verb = { @@ -143,7 +139,7 @@ def handle_shelve(user, book, shelf): activity = activitypub.get_status(status) create_activity = activitypub.get_create(user, activity) - broadcast.delay(user.id, create_activity, recipients) + broadcast(user, create_activity, recipients) def handle_unshelve(user, book, shelf): @@ -155,7 +151,7 @@ def handle_unshelve(user, book, shelf): activity = activitypub.get_remove(user, book, shelf) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, activity, recipients) + broadcast(user, activity, recipients) def handle_import_books(user, items): @@ -173,7 +169,7 @@ def handle_import_books(user, items): new_books.append(item.book) activity = activitypub.get_add(user, item.book, desired_shelf) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, activity, recipients) + broadcast(user, activity, recipients) if new_books: message = 'imported {} books'.format(len(new_books)) @@ -184,7 +180,7 @@ def handle_import_books(user, items): create_activity = activitypub.get_create( user, activitypub.get_status(status)) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, create_activity, recipients) + broadcast(user, create_activity, recipients) def handle_review(user, book, name, content, rating): @@ -195,14 +191,14 @@ def handle_review(user, book, name, content, rating): review_activity = activitypub.get_review(review) review_create_activity = activitypub.get_create(user, review_activity) fr_recipients = get_recipients(user, 'public', limit='fedireads') - broadcast.delay(user.id, review_create_activity, fr_recipients) + broadcast(user, review_create_activity, fr_recipients) # re-format the activity for non-fedireads servers article_activity = activitypub.get_review_article(review) article_create_activity = activitypub.get_create(user, article_activity) other_recipients = get_recipients(user, 'public', limit='other') - broadcast.delay(user.id, article_create_activity, other_recipients) + broadcast(user, article_create_activity, other_recipients) def handle_comment(user, book, name, content): @@ -213,14 +209,14 @@ def handle_comment(user, book, name, content): comment_activity = activitypub.get_comment(comment) comment_create_activity = activitypub.get_create(user, comment_activity) fr_recipients = get_recipients(user, 'public', limit='fedireads') - broadcast.delay(user.id, comment_create_activity, fr_recipients) + broadcast(user, comment_create_activity, fr_recipients) # re-format the activity for non-fedireads servers article_activity = activitypub.get_comment_article(comment) article_create_activity = activitypub.get_create(user, article_activity) other_recipients = get_recipients(user, 'public', limit='other') - broadcast.delay(user.id, article_create_activity, other_recipients) + broadcast(user, article_create_activity, other_recipients) def handle_tag(user, book, name): @@ -229,7 +225,7 @@ def handle_tag(user, book, name): tag_activity = activitypub.get_add_tag(tag) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, tag_activity, recipients) + broadcast(user, tag_activity, recipients) def handle_untag(user, book, name): @@ -240,7 +236,7 @@ def handle_untag(user, book, name): tag.delete() recipients = get_recipients(user, 'public') - broadcast.delay(user.id, tag_activity, recipients) + broadcast(user, tag_activity, recipients) def handle_reply(user, review, content): @@ -258,7 +254,7 @@ def handle_reply(user, review, content): create_activity = activitypub.get_create(user, reply_activity) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, create_activity, recipients) + broadcast(user, create_activity, recipients) def handle_favorite(user, status): @@ -274,7 +270,7 @@ def handle_favorite(user, status): fav_activity = activitypub.get_favorite(favorite) recipients = get_recipients(user, 'direct', [status.user]) - broadcast.delay(user.id, fav_activity, recipients) + broadcast(user, fav_activity, recipients) def handle_unfavorite(user, status): @@ -290,7 +286,7 @@ def handle_unfavorite(user, status): fav_activity = activitypub.get_unfavorite(favorite) recipients = get_recipients(user, 'direct', [status.user]) - broadcast.delay(user.id, fav_activity, recipients) + broadcast(user, fav_activity, recipients) def handle_boost(user, status): ''' a user wishes to boost a status ''' @@ -306,14 +302,14 @@ def handle_boost(user, status): boost_activity = activitypub.get_boost(boost) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, boost_activity, recipients) + broadcast(user, boost_activity, recipients) def handle_update_book(user, book): ''' broadcast the news about our book ''' book_activity = activitypub.get_book(book) update_activity = activitypub.get_update(user, book_activity) recipients = get_recipients(None, 'public') - broadcast.delay(user.id, update_activity, recipients) + broadcast(user, update_activity, recipients) def handle_update_user(user): @@ -321,5 +317,5 @@ def handle_update_user(user): actor = activitypub.get_actor(user) update_activity = activitypub.get_update(user, actor) recipients = get_recipients(user, 'public') - broadcast.delay(user.id, update_activity, recipients) + broadcast(user, update_activity, recipients) diff --git a/fedireads/views.py b/fedireads/views.py index 3fcb51df..798fcbf6 100644 --- a/fedireads/views.py +++ b/fedireads/views.py @@ -116,7 +116,7 @@ def books_page(request): if request.user.is_authenticated: recent_books = models.Edition.objects.filter( ~Q(shelfbook__shelf__user=request.user), - id__in=[b.id for b in recent_books], + id__in=[b.id for b in recent_books if b], ) data = {