Better threading things.

Fake-Name 2019-02-03 20:12:01 -08:00
parent f44b548be1
commit 02e9831954
4 changed files with 65 additions and 6 deletions

View File

@@ -39,8 +39,10 @@ from settings import DATABASE_PASS as C_DATABASE_PASS

 SQLALCHEMY_DATABASE_URI = 'postgresql://{user}:{passwd}@{host}:5432/{database}'.format(user=C_DATABASE_USER, passwd=C_DATABASE_PASS, host=C_DATABASE_IP, database=C_DATABASE_DB_NAME)

+DB_CONNECTION_POOL_SIZE = 20
+
 # I was having issues with timeouts because the default connection pool is 5 connections.
-engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size = 20, isolation_level='REPEATABLE_READ')
+engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size = DB_CONNECTION_POOL_SIZE, isolation_level='REPEATABLE_READ')

 SessionFactory = sessionmaker(bind=engine)
 session = scoped_session(SessionFactory)
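
For context on the change above: scoped_session hands each thread its own thread-local Session on top of the one shared engine, so the pool has to be at least as large as the number of concurrently running workers or they block waiting for connections. A minimal sketch of the per-thread usage pattern, assuming the module-level session and DB_CONNECTION_POOL_SIZE defined above (the worker function and the SELECT 1 probe are illustrative, not part of the commit):

    import threading
    from sqlalchemy import text

    def worker():
        try:
            session.execute(text("SELECT 1"))  # each thread checks out its own pooled connection
            session.commit()
        finally:
            session.remove()  # return the thread-local Session (and its connection) to the registry

    threads = [threading.Thread(target=worker) for _ in range(DB_CONNECTION_POOL_SIZE)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()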

View File

@@ -238,3 +238,13 @@ class AbstractFetcher(object, metaclass=abc.ABCMeta):
         print("Runner {}!".format(cls.pluginkey))
         fetcher = cls()
         fetcher.__go()
+
+    @classmethod
+    def run_single_thread(cls):
+        try:
+            fetcher = cls()
+            fetcher.run_worker()
+        except KeyboardInterrupt:
+            fetcher.log.info("Keyboard Interrupt!")
+            scraper.runstate.run = False
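
One caveat in the new run_single_thread classmethod: if cls() itself raises KeyboardInterrupt, fetcher is never bound, and the handler's fetcher.log.info(...) will raise UnboundLocalError on top of it. A slightly more defensive variant (a sketch assuming the class's existing imports, not what the commit ships):

    @classmethod
    def run_single_thread(cls):
        fetcher = None
        try:
            fetcher = cls()
            fetcher.run_worker()
        except KeyboardInterrupt:
            if fetcher is not None:
                fetcher.log.info("Keyboard Interrupt!")
            scraper.runstate.run = False  # signal the other workers to wind down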

View File

@@ -114,7 +114,7 @@ class DanbooruFetcher(scraper.fetchBase.AbstractFetcher):
                 job.state = 'removed'
                 job.err_str = 'item banned'
                 self.log.warning("Marking %s as %s (%s)", job.id, job.state, job.err_str)
-            elif name in ['Approver', 'ID', 'Source', 'Uploader']:
+            elif name in ['Approver', 'ID', 'Source', 'Uploader', 'Top Tagger']:
                 pass
             else:
                 self.log.warning("Unknown item key-value:")

View File

@@ -2,6 +2,7 @@ import logging
 import threading
 import multiprocessing
 import concurrent.futures
+import scraper.database as db
 import scraper.runstate
@@ -36,13 +37,13 @@ class RunEngine(object):
    def run(self):
        self.log.info("Inserting start URLs")
        for plugin in PLUGIN_CLASSES:
            instance = plugin()
            instance.do_upsert()

        self.log.info("Creating run contexts")
        for plugin in PLUGIN_CLASSES:
            instance = plugin()
            instance.do_upsert()

        threads = []
        try:
@@ -63,7 +64,53 @@
         for thread in threads:
             thread.join()

+    def run_sequential(self):
+        self.log.info("Inserting start URLs")
+        for plugin in PLUGIN_CLASSES:
+            instance = plugin()
+            instance.do_upsert()
+
+        self.log.info("Doing sequential execution")
+        try:
+            for plugin in PLUGIN_CLASSES:
+                plugin.run_scraper()
+        except KeyboardInterrupt:
+            scraper.runstate.run = False
+        self.log.info("Waiting for threads to join.")
+
+    def run_shared_pool(self):
+        self.log.info("Inserting start URLs")
+        for plugin in PLUGIN_CLASSES:
+            instance = plugin()
+            instance.do_upsert()
+
+        self.log.info("Doing sequential execution")
+        worker_threads = db.DB_CONNECTION_POOL_SIZE * 2
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=db.DB_CONNECTION_POOL_SIZE)
+        try:
+            self.log.info("Staggered-Launching %s threads", worker_threads)
+            for plugin in PLUGIN_CLASSES:
+                for _ in range(db.DB_CONNECTION_POOL_SIZE // 2):
+                    executor.submit(plugin.run_single_thread)
+        except KeyboardInterrupt:
+            scraper.runstate.run = False
+        self.log.info("Waiting for threads to join.")
+
 def go():
     instance = RunEngine()
-    instance.run()
+    # instance.run()
+    # instance.run_sequential()
+    instance.run_shared_pool()
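
Two things worth noting in run_shared_pool as committed: worker_threads is computed as db.DB_CONNECTION_POOL_SIZE * 2 and logged, but the executor is actually capped at max_workers=db.DB_CONNECTION_POOL_SIZE; and nothing blocks on the submitted futures, so the final "Waiting for threads to join." line logs immediately while the pool's non-daemon threads keep the process alive in the background. With the constants above, each plugin is handed DB_CONNECTION_POOL_SIZE // 2 = 10 worker slots on the one shared executor, so total concurrency stays bounded by the connection pool. A variant that waits explicitly might look like this (a sketch under those assumptions; the futures list and the shutdown handling are additions, not the commit's code):

    def run_shared_pool_blocking(self):
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=db.DB_CONNECTION_POOL_SIZE)
        futures = []
        try:
            for plugin in PLUGIN_CLASSES:
                for _ in range(db.DB_CONNECTION_POOL_SIZE // 2):
                    futures.append(executor.submit(plugin.run_single_thread))
            concurrent.futures.wait(futures)  # block until every worker returns
        except KeyboardInterrupt:
            scraper.runstate.run = False  # workers poll this flag and wind down
            self.log.info("Waiting for threads to join.")
            executor.shutdown(wait=True)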