Files
utility-scripts/piwigo_export.py
Timothy Allen 6ef74e06b3 Add script to export data from Piwigo...
...and to update each image with Piwigo metadata at the same time
2026-04-03 11:15:33 +02:00

651 lines
23 KiB
Python

#!/usr/bin/env python3
"""
piwigo_export.py — Export every Piwigo photo with a JSON metadata sidecar.
Directory structure mirrors the album hierarchy; photos that belong to multiple
albums are copied into each album folder. Photos with no album membership go
into _unsorted/.
Optionally embeds metadata directly into the exported image copy via exiftool
(requires: apt install libimage-exiftool-perl or brew install exiftool).
Usage examples
--------------
# Export with JSON sidecars only:
python3 piwigo_export.py \
--dbhost localhost --dbuser piwigo --dbpassword secret --dbname piwigo \
--src-path /var/www/piwigo --output-dir ./export
# Also embed as XMP tags in the exported copy:
python3 piwigo_export.py ... --metadata xmp
# Embed all three metadata formats at once:
python3 piwigo_export.py ... --metadata exif iptc xmp
"""
import argparse
import json
import math
import os
import pathlib
import shutil
import subprocess
import sys
from contextlib import closing
import pymysql
# ---------------------------------------------------------------------------
# Database — bulk loaders (one query per table, not one per image)
# ---------------------------------------------------------------------------
def load_categories(connection, prefix):
    """Fetch the whole category table in one query and index it by id.

    Returns {category_id: row_dict}; rows come from a DictCursor.
    """
    query = f'SELECT * FROM `{prefix}categories`'
    by_id = {}
    with closing(connection.cursor()) as cursor:
        cursor.execute(query)
        for record in cursor:
            by_id[record['id']] = record
    return by_id
def load_all_tags_by_image(connection, prefix):
    """Map each image id to its list of tag names.

    A single JOIN over image_tag/tags covers the whole library so the
    per-image export loop never has to query for tags.
    """
    sql = (
        f'SELECT it.image_id, t.name'
        f' FROM `{prefix}image_tag` it'
        f' JOIN `{prefix}tags` t ON it.tag_id = t.id'
    )
    tags_by_image: dict[int, list[str]] = {}
    with closing(connection.cursor()) as cursor:
        cursor.execute(sql)
        for record in cursor:
            bucket = tags_by_image.setdefault(record['image_id'], [])
            bucket.append(record['name'])
    return tags_by_image
def load_all_categories_by_image(connection, prefix):
    """Map each image id to the list of album (category) ids it belongs to.

    One query over the whole image_category link table.
    """
    sql = f'SELECT image_id, category_id FROM `{prefix}image_category`'
    memberships: dict[int, list[int]] = {}
    with closing(connection.cursor()) as cursor:
        cursor.execute(sql)
        for record in cursor:
            bucket = memberships.setdefault(record['image_id'], [])
            bucket.append(record['category_id'])
    return memberships
# ---------------------------------------------------------------------------
# Category path helpers
# ---------------------------------------------------------------------------
def category_display_path(cat_id, categories):
    """Build 'Top / Child / Leaf' by following id_uppercat links upward.

    The *visited* set guards against cycles in a corrupted category table;
    an unknown id yields an empty string.
    """
    names: list[str] = []
    visited: set[int] = set()
    current = cat_id
    while current is not None and current not in visited:
        visited.add(current)
        row = categories.get(current)
        if row is None:
            break
        names.append(row['name'])
        current = row.get('id_uppercat')
    return ' / '.join(reversed(names))
def category_fs_path(cat_id, categories):
    """Translate an album id into its relative directory Path.

    Walks the parent chain like category_display_path() but sanitises each
    segment with _safe_dirname(); an empty/unknown chain maps to '_root'.
    """
    segments: list[str] = []
    visited: set[int] = set()
    current = cat_id
    while current is not None and current not in visited:
        visited.add(current)
        row = categories.get(current)
        if row is None:
            break
        segments.append(_safe_dirname(row['name']))
        current = row.get('id_uppercat')
    if not segments:
        return pathlib.Path('_root')
    return pathlib.Path(*reversed(segments))
