Runs black
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
''' signs activitypub activities '''
|
||||
""" signs activitypub activities """
|
||||
import hashlib
|
||||
from urllib.parse import urlparse
|
||||
import datetime
|
||||
@ -6,54 +6,56 @@ from base64 import b64encode, b64decode
|
||||
|
||||
from Crypto import Random
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Signature import pkcs1_15 #pylint: disable=no-name-in-module
|
||||
from Crypto.Signature import pkcs1_15 # pylint: disable=no-name-in-module
|
||||
from Crypto.Hash import SHA256
|
||||
|
||||
MAX_SIGNATURE_AGE = 300
|
||||
|
||||
|
||||
def create_key_pair():
|
||||
''' a new public/private key pair, used for creating new users '''
|
||||
""" a new public/private key pair, used for creating new users """
|
||||
random_generator = Random.new().read
|
||||
key = RSA.generate(1024, random_generator)
|
||||
private_key = key.export_key().decode('utf8')
|
||||
public_key = key.publickey().export_key().decode('utf8')
|
||||
private_key = key.export_key().decode("utf8")
|
||||
public_key = key.publickey().export_key().decode("utf8")
|
||||
|
||||
return private_key, public_key
|
||||
|
||||
|
||||
def make_signature(sender, destination, date, digest):
|
||||
''' uses a private key to sign an outgoing message '''
|
||||
""" uses a private key to sign an outgoing message """
|
||||
inbox_parts = urlparse(destination)
|
||||
signature_headers = [
|
||||
'(request-target): post %s' % inbox_parts.path,
|
||||
'host: %s' % inbox_parts.netloc,
|
||||
'date: %s' % date,
|
||||
'digest: %s' % digest,
|
||||
"(request-target): post %s" % inbox_parts.path,
|
||||
"host: %s" % inbox_parts.netloc,
|
||||
"date: %s" % date,
|
||||
"digest: %s" % digest,
|
||||
]
|
||||
message_to_sign = '\n'.join(signature_headers)
|
||||
message_to_sign = "\n".join(signature_headers)
|
||||
signer = pkcs1_15.new(RSA.import_key(sender.key_pair.private_key))
|
||||
signed_message = signer.sign(SHA256.new(message_to_sign.encode('utf8')))
|
||||
signed_message = signer.sign(SHA256.new(message_to_sign.encode("utf8")))
|
||||
signature = {
|
||||
'keyId': '%s#main-key' % sender.remote_id,
|
||||
'algorithm': 'rsa-sha256',
|
||||
'headers': '(request-target) host date digest',
|
||||
'signature': b64encode(signed_message).decode('utf8'),
|
||||
"keyId": "%s#main-key" % sender.remote_id,
|
||||
"algorithm": "rsa-sha256",
|
||||
"headers": "(request-target) host date digest",
|
||||
"signature": b64encode(signed_message).decode("utf8"),
|
||||
}
|
||||
return ','.join('%s="%s"' % (k, v) for (k, v) in signature.items())
|
||||
return ",".join('%s="%s"' % (k, v) for (k, v) in signature.items())
|
||||
|
||||
|
||||
def make_digest(data):
|
||||
''' creates a message digest for signing '''
|
||||
return 'SHA-256=' + b64encode(hashlib.sha256(data.encode('utf-8'))\
|
||||
.digest()).decode('utf-8')
|
||||
""" creates a message digest for signing """
|
||||
return "SHA-256=" + b64encode(hashlib.sha256(data.encode("utf-8")).digest()).decode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
|
||||
def verify_digest(request):
|
||||
''' checks if a digest is syntactically valid and matches the message '''
|
||||
algorithm, digest = request.headers['digest'].split('=', 1)
|
||||
if algorithm == 'SHA-256':
|
||||
""" checks if a digest is syntactically valid and matches the message """
|
||||
algorithm, digest = request.headers["digest"].split("=", 1)
|
||||
if algorithm == "SHA-256":
|
||||
hash_function = hashlib.sha256
|
||||
elif algorithm == 'SHA-512':
|
||||
elif algorithm == "SHA-512":
|
||||
hash_function = hashlib.sha512
|
||||
else:
|
||||
raise ValueError("Unsupported hash function: {}".format(algorithm))
|
||||
@ -62,8 +64,10 @@ def verify_digest(request):
|
||||
if b64decode(digest) != expected:
|
||||
raise ValueError("Invalid HTTP Digest header")
|
||||
|
||||
|
||||
class Signature:
|
||||
''' read and validate incoming signatures '''
|
||||
""" read and validate incoming signatures """
|
||||
|
||||
def __init__(self, key_id, headers, signature):
|
||||
self.key_id = key_id
|
||||
self.headers = headers
|
||||
@ -71,42 +75,39 @@ class Signature:
|
||||
|
||||
@classmethod
|
||||
def parse(cls, request):
|
||||
''' extract and parse a signature from an http request '''
|
||||
""" extract and parse a signature from an http request """
|
||||
signature_dict = {}
|
||||
for pair in request.headers['Signature'].split(','):
|
||||
k, v = pair.split('=', 1)
|
||||
v = v.replace('"', '')
|
||||
for pair in request.headers["Signature"].split(","):
|
||||
k, v = pair.split("=", 1)
|
||||
v = v.replace('"', "")
|
||||
signature_dict[k] = v
|
||||
|
||||
try:
|
||||
key_id = signature_dict['keyId']
|
||||
headers = signature_dict['headers']
|
||||
signature = b64decode(signature_dict['signature'])
|
||||
key_id = signature_dict["keyId"]
|
||||
headers = signature_dict["headers"]
|
||||
signature = b64decode(signature_dict["signature"])
|
||||
except KeyError:
|
||||
raise ValueError('Invalid auth header')
|
||||
raise ValueError("Invalid auth header")
|
||||
|
||||
return cls(key_id, headers, signature)
|
||||
|
||||
def verify(self, public_key, request):
|
||||
''' verify rsa signature '''
|
||||
if http_date_age(request.headers['date']) > MAX_SIGNATURE_AGE:
|
||||
raise ValueError(
|
||||
"Request too old: %s" % (request.headers['date'],))
|
||||
""" verify rsa signature """
|
||||
if http_date_age(request.headers["date"]) > MAX_SIGNATURE_AGE:
|
||||
raise ValueError("Request too old: %s" % (request.headers["date"],))
|
||||
public_key = RSA.import_key(public_key)
|
||||
|
||||
comparison_string = []
|
||||
for signed_header_name in self.headers.split(' '):
|
||||
if signed_header_name == '(request-target)':
|
||||
comparison_string.append(
|
||||
'(request-target): post %s' % request.path)
|
||||
for signed_header_name in self.headers.split(" "):
|
||||
if signed_header_name == "(request-target)":
|
||||
comparison_string.append("(request-target): post %s" % request.path)
|
||||
else:
|
||||
if signed_header_name == 'digest':
|
||||
if signed_header_name == "digest":
|
||||
verify_digest(request)
|
||||
comparison_string.append('%s: %s' % (
|
||||
signed_header_name,
|
||||
request.headers[signed_header_name]
|
||||
))
|
||||
comparison_string = '\n'.join(comparison_string)
|
||||
comparison_string.append(
|
||||
"%s: %s" % (signed_header_name, request.headers[signed_header_name])
|
||||
)
|
||||
comparison_string = "\n".join(comparison_string)
|
||||
|
||||
signer = pkcs1_15.new(public_key)
|
||||
digest = SHA256.new()
|
||||
@ -117,7 +118,7 @@ class Signature:
|
||||
|
||||
|
||||
def http_date_age(datestr):
|
||||
''' age of a signature in seconds '''
|
||||
parsed = datetime.datetime.strptime(datestr, '%a, %d %b %Y %H:%M:%S GMT')
|
||||
""" age of a signature in seconds """
|
||||
parsed = datetime.datetime.strptime(datestr, "%a, %d %b %Y %H:%M:%S GMT")
|
||||
delta = datetime.datetime.utcnow() - parsed
|
||||
return delta.total_seconds()
|
||||
|
Reference in New Issue
Block a user