katheryn-website/scripts/import-airtable.py

228 lines
7.2 KiB
Python

#!/usr/bin/env python3
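"""
Import Airtable CSV exports into Directus.

Reads the Fine Art, Products and IYOS CSV exports from the data directory and
creates the corresponding items in the Directus `artworks`, `products` and
`iyos_participants` collections through the REST API.
"""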
import csv
import json
import os
import re
import sys
import urllib.request
import urllib.error
# Connection settings for the target Directus instance. Each value can be
# overridden with an environment variable of the same name; the literals are
# only fallbacks so that existing invocations keep working unchanged.
DIRECTUS_URL = os.environ.get('DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com')
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
# SECURITY: a real password is committed here as the fallback value. Rotate it,
# supply DIRECTUS_PASSWORD through the environment, and then drop this default.
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')
def get_token():
    """Log in to Directus and return the access token.

    Sends the module-level DIRECTUS_EMAIL / DIRECTUS_PASSWORD as a JSON body
    to ``{DIRECTUS_URL}/auth/login`` and returns ``data.access_token`` from
    the JSON response. Any error raised by ``urlopen`` or by an unexpected
    response shape propagates to the caller.
    """
    credentials = {'email': DIRECTUS_EMAIL, 'password': DIRECTUS_PASSWORD}
    login_request = urllib.request.Request(
        f'{DIRECTUS_URL}/auth/login',
        data=json.dumps(credentials).encode(),
        headers={'Content-Type': 'application/json'},
    )
    with urllib.request.urlopen(login_request) as response:
        payload = json.loads(response.read())
    return payload['data']['access_token']
def create_item(token, collection, data):
    """Create one item in a Directus collection.

    Args:
        token: Access token, sent as a Bearer token.
        collection: Collection name used in the ``/items/{collection}`` URL.
        data: JSON-serialisable dict of field values for the new item.

    Returns:
        The ``data`` member of the JSON response on success, or ``None`` when
        the server answers with an HTTP error (the error is printed).
    """
    req = urllib.request.Request(
        f'{DIRECTUS_URL}/items/{collection}',
        data=json.dumps(data).encode(),
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}',
        },
    )
    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read())['data']
    except urllib.error.HTTPError as e:
        # Prefer the first message from the JSON error payload, but fall back
        # to the exception text when the body is not JSON (e.g. a proxy's HTML
        # error page), has an empty ``errors`` list, or has another shape.
        message = str(e)
        try:
            errors = json.loads(e.read()).get('errors') or [{}]
            message = errors[0].get('message', message)
        except (ValueError, AttributeError, IndexError, TypeError):
            pass
        print(f" Error: {message}")
        return None
def parse_price(price_str):
    """Parse a price string into a float.

    Commas are dropped (treated as thousands separators), then the first
    decimal number found in the text is used, so currency symbols and
    surrounding words are ignored.

    Args:
        price_str: Raw cell text such as ``"$1,200.50"``; may be empty or None.

    Returns:
        The price as a float, or ``None`` when the text contains no number.
    """
    if not price_str:
        return None
    # Require at least one digit in the match so that stray punctuation
    # (the "." in "approx. 500") cannot be mistaken for the number.
    match = re.search(r'\d+(?:\.\d+)?|\.\d+', price_str.replace(',', ''))
    if match is None:
        return None
    try:
        return float(match.group())
    except ValueError:
        return None
def import_products(token, csv_path):
    """Import the Products CSV into the Directus ``products`` collection.

    Rows whose Product ID, Original and Notes cells are all blank are skipped.
    Blank fields are omitted from the payload sent to Directus.

    Args:
        token: Directus access token, passed through to ``create_item``.
        csv_path: Path to the CSV export (read as UTF-8, BOM tolerated).

    Returns:
        Number of items successfully created.
    """
    def cell(row, column):
        # DictReader fills the missing trailing cells of a short row with
        # None, so normalise to '' before stripping.
        return (row.get(column) or '').strip()

    print(f"\nImporting Products from {csv_path}")
    count = 0
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            product_id = cell(row, 'Product ID')
            original = cell(row, 'Original')
            notes = cell(row, 'Notes')
            # Skip empty rows
            if not product_id and not original and not notes:
                continue
            data = {
                'product_id': product_id,
                'original': original,
                'location': cell(row, 'Most Recent Location'),
                'notes': notes,
                'item_type': cell(row, 'Item Type'),
                'pricing_band': cell(row, 'Pricing Band'),
                'price_usd': parse_price(row.get('Price in Dollars') or ''),
                # NOTE(review): 'dimensions' is filled from the 'Image' column;
                # confirm this mapping is intentional.
                'dimensions': cell(row, 'Image'),
            }
            # Remove None/empty values
            data = {k: v for k, v in data.items() if v is not None and v != ''}
            result = create_item(token, 'products', data)
            if result:
                count += 1
                print(f" [{count}] Created: {data.get('original') or data.get('product_id') or 'item'}")
    print(f"Imported {count} products")
    return count
def import_artworks(token, csv_path):
    """Import the Fine Art CSV into the Directus ``artworks`` collection.

    Rows without a Name are skipped. Every created item gets
    ``status='published'``. Blank fields are omitted from the payload.

    Args:
        token: Directus access token, passed through to ``create_item``.
        csv_path: Path to the CSV export (read as UTF-8, BOM tolerated).

    Returns:
        Number of items successfully created.
    """
    from datetime import datetime

    def cell(row, column):
        # DictReader fills the missing trailing cells of a short row with
        # None, so normalise to '' before stripping.
        return (row.get(column) or '').strip()

    print(f"\nImporting Artworks from {csv_path}")
    count = 0
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            name = cell(row, 'Name')
            # Skip empty rows
            if not name:
                continue
            data = {
                'name': name,
                'medium': cell(row, 'Item Medium'),
                'notes': cell(row, 'Notes'),
                'dimensions': cell(row, 'Artwork Size (H,W,D)'),
                'price_gbp': parse_price(row.get('Selling Price / GBP') or ''),
                'status': 'published',
            }
            # Creation date arrives as e.g. "1 June 2025"; store it as ISO
            # YYYY-MM-DD. Other formats are reported and left unset rather
            # than aborting the import.
            date_str = cell(row, 'Artwork Creation Date')
            if date_str:
                try:
                    parsed = datetime.strptime(date_str, '%d %B %Y')
                except ValueError:
                    print(f" Warning: unrecognised creation date {date_str!r} for {name}")
                else:
                    data['creation_date'] = parsed.date().isoformat()
            # Remove None/empty values
            data = {k: v for k, v in data.items() if v is not None and v != ''}
            result = create_item(token, 'artworks', data)
            if result:
                count += 1
                print(f" [{count}] Created: {data['name']}")
    print(f"Imported {count} artworks")
    return count
def import_iyos(token, csv_path):
    """Import the IYOS participants CSV into ``iyos_participants``.

    Rows with neither a UNIQUE ID nor a Contacts name are skipped. Blank
    fields are omitted from the payload; a ``release_form`` of ``False`` is
    kept because it is a real value.

    Args:
        token: Directus access token, passed through to ``create_item``.
        csv_path: Path to the CSV export (read as UTF-8, BOM tolerated).

    Returns:
        Number of items successfully created.
    """
    # Cell values (lower-cased) that count as a completed release form.
    completed_values = ('yes', 'true', '1', 'complete', 'completed')

    def cell(row, column):
        # DictReader fills the missing trailing cells of a short row with
        # None, so normalise to '' before stripping.
        return (row.get(column) or '').strip()

    print(f"\nImporting IYOS participants from {csv_path}")
    count = 0
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            unique_id = cell(row, 'UNIQUE ID')
            name = cell(row, 'Contacts')
            # Skip empty rows
            if not unique_id and not name:
                continue
            data = {
                'unique_id': unique_id,
                'name': name,
                'word': cell(row, 'word'),
                'nationality': cell(row, 'Nationality'),
                'place_of_birth': cell(row, 'place of birth'),
                'bio': cell(row, 'Bio'),
            }
            # Age is stored only when the cell is a plain integer; anything
            # else is reported and left unset rather than aborting the import.
            age_str = cell(row, 'age at time of shoot')
            if age_str:
                try:
                    data['age_at_shoot'] = int(age_str)
                except ValueError:
                    print(f" Warning: non-numeric age {age_str!r} for {name or unique_id}")
            # A blank release cell leaves the field unset; any other text maps
            # to True/False.
            release = cell(row, 'Release Form Complete').lower()
            if release:
                data['release_form'] = release in completed_values
            # Remove None/empty values
            data = {k: v for k, v in data.items() if v is not None and v != ''}
            result = create_item(token, 'iyos_participants', data)
            if result:
                count += 1
                print(f" [{count}] Created: {data.get('name') or data.get('unique_id')}")
    print(f"Imported {count} IYOS participants")
    return count
def main():
    """Authenticate against Directus and import every CSV export found.

    The data directory is taken from the first command-line argument when one
    is given; otherwise the original hard-coded path is used. Each known CSV
    is imported with its matching importer; files that are missing are
    reported and skipped.
    """
    data_dir = sys.argv[1] if len(sys.argv) > 1 else '/home/jeffe/Github/katheryn-website/data'
    print("=== Airtable to Directus Import ===")
    print(f"Directus URL: {DIRECTUS_URL}")
    print(f"Data directory: {data_dir}")

    # Get token
    print("\nAuthenticating...")
    token = get_token()
    print("Authenticated!")

    # CSV file name -> importer, in import order:
    # Fine Art -> artworks, Products -> products, IYOS -> iyos_participants.
    sources = (
        ('꩜ Fine Art ꩜-All Fields.csv', import_artworks),
        ('Products-Grid view.csv', import_products),
        ('IYOS-Grid view.csv', import_iyos),
    )
    total = 0
    for filename, importer in sources:
        csv_path = os.path.join(data_dir, filename)
        if os.path.exists(csv_path):
            total += importer(token, csv_path)
        else:
            print(f"\nSkipping {filename}: not found in {data_dir}")

    print("\n=== Import Complete ===")
    print(f"Total items imported: {total}")
    print(f"\nView at: {DIRECTUS_URL}/admin")


# Run the import only when executed as a script, not when imported.
if __name__ == '__main__':
    main()