#!/usr/bin/env python3
"""
piwigo_export.py — Export every Piwigo photo with a JSON metadata sidecar.

Directory structure mirrors the album hierarchy; photos that belong to
multiple albums are copied into each album folder. Photos with no album
membership go into _unsorted/.

Optionally embeds metadata directly into the exported image copy via
exiftool (requires: apt install libimage-exiftool-perl or brew install
exiftool).

Usage examples
--------------
# Export with JSON sidecars only:
python3 piwigo_export.py \
    --dbhost localhost --dbuser piwigo --dbpassword secret --dbname piwigo \
    --src-path /var/www/piwigo --output-dir ./export

# Also embed as XMP tags in the exported copy:
python3 piwigo_export.py ... --metadata xmp

# Embed all three metadata formats at once:
python3 piwigo_export.py ... --metadata exif iptc xmp
"""

import argparse
import datetime
import json
import math
import os
import pathlib
import re
import shutil
import subprocess
import sys
from contextlib import closing

import pymysql


# ---------------------------------------------------------------------------
# Database — bulk loaders (one query per table, not one per image)
# ---------------------------------------------------------------------------

def load_categories(connection, prefix):
    """Return {id: row} for every category.

    *connection* must use a DictCursor (see cmd_export) so rows are dicts.
    """
    with closing(connection.cursor()) as cur:
        cur.execute(f'SELECT * FROM `{prefix}categories`')
        return {row['id']: row for row in cur}


def load_all_tags_by_image(connection, prefix):
    """Return {image_id: [tag_name, ...]} for the whole library."""
    result: dict[int, list[str]] = {}
    with closing(connection.cursor()) as cur:
        cur.execute(
            f'SELECT it.image_id, t.name'
            f' FROM `{prefix}image_tag` it'
            f' JOIN `{prefix}tags` t ON it.tag_id = t.id'
        )
        for row in cur:
            result.setdefault(row['image_id'], []).append(row['name'])
    return result


def load_all_categories_by_image(connection, prefix):
    """Return {image_id: [category_id, ...]} for the whole library."""
    result: dict[int, list[int]] = {}
    with closing(connection.cursor()) as cur:
        cur.execute(
            f'SELECT image_id, category_id FROM `{prefix}image_category`'
        )
        for row in cur:
            result.setdefault(row['image_id'], []).append(row['category_id'])
    return result


# ---------------------------------------------------------------------------
# Category path helpers
# ---------------------------------------------------------------------------

def _category_chain(cat_id, categories):
    """Return category rows from the root album down to *cat_id*.

    Walks `id_uppercat` links upward; the *seen* set guards against cycles
    in corrupted data, and unknown ids simply terminate the walk. Shared by
    category_display_path() and category_fs_path(), which previously each
    duplicated this loop.
    """
    chain = []
    seen: set[int] = set()
    cid = cat_id
    while cid is not None and cid not in seen:
        seen.add(cid)
        cat = categories.get(cid)
        if cat is None:
            break
        chain.append(cat)
        cid = cat.get('id_uppercat')
    chain.reverse()
    return chain


def category_display_path(cat_id, categories):
    """Return a human-readable path like 'Holidays / France / Normandy'."""
    return ' / '.join(cat['name'] for cat in _category_chain(cat_id, categories))


def category_fs_path(cat_id, categories):
    """Return a pathlib.Path for the album's place in the output tree."""
    parts = [_safe_dirname(cat['name'])
             for cat in _category_chain(cat_id, categories)]
    return pathlib.Path(*parts) if parts else pathlib.Path('_root')


def _safe_dirname(name: str) -> str:
    """Replace characters that are awkward in directory names."""
    for ch in ('/', '\\', '\0', ':'):
        name = name.replace(ch, '_')
    return name.strip() or '_unnamed'


# ---------------------------------------------------------------------------
# Metadata embedding via exiftool
# ---------------------------------------------------------------------------

