#!/usr/bin/env python3
"""
piwigo_export.py — Export every Piwigo photo with a JSON metadata sidecar.

Directory structure mirrors the album hierarchy; photos that belong to multiple
albums are copied into each album folder. Photos with no album membership go
into _unsorted/.

Optionally embeds metadata directly into the exported image copy via exiftool
(requires: apt install libimage-exiftool-perl or brew install exiftool).

Usage examples
--------------
# Export with JSON sidecars only:
python3 piwigo_export.py \
    --dbhost localhost --dbuser piwigo --dbpassword secret --dbname piwigo \
    --src-path /var/www/piwigo --output-dir ./export

# Also embed as XMP tags in the exported copy:
python3 piwigo_export.py ... --metadata xmp

# Embed all three metadata formats at once:
python3 piwigo_export.py ... --metadata exif iptc xmp
"""

import argparse
import json
import math
import os
import pathlib
import shutil
import subprocess
import sys
from contextlib import closing

try:
    import pymysql
except ImportError:
    # Defer the hard failure to connection time: --help and importing the
    # pure helper functions (e.g. for unit tests) work without the driver.
    pymysql = None


# ---------------------------------------------------------------------------
# Database — bulk loaders (one query per table, not one per image)
# ---------------------------------------------------------------------------

def load_categories(connection, prefix):
    """Return {id: row} for every category.

    *connection* is an open pymysql connection configured with DictCursor;
    *prefix* is the Piwigo table prefix (e.g. 'piwigo_').
    """
    with closing(connection.cursor()) as cur:
        cur.execute(f'SELECT * FROM `{prefix}categories`')
        return {row['id']: row for row in cur}


def load_all_tags_by_image(connection, prefix):
    """Return {image_id: [tag_name, ...]} for the whole library."""
    result: dict[int, list[str]] = {}
    with closing(connection.cursor()) as cur:
        cur.execute(
            f'SELECT it.image_id, t.name'
            f' FROM `{prefix}image_tag` it'
            f' JOIN `{prefix}tags` t ON it.tag_id = t.id'
        )
        for row in cur:
            result.setdefault(row['image_id'], []).append(row['name'])
    return result


def load_all_categories_by_image(connection, prefix):
    """Return {image_id: [category_id, ...]} for the whole library."""
    result: dict[int, list[int]] = {}
    with closing(connection.cursor()) as cur:
        cur.execute(
            f'SELECT image_id, category_id FROM `{prefix}image_category`'
        )
        for row in cur:
            result.setdefault(row['image_id'], []).append(row['category_id'])
    return result


# ---------------------------------------------------------------------------
# Category path helpers
# ---------------------------------------------------------------------------

def _category_chain(cat_id, categories):
    """Return the category rows from the root down to *cat_id*.

    Shared ancestor walk for the two path builders below (previously
    duplicated in each). Follows id_uppercat parent links; the *seen* set
    guards against cycles in corrupted data, and an unknown id simply
    terminates the walk.
    """
    chain = []
    seen: set[int] = set()
    cid = cat_id
    while cid is not None and cid not in seen:
        seen.add(cid)
        cat = categories.get(cid)
        if cat is None:
            break
        chain.append(cat)
        cid = cat.get('id_uppercat')
    chain.reverse()  # walked leaf → root; present root → leaf
    return chain


def category_display_path(cat_id, categories):
    """Return a human-readable path like 'Holidays / France / Normandy'."""
    return ' / '.join(cat['name'] for cat in _category_chain(cat_id, categories))


def category_fs_path(cat_id, categories):
    """Return a pathlib.Path for the album's place in the output tree."""
    parts = [_safe_dirname(cat['name'])
             for cat in _category_chain(cat_id, categories)]
    return pathlib.Path(*parts) if parts else pathlib.Path('_root')


def _safe_dirname(name: str) -> str:
    """Replace characters that are awkward in directory names."""
    for ch in ('/', '\\', '\0', ':'):
        name = name.replace(ch, '_')
    return name.strip() or '_unnamed'
