source

SqlAlchemy로 업스타트하는 방법은 무엇입니까?

factcode 2023. 8. 17. 21:55
반응형

SqlAlchemy로 업스타트하는 방법은 무엇입니까?

데이터베이스가 없는 경우 데이터베이스에 존재하고 싶은 레코드가 있으며 이미 존재하는 경우(기본 키가 있음)필드가 현재 상태로 업데이트되기를 원합니다.이것은 종종 업버트라고 불립니다.

다음의 불완전한 코드 스니펫은 무엇이 작동하는지 보여주지만 (특히 더 많은 열이 있는 경우) 지나치게 투박해 보입니다.가장 좋은 방법은 무엇입니까?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

이것을 하는 더 나은 방법이 있습니까 아니면 덜 장황한 방법이 있습니까?이와 같은 것이 좋을 것입니다.

sess.upsert_this(desired_default, unique_key = "name")

록비록▁the▁although.unique_keykwarg는 분명히 불필요합니다(ORM이 이를 쉽게 파악할 수 있어야 합니다). SQL 화학이 기본 키로만 작동하는 경향이 있기 때문에 추가했습니다.예: Session.merge를 적용할 수 있는지 살펴보았지만, 이는 기본 키(이 경우 자동 증분 ID)에서만 작동합니다. 이 경우에는 이 목적에 매우 유용하지 않습니다.

이에 대한 예제 사용 사례는 기본 예상 데이터를 업그레이드한 서버 응용프로그램을 시작하는 경우입니다.즉, 이 문제에 대한 동시성 문제는 없습니다.

는 SQL을 지원합니다.ON CONFLICT on_conflict_do_update()그리고.on_conflict_do_nothing().

설명서에서 복사:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)

SQLAlchemy에는 "저장 또는 업데이트" 동작이 있으며, 최근 버전에서는 이 동작이 다음과 같이 기본 제공됩니다.session.add의 하만이별는니다습개였전에지▁the▁but다▁separate니습별▁previous개였.session.saveorupdate전화입니다. 이것은 "업버스트"는 아니지만 당신의 필요에 충분히 적합할 수 있습니다.

여러 개의 고유 키가 있는 클래스에 대해 질문하는 것은 좋은 일입니다. 이것이 바로 올바른 방법이 없는 이유라고 생각합니다.기본 키도 고유 키입니다.고유한 제약 조건이 없고 기본 키만 있으면 충분히 간단한 문제가 됩니다. 지정된 ID가 없거나 ID가 없음인 경우 새 레코드를 만듭니다. 그렇지 않으면 기존 레코드의 다른 모든 필드를 해당 기본 키로 업데이트합니다.

그러나 고유한 제약 조건이 추가적으로 존재할 경우, 이러한 단순한 접근 방식에는 논리적인 문제가 있습니다.개체를 "업데이트"하고 개체의 기본 키가 기존 레코드와 일치하지만 다른 고유한 열이 다른 레코드와 일치하는 경우 어떻게 해야 합니까?마찬가지로 기본 키가 기존 레코드와 일치하지 않지만 다른 고유한 열이 기존 레코드와 일치하는 경우에는 어떻게 됩니까?당신의 특정 상황에 맞는 정답이 있을 수 있지만, 일반적으로 저는 단 하나의 정답이 없다고 주장합니다.

이것이 내장된 "upert" 작업이 없는 이유입니다.응용프로그램은 각 특정 사례에서 이것이 의미하는 바를 정의해야 합니다.

요즘 SQL 화학은 두 가지 유용한 기능과 를 제공합니다.이러한 기능은 유용하지만 ORM 인터페이스에서 하위 수준인 SQL Calchemy Core로 전환해야 합니다.

이 두 기능은 SQL 화학의 구문을 사용하여 업서팅하는 것이 그리 어렵지 않지만, 이러한 기능은 업서팅에 대한 완전한 즉시 사용 가능한 솔루션을 제공하는 것과는 거리가 있습니다.

일반적인 사용 사례는 단일 SQL 쿼리/세션 실행에서 행의 큰 덩어리를 뒤집는 것입니다.일반적으로 두 가지 문제가 발생합니다.

예를 들어, 우리가 익숙해진 더 높은 수준의 ORM 기능이 누락되었습니다.당신은 할 수 ORM 객체를 제공해야 .ForeignKey삽입 시 s.

저는 이 두 가지 문제를 모두 처리하기 위해 작성한 다음 기능을 사용하고 있습니다.

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)

저는 "뛰기 전에 보기" 접근법을 사용합니다.

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

장점은 이것이 db-neutral이고 읽기에 분명하다고 생각합니다.단점은 다음과 같은 시나리오에서 잠재적인 경주 조건이 있다는 것입니다.

  • 에 우는db에문다니합을 합니다.switch_command그리고 하나도 찾지 못합니다.
  • 우리는 다음을 만듭니다.switch_command
  • 는 다른프스또스생성가다니합드레는을 합니다.switch_command의 기본 키와 동일한
  • 우리는 우리의 일을 하려고 노력합니다.switch_command