def _safe_dirname(name: str) -> str:
"""Replace characters that are awkward in directory names."""
for ch in ('/', '\\', '\0', ':'):
name = name.replace(ch, '_')
return name.strip() or '_unnamed'
# ---------------------------------------------------------------------------
# Metadata embedding via exiftool
# ---------------------------------------------------------------------------
# IPTC IIM maximum byte lengths for string fields we write.
# exiftool silently truncates to these limits, so we apply them ourselves
# first — otherwise a re-run would see a spurious collision between the
# full Piwigo value and the already-truncated on-disk value.
# Limits are in BYTES (UTF-8), not characters — see _iptc_truncate().
_IPTC_MAX_BYTES: dict[str, int] = {
    'IPTC:ObjectName': 64,
    'IPTC:By-line': 32,
    'IPTC:Caption-Abstract': 2000,
    'IPTC:Keywords': 64,  # per keyword
    'IPTC:SupplementalCategories': 32,  # per entry
}
def _iptc_truncate(tag: str, value: str) -> str:
    """Normalise *value* for storage in *tag*.

    Strips surrounding whitespace (mirroring what exiftool does on write),
    then truncates to the IPTC byte limit for *tag*, cutting only on a
    valid UTF-8 character boundary. Tags without a known limit pass
    through untouched (apart from the strip).
    """
    trimmed = value.strip()
    try:
        limit = _IPTC_MAX_BYTES[tag]
    except KeyError:
        return trimmed
    raw = trimmed.encode('utf-8')
    if len(raw) <= limit:
        return trimmed
    # errors='ignore' drops any partial multi-byte sequence at the cut.
    return raw[:limit].decode('utf-8', errors='ignore')
# Tags whose values are always lists (multi-value fields).
# _filter_tags() treats these additively: new items are appended, never
# causing a collision prompt.
_LIST_TAGS = {
    'IPTC:Keywords',
    'IPTC:SupplementalCategories',
    'XMP-dc:Subject',
    'XMP-dc:Creator',
    'XMP-lr:HierarchicalSubject',
}
# GPS tags use floating-point; compare with a tolerance instead of string equality.
# (1e-5 degrees ≈ 1 metre on the ground — more than enough.)
_GPS_TAGS = {'GPS:GPSLatitude', 'GPS:GPSLongitude'}
def check_exiftool():
    """Abort the program with an installation hint unless exiftool is on PATH."""
    if shutil.which('exiftool') is not None:
        return
    sys.exit(
        'ERROR: exiftool not found on PATH.\n'
        ' Install it with: apt install libimage-exiftool-perl\n'
        ' or: brew install exiftool\n'
        'Then re-run, or omit --metadata.'
    )
