中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中怎么利用多線程實現無阻塞讀取遠程文件

發布時間:2021-07-02 14:30:18 來源:億速云 閱讀:148 作者:Leah 欄目:編程語言

本篇文章為大家展示了Java中怎么利用多線程實現無阻塞讀取遠程文件,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

一、HttpReader類功能:HTTP協議從指定URL讀取數據

/** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;   
  
import java.io.IOException;   
import java.io.InputStream;   
import java.net.HttpURLConnection;   
import java.net.URL;   
  
public final class HttpReader {   
    public static final int MAX_RETRY = 10;   
    private static long content_length;   
    private URL url;   
    private HttpURLConnection httpConnection;   
    private InputStream in_stream;   
    private long cur_pos;           //用于決定seek方法中是否執行文件定位   
    private int connect_timeout;   
    private int read_timeout;   
       
    public HttpReader(URL u) {   
        this(u, 5000, 5000);   
    }   
       
    public HttpReader(URL u, int connect_timeout, int read_timeout) {   
        this.connect_timeout = connect_timeout;   
        this.read_timeout = read_timeout;   
        url = u;   
        if (content_length == 0) {   
            int retry = 0;   
            while (retry < HttpReader.MAX_RETRY)   
                try {   
                    this.seek(0);   
                    content_length = httpConnection.getContentLength();   
                    break;   
                } catch (Exception e) {   
                    retry++;   
                }   
        }   
    }   
       
    public static long getContentLength() {   
        return content_length;   
    }   
       
    public int read(byte[] b, int off, int len) throws IOException {   
        int r = in_stream.read(b, off, len);   
        cur_pos += r;   
        return r;   
    }   
       
    public int getData(byte[] b, int off, int len) throws IOException {   
        int r, rema = len;   
        while (rema > 0) {   
            if ((r = in_stream.read(b, off, rema)) == -1) {   
                return -1;   
            }   
            rema -= r;   
            off += r;   
            cur_pos += r;   
        }   
        return len;   
    }   
       
    public void close() {   
        if (httpConnection != null) {   
            httpConnection.disconnect();   
            httpConnection = null;   
        }   
        if (in_stream != null) {   
            try {   
                in_stream.close();   
            } catch (IOException e) {}   
            in_stream = null;   
        }   
        url = null;   
    }   
       
    /**//*  
     * 拋出異常通知再試  
     * 響應碼503可能是由某種暫時的原因引起的,例如同一IP頻繁的連接請求可能遭服務器拒絕  
     */  
    public void seek(long start_pos) throws IOException {   
        if (start_pos == cur_pos && in_stream != null)   
            return;   
        if (httpConnection != null) {   
            httpConnection.disconnect();   
            httpConnection = null;   
        }   
        if (in_stream != null) {   
            in_stream.close();   
            in_stream = null;   
        }   
        httpConnection = (HttpURLConnection) url.openConnection();   
        httpConnection.setConnectTimeout(connect_timeout);   
        httpConnection.setReadTimeout(read_timeout);   
        String sProperty = "bytes=" + start_pos + "-";   
        httpConnection.setRequestProperty("Range", sProperty);   
        //httpConnection.setRequestProperty("Connection", "Keep-Alive");   
        int responseCode = httpConnection.getResponseCode();   
        if (responseCode < 200 || responseCode >= 300) {   
            try {   
                Thread.sleep(500);   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            }   
            throw new IOException("HTTP responseCode="+responseCode);   
        }   
  
        in_stream = httpConnection.getInputStream();   
        cur_pos = start_pos;   
    }   
  
}

二、IWriterCallBack接口功能:實現讀/寫通信。

package instream;   
  
public interface IWriterCallBack {   
    public boolean tryWriting(Writer w) throws InterruptedException;   
    public void updateBuffer(int i, int len);   
    public void updateWriterCount();   
    public void terminateWriters();   
}

三、Writer類:下載線程,負責向buf[]寫數據。

/** *//**
* http://www.bt285.cn http://www.5a520.cn 
*/
package instream;   
import java.io.IOException;   
import java.net.URL;   
  
