diff --git a/changesetmd.py b/changesetmd.py index 18d2e25..0b22453 100755 --- a/changesetmd.py +++ b/changesetmd.py @@ -50,23 +50,28 @@ def createTables(self, connection): cursor.execute(queries.createGeometryColumn) connection.commit() - def insertNew(self, connection, id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags, comments): + def insertNewBatch(self, connection, data_arr): cursor = connection.cursor() if self.createGeometry: - cursor.execute('''INSERT into osm_changeset + sql = '''INSERT into osm_changeset (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags, geom) - values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))''', - (id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags, minLon, minLat, maxLon, maxLat)) + values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))''' + psycopg2.extras.execute_batch(cursor, sql, data_arr) + cursor.close() else: - cursor.execute('''INSERT into osm_changeset + sql = '''INSERT into osm_changeset (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags) - values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''', - (id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags)) - for comment in comments: - cursor.execute('''INSERT into osm_changeset_comment + values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' + psycopg2.extras.execute_batch(cursor, sql, data_arr) + cursor.close() + + def insertNewBatchComment(self, connection, comment_arr): + cursor=connection.cursor() + sql = '''INSERT into osm_changeset_comment (comment_changeset_id, comment_user_id, comment_user_name, comment_date, comment_text) - values (%s,%s,%s,%s,%s)''', - (id, comment['uid'], comment['user'], comment['date'], comment['text'])) + values (%s,%s,%s,%s,%s)''' + psycopg2.extras.execute_batch(cursor, sql, comment_arr) + cursor.close() def deleteExisting(self, connection, id): cursor = connection.cursor() @@ -81,6 +86,8 @@ def parseFile(self, connection, changesetFile, doReplication): cursor = connection.cursor() context = etree.iterparse(changesetFile) action, root = context.next() + changesets = [] + comments = [] for action, elem in context: if(elem.tag != 'changeset'): continue @@ -91,28 +98,31 @@ def parseFile(self, connection, changesetFile, doReplication): for tag in elem.iterchildren(tag='tag'): tags[tag.attrib['k']] = tag.attrib['v'] - comments = [] for discussion in elem.iterchildren(tag='discussion'): for commentElement in discussion.iterchildren(tag='comment'): - comment = dict() - comment['uid'] = commentElement.attrib.get('uid') - comment['user'] = commentElement.attrib.get('user') - comment['date'] = commentElement.attrib.get('date') for text in commentElement.iterchildren(tag='text'): - comment['text'] = text.text + text = text.text + comment = (elem.attrib['id'], commentElement.attrib.get('uid'), commentElement.attrib.get('user'), commentElement.attrib.get('date'), text) comments.append(comment) if(doReplication): self.deleteExisting(connection, elem.attrib['id']) - self.insertNew(connection, elem.attrib['id'], elem.attrib.get('uid', None), - elem.attrib['created_at'], elem.attrib.get('min_lat', None), - elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), - elem.attrib.get('max_lon', None),elem.attrib.get('closed_at', None), - elem.attrib.get('open', None), elem.attrib.get('num_changes', None), - elem.attrib.get('user', None), tags, comments) - - if((parsedCount % 10000) == 0): + if self.createGeometry: + changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None), + elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None), + elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags,elem.attrib.get('min_lon', None), elem.attrib.get('min_lat', None), + elem.attrib.get('max_lon', None), elem.attrib.get('max_lat', None))) + else: + changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None), + elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None), + elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags)) + + if((parsedCount % 100000) == 0): + self.insertNewBatch(connection, changesets) + self.insertNewBatchComment(connection, comments ) + changesets = [] + comments = [] print "parsed %s" % ('{:,}'.format(parsedCount)) print "cumulative rate: %s/sec" % '{:,.0f}'.format(parsedCount/timedelta.total_seconds(datetime.now() - startTime)) @@ -120,6 +130,9 @@ def parseFile(self, connection, changesetFile, doReplication): elem.clear() while elem.getprevious() is not None: del elem.getparent()[0] + # Update whatever is left, then commit + self.insertNewBatch(connection, changesets) + self.insertNewBatchComment(connection, comments) connection.commit() print "parsing complete" print "parsed {:,}".format(parsedCount)