# IPTC IIM maximum byte lengths for string fields we write.
# exiftool silently truncates to these limits, so we apply them ourselves
# first — otherwise a re-run would see a spurious collision between the
# full Piwigo value and the already-truncated on-disk value.
_IPTC_MAX_BYTES: dict[str, int] = { 'IPTC:ObjectName': 64, 'IPTC:By-line': 32, 'IPTC:Caption-Abstract': 2000, 'IPTC:Keywords': 64, # per keyword 'IPTC:SupplementalCategories': 32, # per entry } def _iptc_truncate(tag: str, value: str) -> str: """Normalise *value* for storage in *tag*: strip whitespace (exiftool does this on write) then truncate to the IPTC byte limit (UTF-8 aware).""" value = value.strip() limit = _IPTC_MAX_BYTES.get(tag) if limit is None: return value encoded = value.encode('utf-8') if len(encoded) <= limit: return value # Truncate on a UTF-8 character boundary. return encoded[:limit].decode('utf-8', errors='ignore') # Tags whose values are always lists (multi-value fields). _LIST_TAGS = { 'IPTC:Keywords', 'IPTC:SupplementalCategories', 'XMP-dc:Subject', 'XMP-dc:Creator', 'XMP-lr:HierarchicalSubject', } # GPS tags use floating-point; compare with a tolerance instead of string equality. # (1e-5 degrees ≈ 1 metre on the ground — more than enough.) _GPS_TAGS = {'GPS:GPSLatitude', 'GPS:GPSLongitude'} def check_exiftool(): if shutil.which('exiftool') is None: sys.exit( 'ERROR: exiftool not found on PATH.\n' ' Install it with: apt install libimage-exiftool-perl\n' ' or: brew install exiftool\n' 'Then re-run, or omit --metadata.' 
) def _exif_datetime(s) -> str: """'YYYY-MM-DD[ HH:MM:SS]' → 'YYYY:MM:DD HH:MM:SS' (EXIF format).""" s = str(s) date = s[:10].replace('-', ':') time = s[11:19] if len(s) > 10 else '00:00:00' return f'{date} {time}' def _iptc_date(s) -> str: """'YYYY-MM-DD[ ...]' → 'YYYYMMDD'.""" return str(s)[:10].replace('-', '') def _iptc_time(s) -> str: """'YYYY-MM-DD HH:MM:SS' → 'HHMMSS+0000'.""" s = str(s) t = s[11:19] if len(s) > 10 else '00:00:00' return t.replace(':', '') + '+0000' def _xmp_datetime(s) -> str: """'YYYY-MM-DD[ HH:MM:SS]' → 'YYYY-MM-DDTHH:MM:SS'.""" s = str(s) t = s[11:19] if len(s) > 10 else '00:00:00' return f'{s[:10]}T{t}' def _parse_datetime(s) -> datetime.datetime | None: """Parse a DB or EXIF date string into a datetime, or return None.""" s = str(s).strip() # Try the full string first (handles both datetime and date-only values). for fmt in ('%Y-%m-%d %H:%M:%S', '%Y:%m:%d %H:%M:%S', '%Y-%m-%d', '%Y:%m:%d'): try: return datetime.datetime.strptime(s, fmt) except ValueError: continue # If the string has trailing timezone info or extra fields, try the prefix. for prefix_len, fmt in ((19, '%Y-%m-%d %H:%M:%S'), (19, '%Y:%m:%d %H:%M:%S'), (10, '%Y-%m-%d'), (10, '%Y:%m:%d')): try: return datetime.datetime.strptime(s[:prefix_len], fmt) except ValueError: continue return None def _earliest_image_date(image_path: pathlib.Path) -> datetime.datetime | None: """Return the earliest datetime found in the image's embedded metadata. Scans every tag returned by exiftool whose name contains 'date' or 'time' (case-insensitive), skipping filesystem/tool pseudo-groups (File:, ExifTool:, Composite:) and GPS tags (which are UTC and timezone-ambiguous). This catches EXIF:ModifyDate, PNG:ModifyDate, XMP-xmp:CreateDate, etc. without needing a format-specific allowlist. Returns None if exiftool is not available or no date tags are found. 
""" if not shutil.which('exiftool'): return None existing = _read_existing_metadata(image_path) dates = [] for key, val in existing.items(): group = key.split(':')[0] if ':' in key else '' if group in ('File', 'ExifTool', 'Composite', 'GPS'): continue tag_name = key.split(':', 1)[1] if ':' in key else key if 'date' not in tag_name.lower() and 'time' not in tag_name.lower(): continue dt = _parse_datetime(str(val)) if dt and 1900 < dt.year < 2100: dates.append(dt) return min(dates) if dates else None # Matches 14 consecutive digits (YYYYMMDDHHMMSS) not adjacent to another digit. _RE_DATETIME_14 = re.compile(r'(? tuple[datetime.datetime | None, bool]: """Try to parse a date/datetime from a filename. Recognises patterns such as: 20120415142550-b05adf19.png → 2012-04-15 14:25:50 (has_time=True) IMG-20120415142550.jpg → 2012-04-15 14:25:50 (has_time=True) IMG-20120415.jpg → 2012-04-15 00:00:00 (has_time=False) Returns (datetime, has_time). has_time=False means only a date was found; the time component is set to midnight but should not be treated as known. Returns (None, False) if no recognisable pattern is found. """ stem = pathlib.Path(name).stem # Try 14-digit datetime first. m = _RE_DATETIME_14.search(stem) if m: y, mo, d, h, mi, s = (int(x) for x in m.groups()) try: return datetime.datetime(y, mo, d, h, mi, s), True except ValueError: pass # invalid date/time components — fall through # Fall back to 8-digit date. m = _RE_DATE_8.search(stem) if m: y, mo, d = (int(x) for x in m.groups()) try: return datetime.datetime(y, mo, d, 0, 0, 0), False except ValueError: pass return None, False def _build_metadata_tags(metadata: dict, fmt: str) -> dict: """ Build a dict of { 'GROUP:TagName': value } for everything we want to write. List-valued tags (Keywords, Subject, …) use Python lists as the value. Scalar tags use a single string/number. 
""" tags: dict = {} title = metadata.get('title') or '' author = metadata.get('author') or '' description = metadata.get('description') or '' kw_list = metadata.get('tags') or [] albums = metadata.get('albums') or [] date_str = metadata.get('date_created') rating = metadata.get('rating') lat = metadata.get('latitude') lon = metadata.get('longitude') if fmt == 'exif': if title: tags['EXIF:ImageDescription'] = title if author: tags['EXIF:Artist'] = author if description: tags['EXIF:UserComment'] = description if date_str: dt = _exif_datetime(date_str) tags['EXIF:DateTimeOriginal'] = dt tags['EXIF:CreateDate'] = dt elif fmt == 'iptc': if title: tags['IPTC:ObjectName'] = _iptc_truncate('IPTC:ObjectName', title) if author: tags['IPTC:By-line'] = _iptc_truncate('IPTC:By-line', author) if description: tags['IPTC:Caption-Abstract'] = _iptc_truncate('IPTC:Caption-Abstract', description) if date_str: tags['IPTC:DateCreated'] = _iptc_date(date_str) tags['IPTC:TimeCreated'] = _iptc_time(date_str) if kw_list: tags['IPTC:Keywords'] = [_iptc_truncate('IPTC:Keywords', k) for k in kw_list] if albums: tags['IPTC:SupplementalCategories'] = [_iptc_truncate('IPTC:SupplementalCategories', a) for a in albums] elif fmt == 'xmp': if title: tags['XMP-dc:Title'] = title if author: tags['XMP-dc:Creator'] = [author] # XMP Creator is a list if description: tags['XMP-dc:Description'] = description if date_str: tags['XMP-xmp:CreateDate'] = _xmp_datetime(date_str) if kw_list: tags['XMP-dc:Subject'] = list(kw_list) if albums: tags['XMP-lr:HierarchicalSubject'] = list(albums) if rating is not None: tags['XMP-xmp:Rating'] = int(round(rating)) else: raise ValueError(f'Unknown metadata format: {fmt!r}') # GPS is written to the EXIF GPS IFD regardless of which metadata format # was chosen — it is the most universally readable location. 
if lat is not None and lon is not None: tags['GPS:GPSLatitude'] = abs(lat) tags['GPS:GPSLatitudeRef'] = 'N' if lat >= 0 else 'S' tags['GPS:GPSLongitude'] = abs(lon) tags['GPS:GPSLongitudeRef'] = 'E' if lon >= 0 else 'W' return tags def _read_existing_metadata(image_path: pathlib.Path) -> dict: """ Return all metadata currently in *image_path* as { 'GROUP:Tag': value }. Flags used: -G prefix every key with its group name (e.g. 'EXIF:', 'GPS:') -n return numeric values as numbers (avoids degree-string formatting for GPS, avoids localised number formats, etc.) -j JSON output """ result = subprocess.run( ['exiftool', '-json', '-G', '-n', str(image_path)], capture_output=True, text=True, ) if result.returncode != 0: print( f'WARNING: could not read metadata from {image_path}: ' f'{result.stderr.strip()}', file=sys.stderr, ) return {} try: data = json.loads(result.stdout) return data[0] if data else {} except (json.JSONDecodeError, IndexError): return {} def _is_repeated_char(s: str, min_reps: int = 10) -> bool: """Return True if *s* consists of a single character repeated at least *min_reps* times (e.g. '??????????', '----------', ' ').""" s = str(s) return len(s) >= min_reps and len(set(s)) == 1 def _values_equal(tag: str, existing, desired) -> bool: """Return True if existing and desired values are effectively the same.""" if tag in _GPS_TAGS: try: return math.isclose(float(existing), float(desired), rel_tol=1e-5) except (TypeError, ValueError): pass return str(existing).strip() == str(desired).strip() def _filter_tags( desired: dict, existing: dict, image_path: pathlib.Path, never_overwrite: bool = False, ) -> dict: """ Compare desired tags against what is already embedded in the file and return only the tags that need to be written. Rules ----- Scalar tags: • Not present in file → include for writing. • Present, same value → skip silently. 
• Present, different → overwrite if the existing value is empty or a repeated-character placeholder; otherwise prompt the user (unless *never_overwrite* is True, in which case the existing value is always kept). List tags (Keywords, Subject, …): • Each item is checked individually. • Items already present in the file's list are silently skipped. • Items not yet present are queued for writing. • No collision error — lists are additive by nature. """ to_write: dict = {} for tag, new_value in desired.items(): existing_value = existing.get(tag) if tag in _LIST_TAGS: new_items = new_value if isinstance(new_value, list) else [new_value] if existing_value is None: to_write[tag] = new_items else: ex_list = ( [str(v).strip() for v in existing_value] if isinstance(existing_value, list) else [str(existing_value).strip()] ) to_add = [v for v in new_items if str(v).strip() not in ex_list] if to_add: to_write[tag] = to_add else: # scalar tag if existing_value is None: to_write[tag] = new_value elif _values_equal(tag, existing_value, new_value): pass # already there with the same value — nothing to do elif never_overwrite: pass # keep existing value, skip silently elif str(existing_value).strip() == '': # Existing value is empty — silently replace with Piwigo value. to_write[tag] = new_value elif _is_repeated_char(existing_value): # Existing value is a placeholder (e.g. '???????????') — # silently replace it with the Piwigo value. to_write[tag] = new_value else: print( f'\nMetadata collision in {image_path}:\n' f' tag : {tag}\n' f' existing : {existing_value!r}\n' f' Piwigo : {new_value!r}', file=sys.stderr, ) while True: choice = input( ' Use Piwigo value? 
[y/N] ' ).strip().lower() if choice in ('n', 'no', ''): break # leave this tag out of to_write if choice in ('y', 'yes'): to_write[tag] = new_value break print(' Please enter y or n.') return to_write def _tags_to_exiftool_args(tags: dict) -> list[str]: """Convert { 'GROUP:Tag': value } back into exiftool -TAG=VALUE strings.""" args: list[str] = [] for tag, value in tags.items(): if isinstance(value, list): for item in value: args.append(f'-{tag}={item}') else: args.append(f'-{tag}={value}') return args def embed_metadata( dest_image: pathlib.Path, metadata: dict, fmt: str | list[str], never_overwrite: bool = False, ): """ Read the image's existing metadata, check for conflicts with what Piwigo knows, then write only the tags that are new or not yet present. *fmt* may be a single format string or a list of format strings; when multiple formats are given their tag dicts are merged before writing so that only one exiftool invocation is needed. If *never_overwrite* is True, tags that already exist in the file are always kept as-is, with no prompt. 
""" formats = [fmt] if isinstance(fmt, str) else fmt desired: dict = {} for f in formats: desired.update(_build_metadata_tags(metadata, f)) if not desired: return existing = _read_existing_metadata(dest_image) to_write = _filter_tags(desired, existing, dest_image, never_overwrite) if not to_write: return # every tag was already present with the correct value cmd = ( ['exiftool', '-overwrite_original'] + _tags_to_exiftool_args(to_write) + [str(dest_image)] ) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print( f' WARNING: exiftool failed for {dest_image.name}:\n' f' {result.stderr.strip()}', file=sys.stderr, ) # --------------------------------------------------------------------------- # Core export for a single image # --------------------------------------------------------------------------- def export_image( image_row: dict, tags_by_image: dict, cats_by_image: dict, categories: dict, src_path: pathlib.Path, output_dir: pathlib.Path, metadata_format: list[str] | None, overwrite: bool, never_overwrite_metadata: bool = False, ) -> int: """ Copy the image (and its JSON sidecar) to every destination album folder. Returns the number of image files actually written. """ src_file = src_path / image_row['path'] if not src_file.is_file(): print(f'WARNING: source file not found: {src_file}', file=sys.stderr) return 0 image_id = image_row['id'] tags = tags_by_image.get(image_id, []) cat_ids = cats_by_image.get(image_id, []) # Collect all three date sources before building the metadata dict. 
img_filename = pathlib.Path(image_row['path']).name date_embedded_dt = _earliest_image_date(src_file) date_filename_dt, filename_has_time = _date_from_filename(img_filename) if date_filename_dt is not None: date_filename_str = ( date_filename_dt.strftime('%Y-%m-%d %H:%M:%S') if filename_has_time else date_filename_dt.strftime('%Y-%m-%d') ) else: date_filename_str = None metadata = { 'title': image_row.get('name'), 'author': image_row.get('author'), 'date_created': str(image_row['date_creation']) if image_row.get('date_creation') else None, 'date_added': str(image_row['date_available']) if image_row.get('date_available') else None, 'date_embedded': str(date_embedded_dt) if date_embedded_dt else None, 'date_filename': date_filename_str, 'description': image_row.get('comment'), 'tags': tags, 'albums': [category_display_path(cid, categories) for cid in cat_ids], 'width': image_row.get('width'), 'height': image_row.get('height'), 'filesize': image_row.get('filesize'), 'latitude': float(image_row['latitude']) if image_row.get('latitude') else None, 'longitude': float(image_row['longitude']) if image_row.get('longitude') else None, 'rating': float(image_row['rating_score']) if image_row.get('rating_score') else None, 'original_path': image_row['path'], } # Print all three date sources so the user can see any discrepancy. piwigo_str = metadata['date_created'] or '—' embedded_str = metadata['date_embedded'] or '—' fn_date_str = metadata['date_filename'] or '—' print(f' {img_filename} piwigo: {piwigo_str} embedded: {embedded_str} filename: {fn_date_str}') # Best date for file mtime: piwigo > embedded > filename. 
mtime_ts = None for _ds in (metadata.get('date_created'), metadata.get('date_embedded'), metadata.get('date_filename')): if _ds: _dt = _parse_datetime(_ds) if _dt: mtime_ts = _dt.timestamp() break if mtime_ts is None: print(f' NOTE: {img_filename}: no date found; file mtime will not be set', file=sys.stderr) dest_dirs = ( [output_dir / category_fs_path(cid, categories) for cid in cat_ids] if cat_ids else [output_dir / '_unsorted'] ) filename = img_filename stem = pathlib.Path(filename).stem written = 0 for dest_dir in dest_dirs: dest_dir.mkdir(parents=True, exist_ok=True) dest_image = dest_dir / filename dest_sidecar = dest_dir / f'{stem}.json' # Collision check: would we overwrite a file from a *different* source? if dest_image.exists() and dest_sidecar.exists(): try: existing = json.loads(dest_sidecar.read_text(encoding='utf-8')) if existing.get('original_path') != image_row['path']: raise RuntimeError( f"Filename collision at {dest_image}:\n" f" already written from : {existing.get('original_path')}\n" f" now requested from : {image_row['path']}\n" f"Use --overwrite to ignore (the second file will replace the first)." ) except json.JSONDecodeError: pass # corrupted sidecar — let the overwrite logic decide # Skip if both files are already present (and --overwrite not set). if dest_image.exists() and dest_sidecar.exists() and not overwrite: print(f' SKIP (both files exist, use --overwrite to replace): {dest_image}') continue # Copy image file. shutil.copy2(str(src_file), str(dest_image)) written += 1 if metadata_format: # If no Piwigo date, fall back to filename-derived date so that # missing EXIF/IPTC/XMP date tags are filled in from the filename. 
meta_for_embed = metadata if not meta_for_embed.get('date_created') and meta_for_embed.get('date_filename'): meta_for_embed = {**metadata, 'date_created': metadata['date_filename']} embed_metadata(dest_image, meta_for_embed, metadata_format, never_overwrite_metadata) # Set mtime after any exiftool call so exiftool doesn't reset it. if mtime_ts is not None: os.utime(str(dest_image), (mtime_ts, mtime_ts)) # Write/refresh the sidecar so it stays in sync with the DB. dest_sidecar.write_text( json.dumps(metadata, indent=2, ensure_ascii=False, default=str), encoding='utf-8', ) return written # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def cmd_export(args): """Run the export subcommand (database → filesystem).""" if args.metadata: check_exiftool() src_path = pathlib.Path(args.src_path) output_dir = pathlib.Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) print(f'Connecting to {args.dbuser}@{args.dbhost}/{args.dbname} …') connection = pymysql.connect( host=args.dbhost, user=args.dbuser, password=args.dbpassword, database=args.dbname, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, ) prefix = args.db_prefix print('Loading category tree …') categories = load_categories(connection, prefix) print(f' {len(categories)} categories.') print('Loading tag assignments …') tags_by_image = load_all_tags_by_image(connection, prefix) print(f' tags for {len(tags_by_image)} images.') print('Loading album memberships …') cats_by_image = load_all_categories_by_image(connection, prefix) print(f' memberships for {len(cats_by_image)} images.') print('Exporting images …') total_images = 0 total_written = 0 with closing(connection.cursor()) as cur: cur.execute( f'SELECT id, file, path, name, comment, author,' f' date_creation, date_available,' f' width, height, filesize,' f' latitude, longitude, rating_score' f' FROM `{prefix}images`' ) for 
image_row in cur: total_images += 1 total_written += export_image( image_row, tags_by_image, cats_by_image, categories, src_path, output_dir, args.metadata, args.overwrite, args.no_overwrite_metadata, ) if total_images % 100 == 0: print(f' … {total_images} processed, {total_written} written so far') connection.close() print( f'\nDone. {total_images} images processed, ' f'{total_written} image files written to {output_dir}/' ) def cmd_set_dates(args): """Walk an export directory and set each image's mtime from its sidecar dates.""" output_dir = pathlib.Path(args.output_dir) if not output_dir.is_dir(): sys.exit(f'ERROR: output directory not found: {output_dir}') sidecars = sorted(output_dir.rglob('*.json')) if not sidecars: print('No JSON sidecars found.') return applied = skipped = noted = 0 for sidecar in sidecars: # Load the sidecar. try: data = json.loads(sidecar.read_text(encoding='utf-8')) except (json.JSONDecodeError, OSError) as exc: print(f'WARNING: could not read {sidecar}: {exc}', file=sys.stderr) continue # Find the corresponding image file (same stem, any non-.json extension). image_path = None for candidate in sidecar.parent.iterdir(): if candidate.stem == sidecar.stem and candidate.suffix.lower() != '.json': image_path = candidate break if image_path is None: print(f'WARNING: no image file found for {sidecar.name}', file=sys.stderr) continue # Parse all three candidate dates from the sidecar. dt_piwigo = _parse_datetime(data['date_created']) if data.get('date_created') else None dt_embedded = _parse_datetime(data['date_embedded']) if data.get('date_embedded') else None fn_date_str = data.get('date_filename') dt_filename = _parse_datetime(fn_date_str) if fn_date_str else None # has_time is False when the sidecar stored only a date (no space → no time part). 
filename_has_time = bool(fn_date_str and ' ' in fn_date_str) chosen = None chosen_source = None if args.use == 'piwigo': if dt_piwigo is None: print(f' NOTE: {image_path.name}: no piwigo date in sidecar; skipping.') noted += 1 continue chosen, chosen_source = dt_piwigo, 'piwigo' elif args.use == 'embedded': if dt_embedded is None: print(f' NOTE: {image_path.name}: no embedded date in sidecar; skipping.') noted += 1 continue chosen, chosen_source = dt_embedded, 'embedded' elif args.use == 'filename': if dt_filename is None: print(f' NOTE: {image_path.name}: no filename date in sidecar; skipping.') noted += 1 continue chosen, chosen_source = dt_filename, 'filename' else: # Interactive mode: collect available (source, datetime) pairs. options = [] if dt_piwigo: options.append(('piwigo', dt_piwigo)) if dt_embedded: options.append(('embedded', dt_embedded)) if dt_filename: options.append(('filename', dt_filename)) if not options: print(f' NOTE: {image_path.name}: no dates available; skipping.') noted += 1 continue # If all available dates are the same, or only one source, apply silently. unique_dts = list(dict.fromkeys(o[1] for o in options)) if len(unique_dts) == 1: chosen, chosen_source = options[0][1], options[0][0] else: # Multiple different dates — ask the user. print(f'\n{image_path.name}') for i, (src, dt) in enumerate(options, 1): print(f' [{i}] {src:<8} : {dt}') print(f' [s] skip') while True: choice = input(f'Choice [1-{len(options)}/s]: ').strip().lower() if choice in ('s', 'skip', ''): break try: idx = int(choice) - 1 if 0 <= idx < len(options): chosen_source, chosen = options[idx] break except ValueError: pass print(f' Please enter a number between 1 and {len(options)}, or s.') if chosen is None: skipped += 1 continue # Set file mtime. ts = chosen.timestamp() os.utime(image_path, (ts, ts)) applied += 1 # For filename-derived dates, also embed the date into any missing # EXIF/IPTC/XMP tags so the image carries its own date going forward. 
if chosen_source == 'filename' and shutil.which('exiftool'): date_str = ( chosen.strftime('%Y-%m-%d %H:%M:%S') if filename_has_time else chosen.strftime('%Y-%m-%d') ) embed_metadata( image_path, {'date_created': date_str}, ['exif', 'iptc', 'xmp'], never_overwrite=True, ) # exiftool rewrites the file and resets its mtime — restore it. os.utime(image_path, (ts, ts)) print( f'\nDone. {applied} mtime(s) set, {skipped} skipped, {noted} with no date.' ) def main(): parser = argparse.ArgumentParser( description='Piwigo photo export and date-management utilities.', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) sub = parser.add_subparsers(dest='command', required=True) # ------------------------------------------------------------------ export ep = sub.add_parser( 'export', help='Export photos from Piwigo to a directory tree with JSON sidecars.', formatter_class=argparse.RawDescriptionHelpFormatter, ) db = ep.add_argument_group('database') db.add_argument('--dbhost', metavar='HOST') db.add_argument('--dbuser', required=True, metavar='USER') db.add_argument('--dbpassword', metavar='PASS') db.add_argument('--dbname', required=True, metavar='NAME') db.add_argument( '--db-prefix', default='piwigo_', metavar='PREFIX', help='Piwigo table prefix (default: %(default)s)', ) io = ep.add_argument_group('paths') io.add_argument( '--src-path', required=True, metavar='DIR', help='Root of the Piwigo installation; piwigo_images.path is relative to this.', ) io.add_argument( '--output-dir', required=True, metavar='DIR', help='Directory to write exported files into (created if absent).', ) behaviour = ep.add_argument_group('behaviour') behaviour.add_argument( '--metadata', choices=['exif', 'iptc', 'xmp'], nargs='+', metavar='FORMAT', help='Also embed metadata into the exported image copy using exiftool. ' 'One or more of: exif, iptc, xmp. 
' 'Example: --metadata exif iptc xmp', ) behaviour.add_argument( '--overwrite', action='store_true', help='Re-export image files that already exist in the output directory. ' 'JSON sidecars are always refreshed.', ) behaviour.add_argument( '--no-overwrite-metadata', action='store_true', help='When embedding metadata, never overwrite a tag that already has a ' 'value in the file — skip it silently instead of prompting.', ) # --------------------------------------------------------------- set-dates dp = sub.add_parser( 'set-dates', help='Set each exported image\'s mtime from dates recorded in its JSON sidecar.', ) dp.add_argument( '--output-dir', required=True, metavar='DIR', help='Directory containing the exported files and JSON sidecars.', ) dp.add_argument( '--use', choices=['piwigo', 'embedded', 'filename'], metavar='SOURCE', help='Auto-select a date source (piwigo, embedded, or filename) instead of ' 'prompting for each image.', ) args = parser.parse_args() if args.command == 'export': cmd_export(args) else: cmd_set_dates(args) if __name__ == '__main__': main()