我正在尝试像IDM这样的程序,可以同时下载文件的部分.
我正在使用的工具来实现这一点,是C#.Net4.5中的TPL
但是当使用任务使操作平行时,我有一个问题.
顺序功能运行正常,正确下载文件.
使用Tasks的并行功能正在运行,直到发生奇怪的事情:
我创建了4个任务,在Factory.StartNew()中,在每个任务中,给出了起始位置和结束位置,任务将下载这些文件,然后返回byte [],一切顺利,任务工作正常,但在某些时候,执行冻结就是这样,程序停止,没有其他事情发生.
并行功能的实现:
我正在使用的工具来实现这一点,是C#.Net4.5中的TPL
但是当使用任务使操作平行时,我有一个问题.
顺序功能运行正常,正确下载文件.
使用Tasks的并行功能正在运行,直到发生奇怪的事情:
我创建了4个任务,在Factory.StartNew()中,在每个任务中,给出了起始位置和结束位置,任务将下载这些文件,然后返回byte [],一切顺利,任务工作正常,但在某些时候,执行冻结就是这样,程序停止,没有其他事情发生.
并行功能的实现:
static void DownloadPartsParallel() { string uriPath = "http://mschnlnine.vo.llnwd.net/d1/pdc08/PPTX/BB01.pptx"; Uri uri = new Uri(uriPath); long l = GetFileSize(uri); Console.WriteLine("Size={0}",l); int granularity = 4; byte[][] arr = new byte[granularity][]; Task<byte[]>[] tasks = new Task<byte[]>[granularity]; tasks[0] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity)); tasks[1] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity + 1,l / granularity + l / granularity)); tasks[2] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity + l / granularity + 1,l / granularity + l / granularity + l / granularity)); tasks[3] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity + l / granularity + l / granularity + 1,l));//(l / granularity) + (l / granularity) + (l / granularity) + (l / granularity) arr[0] = tasks[0].Result; arr[1] = tasks[1].Result; arr[2] = tasks[2].Result; arr[3] = tasks[3].Result; Stream localStream; localStream = File.Create("E:\\a\\" + Path.GetFileName(uri.LocalPath)); for (int i = 0; i < granularity; i++) { if (i == granularity - 1) { for (int j = 0; j < arr[i].Length - 1; j++) { localStream.WriteByte(arr[i][j]); } } else for (int j = 0; j < arr[i].Length; j++) { localStream.WriteByte(arr[i][j]); } } }
DownloadPartOfFile函数实现:
public static byte[] DownloadPartOfFile(Uri fileUrl,long from,long to) { int bytesProcessed = 0; BinaryReader reader = null; WebResponse response = null; byte[] bytes = new byte[(to - from) + 1]; try { HttpWebRequest request = (HttpWebRequest)WebRequest.Create(fileUrl); request.AddRange(from,to); request.ReadWriteTimeout = int.MaxValue; request.Timeout = int.MaxValue; if (request != null) { response = request.GetResponse(); if (response != null) { reader = new BinaryReader(response.GetResponseStream()); int bytesRead; do { byte[] buffer = new byte[1024]; bytesRead = reader.Read(buffer,buffer.Length); if (bytesRead == 0) { break; } Array.Resize<byte>(ref buffer,bytesRead); buffer.CopyTo(bytes,bytesProcessed); bytesProcessed += bytesRead; Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ",Downloading" + bytesProcessed); } while (bytesRead > 0); } } } catch (Exception e) { Console.WriteLine(e.Message); } finally { if (response != null) response.Close(); if (reader != null) reader.Close(); } return bytes; }
我试图通过将int.MaxValue设置为读取超时,写入超时和超时来解决它,这就是为什么程序冻结,如果我没有这样做,超时的例外会在功能DownloadPartsParallel
所以有一个解决方案,或任何其他可能有帮助的建议,谢谢.
解决方法
我会使用
HttpClient.SendAsync
而不是WebRequest(见
“HttpClient is Here!”).
我不会使用任何额外的线程. HttpClient.SendAsync API自然是异步的,并且返回等待的任务,不需要将其卸载到具有Task.Run/Task.TaskFactory.StartNew的池线程(参见this进行详细的讨论).
我也将使用SemaphoreSlim.WaitAsync()限制并行下载的数量.以下是我作为控制台应用程序(未经广泛测试):
using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading; using System.Threading.Tasks; namespace Console_21737681 { class Program { const int MAX_PARALLEL = 4; // max parallel downloads const int CHUNK_SIZE = 2048; // size of a single chunk // a chunk of downloaded data class Chunk { public long Start { get; set; } public int Length { get; set; } public byte[] Data { get; set; } }; // throttle downloads SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(MAX_PARALLEL); // get a chunk async Task<Chunk> GetChunk(HttpClient client,long start,int length,string url) { await _throttleSemaphore.WaitAsync(); try { using (var request = new HttpRequestMessage(HttpMethod.Get,url)) { request.Headers.Range = new System.Net.Http.Headers.RangeHeaderValue(start,start + length - 1); using (var response = await client.SendAsync(request)) { var data = await response.Content.ReadAsByteArrayAsync(); return new Chunk { Start = start,Length = length/*,Data = data*/ }; } } } finally { _throttleSemaphore.Release(); } } // download the URL in parallel by chunks async Task<Chunk[]> DownloadAsync(string url) { using (var client = new HttpClient()) { var request = new HttpRequestMessage(HttpMethod.Head,url); var response = await client.SendAsync(request); var contentLength = response.Content.Headers.ContentLength; if (!contentLength.HasValue) throw new InvalidOperationException("ContentLength"); var numOfChunks = (int)((contentLength.Value + CHUNK_SIZE - 1) / CHUNK_SIZE); var tasks = Enumerable.Range(0,numOfChunks).Select(i => { // start a new chunk long start = i * CHUNK_SIZE; var length = (int)Math.Min(CHUNK_SIZE,contentLength.Value - start); return GetChunk(client,start,length,url); }).ToList(); await Task.WhenAll(tasks); // the order of chunks is random return tasks.Select(task => task.Result).ToArray(); } } static void Main(string[] args) { var program = new Program(); var chunks = program.DownloadAsync("http://flaglane.com/download/australian-flag/australian-flag-large.png").Result; Console.WriteLine("Chunks: " + chunks.Count()); Console.ReadLine(); } } }