def _exif_datetime(s) -> str:
"""'YYYY-MM-DD[ HH:MM:SS]''YYYY:MM:DD HH:MM:SS' (EXIF format)."""
s = str(s)
date = s[:10].replace('-', ':')
time = s[11:19] if len(s) > 10 else '00:00:00'
return f'{date} {time}'
def _iptc_date(s) -> str:
"""'YYYY-MM-DD[ ...]''YYYYMMDD'."""
return str(s)[:10].replace('-', '')
def _iptc_time(s) -> str:
"""'YYYY-MM-DD HH:MM:SS''HHMMSS+0000'."""
s = str(s)
t = s[11:19] if len(s) > 10 else '00:00:00'
return t.replace(':', '') + '+0000'
def _xmp_datetime(s) -> str:
"""'YYYY-MM-DD[ HH:MM:SS]''YYYY-MM-DDTHH:MM:SS'."""
s = str(s)
t = s[11:19] if len(s) > 10 else '00:00:00'
return f'{s[:10]}T{t}'
def _build_metadata_tags(metadata: dict, fmt: str) -> dict:
"""
Build a dict of { 'GROUP:TagName': value } for everything we want to write.
List-valued tags (Keywords, Subject, …) use Python lists as the value.
Scalar tags use a single string/number.
"""
tags: dict = {}
title = metadata.get('title') or ''
author = metadata.get('author') or ''
description = metadata.get('description') or ''
kw_list = metadata.get('tags') or []
albums = metadata.get('albums') or []
date_str = metadata.get('date_created')
rating = metadata.get('rating')
lat = metadata.get('latitude')
lon = metadata.get('longitude')
if fmt == 'exif':
if title: tags['EXIF:ImageDescription'] = title
if author: tags['EXIF:Artist'] = author
if description: tags['EXIF:UserComment'] = description
if date_str:
dt = _exif_datetime(date_str)
tags['EXIF:DateTimeOriginal'] = dt
tags['EXIF:CreateDate'] = dt
elif fmt == 'iptc':
if title: tags['IPTC:ObjectName'] = _iptc_truncate('IPTC:ObjectName', title)
if author: tags['IPTC:By-line'] = _iptc_truncate('IPTC:By-line', author)
if description: tags['IPTC:Caption-Abstract'] = _iptc_truncate('IPTC:Caption-Abstract', description)
if date_str:
tags['IPTC:DateCreated'] = _iptc_date(date_str)
tags['IPTC:TimeCreated'] = _iptc_time(date_str)
if kw_list: tags['IPTC:Keywords'] = [_iptc_truncate('IPTC:Keywords', k) for k in kw_list]
if albums: tags['IPTC:SupplementalCategories'] = [_iptc_truncate('IPTC:SupplementalCategories', a) for a in albums]
elif fmt == 'xmp':
if title: tags['XMP-dc:Title'] = title
if author: tags['XMP-dc:Creator'] = [author] # XMP Creator is a list
if description: tags['XMP-dc:Description'] = description
if date_str: tags['XMP-xmp:CreateDate'] = _xmp_datetime(date_str)
if kw_list: tags['XMP-dc:Subject'] = list(kw_list)
if albums: tags['XMP-lr:HierarchicalSubject'] = list(albums)
if rating is not None:
tags['XMP-xmp:Rating'] = int(round(rating))
else:
raise ValueError(f'Unknown metadata format: {fmt!r}')
# GPS is written to the EXIF GPS IFD regardless of which metadata format
# was chosen — it is the most universally readable location.
if lat is not None and lon is not None:
tags['GPS:GPSLatitude'] = abs(lat)
tags['GPS:GPSLatitudeRef'] = 'N' if lat >= 0 else 'S'
tags['GPS:GPSLongitude'] = abs(lon)
tags['GPS:GPSLongitudeRef'] = 'E' if lon >= 0 else 'W'
return tags
def _read_existing_metadata(image_path: pathlib.Path) -> dict:
    """Dump every tag already embedded in *image_path* via exiftool.

    Returns { 'GROUP:Tag': value }, or {} when exiftool fails (a warning is
    printed to stderr) or emits unparseable/empty output.

    exiftool flags:
        -G     prefix keys with their group name ('EXIF:', 'GPS:', …)
        -n     numeric output (raw GPS floats, no localised formatting)
        -json  machine-readable JSON output
    """
    proc = subprocess.run(
        ['exiftool', '-json', '-G', '-n', str(image_path)],
        capture_output=True, text=True,
    )
    if proc.returncode != 0:
        print(
            f'WARNING: could not read metadata from {image_path}: '
            f'{proc.stderr.strip()}',
            file=sys.stderr,
        )
        return {}
    try:
        records = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return {}
    return records[0] if records else {}
def _is_repeated_char(s: str, min_reps: int = 10) -> bool:
"""Return True if *s* consists of a single character repeated at least
*min_reps* times (e.g. '??????????', '----------', ' ')."""
s = str(s)
return len(s) >= min_reps and len(set(s)) == 1
def _values_equal(tag: str, existing, desired) -> bool:
    """Return True if *existing* and *desired* are effectively the same value.

    GPS tags are compared numerically with an ABSOLUTE tolerance of 1e-5
    degrees (≈ 1 metre on the ground, matching the comment on _GPS_TAGS).
    The previous revision passed 1e-5 as *rel_tol*, which scales with the
    coordinate's magnitude — at latitude 50 that silently allowed ~55 m of
    drift instead of ~1 m, and near the equator/prime meridian it became
    needlessly strict.  abs_tol gives the intended fixed tolerance.

    Non-numeric GPS values, and all other tags, fall back to a
    whitespace-insensitive string comparison.
    """
    if tag in _GPS_TAGS:
        try:
            return math.isclose(
                float(existing), float(desired),
                rel_tol=0.0, abs_tol=1e-5,
            )
        except (TypeError, ValueError):
            pass  # not parseable as float — compare as strings below
    return str(existing).strip() == str(desired).strip()
