Files

939 lines
35 KiB
Python

#!/usr/bin/env python3
"""
piwigo_export.py — Export every Piwigo photo with a JSON metadata sidecar.
Directory structure mirrors the album hierarchy; photos that belong to multiple
albums are copied into each album folder. Photos with no album membership go
into _unsorted/.
Optionally embeds metadata directly into the exported image copy via exiftool
(requires: apt install libimage-exiftool-perl or brew install exiftool).
Usage examples
--------------
# Export with JSON sidecars only:
python3 piwigo_export.py export \
--dbhost localhost --dbuser piwigo --dbpassword secret --dbname piwigo \
--src-path /var/www/piwigo --output-dir ./export
# Also embed as XMP tags in the exported copy:
python3 piwigo_export.py export ... --metadata xmp
# Embed all three metadata formats at once:
python3 piwigo_export.py export ... --metadata exif iptc xmp
"""
import argparse
import datetime
import json
import math
import os
import pathlib
import re
import shutil
import subprocess
import sys
from contextlib import closing
import pymysql
# ---------------------------------------------------------------------------
# Database — bulk loaders (one query per table, not one per image)
# ---------------------------------------------------------------------------
def load_categories(connection, prefix):
    """Read the whole categories table and index its rows by ``id``.

    Rows are stored exactly as the cursor yields them and are looked up with
    ``row['id']``, so the connection must produce mapping-style rows.
    """
    query = f'SELECT * FROM `{prefix}categories`'
    by_id = {}
    with closing(connection.cursor()) as cursor:
        cursor.execute(query)
        for record in cursor:
            by_id[record['id']] = record
    return by_id
def load_all_tags_by_image(connection, prefix):
    """Map every tagged image id to the list of its tag names.

    A single JOIN over the image_tag and tags tables is executed; images
    without any tag do not appear in the returned dict.
    """
    query = (
        f'SELECT it.image_id, t.name FROM `{prefix}image_tag` it'
        f' JOIN `{prefix}tags` t ON it.tag_id = t.id'
    )
    names_by_image: dict[int, list[str]] = {}
    with closing(connection.cursor()) as cursor:
        cursor.execute(query)
        for record in cursor:
            image_id = record['image_id']
            if image_id not in names_by_image:
                names_by_image[image_id] = []
            names_by_image[image_id].append(record['name'])
    return names_by_image
def load_all_categories_by_image(connection, prefix):
    """Map every image id to the list of category ids it belongs to.

    The image_category table is read in one query; images that are in no
    category do not appear in the returned dict.
    """
    query = f'SELECT image_id, category_id FROM `{prefix}image_category`'
    cats_by_image: dict[int, list[int]] = {}
    with closing(connection.cursor()) as cursor:
        cursor.execute(query)
        for record in cursor:
            image_id = record['image_id']
            if image_id not in cats_by_image:
                cats_by_image[image_id] = []
            cats_by_image[image_id].append(record['category_id'])
    return cats_by_image
# ---------------------------------------------------------------------------
# Category path helpers
# ---------------------------------------------------------------------------
def category_display_path(cat_id, categories):
    """Build the breadcrumb of an album, e.g. 'Holidays / France / Normandy'.

    Walks from *cat_id* up through each row's ``id_uppercat`` link. The walk
    stops at a missing parent, at an id that is not in *categories*, or when
    an id repeats (which guards against cyclic parent links).
    """
    visited: set[int] = set()
    breadcrumb = []
    current = cat_id
    while True:
        if current is None or current in visited:
            break
        visited.add(current)
        node = categories.get(current)
        if node is None:
            break
        # Ancestors are discovered last, so prepend to end up root-first.
        breadcrumb.insert(0, node['name'])
        current = node.get('id_uppercat')
    return ' / '.join(breadcrumb)
def category_fs_path(cat_id, categories):
    """Return the relative pathlib.Path of an album inside the output tree.

    Each ancestor contributes one directory component, cleaned up with
    ``_safe_dirname``. The parent chain is followed via ``id_uppercat`` and
    is cut short on unknown ids or on a repeated id (cyclic links). When no
    component can be resolved at all the result is ``Path('_root')``.
    """
    visited: set[int] = set()
    segments = []
    current = cat_id
    while current is not None and current not in visited:
        visited.add(current)
        node = categories.get(current)
        if node is None:
            break
        # Prepend so the final list runs from the root album downwards.
        segments.insert(0, _safe_dirname(node['name']))
        current = node.get('id_uppercat')
    if not segments:
        return pathlib.Path('_root')
    return pathlib.Path(*segments)
