Restructure is mostly done; it at least starts running.

I need to rework the db stuff a LOT though.
This commit is contained in:
Fake-Name 2017-11-20 22:57:18 -08:00
parent b36af77670
commit 7061a147de
10 changed files with 233 additions and 232 deletions

main.py

@@ -1,84 +1,9 @@
import sys
import database as db
from sqlalchemy.dialects.postgresql import insert
import logSetup
logSetup.initLogging()
import danbooruFetch
import gelbooruFetch
import r34xxxScrape
import KonaChanFetch
import e621Scrape
import runstate
import concurrent.futures
# THREADS = 6
THREADS = 15
UPSERT_STEP = 10000
def do_upsert(target, maxitems):
for x in range(maxitems, 0, UPSERT_STEP * -1):
print("[%s] - Building insert data structure %s -> %s" % (target, x, x+UPSERT_STEP))
dat = [{"dlstate" : 0, "postid" : x, "source" : target} for x in range(x, x+UPSERT_STEP)]
print("[%s] - Building insert query" % target)
q = insert(db.Releases).values(dat)
q = q.on_conflict_do_nothing()
print("[%s] - Built. Doing insert." % target)
ret = db.session.execute(q)
changes = ret.rowcount
print("[%s] - Changed rows: %s" % (target, changes))
db.session.commit()
if not changes:
break
print("[%s] - Done." % target)
def resetDlstate():
tmp = db.session.query(db.Releases) \
.filter(db.Releases.dlstate == 1) \
.update({db.Releases.dlstate : 0})
db.session.commit()
def go():
print("Inserting start URLs")
do_upsert("Danbooru", 2750000)
do_upsert('Gelbooru', 3650000)
do_upsert('Rule34.xxx', 2300000)
do_upsert('e621', 1200000)
do_upsert('KonaChan', 245000)
print("Resetting DL states.")
# resetDlstate()
print("Creating run contexts")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=THREADS)
plugins = [r34xxxScrape, danbooruFetch, gelbooruFetch, e621Scrape, KonaChanFetch]
try:
for plugin in plugins:
for x in range(50):
executor.submit(plugin.run, x)
print("Waiting for workers to complete.")
executor.shutdown()
except KeyboardInterrupt:
print("Waiting for executor.")
runstate.run = False
executor.shutdown()
import util.logSetup
import scraper.runner
if __name__ == '__main__':
go()
util.logSetup.initLogging()
scraper.runner.go()
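
The do_upsert() removed here reappears below as RunEngine.do_upsert() in scraper/runner.py. The idea: seed the Releases table with one row per post ID, walking downward from the site's highest known ID in UPSERT_STEP-sized batches, and lean on PostgreSQL's ON CONFLICT DO NOTHING so rows that already exist are silently skipped; a batch that changes zero rows means the rest of the range was seeded on an earlier run, so the loop stops early. A minimal standalone sketch of that pattern, using a toy table in place of the real scraper.database.Releases schema (which this commit does not show):

    # Sketch only; "releases" is a stand-in for scraper.database.Releases.
    from sqlalchemy import MetaData, Table, Column, Integer, Text, UniqueConstraint, create_engine
    from sqlalchemy.dialects.postgresql import insert

    meta = MetaData()
    releases = Table(
        'releases', meta,
        Column('id',      Integer, primary_key=True),
        Column('postid',  Integer, nullable=False),
        Column('source',  Text,    nullable=False),
        Column('dlstate', Integer, nullable=False, default=0),
        UniqueConstraint('postid', 'source'),
    )

    UPSERT_STEP = 10000

    def seed(conn, target, maxitems):
        # Walk post IDs downward in UPSERT_STEP-sized batches.
        for x in range(maxitems, 0, -UPSERT_STEP):
            rows = [{"dlstate": 0, "postid": p, "source": target}
                    for p in range(x, x + UPSERT_STEP)]
            # ON CONFLICT DO NOTHING: already-seeded (postid, source) pairs are skipped.
            q = insert(releases).values(rows).on_conflict_do_nothing()
            changes = conn.execute(q).rowcount
            if not changes:
                # The whole batch already existed, so everything below it does too.
                break

    engine = create_engine("postgresql:///example")   # assumed DSN, not the project's
    meta.create_all(engine)
    with engine.begin() as conn:
        seed(conn, "Danbooru", 2750000)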

scraper/__init__.py Normal file

@@ -1,22 +1,19 @@
import database as db
import webFunctions
import logging
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import parsedatetime
import hashlib
import os
import settings
import os.path
import time
import datetime
import abc
import hashlib
import sqlalchemy.exc
import settings
import util.WebRequest
import scraper.runstate
import scraper.database as db
class AbstractFetcher(object, metaclass=abc.ABCMeta):
@@ -35,6 +32,7 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
def __init__(self):
self.log = logging.getLogger(self.loggerpath)
self.wg = util.WebRequest.WebGetRobust(logPath=self.loggerpath+".Web")
# db.session = db.Session()
@@ -49,7 +47,7 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
.limit(1)
job = job.scalar()
if job == None:
if job is None:
return None
job.dlstate = 1
db.session.commit()
@@ -64,8 +62,8 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
def saveFile(self, row, filename, fileCont):
if not os.path.exists(settings.storeDir):
self.log.warn("Cache directory for book items did not exist. Creating")
self.log.warn("Directory at path '%s'", settings.storeDir)
self.log.warning("Cache directory for book items did not exist. Creating")
self.log.warning("Directory at path '%s'", settings.storeDir)
os.makedirs(settings.storeDir)
@@ -115,19 +113,62 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
def saveFile(self, filename, fileCont):
if not os.path.exists(settings.storeDir):
self.log.warn("Cache directory for book items did not exist. Creating")
self.log.warn("Directory at path '%s'", settings.storeDir)
os.makedirs(settings.storeDir)
def run(indice):
print("Runner {}!".format(indice))
fetcher = DanbooruFetcher()
remainingTasks = True
fHash, ext = os.path.splitext(filename)
try:
while remainingTasks and runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return
except:
print("Unhandled exception!")
traceback.print_exc()
raise
ext = ext.lower()
fHash = fHash.upper()
# use the first 3 chars of the hash for the folder name.
# Since it's hex-encoded, that gives us a max of 2^12 bits of
# directories, or 4096 dirs.
dirName = fHash[:3]
dirPath = os.path.join(settings.storeDir, dirName)
if not os.path.exists(dirPath):
os.makedirs(dirPath)
ext = os.path.splitext(filename)[-1]
ext = ext.lower()
fHash = fHash.upper()
# The "." is part of the ext.
filename = '{filename}{ext}'.format(filename=fHash, ext=ext)
fqpath = os.path.join(dirPath, filename)
fqpath = os.path.abspath(fqpath)
if not fqpath.startswith(settings.storeDir):
raise ValueError("Generating the file path to save a cover produced a path that did not include the storage directory?")
locpath = fqpath[len(settings.storeDir):]
with open(fqpath, "wb") as fp:
fp.write(fileCont)
return locpath
@classmethod
def run_scraper(cls, indice):
print("Runner {}!".format(indice))
fetcher = cls()
remainingTasks = True
try:
while remainingTasks and scraper.runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return
except:
print("Unhandled exception!")
traceback.print_exc()
raise
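
The saveFile() consolidated into AbstractFetcher above shards stored files across subdirectories named after the first three hex characters of the hash-derived filename, which caps the layout at 16^3 = 4096 directories under settings.storeDir (the code comment's "4096 dirs"). A small sketch of just that path computation, assuming the caller passes a hex digest plus extension the way the fetchers do:

    import os
    import os.path

    STORE_DIR = "/tmp/booru_store"          # stand-in for settings.storeDir

    def sharded_path(filename):
        # Split "<hexdigest><ext>" and normalise case the same way saveFile() does.
        fhash, ext = os.path.splitext(filename)
        fhash = fhash.upper()
        ext   = ext.lower()
        # The first three hex chars pick the shard directory (at most 16**3 == 4096 of them).
        dirpath = os.path.join(STORE_DIR, fhash[:3])
        fqpath  = os.path.abspath(os.path.join(dirpath, fhash + ext))
        # Same sanity check as saveFile(): the result must stay inside the store.
        if not fqpath.startswith(os.path.abspath(STORE_DIR)):
            raise ValueError("Generated path escaped the storage directory")
        return fqpath

    # sharded_path("a3f9c0deadbeef.PNG") -> "/tmp/booru_store/A3F/A3F9C0DEADBEEF.png"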


@@ -1,29 +1,25 @@
import database as db
import webFunctions
import logging
import datetime
import re
import time
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import parsedatetime
import os
import settings
import os.path
import time
import datetime
import fetchBase
class KonaChanFetcher(fetchBase.AbstractFetcher):
import parsedatetime
import sqlalchemy.exc
import scraper.runstate
import scraper.database as db
import scraper.fetchBase
class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'KonaChan'
loggerpath = "Main.KonaChan"
def __init__(self):
self.log = logging.getLogger("Main.KonaChan")
self.wg = webFunctions.WebGetRobust(logPath="Main.KonaChan.Web")
super().__init__()
def extractTags(self, job, tagsection):
@@ -231,7 +227,7 @@ def run(indice):
remainingTasks = True
try:
while remainingTasks and runstate.run:
while remainingTasks and scraper.runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return
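
Each of these modules' work loops now checks scraper.runstate.run between items. The runstate module itself is not in the diff, but from the way runner.py flips it on KeyboardInterrupt it is presumably nothing more than a module-level boolean; a minimal sketch of that cooperative-shutdown arrangement, with the flag module's contents assumed:

    # scraper/runstate.py is assumed to contain nothing more than:
    #     run = True

    # Worker side, as in run_scraper(): check the flag between items.
    import scraper.runstate

    def worker_loop(fetcher):
        remaining = True
        while remaining and scraper.runstate.run:
            remaining = fetcher.retreiveItem()

    # Controller side, as in RunEngine.run(): on Ctrl-C, flip the flag and let the
    # executor drain, so workers exit between items instead of dying mid-download.
    #     scraper.runstate.run = False
    #     executor.shutdown()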


@@ -1,30 +1,25 @@
import database as db
import webFunctions
import logging
import datetime
import re
import time
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import sqlalchemy.exc
import parsedatetime
import os
import settings
import os.path
import time
import datetime
import fetchBase
import scraper.runstate
import scraper.database as db
import scraper.fetchBase
class DanbooruFetcher(fetchBase.AbstractFetcher):
class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'Danbooru'
loggerpath = "Main.Danbooru"
def __init__(self):
self.log = logging.getLogger("Main.Danbooru")
self.wg = webFunctions.WebGetRobust(logPath="Main.Danbooru.Web")
super().__init__()
def extractTags(self, job, tagsection):
@@ -103,8 +98,9 @@ class DanbooruFetcher(fetchBase.AbstractFetcher):
pass
else:
self.log.warning("Unknown item key-value:")
self.log.warning(" '{}' -> '{}'".format(name, val))
self.log.warning(" '%s' -> '%s'", name, val)
return imgurl
def extractMeta(self, job, soup):
tagsection = soup.find('section', id='tag-list')
assert tagsection
@@ -188,7 +184,7 @@ def run(indice):
remainingTasks = True
try:
while remainingTasks and runstate.run:
while remainingTasks and scraper.runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return
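
One small deliberate change in this hunk: the unknown-key warning used to build its message eagerly with str.format(); passing the template and arguments to the logger separately defers interpolation until the record is actually handled, and keeps the message a constant template. A quick illustration of the two forms:

    import logging

    log = logging.getLogger("Main.Danbooru")
    name, val = "rating", "explicit"

    # Old form: the string is formatted even if WARNING ends up filtered out.
    log.warning(" '{}' -> '{}'".format(name, val))

    # New form: logging substitutes the %s arguments only when the record is emitted.
    log.warning(" '%s' -> '%s'", name, val)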


@@ -1,31 +1,25 @@
import database as db
import webFunctions
import logging
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import parsedatetime
import os
import settings
import os.path
import time
import datetime
import fetchBase
import parsedatetime
import sqlalchemy.exc
import fetchBase
class E621Fetcher(fetchBase.AbstractFetcher):
import scraper.runstate
import scraper.fetchBase
import scraper.database as db
class E621Fetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'e621'
loggerpath = "Main.e621"
def __init__(self):
self.log = logging.getLogger("Main.e621")
self.wg = webFunctions.WebGetRobust(logPath="Main.e621.Web")
super().__init__()
def extractTags(self, job, tagsection):
@@ -218,7 +212,7 @@ def run(indice):
remainingTasks = True
try:
while remainingTasks and runstate.run:
while remainingTasks and scraper.runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return
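
With every site fetcher now a subclass of scraper.fetchBase.AbstractFetcher and launched through the class-level run_scraper(), adding another booru comes down to one small module plus an entry in runner.py. A hypothetical skeleton, inferred only from the attributes and hooks visible in this diff (pluginkey, loggerpath, extractTags()); the real abstract interface may demand more than this:

    # scraper/modules/exampleBooruFetch.py -- hypothetical module, not part of this commit
    import scraper.fetchBase

    class ExampleBooruFetcher(scraper.fetchBase.AbstractFetcher):
        pluginkey  = 'ExampleBooru'        # should match the "source" value seeded by do_upsert()
        loggerpath = "Main.ExampleBooru"   # AbstractFetcher.__init__ derives its logger and WebGetRobust from this

        def __init__(self):
            super().__init__()

        def extractTags(self, job, tagsection):
            # Site-specific tag parsing goes here.
            raise NotImplementedError

    # It would also need to be listed in scraper.runner.PLUGIN_CLASSES and given a
    # do_upsert() seed range in RunEngine.run().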


@@ -1,24 +1,25 @@
import database as db
import webFunctions
import logging
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import parsedatetime
import os
import settings
import os.path
import time
import datetime
class GelbooruFetcher(object):
import sqlalchemy.exc
import parsedatetime
import scraper.runstate
import scraper.database as db
import scraper.fetchBase
class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'Gelbooru'
loggerpath = "Main.Gelbooru"
def __init__(self):
self.log = logging.getLogger("Main.Gelbooru")
self.wg = webFunctions.WebGetRobust(logPath="Main.Gelbooru.Web")
super().__init__()
# db.session = db.Session()
@@ -143,48 +144,6 @@ class GelbooruFetcher(object):
def saveFile(self, filename, fileCont):
if not os.path.exists(settings.storeDir):
self.log.warn("Cache directory for book items did not exist. Creating")
self.log.warn("Directory at path '%s'", settings.storeDir)
os.makedirs(settings.storeDir)
fHash, ext = os.path.splitext(filename)
ext = ext.lower()
fHash = fHash.upper()
# use the first 3 chars of the hash for the folder name.
# Since it's hex-encoded, that gives us a max of 2^12 bits of
# directories, or 4096 dirs.
dirName = fHash[:3]
dirPath = os.path.join(settings.storeDir, dirName)
if not os.path.exists(dirPath):
os.makedirs(dirPath)
ext = os.path.splitext(filename)[-1]
ext = ext.lower()
fHash = fHash.upper()
# The "." is part of the ext.
filename = '{filename}{ext}'.format(filename=fHash, ext=ext)
fqpath = os.path.join(dirPath, filename)
fqpath = os.path.abspath(fqpath)
if not fqpath.startswith(settings.storeDir):
raise ValueError("Generating the file path to save a cover produced a path that did not include the storage directory?")
locpath = fqpath[len(settings.storeDir):]
with open(fqpath, "wb") as fp:
fp.write(fileCont)
return locpath
def fetchImage(self, job, url, srcurl):
url = urllib.parse.urljoin(srcurl, url)
fname = url.split("/")[-1]
@@ -217,13 +176,13 @@ class GelbooruFetcher(object):
return
if 'Gelbooru - Image List' in soup.title.get_text():
self.log.warn("Image has been removed.")
self.log.warning("Image has been removed.")
job.dlstate=-4
db.session.commit()
return
if 'This post was deleted. Reason: Duplicate of' in soup.get_text():
self.log.warn("Image has been removed.")
self.log.warning("Image has been removed.")
job.dlstate=-6
db.session.commit()
return
@@ -271,7 +230,7 @@ def run(indice):
remainingTasks = True
try:
while remainingTasks and runstate.run:
while remainingTasks and scraper.runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return
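
All of the scheduling in this commit hangs off the integer dlstate column, but its values only ever appear as bare literals: rows are seeded at 0, claimed by setting 1 (and flipped back by resetDlstate()), and parked at negative values when Gelbooru reports an image removed (-4) or deleted as a duplicate (-6). Summarised as named constants purely for orientation; the project itself does not define these names:

    # dlstate values observed in this diff; the names are illustrative only.
    DLSTATE_NEW         = 0    # seeded by do_upsert(), waiting to be fetched
    DLSTATE_IN_PROGRESS = 1    # claimed by a worker; resetDlstate() flips these back to 0
    DLSTATE_REMOVED     = -4   # Gelbooru returned its "Image List" page: image removed
    DLSTATE_DUPLICATE   = -6   # "This post was deleted. Reason: Duplicate of ..."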


@@ -1,31 +1,25 @@
import database as db
import webFunctions
import logging
import datetime
import re
import time
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import sqlalchemy.exc
import parsedatetime
import os
import settings
import os.path
import time
import datetime
import fetchBase
import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import danbooruFetch
class R34xxxFetcher(danbooruFetch.DanbooruFetcher):
class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'Rule34.xxx'
loggerpath = "Main.Rule34-xxx"
def __init__(self):
self.log = logging.getLogger("Main.Rule34-xxx")
self.wg = webFunctions.WebGetRobust(logPath="Main.Rule34-xxx.Web")
super().__init__()
def extractTags(self, job, tagsection):
@@ -110,7 +104,7 @@ class R34xxxFetcher(danbooruFetch.DanbooruFetcher):
pass
else:
self.log.warning("Unknown item key-value:")
self.log.warning(" '{}' -> '{}'".format(name, val))
self.log.warning(" '%s' -> '%s'", name, val)
return imgurl
def extractMeta(self, job, soup):
@@ -203,7 +197,7 @@ def run(indice):
remainingTasks = True
try:
while remainingTasks and runstate.run:
while remainingTasks and scraper.runstate.run:
remainingTasks = fetcher.retreiveItem()
except KeyboardInterrupt:
return

scraper/runner.py Normal file

@@ -0,0 +1,96 @@
import logging
import concurrent.futures
from sqlalchemy.dialects.postgresql import insert
import scraper.database as db
import scraper.runstate
import scraper.modules.danbooruFetch
import scraper.modules.gelbooruFetch
import scraper.modules.r34xxxScrape
import scraper.modules.KonaChanFetch
import scraper.modules.e621Scrape
# THREADS = 6
THREADS = 15
UPSERT_STEP = 10000
PLUGIN_CLASSES = [
scraper.modules.danbooruFetch.DanbooruFetcher,
scraper.modules.gelbooruFetch.GelbooruFetcher,
scraper.modules.r34xxxScrape.R34xxxFetcher,
scraper.modules.KonaChanFetch.KonaChanFetcher,
scraper.modules.e621Scrape.E621Fetcher,
]
class RunEngine(object):
def __init__(self, worker_count):
self.log = logging.getLogger("Main.Runner")
self.workers = worker_count
def resetDlstate(self):
tmp = db.session.query(db.Releases) \
.filter(db.Releases.dlstate == 1) \
.update({db.Releases.dlstate : 0})
db.session.commit()
def do_upsert(self, target, maxitems):
for x in range(maxitems, 0, UPSERT_STEP * -1):
self.log.info("[%s] - Building insert data structure %s -> %s", target, x, x+UPSERT_STEP)
dat = [{"dlstate" : 0, "postid" : x, "source" : target} for x in range(x, x+UPSERT_STEP)]
self.log.info("[%s] - Building insert query", target)
q = insert(db.Releases).values(dat)
q = q.on_conflict_do_nothing()
self.log.info("[%s] - Built. Doing insert.", target)
ret = db.session.execute(q)
changes = ret.rowcount
self.log.info("[%s] - Changed rows: %s", target, changes)
db.session.commit()
if not changes:
break
self.log.info("[%s] - Done.", target)
def run(self):
self.log.info("Inserting start URLs")
self.do_upsert("Danbooru", 2750000)
self.do_upsert('Gelbooru', 3650000)
self.do_upsert('Rule34.xxx', 2300000)
self.do_upsert('e621', 1200000)
self.do_upsert('KonaChan', 245000)
self.log.info("Resetting DL states.")
# resetDlstate()
self.log.info("Creating run contexts")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=THREADS)
try:
for plugin in PLUGIN_CLASSES:
for x in range(50):
executor.submit(plugin.run_scraper, x)
self.log.info("Waiting for workers to complete.")
executor.shutdown()
except KeyboardInterrupt:
self.log.info("Waiting for executor.")
scraper.runstate.run = False
executor.shutdown()
def go():
instance = RunEngine(THREADS)
instance.run()
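
Everything above shares the module-level db.session from scraper.database, which does not appear in the hunks shown here (and which is presumably part of the "db stuff" the commit message says still needs reworking). For orientation only, a rough sketch of what that module plausibly provides, inferred from the names used in this diff (Releases, Session, session, and the postid/source/dlstate columns); the real schema and engine setup will differ:

    # scraper/database.py -- speculative sketch, reconstructed from usage in this diff only
    from sqlalchemy import Column, Integer, Text, UniqueConstraint, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import scoped_session, sessionmaker

    Base = declarative_base()

    class Releases(Base):
        __tablename__ = 'releases'
        id      = Column(Integer, primary_key=True)
        postid  = Column(Integer, nullable=False, index=True)
        source  = Column(Text,    nullable=False, index=True)
        dlstate = Column(Integer, nullable=False, default=0, index=True)
        # Columns for tags, file paths, etc. omitted; a unique (postid, source)
        # pair is what lets ON CONFLICT DO NOTHING act as an idempotent seed.
        __table_args__ = (UniqueConstraint('postid', 'source'), )

    engine  = create_engine("postgresql:///scraper_example")   # assumed DSN
    Session = sessionmaker(bind=engine)
    # A scoped_session would give each of the 15 worker threads its own session
    # behind the shared "db.session" name; a single shared Session() would not
    # be thread-safe with this many workers.
    session = scoped_session(Session)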