psycopg2.InternalError nach Umstellung auf multiprocessing

  • Hallo Zusammen, entweder stehe ich gerade völlig auf den Schlauch....

    Bei meinem Smartmeterprojekt möchte ich die Durchlaufzeit verkürzen, um dies zu erreichen wollte ich den Vorgang welcher die Messwerte in die Datenbank schreibt in einen eigenen Prozess auslagern.

    Vor der Auslagerung funktioniert das Schreiben in die Datenbank, nach der Umgestaltung erhalte ich folgenden Fehler:

    Allerdings komme ich auch mithilfe des Links nacht dahinte wo das Problem ist.


    Im Anhang poste ich mal das funktionierende Skript ohne Multiproccessing und einmal das Fehlerhafte.

    Über die Compare Darstellung sollte man so einen schnellen Überblick bekommen was ich geändert habe. Vielleicht sieht von euch ja jemand den Fehler :conf:

    Zusatzinfo: sqlalchemy wird aus nächsten Schritt entfernt und nur noch auf das reine psycopg2 gesetzt...

  • psycopg2.InternalError nach Umstellung auf multiprocessing? Schau mal ob du hier fündig wirst!

  • Hi,

    ich kann Dir zwar bei Python nicht helfen ... aber für mich sieht das so aus, als würde noch ein anderer Prozess, evtl. sogar mit demselben Handle (das kann, muss aber nicht sein) auf die Tabelle zugreifen.

    Dadurch würden "dirty data" oder nur Teilupdates möglich und Du müsstest da erst an anderer Stelle einen commit absetzen oder aber die Update-Geschichte als Transaktion definieren, damit die entsprechende Tabelle(n) korrekt gelockt wird/werden.

    //EDIT: meine Vermutung ist halt, dass die automatische Lock-Strategie mit der Situation nicht klarkommt und so die Notbremse zieht ...

    So long,

    -ds-

  • Hallo,

    du überschreibst im Multiprocessing-Code `data` in der Funktion `datenbank_prozess` ist das Absicht? Wenn `while not q_intervall_daten_senden.empty()` zutrifft _und_ vorher `while not q_data.empty()` zugetroffen hat, sind alle Daten, die du in `data` geschrieben hast, überschrieben.

    Abgesehen davon: wenn man meint, mit einer variablen Anzahl Spalten beim `INSERT` arbeiten zu müssen, ist entweder der Datenbankentwurf oder der Programmentwurf falsch. Mal abgesehen davon, dass das Formatieren ein SQL-Ausdrucks mit `format` anfällig für SQL-Injections ist -> Sicherheitsrisiko. Zum Einfügen von Werten kennt die Python DB 2.0 API ja ein entsprechendes Vorgehen, genau so wie SQLAlchemy. Die Spalten solltest du fix im SQL-Statement haben.

    Wenn du schon SQLAlchemy nutzt, dann vielleicht doch für ein bisschen mehr als für's connection pooling. Man muss ja nicht unbedingt das ORM nutzen, aber zumindest du SQL Expression Language. Wenn SQLAlchemy zu "schwergewichtig" ist -> PeeWee ist eine "leichtere" Alternative.

    Bringt's das Auslagern in den Prozess wirklich? Der Bottleneck ist IMHO in der funktionierenden Variante, dass jedes Mal der SSH-Tunnel aufgebaut wird und _nicht_ das Schreiben in die DB.

    Postgres ist ja auch netzwerkfähig - hat das einen Grund, warum du über den Tunnel gehst?

    Außerdem dürfe auch das Logging mehr Zeit benötigen als das Schreiben in die DB, weil beim Logging File I/O im Spiel ist.

    Und nach was zum Stil: es ist ja ok, deutsche Klassen- und Funktionsnamen zu nutzen - aber dann bitte konsequent. "change", "set", "add", "write" sind kein Deutsch :)

    Gruß, noisefloor

  • aber für mich sieht das so aus, als würde noch ein anderer Prozess, evtl. sogar mit demselben Handle (das kann, muss aber nicht sein) auf die Tabelle zugreifen.

    Also in meinem Skript wird der Handler nur einmal Instanziert...die Tabelle ansich wird von Grafana zur Visualsierung noch verwendet. Aber dann müsste es ja mit dem funktionierenden Skript auch kollisionen geben.


    du überschreibst im Multiprocessing-Code `data` in der Funktion `datenbank_prozess` ist das Absicht? Wenn `while not q_intervall_daten_senden.empty()` zutrifft _und_ vorher `while not q_data.empty()` zugetroffen hat, sind alle Daten, die du in `data` geschrieben hast, überschrieben.

    Ist absicht ja, hatte das ganze per Umweg über data gemacht, weil ich noch print() Ausgaben eingebaut hatte, aber die daten werden ja mit append gesichert bzw. pg_handler.intervall_daten_senden  damit geschrieben.

    Wenn du schon SQLAlchemy nutzt,

    ja wie erwähnt, will eh auf psycopg2 umstellen. Wollte, PeeWee hat Linus auch schon auf dem Schirm... dann ists wohl besser gleich einheitlich zu arbeiten. Weswegen ich auch SQLAlchemy anfangs verwendete, bis ich gemerkt habe, dass ich dessen funktionen eh nicht verwende


    Bringt's das Auslagern in den Prozess wirklich?

    Kann ich leider noch nicht beantworten weil ich noch keine funktionierenden alternative habe. Aber ich verstehe was du meinst, dass ich das aktuelle Skript auch so umgestalten sollte, dass die SSH Verbindung dauernd gehalten wird.

    hat das einen Grund, warum du über den Tunnel gehst?

    Ja, der Server (VServer) gehört einem Freund von mir, und der ist so konfiguriert dass man nur als localhost darauf zugriff hat


    Und nach was zum Stil: es ist ja ok, deutsche Klassen- und Funktionsnamen zu nutzen - aber dann bitte konsequent. "change", "set", "add", "write" sind kein Deutsch :)

    Da beißt sich schon Linus über 1 Jahr die Zähne aus :angel: Das ist halt Denglisch

    Und das wichtigste zum Schluss, deswegen wollt ich schon mal ein Thread dazu eröffnen:

    Abgesehen davon: wenn man meint, mit einer variablen Anzahl Spalten beim `INSERT` arbeiten zu müssen, ist entweder der Datenbankentwurf oder der Programmentwurf falsch. Mal abgesehen davon, dass das Formatieren ein SQL-Ausdrucks mit `format` anfällig für SQL-Injections ist -> Sicherheitsrisiko. Zum Einfügen von Werten kennt die Python DB 2.0 API

    Kurz Weg, das mit der Python DB 2.0 API ist mir unbekannt


    Wegen den Injections, wie hoch ist das Sicherheitsrisiko wenn keine Daten von "außen" oder von Clients kommen, sondern wenn man das ganze nur für sich gemacht hat?

    Bzgl den Datenbankentwurf oder der Programmentwurf falsch das habe ich schon öfters gelesen und wollte eben wie erwähnt schonmal einen Thread dazu eröffnen. Für mich ist s eben eher unlogisch warum ich nicht die Spalten variable setzen kann ?!

    DB sieht so aus:

    Eine 3 Spaltige Tabelle mit timestamp, Wertname, Wert find ich wenig praktikabel, vorallem muss ich es ja in Grafana auch noch weiterverarbeiten.

    Programmentwurf, nunja ich will halt nicht immer jeden Wert bei jeder Messung auslesen sondern zu unterschiedlichen Intervallen, deshalb zu sagen das der Programmentwurf falsch ist ...?

    Wie würde man es besser gestalten?

    Das war jetzt viel zu sekundären "Problemen"...und bin auch froh darüber, dass diese Angesprochen werden, wie soll denn auch einer wie ich als Hobby "Programmierer" was dazulernen, aber das Primäre Problem, warum funktioniert das Daten schreiben beim Ursprünglichen Skript, aber nicht mehr mit multiprocessing?

  • Also in meinem Skript wird der Handler nur einmal Instanziert...die Tabelle ansich wird von Grafana zur Visualsierung noch verwendet. Aber dann müsste es ja mit dem funktionierenden Skript auch kollisionen geben

    nicht unbedingt, weil Du da evtl. keine konkurrierenden Zugriffe hast ...

    Ich kenne die Python-Lib nicht und habe SQL fast ausschliesslich mit Oracle und C programmiert. Ich weiss aber noch, dass wir unter bestimmten Umständen durchaus auch Lese-Sperren eingebaut haben (um eben dirty data resp. dirty reads durch updates/deletes während des Lesevorgangs zu verhindern) und Updates/Inserts/deletes ausschliesslich in Transaktionen gepackt hatten.

    Wie gesagt ... nur eine Vermutung ...

    ciao,

    -ds-

  • Hallo,

    Zitat

    Wegen den Injections, wie hoch ist das Sicherheitsrisiko wenn keine Daten von "außen" oder von Clients kommen, sondern wenn man das ganze nur für sich gemacht hat?

    Dann hast du gar kein Risiko - es sei denn, du bist masochistisch veranlagt und löschst dir über eine SQL-Injection deine eigene Tabelle :D

    Eine Tabelle mit 69 (!) Spalten würde ich als Fehler im Entwurf bezeichnen...

    Zitat


    Eine 3 Spaltige Tabelle mit timestamp, Wertname, Wert find ich wenig praktikabel

    Wäre aber IMHO besser, weil übersichtlicher. Daten mit dem gleichen timestamp solltest du dir IMHO über eine view anzeigen lassen können. Aber so gut kenne ich mich mit SQL auch nicht aus.

    Plan B: InfluxDB oder so was in die Richtung, was auf time-based data ausgelegt ist.

    Zum eigentlichen Problem: ich bin da bei dreamshader: die Fehlermeldung von SQLAlchemy besagt ja, das beim Schreiben der Daten was schief geht und dadurch Transaktionen schief gehen - auch, wenn Postgre eigentlich mit gleichzeitigen Schreib-Lesezugriffen umgehen kann. Was sagen eigentlich die Logs von Postgres? Darin sollte ggf. auch noch was aufschlussreiches zu finden sein.

    Gruß, noisefloor

    Einmal editiert, zuletzt von noisefloor (5. August 2018 um 15:33) aus folgendem Grund: Zitat richtig formatiert.

  • Servus Hofei,

    Eine Tabelle mit 69 (!) Spalten würde ich als Fehler im Entwurf bezeichnen...

    da bin ich voll bei noisefloor ...

    ich überleg' schon die ganze Zeit, wie das Tool hiess, dass wir seinerzeit zum DB-Design verwendet habe. Das fällt mir ums Verrecken nicht ein ...:wallbash:

    Aber ... ich hab' mal eine ad hoc-Suche gestartet und das sieht auch im freien Bereich mittlerweile recht gut aus.

    -> https://www.quora.com/What-is-the-be…ema-design-tool

    oder

    -> https://www.softwaretestinghelp.com/database-design-tools/

    Wenn Du mit solch einem Tool Deine DB-Tabellen erstellst, hast Du sowohl eine gute Übersicht über die Zusammenhänge ( da wären wir übrigens bei der Bedeutung von "Relational DB System" ;) ), gleich eine Doku und zudem die Möglichkeit, die Tabellen nach z.B. einer Änderung, jederzeit wieder neu zu generieren ...

    ciao,

    -ds-

  • Hier mal mein aktueller Stand:

    Den InternalError habe ich beseitigen können, in dem Moment wo ich den Fehler mit dem duplicate Key abgefangen habe, kam ein rollback() rein und der Fehler war beseitigt.

    Auch habe ich jetzt schon sqlAlchemy ersetzt in reines psycopg2. Der aktuelle Skriptstand wird unten angehängt.

    Folgende 2 Probleme vom Skript beschäftigen mich noch...wobei ich mir gerade selbst die Frage stelle ob ich dafür neue Threads eröffenen soll oder einfach hier anhängen?!

    Jedenfalls Problem Nr 1 was wäre, die Queue q_intervall_daten_senden will nicht funktionieren, hierfür hab ich zum testen wieder entsprechende prints eingebaut. put() wird noch ausgeführt, aber ich bekomme nie ein print von get() :conf::?: (Zeile 202)

    Mehrere Queue sollten aber möglich sein, da dieser Beispielcode funktioniert:

    Problem Nr. 2 ist, ich verstehe nicht wie bei dieser "Codekonstallation" überhaupt duplicate Keys entstehen können.

    Primary Key ist der timestamp.

    Ich prüfe wo die letzten Auslesung länger her ist als im Intervall eingestellt, all diese kommen in den messauftrag welcher anschließend ausgeführt wird. Wie es dann dadurch zustande kommen kann, dass es mehrere Aufträge mit dem selben timestamp geben würde ist mir nur schleierhaft (alles wird immer auf ganze Sekunden gekürzt.


    Ebenfalls möchte ich erwähnen, dass der Vorschlag mit peewee nicht unbeachtet bleibt, hier hab ich mich schon an die Arbeit gemacht und mittels dem pwiz-a-model-generator die aktuell vorliegenden DB Struktur erstellen lassen.

    Da dies Modul aber noch völlig neu ist, muss ich mich da natürlich noch entsprechend einarbeiten um dies in das Projekt integrieren zu können.

    Zum Datenbankdesign, also es gibt halt 69 Unterschiedliche Messwerte vom Stromzähler. Für mich erschien es so, dass es übersichtlicher ist für jeden Typ eine eigene Spalte zu kreieren, als nur mit dem oben genannten 3 Spalten Typ zu arbeiten.

    influx wurde schon früher verwendet bei einem anderen Projekt, aber mittlerweile wurde alles umgestellt auf PostgreSQL mit timescale Erweiterung. Das ist auch für zeitreihen Messwerte gemacht.

    Aber ja man merkt schon das ich mit DB Design ziemlich unerfahren bin und mehr als diese 2 genannten Methoden würden mir aktuell auch gar nicht einfallen, wie ich so eine DB gestalten könnte.

  • Hi,

    ... wie bei dieser "Codekonstallation" überhaupt duplicate Keys entstehen können.

    das kann ich Dir jetzt so leider nicht beantworten. Das hängt wohl auch davon ab, wo der Zeitstempel generiert wird - also in/von der DB selbst als Feld oder aus Deinem Programm.

    Primary Key ist der timestamp.

    Wenn Du das "wasserdicht" machen willst, solltest Du eine Nummerngeber-Tabelle verwenden (ein serial-Feld in einer Tabelle, die ausschliesslich dazu dient eindeutige Nummern zu generieren).

    Wie das bei PostGre funktioniert, weiss ich nicht ... aber entweder kannst Du das einfach als serial definieren oder Du nimmst ein entsprechend grosses, numerisches Feld, das Du jeweils um eins erhöhst).

    cu,

    -ds-

  • Das hängt wohl auch davon ab, wo der Zeitstempel generiert wird - also in/von der DB selbst als Feld oder aus Deinem Programm.

    Das erledige ich selbst in Zeile 237 (Zeitstempel generieren), und in Zeile 253 wird er den Datensatz zugeordnet.

    Wenn Du das "wasserdicht" machen willst, solltest Du eine Nummerngeber-Tabelle verwenden (ein serial-Feld in einer Tabelle, die ausschliesslich dazu dient eindeutige Nummern zu generieren).

    Auch ein guter Vorschlag, der Tabelle einfach noch ein ID Feld spendieren und dies als Primary Key definieren und von PG automatisch hochzählen lassen.

    Wie das geht weiß dafür ich wieder ;)

  • Hallo,

    der Code ist ein bisschen schwer zu verstehen, von daher ist es schwierig zu sagen, wo und wann der doppelte Primärschlüssel herkommt. Vom Design her würde ich aber der in die DB schreibenden Funktionen _eine_ fertigen Datensatz schicken, der weg geschrieben wird. D.h. aus der Queue kommt _ein_ fertiger Datensatz, z.B. als Dict oder Named Tuple. Wenn du in irgendeiner Form was mit den Daten machen willst / musst, dann bau' dir eine mehrstufige Queue. Eine Funktion soll _eine_ Funktionalität haben, deine Funktion datenbank_prozess macht mehrere Sachen (andere Funktionen auch). Wenn's nur eine Funktion ist macht das auch das Debugging deutlich einfacher und den Code besser wartbar.

    Wenn es zu jedem Timestamp nur einen Datensatz geben kann / darf, dann solltest du den Timestamp auch als PK stehen lassen. Wenn es mehr als einen Datensatz pro Sekunden geben kann, dann muss der Timestamp halt die entsprechende Auflösung haben.
    Wenn es dir "nur" darum geht, wann der Datensatz geschrieben wurde (und nicht wann gemessen), würde ich die Spalte mit dem Timestamp automatisch bei einen `INSERT` von der DB ausfüllen lassen.

    Mit dem Ansatz und einem ORM kannst du auch dein 69 Spalten SQL Injection Problem lösen. Nämlich indem du der zu schreibenden Werte in ein Dict packst und das ORM dieses in die DB schreiben lässt. Beispiel mit SQLAlchemy (unter Einsatz von SQLite, wird aber mit PostgreSQL genau so funktionieren):

    Du hättest dann halt in der Klassendefinition 69 Attribute zu definieren...

    Zum Code: die Zeile 142/143 sind sinnlos, weil da nie was passieren wird.

    Gruß, noisefloor

  • Vielen Dank für die Mühe meinen Code durchzusehen, bin immer über konstruktive Kritik dankbar, nur so kann ein Hobbyprogrammierer etwas dazu lernen :)

    Zum Code: die Zeile 142/143 sind sinnlos, weil da nie was passieren wird.

    Du meinst diese Codebereich (also ist bezogen auf meinen letzten Anhang oder auf den ersten):

    Python: smartmeter.py
        for key in messregister:
            if messauftrag is not None and key not in messauftrag:
                continue
            messregister[key]["messzeitpunkt"] = zeitpunkt

    Der Teil ist jedoch schon notwendig, denn es wird ja nicht von jedem Wert in jedem Durchlauf die Messwerte erfasst, sondern in Abhängigkeit der hinterlegten Sekunden in meiner Konfigdatei.

    Kleiner Auszug:

    Code
    [[messintervall]]
    Spannung_L1= 10
    Spannung_L2= 10
    Spannung_L3= 10
    Strom_L1= 5
    Strom_L2= 5
    Strom_L3= 5

    Gibt dann auch noch Messwerte mit 300 Sekunden, jedenfalls wollte ich das ganze einfach ziemlich flexibel halten. Sodass ich später je nach Bedarf das Messintervall anpassen kann.

    Vielen Dank für dein Codebeispiel mit SQLAlchemy. Für meine Bedürfnisse, würdest du mir hierbei eher zu PeeWee raten oder zu SQLAlchemy?

    Wenn es dir "nur" darum geht, wann der Datensatz geschrieben wurde (und nicht wann gemessen),

    Hierbei geht es mir um den Messzeitpunkt.

    Wenn es zu jedem Timestamp nur einen Datensatz geben kann / darf, dann solltest du den Timestamp auch als PK stehen lassen.

    Also mein aktuell geringstes Messintervall sind 2 Sekunden (welcher ich aber nicht ganz erreiche, daher die Idee - bzw. der Versuch mit Multiprocessing)

    Auf diesen verkürzten Messintervall wird geschaltet wenn ich über Telegram ein Botkommando schicke, dass der Messintervall von den Standardwerten zu verkürzen ist.

    Schade das dir der Code kompleziert erscheint, liegt es mehr an zu wenig Kommentarzeilen im Code, oder an einer zu komplizierten herangehensweise meinerseits?

    Das man pro Funktion eine Aufgabe nur erledigen sollte ist mir bekannt, jedoch verstoß ich (leider) oft dagegen. Werd mir mühe geben das noch besser umzusetzen.

    Aktuell habe ich jetzt 2 Branches vorliegen, einen mit multiprocessing und einen ohne.

    Hierbei habe ich auch deinen oben genannte Anmerkung einarbeiten:

    Bringt's das Auslagern in den Prozess wirklich? Der Bottleneck ist IMHO in der funktionierenden Variante, dass jedes Mal der SSH-Tunnel aufgebaut wird und _nicht_ das Schreiben in die DB.

    Und somit halte ich jetzt Dauerhaft einen SSH Tunnel. Siehe das das ganze wurde schon um einiges schneller. Aktuell hat es den Anschein, dass ich den Anlauf mit multiproccesing umsonst unternommen habe. Nun ja nicht ganz umsonst, schade ja nie wenn man sich in multithreading, multiprocessing und asyncrone Verarbeitung etwas einließt.

    Hier hätte ich jetzt noch gern eine Empfehlung, Einschätzung eurerseits.

    Nach wie vielen Sekunden/Minuten macht es Sinn, den SSH Tunnel und die Datenbankverbindung zu schließen, in welchen Zeitraum ist es besser die Verbindungen dauerhaft offen zu halten.

  • Hallo,

    Zitat

    Der Teil ist jedoch schon notwendig,

    Stimmt - Denkfehler meinerseits. Besser weil weniger kompliziert wäre aber:

    Python
        for key in messregister:
            if messauftrag and key in messauftrag:
                messregister[key]["messzeitpunkt"] = zeitpunkt

    `continue` braucht man eigentlich nie, weil man das auch in 99% der Fälle anders lösen kann.

    Zitat

    Also mein aktuell geringstes Messintervall sind 2 Sekunden

    Dann kannst du - bei richtiger Programmierung - keinen doppelten PK bekommen, selbst wenn der Timestamp nur eine Genauigkeit von einer Sekunde hat.

    Zitat

    Gibt dann auch noch Messwerte mit 300 Sekunden, jedenfalls wollte ich das ganze einfach ziemlich flexibel halten.

    Am flexibelsten bist du IMHO, wenn alles mit dem geringsten Intervall von Interesse speicherst, weil dann hast du garantiert immer alle Daten. Daten für die Auswertung verwerfen kannst du immer noch.

    BTW: deine Daten habe keine Relation und sind "write once, read many (times)" - damit brauchst du eigentlich keine RDBMS, weil du davon keinerlei Vorteile hast. Zumindest nicht, was die Datenspeicherung angeht.

    Zitat

    Schade das dir der Code kompleziert erscheint, liegt es mehr an zu wenig Kommentarzeilen im Code, oder an einer zu komplizierten herangehensweise meinerseits

    In erster Linie daran, dass wir die Konfigdateien nicht kennen und damit nicht nachvollziehbar ist, wie `messregister`aussieht. Und wenn man dessen Datenstruktur nicht kennt, ist der Code halt auch schwiergig nachzuvollziehen.

    Zitat

    Nach wie vielen Sekunden/Minuten macht es Sinn, den SSH Tunnel und die Datenbankverbindung zu schließen, in welchen Zeitraum ist es besser die Verbindungen dauerhaft offen zu halten.

    Kommt drauf an, wwie viel / oft du in die DB schreibst. Wenn du SQLAlchemy nutzt, kümmert sich das so wie so um's Connection Pooling.

    Zitat


    Für meine Bedürfnisse, würdest du mir hierbei eher zu PeeWee raten oder zu SQLAlchemy?

    Keine Ahnung - ich nutze das Django ORM, weil ich nur bei Django die Notwendigkeit für ein ORM habe :) SQLAlchemy ist hat _das_ ORM unter Python und kann alles - und ist entsprechend umfangreich.

    Gruß, noisefloor

  • continue` braucht man eigentlich

    ok continue fliegt raus

    Kommt drauf an, wwie viel / oft du in die DB schreibst

    je nachdem welche Einstellung die Konfig enthält, in der Regel im normal Betrieb, senden an die DB alle 30 Sekunden, im Schnellmessbetrieb alle 2 Sekunden (gewünscht) smartmeter_cfg.txt Dateiendung statt .txt eigentlich .toml

    Django...auch mal was ich mir genauer anschauen wollte

    Vielen Dank nochmals für die Hilfe, die meisten Unklarheiten sind beseitigt :)

    Zu neuen Fragen werd ich dann aber ein entsprechenden Thread eröffnen (CProfile Fragen mit Graphen stehen schon in den Startlöchern ;))

    Gerade gesehen, dachte eigentlich ich hätte im Eingangspost schon darauf verlinkt, das Projekt zu dem es die Probleme gab: Stromzähler mit Raspberry Pi und ModBus auslesen

Jetzt mitmachen!

Du hast noch kein Benutzerkonto auf unserer Seite? Registriere dich kostenlos und nimm an unserer Community teil!