def _safe_dirname(name: str) -> str:
"""Replace characters that are awkward in directory names."""
for ch in ('/', '\\', '\0', ':'):
name = name.replace(ch, '_')
return name.strip() or '_unnamed'
# ---------------------------------------------------------------------------
# Metadata embedding via exiftool
# ---------------------------------------------------------------------------
# IPTC IIM maximum byte lengths for string fields we write.
# exiftool silently truncates to these limits, so we apply them ourselves
# first — otherwise a re-run would see a spurious collision between the
# full Piwigo value and the already-truncated on-disk value.
_IPTC_MAX_BYTES: dict[str, int] = {
'IPTC:ObjectName': 64,
'IPTC:By-line': 32,
'IPTC:Caption-Abstract': 2000,
'IPTC:Keywords': 64, # per keyword
'IPTC:SupplementalCategories': 32, # per entry
}
def _iptc_truncate(tag: str, value: str) -> str:
"""Normalise *value* for storage in *tag*: strip whitespace (exiftool does
this on write) then truncate to the IPTC byte limit (UTF-8 aware)."""
value = value.strip()
limit = _IPTC_MAX_BYTES.get(tag)
if limit is None:
return value
encoded = value.encode('utf-8')
if len(encoded) <= limit:
return value
# Truncate on a UTF-8 character boundary.
return encoded[:limit].decode('utf-8', errors='ignore')
# Tags whose values are always lists (multi-value fields). _filter_tags
# checks these item by item instead of treating a difference as a collision.
_LIST_TAGS = {
    'IPTC:Keywords',
    'IPTC:SupplementalCategories',
    'XMP-dc:Subject',
    'XMP-dc:Creator',
    'XMP-lr:HierarchicalSubject',
}
# GPS coordinate tags hold floating-point numbers; _values_equal compares
# them numerically with a small tolerance instead of by string equality.
_GPS_TAGS = {'GPS:GPSLatitude', 'GPS:GPSLongitude'}
def check_exiftool():
    """Exit the program with install hints unless exiftool is on PATH."""
    if shutil.which('exiftool') is not None:
        return
    message = (
        'ERROR: exiftool not found on PATH.\n'
        ' Install it with: apt install libimage-exiftool-perl\n'
        ' or: brew install exiftool\n'
        'Then re-run, or omit --metadata.'
    )
    sys.exit(message)
def _exif_datetime(s) -> str:
"""'YYYY-MM-DD[ HH:MM:SS]''YYYY:MM:DD HH:MM:SS' (EXIF format)."""
s = str(s)
date = s[:10].replace('-', ':')
time = s[11:19] if len(s) > 10 else '00:00:00'
return f'{date} {time}'
def _iptc_date(s) -> str:
"""'YYYY-MM-DD[ ...]''YYYYMMDD'."""
return str(s)[:10].replace('-', '')
def _iptc_time(s) -> str:
"""'YYYY-MM-DD HH:MM:SS''HHMMSS+0000'."""
s = str(s)
t = s[11:19] if len(s) > 10 else '00:00:00'
return t.replace(':', '') + '+0000'
def _xmp_datetime(s) -> str:
"""'YYYY-MM-DD[ HH:MM:SS]''YYYY-MM-DDTHH:MM:SS'."""
s = str(s)
t = s[11:19] if len(s) > 10 else '00:00:00'
return f'{s[:10]}T{t}'
def _parse_datetime(s) -> datetime.datetime | None:
"""Parse a DB or EXIF date string into a datetime, or return None."""
s = str(s).strip()
# Try the full string first (handles both datetime and date-only values).
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y:%m:%d %H:%M:%S', '%Y-%m-%d', '%Y:%m:%d'):
try:
return datetime.datetime.strptime(s, fmt)
except ValueError:
continue
# If the string has trailing timezone info or extra fields, try the prefix.
for prefix_len, fmt in ((19, '%Y-%m-%d %H:%M:%S'), (19, '%Y:%m:%d %H:%M:%S'),
(10, '%Y-%m-%d'), (10, '%Y:%m:%d')):
try:
return datetime.datetime.strptime(s[:prefix_len], fmt)
except ValueError:
continue
return None
def _earliest_image_date(image_path: pathlib.Path) -> datetime.datetime | None:
    """Return the earliest datetime found in the image's embedded metadata.

    Every tag returned by ``_read_existing_metadata`` whose name contains
    'date' or 'time' (case-insensitive) is considered, except:

    * tags in the File, ExifTool, Composite or GPS groups, and
    * tags whose name starts with 'GPS'. GPS stamps are UTC and would be
      mixed with local camera times. The name check is needed because
      exiftool's default ``-G`` output uses family-0 groups, where GPS tags
      are reported under 'EXIF:' rather than 'GPS:'.

    Values that do not parse as a date, or whose year is outside 1901-2099,
    are ignored. Returns None if exiftool is not on PATH or no usable date
    tag is found.
    """
    if not shutil.which('exiftool'):
        return None
    existing = _read_existing_metadata(image_path)
    dates = []
    for key, val in existing.items():
        group, sep, tag_name = key.partition(':')
        if not sep:
            # Key without a group prefix (e.g. 'SourceFile').
            group, tag_name = '', key
        if group in ('File', 'ExifTool', 'Composite', 'GPS'):
            continue
        if tag_name.startswith('GPS'):
            continue
        lowered = tag_name.lower()
        if 'date' not in lowered and 'time' not in lowered:
            continue
        dt = _parse_datetime(str(val))
        if dt and 1900 < dt.year < 2100:
            dates.append(dt)
    return min(dates) if dates else None
