- better concurrency for network scanner

- network scanner can now start from the list of all hosts in the search
index
pull/1/head
Michael Peter Christen 12 years ago
parent a34e137e27
commit dea71851d2

@ -45,7 +45,8 @@
<input type="radio" name="source" id="sourcehost" value="hosts"#(intranet.checked)# checked="checked"::#(/intranet.checked)# />Scan sub-range with given host
<textarea name="scanhosts" id="scanhosts" value="#[scanhosts]#" cols="64" rows="3" size="41"></textarea><br/><br/>
<input type="radio" name="source" id="sourcenet" value="intranet"#(intranet.checked)#:: checked="checked"#(/intranet.checked)# />Full Intranet Scan: #[intranethosts]#<br/>
#(intranetHint)#::<div class="info">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Do not use intranet scan results, you are not in an intranet environment!</div>#(/intranetHint)#
#(intranetHint)#::<div class="info">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Do not use intranet scan results, you are not in an intranet environment!</div>#(/intranetHint)#<br/><br/>
<input type="radio" name="source" id="sourceall" value="all" />All known hosts in the search index (/31 subnet recommended!)
</dd>
<dt>Subnet</dt>
<dd>

@ -21,11 +21,11 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@ -74,9 +74,10 @@ public class CrawlStartScanner_p
// make a scanhosts entry
String hostt = post == null ? "" : post.get("scanhosts", "").trim();
boolean listall = false;
if (hostt.equals("*")) {
if (hostt.equals("*") || (post != null && "all".equals(post.get("source", "")))) {
hostt = "";
listall = true;
post.put("source", "hosts");
}
String[] hosts0 = hostt.indexOf('\n') > 0 || hostt.indexOf('\r') > 0 ? hostt.split("[\\r\\n]+") : hostt.split(Pattern.quote(","));
Set<String> hostSet = new LinkedHashSet<String>();
@ -133,13 +134,14 @@ public class CrawlStartScanner_p
// scan a range of ips
if (post.containsKey("scan")) {
boolean scanftp = "on".equals(post.get("scanftp", ""));
// start a scanner
final Scanner scanner = new Scanner(CONCURRENT_RUNNER, timeout);
boolean scanhttp = "on".equals(post.get("scanhttp", ""));
boolean scanhttps = "on".equals(post.get("scanhttps", ""));
boolean scanftp = "on".equals(post.get("scanftp", ""));
boolean scansmb = "on".equals(post.get("scansmb", ""));
final Set<InetAddress> scanbase = new HashSet<InetAddress>();
// select host base to scan
if ("hosts".equals(post.get("source", ""))) {
for (String host: hostSet) {
@ -150,6 +152,7 @@ public class CrawlStartScanner_p
final int p = host.indexOf('/', 0);
if (p >= 0) host = host.substring(0, p);
InetAddress ip;
Collection<InetAddress> scanbase = new ArrayList<InetAddress>();
if (host.length() > 0) {
ip = Domains.dnsResolve(host); if (ip != null) scanbase.add(ip);
if (scanftp && !hostSet.contains("ftp." + host)) {
@ -161,20 +164,13 @@ public class CrawlStartScanner_p
if (ip != null) scanbase.add(ip);
}
}
scanner.addProtocols(Scanner.genlist(scanbase, subnet), scanhttp, scanhttps, scanftp, scansmb);
}
}
if ("intranet".equals(post.get("source", ""))) {
scanbase.addAll(Domains.myIntranetIPs());
scanner.addProtocols(Scanner.genlist(Domains.myIntranetIPs(), subnet), scanhttp, scanhttps, scanftp, scansmb);
}
// start a scanner
final Scanner scanner = new Scanner(CONCURRENT_RUNNER, timeout);
List<InetAddress> addresses = Scanner.genlist(scanbase, subnet);
if (scanftp) scanner.addFTP(addresses);
if (scanhttp) scanner.addHTTP(addresses);
if (scanhttps) scanner.addHTTPS(addresses);
if (scansmb) scanner.addSMB(addresses);
scanner.start();
scanner.terminate();
if ("on".equals(post.get("accumulatescancache", "")) && !"scheduler".equals(post.get("rescan", ""))) {
Scanner.scancacheExtend(scanner);

@ -35,10 +35,11 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.protocol.ftp.FTPClient;
@ -49,29 +50,29 @@ import net.yacy.kelondro.data.meta.DigestURI;
* a protocol scanner
* scans given ip's for existing http, https, ftp and smb services
*/
public class Scanner extends Thread {
private static final Service POISONSERVICE = new Service(Protocol.http, null);
private static final Object PRESENT = new Object();
public class Scanner {
public static enum Access {unknown, empty, granted, denied;}
public static enum Protocol {http(80), https(443), ftp(21), smb(445);
public int port;
private Protocol(final int port) {this.port = port;}
}
public static class Service {
public class Service implements Runnable {
public Protocol protocol;
public InetAddress inetAddress;
private String hostname;
private final long starttime;
public Service(final Protocol protocol, final InetAddress inetAddress) {
this.protocol = protocol;
this.inetAddress = inetAddress;
this.hostname = null;
this.starttime = System.currentTimeMillis();
}
public Service(final String protocol, final InetAddress inetAddress) {
this.protocol = protocol.equals("http") ? Protocol.http : protocol.equals("https") ? Protocol.https : protocol.equals("ftp") ? Protocol.ftp : Protocol.smb;
this.inetAddress = inetAddress;
this.hostname = null;
this.starttime = System.currentTimeMillis();
}
public Protocol getProtocol() {
return this.protocol;
@ -111,12 +112,52 @@ public class Scanner extends Thread {
}
@Override
public int hashCode() {
return this.inetAddress.hashCode();
return (this.inetAddress.toString() + ":" + protocol.port).hashCode();
}
@Override
public boolean equals(final Object o) {
return (o instanceof Service) && ((Service) o).protocol == this.protocol && ((Service) o).inetAddress.equals(this.inetAddress);
}
@Override
public void run() {
try {
Thread.currentThread().setName("Scanner.Runner: Ping to " + this.getInetAddress().getHostAddress() + ":" + this.getProtocol().port); // good for debugging
if (TimeoutRequest.ping(this.getInetAddress().getHostAddress(), this.getProtocol().port, Scanner.this.timeout)) {
Access access = this.getProtocol() == Protocol.http || this.getProtocol() == Protocol.https ? Access.granted : Access.unknown;
Scanner.this.services.put(this, access);
if (access == Access.unknown) {
// ask the service if it lets us in
if (this.getProtocol() == Protocol.ftp) {
final FTPClient ftpClient = new FTPClient();
try {
ftpClient.open(this.getInetAddress().getHostAddress(), this.getProtocol().port);
ftpClient.login("anonymous", "anomic@");
final List<String> list = ftpClient.list("/", false);
ftpClient.CLOSE();
access = list == null || list.isEmpty() ? Access.empty : Access.granted;
} catch (final IOException e) {
access = Access.denied;
}
}
if (this.getProtocol() == Protocol.smb) {
try {
final MultiProtocolURI uri = new MultiProtocolURI(this.toString());
final String[] list = uri.list();
access = list == null || list.length == 0 ? Access.empty : Access.granted;
} catch (final IOException e) {
access = Access.denied;
}
}
}
if (access != Access.unknown) Scanner.this.services.put(this, access);
}
} catch (final ExecutionException e) {
} catch (final OutOfMemoryError e) {
}
}
public long age() {
return System.currentTimeMillis() - this.starttime;
}
}
private final static Map<Service, Access> scancache = new ConcurrentHashMap<Service, Access>();
@ -158,155 +199,45 @@ public class Scanner extends Thread {
//if (System.currentTimeMillis() > scancacheValidUntilTime) return true;
final InetAddress a = url.getInetAddress(); // try to avoid that!
if (a == null) return true;
final Access access = scancache.get(new Service(url.getProtocol(), a));
if (access == null) return false;
return access == Access.granted;
}
/*
private static InetAddress normalize(final InetAddress a) {
if (a == null) return null;
final byte[] b = a.getAddress();
if (b[3] == 1) return a;
b[3] = 1;
try {
return InetAddress.getByAddress(b);
} catch (final UnknownHostException e) {
return a;
for (Map.Entry<Service, Access> entry: scancache.entrySet()) {
Service service = entry.getKey();
if (service.inetAddress.equals(a) && service.protocol.toString().equals(url.getProtocol())) {
Access access = entry.getValue();
if (access == null) return false;
return access == Access.granted;
}
}
return true;
}
*/
private final int runnerCount;
private final BlockingQueue<Service> scanqueue;
private final Map<Service, Access> services;
private final Map<Runner, Object> runner;
private final ThreadPoolExecutor threadPool;
private final int timeout;
public Scanner(final int concurrentRunner, final int timeout) {
this.runnerCount = concurrentRunner;
this.scanqueue = new LinkedBlockingQueue<Service>();
this.services = Collections.synchronizedMap(new HashMap<Service, Access>());
this.runner = new ConcurrentHashMap<Runner, Object>();
this.threadPool = new ThreadPoolExecutor(concurrentRunner, concurrentRunner, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
this.timeout = timeout;
}
@Override
public void run() {
Service uri;
try {
while ((uri = this.scanqueue.take()) != POISONSERVICE) {
Thread.currentThread().setName("Scanner Start Loop; now: " + uri.getInetAddress()); // good for debugging
while (this.runner.size() >= this.runnerCount) {
/*for (Runner r: runner.keySet()) {
if (r.age() > 3000) synchronized(r) { r.interrupt(); }
}*/
if (this.runner.size() >= this.runnerCount) Thread.sleep(20);
}
final Runner runner = new Runner(uri);
this.runner.put(runner, PRESENT);
runner.start();
}
} catch (final InterruptedException e) {
}
}
public int pending() {
return this.scanqueue.size();
return this.threadPool.getQueue().size() + this.threadPool.getActiveCount();
}
public void terminate() {
for (int i = 0; i < this.runnerCount; i++) try {
this.scanqueue.put(POISONSERVICE);
} catch (final InterruptedException e) {
}
try {
this.join();
} catch (final InterruptedException e) {
}
this.threadPool.shutdown();
}
public class Runner extends Thread {
private final Service service;
private final long starttime;
public Runner(final Service service) {
this.service = service;
this.starttime = System.currentTimeMillis();
}
@Override
public void run() {
try {
Thread.currentThread().setName("Scanner.Runner: Ping to " + this.service.getInetAddress().getHostAddress() + ":" + this.service.getProtocol().port); // good for debugging
if (TimeoutRequest.ping(this.service.getInetAddress().getHostAddress(), this.service.getProtocol().port, Scanner.this.timeout)) {
Access access = this.service.getProtocol() == Protocol.http || this.service.getProtocol() == Protocol.https ? Access.granted : Access.unknown;
Scanner.this.services.put(this.service, access);
if (access == Access.unknown) {
// ask the service if it lets us in
if (this.service.getProtocol() == Protocol.ftp) {
final FTPClient ftpClient = new FTPClient();
try {
ftpClient.open(this.service.getInetAddress().getHostAddress(), this.service.getProtocol().port);
ftpClient.login("anonymous", "anomic@");
final List<String> list = ftpClient.list("/", false);
ftpClient.CLOSE();
access = list == null || list.isEmpty() ? Access.empty : Access.granted;
} catch (final IOException e) {
access = Access.denied;
}
}
if (this.service.getProtocol() == Protocol.smb) {
try {
final MultiProtocolURI uri = new MultiProtocolURI(this.service.toString());
final String[] list = uri.list();
access = list == null || list.length == 0 ? Access.empty : Access.granted;
} catch (final IOException e) {
access = Access.denied;
}
}
}
if (access != Access.unknown) Scanner.this.services.put(this.service, access);
}
} catch (final ExecutionException e) {
} catch (final OutOfMemoryError e) {
}
final Object r = Scanner.this.runner.remove(this);
assert r != null;
}
public long age() {
return System.currentTimeMillis() - this.starttime;
}
@Override
public boolean equals(final Object o) {
return (o instanceof Runner) && this.service.equals(((Runner) o).service);
}
@Override
public int hashCode() {
return this.service.hashCode();
}
public void addProtocols(final List<InetAddress> addresses, boolean http, boolean https, boolean ftp, boolean smb) {
if (http) addProtocol(Protocol.http, addresses);
if (https) addProtocol(Protocol.https, addresses);
if (ftp) addProtocol(Protocol.ftp, addresses);
if (smb) addProtocol(Protocol.smb, addresses);
}
public void addHTTP(final List<InetAddress> addresses) {
addProtocol(Protocol.http, addresses);
}
public void addHTTPS(final List<InetAddress> addresses) {
addProtocol(Protocol.https, addresses);
}
public void addSMB(final List<InetAddress> addresses) {
addProtocol(Protocol.smb, addresses);
}
public void addFTP(final List<InetAddress> addresses) {
addProtocol(Protocol.ftp, addresses);
}
private void addProtocol(final Protocol protocol, final List<InetAddress> addresses) {
for (final InetAddress i: addresses) {
try {
this.scanqueue.put(new Service(protocol, i));
} catch (final InterruptedException e) {
}
threadPool.execute(new Service(protocol, i));
}
}
@ -363,11 +294,7 @@ public class Scanner extends Thread {
//try {System.out.println("192.168.1.91: " + ping(new MultiProtocolURI("smb://192.168.1.91/"), 1000));} catch (MalformedURLException e) {}
final Scanner scanner = new Scanner(100, 10);
List<InetAddress> addresses = genlist(Domains.myIntranetIPs(), 20);
scanner.addFTP(addresses);
scanner.addHTTP(addresses);
scanner.addHTTPS(addresses);
scanner.addSMB(addresses);
scanner.start();
scanner.addProtocols(addresses, true, true, true, true);
scanner.terminate();
for (final Service service: scanner.services().keySet()) {
System.out.println(service.toString());

Loading…
Cancel
Save