432 lines
13 KiB
Python
Executable File
432 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Import Lightroom Catalog into Directus
|
|
|
|
This script reads a Lightroom Classic catalog (.lrcat file) and imports
|
|
photos with their metadata into Directus.
|
|
|
|
Usage:
|
|
    python import-lightroom.py [--dry-run] [--limit N] /path/to/catalog.lrcat /path/to/photos/
|
|
|
|
Requirements:
|
|
- Python 3.8+
|
|
- Lightroom catalog file (.lrcat)
|
|
- Access to the photo files
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import ssl
|
|
import sys
|
|
import urllib.request
|
|
import urllib.error
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Directus configuration.
# Every setting can be overridden through an environment variable of the same
# name; the literals below are only fallbacks.
DIRECTUS_URL = os.environ.get('DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com')
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
# SECURITY: a real credential is committed here as the fallback value. Rotate
# it and supply DIRECTUS_PASSWORD through the environment instead of keeping
# it in source control.
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')

# SSL context.
# SECURITY: certificate and hostname verification are disabled by default, so
# the login request (which carries the password) is not protected against
# interception. Set DIRECTUS_VERIFY_SSL=1 to keep the verifying defaults.
SSL_CONTEXT = ssl.create_default_context()
if os.environ.get('DIRECTUS_VERIFY_SSL', '').strip().lower() not in ('1', 'true', 'yes'):
    # check_hostname must be cleared before verify_mode can be set to CERT_NONE.
    SSL_CONTEXT.check_hostname = False
    SSL_CONTEXT.verify_mode = ssl.CERT_NONE

# Cached access token, filled in by get_token() on first use.
TOKEN = None
|
|
|
|
|
|
def get_token():
    """Return a Directus access token, logging in when none is usable.

    The token is cached in the module-level ``TOKEN``. Directus access tokens
    are short-lived, so when the login response carries an ``expires`` value
    (a lifetime in milliseconds according to the Directus auth docs) the
    cached token is discarded shortly before that lifetime ends and a fresh
    login is performed. When the field is absent the token is cached for the
    rest of the process.

    Returns:
        The access token string.

    Raises:
        urllib.error.URLError: if the login request fails (``HTTPError`` for
            a non-2xx response).
        KeyError: if the response has no ``data.access_token``.
    """
    import time

    global TOKEN

    # The expiry is kept on the function object so the module needs no
    # additional global next to TOKEN.
    expires_at = getattr(get_token, '_expires_at', None)
    if TOKEN and (expires_at is None or time.monotonic() < expires_at):
        return TOKEN

    payload = json.dumps({
        'email': DIRECTUS_EMAIL,
        'password': DIRECTUS_PASSWORD,
    }).encode()

    req = urllib.request.Request(
        f'{DIRECTUS_URL}/auth/login',
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'User-Agent': 'LightroomImport/1.0',
        },
    )

    with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
        result = json.loads(resp.read())

    login = result['data']
    TOKEN = login['access_token']

    lifetime_ms = login.get('expires')
    if isinstance(lifetime_ms, (int, float)) and lifetime_ms > 0:
        # Renew 30 seconds early so a request never starts with a token that
        # expires while it is in flight.
        get_token._expires_at = time.monotonic() + max(lifetime_ms / 1000 - 30, 0)
    else:
        get_token._expires_at = None

    return TOKEN
