/**
 *  HostQueue
 *  Copyright 2013 by Michael Christen
 *  First released 24.09.2013 at http://yacy.net
 *
 *  This library is free software; you can redistribute it and/or
 *  modify it under the terms of the GNU Lesser General Public
 *  License as published by the Free Software Foundation; either
 *  version 2.1 of the License, or (at your option) any later version.
 *  
 *  This library is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 *  Lesser General Public License for more details.
 *  
 *  You should have received a copy of the GNU Lesser General Public License
 *  along with this program in the file lgpl21.txt
 *  If not, see <http://www.gnu.org/licenses/>.
 */

package net.yacy.crawler;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.data.Latency;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.index.BufferedObjectIndex;
import net.yacy.kelondro.index.Index;
import net.yacy.kelondro.index.OnDemandOpenFileIndex;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.table.Table;
import static net.yacy.kelondro.util.FileUtils.deletedelete;
import net.yacy.kelondro.util.kelondroException;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.Switchboard;

public class HostQueue implements Balancer {

    private final static ConcurrentLog log = new ConcurrentLog("HostQueue");
    
    public  static final String indexSuffix           = ".stack";
    private static final int    EcoFSBufferSize       = 1000;
    private static final int    objectIndexBufferSize = 1000;

    private final File          hostPath;
    private final String        hostName;
    private       String        hostHash;
    private final int           port;
    private final boolean       exceed134217727;
    private final boolean       onDemand;
    private       TreeMap<Integer, Index> depthStacks;

    public HostQueue (
            final File hostsPath,
            final String hostName,
            final int port,
            final boolean onDemand,
            final boolean exceed134217727) {
        this.onDemand = onDemand;
        this.exceed134217727 = exceed134217727;
        this.hostName = (hostName == null)  ? "localhost" : hostName; // might be null (file://) but hostqueue needs a name (for queue file)
        this.port = port;
        this.hostPath = new File(hostsPath, this.hostName + "." + this.port);
        init();
    }

    public HostQueue (
            final File hostPath,
            final boolean onDemand,
            final boolean exceed134217727) {
        this.onDemand = onDemand;
        this.exceed134217727 = exceed134217727;
        this.hostPath = hostPath;
        // parse the hostName and port from the file name
        String filename = hostPath.getName();
        int p = filename.lastIndexOf('.');
        if (p < 0) throw new RuntimeException("hostPath name must contain a dot: " + filename);
        this.hostName = filename.substring(0, p);
        this.port = Integer.parseInt(filename.substring(p + 1)); // consider "host.com" contains dot but no required port -> will throw exception
        init();
    }
    
    private final void init() {
        try {
            if (this.hostName == null)
                this.hostHash="";
            else
            this.hostHash = DigestURL.hosthash(this.hostName, this.port);
        } catch (MalformedURLException e) {
            this.hostHash = "";
        }
        if (!(this.hostPath.exists())) this.hostPath.mkdirs();
        this.depthStacks = new TreeMap<Integer, Index>();
        int size = openAllStacks();
        if (log.isInfo()) log.info("opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls.");
    }
    
    public String getHost() {
        return this.hostName;
    }
    
    public int getPort() {
        return this.port;
    }
    
    private int openAllStacks() {
        String[] l = this.hostPath.list();
        int c = 0;
        if (l != null) for (String s: l) {
            if (s.endsWith(indexSuffix)) try {
                int depth = Integer.parseInt(s.substring(0, s.length() - indexSuffix.length()));
                File stackFile = new File(this.hostPath, s);
                Index depthStack = openStack(stackFile);
                if (depthStack != null) {
                    int sz = depthStack.size();
                    if (sz == 0) {
                        depthStack.close();
                        deletedelete(stackFile);
                    } else {
                        this.depthStacks.put(depth, depthStack);
                        c += sz;
                    }
                }
            } catch (NumberFormatException e) {}
            }
        return c;
    }

