Ok, most of the scrapers are go.

Whoooo!
Fake-Name 2017-11-23 20:16:10 -08:00
parent 6b5daa3c4a
commit f9fad86d54
13 changed files with 209 additions and 58 deletions

View File

@@ -38,7 +38,7 @@ from settings import DATABASE_PASS as C_DATABASE_PASS
SQLALCHEMY_DATABASE_URI = 'postgresql://{user}:{passwd}@{host}:5432/{database}'.format(user=C_DATABASE_USER, passwd=C_DATABASE_PASS, host=C_DATABASE_IP, database=C_DATABASE_DB_NAME)
# I was having issues with timeouts because the default connection pool is 5 connections.
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size = 20)
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size = 20, isolation_level='REPEATABLE_READ')
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)
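
For context, a quick sketch (hypothetical worker function, not part of this commit) of how this setup behaves under the scraper's thread count: scoped_session() hands each thread its own Session, and all of them draw from the 20-connection pool configured above, which is why the default pool of 5 was timing out.

    # Hypothetical illustration: fifteen threads sharing the pool built above.
    # session() returns a thread-local Session; session.remove() releases its
    # connection back to the pool.
    import threading
    from sqlalchemy import text

    def ping():
        sess = session()                    # thread-local Session
        try:
            sess.execute(text("SELECT 1"))  # borrows one pooled connection
        finally:
            session.remove()                # returns it to the pool

    threads = [threading.Thread(target=ping) for _ in range(15)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()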

View File

@@ -9,6 +9,8 @@ import hashlib
import concurrent.futures
import sqlalchemy.exc
from sqlalchemy import desc
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
import settings
@@ -44,13 +46,45 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
def get_job(self):
session = db.session()
print("Cursor: ", session)
while 1:
self.log.info("Getting job")
try:
raw_query = '''
UPDATE
db_releases
SET
state = 'fetching'
WHERE
db_releases.id in (
SELECT
db_releases.id
FROM
db_releases
WHERE
db_releases.state = 'new'
AND
source = :source
ORDER BY
db_releases.postid ASC
LIMIT 1
)
AND
db_releases.state = 'new'
RETURNING
db_releases.id;
'''
rids = session.execute(text(raw_query), {'source' : self.pluginkey})
ridl = list(rids)
rid = ridl[0][0]
job = db.session.query(db.Releases) \
.filter(db.Releases.source == self.pluginkey) \
.filter(db.Releases.state == 'new') \
.order_by(db.Releases.postid) \
.limit(1)
.filter(db.Releases.id == rid)
job = job.scalar()
if job is None:
@@ -58,7 +92,9 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
job.state = 'fetching'
db.session.commit()
return job
except sqlalchemy.exc.DatabaseError:
except sqlalchemy.exc.OperationalError as e:
db.session.rollback()
except sqlalchemy.exc.DatabaseError as e:
self.log.warning("Error when getting job. Probably a concurrency issue.")
self.log.warning("Trying again.")
for line in traceback.format_exc().split("\n"):
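
Standalone, the claim pattern from the hunk above looks roughly like the sketch below (table layout and DSN are assumptions for illustration). The inner SELECT picks one candidate row; the outer UPDATE re-checks state = 'new' so a row grabbed by another worker in the meantime is not claimed twice.

    # Sketch of the atomic claim. Under REPEATABLE READ, the losing side of a
    # concurrent claim gets a serialization failure, which SQLAlchemy surfaces
    # as sqlalchemy.exc.OperationalError -- exactly what get_job() above now
    # rolls back and retries on.
    import sqlalchemy as sa

    engine = sa.create_engine(
        "postgresql://user:passwd@localhost:5432/scrape",  # hypothetical DSN
        isolation_level='REPEATABLE_READ',
    )

    CLAIM = sa.text("""
        UPDATE db_releases
        SET    state = 'fetching'
        WHERE  db_releases.id IN (
                   SELECT id FROM db_releases
                   WHERE  state = 'new' AND source = :source
                   ORDER BY postid ASC
                   LIMIT 1
               )
          AND  db_releases.state = 'new'
        RETURNING db_releases.id
    """)

    def claim_one(source):
        with engine.begin() as conn:  # commits on success, rolls back on error
            row = conn.execute(CLAIM, {"source": source}).first()
            return row[0] if row else None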
@@ -120,14 +156,21 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
def run_worker(self):
pass
try:
while self.retreiveItem():
pass
except Exception as e:
self.log.error("Worker thread had exception")
for line in traceback.format_exc().split("\n"):
self.log.error(line)
def resetDlstate(self):
sess = db.session()
tmp = sess.query(db.Releases) \
.filter(db.Releases.state == 'fetching' or db.Releases.state == 'processing') \
.filter(db.Releases.source == self.pluginkey) \
sess.query(db.Releases) \
.filter(db.Releases.state == 'fetching' or db.Releases.state == 'processing') \
.filter(db.Releases.source == self.pluginkey) \
.update({db.Releases.state : 'new'})
sess.commit()
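
One caveat in resetDlstate(), old and new versions alike: Python's 'or' between two SQLAlchemy comparisons does not build a SQL OR. The first comparison gets collapsed by bool(), so the filter silently matches only one of the two states and rows stuck in the other state are never reset. A corrected sketch using in_():

    # The in_() form (or sqlalchemy.or_) emits the intended WHERE clause.
    # synchronize_session=False is needed because the default 'evaluate'
    # strategy cannot (at least in SQLAlchemy of this era) process in_()
    # in memory for a bulk UPDATE.
    sess = db.session()
    sess.query(db.Releases) \
        .filter(db.Releases.state.in_(['fetching', 'processing'])) \
        .filter(db.Releases.source == self.pluginkey) \
        .update({db.Releases.state: 'new'}, synchronize_session=False)
    sess.commit()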

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import util.WebRequest
class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'KonaChan'
@@ -162,9 +164,10 @@ class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
pageurl = 'https://konachan.com/post/show/{}'.format(job.postid)
try:
soup = self.wg.getSoup(pageurl)
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -172,16 +175,19 @@ class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
if 'You need a gold account to see this image.' in text:
job.state = 'removed'
job.err_str = 'requires account'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'This post was deleted for the following reasons' in text:
job.state = 'removed'
job.err_str = 'post deleted'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'Save this flash' in text:
job.state = 'disabled'
job.err_str = 'content is flash .swf'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
err = 0
@@ -194,12 +200,14 @@ class KonaChanFetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
break
except AssertionError:
self.log.info("Assertion error?: '%s'", pageurl)
traceback.print_exc()
job.state = 'error'
job.err_str = 'failure fetching actual image'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.rollback()
break
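
The set-state/log/commit triple added here is pasted into every fetcher in this commit; a small helper on AbstractFetcher (hypothetical, not part of the commit) would keep the bookkeeping and the log format in one place:

    # Hypothetical refactor: one home for the repeated "mark job" bookkeeping.
    def mark_job(self, job, state, err_str):
        job.state = state
        job.err_str = err_str
        self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
        db.session.commit()

    # so each handler becomes e.g.:
    #     self.mark_job(job, 'removed', 'post deleted')
    #     return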

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import util.WebRequest
class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'Danbooru'
@@ -100,6 +102,7 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
if val == 'Banned':
job.state = 'removed'
job.err_str = 'item banned'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
elif name in ['Approver', 'ID', 'Source', 'Uploader']:
pass
else:
@@ -138,9 +141,10 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
pageurl = 'https://danbooru.donmai.us/posts/{}'.format(job.postid)
try:
soup = self.wg.getSoup(pageurl)
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -148,18 +152,22 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
if 'You need a gold account to see this image.' in text:
job.state = 'removed'
job.err_str = 'requires account'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'This post was deleted for the following reasons' in text:
job.state = 'removed'
job.err_str = 'post deleted'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'Save this flash' in text:
job.state = 'disabled'
job.err_str = 'content is flash .swf'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
err = 0
while err < 5:
try:
@@ -170,13 +178,17 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
break
except sqlalchemy.exc.IntegrityError:
err += 1
db.session.rollback()
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching actual image'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.fetchBase
import scraper.database as db
import util.WebRequest
class E621Fetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'e621'
@@ -154,9 +156,10 @@ class E621Fetcher(scraper.fetchBase.AbstractFetcher):
pageurl = 'https://e621.net/post/show/{}'.format(job.postid)
try:
soup = self.wg.getSoup(pageurl)
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -164,16 +167,19 @@ class E621Fetcher(scraper.fetchBase.AbstractFetcher):
if 'You need a gold account to see this image.' in text:
job.state = 'removed'
job.err_str = 'requires account'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'This post was deleted for the following reasons' in text:
job.state = 'removed'
job.err_str = 'post deleted'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'Save this flash' in text:
job.state = 'disabled'
job.err_str = 'content is flash .swf'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
err = 0
@@ -186,12 +192,14 @@ class E621Fetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
break
except AssertionError:
self.log.info("Assertion error?: '%s'", pageurl)
traceback.print_exc()
job.state = 'error'
job.err_str = 'Assertion failure?'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.rollback()
break

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import util.WebRequest
class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'Gelbooru'
@@ -110,6 +112,8 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
if val == 'Banned':
job.state = 'removed'
job.err_str = 'item banned'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
elif name in ['Approver', 'Id', 'Source', 'Uploader']:
pass
else:
@@ -159,9 +163,10 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
time.sleep(13)
else:
break
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -169,6 +174,7 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.warning("Image has been removed.")
job.state = 'removed'
job.err_str = 'image has been removed'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -176,6 +182,7 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.warning("Image has been removed.")
job.state = 'removed'
job.err_str = 'image has been removed because it was a duplicate'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -190,13 +197,15 @@ class GelbooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
break
except sqlalchemy.exc.IntegrityError:
err += 1
db.session.rollback()
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching actual image'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import util.WebRequest
class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'Rule34.xxx'
@@ -143,28 +145,44 @@ class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
def processJob(self, job):
pageurl = 'https://rule34.xxx/index.php?page=post&s=view&id={}'.format(job.postid)
try:
soup = self.wg.getSoup(pageurl)
except urllib.error.URLError:
job.state = 'error'
job.err_str = 'failure fetching container page'
db.session.commit()
return
while 1:
try:
soup = self.wg.getSoupNoRedirects(pageurl)
if 'You are viewing an advertisement' in soup.get_text():
self.log.warning("Working around advertisement. Sleeping 10 seconds")
time.sleep(13)
else:
break
except util.WebRequest.RedirectedError:
job.state = 'error'
job.err_str = 'Content page redirected'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
text = soup.get_text()
if 'You need a gold account to see this image.' in text:
job.state = 'removed'
job.err_str = 'requires account'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'This post was deleted for the following reasons' in text:
job.state = 'removed'
job.err_str = 'post deleted'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
if 'Save this flash' in text:
job.state = 'disabled'
job.err_str = 'content is flash .swf'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
err = 0
@@ -177,18 +195,21 @@ class R34xxxFetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
break
except AssertionError:
self.log.info("Assertion error?: '%s'", pageurl)
traceback.print_exc()
job.state = 'error'
job.err_str = 'Assertion failure?'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.rollback()
break
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching actual image'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
except sqlalchemy.exc.IntegrityError:

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import util.WebRequest
class TbibFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'TBIB'
@@ -111,6 +113,7 @@ class TbibFetcher(scraper.fetchBase.AbstractFetcher):
if val == 'Banned':
job.state = 'removed'
job.err_str = 'item banned'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
elif name in ['Approver', 'Id', 'Source', 'Uploader']:
pass
else:
@@ -156,15 +159,22 @@ class TbibFetcher(scraper.fetchBase.AbstractFetcher):
pageurl = 'http://tbib.org/index.php?page=post&s=view&id={}'.format(job.postid)
while 1:
try:
soup = self.wg.getSoup(pageurl)
soup = self.wg.getSoupNoRedirects(pageurl)
if 'You are viewing an advertisement' in soup.get_text():
self.log.warning("Working around advertisement. Sleeping 10 seconds")
time.sleep(13)
else:
break
except urllib.error.URLError:
except util.WebRequest.RedirectedError:
job.state = 'error'
job.err_str = 'Content page redirected'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -172,6 +182,7 @@ class TbibFetcher(scraper.fetchBase.AbstractFetcher):
self.log.warning("Image has been removed.")
job.state = 'removed'
job.err_str = 'image has been removed'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -179,6 +190,7 @@ class TbibFetcher(scraper.fetchBase.AbstractFetcher):
self.log.warning("Image has been removed.")
job.state = 'removed'
job.err_str = 'image has been removed because it was a duplicate'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -193,13 +205,15 @@ class TbibFetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
break
except sqlalchemy.exc.IntegrityError:
err += 1
db.session.rollback()
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching actual image'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()

View File

@@ -13,6 +13,8 @@ import scraper.runstate
import scraper.database as db
import scraper.fetchBase
import util.WebRequest
class XBooruFetcher(scraper.fetchBase.AbstractFetcher):
pluginkey = 'XBooru'
@@ -111,6 +113,7 @@ class XBooruFetcher(scraper.fetchBase.AbstractFetcher):
if val == 'Banned':
job.state = 'removed'
job.err_str = 'item banned'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
elif name in ['Approver', 'Id', 'Source', 'Uploader']:
pass
else:
@@ -160,9 +163,10 @@ class XBooruFetcher(scraper.fetchBase.AbstractFetcher):
time.sleep(13)
else:
break
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching container page'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -170,6 +174,7 @@ class XBooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.warning("Image has been removed.")
job.state = 'removed'
job.err_str = 'image has been removed'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -177,6 +182,7 @@ class XBooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.warning("Image has been removed.")
job.state = 'removed'
job.err_str = 'image has been removed because it was a duplicate'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()
return
@@ -191,13 +197,15 @@ class XBooruFetcher(scraper.fetchBase.AbstractFetcher):
self.log.info("No image found for URL: '%s'", pageurl)
job.state = 'error'
job.err_str = 'failed to find image!'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
break
except sqlalchemy.exc.IntegrityError:
err += 1
db.session.rollback()
except urllib.error.URLError:
except util.WebRequest.WebGetException:
job.state = 'error'
job.err_str = 'failure fetching actual image'
self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
db.session.commit()

View File

@@ -15,26 +15,24 @@ import scraper.modules.tbibFetch
import scraper.modules.xbooruFetch
# THREADS = 6
THREADS = 15
PLUGIN_CLASSES = [
scraper.modules.danbooruFetch.DanbooruFetcher,
# scraper.modules.gelbooruFetch.GelbooruFetcher,
scraper.modules.r34xxxScrape.R34xxxFetcher,
scraper.modules.KonaChanFetch.KonaChanFetcher,
# Ok:
scraper.modules.e621Scrape.E621Fetcher,
scraper.modules.KonaChanFetch.KonaChanFetcher,
scraper.modules.r34xxxScrape.R34xxxFetcher,
scraper.modules.danbooruFetch.DanbooruFetcher,
scraper.modules.tbibFetch.TbibFetcher,
scraper.modules.xbooruFetch.XBooruFetcher,
# Fucked:
# scraper.modules.gelbooruFetch.GelbooruFetcher,
]
class RunEngine(object):
def __init__(self, worker_count):
def __init__(self):
self.log = logging.getLogger("Main.Runner")
self.workers = worker_count
def run(self):
self.log.info("Inserting start URLs")
@@ -43,29 +41,29 @@ class RunEngine(object):
self.log.info("Creating run contexts")
for plugin in PLUGIN_CLASSES:
plugin.run_scraper()
# for plugin in PLUGIN_CLASSES:
# plugin.run_scraper()
# threads = []
# try:
# for plugin in PLUGIN_CLASSES:
# th = threading.Thread(target=plugin.run_scraper, name=plugin.loggerpath)
# threads.append(th)
threads = []
try:
for plugin in PLUGIN_CLASSES:
th = threading.Thread(target=plugin.run_scraper, name=plugin.loggerpath)
threads.append(th)
# for thread in threads:
# thread.start()
for thread in threads:
thread.start()
# self.log.info("Waiting for workers to complete.")
# for thread in threads:
# thread.join()
# except KeyboardInterrupt:
# self.log.info("Waiting for executor.")
# scraper.runstate.run = False
# for thread in threads:
# thread.join()
self.log.info("Waiting for workers to complete.")
for thread in threads:
thread.join()
except KeyboardInterrupt:
self.log.info("Waiting for executor.")
scraper.runstate.run = False
for thread in threads:
thread.join()
def go():
instance = RunEngine(THREADS)
instance = RunEngine()
instance.run()
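
The re-enabled fan-out is a plain thread-per-plugin pattern. A condensed sketch (assuming, as the code above implies, that run_scraper is callable on the plugin class and scraper.runstate.run is the shared stop flag the workers poll):

    # Condensed version of RunEngine.run(): one thread per plugin class,
    # with a shared flag to request shutdown on Ctrl+C.
    import threading
    import scraper.runstate

    def fan_out(plugin_classes):
        threads = [
            threading.Thread(target=p.run_scraper, name=p.loggerpath)
            for p in plugin_classes
        ]
        for t in threads:
            t.start()
        try:
            for t in threads:
                t.join()
        except KeyboardInterrupt:
            scraper.runstate.run = False  # workers see this and wind down
            for t in threads:
                t.join()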

View File

@@ -4,6 +4,9 @@
class WebGetException(Exception):
pass
class RedirectedError(WebGetException):
pass
class ContentTypeError(WebGetException):
pass

View File

@@ -188,6 +188,32 @@ class WebGetRobust(PhantomJSMixin.WebGetPjsMixin, ChromiumMixin.WebGetCrMixin):
return pgContent
def getSoupNoRedirects(self, *args, **kwargs):
if 'returnMultiple' in kwargs:
raise Exceptions.ArgumentError("getSoupNoRedirects cannot be called with 'returnMultiple'")
if 'soup' in kwargs and kwargs['soup']:
raise Exceptions.ArgumentError("getSoupNoRedirects contradicts the 'soup' directive!")
kwargs['returnMultiple'] = True
tgt_url = kwargs.get('requestedUrl', None)
if not tgt_url:
tgt_url = args[0]
page, handle = self.getpage(*args, **kwargs)
redirurl = handle.geturl()
if redirurl != tgt_url:
self.log.error("Requested %s, redirected to %s. Raising error", tgt_url, redirurl)
raise Exceptions.RedirectedError("Requested %s, redirected to %s" % (
tgt_url, redirurl))
soup = as_soup(page)
return soup
def getSoup(self, *args, **kwargs):
if 'returnMultiple' in kwargs and kwargs['returnMultiple']:
raise Exceptions.ArgumentError("getSoup cannot be called with 'returnMultiple' being true")

View File

@@ -5,5 +5,6 @@ from .WebRequestClass import WebGetRobust
from .Exceptions import WebGetException
from .Exceptions import ContentTypeError
from .Exceptions import ArgumentError
from .Exceptions import RedirectedError
from .Exceptions import FetchFailureError