|
|
|
|
|
|
def api_request(method, endpoint, data=None):
    """Make an authenticated JSON request against the Directus API.

    Args:
        method: HTTP method name, e.g. ``'POST'``.
        endpoint: Path appended directly to ``DIRECTUS_URL``.
        data: Optional JSON-serialisable payload sent as the request body.

    Returns:
        The decoded JSON response; ``{}`` when the response body is empty;
        or ``{'error': message}`` when the request fails or the response is
        not valid JSON. Callers detect failure with ``'error' in result``.
    """
    token = get_token()
    url = f'{DIRECTUS_URL}{endpoint}'

    req = urllib.request.Request(url, method=method)
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('User-Agent', 'LightroomImport/1.0')

    # "is not None" so an explicitly empty payload ({} or []) is still sent.
    if data is not None:
        req.data = json.dumps(data).encode()

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Prefer the first message from the JSON error body; fall back to the
        # HTTP status line when the body is missing or has another shape.
        error_body = e.read().decode(errors='replace')
        try:
            errors = json.loads(error_body).get('errors') or [{}]
            return {'error': errors[0].get('message', str(e))}
        except (ValueError, AttributeError, IndexError, KeyError, TypeError):
            return {'error': str(e)}
    except urllib.error.URLError as e:
        # DNS failure, refused connection, TLS error, ... - report it the same
        # way as an HTTP error so the caller can carry on with the next item.
        return {'error': f'Connection failed: {e.reason}'}

    # Some responses (e.g. 204 No Content) legitimately have no body.
    if not raw.strip():
        return {}

    try:
        return json.loads(raw)
    except ValueError as e:
        return {'error': f'Invalid JSON in response: {e}'}
|
|
|
|
|
|
def upload_file(file_path, title=None):
    """Upload a file to Directus as multipart/form-data.

    Args:
        file_path: Path of the file to upload; its basename is sent as the
            filename and its MIME type is guessed from the name.
        title: Optional title sent as a ``title`` form field.

    Returns:
        The ``id`` from the response's ``data`` object, or ``None`` when the
        file cannot be read, the request fails, or the response has no id.
    """
    import mimetypes
    import uuid

    token = get_token()
    url = f'{DIRECTUS_URL}/files'

    # Determine content type, falling back to a generic binary type.
    content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'

    filename = os.path.basename(file_path)
    # A double quote or line break would terminate the quoted filename
    # parameter (or the header itself), so percent-encode them the way
    # browsers do for multipart/form-data.
    safe_filename = (
        filename.replace('"', '%22').replace('\r', '%0D').replace('\n', '%0A')
    )

    # Random boundary so it cannot collide with the payload in practice.
    boundary = f'----WebKitFormBoundary{uuid.uuid4().hex[:16]}'

    try:
        with open(file_path, 'rb') as f:
            file_content = f.read()
    except OSError as e:
        print(f" Upload error: {e}")
        return None

    # Build the multipart body. The title field is emitted before the file
    # part, as in the original request layout.
    parts = []
    if title:
        parts.append((
            f'--{boundary}\r\n'
            f'Content-Disposition: form-data; name="title"\r\n\r\n'
            f'{title}\r\n'
        ).encode())

    parts.append((
        f'--{boundary}\r\n'
        f'Content-Disposition: form-data; name="file"; filename="{safe_filename}"\r\n'
        f'Content-Type: {content_type}\r\n\r\n'
    ).encode())
    parts.append(file_content)
    parts.append(f'\r\n--{boundary}--\r\n'.encode())
    body = b''.join(parts)

    req = urllib.request.Request(url, data=body, method='POST')
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', f'multipart/form-data; boundary={boundary}')
    req.add_header('User-Agent', 'LightroomImport/1.0')

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            result = json.loads(resp.read())
    except (urllib.error.URLError, ValueError) as e:
        # URLError also covers HTTPError; ValueError covers a non-JSON body.
        print(f" Upload error: {e}")
        return None

    if not isinstance(result, dict):
        return None
    # "or {}" guards against a response whose "data" is null.
    return (result.get('data') or {}).get('id')
