# 228 lines, 7.9 KiB
''' openlibrary data connector '''
import re
import requests
from django.core.files.base import ContentFile
from django.db import transaction
from fedireads import models
from .abstract_connector import AbstractConnector, SearchResult
from .abstract_connector import update_from_mappings
from .abstract_connector import get_date, get_data
from .openlibrary_languages import languages
class Connector(AbstractConnector):
    ''' instantiate a connector for OL '''

    def __init__(self, identifier):
        ''' build the openlibrary field mappings for this connector.

        every mapping entry is ``openlibrary key: (target name, formatter)``
        and the formatter may be None. ``book_mappings`` holds everything in
        ``key_mappings`` plus the descriptive book fields. '''
        def get_first(values):
            ''' take the first item of a list-valued field '''
            return values[0]

        self.key_mappings = {
            'isbn_13': ('isbn_13', get_first),
            'isbn_10': ('isbn_10', get_first),
            'oclc_numbers': ('oclc_number', get_first),
            'lccn': ('lccn', get_first),
        }

        # start from a copy so the identifier mappings stay untouched
        self.book_mappings = self.key_mappings.copy()
        self.book_mappings.update({
            'publish_date': ('published_date', get_date),
            'first_publish_date': ('first_published_date', get_date),
            'description': ('description', get_description),
            'languages': ('languages', get_languages),
            'number_of_pages': ('pages', None),
            'series': ('series', get_first),
        })
        # NOTE(review): the base-class call is reconstructed -- `identifier`
        # is not used anywhere else in this method, and books_url, covers_url
        # and base_url are read below but never assigned in this class.
        # confirm against AbstractConnector
        super().__init__(identifier)

    def format_search_result(self, doc):
        ''' turn one openlibrary search doc into a SearchResult '''
        # keep only the last path segment of the key
        key = doc['key'].split('/')[-1]
        author = doc.get('author_name') or ['Unknown']
        # NOTE(review): only the author argument is certain; the title, key
        # and year arguments (and their order) are reconstructed -- confirm
        # them against SearchResult's signature
        return SearchResult(
            doc.get('title'),
            key,
            ', '.join(author),
            doc.get('first_publish_year'),
        )

    def parse_search_data(self, data):
        ''' the list of result docs in a search response, None if absent '''
        return data.get('docs')

    def get_or_create_book(self, olkey):
        ''' pull up a book record by whatever means possible.
        if you give a work key, it should give you the default edition,
        annotated with work data. '''
        book = models.Book.objects.select_subclasses().filter(
            openlibrary_key=olkey
        ).first()
        if book:
            if isinstance(book, models.Work):
                return book.default_edition
            return book

        # no book was found, so we start creating a new one. the work and
        # the edition are created together or not at all
        with transaction.atomic():
            if re.match(r'^OL\d+W$', olkey):
                edition, work = self._create_from_work_key(olkey)
            else:
                edition, work = self._create_from_edition_key(olkey)

        self._inherit_work_authors(edition, work)
        return edition

    def _create_from_work_key(self, olkey):
        ''' create a work and its default edition, returns (edition, work) '''
        work_data = self.load_book_data(olkey)
        work = self.create_book(olkey, work_data, models.Work)

        edition_options = self.load_edition_data(olkey).get('entries')
        edition_data = pick_default_edition(edition_options)
        if not edition_data:
            # hack: re-use the work data as the edition data
            edition_data = work_data
        key = edition_data.get('key').split('/')[-1]
        edition = self.create_book(key, edition_data, models.Edition)
        edition.default = True
        edition.parent_work = work
        # NOTE(review): reconstructed -- without a save the two assignments
        # above are never persisted
        edition.save()
        return edition, work

    def _create_from_edition_key(self, olkey):
        ''' create an edition and find or create its work,
        returns (edition, work) '''
        edition_data = self.load_book_data(olkey)
        edition = self.create_book(olkey, edition_data, models.Edition)

        work_data = edition_data.get('works')
        if not work_data:
            # hack: we're re-using the edition data as the work data
            work_key = olkey
        else:
            work_key = work_data[0]['key'].split('/')[-1]

        work = models.Work.objects.filter(
            openlibrary_key=work_key
        ).first()
        if not work:
            work_data = self.load_book_data(work_key)
            work = self.create_book(work_key, work_data, models.Work)
        edition.parent_work = work
        # NOTE(review): reconstructed -- persists the parent_work link
        edition.save()
        return edition, work

    @staticmethod
    def _inherit_work_authors(edition, work):
        ''' give an edition that has no authors the authors of its work '''
        # NOTE(review): assumes `authors` is a django many-to-many manager
        # on both models -- confirm against the model definitions
        work_authors = list(work.authors.all())
        if not work_authors or edition.authors.exists():
            return
        edition.authors.set(work_authors)
        edition.author_text = ', '.join(a.name for a in work_authors)
        edition.save()

    def get_authors_from_data(self, data):
        ''' parse author json and load or create authors '''
        authors = []
        for author_blob in data.get('authors', []):
            # the reference may be nested under an "author" key
            author_blob = author_blob.get('author', author_blob)
            # this id is "/authors/OL1234567A" and we want just "OL1234567A"
            author_id = author_blob['key'].split('/')[-1]
            authors.append(self.get_or_create_author(author_id))
        return authors

    def load_book_data(self, olkey):
        ''' query openlibrary for data on a book '''
        url = '%s/works/%s.json' % (self.books_url, olkey)
        return get_data(url)

    def load_edition_data(self, olkey):
        ''' query openlibrary for editions of a work '''
        url = '%s/works/%s/editions.json' % (self.books_url, olkey)
        return get_data(url)

    def expand_book_data(self, book):
        ''' create every edition of this book's work that we don't have yet.
        `book` may be a work or one of its editions. '''
        work = book
        if isinstance(book, models.Edition):
            work = book.parent_work

        edition_options = self.load_edition_data(work.openlibrary_key)
        # a response without "entries" means there is nothing to add
        for edition_data in edition_options.get('entries') or []:
            olkey = edition_data.get('key').split('/')[-1]
            if models.Edition.objects.filter(openlibrary_key=olkey).exists():
                # we already have this edition
                continue
            edition = self.create_book(olkey, edition_data, models.Edition)
            edition.parent_work = work
            # NOTE(review): reconstructed -- persists the parent_work link
            edition.save()
            self._inherit_work_authors(edition, work)

    def get_or_create_author(self, olkey):
        ''' load that author, creating it from openlibrary data if needed.
        raises ValueError when `olkey` is not an author id like OL1234567A '''
        if not re.match(r'^OL\d+A$', olkey):
            raise ValueError('Invalid OpenLibrary author ID')
        try:
            return models.Author.objects.get(openlibrary_key=olkey)
        except models.Author.DoesNotExist:
            # not stored yet, so it gets built from openlibrary data below
            pass

        url = '%s/authors/%s.json' % (self.base_url, olkey)
        data = get_data(url)

        author = models.Author(openlibrary_key=olkey)
        mappings = {
            'birth_date': ('born', get_date),
            'death_date': ('died', get_date),
            'bio': ('bio', get_description),
        }
        author = update_from_mappings(author, data, mappings)
        # TODO this is making some BOLD assumption
        name = data.get('name')
        if name:
            # everything before the last space is the first name
            first_name, _, last_name = name.rpartition(' ')
            author.first_name = first_name
            author.last_name = last_name
        # NOTE(review): reconstructed -- assumes update_from_mappings does
        # not save the author itself
        author.save()
        return author

    def get_cover_from_data(self, data):
        ''' ask openlibrary for the cover.
        returns [image name, image content], or None without a cover id '''
        if not data.get('covers'):
            return None

        cover_id = data.get('covers')[0]
        image_name = '%s-M.jpg' % cover_id
        url = '%s/b/id/%s' % (self.covers_url, image_name)
        response = requests.get(url)
        if not response.ok:
            # NOTE(review): the body of this branch is reconstructed; raising
            # keeps an error page from being stored as a cover image. confirm
            # that callers expect an exception here rather than None
            response.raise_for_status()
        image_content = ContentFile(response.content)
        return [image_name, image_content]
