From 9971e197e00e96544e4ed3b96d1b04416955fc28 Mon Sep 17 00:00:00 2001
From: Michael Peter Christen <mc@yacy.net>
Date: Mon, 15 Dec 2014 23:32:46 +0100
Subject: [PATCH] Added a transaction interface to the snapshots: all
 documents in the snapshots can now be processed with transactions using
 commit and rollback commands. Furthermore, a large number of monitoring
 methods have been added to check the success of transactions.

The transactions for snapshots have two main components: an RSS search
API to get information about the latest/oldest entries, and a
commit/rollback API to move entries away from the RSS results. This is
done using two storage locations for the snapshots: INVENTORY and
ARCHIVE. New snapshots are placed in INVENTORY, committed snapshots move
to ARCHIVE, and rolled-back snapshots move back to INVENTORY.

Normal workflow:
Besides all the options below, it is usually sufficient to process data
like this:
- call http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=LATESTFIRST
- process the RSS result and use the <guid> value as <urlhash> (see next command)
- for each processed result call
  http://localhost:8090/api/snapshot.json?command=commit&urlhash=<urlhash>
- then you can call the RSS feed again; the committed urls are omitted
  from the next set of items.

These are the commands to control this:

The RSS feed:
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=LATESTFIRST
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=OLDESTFIRST
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=ANY
http://localhost:8090/api/snapshot.rss?state=ARCHIVE&order=LATESTFIRST
http://localhost:8090/api/snapshot.rss?state=ARCHIVE&order=OLDESTFIRST
http://localhost:8090/api/snapshot.rss?state=ARCHIVE&order=ANY

The feed will return a <urlhash> in the <guid> field of the RSS. This
must be used for commit/rollback:

Commit/Rollback:
http://localhost:8090/api/snapshot.json?command=commit&urlhash=<urlhash>
http://localhost:8090/api/snapshot.json?command=rollback&urlhash=<urlhash>

The JSON will return a property list containing the property "result"
with the possible values "success" or "fail", according to the result.
If "fail" occurs, please look into the log for further info.

Monitoring:
http://localhost:8090/api/snapshot.json?command=status
This shows the total number of entries in the INVENTORY and the ARCHIVE.

http://localhost:8090/api/snapshot.json?command=list
This returns a list of all hosts which have snapshots and the number of
entries for each host. Counts for INVENTORY and ARCHIVE are listed in
the properties "count.INVENTORY" and "count.ARCHIVE".

http://localhost:8090/api/snapshot.json?command=list&depth=2
The list can be restricted to entries with a specific depth. The list
then contains the same host names, but the count values change because
only documents at that specific crawl depth are counted.

http://localhost:8090/api/snapshot.json?command=list&host=yacy.net.80
This lists all urlhashes for the given host, not only an accumulated
count of the number of entries.

http://localhost:8090/api/snapshot.json?command=list&host=yacy.net.80&depth=0
This restricts the list of urlhashes for that host to the given depth.

http://localhost:8090/api/snapshot.json?command=list&state=INVENTORY
http://localhost:8090/api/snapshot.json?command=list&state=ARCHIVE
This selects either the INVENTORY or the ARCHIVE for all list commands.
By default, the host information is collected from both snapshot
directories and combined. You can use the state option for all the
commands listed above.

Detailed Information:
http://localhost:8090/api/snapshot.json?command=metadata&urlhash=upiFJ7Fh1hyQ
This collects metadata information for the given urlhash. This can also
be restricted with state=INVENTORY and state=ARCHIVE to test whether the
document is in one of these snapshot directories. If a urlhash is not
found, an empty result is returned. If an entry was found and the state
was not restricted, the result contains a "state" property with the name
of the location where the document is, either INVENTORY or ARCHIVE.

Hint:
If a very large number of documents is inside the INVENTORY, it can be
better to call the RSS feed with
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=ANY
because that is very efficient.
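For illustration, here is a minimal client sketch of the normal workflow
(not part of this patch): it reads the INVENTORY feed and commits every
returned <guid>. It assumes a local YaCy instance on port 8090 whose API
answers these calls for the client (the RSS feed requires admin access),
and the class name is hypothetical; only plain JDK classes are used.

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import javax.xml.parsers.DocumentBuilderFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.NodeList;

    public class SnapshotCommitExample {
        public static void main(String[] args) throws Exception {
            // fetch the inventory feed, latest snapshots first
            Document rss;
            try (InputStream in = new URL(
                    "http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=LATESTFIRST")
                    .openStream()) {
                rss = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
            }
            // each <guid> element carries the <urlhash> needed for commit/rollback
            NodeList guids = rss.getElementsByTagName("guid");
            for (int i = 0; i < guids.getLength(); i++) {
                String urlhash = guids.item(i).getTextContent().trim();
                // ... process the snapshot document here, then commit it ...
                HttpURLConnection con = (HttpURLConnection) new URL(
                        "http://localhost:8090/api/snapshot.json?command=commit&urlhash=" + urlhash)
                        .openConnection();
                System.out.println(urlhash + " -> HTTP " + con.getResponseCode());
                con.disconnect();
            }
        }
    }

The returned JSON can additionally be parsed to check for
"result":"success" before the feed is fetched again; committed entries
then no longer appear in the INVENTORY feed.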
---
 htroot/api/snapshot.java                    | 127 +++++++-
 source/net/yacy/crawler/data/Snapshots.java | 275 ++++++++++++++----
 .../net/yacy/crawler/data/Transactions.java | 149 +++++++++-
 3 files changed, 472 insertions(+), 79 deletions(-)

diff --git a/htroot/api/snapshot.java b/htroot/api/snapshot.java
index 789acc476..f347aa4ad 100644
--- a/htroot/api/snapshot.java
+++ b/htroot/api/snapshot.java
@@ -26,8 +26,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.Collection;
-import java.util.Date;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
@@ -41,8 +41,11 @@ import net.yacy.cora.protocol.ClientIdentification;
 import net.yacy.cora.protocol.RequestHeader;
 import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.cora.util.Html2Image;
+import net.yacy.cora.util.JSONException;
+import net.yacy.cora.util.JSONObject;
 import net.yacy.crawler.data.Snapshots;
 import net.yacy.crawler.data.Transactions;
+import net.yacy.crawler.data.Snapshots.Revisions;
 import net.yacy.document.ImageParser;
 import net.yacy.kelondro.util.FileUtils;
 import net.yacy.search.Switchboard;
@@ -64,6 +67,7 @@ public class snapshot {
 
         final boolean authenticated = sb.adminAuthenticated(header) >= 2;
         final String ext = header.get("EXT", "");
+
         if (ext.equals("rss")) {
             // create a report about the content of the snapshot directory
             if (!authenticated) return null;
@@ -75,16 +79,16 @@ public class snapshot {
             String statex = post == null ? Transactions.State.INVENTORY.name() : post.get("state", Transactions.State.INVENTORY.name());
             Transactions.State state = Transactions.State.valueOf(statex);
             String host = post == null ? null : post.get("host");
-            Map<String, Date> iddate = Transactions.select(host, depth, order, maxcount, state);
+            Map<String, Revisions> iddate = Transactions.select(host, depth, order, maxcount, state);
 
             // now select the URL from the index for these ids in iddate and make an RSS feed
             RSSFeed rssfeed = new RSSFeed(Integer.MAX_VALUE);
             rssfeed.setChannel(new RSSMessage("Snapshot list for host = " + host + ", depth = " + depth + ", order = " + order + ", maxcount = " + maxcount, "", ""));
-            for (Map.Entry<String, Date> e: iddate.entrySet()) {
+            for (Map.Entry<String, Revisions> e: iddate.entrySet()) {
                 try {
-                    DigestURL u = sb.index.fulltext().getURL(e.getKey());
+                    DigestURL u = e.getValue().url == null ? sb.index.fulltext().getURL(e.getKey()) : new DigestURL(e.getValue().url);
                     if (u == null) continue;
                     RSSMessage message = new RSSMessage(u.toNormalform(true), "", u, e.getKey());
-                    message.setPubDate(e.getValue());
+                    message.setPubDate(e.getValue().dates[0]);
                     rssfeed.addMessage(message);
                 } catch (IOException ee) {
                     ConcurrentLog.logException(ee);
@@ -94,6 +98,7 @@ public class snapshot {
             return new ByteArrayInputStream(rssBinary);
         }
 
+        // for the following methods we (mostly) need an url or a url hash
         if (post == null) return null;
         final boolean xml = ext.equals("xml");
         final boolean pdf = ext.equals("pdf");
@@ -101,24 +106,128 @@ public class snapshot {
         final boolean pngjpg = ext.equals("png") || ext.equals("jpg");
         String urlhash = post.get("urlhash", "");
         String url = post.get("url", "");
-        if (url.length() == 0 && urlhash.length() == 0) return null;
         DigestURL durl = null;
-        if (urlhash.length() == 0) {
+        if (urlhash.length() == 0 && url.length() > 0) {
             try {
                 durl = new DigestURL(url);
                 urlhash = ASCII.String(durl.hash());
             } catch (MalformedURLException e) {
             }
         }
-        if (durl == null) {
+        if (durl == null && urlhash.length() > 0) {
             try {
                 durl = sb.index.fulltext().getURL(urlhash);
             } catch (IOException e) {
                 ConcurrentLog.logException(e);
             }
         }
+        if (url.length() == 0 && durl != null) url = durl.toNormalform(true);
+
+        if (ext.equals("json")) {
+            // command interface: view and change a transaction state, get metadata about transactions in the past
+            String command = post.get("command", "metadata");
+            String statename = post.get("state");
+            JSONObject result = new JSONObject();
+            try {
+                if (command.equals("status")) {
+                    // return a status of the transaction archive
+                    JSONObject sizes = new JSONObject();
+                    for (Map.Entry<String, Integer> state: Transactions.sizes().entrySet()) sizes.put(state.getKey(), state.getValue());
+                    result.put("size", sizes);
+                } else if (command.equals("list")) {
+                    if (!authenticated) return null;
+                    // return a list of the hosts in the transaction archive
+                    String host = post.get("host");
+                    String depth = post.get("depth");
+                    for (Transactions.State state: statename == null ?
+                            new Transactions.State[]{Transactions.State.INVENTORY, Transactions.State.ARCHIVE} :
+                            new Transactions.State[]{Transactions.State.valueOf(statename)}) {
+                        if (host == null) {
+                            JSONObject hostCountInventory = new JSONObject();
+                            for (String h: Transactions.listHosts(state)) {
+                                hostCountInventory.put(h, Transactions.listIDs(state, h).size());
+                            }
+                            result.put("count." + state.name(), hostCountInventory);
+                        } else {
+                            TreeMap<Integer, Collection<Revisions>> ids = Transactions.listIDs(state, host);
+                            if (ids == null) {
+                                result.put("error", "no host " + host + " found");
+                            } else {
+                                for (Map.Entry<Integer, Collection<Revisions>> entry: ids.entrySet()) {
+                                    if (depth != null && Integer.parseInt(depth) != entry.getKey()) continue;
+                                    for (Revisions r: entry.getValue()) {
+                                        try {
+                                            JSONObject metadata = new JSONObject();
+                                            DigestURL u = r.url != null ? new DigestURL(r.url) : sb.index.fulltext().getURL(r.urlhash);
+                                            metadata.put("url", u == null ? "unknown" : u.toNormalform(true));
"unknown" : u.toNormalform(true)); + metadata.put("dates", r.dates); + assert r.depth == entry.getKey().intValue(); + metadata.put("depth", entry.getKey().intValue()); + result.put(r.urlhash, metadata); + } catch (IOException e) {} + } + } + } + } + } + } else if (command.equals("commit")) { + if (!authenticated) return null; + Revisions r = Transactions.commit(urlhash); + if (r != null) { + result.put("result", "success"); + result.put("depth", r.depth); + result.put("url", r.url); + result.put("dates", r.dates); + } else { + result.put("result", "fail"); + } + result.put("urlhash", urlhash); + } else if (command.equals("rollback")) { + if (!authenticated) return null; + Revisions r = Transactions.rollback(urlhash); + if (r != null) { + result.put("result", "success"); + result.put("depth", r.depth); + result.put("url", r.url); + result.put("dates", r.dates); + } else { + result.put("result", "fail"); + } + result.put("urlhash", urlhash); + } else if (command.equals("metadata")) { + try { + Revisions r; + Transactions.State state = statename == null || statename.length() == 0 ? null : Transactions.State.valueOf(statename); + if (state == null) { + r = Transactions.getRevisions(Transactions.State.INVENTORY, urlhash); + if (r != null) state = Transactions.State.INVENTORY; + r = Transactions.getRevisions(Transactions.State.ARCHIVE, urlhash); + if (r != null) state = Transactions.State.ARCHIVE; + } else { + r = Transactions.getRevisions(state, urlhash); + } + if (r != null) { + JSONObject metadata = new JSONObject(); + DigestURL u; + u = r.url != null ? new DigestURL(r.url) : sb.index.fulltext().getURL(r.urlhash); + metadata.put("url", u == null ? "unknown" : u.toNormalform(true)); + metadata.put("dates", r.dates); + metadata.put("depth", r.depth); + metadata.put("state", state.name()); + result.put(r.urlhash, metadata); + } + } catch (IOException |IllegalArgumentException e) {} + } + } catch (JSONException e) { + ConcurrentLog.logException(e); + } + String json = result.toString(); + if (post.containsKey("callback")) json = post.get("callback") + "([" + json + "]);"; + return new ByteArrayInputStream(UTF8.getBytes(json)); + } + + // for the following methods we always need the durl to fetch data if (durl == null) return null; - url = durl.toNormalform(true); if (xml) { Collection xmlSnapshots = Transactions.findPaths(durl, "xml", Transactions.State.ANY); diff --git a/source/net/yacy/crawler/data/Snapshots.java b/source/net/yacy/crawler/data/Snapshots.java index c54548318..f2ba55c0a 100644 --- a/source/net/yacy/crawler/data/Snapshots.java +++ b/source/net/yacy/crawler/data/Snapshots.java @@ -20,15 +20,21 @@ package net.yacy.crawler.data; +import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.text.ParseException; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -60,17 +66,17 @@ public class Snapshots { private File storageLocation; - private Map>> directory; // a TreeMap for each domain where the key is the depth and the value is a Set containing a key/urlhash id to get all files into a specific order to provide a recent view on the documents + private Map>> directory; // a TreeMap for each domain (host.port) where the key is the depth and the value 
 
-    public Snapshots(File location) {
+    public Snapshots(final File location) {
         this.storageLocation = location;
         this.storageLocation.mkdirs();
         // scan the location to fill the directory
         this.directory = new HashMap<>();
-        for (String domain: location.list()) {
+        for (String hostport: location.list()) {
             TreeMap<Integer, TreeSet<String>> domaindepth = new TreeMap<>();
-            this.directory.put(domain, domaindepth);
-            File domaindir = new File(location, domain);
+            this.directory.put(hostport, domaindepth);
+            File domaindir = new File(location, hostport);
             if (domaindir.isDirectory()) domainscan: for (String depth: domaindir.list()) {
                 TreeSet<String> dateid = new TreeSet<>();
                 Integer depthi = -1;
@@ -83,21 +89,117 @@ public class Snapshots {
                 File sharddir = new File(domaindir, depth);
                 if (sharddir.isDirectory()) for (String shard: sharddir.list()) {
                     File snapshotdir = new File(sharddir, shard);
-                    if (snapshotdir.isDirectory()) for (String snapshotfile: snapshotdir.list()) {
-                        if (snapshotfile.endsWith(".pdf")) {
-                            String s = snapshotfile.substring(0, snapshotfile.length() - 4);
-                            int p = s.indexOf('.');
-                            assert p == 12;
-                            if (p > 0) {
-                                String key = s.substring(p + 1) + '.' + s.substring(0, p);
-                                dateid.add(key);
+                    if (snapshotdir.isDirectory()) {
+                        for (String snapshotfile: snapshotdir.list()) {
+                            if (snapshotfile.endsWith(".xml")) {
+                                String s = snapshotfile.substring(0, snapshotfile.length() - 4);
+                                int p = s.indexOf('.');
+                                assert p == 12;
+                                if (p > 0) {
+                                    String key = s.substring(p + 1) + '.' + s.substring(0, p);
+                                    dateid.add(key);
+                                }
                             }
                         }
                     }
                 }
+                if (dateid.size() == 0) domaindepth.remove(depthi);
             }
+            if (domaindepth.size() == 0) this.directory.remove(hostport);
         }
     }
+
+    /**
+     * get the number of entries in the snapshot directory
+     * @return the total number of different documents
+     */
+    public int size() {
+        int c = 0;
+        for (Map<Integer, TreeSet<String>> m: directory.values()) {
+            for (TreeSet<String> n: m.values()) {
+                c += n.size();
+            }
+        }
+        return c;
+    }
+
+    /**
+     * get a list of host names in the snapshot directory
+     * @return the set of host directory names (host.port)
+     */
+    public Set<String> listHosts() {
+        return directory.keySet();
+    }
+
+    public final class Revisions {
+        public final int depth;
+        public final Date[] dates;
+        public final String urlhash;
+        public final String url;
+        public final File[] pathtoxml;
+        public Revisions(final String hostport, final int depth, final String datehash) {
+            this.depth = depth;
+            int p = datehash.indexOf('.');
+            this.dates = new Date[1];
+            String datestring = datehash.substring(0, p);
+            this.dates[0] = parseDate(datestring);
+            this.urlhash = datehash.substring(p + 1);
+            this.pathtoxml = new File[1];
+            this.pathtoxml[0] = new File(pathToShard(hostport, urlhash, depth), this.urlhash + "." + datestring + ".xml");
+ datestring + ".xml"); + String u = null; + if (this.pathtoxml[0].exists()) { + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(this.pathtoxml[0]))); + String line; + while ((line = reader.readLine()) != null) { + if (line.startsWith("")) { + u = line.substring(16, line.length() - 6); + break; + } + } + reader.close(); + } catch (IOException e) {} + } + this.url = u; + } + } + + public Revisions getRevisions(String urlhash) { + if (urlhash == null || urlhash.length() == 0) return null; + // search for the hash, we must iterate through all entries + for (Map.Entry>> hostportDomaindepth: this.directory.entrySet()) { + String hostport = hostportDomaindepth.getKey(); + for (Map.Entry> depthDateHash: hostportDomaindepth.getValue().entrySet()) { + int depth = depthDateHash.getKey(); + for (String dateHash: depthDateHash.getValue()) { + if (dateHash.endsWith(urlhash)) { + return new Revisions(hostport, depth, dateHash); + } + } + } + } + return null; + } + + /** + * list the snapshots for a given host name + * @param host the host for the domain + * @return a map with a set for each depth in the domain of the host name + */ + public TreeMap> listIDs(final String hostport) { + TreeMap> result = new TreeMap<>(); + TreeMap> list = directory.get(hostport); + if (list != null) { + for (Map.Entry> entry: list.entrySet()) { + Collection r = new ArrayList<>(entry.getValue().size()); + for (String datehash: entry.getValue()) { + r.add(new Revisions(hostport, entry.getKey(), datehash)); + } + result.put(entry.getKey(), r); + } + } + return result; + } /** * Compute the path of a snapshot. This does not create the snapshot, only gives a path. @@ -114,17 +216,58 @@ public class Snapshots { File path = new File(pathToShard(url, depth), id + "." + ds + "." + ext); return path; } - + + /** + * Write information about the storage of a snapshot to the Snapshot-internal index. + * The actual writing of files to the target directory must be done elsewehre, this method does not store the snapshot files. + * @param url + * @param depth + * @param date + */ public void announceStorage(final DigestURL url, final int depth, final Date date) { String id = ASCII.String(url.hash()); String ds = GenericFormatter.SHORT_MINUTE_FORMATTER.format(date); - TreeMap> domaindepth = this.directory.get(pathToHostDir(url)); - if (domaindepth == null) {domaindepth = new TreeMap>(); this.directory.put(pathToHostDir(url), domaindepth);} + String pathToHostPortDir = pathToHostPortDir(url.getHost(), url.getPort()); + TreeMap> domaindepth = this.directory.get(pathToHostPortDir); + if (domaindepth == null) {domaindepth = new TreeMap>(); this.directory.put(pathToHostPortDir(url.getHost(), url.getPort()), domaindepth);} TreeSet dateid = domaindepth.get(depth); if (dateid == null) {dateid = new TreeSet(); domaindepth.put(depth, dateid);} - dateid.add(ds + '.' + id); + dateid.add(ds + '.' + id); + } + + /** + * Delete information about the storage of a snapshot to the Snapshot-internal index. + * The actual deletion of files in the target directory must be done elsewehre, this method does not store the snapshot files. 
+     * @param url
+     * @param depth
+     */
+    public Set<Date> announceDeletion(final DigestURL url, final int depth) {
+        HashSet<Date> dates = new HashSet<>();
+        String id = ASCII.String(url.hash());
+        String pathToHostPortDir = pathToHostPortDir(url.getHost(), url.getPort());
+        TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostPortDir);
+        if (domaindepth == null) return dates;
+        TreeSet<String> dateid = domaindepth.get(depth);
+        if (dateid == null) return dates;
+        Iterator<String> i = dateid.iterator();
+        while (i.hasNext()) {
+            String dis = i.next();
+            if (dis.endsWith("." + id)) {
+                String d = dis.substring(0, dis.length() - id.length() - 1);
+                Date date = parseDate(d);
+                if (date != null) dates.add(date);
+                i.remove();
+            }
+        }
+        if (dateid.size() == 0) domaindepth.remove(depth);
+        if (domaindepth.size() == 0) this.directory.remove(pathToHostPortDir);
+        return dates;
     }
 
+    /**
+     * Order enum class for the select method
+     */
     public static enum Order {
         ANY, OLDESTFIRST, LATESTFIRST;
     }
@@ -139,59 +282,72 @@ public class Snapshots {
      * @param maxcount the maximum number of hosthashes. If unlimited, submit Integer.MAX_VALUE
      * @return a map of hosthashes with the associated creation date
      */
-    public Map<String, Date> select(String host, Integer depth, final Order order, int maxcount) {
-        TreeSet<String> dateIdResult = new TreeSet<>();
+    public LinkedHashMap<String, Revisions> select(final String host, final Integer depth, final Order order, int maxcount) {
+        TreeMap<String, String[]> dateIdResult = new TreeMap<>();
         if (host == null && depth == null) {
-            loop: for (TreeMap<Integer, TreeSet<String>> domaindepth: this.directory.values()) {
-                for (TreeSet<String> keys: domaindepth.values()) {
-                    dateIdResult.addAll(keys);
-                    if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+            loop: for (Map.Entry<String, TreeMap<Integer, TreeSet<String>>> hostportDepths: this.directory.entrySet()) {
+                for (Map.Entry<Integer, TreeSet<String>> depthIds: hostportDepths.getValue().entrySet()) {
+                    for (String id: depthIds.getValue()) {
+                        dateIdResult.put(id, new String[]{hostportDepths.getKey(), Integer.toString(depthIds.getKey())});
+                        if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+                    }
                 }
             }
         }
         if (host == null && depth != null) {
-            loop: for (TreeMap<Integer, TreeSet<String>> domaindepth: this.directory.values()) {
-                TreeSet<String> keys = domaindepth.get(depth);
-                if (keys != null) dateIdResult.addAll(keys);
-                if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+            loop: for (Map.Entry<String, TreeMap<Integer, TreeSet<String>>> hostportDepths: this.directory.entrySet()) {
+                TreeSet<String> ids = hostportDepths.getValue().get(depth);
+                if (ids != null) for (String id: ids) {
+                    dateIdResult.put(id, new String[]{hostportDepths.getKey(), Integer.toString(depth)});
+                    if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+                }
             }
         }
         if (host != null && depth == null) {
-            TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostDir(host,80));
-            if (domaindepth != null) loop: for (TreeSet<String> keys: domaindepth.values()) {
-                dateIdResult.addAll(keys);
-                if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+            String hostport = pathToHostPortDir(host,80);
+            TreeMap<Integer, TreeSet<String>> depthIdsMap = this.directory.get(hostport);
+            if (depthIdsMap != null) loop: for (Map.Entry<Integer, TreeSet<String>> depthIds: depthIdsMap.entrySet()) {
+                for (String id: depthIds.getValue()) {
+                    dateIdResult.put(id, new String[]{hostport, Integer.toString(depthIds.getKey())});
+                    if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+                }
             }
         }
         if (host != null && depth != null) {
-            TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostDir(host,80));
+            String hostport = pathToHostPortDir(host,80);
+            TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(hostport);
             if (domaindepth != null) {
-                TreeSet<String> keys = domaindepth.get(depth);
-                if (keys != null) dateIdResult.addAll(keys);
+                TreeSet<String> ids = domaindepth.get(depth);
+                if (ids != null) loop: for (String id: ids) {
+                    dateIdResult.put(id, new String[]{hostport, Integer.toString(depth)});
+                    if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
+                }
             }
         }
-        Map<String, Date> result = new HashMap<>();
-        Iterator<String> i = order == Order.LATESTFIRST ? dateIdResult.descendingIterator() : dateIdResult.iterator();
+        LinkedHashMap<String, Revisions> result = new LinkedHashMap<>();
+        Iterator<Map.Entry<String, String[]>> i = order == Order.LATESTFIRST ? dateIdResult.descendingMap().entrySet().iterator() : dateIdResult.entrySet().iterator();
         while (i.hasNext() && result.size() < maxcount) {
-            String di = i.next();
-            int p = di.indexOf('.');
+            Map.Entry<String, String[]> entry = i.next();
+            String datehash = entry.getKey();
+            int p = datehash.indexOf('.');
             assert p >= 0;
-            String d = di.substring(0, p);
-            Date date;
-            try {
-                date = GenericFormatter.SHORT_MINUTE_FORMATTER.parse(d);
-            } catch (ParseException e) {
-                try {
-                    date = GenericFormatter.SHORT_DAY_FORMATTER.parse(d);
-                } catch (ParseException ee) {
-                    date = new Date();
-                }
-            }
-            result.put(di.substring(p + 1), date);
+            Revisions r = new Revisions(entry.getValue()[0], Integer.parseInt(entry.getValue()[1]), datehash);
+            result.put(datehash.substring(p + 1), r);
         }
         return result;
     }
 
+    private static Date parseDate(String d) {
+        try {
+            return GenericFormatter.SHORT_MINUTE_FORMATTER.parse(d);
+        } catch (ParseException e) {
+            try {
+                return GenericFormatter.SHORT_DAY_FORMATTER.parse(d);
+            } catch (ParseException ee) {
+                return null;
+            }
+        }
+    }
 
     /**
      * get the depth to a document, helper method for definePath to determine the depth value
@@ -230,6 +386,8 @@ public class Snapshots {
         return new ArrayList<>(0);
     }
 
+    // pathtoxml = <storageLocation>/<host>.<port>/<depth>/<shard>/<urlhash>.<date>.xml
+
     /**
      * for a given url, get all paths for storage locations.
      * The locations are all for the single url but may represent different storage times.
@@ -250,20 +408,19 @@ public class Snapshots {
         }
         return paths;
     }
-    
+
     private File pathToShard(final DigestURL url, final int depth) {
-        String id = ASCII.String(url.hash());
-        File pathToHostDir = new File(storageLocation, pathToHostDir(url));
+        return pathToShard(pathToHostPortDir(url.getHost(), url.getPort()), ASCII.String(url.hash()), depth);
+    }
+
+    private File pathToShard(final String hostport, final String urlhash, final int depth) {
+        File pathToHostDir = new File(storageLocation, hostport);
         File pathToDepthDir = new File(pathToHostDir, pathToDepthDir(depth));
-        File pathToShard = new File(pathToDepthDir, pathToShard(id));
+        File pathToShard = new File(pathToDepthDir, pathToShard(urlhash));
         return pathToShard;
     }
 
-    private String pathToHostDir(final DigestURL url) {
-        return pathToHostDir(url.getHost(), url.getPort());
-    }
-    
-    private String pathToHostDir(final String host, final int port) {
+    private String pathToHostPortDir(final String host, final int port) {
         return host + "." + port;
     }
 
diff --git a/source/net/yacy/crawler/data/Transactions.java b/source/net/yacy/crawler/data/Transactions.java
index e98f7dd95..8ec57f4c2 100644
--- a/source/net/yacy/crawler/data/Transactions.java
+++ b/source/net/yacy/crawler/data/Transactions.java
@@ -29,7 +29,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +45,7 @@ import net.yacy.cora.protocol.ClientIdentification;
 import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.cora.util.Html2Image;
 import net.yacy.crawler.data.Snapshots.Order;
+import net.yacy.crawler.data.Snapshots.Revisions;
 import net.yacy.search.schema.CollectionSchema;
 
 /**
@@ -55,8 +59,6 @@ public class Transactions {
     private final static char[] WHITESPACE = new char[132];
     private final static int WHITESPACE_START = XML_PREFIX.length();
     private final static int WHITESPACE_LENGTH = WHITESPACE.length;
-    private final static String SNAPSHOT_INVENTORY_DIR = "inventory";
-    private final static String SNAPSHOT_ARCHIVE_DIR = "archive";
     private static File transactionDir = null, inventoryDir = null, archiveDir = null;
     private static Snapshots inventory = null, archive = null;
     private static ExecutorService executor = Executors.newCachedThreadPool();
@@ -67,18 +69,66 @@ public class Transactions {
     }
 
     public static enum State {
-        INVENTORY, ARCHIVE, ANY;
+        INVENTORY("inventory"), ARCHIVE("archive"), ANY(null);
+        public String dirname;
+        State(String dirname) {
+            this.dirname = dirname;
+        }
     }
 
     public static void init(File dir) {
         transactionDir = dir;
         transactionDir.mkdirs();
-        inventoryDir = new File(transactionDir, SNAPSHOT_INVENTORY_DIR);
+        inventoryDir = new File(transactionDir, State.INVENTORY.dirname);
         inventory = new Snapshots(inventoryDir);
-        archiveDir = new File(transactionDir, SNAPSHOT_ARCHIVE_DIR);
+        archiveDir = new File(transactionDir, State.ARCHIVE.dirname);
         archive = new Snapshots(archiveDir);
     }
 
+    /**
+     * get the number of entries for each of the transaction states
+     * @return the total number of different documents for each transaction state
+     */
+    public static Map<String, Integer> sizes() {
+        HashMap<String, Integer> m = new HashMap<>();
+        m.put(State.INVENTORY.name(), inventory.size());
+        m.put(State.ARCHIVE.name(), archive.size());
+        return m;
+    }
+
+    public static Revisions getRevisions(final State state, final String urlhash) {
+        switch (state) {
+            case INVENTORY : return inventory.getRevisions(urlhash);
+            case ARCHIVE : return archive.getRevisions(urlhash);
+            default : Revisions a = inventory.getRevisions(urlhash); return a == null ? archive.getRevisions(urlhash) : a;
+        }
+    }
+
+    /**
+     * get a list of host names in the snapshot directory
+     * @return the list of the given state; if the state is ANY or unknown, all lists are combined
+     */
+    public static Set<String> listHosts(final State state) {
+        switch (state) {
+            case INVENTORY : return inventory.listHosts();
+            case ARCHIVE : return archive.listHosts();
+            default : Set<String> a = inventory.listHosts(); a.addAll(archive.listHosts()); return a;
+        }
+    }
+
+    /**
+     * list the snapshots for a given host name
+     * @param host the host for the domain
+     * @return a map with a set for each depth in the domain of the host name
+     */
+    public static TreeMap<Integer, Collection<Revisions>> listIDs(final State state, final String host) {
+        switch (state) {
+            case INVENTORY : return inventory.listIDs(host);
+            case ARCHIVE : return archive.listIDs(host);
+            default : TreeMap<Integer, Collection<Revisions>> a = inventory.listIDs(host); a.putAll(archive.listIDs(host)); return a;
+        }
+    }
+
     public static boolean store(final SolrInputDocument doc, final boolean loadImage, final boolean replaceOld, final String proxy, final ClientIdentification.Agent agent, final String acceptLanguage) {
 
         // GET METADATA FROM DOC
@@ -116,7 +166,7 @@ public class Transactions {
                     osw.write("\n");
                     osw.close();
                     fos.close();
-                    Transactions.announceStorage(url, depth, date);
+                    Transactions.announceStorage(url, depth, date, State.INVENTORY);
                 }
             } catch (IOException e) {
                 ConcurrentLog.logException(e);
@@ -146,7 +196,62 @@ public class Transactions {
 
         return success;
     }
+
+    /**
+     * Announce the commit of a snapshot: this will move all data for the given urlhash from the inventory to the archive.
+     * The index within the snapshot management will be updated as well.
+     * @param urlhash
+     * @return a revision object from the moved document if the commit has succeeded, null if something went wrong
+     */
+    public static Revisions commit(String urlhash) {
+        return transact(urlhash, State.INVENTORY, State.ARCHIVE);
+    }
+
+    /**
+     * Announce the rollback of a snapshot: this will move all data for the given urlhash from the archive to the inventory.
+     * The index within the snapshot management will be updated as well.
+     * @param urlhash
+     * @return a revision object from the moved document if the rollback has succeeded, null if something went wrong
+     */
+    public static Revisions rollback(String urlhash) {
+        return transact(urlhash, State.ARCHIVE, State.INVENTORY);
+    }
+
+    private static Revisions transact(final String urlhash, final State from, final State to) {
+        Revisions r = Transactions.getRevisions(from, urlhash);
+        if (r == null) return null;
+        // we take all pathtoxml files and move them from the 'from' to the 'to' location
+        for (File f: r.pathtoxml) {
+            String name = f.getName();
+            String nameStub = name.substring(0, name.length() - 4);
+            File sourceParent = f.getParentFile();
+            File targetParent = new File(sourceParent.getAbsolutePath().replace("/" + from.dirname + "/", "/" + to.dirname + "/"));
+            targetParent.mkdirs();
+            // list all files in the parent directory
+            for (String a: sourceParent.list()) {
+                if (a.startsWith(nameStub)) {
+                    new File(sourceParent, a).renameTo(new File(targetParent, a));
+                }
+            }
+            // delete empty directories
+            while (sourceParent.list().length == 0) {
+                sourceParent.delete();
+                sourceParent = sourceParent.getParentFile();
+            }
+        }
+        // announce the movement
+        DigestURL durl;
+        try {
+            durl = new DigestURL(r.url);
+            Transactions.announceDeletion(durl, r.depth, from);
+            Transactions.announceStorage(durl, r.depth, r.dates[0], to);
+            return r;
+        } catch (MalformedURLException e) {
+            ConcurrentLog.logException(e);
+        }
+
+        return null;
+    }
 
     /**
      * select a set of urlhashes from the snapshot directory. The selection either ordered
@@ -159,8 +264,8 @@ public class Transactions {
      * @param state the wanted transaction state, State.INVENTORY, State.ARCHIVE or State.ANY
      * @return a map of hosthashes with the associated creation date
      */
-    public static Map<String, Date> select(String host, Integer depth, final Order order, int maxcount, State state) {
-        Map<String, Date> result = new HashMap<>();
+    public static LinkedHashMap<String, Revisions> select(String host, Integer depth, final Order order, int maxcount, State state) {
+        LinkedHashMap<String, Revisions> result = new LinkedHashMap<>();
         if (state == State.INVENTORY || state == State.ANY) result.putAll(inventory.select(host, depth, order, maxcount));
         if (state == State.ARCHIVE || state == State.ANY) result.putAll(archive.select(host, depth, order, maxcount));
         return result;
@@ -182,9 +287,31 @@ public class Transactions {
         if (state == State.ARCHIVE) return archive.definePath(url, depth, date, ext);
         return null;
     }
-
-    public static void announceStorage(final DigestURL url, final int depth, final Date date) {
-        inventory.announceStorage(url, depth, date);
+
+    /**
+     * Write information about the storage of a snapshot to the Snapshot-internal index.
+     * The actual writing of files to the target directory must be done elsewhere, this method does not store the snapshot files.
+     * @param state the wanted transaction state, State.INVENTORY, State.ARCHIVE or State.ANY
+     * @param url
+     * @param depth
+     * @param date
+     */
+    public static void announceStorage(final DigestURL url, final int depth, final Date date, State state) {
+        if (state == State.INVENTORY || state == State.ANY) inventory.announceStorage(url, depth, date);
+        if (state == State.ARCHIVE || state == State.ANY) archive.announceStorage(url, depth, date);
+    }
+
+    /**
+     * Delete information about the storage of a snapshot from the Snapshot-internal index.
+     * The actual deletion of files in the target directory must be done elsewhere, this method does not delete the snapshot files.
+     * @param state the wanted transaction state, State.INVENTORY, State.ARCHIVE or State.ANY
+     * @param url
+     * @param depth
+     */
+    public static void announceDeletion(final DigestURL url, final int depth, final State state) {
+        if (state == State.INVENTORY || state == State.ANY) inventory.announceDeletion(url, depth);
+        if (state == State.ARCHIVE || state == State.ANY) archive.announceDeletion(url, depth);
     }
 
     /**