Added a transaction interface to the snapshots: all documents in the
snapshots can now be processed with transactions using commit and
rollback commands. Furthermore, a large number of monitoring methods
have been added to check the success of transactions.

The transactions for snapshots have two main components: an RSS search
API to get information about the latest/oldest entries and a
commit/rollback API to move entries away from the RSS results. This is
done using two storage locations for the snapshots, INVENTORY and
ARCHIVE. New snapshots are placed in INVENTORY, committed snapshots are
moved to ARCHIVE, and rolled-back snapshots are moved back to INVENTORY.
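The on-disk layout (taken from the path comment in the code below) keeps one
subtree per state, so a commit or rollback simply moves all files belonging to
a urlhash to the same relative path in the other subtree:

    <transactionDir>/inventory/<host>.<port>/<depth>/<shard>/<urlhash>.<date>.xml
    <transactionDir>/archive/<host>.<port>/<depth>/<shard>/<urlhash>.<date>.xml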

Normal Workflow:
Besides all the options below, it is usually sufficient to process data
like this (a minimal client sketch follows this list):
- call
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=LATESTFIRST
- process the RSS result and use the <guid> value as <urlhash> (see next
command)
- for each processed result call
http://localhost:8090/api/snapshot.json?command=commit&urlhash=<urlhash>
- then you can call the RSS feed again; the committed URLs are omitted
from the next set of items.
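A minimal Java sketch of this loop, assuming the peer runs at localhost:8090;
note that the rss and commit calls require admin authentication, which is
omitted here, and that the simple <guid> regular expression stands in for a
real RSS parser:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SnapshotCommitClient {

    private static final String PEER = "http://localhost:8090"; // assumed peer address

    public static void main(String[] args) throws Exception {
        // 1) fetch the inventory feed, latest entries first
        String rss = get(PEER + "/api/snapshot.rss?state=INVENTORY&order=LATESTFIRST");
        // 2) extract the <guid> values; each guid carries the urlhash
        List<String> urlhashes = new ArrayList<>();
        Matcher m = Pattern.compile("<guid[^>]*>([^<]+)</guid>").matcher(rss);
        while (m.find()) urlhashes.add(m.group(1));
        // 3) process each document, then commit it so it moves to the ARCHIVE
        for (String urlhash : urlhashes) {
            // ... process the snapshot for this urlhash here ...
            String json = get(PEER + "/api/snapshot.json?command=commit&urlhash=" + urlhash);
            System.out.println(urlhash + " -> " + json);
        }
        // 4) the next call to the rss feed omits the committed entries
    }

    // tiny HTTP GET helper returning the response body as a string
    private static String get(String url) throws Exception {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new URL(url).openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) sb.append(line).append('\n');
        }
        return sb.toString();
    }
}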

These are the commands to control this:
The RSS feed:
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=LATESTFIRST
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=OLDESTFIRST
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=ANY
http://localhost:8090/api/snapshot.rss?state=ARCHIVE&order=LATESTFIRST
http://localhost:8090/api/snapshot.rss?state=ARCHIVE&order=OLDESTFIRST
http://localhost:8090/api/snapshot.rss?state=ARCHIVE&order=ANY
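An item in the returned feed looks roughly like this (URL, hash and date are
illustrative, and the exact set of item fields may differ):

    <item>
      <title>http://yacy.net/en/index.html</title>
      <link>http://yacy.net/en/index.html</link>
      <guid>upiFJ7Fh1hyQ</guid>
      <pubDate>Fri, 21 Nov 2014 12:54:00 +0000</pubDate>
    </item>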

The feed returns the <urlhash> in the <guid> field of each item. This
value must be used for commit/rollback:

Commit/Rollback:
http://localhost:8090/api/snapshot.json?command=commit&urlhash=<urlhash>
http://localhost:8090/api/snapshot.json?command=rollback&urlhash=<urlhash>
The JSON result is a property list containing the property "result"
with the possible values "success" or "fail". If a "fail" occurs, please
check the log for further information.

Monitoring:
http://localhost:8090/api/snapshot.json?command=status
This shows the total number of entries in the INVENTORY and the ARCHIVE.
http://localhost:8090/api/snapshot.json?command=list
This returns a list of all hosts which have snapshots and the number
of entries for each host. Counts for INVENTORY and ARCHIVE are listed in
the properties "count.INVENTORY" and "count.ARCHIVE".
http://localhost:8090/api/snapshot.json?command=list&depth=2
The list can be restricted to snapshots with a specific depth. The list
then contains the same host names, but the count values change because
only documents at that specific crawl depth are counted.
http://localhost:8090/api/snapshot.json?command=list&host=yacy.net.80
This lists all urlhashes for the given host instead of only an
accumulated count of entries.
http://localhost:8090/api/snapshot.json?command=list&host=yacy.net.80&depth=0
This restricts the list of urlhashes for that host to the given depth.
http://localhost:8090/api/snapshot.json?command=list&state=INVENTORY
http://localhost:8090/api/snapshot.json?command=list&state=ARCHIVE
This selects either the INVENTORY or the ARCHIVE for all list commands;
the default is to collect and combine the host information from both
snapshot directories. You can use the state option for all the commands
listed above; see the example output below.
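For illustration (host names and counts are made up), command=status returns
the sizes of both stores, and command=list without a host parameter returns
per-host counts:

    {"size": {"INVENTORY": 1234, "ARCHIVE": 567}}

    {
      "count.INVENTORY": {"yacy.net.80": 120, "example.org.80": 33},
      "count.ARCHIVE": {"yacy.net.80": 15}
    }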

Detailed Information:
http://localhost:8090/api/snapshot.json?command=metadata&urlhash=upiFJ7Fh1hyQ
This collects metadata for the given urlhash. The lookup can also
be restricted with state=INVENTORY or state=ARCHIVE to test whether the
document is in one of these snapshot directories. If a urlhash
is not found, an empty result is returned. If an entry was found and the
state was not restricted, then the result contains a state property
with the name of the location where the document is stored, either
INVENTORY or ARCHIVE. An example response is shown below.
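A found entry is keyed by its urlhash and looks roughly like this (values are
illustrative):

    {
      "upiFJ7Fh1hyQ": {
        "url": "http://yacy.net/en/index.html",
        "dates": ["..."],
        "depth": 2,
        "state": "ARCHIVE"
      }
    }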

Hint:
If a very large number of documents is in the INVENTORY, it can be
better to call the RSS feed with
http://localhost:8090/api/snapshot.rss?state=INVENTORY&order=ANY
because that selection stops as soon as enough entries have been found
and is therefore very efficient.
Michael Peter Christen 10 years ago
parent 578ae29f1e
commit 9971e197e0

@ -26,8 +26,8 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
@ -41,8 +41,11 @@ import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.Html2Image;
import net.yacy.cora.util.JSONException;
import net.yacy.cora.util.JSONObject;
import net.yacy.crawler.data.Snapshots;
import net.yacy.crawler.data.Transactions;
import net.yacy.crawler.data.Snapshots.Revisions;
import net.yacy.document.ImageParser;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.search.Switchboard;
@ -64,6 +67,7 @@ public class snapshot {
final boolean authenticated = sb.adminAuthenticated(header) >= 2;
final String ext = header.get("EXT", "");
if (ext.equals("rss")) {
// create a report about the content of the snapshot directory
if (!authenticated) return null;
@ -75,16 +79,16 @@ public class snapshot {
String statex = post == null ? Transactions.State.INVENTORY.name() : post.get("state", Transactions.State.INVENTORY.name());
Transactions.State state = Transactions.State.valueOf(statex);
String host = post == null ? null : post.get("host");
Map<String, Date> iddate = Transactions.select(host, depth, order, maxcount, state);
Map<String, Revisions> iddate = Transactions.select(host, depth, order, maxcount, state);
// now select the URL from the index for these ids in iddate and make an RSS feed
RSSFeed rssfeed = new RSSFeed(Integer.MAX_VALUE);
rssfeed.setChannel(new RSSMessage("Snapshot list for host = " + host + ", depth = " + depth + ", order = " + order + ", maxcount = " + maxcount, "", ""));
for (Map.Entry<String, Date> e: iddate.entrySet()) {
for (Map.Entry<String, Revisions> e: iddate.entrySet()) {
try {
DigestURL u = sb.index.fulltext().getURL(e.getKey());
DigestURL u = e.getValue().url == null ? sb.index.fulltext().getURL(e.getKey()) : new DigestURL(e.getValue().url);
if (u == null) continue;
RSSMessage message = new RSSMessage(u.toNormalform(true), "", u, e.getKey());
message.setPubDate(e.getValue());
message.setPubDate(e.getValue().dates[0]);
rssfeed.addMessage(message);
} catch (IOException ee) {
ConcurrentLog.logException(ee);
@ -94,6 +98,7 @@ public class snapshot {
return new ByteArrayInputStream(rssBinary);
}
// for the following methods we (mostly) need an url or a url hash
if (post == null) return null;
final boolean xml = ext.equals("xml");
final boolean pdf = ext.equals("pdf");
@ -101,24 +106,128 @@ public class snapshot {
final boolean pngjpg = ext.equals("png") || ext.equals("jpg");
String urlhash = post.get("urlhash", "");
String url = post.get("url", "");
if (url.length() == 0 && urlhash.length() == 0) return null;
DigestURL durl = null;
if (urlhash.length() == 0) {
if (urlhash.length() == 0 && url.length() > 0) {
try {
durl = new DigestURL(url);
urlhash = ASCII.String(durl.hash());
} catch (MalformedURLException e) {
}
}
if (durl == null) {
if (durl == null && urlhash.length() > 0) {
try {
durl = sb.index.fulltext().getURL(urlhash);
} catch (IOException e) {
ConcurrentLog.logException(e);
}
}
if (url.length() == 0 && durl != null) url = durl.toNormalform(true);
if (ext.equals("json")) {
// command interface: view and change a transaction state, get metadata about transactions in the past
String command = post.get("command", "metadata");
String statename = post.get("state");
JSONObject result = new JSONObject();
try {
if (command.equals("status")) {
// return a status of the transaction archive
JSONObject sizes = new JSONObject();
for (Map.Entry<String, Integer> state: Transactions.sizes().entrySet()) sizes.put(state.getKey(), state.getValue());
result.put("size", sizes);
} else if (command.equals("list")) {
if (!authenticated) return null;
// return a status of the transaction archive
String host = post.get("host");
String depth = post.get("depth");
for (Transactions.State state: statename == null ?
new Transactions.State[]{Transactions.State.INVENTORY, Transactions.State.ARCHIVE} :
new Transactions.State[]{Transactions.State.valueOf(statename)}) {
if (host == null) {
JSONObject hostCountInventory = new JSONObject();
for (String h: Transactions.listHosts(state)) {
hostCountInventory.put(h, Transactions.listIDs(state, h).size());
}
result.put("count." + state.name(), hostCountInventory);
} else {
TreeMap<Integer, Collection<Revisions>> ids = Transactions.listIDs(state, host);
if (ids == null) {
result.put("error", "no host " + host + " found");
} else {
for (Map.Entry<Integer, Collection<Revisions>> entry: ids.entrySet()) {
if (depth != null && Integer.parseInt(depth) != entry.getKey()) continue;
for (Revisions r: entry.getValue()) {
try {
JSONObject metadata = new JSONObject();
DigestURL u = r.url != null ? new DigestURL(r.url) : sb.index.fulltext().getURL(r.urlhash);
metadata.put("url", u == null ? "unknown" : u.toNormalform(true));
metadata.put("dates", r.dates);
assert r.depth == entry.getKey().intValue();
metadata.put("depth", entry.getKey().intValue());
result.put(r.urlhash, metadata);
} catch (IOException e) {}
}
}
}
}
}
} else if (command.equals("commit")) {
if (!authenticated) return null;
Revisions r = Transactions.commit(urlhash);
if (r != null) {
result.put("result", "success");
result.put("depth", r.depth);
result.put("url", r.url);
result.put("dates", r.dates);
} else {
result.put("result", "fail");
}
result.put("urlhash", urlhash);
} else if (command.equals("rollback")) {
if (!authenticated) return null;
Revisions r = Transactions.rollback(urlhash);
if (r != null) {
result.put("result", "success");
result.put("depth", r.depth);
result.put("url", r.url);
result.put("dates", r.dates);
} else {
result.put("result", "fail");
}
result.put("urlhash", urlhash);
} else if (command.equals("metadata")) {
try {
Revisions r;
Transactions.State state = statename == null || statename.length() == 0 ? null : Transactions.State.valueOf(statename);
if (state == null) {
r = Transactions.getRevisions(Transactions.State.INVENTORY, urlhash);
if (r != null) state = Transactions.State.INVENTORY;
r = Transactions.getRevisions(Transactions.State.ARCHIVE, urlhash);
if (r != null) state = Transactions.State.ARCHIVE;
} else {
r = Transactions.getRevisions(state, urlhash);
}
if (r != null) {
JSONObject metadata = new JSONObject();
DigestURL u;
u = r.url != null ? new DigestURL(r.url) : sb.index.fulltext().getURL(r.urlhash);
metadata.put("url", u == null ? "unknown" : u.toNormalform(true));
metadata.put("dates", r.dates);
metadata.put("depth", r.depth);
metadata.put("state", state.name());
result.put(r.urlhash, metadata);
}
} catch (IOException |IllegalArgumentException e) {}
}
} catch (JSONException e) {
ConcurrentLog.logException(e);
}
String json = result.toString();
if (post.containsKey("callback")) json = post.get("callback") + "([" + json + "]);";
return new ByteArrayInputStream(UTF8.getBytes(json));
}
// for the following methods we always need the durl to fetch data
if (durl == null) return null;
url = durl.toNormalform(true);
if (xml) {
Collection<File> xmlSnapshots = Transactions.findPaths(durl, "xml", Transactions.State.ANY);

@ -20,15 +20,21 @@
package net.yacy.crawler.data;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@ -60,17 +66,17 @@ public class Snapshots {
private File storageLocation;
private Map<String, TreeMap<Integer, TreeSet<String>>> directory; // a TreeMap for each domain where the key is the depth and the value is a Set containing a key/urlhash id to get all files into a specific order to provide a recent view on the documents
private Map<String, TreeMap<Integer, TreeSet<String>>> directory; // a TreeMap for each domain (host.port) where the key is the depth and the value is a Set containing a key/urlhash id to get all files into a specific order to provide a recent view on the documents
public Snapshots(File location) {
public Snapshots(final File location) {
this.storageLocation = location;
this.storageLocation.mkdirs();
// scan the location to fill the directory
this.directory = new HashMap<>();
for (String domain: location.list()) {
for (String hostport: location.list()) {
TreeMap<Integer, TreeSet<String>> domaindepth = new TreeMap<>();
this.directory.put(domain, domaindepth);
File domaindir = new File(location, domain);
this.directory.put(hostport, domaindepth);
File domaindir = new File(location, hostport);
if (domaindir.isDirectory()) domainscan: for (String depth: domaindir.list()) {
TreeSet<String> dateid = new TreeSet<>();
Integer depthi = -1;
@ -83,21 +89,117 @@ public class Snapshots {
File sharddir = new File(domaindir, depth);
if (sharddir.isDirectory()) for (String shard: sharddir.list()) {
File snapshotdir = new File(sharddir, shard);
if (snapshotdir.isDirectory()) for (String snapshotfile: snapshotdir.list()) {
if (snapshotfile.endsWith(".pdf")) {
String s = snapshotfile.substring(0, snapshotfile.length() - 4);
int p = s.indexOf('.');
assert p == 12;
if (p > 0) {
String key = s.substring(p + 1) + '.' + s.substring(0, p);
dateid.add(key);
if (snapshotdir.isDirectory()) {
for (String snapshotfile: snapshotdir.list()) {
if (snapshotfile.endsWith(".xml")) {
String s = snapshotfile.substring(0, snapshotfile.length() - 4);
int p = s.indexOf('.');
assert p == 12;
if (p > 0) {
String key = s.substring(p + 1) + '.' + s.substring(0, p);
dateid.add(key);
}
}
}
}
}
if (dateid.size() == 0) domaindepth.remove(depthi);
}
if (domaindepth.size() == 0) this.directory.remove(hostport);
}
}
/**
* get the number of entries in the snapshot directory
* @return the total number of different documents
*/
public int size() {
int c = 0;
for (Map<Integer, TreeSet<String>> m: directory.values()) {
for (TreeSet<String> n: m.values()) {
c += n.size();
}
}
return c;
}
/**
* get a list of host names in the snapshot directory
* @return
*/
public Set<String> listHosts() {
return directory.keySet();
}
public final class Revisions {
public final int depth;
public final Date[] dates;
public final String urlhash;
public final String url;
public final File[] pathtoxml;
public Revisions(final String hostport, final int depth, final String datehash) {
this.depth = depth;
int p = datehash.indexOf('.');
this.dates = new Date[1];
String datestring = datehash.substring(0, p);
this.dates[0] = parseDate(datestring);
this.urlhash = datehash.substring(p + 1);
this.pathtoxml = new File[1];
this.pathtoxml[0] = new File(pathToShard(hostport, urlhash, depth), this.urlhash + "." + datestring + ".xml");
String u = null;
if (this.pathtoxml[0].exists()) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(this.pathtoxml[0])));
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("<str name=\"sku\">")) {
u = line.substring(16, line.length() - 6);
break;
}
}
reader.close();
} catch (IOException e) {}
}
this.url = u;
}
}
public Revisions getRevisions(String urlhash) {
if (urlhash == null || urlhash.length() == 0) return null;
// search for the hash, we must iterate through all entries
for (Map.Entry<String, TreeMap<Integer, TreeSet<String>>> hostportDomaindepth: this.directory.entrySet()) {
String hostport = hostportDomaindepth.getKey();
for (Map.Entry<Integer, TreeSet<String>> depthDateHash: hostportDomaindepth.getValue().entrySet()) {
int depth = depthDateHash.getKey();
for (String dateHash: depthDateHash.getValue()) {
if (dateHash.endsWith(urlhash)) {
return new Revisions(hostport, depth, dateHash);
}
}
}
}
return null;
}
/**
* list the snapshots for a given host name
* @param host the host for the domain
* @return a map with a set for each depth in the domain of the host name
*/
public TreeMap<Integer, Collection<Revisions>> listIDs(final String hostport) {
TreeMap<Integer, Collection<Revisions>> result = new TreeMap<>();
TreeMap<Integer, TreeSet<String>> list = directory.get(hostport);
if (list != null) {
for (Map.Entry<Integer, TreeSet<String>> entry: list.entrySet()) {
Collection<Revisions> r = new ArrayList<>(entry.getValue().size());
for (String datehash: entry.getValue()) {
r.add(new Revisions(hostport, entry.getKey(), datehash));
}
result.put(entry.getKey(), r);
}
}
return result;
}
/**
* Compute the path of a snapshot. This does not create the snapshot, only gives a path.
@ -114,17 +216,58 @@ public class Snapshots {
File path = new File(pathToShard(url, depth), id + "." + ds + "." + ext);
return path;
}
/**
* Write information about the storage of a snapshot to the Snapshot-internal index.
* The actual writing of files to the target directory must be done elsewhere, this method does not store the snapshot files.
* @param url
* @param depth
* @param date
*/
public void announceStorage(final DigestURL url, final int depth, final Date date) {
String id = ASCII.String(url.hash());
String ds = GenericFormatter.SHORT_MINUTE_FORMATTER.format(date);
TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostDir(url));
if (domaindepth == null) {domaindepth = new TreeMap<Integer, TreeSet<String>>(); this.directory.put(pathToHostDir(url), domaindepth);}
String pathToHostPortDir = pathToHostPortDir(url.getHost(), url.getPort());
TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostPortDir);
if (domaindepth == null) {domaindepth = new TreeMap<Integer, TreeSet<String>>(); this.directory.put(pathToHostPortDir(url.getHost(), url.getPort()), domaindepth);}
TreeSet<String> dateid = domaindepth.get(depth);
if (dateid == null) {dateid = new TreeSet<String>(); domaindepth.put(depth, dateid);}
dateid.add(ds + '.' + id);
}
/**
* Delete information about the storage of a snapshot to the Snapshot-internal index.
* The actual deletion of files in the target directory must be done elsewhere, this method does not delete the snapshot files.
* @param url
* @param depth
* @param date
*/
public Set<Date> announceDeletion(final DigestURL url, final int depth) {
HashSet<Date> dates = new HashSet<>();
String id = ASCII.String(url.hash());
String pathToHostPortDir = pathToHostPortDir(url.getHost(), url.getPort());
TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostPortDir);
if (domaindepth == null) return dates;
TreeSet<String> dateid = domaindepth.get(depth);
if (dateid == null) return dates;
Iterator<String> i = dateid.iterator();
while (i.hasNext()) {
String dis = i.next();
if (dis.endsWith("." + id)) {
String d = dis.substring(0, dis.length() - id.length() - 1);
Date date = parseDate(d);
if (date != null) dates.add(date);
i.remove();
}
}
if (dateid.size() == 0) domaindepth.remove(depth);
if (domaindepth.size() == 0) this.directory.remove(pathToHostPortDir);
return dates;
}
/**
* Order enum class for the select method
*/
public static enum Order {
ANY, OLDESTFIRST, LATESTFIRST;
}
@ -139,59 +282,72 @@ public class Snapshots {
* @param maxcount the maximum number of hosthashes. If unlimited, submit Integer.MAX_VALUE
* @return a map of hosthashes with the associated creation date
*/
public Map<String, Date> select(String host, Integer depth, final Order order, int maxcount) {
TreeSet<String> dateIdResult = new TreeSet<>();
public LinkedHashMap<String, Revisions> select(final String host, final Integer depth, final Order order, int maxcount) {
TreeMap<String, String[]> dateIdResult = new TreeMap<>();
if (host == null && depth == null) {
loop: for (TreeMap<Integer, TreeSet<String>> domaindepth: this.directory.values()) {
for (TreeSet<String> keys: domaindepth.values()) {
dateIdResult.addAll(keys);
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
loop: for (Map.Entry<String, TreeMap<Integer, TreeSet<String>>> hostportDepths: this.directory.entrySet()) {
for (Map.Entry<Integer, TreeSet<String>> depthIds: hostportDepths.getValue().entrySet()) {
for (String id: depthIds.getValue()) {
dateIdResult.put(id, new String[]{hostportDepths.getKey(), Integer.toString(depthIds.getKey())});
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
}
}
}
}
if (host == null && depth != null) {
loop: for (TreeMap<Integer, TreeSet<String>> domaindepth: this.directory.values()) {
TreeSet<String> keys = domaindepth.get(depth);
if (keys != null) dateIdResult.addAll(keys);
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
loop: for (Map.Entry<String, TreeMap<Integer, TreeSet<String>>> hostportDepths: this.directory.entrySet()) {
TreeSet<String> ids = hostportDepths.getValue().get(depth);
if (ids != null) for (String id: ids) {
dateIdResult.put(id, new String[]{hostportDepths.getKey(), Integer.toString(depth)});
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
}
}
}
if (host != null && depth == null) {
TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostDir(host,80));
if (domaindepth != null) loop: for (TreeSet<String> keys: domaindepth.values()) {
dateIdResult.addAll(keys);
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
String hostport = pathToHostPortDir(host,80);
TreeMap<Integer, TreeSet<String>> depthIdsMap = this.directory.get(hostport);
if (depthIdsMap != null) loop: for (Map.Entry<Integer, TreeSet<String>> depthIds: depthIdsMap.entrySet()) {
for (String id: depthIds.getValue()) {
dateIdResult.put(id, new String[]{hostport, Integer.toString(depthIds.getKey())});
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
}
}
}
if (host != null && depth != null) {
TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(pathToHostDir(host,80));
String hostport = pathToHostPortDir(host,80);
TreeMap<Integer, TreeSet<String>> domaindepth = this.directory.get(hostport);
if (domaindepth != null) {
TreeSet<String> keys = domaindepth.get(depth);
if (keys != null) dateIdResult.addAll(keys);
TreeSet<String> ids = domaindepth.get(depth);
if (ids != null) loop: for (String id: ids) {
dateIdResult.put(id, new String[]{hostport, Integer.toString(depth)});
if (order == Order.ANY && dateIdResult.size() >= maxcount) break loop;
}
}
}
Map<String, Date> result = new HashMap<>();
Iterator<String> i = order == Order.LATESTFIRST ? dateIdResult.descendingIterator() : dateIdResult.iterator();
LinkedHashMap<String, Revisions> result = new LinkedHashMap<>();
Iterator<Map.Entry<String, String[]>> i = order == Order.LATESTFIRST ? dateIdResult.descendingMap().entrySet().iterator() : dateIdResult.entrySet().iterator();
while (i.hasNext() && result.size() < maxcount) {
String di = i.next();
int p = di.indexOf('.');
Map.Entry<String, String[]> entry = i.next();
String datehash = entry.getKey();
int p = datehash.indexOf('.');
assert p >= 0;
String d = di.substring(0, p);
Date date;
try {
date = GenericFormatter.SHORT_MINUTE_FORMATTER.parse(d);
} catch (ParseException e) {
try {
date = GenericFormatter.SHORT_DAY_FORMATTER.parse(d);
} catch (ParseException ee) {
date = new Date();
}
}
result.put(di.substring(p + 1), date);
Revisions r = new Revisions(entry.getValue()[0], Integer.parseInt(entry.getValue()[1]), datehash);
result.put(datehash.substring(p + 1), r);
}
return result;
}
private static Date parseDate(String d) {
try {
return GenericFormatter.SHORT_MINUTE_FORMATTER.parse(d);
} catch (ParseException e) {
try {
return GenericFormatter.SHORT_DAY_FORMATTER.parse(d);
} catch (ParseException ee) {
return null;
}
}
}
/**
* get the depth to a document, helper method for definePath to determine the depth value
@ -230,6 +386,8 @@ public class Snapshots {
return new ArrayList<>(0);
}
// pathtoxml = <storageLocation>/<host>.<port>/<depth>/<shard>/<urlhash>.<date>.xml
/**
* for a given url, get all paths for storage locations.
* The locations are all for the single url but may represent different storage times.
@ -250,20 +408,19 @@ public class Snapshots {
}
return paths;
}
private File pathToShard(final DigestURL url, final int depth) {
String id = ASCII.String(url.hash());
File pathToHostDir = new File(storageLocation, pathToHostDir(url));
return pathToShard(pathToHostPortDir(url.getHost(), url.getPort()), ASCII.String(url.hash()), depth);
}
private File pathToShard(final String hostport, final String urlhash, final int depth) {
File pathToHostDir = new File(storageLocation, hostport);
File pathToDepthDir = new File(pathToHostDir, pathToDepthDir(depth));
File pathToShard = new File(pathToDepthDir, pathToShard(id));
File pathToShard = new File(pathToDepthDir, pathToShard(urlhash));
return pathToShard;
}
private String pathToHostDir(final DigestURL url) {
return pathToHostDir(url.getHost(), url.getPort());
}
private String pathToHostDir(final String host, final int port) {
private String pathToHostPortDir(final String host, final int port) {
return host + "." + port;
}

@ -29,7 +29,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@ -42,6 +45,7 @@ import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.Html2Image;
import net.yacy.crawler.data.Snapshots.Order;
import net.yacy.crawler.data.Snapshots.Revisions;
import net.yacy.search.schema.CollectionSchema;
/**
@ -55,8 +59,6 @@ public class Transactions {
private final static char[] WHITESPACE = new char[132];
private final static int WHITESPACE_START = XML_PREFIX.length();
private final static int WHITESPACE_LENGTH = WHITESPACE.length;
private final static String SNAPSHOT_INVENTORY_DIR = "inventory";
private final static String SNAPSHOT_ARCHIVE_DIR = "archive";
private static File transactionDir = null, inventoryDir = null, archiveDir = null;
private static Snapshots inventory = null, archive = null;
private static ExecutorService executor = Executors.newCachedThreadPool();
@ -67,18 +69,66 @@ public class Transactions {
}
public static enum State {
INVENTORY, ARCHIVE, ANY;
INVENTORY("inventory"), ARCHIVE("archive"), ANY(null);
public String dirname;
State(String dirname) {
this.dirname = dirname;
}
}
public static void init(File dir) {
transactionDir = dir;
transactionDir.mkdirs();
inventoryDir = new File(transactionDir, SNAPSHOT_INVENTORY_DIR);
inventoryDir = new File(transactionDir, State.INVENTORY.dirname);
inventory = new Snapshots(inventoryDir);
archiveDir = new File(transactionDir, SNAPSHOT_ARCHIVE_DIR);
archiveDir = new File(transactionDir, State.ARCHIVE.dirname);
archive = new Snapshots(archiveDir);
}
/**
* get the number of entries for each of the transaction states
* @return the total number of different documents for each transaction state
*/
public static Map<String, Integer> sizes() {
HashMap<String, Integer> m = new HashMap<>();
m.put(State.INVENTORY.name(), inventory.size());
m.put(State.ARCHIVE.name(), archive.size());
return m;
}
public static Revisions getRevisions(final State state, final String urlhash) {
switch (state) {
case INVENTORY : return inventory.getRevisions(urlhash);
case ARCHIVE : return archive.getRevisions(urlhash);
default : Revisions a = inventory.getRevisions(urlhash); return a == null ? archive.getRevisions(urlhash) : a;
}
}
/**
* get a list of host names in the snapshot directory
* @return the list of the given state. if the state is ALL or unknown, all lists are combined
*/
public static Set<String> listHosts(final State state) {
switch (state) {
case INVENTORY : return inventory.listHosts();
case ARCHIVE : return archive.listHosts();
default : Set<String> a = inventory.listHosts(); a.addAll(archive.listHosts()); return a;
}
}
/**
* list the snapshots for a given host name
* @param host the host for the domain
* @return a map with a set for each depth in the domain of the host name
*/
public static TreeMap<Integer, Collection<Revisions>> listIDs(final State state, final String host) {
switch (state) {
case INVENTORY : return inventory.listIDs(host);
case ARCHIVE : return archive.listIDs(host);
default : TreeMap<Integer, Collection<Revisions>> a = inventory.listIDs(host); a.putAll(archive.listIDs(host)); return a;
}
}
public static boolean store(final SolrInputDocument doc, final boolean loadImage, final boolean replaceOld, final String proxy, final ClientIdentification.Agent agent, final String acceptLanguage) {
// GET METADATA FROM DOC
@ -116,7 +166,7 @@ public class Transactions {
osw.write("</response>\n");
osw.close();
fos.close();
Transactions.announceStorage(url, depth, date);
Transactions.announceStorage(url, depth, date, State.INVENTORY);
}
} catch (IOException e) {
ConcurrentLog.logException(e);
@ -146,7 +196,62 @@ public class Transactions {
return success;
}
/**
* Announce the commit of a snapshot: this will move all data for the given urlhash from the inventory to the archive
* The index within the snapshot management will update also.
* @param urlhash
* @return a revision object from the moved document if the commit has succeeded, null if something went wrong
*/
public static Revisions commit(String urlhash) {
return transact(urlhash, State.INVENTORY, State.ARCHIVE);
}
/**
* Announce the rollback of a snapshot: this will move all data for the given urlhash from the archive to the inventory
* The index within the snapshot management will update also.
* @param urlhash
* @return a revision object from the moved document if the commit has succeeded, null if something went wrong
*/
public static Revisions rollback(String urlhash) {
return transact(urlhash, State.ARCHIVE, State.INVENTORY);
}
private static Revisions transact(final String urlhash, final State from, final State to) {
Revisions r = Transactions.getRevisions(from, urlhash);
if (r == null) return null;
// we take all pathtoxml and move that to archive
for (File f: r.pathtoxml) {
String name = f.getName();
String nameStub = name.substring(0, name.length() - 4);
File sourceParent = f.getParentFile();
File targetParent = new File(sourceParent.getAbsolutePath().replace("/" + from.dirname + "/", "/" + to.dirname + "/"));
targetParent.mkdirs();
// list all files in the parent directory
for (String a: sourceParent.list()) {
if (a.startsWith(nameStub)) {
new File(sourceParent, a).renameTo(new File(targetParent, a));
}
}
// delete empty directories
while (sourceParent.list().length == 0) {
sourceParent.delete();
sourceParent = sourceParent.getParentFile();
}
}
// announce the movement
DigestURL durl;
try {
durl = new DigestURL(r.url);
Transactions.announceDeletion(durl, r.depth, from);
Transactions.announceStorage(durl, r.depth, r.dates[0], to);
return r;
} catch (MalformedURLException e) {
ConcurrentLog.logException(e);
}
return null;
}
/**
* select a set of urlhashes from the snapshot directory. The selection either ordered
@ -159,8 +264,8 @@ public class Transactions {
* @param state the wanted transaction state, State.INVENTORY, State.ARCHIVE or State.ANY
* @return a map of hosthashes with the associated creation date
*/
public static Map<String, Date> select(String host, Integer depth, final Order order, int maxcount, State state) {
Map<String, Date> result = new HashMap<>();
public static LinkedHashMap<String, Revisions> select(String host, Integer depth, final Order order, int maxcount, State state) {
LinkedHashMap<String, Revisions> result = new LinkedHashMap<>();
if (state == State.INVENTORY || state == State.ANY) result.putAll(inventory.select(host, depth, order, maxcount));
if (state == State.ARCHIVE || state == State.ANY) result.putAll(archive.select(host, depth, order, maxcount));
return result;
@ -182,9 +287,31 @@ public class Transactions {
if (state == State.ARCHIVE) return archive.definePath(url, depth, date, ext);
return null;
}
public static void announceStorage(final DigestURL url, final int depth, final Date date) {
inventory.announceStorage(url, depth, date);
/**
* Write information about the storage of a snapshot to the Snapshot-internal index.
* The actual writing of files to the target directory must be done elsewhere, this method does not store the snapshot files.
* @param state the wanted transaction state, State.INVENTORY, State.ARCHIVE or State.ANY
* @param url
* @param depth
* @param date
*/
public static void announceStorage(final DigestURL url, final int depth, final Date date, State state) {
if (state == State.INVENTORY || state == State.ANY) inventory.announceStorage(url, depth, date);
if (state == State.ARCHIVE || state == State.ANY) archive.announceStorage(url, depth, date);
}
/**
* Delete information about the storage of a snapshot to the Snapshot-internal index.
* The actual deletion of files in the target directory must be done elsewhere, this method does not delete the snapshot files.
* @param state the wanted transaction state, State.INVENTORY, State.ARCHIVE or State.ANY
* @param url
* @param depth
* @param date
*/
public static void announceDeletion(final DigestURL url, final int depth, final State state) {
if (state == State.INVENTORY || state == State.ANY) inventory.announceDeletion(url, depth);
if (state == State.ARCHIVE || state == State.ANY) archive.announceDeletion(url, depth);
}
/**