def get_description(description_blob):
    ''' a description arrives either as plain text or wrapped in a dict;
    a dict is unwrapped to its "value", anything else is handed back as-is '''
    if not isinstance(description_blob, dict):
        return description_blob
    return description_blob.get('value')
def get_languages(language_blob):
    ''' /languages/eng -> English

    looks up the "key" of every entry in the `languages` table and returns
    the results in order. an entry without a key, or with a key that is not
    in the table, gives None. an empty or missing blob gives an empty list '''
    return [
        languages.get(lang.get('key', ''), None)
        for lang in language_blob or []
    ]
def pick_default_edition(options):
    ''' favor physical copies with covers in english

    `options` is a list of edition dicts. each preference below narrows the
    candidates in turn, but a preference that no candidate satisfies is
    skipped instead of emptying the list. returns the first remaining
    edition, or None when there are no options at all '''
    if not options:
        return None
    if len(options) == 1:
        return options[0]

    formats = ('paperback', 'hardcover', 'mass market paperback')
    # ordered from most to least important
    preferences = (
        # cover ids are read from "covers" elsewhere in this module; the
        # singular "cover" is still honored for backwards compatibility
        lambda e: e.get('covers') or e.get('cover'),
        lambda e: '/languages/eng' in str(e.get('languages')),
        lambda e: str(e.get('physical_format')).lower() in formats,
        lambda e: e.get('isbn_13'),
        lambda e: e.get('ocaid'),
    )
    for is_preferred in preferences:
        options = [e for e in options if is_preferred(e)] or options
    return options[0]