    public synchronized int getLowestStackDepth() {
        while (this.depthStacks.size() > 0) {
            Map.Entry<Integer, Index> entry;
            synchronized (this) {
                entry = this.depthStacks.firstEntry();
            }
            if (entry == null) return 0; // happens only if map is empty
            if (entry.getValue().size() == 0) {
                entry.getValue().close();
                deletedelete(getFile(entry.getKey()));
                this.depthStacks.remove(entry.getKey());
                continue;
            }
            return entry.getKey();
        }
        // this should not happen but it happens if a deletion is done
        //assert false;
        return 0;
    }

    private Index getLowestStack() {
        while (this.depthStacks.size() > 0) {
            Map.Entry<Integer, Index> entry;
            synchronized (this) {
                entry = this.depthStacks.firstEntry();
            }
            if (entry == null) return null; // happens only if map is empty
            if (entry.getValue().size() == 0) {
                entry.getValue().close();
                deletedelete(getFile(entry.getKey()));
                this.depthStacks.remove(entry.getKey());
                continue;
            }
            return entry.getValue();
        }
        // this should not happen
        //assert false;
        return null;
    }
    
    private Index getStack(int depth) {
        Index depthStack;
        synchronized (this) {
            depthStack = this.depthStacks.get(depth);
            if (depthStack != null) return depthStack;
        }
        // create a new stack
        synchronized (this) {
            // check again 
            depthStack = this.depthStacks.get(depth);
            if (depthStack != null) return depthStack;
            // now actually create a new stack
            final File f = getFile(depth);
            depthStack = openStack(f);
            if (depthStack != null) this.depthStacks.put(depth, depthStack);
        }
        return depthStack;
    }
    
    private File getFile(int depth) {
        String name = Integer.toString(depth);
        while (name.length() < 4) name = "0" + name;
        final File f = new File(this.hostPath, name + indexSuffix);
        return f;
    }
    