def _filter_tags(
    desired: dict,
    existing: dict,
    image_path: pathlib.Path,
    never_overwrite: bool = False,
) -> dict:
    """
    Compare desired tags against what is already embedded in the file and
    return only the tags that need to be written.

    Parameters
    ----------
    desired : { 'GROUP:Tag': value } we want in the file (from Piwigo).
    existing : { 'GROUP:Tag': value } currently in the file (exiftool dump).
    image_path : used only in the collision message shown to the user.
    never_overwrite : if True, an existing scalar value always wins and no
        interactive prompt is ever shown.

    Rules
    -----
    Scalar tags:
        • Not present in file → include for writing.
        • Present, same value → skip silently.
        • Present, different → overwrite if the existing value is empty or a
                                repeated-character placeholder; otherwise prompt
                                the user (unless *never_overwrite* is True, in
                                which case the existing value is always kept).
    List tags (Keywords, Subject, …):
        • Each item is checked individually.
        • Items already present in the file's list are silently skipped.
        • Items not yet present are queued for writing.
        • No collision error — lists are additive by nature.
    """
    to_write: dict = {}
    for tag, new_value in desired.items():
        existing_value = existing.get(tag)
        if tag in _LIST_TAGS:
            # Defensive: desired list-tag values should already be lists,
            # but accept a bare scalar too.
            new_items = new_value if isinstance(new_value, list) else [new_value]
            if existing_value is None:
                to_write[tag] = new_items
            else:
                # exiftool returns a bare string when the file holds one
                # item, and a list when it holds several — normalise both.
                ex_list = (
                    [str(v).strip() for v in existing_value]
                    if isinstance(existing_value, list)
                    else [str(existing_value).strip()]
                )
                # Queue only the items not already in the file's list.
                to_add = [v for v in new_items if str(v).strip() not in ex_list]
                if to_add:
                    to_write[tag] = to_add
        else:  # scalar tag
            if existing_value is None:
                to_write[tag] = new_value
            elif _values_equal(tag, existing_value, new_value):
                pass  # already there with the same value — nothing to do
            elif never_overwrite:
                pass  # keep existing value, skip silently
            elif str(existing_value).strip() == '':
                # Existing value is empty — silently replace with Piwigo value.
                to_write[tag] = new_value
            elif _is_repeated_char(existing_value):
                # Existing value is a placeholder (e.g. '???????????') —
                # silently replace it with the Piwigo value.
                to_write[tag] = new_value
            else:
                # Genuine conflict — ask the user which value wins.
                print(
                    f'\nMetadata collision in {image_path}:\n'
                    f' tag : {tag}\n'
                    f' existing : {existing_value!r}\n'
                    f' Piwigo : {new_value!r}',
                    file=sys.stderr,
                )
                # Loop until we get an understandable answer; empty input
                # defaults to keeping the existing value (the [y/N] default).
                while True:
                    choice = input(
                        ' Use Piwigo value? [y/N] '
                    ).strip().lower()
                    if choice in ('n', 'no', ''):
                        break  # leave this tag out of to_write
                    if choice in ('y', 'yes'):
                        to_write[tag] = new_value
                        break
                    print(' Please enter y or n.')
    return to_write