# Matches 14 consecutive digits (YYYYMMDDHHMMSS) not adjacent to another digit.
_RE_DATETIME_14 = re.compile(r'(?<!\d)(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})(?!\d)')
# Matches 8 consecutive digits (YYYYMMDD) not adjacent to another digit.
_RE_DATE_8 = re.compile(r'(?<!\d)(\d{4})(\d{2})(\d{2})(?!\d)')
def _date_from_filename(name: str) -> tuple[datetime.datetime | None, bool]:
"""Try to parse a date/datetime from a filename.
Recognises patterns such as:
20120415142550-b05adf19.png → 2012-04-15 14:25:50 (has_time=True)
IMG-20120415142550.jpg → 2012-04-15 14:25:50 (has_time=True)
IMG-20120415.jpg → 2012-04-15 00:00:00 (has_time=False)
Returns (datetime, has_time). has_time=False means only a date was found;
the time component is set to midnight but should not be treated as known.
Returns (None, False) if no recognisable pattern is found.
"""
stem = pathlib.Path(name).stem
# Try 14-digit datetime first.
m = _RE_DATETIME_14.search(stem)
if m:
y, mo, d, h, mi, s = (int(x) for x in m.groups())
try:
return datetime.datetime(y, mo, d, h, mi, s), True
except ValueError:
pass # invalid date/time components — fall through
# Fall back to 8-digit date.
m = _RE_DATE_8.search(stem)
if m:
y, mo, d = (int(x) for x in m.groups())
try:
return datetime.datetime(y, mo, d, 0, 0, 0), False
except ValueError:
pass
return None, False
def _build_metadata_tags(metadata: dict, fmt: str) -> dict:
"""
Build a dict of { 'GROUP:TagName': value } for everything we want to write.
List-valued tags (Keywords, Subject, …) use Python lists as the value.
Scalar tags use a single string/number.
"""
tags: dict = {}
title = metadata.get('title') or ''
author = metadata.get('author') or ''
description = metadata.get('description') or ''
kw_list = metadata.get('tags') or []
albums = metadata.get('albums') or []
date_str = metadata.get('date_created')
rating = metadata.get('rating')
lat = metadata.get('latitude')
lon = metadata.get('longitude')
if fmt == 'exif':
if title: tags['EXIF:ImageDescription'] = title
if author: tags['EXIF:Artist'] = author
if description: tags['EXIF:UserComment'] = description
if date_str:
dt = _exif_datetime(date_str)
tags['EXIF:DateTimeOriginal'] = dt
tags['EXIF:CreateDate'] = dt
elif fmt == 'iptc':
if title: tags['IPTC:ObjectName'] = _iptc_truncate('IPTC:ObjectName', title)
if author: tags['IPTC:By-line'] = _iptc_truncate('IPTC:By-line', author)
if description: tags['IPTC:Caption-Abstract'] = _iptc_truncate('IPTC:Caption-Abstract', description)
if date_str:
tags['IPTC:DateCreated'] = _iptc_date(date_str)
tags['IPTC:TimeCreated'] = _iptc_time(date_str)
if kw_list: tags['IPTC:Keywords'] = [_iptc_truncate('IPTC:Keywords', k) for k in kw_list]
if albums: tags['IPTC:SupplementalCategories'] = [_iptc_truncate('IPTC:SupplementalCategories', a) for a in albums]
elif fmt == 'xmp':
if title: tags['XMP-dc:Title'] = title
if author: tags['XMP-dc:Creator'] = [author] # XMP Creator is a list
if description: tags['XMP-dc:Description'] = description
if date_str: tags['XMP-xmp:CreateDate'] = _xmp_datetime(date_str)
if kw_list: tags['XMP-dc:Subject'] = list(kw_list)
if albums: tags['XMP-lr:HierarchicalSubject'] = list(albums)
if rating is not None:
tags['XMP-xmp:Rating'] = int(round(rating))
else:
raise ValueError(f'Unknown metadata format: {fmt!r}')
# GPS is written to the EXIF GPS IFD regardless of which metadata format
# was chosen — it is the most universally readable location.
if lat is not None and lon is not None:
tags['GPS:GPSLatitude'] = abs(lat)
tags['GPS:GPSLatitudeRef'] = 'N' if lat >= 0 else 'S'
tags['GPS:GPSLongitude'] = abs(lon)
tags['GPS:GPSLongitudeRef'] = 'E' if lon >= 0 else 'W'
return tags
def _read_existing_metadata(image_path: pathlib.Path) -> dict:
    """
    Ask exiftool for the metadata stored in *image_path* and return the
    first record of its JSON output as a dict keyed by 'GROUP:Tag'.

    exiftool options used:
        -json   emit JSON
        -G      prefix every key with its group name (e.g. 'EXIF:')
        -n      report numeric values as plain numbers rather than formatted
                strings (e.g. GPS coordinates)

    A non-zero exit status prints a warning to stderr and yields {}.
    Unparsable or empty output also yields {}.
    """
    command = ['exiftool', '-json', '-G', '-n', str(image_path)]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode != 0:
        print(
            f'WARNING: could not read metadata from {image_path}: '
            f'{proc.stderr.strip()}',
            file=sys.stderr,
        )
        return {}
    try:
        records = json.loads(proc.stdout)
        if not records:
            return {}
        return records[0]
    except (json.JSONDecodeError, IndexError):
        return {}
