Paperless-ngx, CLIs, APIs und ich

Kurt Klinner

Nachdem ich im letzten Jahr ja etwas zum Thema paperless-ngx und Authelia dokumentiert hatte, ist es an der Zeit ein bisschen genauer zu beleuchten, wie ich das System nutze.

Dokumente, die ich bereits in digitaler Form erhalte (ja ab und an passiert es wirklich, dass man ein PDF etc. bekommt) werden direkt via dem WebUI im System abgelegt. Viel zeitraubender ist aber alles zu digitalisieren, was auf dem postalischen Wege ankommt. Einscannen, hochladen, ...

stack of books on table
Photo by Wesley Tingey / Unsplash

Paperless-ngx verfügt über eine brauchbare API auch wenn die Implementierung der ein oder anderen Methode, nennen wir es mal interessant ist.

Ein paar kleinere Tests mittels Python zeigten, dass alles Basis-Features ala Dokumenten-Upload, Vergabe von Tags etc. vorhanden sind. Von daher lag die Herausforderung eher darin das Scannen von Dokumenten programmatisch zu erledigen.

Bei meiner Recherche bin ich auf verschieden Optionen gestossen, die als Alternative zum brscan Tool gelistet sind, allerdings sind nicht alle auf MacOS nutzbar, von daher fiel mein Wahl auf scanline, einem CLI Tool für OSX

GitHub - klep/scanline: Command line scanning utility for OSX
Command line scanning utility for OSX. Contribute to klep/scanline development by creating an account on GitHub.

Scanline bietet alle Optionen, die man erwartet. Relevant für mich war aber lediglich der Batchmodus um doppelseitige Dokumente scannen zu können.

Für meinen kleinen POC entschied ich mich für Python als Programmiersprache.

Vor der eigentlichen Implementierung, musste ich für meinen Account erst ein Token generieren, welches man direkt am Userprofile erledigen kann

Mittels des Tokens - welches ich nicht statisch im Script sonder als Umgebungsvariable abgelegt habe - erfolgt die Authentifizierung an meiner paperless-ngx Instanz, um entsprechend mit der API zu kommunizieren.

Neben einigen Hilfsmethoden, die existierende Tags bzw. Correspondents liefern, erfolgt die eigentliche Arbeite in der scan_document und upload_document Methode (hoffe die Namen sind selbsterklärend 😏).

import subprocess
import os
import requests
from requests_toolbelt import MultipartEncoder
import inquirer

DOCUMENT_PATH = '/Users/<YOURUSERNAME>/Documents/Archive'
PAPERLESS_URL="http://<YOURPAPERLESSINSTANCE>/"
PAPERLESS_API_TOKEN=os.getenv('PAPERLESS_API_TOKEN')
paperless_api_url = f'{PAPERLESS_URL.rstrip("/")}/api/'
paperless_post_url = f'{paperless_api_url.rstrip("/")}/documents/post_document/'
paperless_headers = {'Authorization': f'Token {PAPERLESS_API_TOKEN}'}

def get_csrf_token():
    """Retrieve the CSRF token."""
    try:
        response = requests.get(PAPERLESS_URL, headers=paperless_headers, timeout=10)
        response.raise_for_status()
        paperless_headers['cookie'] = '; '.join([f'{cookie.name}={cookie.value}'
            for cookie in response.cookies])
        return response.cookies.get("csrftoken")
    except requests.RequestException as e:
        print(f"Error retrieving CSRF token: {e}")
        return None

def get_correspondents():
    """Retrieve the configured correspondents as JSON."""
    try:
        response = requests.get(f'{paperless_api_url}correspondents/',
            headers=paperless_headers, timeout=10)
        response.raise_for_status()
        return response.json().get('results', [])
    except requests.RequestException as e:
        print(f"Error retrieving correspondents: {e}")
        return []

def get_tags():
    """Retrieve the configured tags as JSON."""
    try:
        response = requests.get(f'{paperless_api_url}tags/',
            headers=paperless_headers, timeout=10)
        response.raise_for_status()
        return response.json().get('results', [])
    except requests.RequestException as e:
        print(f"Error retrieving tags: {e}")
        return []

def scan_document(documentname, batch):
    """Scan a document using the scanline CLI."""
    try:
        args = ["scanline", "-name", documentname]
        if batch:
            args.insert(1, "-batch")
        subprocess.run(args, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        print(f"Error scanning document: {e}")

def prompt_user_for_details(correspondents_names, correspondents_tags):
    """Prompt user for document details."""
    questions = [
        inquirer.List('correspondent', message="Name of the correspondent?",
            choices=correspondents_names),
        inquirer.List('tag', message="Document tags?", choices=correspondents_tags),
        inquirer.Text(name='documentname', message="Name of the document to be scanned?"),
        inquirer.Confirm(name='batchscan', message="Scan multiple documents?", default=False)
    ]
    return inquirer.prompt(questions)

def upload_document():
    """Upload the document to paperless-ngx."""
    paperless_token = get_csrf_token()
    if not paperless_token:
        return

    correspondents = get_correspondents()
    if not correspondents:
        return
    correspondents_details = {c["name"]: c["id"] for c in correspondents}
    correspondents_names = list(correspondents_details.keys())

    tags = get_tags()
    if not tags:
        return
    tags_details = {t["name"]: t["id"] for t in tags}
    tags_names = list(tags_details.keys())

    answers = prompt_user_for_details(correspondents_names, tags_names)
    if not answers:
        return

    correspondent_id = correspondents_details[answers['correspondent']]
    tag_id = tags_details[answers['tag']]
    scan_document(answers['documentname'], answers['batchscan'])

    document_path = os.path.join(DOCUMENT_PATH, f"{answers['documentname']}.pdf")

    try:
        with open(document_path, 'rb') as document_file:
            multipart = MultipartEncoder(
                fields={
                    'csrfmiddlewaretoken': paperless_token,
                    'document': (answers['documentname'], document_file),
                    'correspondent': str(correspondent_id),
                    'tags': str(tag_id),
                }
            )
            paperless_headers['Content-Type'] = multipart.content_type
            response = requests.post(paperless_post_url, data=multipart,
                headers=paperless_headers, timeout=10)
            if response.status_code == 200:
                print(f"Document '{document_path}' uploaded successfully.")
                os.remove(document_path)
            else:
                print(f"Failed to upload document. Status code: {response.status_code}")
    except FileNotFoundError:
        print(f"Document file '{document_path}' not found.")
    except requests.RequestException as e:
        print(f"Error uploading document: {e}")

if __name__ == '__main__':
    upload_document()

So sieht das Ganze dann in Aktion aus

0:00
/0:49