#!/usr/bin/env python3
"""
Complete Airtable to Directus Import Script

Imports Locations, Contacts, and Pricing Bands, and creates the collections if needed.
"""
|
|
|
|
import csv
|
|
import json
|
|
import os
|
|
import re
|
|
import ssl
|
|
import sys
|
|
import urllib.request
|
|
import urllib.error
|
|
from pathlib import Path
|
|
|
|
# TLS context with certificate and hostname verification switched off so the
# script can talk to a server that presents a self-signed certificate.
# NOTE(review): this also removes protection against man-in-the-middle
# attacks; prefer trusting the server's CA once a proper certificate exists.
SSL_CONTEXT = ssl.create_default_context()
SSL_CONTEXT.check_hostname = False
SSL_CONTEXT.verify_mode = ssl.CERT_NONE

# Connection settings. Each value can be overridden through an environment
# variable of the same name; the defaults keep the script's previous behavior.
DIRECTUS_URL = os.environ.get('DIRECTUS_URL', 'https://katheryn-cms.jeffemmett.com')
DIRECTUS_EMAIL = os.environ.get('DIRECTUS_EMAIL', 'katheryn@katheryntrenshaw.com')
# SECURITY(review): a credential is embedded in source control as the fallback
# value. Rotate it and supply DIRECTUS_PASSWORD via the environment instead.
DIRECTUS_PASSWORD = os.environ.get('DIRECTUS_PASSWORD', '9BHhUaHSQjefuAEaPqPyjerf')

# Directory holding the Airtable CSV exports (override with DIRECTUS_DATA_DIR).
DATA_DIR = Path(os.environ.get('DIRECTUS_DATA_DIR', '/home/jeffe/Github/katheryn-website/data'))

# Access token cached by get_token() after the first successful login.
TOKEN = None
|
|
|
|
|
|
def get_token():
    """Return the Directus access token, logging in on first use.

    The token is stored in the module-level ``TOKEN`` so that later calls
    reuse it instead of posting the credentials to ``/auth/login`` again.

    Returns:
        The ``access_token`` string from the login response.

    Raises:
        urllib.error.URLError: if the login request fails (``HTTPError`` for
            a non-success status).
        KeyError: if the response lacks ``data.access_token``.
    """
    global TOKEN

    if not TOKEN:
        credentials = {
            'email': DIRECTUS_EMAIL,
            'password': DIRECTUS_PASSWORD
        }
        login_request = urllib.request.Request(
            f'{DIRECTUS_URL}/auth/login',
            data=json.dumps(credentials).encode(),
            headers={
                'Content-Type': 'application/json',
                'User-Agent': 'DirectusImport/1.0'
            }
        )

        with urllib.request.urlopen(login_request, context=SSL_CONTEXT) as response:
            payload = json.loads(response.read())

        TOKEN = payload['data']['access_token']

    return TOKEN
|
|
|
|
|
|
def api_request(method, endpoint, data=None):
    """Make an authenticated JSON request against the Directus API.

    Args:
        method: HTTP method name, e.g. ``'GET'`` or ``'POST'``.
        endpoint: Path appended to ``DIRECTUS_URL`` (starting with ``/``).
        data: Optional JSON-serializable payload. Any value other than
            ``None`` is sent as the request body, including empty
            containers such as ``{}``.

    Returns:
        The decoded JSON response. An empty response body yields ``{}``.
        When the server answers with an HTTP error status, a dict of the
        form ``{'error': message}`` is returned instead of raising.

    Raises:
        urllib.error.URLError: for connection-level failures (these are not
            converted into an error dict).
    """
    token = get_token()
    url = f'{DIRECTUS_URL}{endpoint}'

    req = urllib.request.Request(url, method=method)
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Content-Type', 'application/json')
    req.add_header('User-Agent', 'DirectusImport/1.0')

    # Compare against None so an intentionally empty payload is still sent.
    if data is not None:
        req.data = json.dumps(data).encode()

    try:
        with urllib.request.urlopen(req, context=SSL_CONTEXT) as resp:
            body = resp.read()
    except urllib.error.HTTPError as e:
        # Prefer the first message from the JSON error payload and fall back
        # to the HTTP status text when the body has any other shape.
        error_body = e.read().decode(errors='replace')
        try:
            errors = json.loads(error_body).get('errors') or [{}]
            message = errors[0].get('message', str(e))
        except (ValueError, AttributeError, IndexError, KeyError, TypeError):
            message = str(e)
        return {'error': message}

    # Responses without content (e.g. 204) have nothing to decode.
    if not body.strip():
        return {}
    return json.loads(body)
