katheryn-website/scripts/import-all.py

441 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Complete Airtable to Directus Import Script
Imports: Locations, Contacts, Pricing Bands, and creates collections if needed
"""
import csv
import json
import os
import re
import ssl
import sys
import urllib.request
import urllib.error
from pathlib import Path
# TLS context shared by every request this script sends to Directus.
# SECURITY NOTE(review): by default certificate and hostname verification are
# switched off (the original intent was to tolerate self-signed certs). That
# lets anyone on the network path read the login credentials. Set
# DIRECTUS_VERIFY_SSL=1 (or "true"/"yes") to keep normal verification.
SSL_CONTEXT = ssl.create_default_context()
if os.environ.get('DIRECTUS_VERIFY_SSL', '').strip().lower() not in ('1', 'true', 'yes'):
    SSL_CONTEXT.check_hostname = False
    SSL_CONTEXT.verify_mode = ssl.CERT_NONE

# Connection settings. Each value can be overridden from the environment; the
# fallbacks are the values this script has always used, so existing
# invocations keep working unchanged.
DIRECTUS_URL = os.environ.get('DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com')
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
# SECURITY NOTE(review): a real password is committed here as the fallback.
# Rotate it and supply DIRECTUS_PASSWORD via the environment instead, then
# delete this default.
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')

# Directory holding the Airtable CSV exports; IMPORT_DATA_DIR overrides the
# machine-specific default path.
DATA_DIR = Path(os.environ.get('IMPORT_DATA_DIR', '/home/jeffe/Github/katheryn-website/data'))

# Access token cached by get_token() after the first successful login.
TOKEN = None
def get_token():
    """Return a Directus access token, logging in on first use.

    The first call POSTs the configured email and password to
    ``/auth/login`` and stores the returned access token in the module-level
    ``TOKEN``; later calls return that cached value without another request.
    Exceptions raised by the login request are not caught here.
    """
    global TOKEN
    if not TOKEN:
        credentials = {'email': DIRECTUS_EMAIL, 'password': DIRECTUS_PASSWORD}
        login_request = urllib.request.Request(
            f'{DIRECTUS_URL}/auth/login',
            data=json.dumps(credentials).encode(),
            headers={
                'Content-Type': 'application/json',
                'User-Agent': 'DirectusImport/1.0',
            },
        )
        with urllib.request.urlopen(login_request, context=SSL_CONTEXT) as response:
            payload = json.loads(response.read())
        TOKEN = payload['data']['access_token']
    return TOKEN
def api_request(method, endpoint, data=None):
    """Send an authenticated JSON request to Directus and return the result.

    Args:
        method: HTTP method name, e.g. ``'GET'`` or ``'POST'``.
        endpoint: Path appended to ``DIRECTUS_URL`` (should start with ``/``).
        data: Optional JSON-serialisable payload. Anything other than ``None``
            is sent, including empty dicts/lists.

    Returns:
        The decoded JSON response on success (``{}`` when the response body
        is empty). On an HTTP error status, a dict of the form
        ``{'error': message}`` where ``message`` is the first entry of the
        response's ``errors`` list when available, otherwise the text of the
        ``HTTPError``.

    Errors other than ``urllib.error.HTTPError`` (for example connection
    failures) are not handled here and propagate to the caller.
    """
    token = get_token()
    req = urllib.request.Request(f'{DIRECTUS_URL}{endpoint}', method=method)
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('User-Agent', 'DirectusImport/1.0')
    # "is not None" rather than truthiness: an empty payload is still a payload.
    if data is not None:
        req.data = json.dumps(data).encode()

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            body = resp.read()
    except urllib.error.HTTPError as e:
        # errors='replace' so a non-UTF-8 error page cannot raise here.
        error_body = e.read().decode(errors='replace')
        try:
            errors = json.loads(error_body).get('errors') or [{}]
            return {'error': errors[0].get('message', str(e))}
        except (ValueError, AttributeError, IndexError, KeyError, TypeError):
            # Body was not JSON, or not shaped like {"errors": [{...}]}.
            return {'error': str(e)}

    # Some successful responses carry no body at all; treat that as "no data".
    if not body.strip():
        return {}
    return json.loads(body)
def collection_exists(name):
    """Return True when ``GET /collections/<name>`` comes back without an error.

    Any error reported by ``api_request`` is treated as "does not exist".
    """
    response = api_request('GET', f'/collections/{name}')
    if 'error' in response:
        return False
    return True
def create_collection(name, fields, meta=None):
    """Create the collection ``name`` unless it is already present.

    Args:
        name: Collection identifier.
        fields: List of field definitions sent as the ``fields`` entry.
        meta: Optional collection metadata; an empty dict is sent when falsy.

    Returns:
        True if the collection already existed or was created, False if the
        create request reported an error.
    """
    if collection_exists(name):
        print(f" Collection '{name}' already exists, skipping...")
        return True

    payload = dict(
        collection=name,
        meta=meta if meta else {},
        schema={},
        fields=fields,
    )
    outcome = api_request('POST', '/collections', payload)

    created = 'error' not in outcome
    if created:
        print(f" Created collection '{name}'")
    else:
        print(f" Error creating collection '{name}': {outcome['error']}")
    return created
def create_item(collection, data):
    """POST ``data`` as a new item of ``collection``.

    Returns the ``data`` entry of the response, or None (after printing the
    error message) when the request reported an error.
    """
    response = api_request('POST', f'/items/{collection}', data)
    if 'error' not in response:
        return response.get('data')
    print(f" Error: {response['error']}")
    return None
def parse_price(price_str):
    """Extract the first number from a price string as a float.

    Thousands separators (commas) are removed before searching, and any
    surrounding text such as currency symbols is ignored.

    Args:
        price_str: Text such as ``"$1,250.00"``; may be empty or None.

    Returns:
        The first number found, or None when the input is empty or contains
        no digits.
    """
    if not price_str:
        return None
    # Require at least one digit so stray punctuation ("approx.", a trailing
    # full stop) can never be picked up as the "number".
    match = re.search(r'\d+(?:\.\d*)?|\.\d+', price_str.replace(',', ''))
    if match is None:
        return None
    return float(match.group())
def parse_percent(pct_str):
    """Extract the first number from a percentage string as a float.

    Args:
        pct_str: Text such as ``"40%"``; may be empty or None.

    Returns:
        The first number found (``"40%"`` gives ``40.0``), or None when the
        input is empty or contains no digits.
    """
    if not pct_str:
        return None
    # Require at least one digit so a lone "." in surrounding text is skipped
    # instead of being taken as the number.
    match = re.search(r'\d+(?:\.\d*)?|\.\d+', pct_str)
    if match is None:
        return None
    return float(match.group())
def setup_collections():
    """Create the locations, contacts and pricing_bands collections.

    Each collection is passed to ``create_collection``, which skips
    collections that already exist. Results of those calls are not inspected.
    """
    print("\n=== Setting Up Collections ===")

    def uuid_primary_key():
        # Hidden, read-only UUID primary key used by every collection here.
        return {
            'field': 'id',
            'type': 'uuid',
            'meta': {'hidden': True, 'readonly': True, 'special': ['uuid']},
            'schema': {'is_primary_key': True},
        }

    def column(name, kind='string'):
        # Plain field with no extra meta/schema options.
        return {'field': name, 'type': kind}

    def required_string(name):
        return {'field': name, 'type': 'string', 'meta': {'required': True}}

    def flag(name, default):
        # Boolean field with an explicit default value.
        return {'field': name, 'type': 'boolean', 'schema': {'default_value': default}}

    # Contact/address columns that locations and contacts declare identically.
    shared_names = (
        'email', 'phone', 'website', 'street',
        'city', 'county', 'postcode', 'country',
    )

    locations_fields = [
        uuid_primary_key(),
        required_string('name'),
        column('type'),
        flag('is_active', True),
        column('contact_name'),
        *(column(field_name) for field_name in shared_names),
        column('sale_cut_percent', 'decimal'),
        column('notes', 'text'),
    ]

    contacts_fields = [
        uuid_primary_key(),
        column('first_name'),
        column('last_name'),
        column('organisation'),
        *(column(field_name) for field_name in shared_names),
        column('contact_type'),
        flag('is_press_contact', False),
        flag('has_purchased', False),
        flag('no_mailouts', False),
        column('notes', 'text'),
        column('airtable_id'),
    ]

    pricing_fields = [
        uuid_primary_key(),
        required_string('name'),
        column('category'),
        column('price_usd', 'decimal'),
        column('sort', 'integer'),
    ]

    specs = (
        ('locations', locations_fields,
         {'icon': 'store', 'note': 'Galleries and venues'}),
        ('contacts', contacts_fields,
         {'icon': 'contacts', 'note': 'Customers and press contacts'}),
        ('pricing_bands', pricing_fields,
         {'icon': 'attach_money', 'note': 'Standard pricing tiers'}),
    )
    for collection_name, field_defs, collection_meta in specs:
        create_collection(collection_name, field_defs, collection_meta)
def import_locations():
    """Import rows of 'Locations-Grid view.csv' into the locations collection.

    Rows without a 'Location Name' are skipped. The 'Type' column is
    lower-cased and kept only if it is one of gallery/auction/studio/private;
    anything else becomes 'other'. 'Active Location' counts as true only when
    the cell reads "checked". Fields that end up empty are omitted from the
    payload.

    Returns:
        Number of items successfully created (0 if the CSV file is missing).
    """
    csv_path = DATA_DIR / 'Locations-Grid view.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0
    print("\n=== Importing Locations ===")
    print(f" Source: {csv_path}")

    def cell(row, column):
        # DictReader yields None for columns missing from a short row, so
        # coerce to '' before stripping instead of relying on get()'s default.
        return (row.get(column) or '').strip()

    # Hoisted out of the loop: the set of recognised types never changes.
    known_types = frozenset(('gallery', 'auction', 'studio', 'private'))

    count = 0
    # newline='' lets the csv module handle newlines embedded in quoted cells.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            name = cell(row, 'Location Name')
            if not name:
                continue

            loc_type = cell(row, 'Type').lower()
            if loc_type not in known_types:
                loc_type = 'other'

            is_active = cell(row, 'Active Location').lower() == 'checked'
            sale_cut = parse_percent(cell(row, 'Fine Art sale % cut'))

            # Join whichever of first/last name are present.
            name_parts = (cell(row, 'Contact First Name'), cell(row, 'Contact Last Name'))
            contact_name = ' '.join(part for part in name_parts if part) or None

            data = {
                'name': name,
                'type': loc_type,
                'is_active': is_active,
                'contact_name': contact_name,
                'email': cell(row, 'Contact Email') or None,
                'phone': cell(row, 'Contact Phone') or None,
                'website': cell(row, 'Contact Website') or None,
                'street': cell(row, 'Contact Street') or None,
                'city': cell(row, 'Contact Prefix with City') or None,
                'county': cell(row, 'Contact County') or None,
                'postcode': cell(row, 'Contact Postcode') or None,
                'country': cell(row, 'Contact Country') or None,
                'sale_cut_percent': sale_cut,
            }
            # Send only the fields that actually have a value.
            data = {k: v for k, v in data.items() if v is not None}

            if create_item('locations', data):
                count += 1
                print(f" [{count}] {name} ({loc_type})")

    print(f" Imported {count} locations")
    return count
def import_contacts():
    """Import selected rows of 'Contacts-Grid Main.csv' into contacts.

    A row is imported only if it has an identity (first name, last name or
    organisation) AND some value (an email, a purchase, or the press flag);
    all other rows are counted as skipped. ``contact_type`` is 'press' for
    press contacts, else 'customer' for purchasers, else 'other'. Fields
    that end up empty are omitted from the payload.

    Progress is printed for the first 50 created items and then for every
    100th.

    Returns:
        Number of items successfully created (0 if the CSV file is missing).
    """
    csv_path = DATA_DIR / 'Contacts-Grid Main.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0
    print("\n=== Importing Contacts ===")
    print(f" Source: {csv_path}")
    print(" Note: Importing only contacts with email OR purchase history")

    def cell(row, column):
        # DictReader yields None for columns missing from a short row, so
        # coerce to '' before stripping instead of relying on get()'s default.
        return (row.get(column) or '').strip()

    def checked(row, column):
        # Checkbox columns count as true only when the cell reads "checked".
        return cell(row, column).lower() == 'checked'

    count = 0
    skipped = 0
    # newline='' lets the csv module handle newlines embedded in quoted cells
    # (free-text columns such as Notes can span several lines).
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            first_name = cell(row, 'First Name')
            last_name = cell(row, 'Last Name')
            organisation = cell(row, 'Organisation')
            email = cell(row, 'Email')
            has_purchased = checked(row, 'Purchased Something')
            is_press = checked(row, 'Press Contact')

            # Must have: (name OR organisation) AND (email OR purchase OR press)
            has_identity = first_name or last_name or organisation
            has_value = email or has_purchased or is_press
            if not (has_identity and has_value):
                skipped += 1
                continue

            if is_press:
                contact_type = 'press'
            elif has_purchased:
                contact_type = 'customer'
            else:
                contact_type = 'other'

            data = {
                'first_name': first_name or None,
                'last_name': last_name or None,
                'organisation': organisation or None,
                'email': email or None,
                # Prefer the mobile number, fall back to the home phone.
                'phone': cell(row, 'Mobile') or cell(row, 'Home Phone') or None,
                'website': cell(row, 'Website') or None,
                'street': cell(row, 'Street') or None,
                'city': cell(row, 'City') or None,
                # UK county and US state share one destination field.
                'county': cell(row, 'County (UK)') or cell(row, 'State (USA)') or None,
                'postcode': cell(row, 'Postcode') or None,
                'country': cell(row, 'Country') or None,
                'contact_type': contact_type,
                'is_press_contact': is_press,
                'has_purchased': has_purchased,
                'no_mailouts': checked(row, 'No Mailouts'),
                'notes': cell(row, 'Notes') or None,
                'airtable_id': cell(row, 'ID') or None,
            }
            # Send only the fields that actually have a value.
            data = {k: v for k, v in data.items() if v is not None}

            if create_item('contacts', data):
                count += 1
                name_display = f"{first_name} {last_name}".strip() or organisation or email
                # Throttle output: every item up to 50, then every 100th.
                if count <= 50 or count % 100 == 0:
                    print(f" [{count}] {name_display}")

    print(f" Imported {count} contacts (skipped {skipped} empty/minimal records)")
    return count
def import_pricing_bands():
    """Import '_Fine Art Pricing Bands-Grid view.csv' into pricing_bands.

    Rows whose 'Band Name' starts with '———' are section markers, not bands:
    a marker containing "FINE ART" or "PRODUCTS" (case-insensitive) switches
    the category applied to the rows that follow, and the marker row itself
    is never imported. Rows without a band name are skipped. ``sort`` is a
    running counter over the band rows in file order.

    Returns:
        Number of items successfully created (0 if the CSV file is missing).
    """
    csv_path = DATA_DIR / '_Fine Art Pricing Bands-Grid view.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0
    print("\n=== Importing Pricing Bands ===")
    print(f" Source: {csv_path}")

    def cell(row, column):
        # DictReader yields None for columns missing from a short row, so
        # coerce to '' before stripping instead of relying on get()'s default.
        return (row.get(column) or '').strip()

    count = 0
    current_category = 'fine_art'
    sort_order = 0
    # newline='' lets the csv module handle newlines embedded in quoted cells.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            name = cell(row, 'Band Name')
            if not name:
                continue

            # Section marker rows only switch the current category.
            if name.startswith('———'):
                marker = name.upper()
                if 'FINE ART' in marker:
                    current_category = 'fine_art'
                elif 'PRODUCTS' in marker:
                    current_category = 'products'
                continue

            price = parse_price(cell(row, 'Price in US dollars'))
            sort_order += 1
            data = {
                'name': name,
                'category': current_category,
                'price_usd': price,
                'sort': sort_order,
            }
            # Send only the fields that actually have a value.
            data = {k: v for k, v in data.items() if v is not None}

            if create_item('pricing_bands', data):
                count += 1
                # Compare against None so a genuine 0.00 price is displayed
                # as a price rather than "N/A".
                price_display = f"${price:.2f}" if price is not None else "N/A"
                print(f" [{count}] {name} - {price_display} ({current_category})")

    print(f" Imported {count} pricing bands")
    return count
def main():
    """Run the full import: authenticate, set up collections, load the CSVs.

    Exits with status 1 if authentication raises. Otherwise imports pricing
    bands, locations and contacts (in that order) and prints the combined
    count of created items.
    """
    rule = "=" * 60
    print(rule)
    print(" AIRTABLE TO DIRECTUS - COMPLETE IMPORT")
    print(rule)
    print(f"\nDirectus URL: {DIRECTUS_URL}")
    print(f"Data directory: {DATA_DIR}")

    # Log in up front so bad credentials abort before anything is created.
    print("\nAuthenticating...")
    try:
        get_token()
        print(" Authenticated successfully!")
    except Exception as e:
        print(f" Authentication failed: {e}")
        sys.exit(1)

    setup_collections()

    # Each importer returns how many items it created.
    importers = (import_pricing_bands, import_locations, import_contacts)
    total = sum(run_import() for run_import in importers)

    print("\n" + rule)
    print(f" IMPORT COMPLETE - {total} total items imported")
    print(rule)
    print(f"\nView at: {DIRECTUS_URL}/admin")


if __name__ == '__main__':
    main()