""" This script loads the stackoverflow data from xml source files into an Rdbhost database. it expects to find the source files in a subdirectory called 'dmp'. 1) edit this file to add the role and authcode 2) run as 'import_so_data.py all' or without parameters for an option display Each database table has an import script and an XML parser class, except Tags and Tagging which use one script/parser pair. The import script, named like 'import_*' creates the table, initializes the parser, and starts parsing. The parser uses the SAX parsing model, converting piecewise (as apposed to the DOM model, which parses the whole thing at once) to minimize the memory requirements. (I don't have a 12GB machine to parse on) The methods of each parsing class override inherited methods; the non-overriding methods are all named with leading underscores. ie: _saveBatch(..) This script takes several hours to run, using the August datadump. """ import sys #; reload(sys) import string from xml.sax import handler, make_parser import re from rdbhdb import rdbhdb as db encoding = "utf-8" #host = 'dev.rdbhost.com' host = 'www.rdbhost.com' role = 's0000000767' #authcode = 'Vcx3------------------------------------Je3qheccjb' authcode = "2Qinz-----------------------------------cjbBYWinuc" conn=db.connect(role, authcode=authcode, host=host) def escape(str): """ \ are special for Python strings *and* for regexps. Hence the multiple escaping. Here,we just replace every \ by \\ for PostgreSQL -- this code originally from Stephane Bortzmeyer Psycopg2 interprets % as indicating a substitution token, so we escape them as '%' -> '%%' """ if str is None: return str assert type(str) is unicode, (type(str),str) str = re.sub("\s", " ", str) str = re.sub("\\\\", "\\\\\\\\", str) str = re.sub("'","''",str) str = re.sub("%","%%",str) str = unicode(str) assert type(str) is unicode, (type(str),str) return str.encode(encoding) def field_fmt(str): """ Escape string, and then format for interpolation into query as value. """ if str is None: return 'NULL' try: i = int(str) return "%i"%i except ValueError, e: return "'%s'"%escape(str) ###### # ## Users table class UsersHandler(handler.ContentHandler): """handles conversion of users.xml file to postgresql INSERT statements """ lines_per_request = 200 def __init__(self, cursor): self.cursor = cursor self.containers = [] self.idx = 0 self.lines = [] def startDocument(self): print '---- Users conversion started ----\n' def endDocument(self): self._saveBatch() print '\n---- Users conversion ended ----\n' def _saveBatch(self): if self.lines: sql = '\n'.join(self.lines) self.cursor.execute(sql) self.lines = [] print self.idx, def _startRow(self,name,attrs): if self.idx % self.lines_per_request == 0: self._saveBatch() id = int(attrs['Id']) reputation = int(attrs['Reputation']) creation = field_fmt(attrs['CreationDate']) name = field_fmt(attrs['DisplayName']) lastaccess = field_fmt(attrs['LastAccessDate']) website = field_fmt(attrs.get('WebsiteUrl')) location = field_fmt(attrs.get('Location')) age = field_fmt(attrs.get('Age')) views = int(attrs['Views']) upvotes = int(attrs['UpVotes']) aboutme = field_fmt(attrs.get('AboutMe')) downvotes = int(attrs['DownVotes']) recin = '''INSERT INTO Users (id,name,reputation,creation, website,age,location,lastaccess, views,upvotes,downvotes,aboutme) VALUES(%i,%s,%i,%s, %s,%s,%s,%s, %i,%i,%i,%s);''' \ %(id,name,reputation,creation, website,age,location,lastaccess, views,upvotes,downvotes,aboutme) self.lines.append(recin) self.idx += 1 assert len(self.containers) < 25 def startElement(self, name, attrs): self.containers.append(name) if name == 'users': pass elif name == 'row': self._startRow(name,attrs) else: raise Exception('tag name [%s] not handled'%name) def endElement(self, name): ctnr = self.containers.pop() assert ctnr == name def import_users(filename): todel = 'Comments', 'Tagging', 'Tags', 'Posts', 'Users' c = conn.cursor() for td in todel: try: c.execute('DROP TABLE '+td) except db.Error, e: if e[0] == '42P01': pass else: raise createq = """ CREATE TABLE Users ( id INTEGER UNIQUE NOT NULL, name TEXT, -- Yes, can be null some times reputation INTEGER NOT NULL, creation TIMESTAMP NOT NULL, website TEXT, age INTEGER, location TEXT, lastaccess TIMESTAMP NOT NULL, views INTEGER NOT NULL, upvotes INTEGER NOT NULL, downvotes INTEGER NOT NULL, aboutme TEXT ); """ c.execute(createq) handler = UsersHandler(c) parser = make_parser() parser.setContentHandler(handler) infile = open(filename,'r') parser.parse(infile) infile.close() ###### # ## Posts table class PostsHandler(handler.ContentHandler): """handles conversion of posts.xml file to postgresql INSERT statements """ lines_per_request = 100 def __init__(self, cursor): self.cursor = cursor self.containers = [] self.idx = 0 self.lines = [] def startDocument(self): print '---- Posts conversion started ----\n' def endDocument(self): self._saveBatch() print '\n---- Posts conversion ended ----\n' def _saveBatch(self,depth=0): try: if self.lines: sql = '\n'.join(self.lines) self.cursor.execute(sql) self.lines = [] print self.idx, except db.InterfaceError, e: if depth > 20: raise self._saveBatch(depth+1) def _startRow(self,name,attrs): if self.idx % self.lines_per_request == 0: self._saveBatch() id = int(attrs['Id']) type = int(attrs['PostTypeId']) parentid = field_fmt(attrs.get('ParentId')) title = field_fmt(attrs.get('Title')) creation = escape(attrs['CreationDate']) owner = field_fmt(attrs.get('OwnerUserId')) acceptedanswer = field_fmt(attrs.get('AcceptedAnswerId')) score = field_fmt(attrs.get('Score')) viewcount = int(attrs['ViewCount']) body = field_fmt(attrs['Body']) lasteditor = field_fmt(attrs.get('LastEditorUserId')) lasteditorname = field_fmt(attrs.get('LastEditorDisplayName')) lasteditdate = field_fmt(attrs.get('LastEditDate')) lastactivitydate = field_fmt(attrs.get('LastActivityDate')) communityowneddate = field_fmt(attrs.get('CommunityOwnedDate')) closeddate = field_fmt(attrs.get('ClosedDate')) tags = field_fmt(attrs.get('Tags')) answercount = field_fmt(attrs.get('AnswerCount')) commentcount = field_fmt(attrs.get('CommentCount')) favoritecount = field_fmt(attrs.get('FavoriteCount')) recin = """ INSERT INTO Posts (id, type, parentid, title, creation, owner, accepted_answer, score, viewcount, body, lasteditor, lasteditorname, lasteditdate, lastactivitydate, communityowneddate, closeddate, tags, answercount, commentcount, favoritecount) VALUES(%s, %s, %s, %s, '%s', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ %(id,type,parentid,title,creation,owner,acceptedanswer, score, viewcount, body, lasteditor, lasteditorname, lasteditdate, lastactivitydate, communityowneddate, closeddate, tags, answercount, commentcount, favoritecount) self.lines.append(recin) self.idx += 1 assert len(self.containers) < 25 #print 'id', id, 'name', name def startElement(self, name, attrs): self.containers.append(name) if name == 'posts': pass elif name == 'row': self._startRow(name,attrs) else: raise Exception('tag name [%s] not handled'%name) def endElement(self, name): ctnr = self.containers.pop() assert ctnr == name def import_posts(filename): delqs = ['DROP TABLE Posts', 'DROP TABLE Comments', 'DROP TABLE Tags', 'DROP TABLE Tagging'] c = conn.cursor() for delq in delqs: try: c.execute(delq) except db.Error: pass createq = """ CREATE TABLE Posts ( id INTEGER UNIQUE NOT NULL, type INTEGER NOT NULL, -- 1 Question, 2 Answer. TODO: uses an ENUM type parentid INTEGER, title TEXT, -- May be NULL if the post is not a Question creation TIMESTAMP NOT NULL, owner INTEGER REFERENCES Users(id), accepted_answer INTEGER, -- No REFERENCES Comments(id) because some posts references -- a non-available comment :-( score INTEGER, viewcount INTEGER NOT NULL, body TEXT, lasteditor INTEGER, lasteditorname VARCHAR(50), lasteditdate TIMESTAMP, lastactivitydate TIMESTAMP, communityowneddate TIMESTAMP, closeddate TIMESTAMP, tags VARCHAR(256), answercount INTEGER, commentcount INTEGER, favoritecount INTEGER -- TODO: score, size of the body, last edition, last editor, etc ); """ c.execute(createq) handler = PostsHandler(c) parser = make_parser() parser.setContentHandler(handler) infile = open(filename,'r') parser.parse(infile) infile.close() ###### # ## Comments table class CommentsHandler(handler.ContentHandler): """handles conversion of comments.xml file to postgresql INSERT statements """ lines_per_request = 200 def __init__(self, cursor): self.cursor = cursor self.containers = [] self.idx = 0 self.lines = [] def startDocument(self): print '---- Comments conversion started ----\n' def endDocument(self): self._saveBatch() print '\n---- Comments conversion ended ----\n' def _saveBatch(self): if self.lines: sql = '\n'.join(self.lines) self.cursor.execute(sql) self.lines = [] print self.idx, def _startRow(self,name,attrs): #lines_per_COPY = 4 if self.idx % self.lines_per_request == 0: self._saveBatch() id = int(attrs['Id']) userid = attrs.get('UserId',None) if userid is None: userid = 'NULL' else: userid = "%i"%int(userid) postid = int(attrs['PostId']) score = attrs.get('Score',None) if score is None: score = 'NULL' else: score = "%i"%int(score) text = escape(attrs['Text']) creation = escape(attrs['CreationDate']) recin = "INSERT INTO Comments (id,postid,owner,creation,score,text)"+\ "VALUES(%i,%i,%s,'%s',%s,'%s');" recin = recin % (id,postid,userid,creation,score,text) self.lines.append(recin) self.idx += 1 assert len(self.containers) < 25 def startElement(self, name, attrs): self.containers.append(name) if name == 'comments': pass elif name == 'row': self._startRow(name,attrs) else: raise Exception('tag name %s not handled'%name) def endElement(self, name): ctnr = self.containers.pop() assert ctnr == name def import_comments(filename): delq = 'DROP TABLE Comments' c = conn.cursor() try: c.execute(delq) except db.Error: pass tabcreate = """ CREATE TABLE Comments ( id INTEGER UNIQUE NOT NULL, postid INTEGER NOT NULL, -- REFERENCES Posts(id), creation TIMESTAMP NOT NULL, owner INTEGER REFERENCES Users(id), text TEXT, score INTEGER -- can be null ); """ c.execute(tabcreate) #outfile = sys.stdout handler = CommentsHandler(c) parser = make_parser() parser.setContentHandler(handler) infile = open(filename,'r') parser.parse(infile) infile.close() ###### # ## Votes table class VotesHandler(handler.ContentHandler): """handles conversion of votes.xml file to postgresql INSERT statements """ lines_per_request = 1000 def __init__(self, cursor): self.cursor = cursor self.containers = [] self.idx = 0 self.lines = [] def startDocument(self): print '---- Votes conversion started ----\n' def endDocument(self): self._saveBatch() print '\n---- Votes conversion ended ----\n' def _saveBatch(self): if self.lines: sql = '\n'.join(self.lines) self.cursor.execute(sql) self.lines = [] print self.idx, def _startRow(self,name,attrs): if self.idx % self.lines_per_request == 0: self._saveBatch() id = field_fmt(attrs['Id']) type = field_fmt(attrs['VoteTypeId']) post = field_fmt(attrs.get('PostId')) creation = field_fmt(attrs['CreationDate']) recin = """ INSERT INTO Votes (id, type, post, creation) VALUES(%s, %s, %s, %s); """ %(id,type,post,creation) self.lines.append(recin) self.idx += 1 assert len(self.containers) < 25 def startElement(self, name, attrs): self.containers.append(name) if name == 'votes': pass elif name == 'row': self._startRow(name,attrs) else: raise Exception('tag name [%s] not handled'%name) def endElement(self, name): ctnr = self.containers.pop() assert ctnr == name def import_votes(filename): delq = 'DROP TABLE Votes' c = conn.cursor() try: c.execute(delq) except db.Error: pass createq = """ CREATE TABLE Votes ( id INTEGER UNIQUE NOT NULL, type INTEGER NOT NULL, -- 2 UpMod, 3 DownMod, etc. TODO: uses an ENUM type post INTEGER NOT NULL, -- No REFERENCES Posts(id) because some votes references -- a non-available post :-( creation DATE NOT NULL); -- Unfortunately not a timestamp """ c.execute(createq) handler = VotesHandler(c) parser = make_parser() parser.setContentHandler(handler) infile = open(filename,'r') parser.parse(infile) infile.close() ###### # ## Tags table #import pprint class TagsHandler(handler.ContentHandler): """handles conversion of posts.xml file to postgresql tag INSERT statements """ lines_per_request = 200 def __init__(self,cursor,skip=None): self.cursor = cursor self.skip = skip self.containers = [] self.idx = 0 self.taghash = {} self.tagnumctr = 0 self.taglines = [] self.tagginglines = [] def startDocument(self): print '\n---- Tags conversion started ----\n' def endDocument(self): self._saveAll() print '\n---- Tags conversion ended ----\n' def _saveAll(self): """Push tag data to tables. """ def saveTag(tag,num): """Saves tag to Tags database""" recin = "INSERT INTO Tags (name, id) VALUES('%s',%s);"%(tag,num) self.taglines.append(recin) def saveTagging(num,postid): """Saves application of tag to post.""" recin = "INSERT INTO Tagging (tag,post) VALUES(%s,%s);"%(num,postid) self.tagginglines.append(recin) def flushTagging(): if self.tagginglines: q = '\n'.join(self.tagginglines) self.cursor.execute(q) self.tagginglines = [] def flushTags(): if self.taglines: q = '\n'.join(self.taglines) self.cursor.execute(q) self.taglines = [] self.idx = 0 print "" for tag in self.taghash: #print 'tag', tag, 'ord', self.taghash[tag][0], 'len', len(self.taghash[tag])-1 ord = self.taghash[tag][0] posts = self.taghash[tag][1:] assert posts, tag saveTag(tag,ord) for post in posts: saveTagging(ord,post) if len(self.tagginglines) > self.lines_per_request: self.idx += len(self.tagginglines) flushTags() flushTagging() print self.idx, self.idx += len(self.tagginglines) flushTags() flushTagging() print self.idx, def _startRow(self,name,attrs): id = int(attrs['Id']) tagfld = escape(attrs.get('Tags')) if tagfld: tagfld = tagfld.strip('<>') tags = tagfld.split('><') for tag in tags: if tag not in self.taghash: self.taghash[tag] = [self.tagnumctr, id] self.tagnumctr += 1 else: self.taghash[tag].append(id) self.idx += 1 assert len(self.containers) < 25 if self.idx % (3*self.lines_per_request) == 0: print self.idx, def startElement(self, name, attrs): self.containers.append(name) if name == 'posts': pass elif name == 'row': self._startRow(name,attrs) else: raise Exception('tag name [%s] not handled'%name) def endElement(self, name): ctnr = self.containers.pop() assert ctnr == name def import_tags(filename,skip=None): c = conn.cursor() if skip is not None: c.execute('DELETE FROM Tags WHERE id >= %i'%skip) c.execute('DELETE FROM Tagging WHERE id >= %i'%skip) else: delq1 = 'DROP TABLE Tags' delq0 = 'DROP TABLE Tagging' try: c.execute(delq0) except db.Error: pass try: c.execute(delq1) except db.Error: pass createq0 = """ CREATE TABLE Tags ( id INTEGER UNIQUE NOT NULL, name text UNIQUE NOT NULL ) """ c.execute(createq0) createq1 = """ CREATE TABLE Tagging ( tag INTEGER NOT NULL REFERENCES Tags(id), post INTEGER NOT NULL REFERENCES Posts(id)); """ c.execute(createq1) handler = TagsHandler(c,skip) parser = make_parser() parser.setContentHandler(handler) infile = open(filename,'r') parser.parse(infile) infile.close() ###### # ## Badges table class BadgeHandler(handler.ContentHandler): """handles conversion of badges.xml file to postgresql INSERT statements """ lines_per_request = 300 def __init__(self, cursor): self.cursor = cursor self.containers = [] self.idx = 0 self.lines = [] def startDocument(self): print '---- Badges conversion started ----\n' def endDocument(self): self._saveBatch() print '\n---- Badges conversion ended ----\n' def _saveBatch(self): if self.lines: sql = '\n'.join(self.lines) self.cursor.execute(sql) self.lines = [] print self.idx, def _startRow(self,name,attrs): if self.idx % self.lines_per_request == 0: self._saveBatch() id = int(attrs['Id']) userid = int(attrs['UserId']) name = escape(attrs['Name']) date = escape(attrs['Date']) recin = '''INSERT INTO Badges (id,userid,name,date) VALUES(%i,%i,'%s','%s');'''%(id,userid,name,date) self.lines.append(recin) self.idx += 1 assert len(self.containers) < 25 def startElement(self, name, attrs): self.containers.append(name) if name == 'badges': pass elif name == 'row': self._startRow(name,attrs) else: raise Exception('tag name %s not handled'%name) def endElement(self, name): ctnr = self.containers.pop() assert ctnr == name def import_badges(filename): #fname = 'dmp/badges.xml' delq = 'DROP TABLE Badges' c = conn.cursor() try: c.execute(delq) except db.Error: pass delcreate = """ CREATE TABLE Badges ( id INTEGER UNIQUE NOT NULL, userid INTEGER NOT NULL, name TEXT, date TIMESTAMP NOT NULL ); """ c.execute(delcreate) #outfile = sys.stdout handler = BadgeHandler(c) parser = make_parser() parser.setContentHandler(handler) infile = open(filename,'r') parser.parse(infile) infile.close() ####### # ## main if __name__ == '__main__': args = sys.argv[1:] if not args: print """ call like: import_so_data.py (args), where args are one or more of: all (everything) votes badges users (users, comments, posts, and tags) comments posts (posts and tags) tags """ else: if 'all' in args: args.extend(['users','votes','badges','comments','posts','tags']) if 'votes' in args: import_votes('dmp/votes.xml') if 'badges' in args: import_badges('dmp/badges.xml') if 'users' in args: args.extend(['comments', 'posts', 'tags']) import_users('dmp/users.xml') if 'posts' in args: args.extend(['tags', 'comments']) import_posts('dmp/posts.xml') if 'comments' in args: import_comments('dmp/comments.xml') if 'tags' in args: import_tags('dmp/posts.xml')