# IPTC IIM maximum byte lengths for string fields we write.
# exiftool silently truncates to these limits, so we apply them ourselves
# first — otherwise a re-run would see a spurious collision between the
# full Piwigo value and the already-truncated on-disk value.
_IPTC_MAX_BYTES: dict[str, int] = {
    'IPTC:ObjectName': 64,
    'IPTC:By-line': 32,
    'IPTC:Caption-Abstract': 2000,
    'IPTC:Keywords': 64,  # per keyword
    'IPTC:SupplementalCategories': 32,  # per entry
}


def _iptc_truncate(tag: str, value: str) -> str:
    """Normalise *value* for storage in *tag*: strip whitespace (exiftool does
    this on write) then truncate to the IPTC byte limit (UTF-8 aware)."""
    value = value.strip()
    limit = _IPTC_MAX_BYTES.get(tag)
    if limit is None or len(value.encode('utf-8')) <= limit:
        return value
    # Cut at the byte limit, discarding any partial UTF-8 sequence at the end.
    return value.encode('utf-8')[:limit].decode('utf-8', errors='ignore')


# Tags whose values are always lists (multi-value fields).
_LIST_TAGS = {
    'IPTC:Keywords',
    'IPTC:SupplementalCategories',
    'XMP-dc:Subject',
    'XMP-dc:Creator',
    'XMP-lr:HierarchicalSubject',
}

# GPS tags use floating-point; compare with a tolerance instead of string equality.
# (1e-5 degrees ≈ 1 metre on the ground — more than enough.)
_GPS_TAGS = {'GPS:GPSLatitude', 'GPS:GPSLongitude'}


def check_exiftool():
    """Abort with installation instructions when exiftool is not on PATH."""
    if shutil.which('exiftool') is not None:
        return
    sys.exit(
        'ERROR: exiftool not found on PATH.\n'
        ' Install it with: apt install libimage-exiftool-perl\n'
        ' or: brew install exiftool\n'
        'Then re-run, or omit --metadata.'
    )


def _exif_datetime(s) -> str:
    """'YYYY-MM-DD[ HH:MM:SS]' → 'YYYY:MM:DD HH:MM:SS' (EXIF format)."""
    text = str(s)
    day = text[:10].replace('-', ':')
    clock = text[11:19] if len(text) > 10 else '00:00:00'
    return f'{day} {clock}'


def _iptc_date(s) -> str:
    """'YYYY-MM-DD[ ...]' → 'YYYYMMDD'."""
    return str(s)[:10].replace('-', '')


def _iptc_time(s) -> str:
    """'YYYY-MM-DD HH:MM:SS' → 'HHMMSS+0000'."""
    text = str(s)
    clock = text[11:19] if len(text) > 10 else '00:00:00'
    return clock.replace(':', '') + '+0000'


def _xmp_datetime(s) -> str:
    """'YYYY-MM-DD[ HH:MM:SS]' → 'YYYY-MM-DDTHH:MM:SS'."""
    text = str(s)
    clock = text[11:19] if len(text) > 10 else '00:00:00'
    return f'{text[:10]}T{clock}'


