#!/usr/bin/env python3
"""
Complete Airtable to Directus Import Script
Imports: Locations, Contacts, Pricing Bands, and creates collections if needed

Configuration is read from the environment, falling back to built-in defaults:

    DIRECTUS_URL       base URL of the Directus instance
    DIRECTUS_EMAIL     login e-mail
    DIRECTUS_PASSWORD  login password
    DIRECTUS_DATA_DIR  directory containing the Airtable CSV exports
"""

import csv
import json
import os
import re
import ssl
import sys
import urllib.error
import urllib.request
from pathlib import Path

# Create SSL context that doesn't verify (for self-signed certs).
# SECURITY: with verification off, the login password and bearer token are
# sent to whichever host answers. Re-enable verification once the server has
# a trusted certificate.
SSL_CONTEXT = ssl.create_default_context()
SSL_CONTEXT.check_hostname = False
SSL_CONTEXT.verify_mode = ssl.CERT_NONE

DIRECTUS_URL = os.environ.get('DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com')
# SECURITY: the fallback credentials below are committed in source control.
# They are kept only so existing invocations keep working; rotate the password
# and supply DIRECTUS_EMAIL / DIRECTUS_PASSWORD via the environment instead.
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')
DATA_DIR = Path(os.environ.get('DIRECTUS_DATA_DIR', '/home/jeffe/Github/katheryn-website/data'))

USER_AGENT = 'DirectusImport/1.0'

# Global token (cached by get_token after the first successful login)
TOKEN = None

# Matches a plain decimal number ("12", "12.5", ".5"). Stray punctuation such
# as the dot in "approx." can never match on its own.
_NUMBER_RE = re.compile(r'\d+(?:\.\d+)?|\.\d+')

# Location types stored as-is; any other value is recorded as 'other'.
_LOCATION_TYPES = frozenset({'gallery', 'auction', 'studio', 'private'})


def get_token():
    """Return a Directus access token, logging in on first use.

    The token is cached in the module-level ``TOKEN`` so later calls do not
    hit the network again.

    Raises:
        urllib.error.URLError: if the login request fails (this includes
            ``HTTPError`` for rejected credentials).
    """
    global TOKEN
    if TOKEN:
        return TOKEN

    payload = json.dumps({
        'email': DIRECTUS_EMAIL,
        'password': DIRECTUS_PASSWORD,
    }).encode()
    req = urllib.request.Request(
        f'{DIRECTUS_URL}/auth/login',
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'User-Agent': USER_AGENT,
        },
    )
    with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
        result = json.loads(resp.read())

    TOKEN = result['data']['access_token']
    return TOKEN


def _http_error_message(exc):
    """Return the first error message in an HTTPError body, else ``str(exc)``."""
    fallback = str(exc)
    try:
        payload = json.loads(exc.read().decode())
        return payload['errors'][0].get('message', fallback)
    except (ValueError, LookupError, AttributeError, TypeError):
        # Body was not JSON, not valid text, or not shaped like
        # {"errors": [{"message": ...}]}.
        return fallback


def api_request(method, endpoint, data=None):
    """Make an authenticated API request.

    Args:
        method: HTTP verb, e.g. ``'GET'`` or ``'POST'``.
        endpoint: Path appended to ``DIRECTUS_URL`` (starts with ``/``).
        data: Optional JSON-serialisable request body.

    Returns:
        The decoded JSON response, ``{}`` when the response body is empty, or
        ``{'error': message}`` when the server answers with an HTTP error
        status. Connection-level failures are not caught here.
    """
    token = get_token()
    req = urllib.request.Request(f'{DIRECTUS_URL}{endpoint}', method=method)
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('User-Agent', USER_AGENT)

    if data is not None:
        req.data = json.dumps(data).encode()

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            body = resp.read()
    except urllib.error.HTTPError as exc:
        return {'error': _http_error_message(exc)}

    # A successful response may carry no body at all (e.g. 204 No Content).
    return json.loads(body) if body else {}


def collection_exists(name):
    """Check if a collection exists.

    Any HTTP error response from the lookup is treated as "does not exist".
    """
    result = api_request('GET', f'/collections/{name}')
    return 'error' not in result


def create_collection(name, fields, meta=None):
    """Create a collection with fields unless it already exists.

    Args:
        name: Collection name.
        fields: List of field definitions sent under the ``fields`` key.
        meta: Optional collection metadata; defaults to an empty dict.

    Returns:
        True if the collection already existed or was created, False if the
        create request returned an error.
    """
    if collection_exists(name):
        print(f" Collection '{name}' already exists, skipping...")
        return True

    collection_data = {
        'collection': name,
        'meta': meta or {},
        'schema': {},
        'fields': fields,
    }
    result = api_request('POST', '/collections', collection_data)
    if 'error' in result:
        print(f" Error creating collection '{name}': {result['error']}")
        return False

    print(f" Created collection '{name}'")
    return True


def create_item(collection, data):
    """Create an item in Directus.

    Returns:
        The ``data`` member of the response, or None if the request returned
        an error (the error message is printed).
    """
    result = api_request('POST', f'/items/{collection}', data)
    if 'error' in result:
        print(f" Error: {result['error']}")
        return None
    return result.get('data')


