katheryn-website/scripts/import-lightroom.py

432 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Import Lightroom Catalog into Directus

This script reads a Lightroom Classic catalog (.lrcat file) and imports
photos with their metadata into Directus.

Usage:
    python import-lightroom.py /path/to/catalog.lrcat /path/to/photos/

Requirements:
    - Python 3.8+
    - Lightroom catalog file (.lrcat)
    - Access to the photo files
"""
import argparse
import json
import os
import re
import sqlite3
import ssl
import sys
import urllib.request
import urllib.error
from pathlib import Path
from datetime import datetime
# Directus configuration.
# Every value can be overridden through the environment; the literals are only
# fallbacks kept so existing invocations keep working unchanged.
# A trailing slash is stripped because endpoints are appended as '/path'.
DIRECTUS_URL = os.environ.get(
    'DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com'
).rstrip('/')

# SECURITY NOTE(review): real credentials are committed below as defaults.
# Prefer setting DIRECTUS_EMAIL / DIRECTUS_PASSWORD in the environment, then
# rotate this password and remove the literals from version control.
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')

# SSL context shared by every request.
# SECURITY NOTE(review): certificate verification is disabled by default
# (historical behaviour). Set DIRECTUS_VERIFY_SSL=1 to enable verification.
SSL_CONTEXT = ssl.create_default_context()
if os.environ.get('DIRECTUS_VERIFY_SSL', '').strip().lower() not in ('1', 'true', 'yes', 'on'):
    # check_hostname has to be cleared before verify_mode may be CERT_NONE.
    SSL_CONTEXT.check_hostname = False
    SSL_CONTEXT.verify_mode = ssl.CERT_NONE

# Cached Directus access token; filled in lazily by get_token().
TOKEN = None
def get_token():
    """Return a Directus access token, logging in when necessary.

    The token is cached in the module-level ``TOKEN``. The login response's
    ``expires`` value (documented by Directus as milliseconds) is used to
    remember when the token goes stale, so a long-running import logs in
    again instead of failing once the access token has expired. If the
    response carries no usable ``expires`` value, the token is cached
    indefinitely.

    Returns:
        The access token string.

    Raises:
        urllib.error.URLError: if the login request fails (includes
            ``HTTPError`` for rejected credentials).
        KeyError: if the response lacks ``data`` / ``access_token``.
    """
    global TOKEN
    import time

    # Expiry is stored on the function object so no extra module global is
    # needed; None means "no known expiry".
    expires_at = getattr(get_token, '_expires_at', None)
    if TOKEN and (expires_at is None or time.monotonic() < expires_at):
        return TOKEN

    payload = json.dumps({
        'email': DIRECTUS_EMAIL,
        'password': DIRECTUS_PASSWORD,
    }).encode()
    req = urllib.request.Request(
        f'{DIRECTUS_URL}/auth/login',
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'User-Agent': 'LightroomImport/1.0',
        },
    )
    with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
        result = json.loads(resp.read())

    login = result['data']
    TOKEN = login['access_token']

    expires_ms = login.get('expires')
    if isinstance(expires_ms, (int, float)) and not isinstance(expires_ms, bool) and expires_ms > 0:
        ttl = expires_ms / 1000.0
        # Renew slightly early so a request never goes out with a token that
        # expires in flight.
        margin = min(30.0, ttl * 0.1)
        get_token._expires_at = time.monotonic() + ttl - margin
    else:
        get_token._expires_at = None
    return TOKEN
def api_request(method, endpoint, data=None):
    """Make an authenticated JSON request against the Directus API.

    Args:
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        endpoint: Path appended to ``DIRECTUS_URL`` (should start with ``/``).
        data: Optional JSON-serialisable payload. Anything other than
            ``None`` is sent as the request body (including ``{}``).

    Returns:
        The decoded JSON response, ``{}`` when the server answers with an
        empty body (e.g. 204 No Content), or ``{'error': message}`` when the
        request fails at HTTP or connection level.

    Raises:
        ValueError: if a successful response has a non-empty body that is
            not valid JSON.
    """
    token = get_token()
    req = urllib.request.Request(f'{DIRECTUS_URL}{endpoint}', method=method)
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('User-Agent', 'LightroomImport/1.0')
    if data is not None:
        req.data = json.dumps(data).encode()

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Prefer the first Directus error message; fall back to the HTTP
        # status text when the body is not the expected JSON shape.
        error_body = e.read().decode(errors='replace')
        try:
            error = json.loads(error_body)
            return {'error': error.get('errors', [{}])[0].get('message', str(e))}
        except (ValueError, AttributeError, IndexError, KeyError, TypeError):
            return {'error': str(e)}
    except urllib.error.URLError as e:
        # Connection-level failure (DNS, refused, TLS): report it the same
        # way as an HTTP error so callers can keep going.
        return {'error': str(e)}

    if not raw.strip():
        return {}
    return json.loads(raw)
def upload_file(file_path, title=None):
    """Upload a file to Directus as multipart/form-data.

    Args:
        file_path: Path of the file to upload.
        title: Optional title sent as a form field ahead of the file part.

    Returns:
        The ``id`` of the created Directus file, or ``None`` when the file
        cannot be read, the request fails, or the response has no id.
    """
    import mimetypes
    import uuid

    token = get_token()
    url = f'{DIRECTUS_URL}/files'

    # Determine content type
    content_type, _ = mimetypes.guess_type(file_path)
    if not content_type:
        content_type = 'application/octet-stream'

    # Percent-escape the characters that would otherwise terminate or split
    # the quoted filename parameter of the Content-Disposition header.
    filename = (
        os.path.basename(file_path)
        .replace('"', '%22')
        .replace('\r', '%0D')
        .replace('\n', '%0A')
    )

    boundary = f'----WebKitFormBoundary{uuid.uuid4().hex[:16]}'

    try:
        with open(file_path, 'rb') as f:
            file_content = f.read()
    except OSError as e:
        print(f" Upload error: {e}")
        return None

    # Build the multipart body as bytes; the title field (if any) goes first.
    parts = []
    if title:
        parts.append((
            f'--{boundary}\r\n'
            'Content-Disposition: form-data; name="title"\r\n\r\n'
            f'{title}\r\n'
        ).encode())
    parts.append((
        f'--{boundary}\r\n'
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
        f'Content-Type: {content_type}\r\n\r\n'
    ).encode())
    parts.append(file_content)
    parts.append(f'\r\n--{boundary}--\r\n'.encode())
    body = b''.join(parts)

    req = urllib.request.Request(url, data=body, method='POST')
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', f'multipart/form-data; boundary={boundary}')
    req.add_header('User-Agent', 'LightroomImport/1.0')

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            result = json.loads(resp.read())
    except (urllib.error.URLError, ValueError) as e:
        # URLError covers HTTPError; ValueError covers an unparseable body.
        print(f" Upload error: {e}")
        return None

    # ``data`` may be missing or null; treat both as "no id".
    return (result.get('data') or {}).get('id')
def _fetch_names(cursor, sql, image_id):
    """Run a one-column lookup for an image; return [] if the query fails."""
    try:
        cursor.execute(sql, (image_id,))
        return [row[0] for row in cursor.fetchall()]
    except sqlite3.Error:
        # Table or column absent in this catalog version: best effort.
        return []


def _attach_iptc(cursor, photo):
    """Copy caption/copyright/title from AgLibraryIPTC into photo, if present."""
    try:
        cursor.execute("""
            SELECT
                caption,
                copyright,
                title
            FROM AgLibraryIPTC
            WHERE image = ?
        """, (photo['lr_id'],))
        iptc = cursor.fetchone()
    except sqlite3.Error:
        # IPTC table unavailable: leave the photo without these keys.
        return
    if iptc is not None:
        photo['caption'] = iptc['caption']
        photo['copyright'] = iptc['copyright']
        photo['title'] = iptc['title']


def _print_schema_sample(cursor):
    """Print a few Adobe_images rows' column names to help diagnose the schema."""
    try:
        cursor.execute("""
            SELECT * FROM Adobe_images LIMIT 5
        """)
        sample = cursor.fetchall()
        if sample:
            print(f" Found {len(sample)} sample records")
            # Print column names for debugging
            print(f" Columns: {[d[0] for d in cursor.description]}")
    except sqlite3.Error as e2:
        print(f" Alternative query also failed: {e2}")


