Removed synchronization and concurrency in the Fulltext class; concurrent deletions are now handled in ConcurrentUpdateSolrConnector.
pull/1/head
Michael Peter Christen 12 years ago
parent f965d04496
commit b24d1d18e4

@ -124,7 +124,7 @@ public class CrawlResults {
if (post.containsKey("deletedomain")) { if (post.containsKey("deletedomain")) {
final String domain = post.get("domain", null); final String domain = post.get("domain", null);
if (domain != null) { if (domain != null) {
sb.index.fulltext().deleteDomainHostname(domain, null, false); sb.index.fulltext().deleteDomainHostname(domain, null);
ResultURLs.deleteDomain(tabletype, domain); ResultURLs.deleteDomain(tabletype, domain);
} }
} }

@ -301,7 +301,7 @@ public class Crawler_p {
siteFilter = CrawlProfile.siteFilter(rootURLs); siteFilter = CrawlProfile.siteFilter(rootURLs);
if (deleteold) { if (deleteold) {
for (DigestURI u: rootURLs) { for (DigestURI u: rootURLs) {
sb.index.fulltext().deleteDomainHashpart(u.hosthash(), deleteageDate, rootURLs.size() > 1); sb.index.fulltext().deleteDomainHashpart(u.hosthash(), deleteageDate);
} }
} }
} else if (subPath) { } else if (subPath) {
@ -310,7 +310,7 @@ public class Crawler_p {
for (DigestURI u: rootURLs) { for (DigestURI u: rootURLs) {
String basepath = u.toNormalform(true); String basepath = u.toNormalform(true);
if (!basepath.endsWith("/")) {int p = basepath.lastIndexOf("/"); if (p > 0) basepath = basepath.substring(0, p + 1);} if (!basepath.endsWith("/")) {int p = basepath.lastIndexOf("/"); if (p > 0) basepath = basepath.substring(0, p + 1);}
int count = sb.index.fulltext().remove(basepath, deleteageDate, rootURLs.size() > 1); int count = sb.index.fulltext().remove(basepath, deleteageDate);
if (count > 0) Log.logInfo("Crawler_p", "deleted " + count + " documents for host " + u.getHost()); if (count > 0) Log.logInfo("Crawler_p", "deleted " + count + " documents for host " + u.getHost());
} }
} }

@ -279,7 +279,7 @@ public class IndexControlURLs_p {
if (post.containsKey("deletedomain")) { if (post.containsKey("deletedomain")) {
final String domain = post.get("domain"); final String domain = post.get("domain");
segment.fulltext().deleteDomainHostname(domain, null, false); segment.fulltext().deleteDomainHostname(domain, null);
// trigger the loading of the table // trigger the loading of the table
post.put("statistics", ""); post.put("statistics", "");
} }

@ -417,12 +417,7 @@ public final class Fulltext {
//Date sdDate = (Date) connector.getFieldById(id, CollectionSchema.last_modified.getSolrFieldName()); //Date sdDate = (Date) connector.getFieldById(id, CollectionSchema.last_modified.getSolrFieldName());
//Date docDate = null; //Date docDate = null;
//if (sdDate == null || (docDate = SchemaConfiguration.getDate(doc, CollectionSchema.last_modified)) == null || sdDate.before(docDate)) { //if (sdDate == null || (docDate = SchemaConfiguration.getDate(doc, CollectionSchema.last_modified)) == null || sdDate.before(docDate)) {
if (this.collectionConfiguration.contains(CollectionSchema.ip_s)) { connector.add(doc);
// ip_s needs a dns lookup which causes blockings during search here
connector.add(doc);
} else synchronized (this.solrInstances) {
connector.add(doc);
}
//} //}
} catch (SolrException e) { } catch (SolrException e) {
throw new IOException(e.getMessage(), e); throw new IOException(e.getMessage(), e);
@ -546,7 +541,7 @@ public final class Fulltext {
* @param freshdate either NULL or a date in the past which is the limit for deletion. Only documents older than this date are deleted * @param freshdate either NULL or a date in the past which is the limit for deletion. Only documents older than this date are deleted
* @throws IOException * @throws IOException
*/ */
public void deleteDomainHashpart(final String hosthash, Date freshdate, boolean concurrent) { public void deleteDomainHashpart(final String hosthash, Date freshdate) {
// first collect all url hashes that belong to the domain // first collect all url hashes that belong to the domain
assert hosthash.length() == 6; assert hosthash.length() == 6;
final String collection1Query = CollectionSchema.host_id_s.getSolrFieldName() + ":\"" + hosthash + "\"" + final String collection1Query = CollectionSchema.host_id_s.getSolrFieldName() + ":\"" + hosthash + "\"" +
@ -559,54 +554,43 @@ public final class Fulltext {
(" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : (" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") :
"" ""
); );
Thread t = new Thread() {
public void run() {
// delete in solr
synchronized (Fulltext.this.solrInstances) {
try {Fulltext.this.getDefaultConnector().deleteByQuery(collection1Query);} catch (IOException e) {}
try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}
}
// delete in old metadata structure // delete in solr
if (Fulltext.this.urlIndexFile != null) { try {Fulltext.this.getDefaultConnector().deleteByQuery(collection1Query);} catch (IOException e) {}
final ArrayList<String> l = new ArrayList<String>(); try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}
synchronized (this) {
CloneableIterator<byte[]> i; // delete in old metadata structure
try { if (Fulltext.this.urlIndexFile != null) {
i = Fulltext.this.urlIndexFile.keys(true, null); final ArrayList<String> l = new ArrayList<String>();
String hash; CloneableIterator<byte[]> i;
while (i != null && i.hasNext()) { try {
hash = ASCII.String(i.next()); i = Fulltext.this.urlIndexFile.keys(true, null);
if (hosthash.equals(hash.substring(6))) l.add(hash); String hash;
} while (i != null && i.hasNext()) {
hash = ASCII.String(i.next());
// then delete the urls using this list if (hosthash.equals(hash.substring(6))) l.add(hash);
for (final String h: l) Fulltext.this.urlIndexFile.delete(ASCII.getBytes(h));
} catch (IOException e) {}
}
} }
// finally remove the line with statistics // then delete the urls using this list
if (Fulltext.this.statsDump != null) { for (final String h: l) Fulltext.this.urlIndexFile.delete(ASCII.getBytes(h));
final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator(); } catch (IOException e) {}
HostStat hs; }
while (hsi.hasNext()) {
hs = hsi.next(); // finally remove the line with statistics
if (hs.hosthash.equals(hosthash)) { if (Fulltext.this.statsDump != null) {
hsi.remove(); final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator();
break; HostStat hs;
} while (hsi.hasNext()) {
} hs = hsi.next();
if (hs.hosthash.equals(hosthash)) {
hsi.remove();
break;
} }
} }
};
if (concurrent) t.start(); else {
t.run();
Fulltext.this.commit(true);
} }
} }
public void deleteDomainHostname(final String hostname, Date freshdate, boolean concurrent) { public void deleteDomainHostname(final String hostname, Date freshdate) {
// first collect all url hashes that belong to the domain // first collect all url hashes that belong to the domain
final String collectionQuery = final String collectionQuery =
CollectionSchema.host_s.getSolrFieldName() + ":\"" + hostname + "\"" + CollectionSchema.host_s.getSolrFieldName() + ":\"" + hostname + "\"" +
@ -615,35 +599,27 @@ public final class Fulltext {
"" ""
); );
final String webgraphQuery = final String webgraphQuery =
WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + hostname + "\"" + WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + hostname + "\"" +
((freshdate != null && freshdate.before(new Date())) ? ((freshdate != null && freshdate.before(new Date())) ?
(" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : (" AND " + WebgraphSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") :
"" ""
); );
Thread t = new Thread() {
public void run() { // delete in solr
// delete in solr try {Fulltext.this.getDefaultConnector().deleteByQuery(collectionQuery);} catch (IOException e) {}
synchronized (Fulltext.this.solrInstances) { try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {}
try {Fulltext.this.getDefaultConnector().deleteByQuery(collectionQuery);} catch (IOException e) {}
try {Fulltext.this.getWebgraphConnector().deleteByQuery(webgraphQuery);} catch (IOException e) {} // finally remove the line with statistics
} if (Fulltext.this.statsDump != null) {
// finally remove the line with statistics final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator();
if (Fulltext.this.statsDump != null) { HostStat hs;
final Iterator<HostStat> hsi = Fulltext.this.statsDump.iterator(); while (hsi.hasNext()) {
HostStat hs; hs = hsi.next();
while (hsi.hasNext()) { if (hs.hostname.equals(hostname)) {
hs = hsi.next(); hsi.remove();
if (hs.hostname.equals(hostname)) { break;
hsi.remove();
break;
}
}
} }
} }
};
if (concurrent) t.start(); else {
t.run();
Fulltext.this.commit(true);
} }
} }
@ -653,30 +629,25 @@ public final class Fulltext {
* @param freshdate either NULL or a date in the past which is the limit for deletion. Only documents older than this date are deleted * @param freshdate either NULL or a date in the past which is the limit for deletion. Only documents older than this date are deleted
* @param concurrently if true, then the method returns immediately and runs concurrently * @param concurrently if true, then the method returns immediately and runs concurrently
*/ */
public int remove(final String basepath, Date freshdate, final boolean concurrently) { public int remove(final String basepath, Date freshdate) {
DigestURI uri; DigestURI uri;
try {uri = new DigestURI(basepath);} catch (MalformedURLException e) {return 0;} try {uri = new DigestURI(basepath);} catch (MalformedURLException e) {return 0;}
final String host = uri.getHost(); final String host = uri.getHost();
final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" + final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" +
((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : ""); ((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : "");
final AtomicInteger count = new AtomicInteger(0); final AtomicInteger count = new AtomicInteger(0);
Thread t = new Thread(){ final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, -1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
public void run() { try {
final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, -1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); SolrDocument doc;
try { while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
SolrDocument doc; String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { if (u.startsWith(basepath)) {
String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()); remove(ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())));
if (u.startsWith(basepath)) { count.incrementAndGet();
remove(ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))); }
count.incrementAndGet();
}
}
if (count.get() > 0) Fulltext.this.commit(true);
} catch (InterruptedException e) {}
} }
}; if (count.get() > 0) Fulltext.this.commit(true);
if (concurrently) t.start(); else t.run(); } catch (InterruptedException e) {}
return count.get(); return count.get();
} }
@ -688,11 +659,8 @@ public final class Fulltext {
public void remove(final Collection<String> deleteIDs) { public void remove(final Collection<String> deleteIDs) {
if (deleteIDs == null || deleteIDs.size() == 0) return; if (deleteIDs == null || deleteIDs.size() == 0) return;
try { try {
synchronized (Fulltext.this.solrInstances) { this.getDefaultConnector().deleteByIds(deleteIDs);
this.getDefaultConnector().deleteByIds(deleteIDs); this.getWebgraphConnector().deleteByIds(deleteIDs);
this.getWebgraphConnector().deleteByIds(deleteIDs);
this.commit(true);
}
} catch (final Throwable e) { } catch (final Throwable e) {
Log.logException(e); Log.logException(e);
} }
@ -708,10 +676,8 @@ public final class Fulltext {
if (urlHash == null) return false; if (urlHash == null) return false;
try { try {
String id = ASCII.String(urlHash); String id = ASCII.String(urlHash);
synchronized (this.solrInstances) { this.getDefaultConnector().deleteById(id);
this.getDefaultConnector().deleteById(id); this.getWebgraphConnector().deleteById(id);
this.getWebgraphConnector().deleteById(id);
}
} catch (final Throwable e) { } catch (final Throwable e) {
Log.logException(e); Log.logException(e);
} }

Loading…
Cancel
Save