def _first_number(text):
    """Return the first decimal number found in *text* as a float, or None."""
    match = _NUMBER_RE.search(text)
    return float(match.group()) if match else None


def parse_price(price_str):
    """Parse a price string to a float.

    Commas are removed as thousands separators and currency symbols or other
    surrounding text are ignored. Returns None for empty input or when the
    string contains no number.
    """
    if not price_str:
        return None
    return _first_number(price_str.replace(',', ''))


def parse_percent(pct_str):
    """Parse a percentage string to a float.

    Returns None for empty input or when the string contains no number.
    """
    if not pct_str:
        return None
    return _first_number(pct_str)


def _field(name, field_type='string', **extra):
    """Build one field definition for the collection-creation payload."""
    return {'field': name, 'type': field_type, **extra}


def _primary_key_field():
    """Build the hidden, read-only UUID primary key used by every collection."""
    return _field(
        'id',
        'uuid',
        meta={'hidden': True, 'readonly': True, 'special': ['uuid']},
        schema={'is_primary_key': True},
    )


def setup_collections():
    """Ensure all required collections exist"""
    print("\n=== Setting Up Collections ===")

    # Locations collection
    locations_fields = [
        _primary_key_field(),
        _field('name', meta={'required': True}),
        _field('type'),
        _field('is_active', 'boolean', schema={'default_value': True}),
        _field('contact_name'),
        _field('email'),
        _field('phone'),
        _field('website'),
        _field('street'),
        _field('city'),
        _field('county'),
        _field('postcode'),
        _field('country'),
        _field('sale_cut_percent', 'decimal'),
        _field('notes', 'text'),
    ]
    create_collection('locations', locations_fields, {'icon': 'store', 'note': 'Galleries and venues'})

    # Contacts collection
    contacts_fields = [
        _primary_key_field(),
        _field('first_name'),
        _field('last_name'),
        _field('organisation'),
        _field('email'),
        _field('phone'),
        _field('website'),
        _field('street'),
        _field('city'),
        _field('county'),
        _field('postcode'),
        _field('country'),
        _field('contact_type'),
        _field('is_press_contact', 'boolean', schema={'default_value': False}),
        _field('has_purchased', 'boolean', schema={'default_value': False}),
        _field('no_mailouts', 'boolean', schema={'default_value': False}),
        _field('notes', 'text'),
        _field('airtable_id'),
    ]
    create_collection('contacts', contacts_fields, {'icon': 'contacts', 'note': 'Customers and press contacts'})

    # Pricing bands collection
    pricing_fields = [
        _primary_key_field(),
        _field('name', meta={'required': True}),
        _field('category'),
        _field('price_usd', 'decimal'),
        _field('sort', 'integer'),
    ]
    create_collection('pricing_bands', pricing_fields, {'icon': 'attach_money', 'note': 'Standard pricing tiers'})


def _cell(row, column):
    """Return the stripped text of *column* in *row*, or '' when missing.

    csv.DictReader stores None (not the ``.get`` default) for cells missing
    from a short row, so the value is normalised before stripping.
    """
    return (row.get(column) or '').strip()


def _is_checked(row, column):
    """Return True when the cell holds 'checked' (case-insensitive)."""
    return _cell(row, column).lower() == 'checked'


def _drop_none(data):
    """Return a copy of *data* without the keys whose value is None."""
    return {key: value for key, value in data.items() if value is not None}


def import_locations():
    """Import the Locations CSV into the ``locations`` collection.

    Rows without a 'Location Name' are ignored.

    Returns:
        Number of locations created (0 if the CSV file is missing).
    """
    csv_path = DATA_DIR / 'Locations-Grid view.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0

    print("\n=== Importing Locations ===")
    print(f" Source: {csv_path}")

    count = 0
    # newline='' lets the csv module handle newlines inside quoted fields.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            name = _cell(row, 'Location Name')
            if not name:
                continue

            # Parse type
            loc_type = _cell(row, 'Type').lower()
            if loc_type not in _LOCATION_TYPES:
                loc_type = 'other'

            # Build contact name from first/last
            name_parts = (
                _cell(row, 'Contact First Name'),
                _cell(row, 'Contact Last Name'),
            )
            contact_name = ' '.join(part for part in name_parts if part) or None

            data = _drop_none({
                'name': name,
                'type': loc_type,
                'is_active': _is_checked(row, 'Active Location'),
                'contact_name': contact_name,
                'email': _cell(row, 'Contact Email') or None,
                'phone': _cell(row, 'Contact Phone') or None,
                'website': _cell(row, 'Contact Website') or None,
                'street': _cell(row, 'Contact Street') or None,
                'city': _cell(row, 'Contact Prefix with City') or None,
                'county': _cell(row, 'Contact County') or None,
                'postcode': _cell(row, 'Contact Postcode') or None,
                'country': _cell(row, 'Contact Country') or None,
                'sale_cut_percent': parse_percent(_cell(row, 'Fine Art sale % cut')),
            })

            if create_item('locations', data):
                count += 1
                print(f" [{count}] {name} ({loc_type})")

    print(f" Imported {count} locations")
    return count