public final class Writer implements Runnable {   
    private static boolean isalive = true;   
    private byte[] buf;   
    private IWriterCallBack icb;   
    protected int index;            //buf[]內"塊"索引號   
    protected long start_pos;       //index對應的文件位置(相對于文件首的偏移量)   
    protected int await_count;      //用于判斷:下載速度足夠就退出一個"寫"線程   
    private HttpReader hr;   
       
    public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {   
        hr = new HttpReader(u);   
        if(HttpReader.getContentLength() == 0)  //實例化HttpReader對象都不成功   
            return;   
        icb = call_back;   
        buf = b;   
        Thread t = new Thread(this,"dt_"+i);   
        t.setPriority(Thread.NORM_PRIORITY + 1);   
        t.start();   
    }   
       
    public void run() {   
        int write_bytes=0, write_pos=0, rema = 0, retry = 0;   
        boolean cont = true;   
        while (cont) {   
            try {   
                // 1.等待空閑塊   
                if(retry == 0) {   
                    if (icb.tryWriting(this) == false)   
                        break;   
                    write_bytes = 0;   
                    rema = BuffRandAcceURL.UNIT_LENGTH;   
                    write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS;   
                }   
                   
                // 2.定位   
                hr.seek(start_pos);   
  
                // 3.下載"一塊"   
                int w;   
                while (rema > 0 && isalive) {   
                    w = (rema < 2048) ? rema : 2048; //每次讀幾K合適?   
                    if ((w = hr.read(buf, write_pos, w)) == -1) {   
                        cont = false;   
                        break;   
                    }   
                    rema -= w;   
                    write_pos += w;   
                    start_pos += w;   
                    write_bytes += w;   
                }   
                   
                //4.通知"讀"線程   
                retry = 0;   
                icb.updateBuffer(index, write_bytes);   
            } catch (InterruptedException e) {   
                isalive = false;   
                icb.terminateWriters();   
                break;   
            } catch (IOException e) {   
                if(++retry == HttpReader.MAX_RETRY) {   
                    isalive = false;   
                    icb.terminateWriters();   
                    break;   
                }   
            }   
        }   
        icb.updateWriterCount();   
        try {   
            hr.close();   
        } catch (Exception e) {}   
        hr = null;   
        buf = null;   
        icb = null;   
    }   
  
}


四、IRandomAccess接口:

無阻塞讀取遠程文件中需要隨機讀取文件接口,BuffRandAcceURL類和BuffRandAcceFile類實現接口方法。BuffRandAcceFile類實現讀取本地磁盤文件,這兒就不給出其源碼了。

package instream;   
  
public interface IRandomAccess {   
    public int read() throws Exception;   
    public int read(byte b[]) throws Exception;   
    public int read(byte b[], int off, int len) throws Exception;   
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;   
    public void seek(long pos) throws Exception;   
    public long length();   
    public long getFilePointer();   
    public void close();   
}


五、BuffRandAcceURL類功能:創建下載線程;read方法從buf[]讀數據。

關鍵是如何簡單有效防止死鎖?以下只是我的一次嘗試,請指正。

/** *//**
* http://www.5a520.cn  http://www.bt285.cn
*/ 
package instream;   
  