    private Index openStack(File f) {
        for (int i = 0; i < 10; i++) {
            // we try that again if it fails because it shall not fail
            if (this.onDemand && (!f.exists() || f.length() < 10000)) {
                try {
                    return new BufferedObjectIndex(new OnDemandOpenFileIndex(f, Request.rowdef, exceed134217727), objectIndexBufferSize);
                } catch (kelondroException e) {
                    // possibly the file was closed meanwhile
                    ConcurrentLog.logException(e);
                }
            } else {
                    try {
                    return new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, exceed134217727, true), objectIndexBufferSize);
                } catch (final SpaceExceededException e) {
                    try {
                        return new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
                    } catch (final SpaceExceededException e1) {
                        ConcurrentLog.logException(e1);
                    }
                } catch (kelondroException e) {
                    // possibly the file was closed meanwhile
                    ConcurrentLog.logException(e);
            }
        }
        }
        return null;
    }

    @Override
    public synchronized void close() {
        for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
            int size = entry.getValue().size();
            entry.getValue().close();
            if (size == 0) deletedelete(getFile(entry.getKey()));
        }
        this.depthStacks.clear();
        String[] l = this.hostPath.list();
        if ((l == null || l.length == 0) && this.hostPath != null) deletedelete(this.hostPath);
    }

    @Override
    public synchronized void clear() {
        for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
            entry.getValue().close();
            deletedelete(getFile(entry.getKey()));
        }
        this.depthStacks.clear();
        String[] l = this.hostPath.list();
        if (l != null) for (String s: l) {
            deletedelete(new File(this.hostPath, s));
        }
        deletedelete(this.hostPath);
    }

    @Override
    public Request get(final byte[] urlhash) throws IOException {
        assert urlhash != null;
        if (this.depthStacks == null) return null; // case occurs during shutdown
        for (Index depthStack: this.depthStacks.values()) {
            final Row.Entry entry = depthStack.get(urlhash, false);
            if (entry == null) return null;
            return new Request(entry);
        }
        return null;
    }

    @Override
    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
        // first find a list of url hashes that shall be deleted
        final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
        int count = 0;
        synchronized (this) {
            for (Index depthStack: this.depthStacks.values()) {
                final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
                final Iterator<Row.Entry> i = depthStack.rows();
                Row.Entry rowEntry;
                Request crawlEntry;
                while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
                    rowEntry = i.next();
                    crawlEntry = new Request(rowEntry);
                    if (crawlEntry.profileHandle().equals(profileHandle)) {
                        urlHashes.put(crawlEntry.url().hash());
                    }
                    if (System.currentTimeMillis() > terminate) break;
                }
                for (final byte[] urlhash: urlHashes) {
                    depthStack.remove(urlhash);
                    count++;
                }
            }
        }
        return count;
    }
    
    /**
     * delete all urls which are stored for given host hashes
     * @param hosthashes
     * @return number of deleted urls
     */
    @Override
    public int removeAllByHostHashes(final Set<String> hosthashes) {
        for (String h: hosthashes) {
            if (this.hostHash.equals(h)) {
                int s = this.size();
                this.clear();
                return s;
            }
        }
        return 0;
    }

    /**
     * remove urls from the queue
     * @param urlHashes, a list of hashes that shall be removed
     * @return number of entries that had been removed
     * @throws IOException
     */
    @Override
    public synchronized int remove(final HandleSet urlHashes) throws IOException {
        int removedCounter = 0;
        for (Index depthStack: this.depthStacks.values()) {
            final int s = depthStack.size();
            for (final byte[] urlhash: urlHashes) {
                final Row.Entry entry = depthStack.remove(urlhash);
                if (entry != null) removedCounter++;
            }
            if (removedCounter == 0) return 0;
            assert depthStack.size() + removedCounter == s : "urlFileIndex.size() = " + depthStack.size() + ", s = " + s;
        }
        return removedCounter;
    }

    @Override
    public boolean has(final byte[] urlhashb) {
        for (int retry = 0; retry < 3; retry++) {
            try {
                for (Index depthStack: this.depthStacks.values()) {
                    if (depthStack.has(urlhashb)) return true;
                }
                return false;
            } catch (ConcurrentModificationException e) {}
        }
        return false;
    }

    @Override
    public int size() {
        int size = 0;
        for (Index depthStack: this.depthStacks.values()) {
            size += depthStack.size();
        }
        return size;
    }

    @Override
    public boolean isEmpty() {
        for (Index depthStack: this.depthStacks.values()) {
            if (!depthStack.isEmpty()) return false;
        }
        return true;
    }

    @Override
    public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
        assert entry != null;
        final byte[] hash = entry.url().hash();
        synchronized (this) {
            // double-check
            if (this.has(hash)) return "double occurrence in urlFileIndex";

            // increase dom counter
            if (profile != null) {
                int maxPages = profile.domMaxPages();
                if (maxPages != Integer.MAX_VALUE && maxPages > 0) {
                    String host = entry.url().getHost();
                    profile.domInc(host);
                }
            }
            
            // add to index
            Index depthStack = getStack(entry.depth());
            final int s = depthStack.size();
            depthStack.put(entry.toRow());
            assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size();
            assert depthStack.has(hash) : "hash = " + ASCII.String(hash);
        }
        return null;
    }


    @Override
    public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
        // returns a crawl entry from the stack and ensures minimum delta times
        long sleeptime = 0;
        Request crawlEntry = null;
        CrawlProfile profileEntry = null;
        synchronized (this) {
            mainloop: while (true) {
                Index depthStack = getLowestStack();
                if (depthStack == null) return null;
                Row.Entry rowEntry = null;
                while (depthStack.size() > 0) {
                    rowEntry = depthStack.removeOne();
                    if (rowEntry != null) break;
                }
                if (rowEntry == null) continue mainloop;
                crawlEntry = new Request(rowEntry);

                // check blacklist (again) because the user may have created blacklist entries after the queue has been filled
                if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
                    if (log.isFine()) log.fine("URL '" + crawlEntry.url() + "' is in blacklist.");
                    continue mainloop;
                }

                // at this point we must check if the crawlEntry has relevance because the crawl profile still exists
                // if not: return null. A calling method must handle the null value and try again
                profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle()));
                if (profileEntry == null) {
                    if (log.isFine()) log.fine("no profile entry for handle " + crawlEntry.profileHandle());
                    continue mainloop;
                }
                
                // depending on the caching policy we need sleep time to avoid DoS-like situations
                sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url());
                break;
            }
        }
        if (crawlEntry == null) return null;
        ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent();
        long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent);
        Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
        if (delay && sleeptime > 0) {
            // force a busy waiting here
            // in best case, this should never happen if the balancer works properly
            // this is only to protection against the worst case, where the crawler could
            // behave in a DoS-manner
            if (log.isInfo()) log.info("forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent));
            long loops = sleeptime / 1000;
            long rest = sleeptime % 1000;
            if (loops < 3) {
                rest = rest + 1000 * loops;
                loops = 0;
            }
            Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
            synchronized(this) {
                // must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
                if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
                for (int i = 0; i < loops; i++) {
                    if (log.isInfo()) log.info("waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
                    try {this.wait(1000); } catch (final InterruptedException e) {}
                }
            }
            Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
        }
        return crawlEntry;
    }

    @Override
    public Iterator<Request> iterator() throws IOException {
        final Iterator<Map.Entry<Integer, Index>> depthIterator = this.depthStacks.entrySet().iterator();
        @SuppressWarnings("unchecked")
        final Iterator<Row.Entry>[] rowIterator = (Iterator<Row.Entry>[]) Array.newInstance(Iterator.class, 1);
        rowIterator[0] = null;
        return new Iterator<Request>() {
            @Override
            public boolean hasNext() {
                return depthIterator.hasNext() || (rowIterator[0] != null && rowIterator[0].hasNext());
            }
            @Override
            public Request next() {
                synchronized (HostQueue.this) {
                    try {
                        while (rowIterator[0] == null || !rowIterator[0].hasNext()) {
                            Map.Entry<Integer, Index> entry = depthIterator.next();
                            rowIterator[0] = entry.getValue().iterator();
                        }
                        if (!rowIterator[0].hasNext()) return null;
                        Row.Entry rowEntry = rowIterator[0].next();
                        if (rowEntry == null) return null;
                        return new Request(rowEntry);
                    } catch (Throwable e) {
                        return null;
                    }
                }
            }
            @Override
            public void remove() {
                rowIterator[0].remove();
            }
        };
    }

    /**
     * get a list of domains that are currently maintained as domain stacks
     * @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
     */
    @Override
    public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
        Map<String, Integer[]> map = new TreeMap<String, Integer[]>();
        int delta = Latency.waitingRemainingGuessed(this.hostName, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent);
        map.put(this.hostName, new Integer[]{this.size(), delta});
        return map;
    }

    /**
     * get lists of crawl request entries for a specific host
     * @param host
     * @param maxcount
     * @param maxtime
     * @return a list of crawl loader requests
     */
    @Override
    public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
        if (host == null) return new ArrayList<Request>(0);
        if (!this.hostName.equals(host)) return new ArrayList<Request>(0);
        final ArrayList<Request> cel = new ArrayList<Request>(maxcount);
        long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
        Iterator<Request> i;
        try {
            i = this.iterator();
            while (i.hasNext()) {
                Request r = i.next();
                if (r != null) cel.add(r);
                if (System.currentTimeMillis() > timeout || cel.size() >= maxcount) break;
            }
        } catch (IOException e) {
        }
        return cel;
    }

    @Override
    public int getOnDemandLimit() {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

    @Override
    public boolean getExceed134217727() {
        return this.exceed134217727;
    }

}