def import_contacts():
    """Import the Contacts CSV (selective - skip empty/minimal records).

    A row is imported only when it has an identity (first name, last name or
    organisation) and a value (email, purchase history or press flag).

    Returns:
        Number of contacts created (0 if the CSV file is missing).
    """
    csv_path = DATA_DIR / 'Contacts-Grid Main.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0

    print("\n=== Importing Contacts ===")
    print(f" Source: {csv_path}")
    print(" Note: Importing only contacts with email OR purchase history")

    count = 0
    skipped = 0
    # newline='' lets the csv module handle newlines inside quoted fields.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            first_name = _cell(row, 'First Name')
            last_name = _cell(row, 'Last Name')
            organisation = _cell(row, 'Organisation')
            email = _cell(row, 'Email')
            has_purchased = _is_checked(row, 'Purchased Something')
            is_press = _is_checked(row, 'Press Contact')

            # Skip records without meaningful data
            # Must have: (name OR organisation) AND (email OR has_purchased OR is_press)
            has_identity = first_name or last_name or organisation
            has_value = email or has_purchased or is_press
            if not (has_identity and has_value):
                skipped += 1
                continue

            # Determine contact type (press takes precedence over customer)
            if is_press:
                contact_type = 'press'
            elif has_purchased:
                contact_type = 'customer'
            else:
                contact_type = 'other'

            data = _drop_none({
                'first_name': first_name or None,
                'last_name': last_name or None,
                'organisation': organisation or None,
                'email': email or None,
                'phone': _cell(row, 'Mobile') or _cell(row, 'Home Phone') or None,
                'website': _cell(row, 'Website') or None,
                'street': _cell(row, 'Street') or None,
                'city': _cell(row, 'City') or None,
                'county': _cell(row, 'County (UK)') or _cell(row, 'State (USA)') or None,
                'postcode': _cell(row, 'Postcode') or None,
                'country': _cell(row, 'Country') or None,
                'contact_type': contact_type,
                'is_press_contact': is_press,
                'has_purchased': has_purchased,
                'no_mailouts': _is_checked(row, 'No Mailouts'),
                'notes': _cell(row, 'Notes') or None,
                'airtable_id': _cell(row, 'ID') or None,
            })

            if create_item('contacts', data):
                count += 1
                name_display = f"{first_name} {last_name}".strip() or organisation or email
                # Log the first 50 individually, then every 100th.
                if count <= 50 or count % 100 == 0:
                    print(f" [{count}] {name_display}")

    print(f" Imported {count} contacts (skipped {skipped} empty/minimal records)")
    return count


def import_pricing_bands():
    """Import the Pricing Bands CSV into the ``pricing_bands`` collection.

    Rows whose 'Band Name' starts with '———' are category markers: they
    switch the category applied to the rows that follow and are not imported.

    Returns:
        Number of pricing bands created (0 if the CSV file is missing).
    """
    csv_path = DATA_DIR / '_Fine Art Pricing Bands-Grid view.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0

    print("\n=== Importing Pricing Bands ===")
    print(f" Source: {csv_path}")

    count = 0
    current_category = 'fine_art'
    sort_order = 0
    # newline='' lets the csv module handle newlines inside quoted fields.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            name = _cell(row, 'Band Name')
            if not name:
                continue

            # Check for category markers
            if name.startswith('———'):
                marker = name.upper()
                if 'FINE ART' in marker:
                    current_category = 'fine_art'
                elif 'PRODUCTS' in marker:
                    current_category = 'products'
                continue

            price = parse_price(_cell(row, 'Price in US dollars'))
            # Sort position advances for every band row, even if the create
            # call below fails.
            sort_order += 1

            data = _drop_none({
                'name': name,
                'category': current_category,
                'price_usd': price,
                'sort': sort_order,
            })

            if create_item('pricing_bands', data):
                count += 1
                # Compare against None so a genuine price of 0 is still shown.
                price_display = f"${price:.2f}" if price is not None else "N/A"
                print(f" [{count}] {name} - {price_display} ({current_category})")

    print(f" Imported {count} pricing bands")
    return count


def main():
    """Authenticate, ensure the collections exist, then run every import."""
    print("=" * 60)
    print(" AIRTABLE TO DIRECTUS - COMPLETE IMPORT")
    print("=" * 60)
    print(f"\nDirectus URL: {DIRECTUS_URL}")
    print(f"Data directory: {DATA_DIR}")

    # Authenticate
    print("\nAuthenticating...")
    try:
        get_token()
        print(" Authenticated successfully!")
    except Exception as e:
        # Top-level boundary: report any login failure and stop.
        print(f" Authentication failed: {e}")
        sys.exit(1)

    # Setup collections
    setup_collections()

    # Run imports
    total = 0
    total += import_pricing_bands()
    total += import_locations()
    total += import_contacts()

    print("\n" + "=" * 60)
    print(f" IMPORT COMPLETE - {total} total items imported")
    print("=" * 60)
    print(f"\nView at: {DIRECTUS_URL}/admin")


if __name__ == '__main__':
    main()