Split up the architecture because I want to scrape gelbooru too.
This commit is contained in:
parent
2d67b291aa
commit
47bc610ffd
191 danbooruFetch.py
@@ -0,0 +1,191 @@
import database as db
import webFunctions
import logging
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import parsedatetime
import os
import settings
import os.path
import time
import datetime

import fetchBase


class DanbooruFetcher(fetchBase.AbstractFetcher):

    pluginkey = 'Danbooru'
    loggerpath = "Main.Danbooru"


    def extractTags(self, job, tagsection):

        characterlis = tagsection.find_all('li', class_='category-4')
        artistlis = tagsection.find_all('li', class_='category-1')
        taglis = tagsection.find_all('li', class_='category-0')

        tags = []
        for tagli in taglis:
            tag = tagli.find('a', class_="search-tag").get_text()
            tags.append(tag)

        artists = []
        for artistli in artistlis:
            artist = artistli.find('a', class_="search-tag").get_text()
            artists.append(artist)

        characters = []
        for characterli in characterlis:
            character = characterli.find('a', class_="search-tag").get_text()
            characters.append(character)

        for tag in tags:
            if tag not in job.tags:
                job.tags.append(tag)
        for artist in artists:
            if artist not in job.artist:
                job.artist.append(artist)
        for character in characters:
            if character not in job.character:
                job.character.append(character)

    def getxy(self, instr):
        found = re.search(r"\((\d+)x(\d+)\)", instr)
        x, y = found.groups()
        return x, y

    def extractInfo(self, job, infosection):
        imgurl = None
        for li in infosection.find_all("li"):
            rawt = li.get_text()
            name, val = rawt.split(":", 1)

            name = name.strip()
            val = val.strip()

            if name == 'Rating':
                job.rating = val
            elif name == 'Favorites':
                job.favorites = val
            elif name == 'Score':
                job.score = val
            elif name == 'Date':
                cal = parsedatetime.Calendar()
                tstruct, pstat = cal.parse(val)
                assert pstat == 1 or pstat == 2
                job.posted = datetime.datetime.fromtimestamp(time.mktime(tstruct))
            elif name == 'Size':
                if not '\n' in val:
                    return False
                fsize, res = val.split("\n")
                fsize, res = fsize.strip(), res.strip()
                job.imgx, job.imgy = self.getxy(res)

                link = li.find("a")
                if link:
                    imgurl = link['href']

            elif name == 'Status':
                job.status = val
                # Do not try to fetch things that are banned (e.g. removed)
                if val == 'Banned':
                    job.dlstate = -2
            elif name in ['Approver', 'ID', 'Source', 'Uploader']:
                pass
            else:
                self.log.warning("Unknown item key-value:")
                self.log.warning("    '{}' -> '{}'".format(name, val))
        return imgurl

    def extractMeta(self, job, soup):
        tagsection = soup.find('section', id='tag-list')
        assert tagsection
        infosection = soup.find('section', id='post-information')
        assert infosection
        self.extractTags(job, tagsection)
        imgurl = self.extractInfo(job, infosection)
        return imgurl


    def fetchImage(self, job, url, srcurl):
        url = urllib.parse.urljoin(srcurl, url)
        fname = url.split("/")[-1]

        cont = self.wg.getpage(url, addlHeaders={'Referer': srcurl})

        fpath = self.saveFile(job, fname, cont)
        self.log.info("Saved file to path: '%s'", fpath)

        job.filename = fname
        job.filepath = fpath
        job.dlstate = 2
        db.session.commit()
        # print(fname)

    def processJob(self, job):
        pageurl = 'https://danbooru.donmai.us/posts/{}'.format(job.postid)
        try:
            soup = self.wg.getSoup(pageurl)
        except urllib.error.URLError:
            job.dlstate = -1
            db.session.commit()
            return

        text = soup.get_text()
        if 'You need a gold account to see this image.' in text:
            job.dlstate = -3
            db.session.commit()
            return
        if 'This post was deleted for the following reasons' in text:
            job.dlstate = -4
            db.session.commit()
            return
        err = 0
        while err < 5:
            try:
                imgurl = self.extractMeta(job, soup)
                if imgurl:
                    self.fetchImage(job, imgurl, pageurl)
                else:
                    self.log.info("No image found for URL: '%s'", pageurl)
                    job.dlstate = -5
                break
            except sqlalchemy.exc.IntegrityError:
                err += 1
                db.session.rollback()


    def retreiveItem(self):
        job = self.get_job()
        if not job:
            return False

        self.processJob(job)
        return True


def run(indice):
    print("Runner {}!".format(indice))
    fetcher = DanbooruFetcher()
    remainingTasks = True

    try:
        while remainingTasks and runstate.run:
            remainingTasks = fetcher.retreiveItem()
    except KeyboardInterrupt:
        return
    except:
        print("Unhandled exception!")
        traceback.print_exc()
        raise
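For reference, here is a summary of the Releases.dlstate codes this fetcher writes, collected from the code above and from the base class and startup script further down. The dict is documentation added here for readability, not code from the commit:

# Meaning of the dlstate values used by the scraper (summary only).
DLSTATE_MEANINGS = {
     0: "queued (rows are inserted with this state)",
     1: "claimed by a worker (set in AbstractFetcher.get_job)",
     2: "image downloaded and saved",
    -1: "URL error while fetching the post page",
    -2: "post is banned",
    -3: "image requires a gold account",
    -4: "post was deleted",
    -5: "no image link found on the page",
}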
41 database.py
@@ -1,4 +1,5 @@
import settings

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@@ -60,6 +61,13 @@ db_artist_link = Table(
    PrimaryKeyConstraint('releases_id', 'type_id')
    )

db_file_link = Table(
    'db_file_link', Base.metadata,
    Column('releases_id', Integer, ForeignKey('db_releases.id'), nullable=False),
    Column('file_id', Integer, ForeignKey('db_files.id'), nullable=False),
    PrimaryKeyConstraint('releases_id', 'file_id')
    )

class RawPages(Base):
    __tablename__ = 'db_raw_pages'
    id = Column(Integer, primary_key = True)

@@ -99,6 +107,18 @@ class Artist(Base):
    )


class Files(Base):
    __tablename__ = 'db_files'
    id = Column(Integer, primary_key=True)

    filepath = Column(citext.CIText(), nullable=False)
    fhash = Column(Text, nullable=False)

    __table_args__ = (
        UniqueConstraint('filepath'),
        UniqueConstraint('fhash'),
        )

def tag_creator(tag):

    tmp = session.query(Tags) \

@@ -126,6 +146,22 @@ def artist_creator(artist):
    return Artist(artist=artist)


def file_creator(filetups):
    filepath, fhash = filetups

    # We only care about uniqueness WRT hashes.
    tmp = session.query(Files) \
        .filter(Files.fhash == fhash) \
        .scalar()
    if tmp:
        return tmp

    # Remove the absolute path (if needed)
    if settings.storeDir in filepath:
        filepath = filepath[len(settings.storeDir):]
    return Files(filepath=filepath, fhash=fhash)


class Releases(Base):
    __tablename__ = 'db_releases'
    id = Column(Integer, primary_key=True)

@@ -145,19 +181,20 @@ class Releases(Base):
    res_x = Column(Integer)
    res_y = Column(Integer)

    filename = Column(Text)
    filepath = Column(Text)

    status = Column(Text)
    rating = Column(Text)


    tags_rel = relationship('Tags', secondary=lambda: db_tags_link)
    character_rel = relationship('Characters', secondary=lambda: db_chars_link)
    artist_rel = relationship('Artist', secondary=lambda: db_artist_link)
    file_rel = relationship('Files', secondary=lambda: db_file_link)

    tags = association_proxy('tags_rel', 'tag', creator=tag_creator)
    character = association_proxy('character_rel', 'character', creator=character_creator)
    artist = association_proxy('artist_rel', 'artist', creator=artist_creator)
    file = association_proxy('file_rel', 'files', creator=file_creator)

    __table_args__ = (
        UniqueConstraint('postid'),
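A note on how the new db_files / db_file_link plumbing gets used: Releases.file is an association proxy whose creator is file_creator, so callers append plain (filepath, fhash) tuples and file_creator either reuses the existing db_files row with that hash or creates one with the storeDir prefix stripped. This is what fetchBase.saveFile() does further down; a minimal sketch with illustrative values (the post id, path, and hash are made up, not part of the commit):

import database as db

release = db.session.query(db.Releases) \
    .filter(db.Releases.postid == 1234) \
    .one()

# Routed through file_creator(), which deduplicates on the hash.
release.file.append(("ABC/ABCDEF0123456789ABCDEF0123456789.jpg", "ABCDEF0123456789ABCDEF0123456789"))
db.session.commit()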
132 fetchBase.py
@@ -0,0 +1,132 @@
import database as db
import webFunctions
import logging
import traceback
import sqlalchemy.exc
import runstate
import urllib.error
import urllib.parse
import re
import parsedatetime
import hashlib
import os
import settings
import os.path
import time
import datetime

import abc

class AbstractFetcher(object, metaclass=abc.ABCMeta):

    @abc.abstractproperty
    def loggerpath(self):
        pass

    @abc.abstractproperty
    def pluginkey(self):
        pass

    @abc.abstractmethod
    def processJob(self, job):
        pass


    def __init__(self):
        self.log = logging.getLogger(self.loggerpath)
        self.wg = webFunctions.WebGetRobust()

        # db.session = db.Session()

    def get_job(self):
        while 1:
            try:
                job = db.session.query(db.Releases) \
                    .filter(db.Releases.dlstate == 0) \
                    .filter(db.Releases.source == self.pluginkey) \
                    .order_by(db.Releases.postid) \
                    .limit(1) \
                    .one()
                if job == None:
                    return None
                job.dlstate = 1
                db.session.commit()
                return job
            except sqlalchemy.exc.DatabaseError:
                self.log.warning("Error when getting job. Probably a concurrency issue.")
                self.log.warning("Trying again.")
                for line in traceback.format_exc().split("\n"):
                    self.log.warning(line)
                db.session.rollback()


    def saveFile(self, row, filename, fileCont):
        if not os.path.exists(settings.storeDir):
            self.log.warn("Cache directory for book items did not exist. Creating")
            self.log.warn("Directory at path '%s'", settings.storeDir)
            os.makedirs(settings.storeDir)


        ext = os.path.splitext(filename)[-1]
        fhash = hashlib.md5(fileCont).hexdigest()

        ext = ext.lower()
        fhash = fhash.upper()

        # Use the first 3 chars of the hash for the folder name.
        # Since it's hex-encoded, that gives 12 bits, for a maximum of
        # 2^12 = 4096 directories.
        dirName = fhash[:3]

        dirPath = os.path.join(settings.storeDir, dirName)
        if not os.path.exists(dirPath):
            os.makedirs(dirPath)


        # The "." is part of the ext.
        filename = '{filename}{ext}'.format(filename=fhash, ext=ext)

        fqpath = os.path.join(dirPath, filename)
        fqpath = os.path.abspath(fqpath)
        if not fqpath.startswith(settings.storeDir):
            raise ValueError("Generating the file path to save a cover produced a path that did not include the storage directory?")

        locpath = fqpath[len(settings.storeDir):]

        row.file.append((locpath, fhash))


        with open(fqpath, "wb") as fp:
            fp.write(fileCont)

        return locpath


    def retreiveItem(self):
        job = self.get_job()
        if not job:
            return False

        self.processJob(job)
        return True


def run(indice):
    print("Runner {}!".format(indice))
    fetcher = DanbooruFetcher()
    remainingTasks = True

    try:
        while remainingTasks and runstate.run:
            remainingTasks = fetcher.retreiveItem()
    except KeyboardInterrupt:
        return
    except:
        print("Unhandled exception!")
        traceback.print_exc()
        raise
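The commit message says the base class was split out so a Gelbooru scraper can reuse it. Nothing Gelbooru-specific is in this commit; a subclass would only need to supply the two abstract properties and processJob(), roughly like this hypothetical sketch (class name, key, and logger path are placeholders):

import database as db
import fetchBase

class GelbooruFetcher(fetchBase.AbstractFetcher):

    # Placeholder values; get_job() only hands this fetcher rows whose
    # Releases.source column matches pluginkey.
    pluginkey = 'Gelbooru'
    loggerpath = "Main.Gelbooru"

    def processJob(self, job):
        # A real implementation would fetch the post page for job.postid,
        # fill in tags/metadata on the row, and save the image via
        # self.saveFile(); here the row is only marked as handled.
        job.dlstate = 2
        db.session.commit()

Queued rows would also need to be inserted with source='Gelbooru', mirroring what insertDanbooruStartingPoints() does for Danbooru below.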
8 main.py
@@ -3,14 +3,14 @@ import database as db
import logSetup
logSetup.initLogging()

import fetcher
import danbooruFetch
import runstate
import concurrent.futures

THREADS = 1
THREADS = 25

def insertStartingPoints():
def insertDanbooruStartingPoints():

    tmp = db.session.query(db.Releases) \
        .filter(db.Releases.postid == 1) \

@@ -34,14 +34,14 @@ def resetDlstate():


def go():
    insertStartingPoints()
    insertDanbooruStartingPoints()
    resetDlstate()

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=THREADS)
    try:
        # for x in range(2):
        for x in range(THREADS):
            executor.submit(fetcher.run, x)
            executor.submit(danbooruFetch.run, x)
        executor.shutdown()
    except KeyboardInterrupt:
        print("Waiting for executor.")
@@ -0,0 +1,64 @@
import database as db
import logSetup
logSetup.initLogging()

import danbooruFetch
import runstate
import settings
import concurrent.futures

import os
import os.path
import hashlib

THREADS = 1
THREADS = 25

def insertDanbooruStartingPoints():

    tmp = db.session.query(db.Releases) \
        .filter(db.Releases.postid == 1) \
        .count()
    if not tmp:
        for x in range(2070000):
            new = db.Releases(dlstate=0, postid=x, source='Danbooru')
            db.session.add(new)
            if x % 10000 == 0:
                print("Loop ", x, "flushing...")
                db.session.flush()
                print("Flushed.")
    db.session.commit()


def resetDlstate():
    tmp = db.session.query(db.Releases) \
        .filter(db.Releases.dlstate == 1) \
        .update({db.Releases.dlstate : 0})
    db.session.commit()


def go():
    have = db.session.query(db.Releases) \
        .filter(db.Releases.filepath != None) \
        .all()

    proc = 0
    for row in have:
        fpath = settings.storeDir+row.filepath
        fpath = os.path.abspath(fpath)

        with open(fpath, "rb") as fp:
            cont = fp.read()
        fhash = hashlib.md5(cont).hexdigest()

        print(os.path.exists(fpath), fhash, fpath)

        row.file.append((fpath, fhash))
        proc += 1
        if proc % 50 == 0:
            db.session.commit()

if __name__ == '__main__':
    go()
@@ -13,6 +13,6 @@ DATABASE_IP = "10.1.1.8"
# This determines the path mask that will be used when deduplicating
# hentai items.
# If you aren't running the deduper, just specify something basic, like "/"
storeDir = r"/media/Storage/H/Danbooru"
storeDir = r"/media/Storage/H/Danbooru/"
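The only functional change here is the trailing slash on storeDir. It matters because fetchBase.saveFile() and database.file_creator() strip storeDir as a plain string prefix, so the slash keeps the stored relative paths from starting with '/'. A quick illustration with a made-up file name:

old = r"/media/Storage/H/Danbooru"
new = r"/media/Storage/H/Danbooru/"
fqpath = "/media/Storage/H/Danbooru/ABC/ABCDEF0123.jpg"

print(fqpath[len(old):])  # '/ABC/ABCDEF0123.jpg' - keeps a leading slash
print(fqpath[len(new):])  # 'ABC/ABCDEF0123.jpg'  - clean relative path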