def read_lightroom_catalog(catalog_path):
    """Read photo metadata from a Lightroom catalog (.lrcat SQLite database).

    Args:
        catalog_path: Path to the ``.lrcat`` file.

    Returns:
        A list of dicts, one per image that has a file name. Each has the
        keys ``lr_id``, ``lr_uuid``, ``filename``, ``extension``,
        ``folder_path``, ``root_path``, ``capture_time``, ``rating``,
        ``color_label``, ``pick_status``, ``keywords`` and ``collections``;
        ``caption``, ``copyright`` and ``title`` are added when an IPTC row
        exists. The list is empty when the main query fails (diagnostics are
        printed instead).

    Exits the process with status 1 when the catalog file does not exist.
    The database connection is always closed, even on unexpected errors.
    """
    if not os.path.exists(catalog_path):
        print(f"Error: Catalog file not found: {catalog_path}")
        sys.exit(1)

    conn = sqlite3.connect(catalog_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Get list of tables to understand the schema
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        print(f" Found {len(tables)} tables in catalog")

        photos = []
        try:
            # Main photo query - Lightroom stores photos in Adobe_images
            # with metadata in various related tables
            cursor.execute("""
                SELECT
                    i.id_local as lr_id,
                    i.id_global as lr_uuid,
                    r.baseName as filename,
                    r.extension as extension,
                    f.pathFromRoot as folder_path,
                    rf.absolutePath as root_path,
                    i.captureTime as capture_time,
                    i.rating as rating,
                    i.colorLabels as color_label,
                    i.pick as pick_status
                FROM Adobe_images i
                LEFT JOIN AgLibraryFile r ON i.rootFile = r.id_local
                LEFT JOIN AgLibraryFolder f ON r.folder = f.id_local
                LEFT JOIN AgLibraryRootFolder rf ON f.rootFolder = rf.id_local
                WHERE r.baseName IS NOT NULL
            """)
            # fetchall() materialises the rows first, so the same cursor can
            # be reused for the per-photo lookups below.
            for row in cursor.fetchall():
                photo = dict(row)
                _attach_iptc(cursor, photo)
                photo['keywords'] = _fetch_names(cursor, """
                    SELECT k.name
                    FROM AgLibraryKeyword k
                    JOIN AgLibraryKeywordImage ki ON k.id_local = ki.tag
                    WHERE ki.image = ?
                """, photo['lr_id'])
                photo['collections'] = _fetch_names(cursor, """
                    SELECT c.name
                    FROM AgLibraryCollection c
                    JOIN AgLibraryCollectionImage ci ON c.id_local = ci.collection
                    WHERE ci.image = ?
                """, photo['lr_id'])
                photos.append(photo)
        except sqlite3.OperationalError as e:
            print(f" Database query error: {e}")
            print(" This catalog may use a different schema version.")
            print(" Attempting alternative query...")
            # Try alternative schema (older Lightroom versions)
            _print_schema_sample(cursor)
        return photos
    finally:
        conn.close()
def find_photo_file(photo, photos_dir):
    """Find the actual photo file on disk.

    Candidates are tried in this order: directly inside ``photos_dir``,
    under the catalog's folder path inside ``photos_dir``, at the catalog's
    root path joined with the folder path, and finally a recursive search
    of ``photos_dir`` by file name.

    Args:
        photo: Photo dict with ``filename`` and optionally ``extension``,
            ``folder_path`` and ``root_path``. Values may be ``None``.
        photos_dir: Directory expected to contain the photo files.

    Returns:
        The first existing path, or ``None`` if the photo has no file name
        or no matching file is found.
    """
    filename = photo.get('filename')
    if not filename:
        return None

    extension = photo.get('extension') or ''
    if extension and not filename.endswith(f'.{extension}'):
        filename = f"{filename}.{extension}"

    # A key that is present with a NULL value comes back as None, which
    # os.path.join rejects, so normalise to ''.
    folder_path = photo.get('folder_path') or ''
    root_path = photo.get('root_path') or ''

    candidates = [os.path.join(photos_dir, filename)]
    if folder_path:
        candidates.append(os.path.join(photos_dir, folder_path, filename))
    if root_path:
        candidates.append(os.path.join(root_path, folder_path, filename))

    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate

    # Recursive search as fallback
    for root, _dirs, files in os.walk(photos_dir):
        if filename in files:
            return os.path.join(root, filename)
    return None
def import_photo_to_directus(photo, file_path):
    """Import a single photo to Directus.

    Uploads the image file, then creates a draft record in the ``artworks``
    collection that references it.

    Args:
        photo: Photo dict as produced by ``read_lightroom_catalog``.
        file_path: Path of the image file on disk.

    Returns:
        The ``data`` of the created artwork, or ``None`` if the upload or
        the record creation failed.
    """
    # Upload the image file
    title = photo.get('title') or photo.get('filename', 'Untitled')
    file_id = upload_file(file_path, title=title)
    if not file_id:
        return None

    # Create artwork record
    artwork_data = {
        'title': title,
        'image': file_id,
        'description': photo.get('caption'),
        'lightroom_id': photo.get('lr_uuid'),
        'status': 'draft',
    }

    # Parse year from capture time; best effort, so an unparseable or
    # non-string value simply leaves the year unset.
    capture_time = photo.get('capture_time')
    if capture_time:
        try:
            dt = datetime.fromisoformat(capture_time.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            pass
        else:
            artwork_data['year'] = dt.year

    # Add keywords as tags (would need tag creation logic)
    # For now, store in description. Empty/None names are skipped so the
    # join cannot fail on a NULL keyword name.
    keywords = [str(k) for k in (photo.get('keywords') or []) if k]
    if keywords:
        keywords_str = ', '.join(keywords)
        if artwork_data.get('description'):
            artwork_data['description'] += f'\n\nKeywords: {keywords_str}'
        else:
            artwork_data['description'] = f'Keywords: {keywords_str}'

    # Remove None values
    artwork_data = {k: v for k, v in artwork_data.items() if v is not None}

    result = api_request('POST', '/items/artworks', artwork_data)
    if 'error' in result:
        print(f" Error creating artwork: {result['error']}")
        return None
    return result.get('data')
def main():
    """Command-line entry point: read the catalog, preview it, then import.

    Parses the arguments, reads the Lightroom catalog, prints a short
    preview, and (unless ``--dry-run`` is given) authenticates against
    Directus and imports each photo, printing a summary at the end.

    ``--limit`` must be zero or positive; ``--limit 0`` selects no photos.
    Exits with status 1 when authentication fails.
    """
    parser = argparse.ArgumentParser(description='Import Lightroom catalog to Directus')
    parser.add_argument('catalog', help='Path to Lightroom catalog (.lrcat file)')
    parser.add_argument('photos_dir', help='Path to photos directory')
    parser.add_argument('--dry-run', action='store_true', help='Preview without importing')
    parser.add_argument('--limit', type=int, help='Limit number of photos to import')
    args = parser.parse_args()

    # A negative limit would silently drop photos from the end of the list.
    if args.limit is not None and args.limit < 0:
        parser.error('--limit must be zero or a positive integer')

    print("=" * 60)
    print(" LIGHTROOM TO DIRECTUS IMPORT")
    print("=" * 60)
    print(f"\nCatalog: {args.catalog}")
    print(f"Photos: {args.photos_dir}")
    print(f"Directus: {DIRECTUS_URL}")
    if args.dry_run:
        print("\n*** DRY RUN - No changes will be made ***\n")

    # Read catalog
    print("\n=== Reading Lightroom Catalog ===")
    photos = read_lightroom_catalog(args.catalog)
    print(f" Found {len(photos)} photos in catalog")

    # Compare against None so that an explicit ``--limit 0`` is honoured.
    if args.limit is not None:
        photos = photos[:args.limit]
        print(f" Limited to {len(photos)} photos")

    if not photos:
        print("\nNo photos found to import.")
        return

    # Preview some photos
    print("\n=== Sample Photos ===")
    for photo in photos[:5]:
        print(f" - {photo.get('filename', 'Unknown')}")
        if photo.get('title'):
            print(f" Title: {photo['title']}")
        if photo.get('keywords'):
            # Skip empty/None names so the join cannot raise.
            sample_keywords = ', '.join(str(k) for k in photo['keywords'][:5] if k)
            print(f" Keywords: {sample_keywords}")
        if photo.get('collections'):
            sample_collections = ', '.join(str(c) for c in photo['collections'][:3] if c)
            print(f" Collections: {sample_collections}")

    if args.dry_run:
        print("\n=== Dry Run Summary ===")
        print(f"Would import {len(photos)} photos")
        return

    # Authenticate up front so bad credentials fail before any upload.
    print("\n=== Authenticating ===")
    try:
        get_token()
        print(" Authenticated successfully!")
    except Exception as e:
        print(f" Authentication failed: {e}")
        sys.exit(1)

    # Import photos
    print("\n=== Importing Photos ===")
    imported = 0
    skipped = 0
    errors = 0
    total = len(photos)
    for i, photo in enumerate(photos, 1):
        filename = photo.get('filename', 'Unknown')
        print(f"[{i}/{total}] {filename}")

        # Find file on disk
        file_path = find_photo_file(photo, args.photos_dir)
        if not file_path:
            print(" Skipped: File not found")
            skipped += 1
            continue

        # Import to Directus
        result = import_photo_to_directus(photo, file_path)
        if result:
            imported += 1
            print(f" Imported: {result.get('title', filename)}")
        else:
            errors += 1
            print(" Error: Failed to import")

    print("\n" + "=" * 60)
    print(" IMPORT COMPLETE")
    print(f" Imported: {imported}")
    print(f" Skipped: {skipped}")
    print(f" Errors: {errors}")
    print("=" * 60)
    print(f"\nView at: {DIRECTUS_URL}/admin")


if __name__ == '__main__':
    main()