DB prototyping, threading, etc...

Fake-Name 2015-07-11 12:09:25 -07:00
commit d0a718886c
8 changed files with 1619 additions and 0 deletions

3
.gitignore vendored Normal file

@@ -0,0 +1,3 @@
/__pycache__/*.pyc
/*.lwp
/logs

188
database.py Normal file

@@ -0,0 +1,188 @@
# import rpc
# import task_exceptions
# import deps.ExContentLoader
# import deps.ContentLoader
# import deps.LibraryContentEnqueue
# import deps.LibraryContentEnqueue
# import deps.ExExtract
# import deps.nameTools as nt
# import os.path
# import traceback
# import string
# import settings
# import time
# import pprint
# import traceback

# from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

from sqlalchemy import Table
# from sqlalchemy import MetaData
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import BigInteger
from sqlalchemy import Text
from sqlalchemy import Float
from sqlalchemy import Boolean
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy.orm import relationship
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.sql.expression import func
# from citext import CIText

# Patch in knowledge of the citext type, so it reflects properly.
from sqlalchemy.dialects.postgresql.base import ischema_names
import citext
ischema_names['citext'] = citext.CIText

from settings import DATABASE_IP as C_DATABASE_IP
from settings import DATABASE_DB_NAME as C_DATABASE_DB_NAME
from settings import DATABASE_USER as C_DATABASE_USER
from settings import DATABASE_PASS as C_DATABASE_PASS

SQLALCHEMY_DATABASE_URI = 'postgresql://{user}:{passwd}@{host}:5432/{database}'.format(user=C_DATABASE_USER, passwd=C_DATABASE_PASS, host=C_DATABASE_IP, database=C_DATABASE_DB_NAME)

engine = create_engine(SQLALCHEMY_DATABASE_URI)
SessionFactory = sessionmaker(bind=engine)
Session = scoped_session(SessionFactory)
session = Session()

Base = declarative_base()

# Many-to-many link tables joining releases to their tags, characters and artists.
db_tags_link = Table(
	'db_tags_link', Base.metadata,
	Column('releases_id', Integer, ForeignKey('db_releases.id'), nullable=False),
	Column('tags_id', Integer, ForeignKey('db_tags.id'), nullable=False),
	PrimaryKeyConstraint('releases_id', 'tags_id')
)

db_chars_link = Table(
	'db_chars_link', Base.metadata,
	Column('releases_id', Integer, ForeignKey('db_releases.id'), nullable=False),
	Column('character_id', Integer, ForeignKey('db_characters.id'), nullable=False),
	PrimaryKeyConstraint('releases_id', 'character_id')
)

db_artist_link = Table(
	'db_artist_link', Base.metadata,
	Column('releases_id', Integer, ForeignKey('db_releases.id'), nullable=False),
	Column('type_id', Integer, ForeignKey('db_artist.id'), nullable=False),
	PrimaryKeyConstraint('releases_id', 'type_id')
)


class RawPages(Base):
	__tablename__ = 'db_raw_pages'
	id        = Column(Integer, primary_key=True)
	dlstate   = Column(Integer, default=0, index=True)
	sourceurl = Column(Text, nullable=False, index=True)
	pgctnt    = Column(Text)
	scantime  = Column(DateTime)
	urltype   = Column(Integer, nullable=False)


class Tags(Base):
	__tablename__ = 'db_tags'
	id  = Column(Integer, primary_key=True)
	tag = Column(citext.CIText(), nullable=False, index=True)

	__table_args__ = (
		UniqueConstraint('tag'),
	)


class Characters(Base):
	__tablename__ = 'db_characters'
	id        = Column(Integer, primary_key=True)
	character = Column(citext.CIText(), nullable=False, index=True)

	__table_args__ = (
		UniqueConstraint('character'),
	)


class Artist(Base):
	__tablename__ = 'db_artist'
	id     = Column(Integer, primary_key=True)
	artist = Column(citext.CIText(), nullable=False, index=True)

	__table_args__ = (
		UniqueConstraint('artist'),
	)


# Creator functions for the association proxies below. Each one reuses an
# existing row if the value is already present (the citext columns compare
# case-insensitively), and only builds a new row otherwise.
def tag_creator(tag):
	tmp = session.query(Tags)        \
		.filter(Tags.tag == tag)     \
		.scalar()
	if tmp:
		return tmp
	return Tags(tag=tag)


def character_creator(char):
	tmp = session.query(Characters)             \
		.filter(Characters.character == char)   \
		.scalar()
	if tmp:
		return tmp
	return Characters(character=char)


def artist_creator(artist):
	tmp = session.query(Artist)              \
		.filter(Artist.artist == artist)     \
		.scalar()
	if tmp:
		return tmp
	return Artist(artist=artist)


class Releases(Base):
	__tablename__ = 'db_releases'
	id         = Column(Integer, primary_key=True)
	dlstate    = Column(Integer, nullable=False, index=True)
	postid     = Column(Integer, nullable=False, index=True)
	fsize      = Column(BigInteger)
	score      = Column(Float)
	favourites = Column(Integer)
	parent     = Column(Text)
	posted     = Column(DateTime)
	res_x      = Column(Integer)
	res_y      = Column(Integer)
	filename   = Column(Text)
	filepath   = Column(Text)
	status     = Column(Text)

	tags_rel      = relationship('Tags',       secondary=lambda: db_tags_link)
	character_rel = relationship('Characters', secondary=lambda: db_chars_link)
	artist_rel    = relationship('Artist',     secondary=lambda: db_artist_link)

	# Expose the related rows as plain lists of strings.
	tags      = association_proxy('tags_rel',      'tag',       creator=tag_creator)
	character = association_proxy('character_rel', 'character', creator=character_creator)
	artist    = association_proxy('artist_rel',    'artist',    creator=artist_creator)

	__table_args__ = (
		UniqueConstraint('postid'),
	)


# Create any missing tables.
Base.metadata.create_all(bind=engine, checkfirst=True)
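
The tags / character / artist attributes on Releases are association proxies, so calling code can work with plain strings and never touch the Tags, Characters or Artist classes directly. A minimal usage sketch (not part of this commit; the post id and tag values are made up, and it assumes that postid isn't already in the table):

import database as db

rel = db.Releases(dlstate=0, postid=12345)

# Appending strings goes through tag_creator / character_creator / artist_creator,
# which reuse an existing row when one matches and create it otherwise.
rel.tags.append("landscape")
rel.character.append("original character")
rel.artist.append("some artist")

db.session.add(rel)
db.session.commit()

# Reading back yields plain strings again, not ORM objects.
print(list(rel.tags), list(rel.character), list(rel.artist))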

58
fetcher.py Normal file

@@ -0,0 +1,58 @@
import database as db
import webFunctions
import logging
import traceback
import sqlalchemy.exc
import runstate


class DanbooruFetcher(object):

	def __init__(self):
		self.log = logging.getLogger("Main.Danbooru")
		self.wg = webFunctions.WebGetRobust()
		self.session = db.Session()

	def get_job(self):
		# Claim the lowest-postid release nobody has fetched yet by flipping
		# its dlstate from 0 to 1. Retry if a concurrent worker gets there first.
		while 1:
			try:
				job = self.session.query(db.Releases)     \
					.filter(db.Releases.dlstate == 0)     \
					.order_by(db.Releases.postid)         \
					.limit(1)                             \
					.one()
				if job is None:
					return None
				job.dlstate = 1
				self.session.commit()
				return job
			except sqlalchemy.exc.DatabaseError:
				self.log.warning("Error when getting job. Probably a concurrency issue.")
				self.log.warning("Trying again.")
				for line in traceback.format_exc().split("\n"):
					self.log.warning(line)
				self.session.rollback()

	def retreiveItem(self):
		job = self.get_job()
		if not job:
			return False
		return True


def run(indice):
	print("Runner {}!".format(indice))
	fetcher = DanbooruFetcher()
	job = True
	try:
		while job and runstate.run:
			job = fetcher.get_job()
			print("Have job: ", job, job.postid)
	except KeyboardInterrupt:
		return
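
The dlstate column is effectively the work queue: get_job() picks the lowest-postid release still at dlstate 0, flips it to 1 and commits, retrying on the database errors that concurrent workers can trigger. A small sketch of driving one fetcher by hand, outside the thread pool (illustrative only; it assumes main.insertStartingPoints() has already seeded the Releases table):

import logSetup
logSetup.initLogging()

import fetcher

f = fetcher.DanbooruFetcher()
for _ in range(3):
	job = f.get_job()          # dlstate goes 0 -> 1 and is committed
	print("Claimed post", job.postid, "dlstate:", job.dlstate)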

167
logSetup.py Normal file

@@ -0,0 +1,167 @@
import logging
import colorama as clr
import os.path
import sys
import time
import traceback

# Pylint can't figure out what's in the record library for some reason
#pylint: disable-msg=E1101

colours = [clr.Fore.BLUE, clr.Fore.RED, clr.Fore.GREEN, clr.Fore.YELLOW, clr.Fore.MAGENTA, clr.Fore.CYAN, clr.Back.YELLOW + clr.Fore.BLACK, clr.Back.YELLOW + clr.Fore.BLUE, clr.Fore.WHITE]


def getColor(idx):
	return colours[idx % len(colours)]


class ColourHandler(logging.Handler):

	def __init__(self, level=logging.DEBUG):
		logging.Handler.__init__(self, level)
		self.formatter = logging.Formatter('\r%(name)s%(padding)s - %(style)s%(levelname)s - %(message)s' + clr.Style.RESET_ALL)
		clr.init()
		self.logPaths = {}

	def emit(self, record):
		# print record.levelname
		# print record.name

		# Colour each dotted segment of the logger name separately, so related
		# loggers are easy to tell apart on the console.
		segments = record.name.split(".")
		if segments[0] == "Main" and len(segments) > 1:
			segments.pop(0)
			segments[0] = "Main." + segments[0]

		nameList = []
		for indice, pathSegment in enumerate(segments):
			if indice not in self.logPaths:
				self.logPaths[indice] = [pathSegment]
			elif pathSegment not in self.logPaths[indice]:
				self.logPaths[indice].append(pathSegment)

			name = clr.Style.RESET_ALL
			name += getColor(self.logPaths[indice].index(pathSegment))
			name += pathSegment
			name += clr.Style.RESET_ALL
			nameList.append(name)

		record.name = ".".join(nameList)

		if record.levelname == "DEBUG":
			record.style = clr.Style.DIM
		elif record.levelname == "WARNING":
			record.style = clr.Style.BRIGHT
		elif record.levelname == "ERROR":
			record.style = clr.Style.BRIGHT + clr.Fore.RED
		elif record.levelname == "CRITICAL":
			record.style = clr.Style.BRIGHT + clr.Back.BLUE + clr.Fore.RED
		else:
			record.style = clr.Style.NORMAL

		record.padding = ""
		print(self.format(record))


class RobustFileHandler(logging.FileHandler):
	"""
	A handler class which writes formatted logging records to disk files.
	"""
	def emit(self, record):
		"""
		Emit a record.

		If the stream was not opened because 'delay' was specified in the
		constructor, open it before calling the superclass's emit.
		"""
		failures = 0
		while self.stream is None:
			try:
				self.stream = self._open()
			except:
				time.sleep(1)
				if failures > 3:
					traceback.print_exc()
					print("Cannot open log file?")
					return
				failures += 1

		failures = 0
		while failures < 3:
			try:
				logging.StreamHandler.emit(self, record)
				break
			except:
				failures += 1
		else:
			traceback.print_stack()
			print("Error writing to file?")

		self.close()


def exceptHook(exc_type, exc_value, exc_traceback):
	if issubclass(exc_type, KeyboardInterrupt):
		sys.__excepthook__(exc_type, exc_value, exc_traceback)
		return

	mainLogger = logging.getLogger("Main")			# Main logger
	mainLogger.critical('Uncaught exception!')
	mainLogger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))


# Global hackyness to detect and warn on double-initialization of the logging systems.
LOGGING_INITIALIZED = False


def initLogging(logLevel=logging.INFO):

	global LOGGING_INITIALIZED
	if LOGGING_INITIALIZED:
		print("ERROR - Logging initialized twice!")
		# Show where the second initialization attempt came from.
		traceback.print_stack()
		return
	LOGGING_INITIALIZED = True

	print("Setting up loggers....")

	if not os.path.exists(os.path.join("./logs")):
		os.mkdir(os.path.join("./logs"))

	mainLogger = logging.getLogger("Main")			# Main logger
	mainLogger.setLevel(logLevel)

	ch = ColourHandler()
	mainLogger.addHandler(ch)

	# Anything at WARNING or above also goes to a timestamped file in ./logs.
	logName = "Error - %s.txt" % (time.strftime("%Y-%m-%d %H;%M;%S", time.gmtime()))

	errLogHandler = RobustFileHandler(os.path.join("./logs", logName))
	errLogHandler.setLevel(logging.WARNING)
	formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
	errLogHandler.setFormatter(formatter)

	mainLogger.addHandler(errLogHandler)

	# Install override for excepthook, to catch all errors
	sys.excepthook = exceptHook

	print("done")


if __name__ == "__main__":
	initLogging(logLevel=logging.DEBUG)
	log = logging.getLogger("Main.Test")
	log.debug("Testing logging - level: debug")
	log.info("Testing logging - level: info")
	log.warning("Testing logging - level: warn")
	log.error("Testing logging - level: error")
	log.critical("Testing logging - level: critical")
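
ColourHandler colours each dotted segment of the logger name separately, and RobustFileHandler copies everything at WARNING and above into a timestamped file under ./logs. A short sketch of how any other module taps into this after initLogging() has run once at startup (the logger name here is just an example):

import logging

# Any logger under "Main" inherits both handlers installed by initLogging().
log = logging.getLogger("Main.Fetcher.Worker")

log.info("worker starting")                              # console only, colourised
log.warning("this also lands in logs/Error - <timestamp>.txt")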

54
main.py Normal file

@@ -0,0 +1,54 @@
import database as db
import logSetup
logSetup.initLogging()

import fetcher
import runstate

import concurrent.futures

THREADS = 10


def insertStartingPoints():
	# Pre-seed the work queue: one Releases row per post id, with dlstate 0
	# meaning "not fetched yet". Skipped if post id 1 is already present.
	tmp = db.session.query(db.Releases)       \
		.filter(db.Releases.postid == 1)      \
		.count()
	if not tmp:
		for x in range(2070000):
			new = db.Releases(dlstate=0, postid=x)
			db.session.add(new)
			if x % 10000 == 0:
				print("Loop ", x, "flushing...")
				db.session.flush()
				print("Flushed.")
	db.session.commit()


def resetDlstate():
	# Release any jobs that were claimed (dlstate 1) but never finished,
	# e.g. after a crash or an interrupted run.
	tmp = db.session.query(db.Releases)       \
		.filter(db.Releases.dlstate == 1)     \
		.update({db.Releases.dlstate : 0})
	db.session.commit()


def go():
	insertStartingPoints()
	resetDlstate()

	executor = concurrent.futures.ThreadPoolExecutor(max_workers=THREADS)
	try:
		# for x in range(THREADS):
		for x in range(1):
			executor.submit(fetcher.run, x)
		executor.shutdown()
	except KeyboardInterrupt:
		print("Waiting for executor.")
		runstate.run = False
		executor.shutdown()


if __name__ == '__main__':
	go()
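
runstate.run is the only piece of shared state between main.py and the worker threads: fetcher.run() re-checks it between jobs, and the KeyboardInterrupt handler in go() sets it to False so every thread finishes its current job and exits. A stripped-down sketch of that shutdown pattern (the worker function here is a hypothetical stand-in for fetcher.run):

import concurrent.futures
import time

import runstate

def worker(indice):
	# Stand-in for fetcher.run(): keep going until the shared flag is cleared.
	while runstate.run:
		time.sleep(0.1)
	print("Runner {} stopping.".format(indice))

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
for x in range(2):
	executor.submit(worker, x)

time.sleep(1)
runstate.run = False      # ask the workers to wind down
executor.shutdown()       # then block until they have all returned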

2
runstate.py Normal file

@@ -0,0 +1,2 @@
run = True

18
settings.py Normal file

@@ -0,0 +1,18 @@
# Your PostgreSQL database credentials for the primary database.
# The DATABASE_USER must have write access to the database DATABASE_DB_NAME.
DATABASE_USER = "dbarchiver"
DATABASE_PASS = "YEkTYt4sCcWctY"
DATABASE_DB_NAME = "dbmirror"
DATABASE_IP = "10.1.1.8"

# Note that a local socket will be tried before the DATABASE_IP value, so even if
# DATABASE_IP is invalid, the connection may still work.

# The directory "context" for all the hentai items.
# This determines the path mask that will be used when deduplicating
# hentai items.
# If you aren't running the deduper, just specify something basic, like "/".
storeDir = r"/media/Storage/H/Danbooru"

1129
webFunctions.py Normal file

File diff suppressed because it is too large.