def _is_repeated_char(s: str, min_reps: int = 10) -> bool:
"""Return True if *s* consists of a single character repeated at least
*min_reps* times (e.g. '??????????', '----------', ' ')."""
s = str(s)
return len(s) >= min_reps and len(set(s)) == 1
def _values_equal(tag: str, existing, desired) -> bool:
    """Return True if existing and desired values are effectively the same.

    * GPS coordinate tags are compared numerically with an absolute
      tolerance of 1e-5 degrees (roughly one metre). A relative tolerance
      would be useless near 0 degrees and far too loose at large
      coordinates. Values that are not numeric fall back to the string
      comparison.
    * All other tags are compared as stripped strings.
    * Tags whose name contains 'date' or 'time' get a second chance with the
      separators between digits removed, so '20120415' matches '2012:04:15'
      and '2012-04-15T14:25:50' matches '2012:04:15 14:25:50'. Timezone
      signs are kept, so values with different offsets still differ.
    """
    if tag in _GPS_TAGS:
        try:
            return math.isclose(
                float(existing), float(desired), rel_tol=0.0, abs_tol=1e-5
            )
        except (TypeError, ValueError):
            pass

    left = str(existing).strip()
    right = str(desired).strip()
    if left == right:
        return True

    tag_name = tag.rpartition(':')[2].lower()
    if 'date' in tag_name or 'time' in tag_name:
        # Only separators sitting between two digits are dropped; a leading
        # '+' of a timezone offset is preserved.
        def compact(text: str) -> str:
            return re.sub(r'(?<=\d)[-:T ](?=\d)', '', text)

        return compact(left) == compact(right)
    return False
def _filter_tags(
    desired: dict,
    existing: dict,
    image_path: pathlib.Path,
    never_overwrite: bool = False,
) -> dict:
    """
    Compare desired tags against what is already embedded in the file and
    return only the tags that need to be written.

    The existing value of a tag is looked up under its exact key first and
    then under its family-0 group alias ('XMP-dc:Title' -> 'XMP:Title',
    'GPS:GPSLatitude' -> 'EXIF:GPSLatitude'). exiftool's plain ``-G`` output
    uses family-0 group names; without the alias, existing XMP and GPS
    values would never be found.

    Rules
    -----
    Scalar tags:
      - Not present in file -> include for writing.
      - Present, same value -> skip silently.
      - Present, different  -> overwrite if the existing value is empty or a
                               repeated-character placeholder; otherwise
                               prompt the user (unless *never_overwrite* is
                               True, in which case the existing value is
                               always kept). If stdin is closed the prompt
                               counts as "no".
    List tags (Keywords, Subject, ...):
      - Each item is checked individually.
      - Items already present in the file's list are silently skipped.
      - Items not yet present are queued for writing.
      - No collision error: lists are additive by nature.
    """
    to_write: dict = {}
    for tag, new_value in desired.items():
        existing_value = _lookup_existing_tag(existing, tag)

        if tag in _LIST_TAGS:
            new_items = new_value if isinstance(new_value, list) else [new_value]
            if existing_value is None:
                to_write[tag] = new_items
                continue
            if isinstance(existing_value, list):
                present = [str(v).strip() for v in existing_value]
            else:
                present = [str(existing_value).strip()]
            to_add = [v for v in new_items if str(v).strip() not in present]
            if to_add:
                to_write[tag] = to_add
            continue

        # Scalar tag. The order of the checks below is significant.
        if existing_value is None:
            to_write[tag] = new_value
        elif _values_equal(tag, existing_value, new_value):
            pass  # already there with the same value
        elif never_overwrite:
            pass  # keep existing value, skip silently
        elif str(existing_value).strip() == '' or _is_repeated_char(existing_value):
            # Empty value or placeholder such as '???????????': replace it.
            to_write[tag] = new_value
        elif _confirm_piwigo_value(image_path, tag, existing_value, new_value):
            to_write[tag] = new_value
    return to_write