|
|
|
|
|
|
def collection_exists(name):
    """Report whether Directus returns collection ``name`` without an error.

    Any error reported by ``api_request`` for ``GET /collections/<name>``
    is treated as "does not exist".
    """
    lookup = api_request('GET', f'/collections/{name}')
    lookup_failed = 'error' in lookup
    return not lookup_failed
|
|
|
|
|
|
def create_collection(name, fields, meta=None):
    """Create a Directus collection unless it is already present.

    Args:
        name: Collection name.
        fields: List of field definition dicts sent as ``fields``.
        meta: Optional collection metadata; an empty dict is sent when
            omitted or falsy.

    Returns:
        True if the collection already existed or was created, False if the
        create request reported an error.
    """
    if collection_exists(name):
        print(f" Collection '{name}' already exists, skipping...")
        return True

    response = api_request('POST', '/collections', {
        'collection': name,
        'meta': meta or {},
        'schema': {},
        'fields': fields
    })

    created = 'error' not in response
    if created:
        print(f" Created collection '{name}'")
    else:
        print(f" Error creating collection '{name}': {response['error']}")
    return created
|
|
|
|
|
|
def create_item(collection, data):
    """Create one item in ``collection`` from the ``data`` dict.

    Returns:
        The ``data`` member of the API response, or None (after printing
        the error message) when the request reported an error.
    """
    response = api_request('POST', f'/items/{collection}', data)
    if 'error' not in response:
        return response.get('data')

    print(f" Error: {response['error']}")
    return None
|
|
|
|
|
|
def parse_price(price_str):
    """Parse a price string such as ``"$1,234.50"`` into a float.

    Commas are removed as thousands separators, then the first run of
    digits and dots that actually contains a digit is converted. Requiring a
    digit keeps stray punctuation (the "." in "approx. $500") from being
    picked up as the number.

    Args:
        price_str: Text containing a price; may be empty or None.

    Returns:
        The price as a float, or None when the input is empty, contains no
        number, or the matched text is not a valid number (e.g. "1.2.3").
    """
    if not price_str:
        return None

    without_separators = price_str.replace(',', '')
    match = re.search(r'\.?\d[\d.]*', without_separators)
    if match is None:
        return None

    try:
        return float(match.group())
    except ValueError:
        # Malformed runs such as "1.2.3" are treated as "no price".
        return None
|
|
|
|
|
|
def parse_percent(pct_str):
    """Parse a percentage string such as ``"40%"`` into a float.

    The first run of digits and dots that contains a digit is converted;
    punctuation-only runs (the "." in "approx. 40%") are skipped.

    Args:
        pct_str: Text containing a percentage; may be empty or None.

    Returns:
        The numeric value as a float (``"33.5%"`` -> ``33.5``), or None when
        the input is empty, contains no number, or the matched text is not a
        valid number.
    """
    if not pct_str:
        return None

    match = re.search(r'\.?\d[\d.]*', pct_str)
    if match is None:
        return None

    try:
        return float(match.group())
    except ValueError:
        # Malformed runs such as "1.2.3" are treated as "no value".
        return None
|
|
|
|
|
|
def setup_collections():
    """Ensure the locations, contacts and pricing_bands collections exist.

    Builds the field definitions for each collection and hands them to
    ``create_collection`` in that order.
    """
    print("\n=== Setting Up Collections ===")

    def uuid_key():
        # Hidden, read-only UUID primary key; a fresh dict per collection.
        return {
            'field': 'id',
            'type': 'uuid',
            'meta': {'hidden': True, 'readonly': True, 'special': ['uuid']},
            'schema': {'is_primary_key': True},
        }

    def required_name():
        # Mandatory display name.
        return {'field': 'name', 'type': 'string', 'meta': {'required': True}}

    def typed(kind, *field_names):
        # Plain fields of one type, in the order given.
        return [{'field': field_name, 'type': kind} for field_name in field_names]

    def flag(field_name, default):
        # Boolean field with a default value.
        return {'field': field_name, 'type': 'boolean', 'schema': {'default_value': default}}

    reach = ('email', 'phone', 'website')
    address = ('street', 'city', 'county', 'postcode', 'country')

    collections = (
        (
            'locations',
            [
                uuid_key(),
                required_name(),
                *typed('string', 'type'),
                flag('is_active', True),
                *typed('string', 'contact_name', *reach, *address),
                *typed('decimal', 'sale_cut_percent'),
                *typed('text', 'notes'),
            ],
            {'icon': 'store', 'note': 'Galleries and venues'},
        ),
        (
            'contacts',
            [
                uuid_key(),
                *typed('string', 'first_name', 'last_name', 'organisation',
                       *reach, *address, 'contact_type'),
                flag('is_press_contact', False),
                flag('has_purchased', False),
                flag('no_mailouts', False),
                *typed('text', 'notes'),
                *typed('string', 'airtable_id'),
            ],
            {'icon': 'contacts', 'note': 'Customers and press contacts'},
        ),
        (
            'pricing_bands',
            [
                uuid_key(),
                required_name(),
                *typed('string', 'category'),
                *typed('decimal', 'price_usd'),
                *typed('integer', 'sort'),
            ],
            {'icon': 'attach_money', 'note': 'Standard pricing tiers'},
        ),
    )

    for collection_name, field_defs, collection_meta in collections:
        create_collection(collection_name, field_defs, collection_meta)
