228 lines
7.2 KiB
Python
228 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
Import Airtable CSV exports into Directus
"""
|
|
|
|
import csv
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
# Connection settings for the target Directus instance.  Each value can be
# overridden through an environment variable of the same name; the literals
# are only fallbacks so existing invocations keep working unchanged.
DIRECTUS_URL = os.environ.get('DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com')
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
# SECURITY(review): this fallback password is committed in source control.
# Rotate it and supply DIRECTUS_PASSWORD via the environment, then delete the
# literal below.
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')
|
|
|
|
def get_token():
    """Log in to Directus and return the access token.

    Sends the module-level ``DIRECTUS_EMAIL`` / ``DIRECTUS_PASSWORD`` as a
    JSON body to ``{DIRECTUS_URL}/auth/login`` and returns the
    ``data.access_token`` value of the decoded JSON response.  Exceptions
    raised by ``urlopen`` are not caught here.
    """
    credentials = {
        'email': DIRECTUS_EMAIL,
        'password': DIRECTUS_PASSWORD
    }
    login_request = urllib.request.Request(
        f'{DIRECTUS_URL}/auth/login',
        data=json.dumps(credentials).encode(),
        headers={'Content-Type': 'application/json'},
    )

    with urllib.request.urlopen(login_request) as response:
        payload = json.loads(response.read())
    return payload['data']['access_token']
|
|
|
|
def _http_error_message(exc):
    """Return the most specific message available for an ``HTTPError``.

    Uses the first entry of the ``errors`` list in a JSON error body when
    present, otherwise falls back to ``str(exc)`` (for example when the body
    is not JSON, or ``errors`` is missing or empty).
    """
    fallback = str(exc)
    try:
        body = json.loads(exc.read())
    except ValueError:
        # Covers invalid JSON as well as undecodable bytes.
        return fallback
    errors = body.get('errors') if isinstance(body, dict) else None
    if isinstance(errors, list) and errors and isinstance(errors[0], dict):
        return errors[0].get('message', fallback)
    return fallback


def create_item(token, collection, data):
    """Create one item in a Directus collection.

    Args:
        token: Access token sent as ``Authorization: Bearer <token>``.
        collection: Collection name, interpolated into
            ``{DIRECTUS_URL}/items/{collection}``.
        data: JSON-serializable mapping of field values for the new item.

    Returns:
        The ``data`` member of the JSON response on success, or ``None`` when
        the server answers with an HTTP error status (the error message is
        printed).

    Raises:
        urllib.error.URLError: Failures that are not an HTTP error status are
            not caught here and propagate to the caller.
    """
    req = urllib.request.Request(
        f'{DIRECTUS_URL}/items/{collection}',
        data=json.dumps(data).encode(),
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}'
        }
    )

    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read())['data']
    except urllib.error.HTTPError as e:
        # The error body is parsed defensively so that one malformed error
        # response cannot abort the whole import run.
        print(f" Error: {_http_error_message(e)}")
        return None
|
|
|
|
def parse_price(price_str):
    """Extract the first number from a price string.

    Thousands separators (commas) are removed first, then the first run of
    digits with an optional decimal part is converted, so currency symbols
    and surrounding text are ignored.

    Args:
        price_str: Raw cell text such as ``"$1,234.50"``; may be empty or
            ``None``.

    Returns:
        The price as a ``float``, or ``None`` when the input is empty or
        contains no number.
    """
    if not price_str:
        return None
    # Require at least one digit so stray punctuation ("approx. $50") is
    # skipped instead of being taken as the number.
    match = re.search(r'\d+(?:\.\d+)?|\.\d+', price_str.replace(',', ''))
    if match is None:
        return None
    return float(match.group())
|
|
|
|
def import_products(token, csv_path):
    """Import the Products CSV into the ``products`` collection.

    Args:
        token: Directus access token passed through to ``create_item``.
        csv_path: Path of the CSV file; read as UTF-8 with an optional BOM.

    Returns:
        The number of rows for which ``create_item`` returned a result.
    """
    print(f"\nImporting Products from {csv_path}")

    count = 0
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        # restval='' makes rows shorter than the header yield empty strings
        # instead of None, which would break the .strip() calls below.
        reader = csv.DictReader(f, restval='')

        for row in reader:
            data = {
                'product_id': row.get('Product ID', '').strip(),
                'original': row.get('Original', '').strip(),
                'location': row.get('Most Recent Location', '').strip(),
                'notes': row.get('Notes', '').strip(),
                'item_type': row.get('Item Type', '').strip(),
                'pricing_band': row.get('Pricing Band', '').strip(),
                'price_usd': parse_price(row.get('Price in Dollars', '')),
                # NOTE(review): 'dimensions' is filled from the 'Image'
                # column -- confirm this mapping is intentional.
                'dimensions': row.get('Image', '').strip(),
            }

            # Skip rows that carry none of the identifying fields.
            if not (data['product_id'] or data['original'] or data['notes']):
                continue

            # Send only the fields that actually have a value.
            data = {k: v for k, v in data.items() if v is not None and v != ''}

            result = create_item(token, 'products', data)
            if result:
                count += 1
                print(f" [{count}] Created: {data.get('original') or data.get('product_id') or 'item'}")

    print(f"Imported {count} products")
    return count
|
|
|
|
def import_artworks(token, csv_path):
    """Import the Fine Art CSV into the ``artworks`` collection.

    Every created item gets ``status`` set to ``'published'``.  The
    ``Artwork Creation Date`` column is converted from ``"1 June 2025"`` style
    text to ``YYYY-MM-DD``; values in any other format are reported and the
    date is left out of the item.

    Args:
        token: Directus access token passed through to ``create_item``.
        csv_path: Path of the CSV file; read as UTF-8 with an optional BOM.

    Returns:
        The number of rows for which ``create_item`` returned a result.
    """
    # Imported once per call; the module does not import datetime at the top.
    from datetime import datetime

    print(f"\nImporting Artworks from {csv_path}")

    count = 0
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        # restval='' makes rows shorter than the header yield empty strings
        # instead of None, which would break the .strip() calls below.
        reader = csv.DictReader(f, restval='')

        for row in reader:
            data = {
                'name': row.get('Name', '').strip(),
                'medium': row.get('Item Medium', '').strip(),
                'notes': row.get('Notes', '').strip(),
                'dimensions': row.get('Artwork Size (H,W,D)', '').strip(),
                'price_gbp': parse_price(row.get('Selling Price / GBP', '')),
                'status': 'published',
            }

            # Skip rows without a name.
            if not data['name']:
                continue

            date_str = row.get('Artwork Creation Date', '').strip()
            if date_str:
                try:
                    # Expected format: "1 June 2025".
                    dt = datetime.strptime(date_str, '%d %B %Y')
                except ValueError:
                    print(f" Warning: could not parse creation date {date_str!r} for {data['name']}")
                else:
                    data['creation_date'] = dt.strftime('%Y-%m-%d')

            # Send only the fields that actually have a value.
            data = {k: v for k, v in data.items() if v is not None and v != ''}

            result = create_item(token, 'artworks', data)
            if result:
                count += 1
                print(f" [{count}] Created: {data['name']}")

    print(f"Imported {count} artworks")
    return count
|
|
|
|
def import_iyos(token, csv_path):
    """Import the IYOS participants CSV into ``iyos_participants``.

    ``age at time of shoot`` is stored as an integer when it parses as one
    (otherwise it is reported and left out).  ``Release Form Complete`` is
    stored as a boolean whenever the cell is non-empty: ``True`` for
    yes/true/1/complete/completed (case-insensitive), ``False`` for anything
    else.

    Args:
        token: Directus access token passed through to ``create_item``.
        csv_path: Path of the CSV file; read as UTF-8 with an optional BOM.

    Returns:
        The number of rows for which ``create_item`` returned a result.
    """
    print(f"\nImporting IYOS participants from {csv_path}")

    affirmative = ('yes', 'true', '1', 'complete', 'completed')

    count = 0
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        # restval='' makes rows shorter than the header yield empty strings
        # instead of None, which would break the .strip() calls below.
        reader = csv.DictReader(f, restval='')

        for row in reader:
            data = {
                'unique_id': row.get('UNIQUE ID', '').strip(),
                'name': row.get('Contacts', '').strip(),
                'word': row.get('word', '').strip(),
                'nationality': row.get('Nationality', '').strip(),
                'place_of_birth': row.get('place of birth', '').strip(),
                'bio': row.get('Bio', '').strip(),
            }

            # Skip rows with neither an id nor a name.
            if not data['unique_id'] and not data['name']:
                continue

            age_str = row.get('age at time of shoot', '').strip()
            if age_str:
                try:
                    data['age_at_shoot'] = int(age_str)
                except ValueError:
                    print(f" Warning: could not parse age {age_str!r} for {data['name'] or data['unique_id']}")

            release = row.get('Release Form Complete', '').strip().lower()
            if release:
                data['release_form'] = release in affirmative

            # Drop empty text fields; False for release_form is kept because
            # only None and '' are filtered out.
            data = {k: v for k, v in data.items() if v is not None and v != ''}

            result = create_item(token, 'iyos_participants', data)
            if result:
                count += 1
                print(f" [{count}] Created: {data.get('name') or data.get('unique_id')}")

    print(f"Imported {count} IYOS participants")
    return count
|
|
|
|
def main(data_dir=None):
    """Authenticate against Directus and import every known CSV export.

    Args:
        data_dir: Directory containing the CSV exports.  When omitted, the
            first command-line argument is used if one was given, otherwise
            the original hard-coded project data directory.

    CSV files that are not present in ``data_dir`` are skipped with a
    message; the total number of created items is printed at the end.
    """
    if data_dir is None:
        if len(sys.argv) > 1:
            data_dir = sys.argv[1]
        else:
            data_dir = '/home/jeffe/Github/katheryn-website/data'

    print("=== Airtable to Directus Import ===")
    print(f"Directus URL: {DIRECTUS_URL}")
    print(f"Data directory: {data_dir}")

    print("\nAuthenticating...")
    token = get_token()
    print("Authenticated!")

    # (CSV file name, importer) pairs, processed in this order:
    # Fine Art -> artworks, Products -> products, IYOS -> iyos_participants.
    sources = (
        ('꩜ Fine Art ꩜-All Fields.csv', import_artworks),
        ('Products-Grid view.csv', import_products),
        ('IYOS-Grid view.csv', import_iyos),
    )

    total = 0
    for filename, importer in sources:
        csv_path = os.path.join(data_dir, filename)
        if os.path.exists(csv_path):
            total += importer(token, csv_path)
        else:
            # Make a missing export visible rather than silently importing less.
            print(f"\nSkipping {filename}: not found in {data_dir}")

    print("\n=== Import Complete ===")
    print(f"Total items imported: {total}")
    print(f"\nView at: {DIRECTUS_URL}/admin")
|
|
|
|
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
|