From fd41dbbd4ca68364116b6cb441fbd6487fec4fbd Mon Sep 17 00:00:00 2001 From: Fake-Name Date: Sat, 25 Nov 2017 00:01:31 -0800 Subject: [PATCH] Add image metadata to the file table, and fetch more than one db row per query, because the get_job() function query was somehow completely slamming my database. --- alembic/versions/bada78e9a9a8_.py | 60 +++++++++++++++++++++++++++++++ scraper/database.py | 10 +++++- scraper/fetchBase.py | 59 ++++++++++++++++-------------- 3 files changed, 101 insertions(+), 28 deletions(-) create mode 100644 alembic/versions/bada78e9a9a8_.py diff --git a/alembic/versions/bada78e9a9a8_.py b/alembic/versions/bada78e9a9a8_.py new file mode 100644 index 0000000..7033c09 --- /dev/null +++ b/alembic/versions/bada78e9a9a8_.py @@ -0,0 +1,60 @@ +"""empty message + +Revision ID: bada78e9a9a8 +Revises: e19fd729888d +Create Date: 2017-11-25 03:33:37.355463 + +""" + +# revision identifiers, used by Alembic. +revision = 'bada78e9a9a8' +down_revision = 'e19fd729888d' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +# Patch in knowledge of the citext type, so it reflects properly. +from sqlalchemy.dialects.postgresql.base import ischema_names +import citext +import queue +import datetime +from sqlalchemy.dialects.postgresql import ENUM +from sqlalchemy.dialects.postgresql import JSON +from sqlalchemy.dialects.postgresql import TSVECTOR +ischema_names['citext'] = citext.CIText + + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + print("Altering column") + op.alter_column('db_files', 'id', + existing_type=sa.INTEGER(), + type_=sa.BigInteger(), + autoincrement=True, + existing_server_default=sa.text("nextval('db_files_id_seq'::regclass)")) + print("Adding new columns") + op.add_column('db_files', sa.Column('imgx', sa.Integer(), nullable=True)) + op.add_column('db_files', sa.Column('imgy', sa.Integer(), nullable=True)) + op.add_column('db_files', sa.Column('phash', sa.BigInteger(), nullable=True)) + print("Creating phash index") + op.create_index('phash_bktree_idx', 'db_files', [sa.text('phash bktree_ops')], unique=False, postgresql_using='spgist') + print("Done") + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('phash_bktree_idx', table_name='db_files') + op.alter_column('db_files', 'id', + existing_type=sa.BigInteger(), + type_=sa.INTEGER(), + autoincrement=True, + existing_server_default=sa.text("nextval('db_files_id_seq'::regclass)")) + op.drop_column('db_files', 'phash') + op.drop_column('db_files', 'imgy') + op.drop_column('db_files', 'imgx') + # ### end Alembic commands ### diff --git a/scraper/database.py b/scraper/database.py index b10c709..d27551d 100644 --- a/scraper/database.py +++ b/scraper/database.py @@ -5,11 +5,13 @@ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session from sqlalchemy import Table +from sqlalchemy import Index from sqlalchemy import Column from sqlalchemy import Integer from sqlalchemy import BigInteger from sqlalchemy import Text +from sqlalchemy import text from sqlalchemy import Float from sqlalchemy import Boolean from sqlalchemy import DateTime @@ -114,14 +116,19 @@ class Artist(Base): class Files(Base): __tablename__ = 'db_files' - id = Column(Integer, primary_key=True) + id = Column(BigInteger, primary_key=True) filepath = Column(citext.CIText(), nullable=False) fhash = Column(Text, nullable=False) 
+ phash = Column(BigInteger) + imgx = Column(Integer) + imgy = Column(Integer) + __table_args__ = ( UniqueConstraint('filepath'), UniqueConstraint('fhash'), + Index('phash_bktree_idx', 'phash', postgresql_using="spgist") ) def tag_creator(tag): @@ -204,6 +211,7 @@ class Releases(Base): __table_args__ = ( UniqueConstraint('postid', 'source'), + Index('db_releases_source_state_id_idx', 'source', 'state', 'id') ) diff --git a/scraper/fetchBase.py b/scraper/fetchBase.py index 5f122ca..77fbc68 100644 --- a/scraper/fetchBase.py +++ b/scraper/fetchBase.py @@ -43,6 +43,7 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta): self.log = logging.getLogger(self.loggerpath) self.wg = util.WebRequest.WebGetRobust(logPath=self.loggerpath+".Web") + self.jobs_queued = [] def get_job(self): @@ -52,35 +53,39 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta): while 1: self.log.info("Getting job") try: + if not self.jobs_queued: + raw_query = ''' + UPDATE + db_releases + SET + state = 'fetching' + WHERE + db_releases.id in ( + SELECT + db_releases.id + FROM + db_releases + WHERE + db_releases.state = 'new' + AND + source = :source + ORDER BY + db_releases.postid ASC + LIMIT 500 + ) + AND + db_releases.state = 'new' + RETURNING + db_releases.id; + ''' - raw_query = ''' - UPDATE - db_releases - SET - state = 'fetching' - WHERE - db_releases.id in ( - SELECT - db_releases.id - FROM - db_releases - WHERE - db_releases.state = 'new' - AND - source = :source - ORDER BY - db_releases.postid ASC - LIMIT 1 - ) - AND - db_releases.state = 'new' - RETURNING - db_releases.id; - ''' + rids = session.execute(text(raw_query), {'source' : self.pluginkey}) + ridl = list(rids) + self.jobs_queued = [tmp[0] for tmp in ridl] - rids = session.execute(text(raw_query), {'source' : self.pluginkey}) - ridl = list(rids) - rid = ridl[0][0] + assert self.jobs_queued + + rid = self.jobs_queued.pop() job = db.session.query(db.Releases) \