def _build_metadata_tags(metadata: dict, fmt: str) -> dict:
    """
    Build a dict of { 'GROUP:TagName': value } for everything we want to write.
    List-valued tags (Keywords, Subject, …) use Python lists as the value.
    Scalar tags use a single string/number.
    """
    out: dict = {}

    title = metadata.get('title') or ''
    author = metadata.get('author') or ''
    description = metadata.get('description') or ''
    kw_list = metadata.get('tags') or []
    albums = metadata.get('albums') or []
    date_str = metadata.get('date_created')
    rating = metadata.get('rating')
    lat = metadata.get('latitude')
    lon = metadata.get('longitude')

    if fmt == 'exif':
        if title:
            out['EXIF:ImageDescription'] = title
        if author:
            out['EXIF:Artist'] = author
        if description:
            out['EXIF:UserComment'] = description
        if date_str:
            stamp = _exif_datetime(date_str)
            out['EXIF:DateTimeOriginal'] = stamp
            out['EXIF:CreateDate'] = stamp

    elif fmt == 'iptc':
        if title:
            out['IPTC:ObjectName'] = _iptc_truncate('IPTC:ObjectName', title)
        if author:
            out['IPTC:By-line'] = _iptc_truncate('IPTC:By-line', author)
        if description:
            out['IPTC:Caption-Abstract'] = _iptc_truncate('IPTC:Caption-Abstract', description)
        if date_str:
            out['IPTC:DateCreated'] = _iptc_date(date_str)
            out['IPTC:TimeCreated'] = _iptc_time(date_str)
        if kw_list:
            out['IPTC:Keywords'] = [_iptc_truncate('IPTC:Keywords', k) for k in kw_list]
        if albums:
            out['IPTC:SupplementalCategories'] = [
                _iptc_truncate('IPTC:SupplementalCategories', a) for a in albums
            ]

    elif fmt == 'xmp':
        if title:
            out['XMP-dc:Title'] = title
        if author:
            out['XMP-dc:Creator'] = [author]  # XMP Creator is a list
        if description:
            out['XMP-dc:Description'] = description
        if date_str:
            out['XMP-xmp:CreateDate'] = _xmp_datetime(date_str)
        if kw_list:
            out['XMP-dc:Subject'] = list(kw_list)
        if albums:
            out['XMP-lr:HierarchicalSubject'] = list(albums)
        if rating is not None:
            out['XMP-xmp:Rating'] = int(round(rating))

    else:
        raise ValueError(f'Unknown metadata format: {fmt!r}')

    # GPS is written to the EXIF GPS IFD regardless of which metadata format
    # was chosen — it is the most universally readable location.
    if lat is not None and lon is not None:
        out['GPS:GPSLatitude'] = abs(lat)
        out['GPS:GPSLatitudeRef'] = 'N' if lat >= 0 else 'S'
        out['GPS:GPSLongitude'] = abs(lon)
        out['GPS:GPSLongitudeRef'] = 'E' if lon >= 0 else 'W'

    return out