여러 개의 답이 있고 또 다른 답(YAA)이 나옵니다.메타프로그래밍과 관련된 다른 답변은 읽을 수 없습니다.예를 들어 다음과 같습니다.

  • SQL 화학 ORM 사용

  • 0 . 0 행을 사용합니다.on_conflict_do_nothing

  • 다음을 사용하여 새 행을 만들지 않고 기존 행(있는 경우)을 업데이트하는 방법을 보여줍니다.on_conflict_do_update

  • 를 테블기키사용다로 합니다.constraint

원래 질문의 더 긴 예는 이 코드가 관련된 것입니다.


import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                        backref=orm.backref("pair_state",
                                        lazy="dynamic",
                                        cascade="all, delete-orphan",
                                        single_parent=True, ), )


    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_event_at for a named pair."""
        # Based on the original example of https://stackoverflow.com/a/49917004/315168
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )

아래의 내용은 적색편이 데이터베이스에서 잘 작동하며 결합된 기본 키 제약 조건에서도 작동할 것입니다.

출처: 항목

함수 def start_engine()에서 SQLChemy 엔진을 생성하는 데 필요한 몇 가지 수정 사항

from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI', 
    'postgresql://localhost:5432/upsert'))
     connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else 
  query.statement.compile
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])

이를 통해 문자열 이름을 기반으로 기본 모델에 액세스할 수 있습니다.

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(record_dict, table):
    """
    handles updates when there are primary key conflicts

    """
    try:
        self.active_session().add(table(**record_dict))
    except:
        # Here we'll assume the error is caused by an integrity error
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
        # them with it's own code - this should be updated to have
        # explicit error handling for each new db engine

        # <update>add explicit error handling for each db engine</update> 
        active_session.rollback()
        # Query for conflic class, use update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items()
            setattr(c_target_record, k, v)

이것은 sqlite3 및 postgres에 적용됩니다.기본 키 제약 조건이 결합되어 있으면 실패할 수 있지만, 추가적인 고유 제약 조건으로 인해 실패할 가능성이 높습니다.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)

생성된 default-ids 및 참조에 문제가 있어 외부 키 위반-다음과 같은 오류가 발생했습니다.

update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".

업데이트 딕트에 대한 ID를 제외해야 했습니다. 그렇지 않으면 항상 새 기본값으로 생성됩니다.

또한 메소드는 생성/업데이트된 엔터티를 반환합니다.

from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert


def upsert(session, data, key_columns, model):

    stmt = insert(model).values(data)
    
    # Important to exclude the ID for update!
    exclude_for_update = [model.id.name, *key_columns]
    update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}

    stmt = stmt.on_conflict_do_update(
        index_elements=key_columns,
        set_=update_dict
    ).returning(model)

    orm_stmt = (
        select(model)
        .from_statement(stmt)
        .execution_options(populate_existing=True)
    )

    return session.execute(orm_stmt).scalar()

예:


class UpsertUser(Base):
    __tablename__ = 'upsert_user'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    name: str = Column(sa.String, nullable=False)
    user_sid: str = Column(sa.String, nullable=False, unique=True)
    house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)


class UpsertHouse(Base):
    __tablename__ = 'upsert_house'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
    admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)

# Usage

upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)

참고: postgresql에서만 테스트되지만 중복 키 업데이트 등을 지원하는 다른 DB에서도 작동할 수 있습니다.MySQL

sqlite의 경우,sqlite_on_conflict='REPLACE'옵션은 다음을 정의할 때 사용할 수 있습니다.UniqueConstraint,그리고.sqlite_on_conflict_unique단일 열에 대한 고유 제약 조건입니다.그리고나서session.add다음과 같은 방식으로 작동할 것입니다.upsert공식 설명서를 참조하십시오.

이 코드를 사용하려면 먼저 데이터베이스의 테이블에 기본 키를 추가해야 합니다.

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.inspection import inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.dialects.postgresql import insert

def upsert(df, engine, table_name, schema=None, chunk_size = 1000):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload=True)
    
   # olny use common columns between df and table.
    table_columns = {column.name for column in table.columns}
    df_columns = set(df.columns)
    intersection_columns = table_columns.intersection(df_columns)
    
    df1 = df[intersection_columns] 
    records  = df1.to_dict('records')

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]
    

    with engine.connect() as conn:
        chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
        for chunk in chunks:
            stmt = insert(table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            s = stmt.on_conflict_do_update(
                index_elements= primary_keys,
                set_=update_dict)
            conn.execute(s)

언급URL : https://stackoverflow.com/questions/7165998/how-to-do-an-upsert-with-sqlalchemy

반응형