|
|
|
|
|
|
def read_lightroom_catalog(catalog_path):
    """Read photo metadata from a Lightroom catalog (.lrcat SQLite database).

    Args:
        catalog_path: Path to the ``.lrcat`` file.

    Returns:
        A list of dicts, one per image that has a file record. Each dict has
        the keys ``lr_id``, ``lr_uuid``, ``filename``, ``extension``,
        ``folder_path``, ``root_path``, ``capture_time``, ``rating``,
        ``color_label`` and ``pick_status``, plus ``keywords`` and
        ``collections`` (lists of names, empty when that lookup fails).
        ``caption``, ``copyright`` and ``title`` are present only when the
        image has an IPTC row. The list is empty when the main query fails
        with an ``sqlite3.OperationalError``; in that case some diagnostic
        output about ``Adobe_images`` is printed instead.

    Exits the process with status 1 if ``catalog_path`` does not exist.
    """
    if not os.path.exists(catalog_path):
        print(f"Error: Catalog file not found: {catalog_path}")
        sys.exit(1)

    conn = sqlite3.connect(catalog_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # List the tables purely as a diagnostic for the user.
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        print(f" Found {len(tables)} tables in catalog")

        photos = []

        # Main photo query - images live in Adobe_images, with file and
        # folder information in related tables.
        try:
            cursor.execute("""
                SELECT
                    i.id_local as lr_id,
                    i.id_global as lr_uuid,
                    r.baseName as filename,
                    r.extension as extension,
                    f.pathFromRoot as folder_path,
                    rf.absolutePath as root_path,
                    i.captureTime as capture_time,
                    i.rating as rating,
                    i.colorLabels as color_label,
                    i.pick as pick_status
                FROM Adobe_images i
                LEFT JOIN AgLibraryFile r ON i.rootFile = r.id_local
                LEFT JOIN AgLibraryFolder f ON r.folder = f.id_local
                LEFT JOIN AgLibraryRootFolder rf ON f.rootFolder = rf.id_local
                WHERE r.baseName IS NOT NULL
            """)

            # fetchall() drains the result first, so the same cursor can be
            # reused for the per-photo lookups below.
            for row in cursor.fetchall():
                photo = dict(row)
                image_id = photo['lr_id']

                # IPTC/XMP metadata is best-effort: a missing table or column
                # leaves the photo without caption/copyright/title.
                try:
                    cursor.execute("""
                        SELECT
                            caption,
                            copyright,
                            title
                        FROM AgLibraryIPTC
                        WHERE image = ?
                    """, (image_id,))
                    iptc = cursor.fetchone()
                    if iptc:
                        photo['caption'] = iptc['caption']
                        photo['copyright'] = iptc['copyright']
                        photo['title'] = iptc['title']
                except sqlite3.Error:
                    pass

                photo['keywords'] = _fetch_names(cursor, """
                    SELECT k.name
                    FROM AgLibraryKeyword k
                    JOIN AgLibraryKeywordImage ki ON k.id_local = ki.tag
                    WHERE ki.image = ?
                """, image_id)

                photo['collections'] = _fetch_names(cursor, """
                    SELECT c.name
                    FROM AgLibraryCollection c
                    JOIN AgLibraryCollectionImage ci ON c.id_local = ci.collection
                    WHERE ci.image = ?
                """, image_id)

                photos.append(photo)

        except sqlite3.OperationalError as e:
            print(f" Database query error: {e}")
            print(" This catalog may use a different schema version.")
            print(" Attempting alternative query...")

            # Not a real fallback import: only dumps a sample of the table so
            # the user can see which columns this catalog version has.
            try:
                cursor.execute("""
                    SELECT * FROM Adobe_images LIMIT 5
                """)
                sample = cursor.fetchall()
                if sample:
                    print(f" Found {len(sample)} sample records")
                    # Print column names for debugging
                    print(f" Columns: {[d[0] for d in cursor.description]}")
            except sqlite3.Error as e2:
                print(f" Alternative query also failed: {e2}")
    finally:
        # Close the catalog even when an unexpected error propagates.
        conn.close()

    return photos


def _fetch_names(cursor, sql, image_id):
    """Run a one-column query for one image id; return its values, or [] on a database error."""
    try:
        cursor.execute(sql, (image_id,))
        return [record[0] for record in cursor.fetchall()]
    except sqlite3.Error:
        return []
|
|
|
|
|
|
def find_photo_file(photo, photos_dir):
    """Locate a catalog photo on disk and return its path, or None.

    The file name is the catalog's base name with its extension appended
    (unless the name already ends with it). Locations are tried in order:
    directly inside ``photos_dir``; inside ``photos_dir`` under the catalog's
    folder path; at the catalog's own root path + folder path; and finally
    anywhere below ``photos_dir`` via a recursive walk.
    """
    base_name = photo.get('filename')
    if not base_name:
        return None

    target = base_name
    ext = photo.get('extension', '')
    if ext and not base_name.endswith(f'.{ext}'):
        target = f"{base_name}.{ext}"

    # Build every candidate up front, then probe them in priority order.
    candidates = [os.path.join(photos_dir, target)]
    if photo.get('folder_path'):
        candidates.append(
            os.path.join(photos_dir, photo.get('folder_path', ''), target)
        )
    if photo.get('root_path'):
        candidates.append(
            os.path.join(photo.get('root_path', ''), photo.get('folder_path', ''), target)
        )

    hit = next((c for c in candidates if os.path.exists(c)), None)
    if hit is not None:
        return hit

    # Last resort: first directory in walk order that contains the file name.
    for dirpath, _subdirs, names in os.walk(photos_dir):
        if target in names:
            return os.path.join(dirpath, target)

    return None
|
|
|
|
|
|
def import_photo_to_directus(photo, file_path):
    """Upload one photo and create a draft ``artworks`` item that references it.

    Args:
        photo: Metadata dict; the keys ``title``, ``filename``, ``caption``,
            ``lr_uuid``, ``capture_time`` and ``keywords`` are read, all
            optional.
        file_path: Path of the image file to upload.

    Returns:
        The ``data`` value of the item-creation response, or ``None`` when
        the upload or the item creation fails.
    """
    # Fall back through title -> filename -> 'Untitled'; "or" also covers
    # keys that are present but None.
    title = photo.get('title') or photo.get('filename') or 'Untitled'

    # Upload the image file
    file_id = upload_file(file_path, title=title)
    if not file_id:
        return None

    # Create artwork record
    artwork_data = {
        'title': title,
        'image': file_id,
        'description': photo.get('caption'),
        'lightroom_id': photo.get('lr_uuid'),
        'status': 'draft',
    }

    year = _capture_year(photo.get('capture_time'))
    if year is not None:
        artwork_data['year'] = year

    # Keywords are appended to the description for now; storing them as
    # tags would need tag-creation logic.
    if photo.get('keywords'):
        keywords_str = ', '.join(photo['keywords'])
        if artwork_data.get('description'):
            artwork_data['description'] += f'\n\nKeywords: {keywords_str}'
        else:
            artwork_data['description'] = f'Keywords: {keywords_str}'

    # Remove None values so absent metadata is simply omitted from the payload
    artwork_data = {k: v for k, v in artwork_data.items() if v is not None}

    result = api_request('POST', '/items/artworks', artwork_data)
    if 'error' in result:
        print(f" Error creating artwork: {result['error']}")
        return None

    return result.get('data')


def _capture_year(capture_time):
    """Return the year of a capture timestamp, or None when it cannot be determined."""
    if not capture_time:
        return None

    text = str(capture_time)
    try:
        return datetime.fromisoformat(text.replace('Z', '+00:00')).year
    except ValueError:
        pass

    # Older Python versions only accept the exact isoformat() layout, so a
    # timestamp with e.g. an unusual fractional-seconds width is rejected.
    # The year is still recoverable from a leading "YYYY-MM".
    match = re.match(r'(\d{4})-\d{2}', text)
    return int(match.group(1)) if match else None
|
|
|
|
|
|
def main():
    """Command-line entry point: read the catalog, preview it, then import.

    Parses ``catalog``, ``photos_dir``, ``--dry-run`` and ``--limit``; prints
    a sample of the catalog contents; and, unless ``--dry-run`` is given,
    authenticates and imports every photo whose file can be found, printing
    imported/skipped/error counts at the end. Exits with status 1 when
    authentication fails and with an argparse usage error when ``--limit``
    is not a positive integer.
    """
    parser = argparse.ArgumentParser(description='Import Lightroom catalog to Directus')
    parser.add_argument('catalog', help='Path to Lightroom catalog (.lrcat file)')
    parser.add_argument('photos_dir', help='Path to photos directory')
    parser.add_argument('--dry-run', action='store_true', help='Preview without importing')
    parser.add_argument('--limit', type=int, help='Limit number of photos to import')

    args = parser.parse_args()

    # A zero limit used to be ignored and a negative one sliced photos off
    # the end of the list; reject both instead of silently misbehaving.
    if args.limit is not None and args.limit < 1:
        parser.error('--limit must be a positive integer')

    print("=" * 60)
    print(" LIGHTROOM TO DIRECTUS IMPORT")
    print("=" * 60)
    print(f"\nCatalog: {args.catalog}")
    print(f"Photos: {args.photos_dir}")
    print(f"Directus: {DIRECTUS_URL}")

    if args.dry_run:
        print("\n*** DRY RUN - No changes will be made ***\n")

    # Read catalog
    print("\n=== Reading Lightroom Catalog ===")
    photos = read_lightroom_catalog(args.catalog)
    print(f" Found {len(photos)} photos in catalog")

    if args.limit is not None:
        photos = photos[:args.limit]
        print(f" Limited to {len(photos)} photos")

    if not photos:
        print("\nNo photos found to import.")
        return

    # Preview some photos
    print("\n=== Sample Photos ===")
    for photo in photos[:5]:
        print(f" - {photo.get('filename', 'Unknown')}")
        if photo.get('title'):
            print(f" Title: {photo['title']}")
        if photo.get('keywords'):
            print(f" Keywords: {', '.join(photo['keywords'][:5])}")
        if photo.get('collections'):
            print(f" Collections: {', '.join(photo['collections'][:3])}")

    if args.dry_run:
        print("\n=== Dry Run Summary ===")
        print(f"Would import {len(photos)} photos")
        return

    # Authenticate up front so bad credentials fail before any upload starts
    print("\n=== Authenticating ===")
    try:
        get_token()
        print(" Authenticated successfully!")
    except Exception as e:
        print(f" Authentication failed: {e}")
        sys.exit(1)

    # Import photos
    print("\n=== Importing Photos ===")
    imported = 0
    skipped = 0
    errors = 0

    for i, photo in enumerate(photos, 1):
        filename = photo.get('filename', 'Unknown')
        print(f"[{i}/{len(photos)}] {filename}")

        # One bad photo (unreadable file, dropped connection, unexpected
        # response) must not abort the rest of the import or lose the summary.
        try:
            file_path = find_photo_file(photo, args.photos_dir)
            if not file_path:
                print(" Skipped: File not found")
                skipped += 1
                continue

            result = import_photo_to_directus(photo, file_path)
        except Exception as e:
            errors += 1
            print(f" Error: {e}")
            continue

        if result:
            imported += 1
            print(f" Imported: {result.get('title', filename)}")
        else:
            errors += 1
            print(" Error: Failed to import")

    print("\n" + "=" * 60)
    print(" IMPORT COMPLETE")
    print(f" Imported: {imported}")
    print(f" Skipped: {skipped}")
    print(f" Errors: {errors}")
    print("=" * 60)
    print(f"\nView at: {DIRECTUS_URL}/admin")
|
|
|
|
|
|
# Run the importer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
|