def _lookup_existing_tag(existing: dict, tag: str):
    """Return the value stored for *tag*, also trying its family-0 alias."""
    if tag in existing:
        return existing[tag]
    group, sep, name = tag.partition(':')
    if not sep:
        return None
    if group == 'GPS':
        alias_group = 'EXIF'
    elif '-' in group:
        alias_group = group.split('-', 1)[0]  # 'XMP-dc' -> 'XMP'
    else:
        return None
    return existing.get(f'{alias_group}:{name}')


def _confirm_piwigo_value(image_path, tag, existing_value, new_value) -> bool:
    """Show a collision and ask whether the Piwigo value should win."""
    print(
        f'\nMetadata collision in {image_path}:\n'
        f' tag : {tag}\n'
        f' existing : {existing_value!r}\n'
        f' Piwigo : {new_value!r}',
        file=sys.stderr,
    )
    while True:
        try:
            choice = input(' Use Piwigo value? [y/N] ').strip().lower()
        except EOFError:
            # No interactive stdin (e.g. cron, pipe): keep the file's value.
            return False
        if choice in ('n', 'no', ''):
            return False
        if choice in ('y', 'yes'):
            return True
        print(' Please enter y or n.')
def _tags_to_exiftool_args(tags: dict) -> list[str]:
"""Convert { 'GROUP:Tag': value } back into exiftool -TAG=VALUE strings."""
args: list[str] = []
for tag, value in tags.items():
if isinstance(value, list):
for item in value:
args.append(f'-{tag}={item}')
else:
args.append(f'-{tag}={value}')
return args
def embed_metadata(
    dest_image: pathlib.Path,
    metadata: dict,
    fmt: str | list[str],
    never_overwrite: bool = False,
):
    """
    Embed Piwigo's metadata into *dest_image* with a single exiftool call.

    *fmt* is one format name or a list of them; the tag dicts of all formats
    are merged first. The merged tags are filtered against what the file
    already contains, so only tags that are new (or approved for overwrite)
    are written. When nothing is left to write, exiftool is not invoked.

    With *never_overwrite* set, a tag that already has a value in the file
    is left alone without prompting.

    An exiftool failure is reported as a warning on stderr; it does not raise.
    """
    wanted: dict = {}
    for format_name in ([fmt] if isinstance(fmt, str) else fmt):
        wanted.update(_build_metadata_tags(metadata, format_name))
    if not wanted:
        return

    current = _read_existing_metadata(dest_image)
    pending = _filter_tags(wanted, current, dest_image, never_overwrite)
    if not pending:
        return  # the file already carries everything we would write

    command = ['exiftool', '-overwrite_original']
    command.extend(_tags_to_exiftool_args(pending))
    command.append(str(dest_image))

    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        return
    print(
        f' WARNING: exiftool failed for {dest_image.name}:\n'
        f' {proc.stderr.strip()}',
        file=sys.stderr,
    )