|
|
|
|
|
|
def import_locations():
    """Import the Locations CSV export into the ``locations`` collection.

    Reads ``Locations-Grid view.csv`` from ``DATA_DIR``. Rows without a
    "Location Name" are skipped. The "Type" column is lower-cased and kept
    only when it is one of gallery/auction/studio/private, otherwise the
    location is stored as "other". Empty values are left out of the payload.

    Returns:
        The number of locations created; 0 when the CSV file is missing.
    """
    csv_path = DATA_DIR / 'Locations-Grid view.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0

    print("\n=== Importing Locations ===")
    print(f" Source: {csv_path}")

    def cell(row, column):
        # DictReader fills columns missing from a short row with None, so
        # normalise to '' before stripping.
        return (row.get(column) or '').strip()

    known_types = {'gallery', 'auction', 'studio', 'private'}

    count = 0
    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            name = cell(row, 'Location Name')
            if not name:
                continue

            raw_type = cell(row, 'Type').lower()
            loc_type = raw_type if raw_type in known_types else 'other'

            # Airtable exports ticked checkboxes as the text "checked".
            is_active = cell(row, 'Active Location').lower() == 'checked'

            sale_cut = parse_percent(cell(row, 'Fine Art sale % cut'))

            name_parts = (
                cell(row, 'Contact First Name'),
                cell(row, 'Contact Last Name'),
            )
            contact_name = ' '.join(part for part in name_parts if part) or None

            data = {
                'name': name,
                'type': loc_type,
                'is_active': is_active,
                'contact_name': contact_name,
                'email': cell(row, 'Contact Email') or None,
                'phone': cell(row, 'Contact Phone') or None,
                'website': cell(row, 'Contact Website') or None,
                'street': cell(row, 'Contact Street') or None,
                'city': cell(row, 'Contact Prefix with City') or None,
                'county': cell(row, 'Contact County') or None,
                'postcode': cell(row, 'Contact Postcode') or None,
                'country': cell(row, 'Contact Country') or None,
                'sale_cut_percent': sale_cut,
            }

            # Remove None values so unset fields are simply omitted.
            data = {k: v for k, v in data.items() if v is not None}

            if create_item('locations', data):
                count += 1
                print(f" [{count}] {name} ({loc_type})")

    print(f" Imported {count} locations")
    return count
|
|
|
|
|
|
def import_contacts():
    """Import the Contacts CSV export into the ``contacts`` collection.

    Reads ``Contacts-Grid Main.csv`` from ``DATA_DIR``. A row is imported
    only if it has an identity (first name, last name or organisation) AND
    some value (an email, a purchase, or the press-contact flag); all other
    rows are counted as skipped. The contact type is "press" for press
    contacts, otherwise "customer" for purchasers, otherwise "other".

    Returns:
        The number of contacts created; 0 when the CSV file is missing.
    """
    csv_path = DATA_DIR / 'Contacts-Grid Main.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0

    print("\n=== Importing Contacts ===")
    print(f" Source: {csv_path}")
    print(" Note: Importing only contacts with email OR purchase history")

    def cell(row, column):
        # DictReader fills columns missing from a short row with None, so
        # normalise to '' before stripping.
        return (row.get(column) or '').strip()

    def checked(row, column):
        # Airtable exports ticked checkboxes as the text "checked".
        return cell(row, column).lower() == 'checked'

    count = 0
    skipped = 0

    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            first_name = cell(row, 'First Name')
            last_name = cell(row, 'Last Name')
            organisation = cell(row, 'Organisation')
            email = cell(row, 'Email')
            has_purchased = checked(row, 'Purchased Something')
            is_press = checked(row, 'Press Contact')

            # Skip records without meaningful data.
            # Must have: (name OR organisation) AND (email OR has_purchased OR is_press)
            has_identity = first_name or last_name or organisation
            has_value = email or has_purchased or is_press
            if not (has_identity and has_value):
                skipped += 1
                continue

            if is_press:
                contact_type = 'press'
            elif has_purchased:
                contact_type = 'customer'
            else:
                contact_type = 'other'

            data = {
                'first_name': first_name or None,
                'last_name': last_name or None,
                'organisation': organisation or None,
                'email': email or None,
                'phone': cell(row, 'Mobile') or cell(row, 'Home Phone') or None,
                'website': cell(row, 'Website') or None,
                'street': cell(row, 'Street') or None,
                'city': cell(row, 'City') or None,
                'county': cell(row, 'County (UK)') or cell(row, 'State (USA)') or None,
                'postcode': cell(row, 'Postcode') or None,
                'country': cell(row, 'Country') or None,
                'contact_type': contact_type,
                'is_press_contact': is_press,
                'has_purchased': has_purchased,
                'no_mailouts': checked(row, 'No Mailouts'),
                'notes': cell(row, 'Notes') or None,
                'airtable_id': cell(row, 'ID') or None,
            }

            # Remove None values so unset fields are simply omitted.
            data = {k: v for k, v in data.items() if v is not None}

            if create_item('contacts', data):
                count += 1
                name_display = f"{first_name} {last_name}".strip() or organisation or email
                # Log the first 50 imports, then only every 100th.
                if count <= 50 or count % 100 == 0:
                    print(f" [{count}] {name_display}")

    print(f" Imported {count} contacts (skipped {skipped} empty/minimal records)")
    return count