def _tags_to_exiftool_args(tags: dict) -> list[str]:
"""Convert { 'GROUP:Tag': value } back into exiftool -TAG=VALUE strings."""
args: list[str] = []
for tag, value in tags.items():
if isinstance(value, list):
for item in value:
args.append(f'-{tag}={item}')
else:
args.append(f'-{tag}={value}')
return args
def embed_metadata(
    dest_image: pathlib.Path,
    metadata: dict,
    fmt: str | list[str],
    never_overwrite: bool = False,
):
    """Write Piwigo metadata into *dest_image*, skipping tags already set.

    *fmt* may be a single format name or a list of them; the tag dicts of
    all requested formats are merged first so a single exiftool invocation
    suffices.  Existing tags are compared via _filter_tags(); with
    *never_overwrite* set, any tag that already holds a value in the file
    is left untouched without prompting.
    """
    format_list = [fmt] if isinstance(fmt, str) else fmt
    wanted: dict = {}
    for format_name in format_list:
        wanted.update(_build_metadata_tags(metadata, format_name))
    if not wanted:
        return
    current = _read_existing_metadata(dest_image)
    pending = _filter_tags(wanted, current, dest_image, never_overwrite)
    if not pending:
        # Every tag is already present with the correct value.
        return
    command = ['exiftool', '-overwrite_original']
    command += _tags_to_exiftool_args(pending)
    command.append(str(dest_image))
    outcome = subprocess.run(command, capture_output=True, text=True)
    if outcome.returncode != 0:
        print(
            f' WARNING: exiftool failed for {dest_image.name}:\n'
            f' {outcome.stderr.strip()}',
            file=sys.stderr,
        )