# ---------------------------------------------------------------------------
# Core export for a single image
# ---------------------------------------------------------------------------
def export_image(
    image_row: dict,
    tags_by_image: dict,
    cats_by_image: dict,
    categories: dict,
    src_path: pathlib.Path,
    output_dir: pathlib.Path,
    metadata_format: list[str] | None,
    overwrite: bool,
    never_overwrite_metadata: bool = False,
) -> int:
    """
    Copy the image (and its JSON sidecar) to every destination album folder.

    The image goes into one folder per album it belongs to, or into
    ``_unsorted`` when it has none. For each destination:

    * If image and sidecar already exist and *overwrite* is False, the
      sidecar's ``original_path`` is checked. A different source raises
      RuntimeError (filename collision); the same source is skipped.
      With *overwrite* True the existing files are simply replaced.
    * Otherwise the file is copied, metadata is optionally embedded
      (*metadata_format*), the mtime is set from the best known date
      (piwigo > embedded > filename) and the sidecar is (re)written.

    Returns the number of image files actually written. A missing source
    file produces a warning and a return value of 0.
    """
    src_file = src_path / image_row['path']
    if not src_file.is_file():
        print(f'WARNING: source file not found: {src_file}', file=sys.stderr)
        return 0

    image_id = image_row['id']
    tags = tags_by_image.get(image_id, [])
    cat_ids = cats_by_image.get(image_id, [])

    # Collect all three date sources before building the metadata dict.
    img_filename = pathlib.Path(image_row['path']).name
    date_embedded_dt = _earliest_image_date(src_file)
    date_filename_dt, filename_has_time = _date_from_filename(img_filename)
    if date_filename_dt is None:
        date_filename_str = None
    elif filename_has_time:
        date_filename_str = date_filename_dt.strftime('%Y-%m-%d %H:%M:%S')
    else:
        date_filename_str = date_filename_dt.strftime('%Y-%m-%d')

    metadata = {
        'title': image_row.get('name'),
        'author': image_row.get('author'),
        'date_created': str(image_row['date_creation']) if image_row.get('date_creation') else None,
        'date_added': str(image_row['date_available']) if image_row.get('date_available') else None,
        'date_embedded': str(date_embedded_dt) if date_embedded_dt else None,
        'date_filename': date_filename_str,
        'description': image_row.get('comment'),
        'tags': tags,
        'albums': [category_display_path(cid, categories) for cid in cat_ids],
        'width': image_row.get('width'),
        'height': image_row.get('height'),
        'filesize': image_row.get('filesize'),
        # 0 is a legitimate coordinate/score, so test for "missing"
        # explicitly instead of relying on truthiness.
        'latitude': _optional_float(image_row.get('latitude')),
        'longitude': _optional_float(image_row.get('longitude')),
        'rating': _optional_float(image_row.get('rating_score')),
        'original_path': image_row['path'],
    }

    # Print all three date sources so the user can see any discrepancy.
    piwigo_str = metadata['date_created'] or ''
    embedded_str = metadata['date_embedded'] or ''
    fn_date_str = metadata['date_filename'] or ''
    print(f' {img_filename} piwigo: {piwigo_str} embedded: {embedded_str} filename: {fn_date_str}')

    # Best date for file mtime: piwigo > embedded > filename.
    mtime_ts = None
    for candidate in (metadata['date_created'], metadata['date_embedded'], metadata['date_filename']):
        if not candidate:
            continue
        parsed = _parse_datetime(candidate)
        if parsed:
            mtime_ts = parsed.timestamp()
            break
    if mtime_ts is None:
        print(f' NOTE: {img_filename}: no date found; file mtime will not be set', file=sys.stderr)

    if cat_ids:
        dest_dirs = [output_dir / category_fs_path(cid, categories) for cid in cat_ids]
    else:
        dest_dirs = [output_dir / '_unsorted']

    filename = img_filename
    stem = pathlib.Path(filename).stem
    written = 0
    for dest_dir in dest_dirs:
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest_image = dest_dir / filename
        dest_sidecar = dest_dir / f'{stem}.json'

        if dest_image.exists() and dest_sidecar.exists() and not overwrite:
            # Collision check: would we overwrite a file from a *different*
            # source? Only relevant without --overwrite, which the error
            # message itself offers as the way to proceed.
            try:
                previous = json.loads(dest_sidecar.read_text(encoding='utf-8'))
            except (ValueError, OSError):
                previous = None  # unreadable/corrupted sidecar: treat as unknown
            if isinstance(previous, dict) and previous.get('original_path') != image_row['path']:
                raise RuntimeError(
                    f"Filename collision at {dest_image}:\n"
                    f" already written from : {previous.get('original_path')}\n"
                    f" now requested from : {image_row['path']}\n"
                    f"Use --overwrite to ignore (the second file will replace the first)."
                )
            print(f' SKIP (both files exist, use --overwrite to replace): {dest_image}')
            continue

        # Copy image file.
        shutil.copy2(str(src_file), str(dest_image))
        written += 1

        if metadata_format:
            # If no Piwigo date, fall back to filename-derived date so that
            # missing EXIF/IPTC/XMP date tags are filled in from the filename.
            meta_for_embed = metadata
            if not meta_for_embed.get('date_created') and meta_for_embed.get('date_filename'):
                meta_for_embed = {**metadata, 'date_created': metadata['date_filename']}
            embed_metadata(dest_image, meta_for_embed, metadata_format, never_overwrite_metadata)

        # Set mtime after any exiftool call so exiftool doesn't reset it.
        if mtime_ts is not None:
            os.utime(str(dest_image), (mtime_ts, mtime_ts))

        # Write/refresh the sidecar so it stays in sync with the DB.
        dest_sidecar.write_text(
            json.dumps(metadata, indent=2, ensure_ascii=False, default=str),
            encoding='utf-8',
        )
    return written