import java.net.URL;   
import java.net.URLDecoder;   
import decode.Header;   
import tag.MP3Tag;   
import tag.TagThread;   
  
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {   
    public static final int UNIT_LENGTH_BITS = 15;                  //32K   
    public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH = UNIT_LENGTH << 4;            //16塊   
    public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   
    private static final int MAX_WRITER = 8;   
    private static long file_pointer;   
    private static int read_pos;   
    private static int fill_bytes;   
    private static byte[] buf;      //同時也作讀寫同步鎖:buf.wait()/buf.notify()   
    private static int[] buf_bytes;   
    private static int buf_index;   
    private static int alloc_pos;   
    private static URL url = null;   
    private static boolean isalive = true;   
    private static int writer_count;   
    private static int await_count;   
    private long file_length;   
    private long frame_bytes;   
       
    public BuffRandAcceURL(String sURL) throws Exception {   
        this(sURL,MAX_WRITER);   
    }   
       
    public BuffRandAcceURL(String sURL, int download_threads) throws Exception {   
        buf = new byte[BUF_LENGTH];   
        buf_bytes = new int[UNIT_COUNT];   
        url = new URL(sURL);   
           
        //創建線程以異步方式解析ID3   
        new TagThread(url);   
           
        //打印當前文件名   
        try {   
            String s = URLDecoder.decode(sURL, "GBK");   
            System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1));   
            s = null;   
        } catch (Exception e) {   
            System.out.println("start>> " + sURL);   
        }   
           
        //創建"寫"線程   
        for(int i = 0; i < download_threads; i++)   
            new Writer(this, url, buf, i+1);   
        frame_bytes = file_length = HttpReader.getContentLength();   
        if(file_length == 0) {   
            Header.strLastErr = "連接URL出錯,重試 " + HttpReader.MAX_RETRY + " 次后放棄。";   
            throw new Exception("retry " + HttpReader.MAX_RETRY);   
        }   
        writer_count = download_threads;   
           
        //緩沖   
        try_cache();   
           
        //跳過ID3 v2   
        MP3Tag mP3Tag = new MP3Tag();   
        int v2_size = mP3Tag.checkID3V2(buf,0);   
        if (v2_size > 0) {   
            frame_bytes -= v2_size;   
            //seek(v2_size):   
            fill_bytes -= v2_size;   
            file_pointer = v2_size;   
            read_pos = v2_size;   
            read_pos &= BUF_LENGTH_MASK;   
            int units = v2_size >> UNIT_LENGTH_BITS;   
            for(int i = 0; i < units; i++) {   
                buf_bytes[i] = 0;   
                this.notifyWriter();   
            }   
            buf_bytes[units] -= v2_size;   
            this.notifyWriter();   
        }   
        mP3Tag = null;   
    }   
       
    private void try_cache() throws InterruptedException {   
        int cache_size = BUF_LENGTH;   
        if(cache_size > (int)file_length - alloc_pos)   
            cache_size = (int)file_length - alloc_pos;   
        cache_size -= UNIT_LENGTH;   
           
        //等待填寫當前正在讀的那"一塊"緩沖區   
        /**//*if(fill_bytes >= cache_size && writer_count > 0) {  
            synchronized (buf) {  
                buf.wait();  
            }  
            return;  
        }*/  
           
        //等待填滿緩沖區   
        while (fill_bytes < cache_size) {   
            if (writer_count == 0 || isalive == false)   
                return;   
            if(BUF_LENGTH > (int)file_length - alloc_pos)   
                cache_size = (int)file_length - alloc_pos - UNIT_LENGTH;   
            System.out.printf("\r[緩沖%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);   
            synchronized (buf) {   
                buf.wait();   
            }   
        }   
        System.out.printf("\r");   
    }   
       
    private int try_reading(int i, int len) throws Exception {   
        int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);   
        int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   
        while (r < len) {   
            if (writer_count == 0 || isalive == false)   
                return r;   
            try_cache();   
            r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   
        }   
           
        return len;   
    }   
       
    /**//*  
     * 各個"寫"線程互斥等待空閑塊  
     */  
    public synchronized boolean tryWriting(Writer w) throws InterruptedException {   
        await_count++;   
        while (buf_bytes[buf_index] != 0 && isalive) {   
            this.wait();   
        }   
           
        //下載速度足夠就結束一個"寫"線程   
        if(writer_count > 1 && w.await_count >= await_count &&   
                w.await_count >= writer_count)   
            return false;   
           
        if(alloc_pos >= file_length)   
            return false;   
        w.await_count = await_count;   
        await_count--;   
        w.start_pos = alloc_pos;   
        w.index = buf_index;   
        alloc_pos += UNIT_LENGTH;   
        buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;   
        return isalive;   
    }   
       
    public void updateBuffer(int i, int len) {   
        synchronized (buf) {   
            buf_bytes[i] = len;   
            fill_bytes += len;   
            buf.notify();   
        }   
    }   
       
    public void updateWriterCount() {   
        synchronized (buf) {   
            writer_count--;   
            buf.notify();   
        }   
    }   
       
    public synchronized void notifyWriter() {   
        this.notifyAll();   
    }   
       
    public void terminateWriters() {   
        synchronized (buf) {   
            if (isalive) {   
                isalive = false;   
                Header.strLastErr = "讀取文件超時。重試 " + HttpReader.MAX_RETRY   
                        + " 次后放棄,請您稍后再試。";   
            }   
            buf.notify();   
        }   
           
        notifyWriter();        
    }   
       
    public int read() throws Exception {   
        int iret = -1;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
        // 1."等待"有1字節可讀   
        while (buf_bytes[i] < 1) {   
            try_cache();   
            if (writer_count == 0)   
                return -1;   
        }   
        if(isalive == false)   
            return -1;   
  
        // 2.讀取   
        iret = buf[read_pos] & 0xff;   
        fill_bytes--;   
        file_pointer++;   
        read_pos++;   
        read_pos &= BUF_LENGTH_MASK;   
        if (--buf_bytes[i] == 0)   
            notifyWriter();     // 3.通知   
  
        return iret;   
    }   
       
    public int read(byte b[]) throws Exception {   
        return read(b, 0, b.length);   
    }   
  
    public int read(byte[] b, int off, int len) throws Exception {   
        if(len > UNIT_LENGTH)   
            len = UNIT_LENGTH;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
           
        // 1."等待"有足夠內容可讀   
        if(try_reading(i, len) < len || isalive == false)   
            return -1;   
  
        // 2.讀取   
        int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH   
        if (tail_len < len) {   
            System.arraycopy(buf, read_pos, b, off, tail_len);   
            System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);   
        } else  
            System.arraycopy(buf, read_pos, b, off, len);   
  
        fill_bytes -= len;   
        file_pointer += len;   
        read_pos += len;   
        read_pos &= BUF_LENGTH_MASK;   
        buf_bytes[i] -= len;   
        if (buf_bytes[i] < 0) {   
            int ni = read_pos >> UNIT_LENGTH_BITS;   
            buf_bytes[ni] += buf_bytes[i];   
            buf_bytes[i] = 0;   
            notifyWriter();   
        } else if (buf_bytes[i] == 0)   
            notifyWriter();   
           
        return len;   
    }   
       
    /**//*  
     * 從src_off位置復制,不移動文件"指針"  
     */  
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {   
        int rpos = read_pos + src_off;   
        if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false)   
            return -1;   
        int tail_len = BUF_LENGTH - rpos;   
        if (tail_len < len) {   
            System.arraycopy(buf, rpos, b, dst_off, tail_len);   
            System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);   
        } else  
            System.arraycopy(buf, rpos, b, dst_off, len);   
        // 不發信號   
  
        return len;   
    }   
       
    public long length() {   
        return file_length;   
    }   
       
    public long getFilePointer() {   
        return file_pointer;   
    }   
  
    public void close() {   
        //   
    }   
       
    //   
    public void seek(long pos) throws Exception {   
        //   
    }   
       
}

上述內容就是Java中怎么利用多線程實現無阻塞讀取遠程文件,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

凤城市| 惠来县| 忻州市| 婺源县| 上林县| 甘德县| 云安县| 云和县| 北辰区| 常宁市| 轮台县| 资阳市| 开封市| 乌鲁木齐县| 栾川县| 康马县| 黄骅市| 岳阳县| 屏山县| 抚松县| 沂南县| 祁阳县| 泸定县| 冕宁县| 新津县| 沅陵县| 鹤山市| 江门市| 亳州市| 沭阳县| 任丘市| 黄龙县| 开封市| 巴林左旗| 吴桥县| 太仆寺旗| 屏东县| 黄梅县| 长泰县| 麦盖提县| 开平市|