I think this should bring everything up to functional.

Fake-Name 2017-11-22 23:08:27 -08:00
parent eea2949abf
commit 7177817531
14 changed files with 495 additions and 151 deletions

alembic.ini (new file)

@@ -0,0 +1,69 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = alembic

# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions

# the output encoding used when revision files
# are written from script.py.mako
output_encoding = utf-8

# Overridden in env.py
sqlalchemy.url =

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
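
With this file in place, the migrations are normally driven by the stock alembic CLI from the repository root (the blank sqlalchemy.url is deliberate; env.py below overwrites it). As a hedged sketch, not part of this commit, the same upgrade can also be driven from Python; the cfg name is illustrative:

# Hedged sketch: apply all pending migrations programmatically instead of
# running `alembic upgrade head` on the command line. Assumes it is run
# from the directory containing alembic.ini.
from alembic.config import Config
from alembic import command

cfg = Config("alembic.ini")
command.upgrade(cfg, "head")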

alembic/env.py (new file)

@@ -0,0 +1,82 @@
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig

# Path hackery
import os.path
import sys
sys.path.append(os.path.abspath(os.getcwd()))

# this will overwrite the ini-file sqlalchemy.url path
# with the path given in the config of the main code
import scraper.database
context.config.set_main_option('sqlalchemy.url', scraper.database.SQLALCHEMY_DATABASE_URI)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
base = scraper.database.Base
target_metadata = base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def include_object(object, name, type_, reflected, compare_to):
    # Ignore a few special things (partial indexes on distance, trigram index, and the
    # autogenerated table from apscheduler).
    ignored = [
        # 'ix_web_pages_distance_filtered',
        # 'ix_web_pages_distance_filtered_nowp',
        # 'ix_web_pages_distance_filtered_wp',
        # 'ix_web_pages_url_ops',
        # 'apscheduler',
    ]
    if any([tmp in object.name for tmp in ignored]):
        print((object.name, object, name, type_, reflected, compare_to))
        return False
    return True

def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool)

    with connectable.connect() as connection:
        context.configure(
            connection      = connection,
            target_metadata = target_metadata,
            include_object  = include_object,
            compare_type    = True
        )
        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()
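
Note that this env.py only implements online mode; an `alembic upgrade --sql` run would still open a real connection, since run_migrations_online() is called unconditionally. If offline (SQL-emitting) mode were ever wanted, a minimal sketch following the stock Alembic template would be:

def run_migrations_offline():
    # Emit migration SQL to stdout instead of executing it against a live DB.
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()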

alembic/script.py.mako (new file)

@@ -0,0 +1,36 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
from alembic import op
import sqlalchemy as sa
# Patch in knowledge of the citext type, so it reflects properly.
from sqlalchemy.dialects.postgresql.base import ischema_names
import citext
import queue
import datetime
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import TSVECTOR
ischema_names['citext'] = citext.CIText
${imports if imports else ""}
def upgrade():
    ${upgrades if upgrades else "pass"}

def downgrade():
    ${downgrades if downgrades else "pass"}

alembic/versions (new migration, revision cbfec58e6ca8)

@@ -0,0 +1,42 @@
"""empty message
Revision ID: cbfec58e6ca8
Revises:
Create Date: 2017-11-23 05:24:08.420515
"""
# revision identifiers, used by Alembic.
revision = 'cbfec58e6ca8'
down_revision = None
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
# Patch in knowledge of the citext type, so it reflects properly.
from sqlalchemy.dialects.postgresql.base import ischema_names
import citext
import queue
import datetime
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import TSVECTOR
ischema_names['citext'] = citext.CIText
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index('ix_db_releases_source_dlstate_postid', table_name='db_releases')
    op.drop_column('db_releases', 'filepath')
    # ### end Alembic commands ###

def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('db_releases', sa.Column('filepath', sa.TEXT(), autoincrement=False, nullable=True))
    op.create_index('ix_db_releases_source_dlstate_postid', 'db_releases', ['source', 'dlstate', 'postid'], unique=False)
    # ### end Alembic commands ###

alembic/versions (new migration, revision e19fd729888d)

@@ -0,0 +1,158 @@
"""empty message
Revision ID: e19fd729888d
Revises: cbfec58e6ca8
Create Date: 2017-11-23 05:39:54.880014
"""
# revision identifiers, used by Alembic.
revision = 'e19fd729888d'
down_revision = 'cbfec58e6ca8'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
# Patch in knowledge of the citext type, so it reflects properly.
from sqlalchemy.dialects.postgresql.base import ischema_names
import citext
import queue
import datetime
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import TSVECTOR
ischema_names['citext'] = citext.CIText
from sqlalchemy.dialects import postgresql
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    print("Adding state column")
    op.add_column('db_releases', sa.Column('state', postgresql.ENUM('new', 'fetching', 'processing', 'complete', 'error', 'removed', 'disabled', name='dlstate_enum'), nullable=True))
    print("Adding text column")
    op.add_column('db_releases', sa.Column('err_str', sa.Text()))

    # dlstate values present in the table: -4, -3, -5, -1, -9, 2, 0
    conn = op.get_bind()

    print("Updating for row: -4")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'error'
        WHERE
            dlstate = -4;
    ''')
    print("Updating for row: -3")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'error'
        WHERE
            dlstate = -3;
    ''')
    print("Updating for row: -5")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'error'
        WHERE
            dlstate = -5;
    ''')
    print("Updating for row: -1")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'error'
        WHERE
            dlstate = -1;
    ''')
    print("Updating for row: -9")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'error'
        WHERE
            dlstate = -9;
    ''')
    print("Updating for row: 0")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'new'
        WHERE
            dlstate = 0;
    ''')
    print("Updating for row: 2")
    conn.execute('''
        UPDATE
            db_releases
        SET
            state = 'complete'
        WHERE
            dlstate = 2;
    ''')

    print("Checking there are no nulls")
    res = conn.execute('''
        SELECT
            COUNT(*)
        FROM
            db_releases
        WHERE
            state IS NULL;
    ''')
    nulls = res.fetchall()
    print("Should be zero:", nulls)
    assert nulls == [(0,)]

    op.alter_column('db_releases', 'state', nullable=False)
    op.create_index(op.f('ix_db_releases_state'), 'db_releases', ['state'], unique=False)
    op.drop_index('ix_db_releases_dlstate', table_name='db_releases')
    op.drop_column('db_releases', 'dlstate')
    # ### end Alembic commands ###

def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('db_releases', sa.Column('dlstate', sa.INTEGER(), autoincrement=False, nullable=False))
    op.create_index('ix_db_releases_dlstate', 'db_releases', ['dlstate'], unique=False)
    op.drop_index(op.f('ix_db_releases_state'), table_name='db_releases')
    op.drop_column('db_releases', 'state')
    # ### end Alembic commands ###
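
The seven UPDATE blocks above spell out the integer-to-enum mapping one value at a time. Purely as a hedged sketch (not part of the commit), the same backfill could be driven from a dict, using the same conn from op.get_bind() and the sa import above:

# dlstate -> state mapping, copied from the statements above.
DLSTATE_TO_STATE = {
    -4: 'error', -3: 'error', -5: 'error', -1: 'error', -9: 'error',
    0: 'new',
    2: 'complete',
}
for old_val, new_state in DLSTATE_TO_STATE.items():
    conn.execute(
        sa.text("UPDATE db_releases SET state = :state WHERE dlstate = :old"),
        {"state": new_state, "old": old_val},
    )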

(deleted file: standalone Danbooru fetch/maintenance script)

@@ -1,65 +0,0 @@
import database as db
import logSetup
logSetup.initLogging()
import danbooruFetch
import runstate
import settings
import concurrent.futures
import os
import os.path
import hashlib
THREADS = 1
THREADS = 25
def insertDanbooruStartingPoints():
    tmp = db.session.query(db.Releases) \
        .filter(db.Releases.postid == 1) \
        .count()
    if not tmp:
        for x in range(2070000):
            new = db.Releases(dlstate=0, postid=x, source='Danbooru')
            db.session.add(new)
            if x % 10000 == 0:
                print("Loop ", x, "flushing...")
                db.session.flush()
                print("Flushed.")
        db.session.commit()

def resetDlstate():
    tmp = db.session.query(db.Releases) \
        .filter(db.Releases.dlstate == 1) \
        .update({db.Releases.dlstate : 0})
    db.session.commit()

def go():
    have = db.session.query(db.Releases) \
        .filter(db.Releases.filepath != None) \
        .all()

    proc = 0
    for row in have:
        fpath = settings.storeDir + row.filepath
        fpath = os.path.abspath(fpath)
        with open(fpath, "rb") as fp:
            cont = fp.read()
        fhash = hashlib.md5(cont).hexdigest()
        print(os.path.exists(fpath), fhash, fpath)
        row.file.append((fpath, fhash))
        proc += 1
        if proc % 50 == 0:
            db.session.commit()

if __name__ == '__main__':
    go()

scraper/database.py (modified)

@@ -23,8 +23,12 @@ from sqlalchemy.ext.associationproxy import association_proxy

 # Patch in knowledge of the citext type, so it reflects properly.
 from sqlalchemy.dialects.postgresql.base import ischema_names
+from sqlalchemy.dialects.postgresql import ENUM
 import citext
 ischema_names['citext'] = citext.CIText
+
+dlstate_enum = ENUM('new', 'fetching', 'processing', 'complete', 'error', 'removed', 'disabled', name='dlstate_enum')
+
 from settings import DATABASE_IP as C_DATABASE_IP
 from settings import DATABASE_DB_NAME as C_DATABASE_DB_NAME
@@ -166,7 +170,8 @@ def file_creator(filetups):

 class Releases(Base):
     __tablename__ = 'db_releases'
     id = Column(Integer, primary_key=True)
-    dlstate = Column(Integer, nullable=False, index=True)
+    state = Column(dlstate_enum, nullable=False, index=True, default='new')
+    err_str = Column(Text)
     postid = Column(Integer, nullable=False, index=True)
     source = Column(citext.CIText, nullable=False, index=True)
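
With the integer column replaced by the enum, call sites filter on named states instead of magic numbers. A hedged usage sketch, mirroring the query style in scraper/fetchBase.py below (the 'Danbooru' source value is illustrative):

# Find rows that have not been fetched yet.
pending = db.session.query(db.Releases) \
    .filter(db.Releases.state == 'new') \
    .filter(db.Releases.source == 'Danbooru') \
    .order_by(db.Releases.postid) \
    .all()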

scraper/fetchBase.py (modified)

@@ -48,14 +48,14 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
         try:
             job = db.session.query(db.Releases) \
                 .filter(db.Releases.source == self.pluginkey) \
-                .filter(db.Releases.dlstate == 0) \
+                .filter(db.Releases.state == 'new') \
                 .order_by(db.Releases.postid) \
                 .limit(1)
             job = job.scalar()
             if job is None:
                 return None
-            job.dlstate = 1
+            job.state = 'fetching'
             db.session.commit()
             return job
         except sqlalchemy.exc.DatabaseError:
@@ -125,10 +125,10 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
     def resetDlstate(self):
         sess = db.session()
-        tmp = sess.query(db.Releases) \
-            .filter(db.Releases.dlstate == 1) \
-            .filter(db.Releases.source == self.pluginkey) \
-            .update({db.Releases.dlstate : 0})
+        tmp = sess.query(db.Releases) \
+            .filter(db.Releases.state.in_(['fetching', 'processing'])) \
+            .filter(db.Releases.source == self.pluginkey) \
+            .update({db.Releases.state : 'new'}, synchronize_session=False)
         sess.commit()
@@ -140,7 +140,7 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
         for x in range(self.content_count_max, 0, UPSERT_STEP * -1):
             self.log.info("[%s] - Building insert data structure %s -> %s", self.pluginkey, x, x+UPSERT_STEP)
-            dat = [{"dlstate" : 0, "postid" : x, "source" : self.pluginkey} for x in range(x, x+UPSERT_STEP)]
+            dat = [{"state" : 'new', "postid" : x, "source" : self.pluginkey} for x in range(x, x+UPSERT_STEP)]
             self.log.info("[%s] - Building insert query", self.pluginkey)
             q = insert(db.Releases).values(dat)
             q = q.on_conflict_do_nothing()
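
Note: the DatabaseError retry loop in get_job() exists because two workers can read the same 'new' row before either commits its claim. On PostgreSQL 9.5+ with SQLAlchemy 1.1+, a hedged alternative sketch (not part of this commit) that claims rows without racing:

# SELECT ... FOR UPDATE SKIP LOCKED: concurrent workers skip rows that
# another transaction has already claimed.
job = db.session.query(db.Releases) \
    .filter(db.Releases.source == self.pluginkey) \
    .filter(db.Releases.state == 'new') \
    .order_by(db.Releases.postid) \
    .with_for_update(skip_locked=True) \
    .limit(1) \
    .scalar()
if job is not None:
    job.state = 'fetching'
    db.session.commit()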

KonaChanFetcher module (modified)

@@ -159,7 +159,7 @@ class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
         fpath = self.saveFileRow(job, fname, cont)
         self.log.info("Saved file to path: '%s'", fpath)

-        job.dlstate = 2
+        job.state = 'complete'
         db.session.commit()
         # print(fname)
@@ -168,21 +168,25 @@ class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
         try:
             soup = self.wg.getSoup(pageurl)
         except urllib.error.URLError:
-            job.dlstate=-1
+            job.state = 'error'
+            job.err_str = 'failure fetching container page'
             db.session.commit()
             return

         text = soup.get_text()
         if 'You need a gold account to see this image.' in text:
-            job.dlstate=-3
+            job.state = 'removed'
+            job.err_str = 'requires account'
             db.session.commit()
             return
         if 'This post was deleted for the following reasons' in text:
-            job.dlstate=-4
+            job.state = 'removed'
+            job.err_str = 'post deleted'
             db.session.commit()
             return
         if 'Save this flash' in text:
-            job.dlstate=-9
+            job.state = 'disabled'
+            job.err_str = 'content is flash .swf'
             db.session.commit()
             return

         err = 0
@@ -193,12 +197,14 @@ class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
                 self.fetchImage(job, imgurl, pageurl)
             else:
                 self.log.info("No image found for URL: '%s'", pageurl)
-                job.dlstate=-5
+                job.state = 'error'
+                job.err_str = 'failed to find image!'
                 break
         except AssertionError:
             self.log.info("Assertion error?: '%s'", pageurl)
             traceback.print_exc()
-            job.dlstate=-50
+            job.state = 'error'
+            job.err_str = 'failure fetching actual image'
             db.session.rollback()
             break

DanbooruFetcher module (modified)

@@ -94,7 +94,8 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
                 job.status = val
                 # Do not try to fetch things that are banned (e.g. removed)
                 if val == 'Banned':
-                    job.dlstate=-2
+                    job.state = 'removed'
+                    job.err_str = 'item banned'
             elif name in ['Approver', 'ID', 'Source', 'Uploader']:
                 pass
             else:
@@ -125,7 +126,7 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
         job.filename = fname
         job.filepath = fpath
-        job.dlstate = 2
+        job.state = 'complete'
         db.session.commit()
         # print(fname)
@@ -134,21 +135,25 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
         try:
             soup = self.wg.getSoup(pageurl)
         except urllib.error.URLError:
-            job.dlstate=-1
+            job.state = 'error'
+            job.err_str = 'failure fetching container page'
             db.session.commit()
             return

         text = soup.get_text()
         if 'You need a gold account to see this image.' in text:
-            job.dlstate=-3
+            job.state = 'removed'
+            job.err_str = 'requires account'
             db.session.commit()
             return
         if 'This post was deleted for the following reasons' in text:
-            job.dlstate=-4
+            job.state = 'removed'
+            job.err_str = 'post deleted'
             db.session.commit()
             return
         if 'Save this flash' in text:
-            job.dlstate=-9
+            job.state = 'disabled'
+            job.err_str = 'content is flash .swf'
             db.session.commit()
             return

         err = 0
@@ -159,11 +164,17 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
                 self.fetchImage(job, imgurl, pageurl)
             else:
                 self.log.info("No image found for URL: '%s'", pageurl)
-                job.dlstate=-5
+                job.state = 'error'
+                job.err_str = 'failed to find image!'
                 break
         except sqlalchemy.exc.IntegrityError:
             err += 1
             db.session.rollback()
+        except urllib.error.URLError:
+            job.state = 'error'
+            job.err_str = 'failure fetching actual image'
+            db.session.commit()

E621Fetcher module (modified)

@@ -141,12 +141,12 @@ class E621Fetcher(scraper.fetchBase.AbstractFetcher):
         cont = self.wg.getpage(url, addlHeaders={'Referer':srcurl})
-        fpath = self.saveFile(job, fname, cont)
+        fpath = self.saveFileRow(job, fname, cont)
         self.log.info("Saved file to path: '%s'", fpath)

         job.filename = fname
         job.filepath = fpath
-        job.dlstate = 2
+        job.state = 'complete'
         db.session.commit()
         # print(fname)
@@ -155,21 +155,25 @@ class E621Fetcher(scraper.fetchBase.AbstractFetcher):
         try:
             soup = self.wg.getSoup(pageurl)
         except urllib.error.URLError:
-            job.dlstate=-1
+            job.state = 'error'
+            job.err_str = 'failure fetching container page'
             db.session.commit()
             return

         text = soup.get_text()
         if 'You need a gold account to see this image.' in text:
-            job.dlstate=-3
+            job.state = 'removed'
+            job.err_str = 'requires account'
             db.session.commit()
             return
         if 'This post was deleted for the following reasons' in text:
-            job.dlstate=-4
+            job.state = 'removed'
+            job.err_str = 'post deleted'
             db.session.commit()
             return
         if 'Save this flash' in text:
-            job.dlstate=-9
+            job.state = 'disabled'
+            job.err_str = 'content is flash .swf'
             db.session.commit()
             return

         err = 0
@@ -180,12 +184,14 @@ class E621Fetcher(scraper.fetchBase.AbstractFetcher):
                 self.fetchImage(job, imgurl, pageurl)
             else:
                 self.log.info("No image found for URL: '%s'", pageurl)
-                job.dlstate=-5
+                job.state = 'error'
+                job.err_str = 'failed to find image!'
                 break
         except AssertionError:
             self.log.info("Assertion error?: '%s'", pageurl)
             traceback.print_exc()
-            job.dlstate=-50
+            job.state = 'error'
+            job.err_str = 'Assertion failure?'
             db.session.rollback()
             break

GelbooruFetcher module (modified)

@@ -24,25 +24,6 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
     # db.session = db.Session()

-    def get_job(self):
-        while 1:
-            try:
-                job = db.session.query(db.Releases) \
-                    .filter(db.Releases.dlstate == 0) \
-                    .order_by(db.Releases.postid) \
-                    .limit(1) \
-                    .one()
-                if job == None:
-                    return None
-                job.dlstate = 1
-                db.session.commit()
-                return job
-            except sqlalchemy.exc.DatabaseError:
-                self.log.warning("Error when getting job. Probably a concurrency issue.")
-                self.log.warning("Trying again.")
-                for line in traceback.format_exc().split("\n"):
-                    self.log.warning(line)
-                db.session.rollback()

     def extractTags(self, job, tagsection):
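
Note: the get_job() override deleted here duplicated the base-class version in scraper/fetchBase.py above, minus the source filter; GelbooruFetcher now inherits the shared, state-enum-aware implementation.
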
@@ -119,7 +100,8 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
                 job.status = val
                 # Do not try to fetch things that are banned (e.g. removed)
                 if val == 'Banned':
-                    job.dlstate=-2
+                    job.state = 'removed'
+                    job.err_str = 'item banned'
             elif name in ['Approver', 'Id', 'Source', 'Uploader']:
                 pass
             else:
@@ -157,7 +139,7 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
         job.filename = fname
         job.filepath = fpath
-        job.dlstate = 2
+        job.state = 'complete'
         db.session.commit()
         # print(fname)
@@ -172,27 +154,25 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
                 else:
                     break
             except urllib.error.URLError:
-                job.dlstate=-1
+                job.state = 'error'
+                job.err_str = 'failure fetching container page'
                 db.session.commit()
                 return

         if 'Gelbooru - Image List' in soup.title.get_text():
             self.log.warning("Image has been removed.")
-            job.dlstate=-4
+            job.state = 'removed'
+            job.err_str = 'image has been removed'
             db.session.commit()
             return
         if 'This post was deleted. Reason: Duplicate of' in soup.get_text():
             self.log.warning("Image has been removed.")
-            job.dlstate=-6
+            job.state = 'removed'
+            job.err_str = 'image has been removed because it was a duplicate'
             db.session.commit()
             return

-        # text = soup.get_text()
-        # if 'You need a gold account to see this image.' in text:
-        #     job.dlstate=-3
-        #     db.session.commit()
-        #     return

         err = 0
         while err < 5:
@@ -202,13 +182,15 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
                 self.fetchImage(job, imgurl, pageurl)
             else:
                 self.log.info("No image found for URL: '%s'", pageurl)
-                job.dlstate=-5
+                job.state = 'error'
+                job.err_str = 'failed to find image!'
                 break
         except sqlalchemy.exc.IntegrityError:
             err += 1
             db.session.rollback()
         except urllib.error.URLError:
-            job.dlstate=-8
+            job.state = 'error'
+            job.err_str = 'failure fetching actual image'
             db.session.commit()

R34xxxFetcher module (modified)

@@ -97,10 +97,6 @@ class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
             elif name == 'Status':
                 pass
-                # job.status = val
-                # # Do not try to fetch things that are banned (e.g. removed)
-                # if val == 'Banned':
-                #     job.dlstate=-2
             elif name in ['Approver', 'Id', 'Source', 'Uploader']:
                 pass
             else:
@@ -131,7 +127,7 @@ class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
         job.filename = fname
         job.filepath = fpath
-        job.dlstate = 2
+        job.state = 'complete'
         db.session.commit()
         # print(fname)
@@ -140,21 +136,25 @@ class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
         try:
             soup = self.wg.getSoup(pageurl)
         except urllib.error.URLError:
-            job.dlstate=-1
+            job.state = 'error'
+            job.err_str = 'failure fetching container page'
             db.session.commit()
             return

         text = soup.get_text()
         if 'You need a gold account to see this image.' in text:
-            job.dlstate=-3
+            job.state = 'removed'
+            job.err_str = 'requires account'
             db.session.commit()
             return
         if 'This post was deleted for the following reasons' in text:
-            job.dlstate=-4
+            job.state = 'removed'
+            job.err_str = 'post deleted'
             db.session.commit()
             return
         if 'Save this flash' in text:
-            job.dlstate=-9
+            job.state = 'disabled'
+            job.err_str = 'content is flash .swf'
             db.session.commit()
             return

         err = 0
@@ -165,15 +165,22 @@ class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
                 self.fetchImage(job, imgurl, pageurl)
             else:
                 self.log.info("No image found for URL: '%s'", pageurl)
-                job.dlstate=-5
+                job.state = 'error'
+                job.err_str = 'failed to find image!'
                 break
         except AssertionError:
             self.log.info("Assertion error?: '%s'", pageurl)
             traceback.print_exc()
-            job.dlstate=-50
+            job.state = 'error'
+            job.err_str = 'Assertion failure?'
             db.session.rollback()
             break
+        except urllib.error.URLError:
+            job.state = 'error'
+            job.err_str = 'failure fetching actual image'
+            db.session.commit()
         except sqlalchemy.exc.IntegrityError:
             err += 1
             db.session.rollback()

RunEngine module (modified)

@@ -38,23 +38,28 @@ class RunEngine(object):
         self.log.info("Creating run contexts")
-        threads = []
-        try:
-            for plugin in PLUGIN_CLASSES:
-                th = threading.Thread(target=plugin.run_scraper, name=plugin.loggerpath)
-                th.start()
-                threads.append(th)
-            self.log.info("Waiting for workers to complete.")
-            for thread in threads:
-                thread.join()
-        except KeyboardInterrupt:
-            self.log.info("Waiting for executor.")
-            scraper.runstate.run = False
-            for thread in threads:
-                thread.join()
+        for plugin in PLUGIN_CLASSES:
+            plugin.run_scraper()
+
+        # threads = []
+        # try:
+        #     for plugin in PLUGIN_CLASSES:
+        #         th = threading.Thread(target=plugin.run_scraper, name=plugin.loggerpath)
+        #         threads.append(th)
+        #     for thread in threads:
+        #         thread.start()
+        #     self.log.info("Waiting for workers to complete.")
+        #     for thread in threads:
+        #         thread.join()
+        # except KeyboardInterrupt:
+        #     self.log.info("Waiting for executor.")
+        #     scraper.runstate.run = False
+        #     for thread in threads:
+        #         thread.join()

 def go():
     instance = RunEngine(THREADS)
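
The plugins now run sequentially, with the threaded variant kept as comments. If concurrency is reintroduced, a hedged sketch using a worker pool instead of hand-rolled threads (run_all and its argument are illustrative names, not part of this commit):

import concurrent.futures

def run_all(plugins):
    # One worker per plugin; the `with` block joins all workers on exit.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(plugins)) as pool:
        futures = [pool.submit(plugin.run_scraper) for plugin in plugins]
        for fut in concurrent.futures.as_completed(futures):
            fut.result()    # re-raise any exception from a worker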