def _optional_float(value):
    """Return float(value), or None when the DB column is NULL/empty."""
    if value is None or value == '':
        return None
    return float(value)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def cmd_export(args):
    """Run the export subcommand (database -> filesystem).

    Loads categories, tag assignments and album memberships in bulk, then
    iterates over all rows of the images table and hands each one to
    ``export_image``. The database connection is closed even if the export
    aborts with an exception (e.g. a filename collision).
    """
    if args.metadata:
        check_exiftool()
    src_path = pathlib.Path(args.src_path)
    output_dir = pathlib.Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f'Connecting to {args.dbuser}@{args.dbhost}/{args.dbname}')
    connection = pymysql.connect(
        host=args.dbhost,
        user=args.dbuser,
        password=args.dbpassword,
        database=args.dbname,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )
    prefix = args.db_prefix
    total_images = 0
    total_written = 0

    # closing() guarantees connection.close() on every exit path.
    with closing(connection):
        print('Loading category tree …')
        categories = load_categories(connection, prefix)
        print(f' {len(categories)} categories.')

        print('Loading tag assignments …')
        tags_by_image = load_all_tags_by_image(connection, prefix)
        print(f' tags for {len(tags_by_image)} images.')

        print('Loading album memberships …')
        cats_by_image = load_all_categories_by_image(connection, prefix)
        print(f' memberships for {len(cats_by_image)} images.')

        print('Exporting images …')
        with closing(connection.cursor()) as cur:
            cur.execute(
                f'SELECT id, file, path, name, comment, author,'
                f' date_creation, date_available,'
                f' width, height, filesize,'
                f' latitude, longitude, rating_score'
                f' FROM `{prefix}images`'
            )
            for image_row in cur:
                total_images += 1
                total_written += export_image(
                    image_row, tags_by_image, cats_by_image, categories,
                    src_path, output_dir, args.metadata, args.overwrite,
                    args.no_overwrite_metadata,
                )
                if total_images % 100 == 0:
                    print(f'{total_images} processed, {total_written} written so far')

    print(
        f'\nDone. {total_images} images processed, '
        f'{total_written} image files written to {output_dir}/'
    )
def cmd_set_dates(args):
    """Walk an export directory and set each image's mtime from its sidecar dates.

    For every ``*.json`` sidecar below ``args.output_dir`` the image with the
    same stem in the same directory is located, and one of the three dates
    recorded in the sidecar (piwigo / embedded / filename) is applied as the
    file's mtime:

    * with ``args.use`` set, that source is used (images lacking it are
      reported and skipped);
    * otherwise a date is applied silently when all available sources agree,
      and the user is asked when they differ.

    When the chosen date came from the filename and exiftool is available,
    the date is also embedded into date tags that are still missing.

    JSON files that are unreadable or do not contain an object are reported
    and skipped rather than aborting the run.
    """
    output_dir = pathlib.Path(args.output_dir)
    if not output_dir.is_dir():
        sys.exit(f'ERROR: output directory not found: {output_dir}')
    sidecars = sorted(output_dir.rglob('*.json'))
    if not sidecars:
        print('No JSON sidecars found.')
        return

    applied = skipped = noted = 0
    # One directory listing per folder instead of one per sidecar.
    images_by_dir: dict = {}

    for sidecar in sidecars:
        # Load the sidecar. ValueError covers both malformed JSON and
        # undecodable bytes.
        try:
            data = json.loads(sidecar.read_text(encoding='utf-8'))
        except (ValueError, OSError) as exc:
            print(f'WARNING: could not read {sidecar}: {exc}', file=sys.stderr)
            continue
        if not isinstance(data, dict):
            print(f'WARNING: {sidecar} is not a metadata sidecar; skipping.', file=sys.stderr)
            continue

        # Find the corresponding image file (same stem, any non-.json extension).
        folder = sidecar.parent
        if folder not in images_by_dir:
            images_by_dir[folder] = _index_images_by_stem(folder)
        image_path = images_by_dir[folder].get(sidecar.stem)
        if image_path is None:
            print(f'WARNING: no image file found for {sidecar.name}', file=sys.stderr)
            continue

        # Parse all three candidate dates from the sidecar.
        fn_date_str = data.get('date_filename')
        dates = {
            'piwigo': _parse_datetime(data['date_created']) if data.get('date_created') else None,
            'embedded': _parse_datetime(data['date_embedded']) if data.get('date_embedded') else None,
            'filename': _parse_datetime(fn_date_str) if fn_date_str else None,
        }
        # has_time is False when the sidecar stored only a date (no space -> no time part).
        filename_has_time = bool(fn_date_str and ' ' in str(fn_date_str))

        if args.use:
            chosen_source = args.use
            chosen = dates[chosen_source]
            if chosen is None:
                print(f' NOTE: {image_path.name}: no {chosen_source} date in sidecar; skipping.')
                noted += 1
                continue
        else:
            # Interactive mode: collect available (source, datetime) pairs.
            options = [(src, dt) for src, dt in dates.items() if dt is not None]
            if not options:
                print(f' NOTE: {image_path.name}: no dates available; skipping.')
                noted += 1
                continue
            if len({dt for _, dt in options}) == 1:
                # All available sources agree (or there is only one).
                chosen_source, chosen = options[0]
            else:
                picked = _ask_for_date(image_path.name, options)
                if picked is None:
                    skipped += 1
                    continue
                chosen_source, chosen = picked

        # Set file mtime.
        ts = chosen.timestamp()
        os.utime(image_path, (ts, ts))
        applied += 1

        # For filename-derived dates, also embed the date into any missing
        # EXIF/IPTC/XMP tags so the image carries its own date going forward.
        if chosen_source == 'filename' and shutil.which('exiftool'):
            if filename_has_time:
                date_str = chosen.strftime('%Y-%m-%d %H:%M:%S')
            else:
                date_str = chosen.strftime('%Y-%m-%d')
            embed_metadata(
                image_path,
                {'date_created': date_str},
                ['exif', 'iptc', 'xmp'],
                never_overwrite=True,
            )
            # exiftool rewrites the file and resets its mtime, so restore it.
            os.utime(image_path, (ts, ts))

    print(
        f'\nDone. {applied} mtime(s) set, {skipped} skipped, {noted} with no date.'
    )