|
|
|
|
|
|
def import_pricing_bands():
    """Import the pricing-bands CSV export into ``pricing_bands``.

    Reads ``_Fine Art Pricing Bands-Grid view.csv`` from ``DATA_DIR``. Rows
    whose "Band Name" starts with "———" are section markers: they switch
    the category applied to the following rows ("fine_art" or "products")
    and are not imported themselves. Every imported row receives the next
    ``sort`` value, starting at 1.

    Returns:
        The number of pricing bands created; 0 when the CSV file is missing.
    """
    csv_path = DATA_DIR / '_Fine Art Pricing Bands-Grid view.csv'
    if not csv_path.exists():
        print(f" File not found: {csv_path}")
        return 0

    print("\n=== Importing Pricing Bands ===")
    print(f" Source: {csv_path}")

    def cell(row, column):
        # DictReader fills columns missing from a short row with None, so
        # normalise to '' before stripping.
        return (row.get(column) or '').strip()

    count = 0
    current_category = 'fine_art'
    sort_order = 0

    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            name = cell(row, 'Band Name')
            price_str = cell(row, 'Price in US dollars')

            if not name:
                continue

            # Category marker rows only update the running category.
            if name.startswith('———'):
                marker = name.upper()
                if 'FINE ART' in marker:
                    current_category = 'fine_art'
                elif 'PRODUCTS' in marker:
                    current_category = 'products'
                continue

            price = parse_price(price_str)
            sort_order += 1

            data = {
                'name': name,
                'category': current_category,
                'price_usd': price,
                'sort': sort_order,
            }

            # Remove None values so a missing price is simply omitted.
            data = {k: v for k, v in data.items() if v is not None}

            if create_item('pricing_bands', data):
                count += 1
                # Compare against None so a price of 0 is shown as $0.00.
                price_display = f"${price:.2f}" if price is not None else "N/A"
                print(f" [{count}] {name} - {price_display} ({current_category})")

    print(f" Imported {count} pricing bands")
    return count
|
|
|
|
|
|
def main():
    """Run the full import: authenticate, set up collections, import CSVs.

    Exits with status 1 if authentication fails. Pricing bands are imported
    first, then locations, then contacts, and the combined count is printed.
    """
    banner = "=" * 60

    print(banner)
    print(" AIRTABLE TO DIRECTUS - COMPLETE IMPORT")
    print(banner)
    print(f"\nDirectus URL: {DIRECTUS_URL}")
    print(f"Data directory: {DATA_DIR}")

    # Log in up front so bad credentials stop the run before any writes.
    print("\nAuthenticating...")
    try:
        get_token()
        print(" Authenticated successfully!")
    except Exception as exc:
        print(f" Authentication failed: {exc}")
        sys.exit(1)

    setup_collections()

    importers = (import_pricing_bands, import_locations, import_contacts)
    total = 0
    for run_import in importers:
        total += run_import()

    print(f"\n{banner}")
    print(f" IMPORT COMPLETE - {total} total items imported")
    print(banner)
    print(f"\nView at: {DIRECTUS_URL}/admin")


# Only run the import when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|