# ---------------------------------------------------------------------------
# Core export for a single image
# ---------------------------------------------------------------------------
def export_image(
image_row: dict,
tags_by_image: dict,
cats_by_image: dict,
categories: dict,
src_path: pathlib.Path,
output_dir: pathlib.Path,
metadata_format: list[str] | None,
overwrite: bool,
never_overwrite_metadata: bool = False,
) -> int:
"""
Copy the image (and its JSON sidecar) to every destination album folder.
Returns the number of image files actually written.
"""
src_file = src_path / image_row['path']
if not src_file.is_file():
print(f'WARNING: source file not found: {src_file}', file=sys.stderr)
return 0
image_id = image_row['id']
tags = tags_by_image.get(image_id, [])
cat_ids = cats_by_image.get(image_id, [])
metadata = {
'title': image_row.get('name'),
'author': image_row.get('author'),
'date_created': str(image_row['date_creation']) if image_row.get('date_creation') else None,
'date_added': str(image_row['date_available']) if image_row.get('date_available') else None,
'description': image_row.get('comment'),
'tags': tags,
'albums': [category_display_path(cid, categories) for cid in cat_ids],
'width': image_row.get('width'),
'height': image_row.get('height'),
'filesize': image_row.get('filesize'),
'latitude': float(image_row['latitude']) if image_row.get('latitude') else None,
'longitude': float(image_row['longitude']) if image_row.get('longitude') else None,
'rating': float(image_row['rating_score']) if image_row.get('rating_score') else None,
'original_path': image_row['path'],
}
dest_dirs = (
[output_dir / category_fs_path(cid, categories) for cid in cat_ids]
if cat_ids
else [output_dir / '_unsorted']
)
filename = pathlib.Path(image_row['path']).name
stem = pathlib.Path(filename).stem
written = 0
for dest_dir in dest_dirs:
dest_dir.mkdir(parents=True, exist_ok=True)
dest_image = dest_dir / filename
dest_sidecar = dest_dir / f'{stem}.json'
# Collision check: would we overwrite a file from a *different* source?
if dest_image.exists() and dest_sidecar.exists():
try:
existing = json.loads(dest_sidecar.read_text(encoding='utf-8'))
if existing.get('original_path') != image_row['path']:
raise RuntimeError(
f"Filename collision at {dest_image}:\n"
f" already written from : {existing.get('original_path')}\n"
f" now requested from : {image_row['path']}\n"
f"Use --overwrite to ignore (the second file will replace the first)."
)
except json.JSONDecodeError:
pass # corrupted sidecar — let the overwrite logic decide
# Skip if both files are already present (and --overwrite not set).
if dest_image.exists() and dest_sidecar.exists() and not overwrite:
print(f' SKIP (both files exist, use --overwrite to replace): {dest_image}')
continue
# Copy image file
shutil.copy2(str(src_file), str(dest_image))
written += 1
if metadata_format:
embed_metadata(dest_image, metadata, metadata_format, never_overwrite_metadata)
# Write/refresh the sidecar so it stays in sync with the DB.
dest_sidecar.write_text(
json.dumps(metadata, indent=2, ensure_ascii=False, default=str),
encoding='utf-8',
)
return written
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Parse CLI arguments, bulk-load the Piwigo tables, export every image."""
    parser = argparse.ArgumentParser(
        description='Export Piwigo photos with JSON metadata sidecars.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,  # module docstring doubles as the usage examples
    )
    # --- database connection options --------------------------------------
    db = parser.add_argument_group('database')
    db.add_argument('--dbhost', metavar='HOST')
    db.add_argument('--dbuser', required=True, metavar='USER')
    db.add_argument('--dbpassword', metavar='PASS')
    db.add_argument('--dbname', required=True, metavar='NAME')
    db.add_argument(
        '--db-prefix', default='piwigo_', metavar='PREFIX',
        help='Piwigo table prefix (default: %(default)s)',
    )
    # --- filesystem paths -------------------------------------------------
    io = parser.add_argument_group('paths')
    io.add_argument(
        '--src-path', required=True, metavar='DIR',
        help='Root of the Piwigo installation; piwigo_images.path is relative to this.',
    )
    io.add_argument(
        '--output-dir', required=True, metavar='DIR',
        help='Directory to write exported files into (created if absent).',
    )
    # --- behaviour switches -----------------------------------------------
    behaviour = parser.add_argument_group('behaviour')
    behaviour.add_argument(
        '--metadata', choices=['exif', 'iptc', 'xmp'], nargs='+', metavar='FORMAT',
        help='Also embed metadata into the exported image copy using exiftool. '
        'One or more of: exif, iptc, xmp. '
        'Example: --metadata exif iptc xmp',
    )
    behaviour.add_argument(
        '--overwrite', action='store_true',
        help='Re-export image files that already exist in the output directory. '
        'JSON sidecars are always refreshed.',
    )
    behaviour.add_argument(
        '--no-overwrite-metadata', action='store_true',
        help='When embedding metadata, never overwrite a tag that already has a '
        'value in the file — skip it silently instead of prompting.',
    )
    args = parser.parse_args()
    # Fail fast if embedding was requested but exiftool is not installed.
    if args.metadata:
        check_exiftool()
    src_path = pathlib.Path(args.src_path)
    output_dir = pathlib.Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f'Connecting to {args.dbuser}@{args.dbhost}/{args.dbname}')
    connection = pymysql.connect(
        host=args.dbhost,
        user=args.dbuser,
        password=args.dbpassword,
        database=args.dbname,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
    )
    prefix = args.db_prefix
    # Bulk-load the lookup tables once so the per-image loop below never
    # issues additional queries per image.
    print('Loading category tree …')
    categories = load_categories(connection, prefix)
    print(f' {len(categories)} categories.')
    print('Loading tag assignments …')
    tags_by_image = load_all_tags_by_image(connection, prefix)
    print(f' tags for {len(tags_by_image)} images.')
    print('Loading album memberships …')
    cats_by_image = load_all_categories_by_image(connection, prefix)
    print(f' memberships for {len(cats_by_image)} images.')
    print('Exporting images …')
    total_images = 0
    total_written = 0
    with closing(connection.cursor()) as cur:
        cur.execute(
            f'SELECT id, file, path, name, comment, author,'
            f' date_creation, date_available,'
            f' width, height, filesize,'
            f' latitude, longitude, rating_score'
            f' FROM `{prefix}images`'
        )
        # Stream rows from the cursor; export_image() returns how many image
        # files it actually wrote (0 when skipped or missing on disk).
        for image_row in cur:
            total_images += 1
            total_written += export_image(
                image_row, tags_by_image, cats_by_image, categories,
                src_path, output_dir, args.metadata, args.overwrite,
                args.no_overwrite_metadata,
            )
            if total_images % 100 == 0:
                print(f'{total_images} processed, {total_written} written so far')
    connection.close()
    print(
        f'\nDone. {total_images} images processed, '
        f'{total_written} image files written to {output_dir}/'
    )


if __name__ == '__main__':
    main()