def _read_existing_metadata(image_path: pathlib.Path) -> dict:
    """
    Return all metadata currently in *image_path* as { 'GROUP:Tag': value }.

    Flags used:
      -G     prefix every key with its group name (e.g. 'EXIF:', 'GPS:')
      -n     return numeric values as numbers (avoids degree-string formatting
             for GPS, avoids localised number formats, etc.)
      -json  JSON output
    """
    result = subprocess.run(
        ['exiftool', '-json', '-G', '-n', str(image_path)],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        print(
            f'WARNING: could not read metadata from {image_path}: '
            f'{result.stderr.strip()}',
            file=sys.stderr,
        )
        return {}
    try:
        data = json.loads(result.stdout)
        return data[0] if data else {}
    except (json.JSONDecodeError, IndexError):
        return {}


def _is_repeated_char(s: str, min_reps: int = 10) -> bool:
    """Return True if *s* consists of a single character repeated at least
    *min_reps* times (e.g. '??????????', '----------', '          ')."""
    s = str(s)
    return len(s) >= min_reps and len(set(s)) == 1


def _values_equal(tag: str, existing, desired) -> bool:
    """Return True if existing and desired values are effectively the same.

    GPS coordinates are compared numerically with an *absolute* tolerance of
    1e-5 degrees (≈ 1 metre on the ground). A relative tolerance would scale
    with the coordinate's magnitude — ~100 metres at longitude 100° — which
    is not what the "≈ 1 metre" intent describes.
    Everything else is compared as whitespace-stripped strings.
    """
    if tag in _GPS_TAGS:
        try:
            return math.isclose(float(existing), float(desired), abs_tol=1e-5)
        except (TypeError, ValueError):
            pass  # non-numeric value — fall through to string comparison
    return str(existing).strip() == str(desired).strip()


def _filter_tags(
    desired: dict,
    existing: dict,
    image_path: pathlib.Path,
    never_overwrite: bool = False,
) -> dict:
    """
    Compare desired tags against what is already embedded in the file and
    return only the tags that need to be written.

    Rules
    -----
    Scalar tags:
      • Not present in file  → include for writing.
      • Present, same value  → skip silently.
      • Present, different   → overwrite if the existing value is empty or a
                               repeated-character placeholder; otherwise prompt
                               the user (unless *never_overwrite* is True, in
                               which case the existing value is always kept).

    List tags (Keywords, Subject, …):
      • Each item is checked individually.
      • Items already present in the file's list are silently skipped.
      • Items not yet present are queued for writing.
      • No collision error — lists are additive by nature.

    On a non-interactive run (stdin closed), the collision prompt behaves as
    if the default answer 'n' (keep existing) had been given instead of
    crashing with EOFError.
    """
    to_write: dict = {}

    for tag, new_value in desired.items():
        existing_value = existing.get(tag)

        if tag in _LIST_TAGS:
            new_items = new_value if isinstance(new_value, list) else [new_value]
            if existing_value is None:
                to_write[tag] = new_items
            else:
                # exiftool returns a bare string for a single-item list and a
                # real list for several items — normalise to a list of strings.
                ex_list = (
                    [str(v).strip() for v in existing_value]
                    if isinstance(existing_value, list)
                    else [str(existing_value).strip()]
                )
                to_add = [v for v in new_items if str(v).strip() not in ex_list]
                if to_add:
                    to_write[tag] = to_add

        else:  # scalar tag
            if existing_value is None:
                to_write[tag] = new_value
            elif _values_equal(tag, existing_value, new_value):
                pass  # already there with the same value — nothing to do
            elif never_overwrite:
                pass  # keep existing value, skip silently
            elif str(existing_value).strip() == '':
                # Existing value is empty — silently replace with Piwigo value.
                to_write[tag] = new_value
            elif _is_repeated_char(existing_value):
                # Existing value is a placeholder (e.g. '???????????') —
                # silently replace it with the Piwigo value.
                to_write[tag] = new_value
            else:
                print(
                    f'\nMetadata collision in {image_path}:\n'
                    f'  tag      : {tag}\n'
                    f'  existing : {existing_value!r}\n'
                    f'  Piwigo   : {new_value!r}',
                    file=sys.stderr,
                )
                while True:
                    try:
                        choice = input(' Use Piwigo value? [y/N] ').strip().lower()
                    except EOFError:
                        # stdin closed (cron, CI, piped run): previously this
                        # crashed the whole export; take the default answer.
                        choice = 'n'
                    if choice in ('n', 'no', ''):
                        break  # leave this tag out of to_write
                    if choice in ('y', 'yes'):
                        to_write[tag] = new_value
                        break
                    print(' Please enter y or n.')

    return to_write


def _tags_to_exiftool_args(tags: dict) -> list[str]:
    """Convert { 'GROUP:Tag': value } back into exiftool -TAG=VALUE strings."""
    args: list[str] = []
    for tag, value in tags.items():
        if isinstance(value, list):
            for item in value:
                args.append(f'-{tag}={item}')
        else:
            args.append(f'-{tag}={value}')
    return args


def embed_metadata(
    dest_image: pathlib.Path,
    metadata: dict,
    fmt: str | list[str],
    never_overwrite: bool = False,
):
    """
    Read the image's existing metadata, check for conflicts with what Piwigo
    knows, then write only the tags that are new or not yet present.

    *fmt* may be a single format string or a list of format strings; when
    multiple formats are given their tag dicts are merged before writing so
    that only one exiftool invocation is needed.

    If *never_overwrite* is True, tags that already exist in the file are
    always kept as-is, with no prompt.
    """
    formats = [fmt] if isinstance(fmt, str) else fmt
    desired: dict = {}
    for f in formats:
        desired.update(_build_metadata_tags(metadata, f))
    if not desired:
        return

    existing = _read_existing_metadata(dest_image)
    to_write = _filter_tags(desired, existing, dest_image, never_overwrite)

    if not to_write:
        return  # every tag was already present with the correct value

    cmd = (
        ['exiftool', '-overwrite_original']
        + _tags_to_exiftool_args(to_write)
        + [str(dest_image)]
    )
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(
            f' WARNING: exiftool failed for {dest_image.name}:\n'
            f' {result.stderr.strip()}',
            file=sys.stderr,
        )
+ """ + formats = [fmt] if isinstance(fmt, str) else fmt + desired: dict = {} + for f in formats: + desired.update(_build_metadata_tags(metadata, f)) + if not desired: + return + + existing = _read_existing_metadata(dest_image) + to_write = _filter_tags(desired, existing, dest_image, never_overwrite) + + if not to_write: + return # every tag was already present with the correct value + + cmd = ( + ['exiftool', '-overwrite_original'] + + _tags_to_exiftool_args(to_write) + + [str(dest_image)] + ) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + print( + f' WARNING: exiftool failed for {dest_image.name}:\n' + f' {result.stderr.strip()}', + file=sys.stderr, + ) + + +# --------------------------------------------------------------------------- +# Core export for a single image +# --------------------------------------------------------------------------- + +def export_image( + image_row: dict, + tags_by_image: dict, + cats_by_image: dict, + categories: dict, + src_path: pathlib.Path, + output_dir: pathlib.Path, + metadata_format: list[str] | None, + overwrite: bool, + never_overwrite_metadata: bool = False, +) -> int: + """ + Copy the image (and its JSON sidecar) to every destination album folder. + Returns the number of image files actually written. 
+ """ + src_file = src_path / image_row['path'] + if not src_file.is_file(): + print(f'WARNING: source file not found: {src_file}', file=sys.stderr) + return 0 + + image_id = image_row['id'] + tags = tags_by_image.get(image_id, []) + cat_ids = cats_by_image.get(image_id, []) + + metadata = { + 'title': image_row.get('name'), + 'author': image_row.get('author'), + 'date_created': str(image_row['date_creation']) if image_row.get('date_creation') else None, + 'date_added': str(image_row['date_available']) if image_row.get('date_available') else None, + 'description': image_row.get('comment'), + 'tags': tags, + 'albums': [category_display_path(cid, categories) for cid in cat_ids], + 'width': image_row.get('width'), + 'height': image_row.get('height'), + 'filesize': image_row.get('filesize'), + 'latitude': float(image_row['latitude']) if image_row.get('latitude') else None, + 'longitude': float(image_row['longitude']) if image_row.get('longitude') else None, + 'rating': float(image_row['rating_score']) if image_row.get('rating_score') else None, + 'original_path': image_row['path'], + } + + dest_dirs = ( + [output_dir / category_fs_path(cid, categories) for cid in cat_ids] + if cat_ids + else [output_dir / '_unsorted'] + ) + + filename = pathlib.Path(image_row['path']).name + stem = pathlib.Path(filename).stem + written = 0 + + for dest_dir in dest_dirs: + dest_dir.mkdir(parents=True, exist_ok=True) + dest_image = dest_dir / filename + dest_sidecar = dest_dir / f'{stem}.json' + + # Collision check: would we overwrite a file from a *different* source? 
+ if dest_image.exists() and dest_sidecar.exists(): + try: + existing = json.loads(dest_sidecar.read_text(encoding='utf-8')) + if existing.get('original_path') != image_row['path']: + raise RuntimeError( + f"Filename collision at {dest_image}:\n" + f" already written from : {existing.get('original_path')}\n" + f" now requested from : {image_row['path']}\n" + f"Use --overwrite to ignore (the second file will replace the first)." + ) + except json.JSONDecodeError: + pass # corrupted sidecar — let the overwrite logic decide + + # Skip if both files are already present (and --overwrite not set). + if dest_image.exists() and dest_sidecar.exists() and not overwrite: + print(f' SKIP (both files exist, use --overwrite to replace): {dest_image}') + continue + + # Copy image file + shutil.copy2(str(src_file), str(dest_image)) + written += 1 + if metadata_format: + embed_metadata(dest_image, metadata, metadata_format, never_overwrite_metadata) + + # Write/refresh the sidecar so it stays in sync with the DB. 
def main():
    """Command-line entry point: parse arguments, bulk-load the Piwigo
    tables, then export every image row one by one."""
    ap = argparse.ArgumentParser(
        description='Export Piwigo photos with JSON metadata sidecars.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    db_opts = ap.add_argument_group('database')
    db_opts.add_argument('--dbhost', metavar='HOST')
    db_opts.add_argument('--dbuser', required=True, metavar='USER')
    db_opts.add_argument('--dbpassword', metavar='PASS')
    db_opts.add_argument('--dbname', required=True, metavar='NAME')
    db_opts.add_argument(
        '--db-prefix', default='piwigo_', metavar='PREFIX',
        help='Piwigo table prefix (default: %(default)s)',
    )

    path_opts = ap.add_argument_group('paths')
    path_opts.add_argument(
        '--src-path', required=True, metavar='DIR',
        help='Root of the Piwigo installation; piwigo_images.path is relative to this.',
    )
    path_opts.add_argument(
        '--output-dir', required=True, metavar='DIR',
        help='Directory to write exported files into (created if absent).',
    )

    behave_opts = ap.add_argument_group('behaviour')
    behave_opts.add_argument(
        '--metadata', choices=['exif', 'iptc', 'xmp'], nargs='+', metavar='FORMAT',
        help='Also embed metadata into the exported image copy using exiftool. '
             'One or more of: exif, iptc, xmp. '
             'Example: --metadata exif iptc xmp',
    )
    behave_opts.add_argument(
        '--overwrite', action='store_true',
        help='Re-export image files that already exist in the output directory. '
             'JSON sidecars are always refreshed.',
    )
    behave_opts.add_argument(
        '--no-overwrite-metadata', action='store_true',
        help='When embedding metadata, never overwrite a tag that already has a '
             'value in the file — skip it silently instead of prompting.',
    )

    args = ap.parse_args()

    # Fail early if embedding was requested but exiftool is missing.
    if args.metadata:
        check_exiftool()

    src_path = pathlib.Path(args.src_path)
    output_dir = pathlib.Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f'Connecting to {args.dbuser}@{args.dbhost}/{args.dbname} …')
    conn = pymysql.connect(
        host=args.dbhost,
        user=args.dbuser,
        password=args.dbpassword,
        database=args.dbname,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )

    prefix = args.db_prefix

    # One bulk query per table up front — never one query per image.
    print('Loading category tree …')
    categories = load_categories(conn, prefix)
    print(f' {len(categories)} categories.')

    print('Loading tag assignments …')
    tags_by_image = load_all_tags_by_image(conn, prefix)
    print(f' tags for {len(tags_by_image)} images.')

    print('Loading album memberships …')
    cats_by_image = load_all_categories_by_image(conn, prefix)
    print(f' memberships for {len(cats_by_image)} images.')

    print('Exporting images …')
    n_seen = 0
    n_written = 0

    query = (
        f'SELECT id, file, path, name, comment, author,'
        f' date_creation, date_available,'
        f' width, height, filesize,'
        f' latitude, longitude, rating_score'
        f' FROM `{prefix}images`'
    )
    with closing(conn.cursor()) as cursor:
        cursor.execute(query)
        for row in cursor:
            n_seen += 1
            n_written += export_image(
                row, tags_by_image, cats_by_image, categories,
                src_path, output_dir, args.metadata, args.overwrite,
                args.no_overwrite_metadata,
            )
            if n_seen % 100 == 0:
                print(f' … {n_seen} processed, {n_written} written so far')

    conn.close()
    print(
        f'\nDone. {n_seen} images processed, '
        f'{n_written} image files written to {output_dir}/'
    )


if __name__ == '__main__':
    main()