def _index_images_by_stem(directory):
    """Map stem -> path for the regular non-JSON files in *directory*.

    Entries are visited in sorted order and the first file per stem wins, so
    the choice is deterministic when several files share a stem.
    """
    index: dict = {}
    for entry in sorted(directory.iterdir()):
        if entry.suffix.lower() == '.json' or not entry.is_file():
            continue
        index.setdefault(entry.stem, entry)
    return index


def _ask_for_date(image_name, options):
    """Let the user pick one of *options*; return (source, datetime) or None.

    None means "skip": the user chose to skip or stdin is closed.
    """
    print(f'\n{image_name}')
    for i, (src, dt) in enumerate(options, 1):
        print(f' [{i}] {src:<8} : {dt}')
    print(' [s] skip')
    while True:
        try:
            choice = input(f'Choice [1-{len(options)}/s]: ').strip().lower()
        except EOFError:
            return None  # no interactive stdin: leave the file untouched
        if choice in ('s', 'skip', ''):
            return None
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if 0 <= idx < len(options):
            return options[idx]
        print(f' Please enter a number between 1 and {len(options)}, or s.')
def main():
    """Parse the command line and run the selected subcommand."""
    args = _build_parser().parse_args()
    handler = cmd_export if args.command == 'export' else cmd_set_dates
    handler(args)


def _build_parser():
    """Assemble the top-level parser with its 'export' and 'set-dates' subcommands."""
    parser = argparse.ArgumentParser(
        description='Piwigo photo export and date-management utilities.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    subcommands = parser.add_subparsers(dest='command', required=True)
    _add_export_parser(subcommands)
    _add_set_dates_parser(subcommands)
    return parser


def _add_export_parser(subcommands):
    """Register the 'export' subcommand and its database/path/behaviour options."""
    export_parser = subcommands.add_parser(
        'export',
        help='Export photos from Piwigo to a directory tree with JSON sidecars.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    database = export_parser.add_argument_group('database')
    database.add_argument('--dbhost', metavar='HOST')
    database.add_argument('--dbuser', required=True, metavar='USER')
    database.add_argument('--dbpassword', metavar='PASS')
    database.add_argument('--dbname', required=True, metavar='NAME')
    database.add_argument(
        '--db-prefix',
        default='piwigo_',
        metavar='PREFIX',
        help='Piwigo table prefix (default: %(default)s)',
    )

    paths = export_parser.add_argument_group('paths')
    paths.add_argument(
        '--src-path',
        required=True,
        metavar='DIR',
        help='Root of the Piwigo installation; piwigo_images.path is relative to this.',
    )
    paths.add_argument(
        '--output-dir',
        required=True,
        metavar='DIR',
        help='Directory to write exported files into (created if absent).',
    )

    behaviour = export_parser.add_argument_group('behaviour')
    behaviour.add_argument(
        '--metadata',
        choices=['exif', 'iptc', 'xmp'],
        nargs='+',
        metavar='FORMAT',
        help=(
            'Also embed metadata into the exported image copy using exiftool. '
            'One or more of: exif, iptc, xmp. '
            'Example: --metadata exif iptc xmp'
        ),
    )
    behaviour.add_argument(
        '--overwrite',
        action='store_true',
        help=(
            'Re-export image files that already exist in the output directory. '
            'JSON sidecars are always refreshed.'
        ),
    )
    behaviour.add_argument(
        '--no-overwrite-metadata',
        action='store_true',
        help=(
            'When embedding metadata, never overwrite a tag that already has a '
            'value in the file — skip it silently instead of prompting.'
        ),
    )


def _add_set_dates_parser(subcommands):
    """Register the 'set-dates' subcommand."""
    dates_parser = subcommands.add_parser(
        'set-dates',
        help="Set each exported image's mtime from dates recorded in its JSON sidecar.",
    )
    dates_parser.add_argument(
        '--output-dir',
        required=True,
        metavar='DIR',
        help='Directory containing the exported files and JSON sidecars.',
    )
    dates_parser.add_argument(
        '--use',
        choices=['piwigo', 'embedded', 'filename'],
        metavar='SOURCE',
        help=(
            'Auto-select a date source (piwigo, embedded, or filename) instead of '
            'prompting for each image.'
        ),
    )


if __name__ == '__main__':
    main()