VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从入门到精通视频教程
当前位置:
首页 > 编程开发 > C#编程 >
  • C#教程之C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容

本站最新发布   C#从入门到精通
试听地址  
https://www.xin3721.com/eschool/CSharpxin3721/

 为什么编写TaskSchedulerEx类?

    因为.NET默认只提供一个全局线程池,如果某个批量任务长时间占用大量线程,甚至耗尽该线程池,就会严重影响应用程序域中其它任务或批量任务的性能。

     特点:

    1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount

    2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker

    3、队列中尚未执行的任务可以取消

    4、通过扩展类TaskHelper实现任务分组

    5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程

    6、代码量相当精简,TaskSchedulerEx类只有230多行代码

    7、池中的线程数量会根据负载自动增减:支持,但不如SmartThreadPool智能。为了性能,这里使用了比较笨的方式实现。不知道大家有没有既智能、性能又高的方案?我有一个思路:在定时器中计算每个任务的平均执行耗时,然后使用公式(线程数 = CPU核心数 * (本地计算时间 + 等待时间) / 本地计算时间)计算最佳线程数,再按最佳线程数动态创建线程,但这个计算过程本身可能会牺牲性能。

     对比SmartThreadPool:

    TaskSchedulerEx类代码(使用BlockingCollection,当线程数大于200时,CPU占用高,线程数小于100时,CPU占用正常):

复制代码
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> backed by its own dedicated threads instead of the shared
    /// .NET thread pool. A fixed set of core threads blocks on a <see cref="BlockingCollection{T}"/>;
    /// while work is queued, a timer adds auxiliary threads (bounded by <see cref="MaxThreadCount"/>)
    /// that retire after being idle for a few seconds.
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region Native methods
        /// <summary>
        /// P/Invoke declaration for kernel32 <c>SetProcessWorkingSetSize</c>. This class calls it with
        /// (-1, -1) after forcing a garbage collection.
        /// </summary>
        // NOTE(review): the native parameters are SIZE_T; the int declaration is kept unchanged because
        // the method is public — confirm its behaviour in 64-bit processes.
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
        #endregion

        #region Fields and properties

        /// <summary>Delay before the first scaling tick, in milliseconds.</summary>
        private const int TimerInitialInterval = 1000;

        /// <summary>Interval between scaling ticks after the first one, in milliseconds.</summary>
        private const int TimerFastInterval = 20;

        /// <summary>How long an auxiliary thread waits for work before retiring, in milliseconds.</summary>
        private const int AuxiliaryIdleTimeout = 3000;

        private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        private readonly object _lockCreateTimer = new object();
        private readonly int _threadCount;
        private readonly int _maxThreadCount;
        private int _activeThreadCount;
        private int _disposed; // 0 = live, 1 = disposed (int so it can be used with Interlocked)
        private volatile System.Timers.Timer _timer;

        /// <summary>
        /// Number of worker threads (core plus auxiliary) currently accounted for.
        /// </summary>
        public int ActiveThreadCount
        {
            get { return Volatile.Read(ref _activeThreadCount); }
        }

        /// <summary>
        /// Number of core threads started by the constructor.
        /// </summary>
        public int CoreThreadCount
        {
            get { return _threadCount; }
        }

        /// <summary>
        /// Upper bound used when deciding whether another auxiliary thread may be started.
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }
        #endregion

        #region Construction

        /// <summary>
        /// Creates the scheduler and immediately starts its core threads.
        /// </summary>
        /// <param name="threadCount">Number of core threads; must be at least 1.</param>
        /// <param name="maxThreadCount">
        /// Limit for core plus auxiliary threads. When it is not greater than
        /// <paramref name="threadCount"/>, no auxiliary thread is ever created.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threadCount"/> is less than 1.</exception>
        public TaskSchedulerEx(int threadCount = 10, int maxThreadCount = 20)
        {
            if (threadCount < 1)
            {
                // Without a core thread nothing would ever dequeue (or start the scaling timer).
                throw new ArgumentOutOfRangeException("threadCount", threadCount, "At least one core thread is required.");
            }

            _threadCount = threadCount;
            _maxThreadCount = maxThreadCount;

            for (int i = 0; i < _threadCount; i++)
            {
                StartWorker(CoreWorkerLoop);
            }
        }
        #endregion

        #region TaskScheduler overrides

        /// <summary>
        /// Returns a snapshot of the tasks that are queued but not yet taken by a worker.
        /// </summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        /// <summary>
        /// Always refuses inline execution, so tasks only ever run on this scheduler's own threads.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }

        /// <summary>
        /// Queues a task for execution on one of the worker threads.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
        protected override void QueueTask(Task task)
        {
            if (Volatile.Read(ref _disposed) != 0)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            // If Dispose wins the race after the check above, Add throws InvalidOperationException
            // because the collection has been marked complete.
            _tasks.Add(task);
        }
        #endregion

        #region Disposal

        /// <summary>
        /// Stops the scheduler: the scaling timer is stopped, queued tasks are discarded and the
        /// queue is marked complete so that idle workers wake up and exit. A task that is already
        /// running is allowed to finish; its thread exits afterwards. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            StopTimer();

            // CompleteAdding makes a blocked TryTake return false once the queue is empty, which
            // ends the worker loops cooperatively (Thread.Abort is unsupported on .NET Core/.NET 5+).
            _tasks.CompleteAdding();
            CancelAll();

            TrimProcessMemory();
        }
        #endregion

        #region Worker threads

        /// <summary>
        /// Starts one background worker thread and accounts for it in the active-thread counter.
        /// </summary>
        private void StartWorker(ThreadStart body)
        {
            Interlocked.Increment(ref _activeThreadCount);
            try
            {
                Thread thread = new Thread(body);
                thread.IsBackground = true;
                thread.Start();
            }
            catch
            {
                // The thread never ran, so its loop cannot undo the increment.
                Interlocked.Decrement(ref _activeThreadCount);
                throw;
            }
        }

        /// <summary>
        /// Loop of a core thread: blocks until a task is available and runs it, until the queue is
        /// marked complete and empty.
        /// </summary>
        private void CoreWorkerLoop()
        {
            try
            {
                Task task;
                while (_tasks.TryTake(out task, Timeout.Infinite))
                {
                    CreateTimer();
                    TryExecuteTask(task);
                }
            }
            finally
            {
                Interlocked.Decrement(ref _activeThreadCount);
            }
        }

        /// <summary>
        /// Loop of an auxiliary thread: runs tasks until none arrives within
        /// <see cref="AuxiliaryIdleTimeout"/>, then retires. When the retirement brings the active
        /// count back to the core count, memory is trimmed.
        /// </summary>
        private void AuxiliaryWorkerLoop()
        {
            int remaining;
            try
            {
                Task task;
                while (_tasks.TryTake(out task, AuxiliaryIdleTimeout))
                {
                    TryExecuteTask(task);
                }
            }
            finally
            {
                // Use the value returned by Decrement so exactly one retiring thread sees the core count.
                remaining = Interlocked.Decrement(ref _activeThreadCount);
            }

            if (remaining == _threadCount)
            {
                TrimProcessMemory();
            }
        }

        /// <summary>
        /// Forces a garbage collection and, on Windows NT platforms, asks the OS to trim the
        /// process working set.
        /// </summary>
        private static void TrimProcessMemory()
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                using (System.Diagnostics.Process current = System.Diagnostics.Process.GetCurrentProcess())
                {
                    SetProcessWorkingSetSize(current.Handle, -1, -1);
                }
            }
        }
        #endregion

        #region Scaling timer

        /// <summary>
        /// Lazily starts the timer that adds auxiliary threads while work is queued.
        /// </summary>
        private void CreateTimer()
        {
            // Fast path without the lock. When no auxiliary thread can ever be created the timer
            // would only tick uselessly, so it is not created at all.
            if (_timer != null || _maxThreadCount <= _threadCount)
            {
                return;
            }

            lock (_lockCreateTimer)
            {
                if (_timer != null || Volatile.Read(ref _disposed) != 0)
                {
                    return;
                }

                System.Timers.Timer timer = new System.Timers.Timer(TimerInitialInterval);
                timer.Elapsed += OnTimerElapsed;
                _timer = timer;
                timer.Start();
            }
        }

        /// <summary>
        /// Timer tick: starts one auxiliary thread if tasks are waiting and the limit allows it;
        /// stops the timer when the queue is empty.
        /// </summary>
        private void OnTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            // Elapsed can be raised concurrently on pool threads; the lock serializes ticks so the
            // limit check and the thread creation happen as one step.
            lock (_lockCreateTimer)
            {
                System.Timers.Timer timer = _timer;
                if (timer == null)
                {
                    return; // tick that was already in flight when the timer was stopped
                }

                if (timer.Interval != TimerFastInterval)
                {
                    timer.Interval = TimerFastInterval;
                }

                int active = Volatile.Read(ref _activeThreadCount);
                if (active < _threadCount || active >= _maxThreadCount)
                {
                    return;
                }

                if (_tasks.Count > 0)
                {
                    StartWorker(AuxiliaryWorkerLoop);
                }
                else
                {
                    StopTimer();
                }
            }
        }

        /// <summary>
        /// Stops and disposes the scaling timer if one is running.
        /// </summary>
        private void StopTimer()
        {
            lock (_lockCreateTimer)
            {
                System.Timers.Timer timer = _timer;
                if (timer == null)
                {
                    return;
                }

                _timer = null;
                timer.Stop();
                timer.Elapsed -= OnTimerElapsed;
                timer.Dispose();
            }
        }
        #endregion

        #region Cancel all

        /// <summary>
        /// Removes every task that is still waiting in the queue. Tasks that are already running
        /// are not affected. Removed tasks are simply dropped: they never run and never reach a
        /// completed state, so callers must not wait on them.
        /// </summary>
        public void CancelAll()
        {
            Task discarded;
            while (_tasks.TryTake(out discarded)) { }
        }
        #endregion

    }
}
复制代码

    TaskSchedulerEx类代码(使用ConcurrentQueue,测试500个线程,CPU占用0%-2%,正常):

复制代码
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> backed by its own dedicated threads instead of the shared
    /// .NET thread pool. Workers poll a <see cref="ConcurrentQueue{T}"/> and sleep briefly when it
    /// is empty. While work is queued, a timer adds auxiliary threads (bounded by
    /// <see cref="MaxThreadCount"/>) that retire after a period without work.
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region Native methods
        /// <summary>
        /// P/Invoke declaration for kernel32 <c>SetProcessWorkingSetSize</c>. This class calls it with
        /// (-1, -1) after forcing a garbage collection.
        /// </summary>
        // NOTE(review): the native parameters are SIZE_T; the int declaration is kept unchanged because
        // the method is public — confirm its behaviour in 64-bit processes.
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
        #endregion

        #region Fields and properties

        /// <summary>Delay before the first scaling tick, in milliseconds.</summary>
        private const int TimerInitialInterval = 500;

        /// <summary>Interval between scaling ticks after the first one, in milliseconds.</summary>
        private const int TimerFastInterval = 20;

        /// <summary>How long an auxiliary thread may go without work before retiring, in milliseconds.</summary>
        private const int AuxiliaryIdleTimeout = 20000;

        /// <summary>Sleep of a core thread that found the queue empty, in milliseconds.</summary>
        private const int CoreIdleSleep = 10;

        /// <summary>Sleep of an auxiliary thread that found the queue empty, in milliseconds.</summary>
        private const int AuxiliaryIdleSleep = 100;

        private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        private readonly object _lockCreateTimer = new object();
        private readonly int _threadCount;
        private readonly int _maxThreadCount;
        private int _activeThreadCount;
        private int _disposed; // 0 = live, 1 = disposed (int so it can be used with Interlocked)
        private volatile System.Timers.Timer _timer;

        /// <summary>
        /// Number of worker threads (core plus auxiliary) currently accounted for.
        /// </summary>
        public int ActiveThreadCount
        {
            get { return Volatile.Read(ref _activeThreadCount); }
        }

        /// <summary>
        /// Number of core threads started by the constructor.
        /// </summary>
        public int CoreThreadCount
        {
            get { return _threadCount; }
        }

        /// <summary>
        /// Upper bound used when deciding whether another auxiliary thread may be started.
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }

        /// <summary>
        /// True until <see cref="Dispose"/> is called. Read with volatile semantics so that the
        /// polling loops are guaranteed to observe the stop request.
        /// </summary>
        private bool Running
        {
            get { return Volatile.Read(ref _disposed) == 0; }
        }
        #endregion

        #region Construction

        /// <summary>
        /// Creates the scheduler and immediately starts its core threads.
        /// </summary>
        /// <param name="threadCount">Number of core threads; must be at least 1.</param>
        /// <param name="maxThreadCount">
        /// Limit for core plus auxiliary threads. When it is not greater than
        /// <paramref name="threadCount"/>, no auxiliary thread is ever created.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threadCount"/> is less than 1.</exception>
        public TaskSchedulerEx(int threadCount = 10, int maxThreadCount = 20)
        {
            if (threadCount < 1)
            {
                // Without a core thread nothing would ever dequeue (or start the scaling timer).
                throw new ArgumentOutOfRangeException("threadCount", threadCount, "At least one core thread is required.");
            }

            _threadCount = threadCount;
            _maxThreadCount = maxThreadCount;

            for (int i = 0; i < _threadCount; i++)
            {
                StartWorker(CoreWorkerLoop);
            }
        }
        #endregion

        #region TaskScheduler overrides

        /// <summary>
        /// Returns a snapshot of the tasks that are queued but not yet taken by a worker.
        /// </summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        /// <summary>
        /// Always refuses inline execution, so tasks only ever run on this scheduler's own threads.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }

        /// <summary>
        /// Queues a task for execution on one of the worker threads.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
        protected override void QueueTask(Task task)
        {
            if (!Running)
            {
                // After Dispose no worker polls the queue any more; failing here is better than
                // accepting a task that would silently never run.
                throw new ObjectDisposedException(GetType().Name);
            }

            _tasks.Enqueue(task);
        }
        #endregion

        #region Disposal

        /// <summary>
        /// Stops the scheduler: workers are told to stop polling, the scaling timer is stopped and
        /// queued tasks are discarded. A task that is already running is allowed to finish; its
        /// thread exits afterwards. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            // Flipping the flag is the stop signal for every polling loop (Thread.Abort is
            // unsupported on .NET Core/.NET 5+ and is not needed here).
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            StopTimer();
            CancelAll();
            TrimProcessMemory();
        }
        #endregion

        #region Worker threads

        /// <summary>
        /// Starts one background worker thread and accounts for it in the active-thread counter.
        /// </summary>
        private void StartWorker(ThreadStart body)
        {
            Interlocked.Increment(ref _activeThreadCount);
            try
            {
                Thread thread = new Thread(body);
                thread.IsBackground = true;
                thread.Start();
            }
            catch
            {
                // The thread never ran, so its loop cannot undo the increment.
                Interlocked.Decrement(ref _activeThreadCount);
                throw;
            }
        }

        /// <summary>
        /// Loop of a core thread: polls the queue until the scheduler is disposed.
        /// </summary>
        private void CoreWorkerLoop()
        {
            try
            {
                Task task;
                while (Running)
                {
                    if (_tasks.TryDequeue(out task))
                    {
                        CreateTimer();
                        TryExecuteTask(task);
                    }
                    else
                    {
                        Thread.Sleep(CoreIdleSleep);
                    }
                }
            }
            finally
            {
                Interlocked.Decrement(ref _activeThreadCount);
            }
        }

        /// <summary>
        /// Loop of an auxiliary thread: polls the queue until the scheduler is disposed or no task
        /// has been executed for <see cref="AuxiliaryIdleTimeout"/>. When the retirement brings the
        /// active count back to the core count, memory is trimmed.
        /// </summary>
        private void AuxiliaryWorkerLoop()
        {
            int remaining;
            try
            {
                Task task;
                // Stopwatch is monotonic; wall-clock time can jump when the system clock is adjusted.
                System.Diagnostics.Stopwatch idle = System.Diagnostics.Stopwatch.StartNew();
                while (Running && idle.ElapsedMilliseconds < AuxiliaryIdleTimeout)
                {
                    if (_tasks.TryDequeue(out task))
                    {
                        TryExecuteTask(task);
                        idle.Restart();
                    }
                    else
                    {
                        Thread.Sleep(AuxiliaryIdleSleep);
                    }
                }
            }
            finally
            {
                // Use the value returned by Decrement so exactly one retiring thread sees the core count.
                remaining = Interlocked.Decrement(ref _activeThreadCount);
            }

            if (remaining == _threadCount)
            {
                TrimProcessMemory();
            }
        }

        /// <summary>
        /// Forces a garbage collection and, on Windows NT platforms, asks the OS to trim the
        /// process working set.
        /// </summary>
        private static void TrimProcessMemory()
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                using (System.Diagnostics.Process current = System.Diagnostics.Process.GetCurrentProcess())
                {
                    SetProcessWorkingSetSize(current.Handle, -1, -1);
                }
            }
        }
        #endregion

        #region Scaling timer

        /// <summary>
        /// Lazily starts the timer that adds auxiliary threads while work is queued.
        /// </summary>
        private void CreateTimer()
        {
            // Fast path without the lock. When no auxiliary thread can ever be created the timer
            // would only tick uselessly, so it is not created at all.
            if (_timer != null || _maxThreadCount <= _threadCount)
            {
                return;
            }

            lock (_lockCreateTimer)
            {
                if (_timer != null || !Running)
                {
                    return;
                }

                System.Timers.Timer timer = new System.Timers.Timer(TimerInitialInterval);
                timer.Elapsed += OnTimerElapsed;
                _timer = timer;
                timer.Start();
            }
        }

        /// <summary>
        /// Timer tick: starts one auxiliary thread if tasks are waiting and the limit allows it;
        /// stops the timer when the queue is empty.
        /// </summary>
        private void OnTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            // Elapsed can be raised concurrently on pool threads; the lock serializes ticks so the
            // limit check and the thread creation happen as one step.
            lock (_lockCreateTimer)
            {
                System.Timers.Timer timer = _timer;
                if (timer == null)
                {
                    return; // tick that was already in flight when the timer was stopped
                }

                if (timer.Interval != TimerFastInterval)
                {
                    timer.Interval = TimerFastInterval;
                }

                int active = Volatile.Read(ref _activeThreadCount);
                if (active < _threadCount || active >= _maxThreadCount)
                {
                    return;
                }

                if (!_tasks.IsEmpty)
                {
                    StartWorker(AuxiliaryWorkerLoop);
                }
                else
                {
                    StopTimer();
                }
            }
        }

        /// <summary>
        /// Stops and disposes the scaling timer if one is running.
        /// </summary>
        private void StopTimer()
        {
            lock (_lockCreateTimer)
            {
                System.Timers.Timer timer = _timer;
                if (timer == null)
                {
                    return;
                }

                _timer = null;
                timer.Stop();
                timer.Elapsed -= OnTimerElapsed;
                timer.Dispose();
            }
        }
        #endregion

        #region Cancel all

        /// <summary>
        /// Removes every task that is still waiting in the queue. Tasks that are already running
        /// are not affected. Removed tasks are simply dropped: they never run and never reach a
        /// completed state, so callers must not wait on them.
        /// </summary>
        public void CancelAll()
        {
            Task discarded;
            while (_tasks.TryDequeue(out discarded)) { }
        }
        #endregion

    }
}
复制代码

     RunHelper类代码:

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Extension methods that start work on a given <see cref="TaskScheduler"/>. Every overload
    /// wraps the delegate in a try/catch: an exception thrown by the delegate is passed to the
    /// optional error callback and to <c>LogUtil.Error</c>, and does not fault the returned task.
    /// </summary>
    public static class RunHelper
    {
        #region Error handling

        /// <summary>
        /// Reports an exception thrown by user work: first to the optional callback, then to the log.
        /// </summary>
        private static void ReportError(Exception ex, Action<Exception> errorAction)
        {
            try
            {
                if (errorAction != null)
                {
                    errorAction(ex);
                }
            }
            finally
            {
                // Log even when the callback itself throws; the callback's exception still propagates.
                LogUtil.Error(ex, "ThreadUtil.Run错误");
            }
        }
        #endregion

        #region Run (no result)

        /// <summary>
        /// Runs <paramref name="doWork"/> with <paramref name="arg"/> on <paramref name="scheduler"/>.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the work.</param>
        /// <param name="doWork">Work to execute; receives <paramref name="arg"/>.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The started task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="doWork"/> is null.</exception>
        public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            if (doWork == null)
            {
                throw new ArgumentNullException("doWork");
            }

            return Task.Factory.StartNew(state =>
            {
                try
                {
                    doWork(state);
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }

        /// <summary>
        /// Runs <paramref name="doWork"/> on <paramref name="scheduler"/>.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the work.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The started task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="doWork"/> is null.</exception>
        public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null)
        {
            if (doWork == null)
            {
                throw new ArgumentNullException("doWork");
            }

            return Task.Factory.StartNew(() =>
            {
                try
                {
                    doWork();
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region Run (with result)

        /// <summary>
        /// Runs <paramref name="doWork"/> with <paramref name="arg"/> on <paramref name="scheduler"/>
        /// and returns its result; if the work throws, the result is <c>default(T)</c>.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the work.</param>
        /// <param name="doWork">Work to execute; receives <paramref name="arg"/>.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The started task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="doWork"/> is null.</exception>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            if (doWork == null)
            {
                throw new ArgumentNullException("doWork");
            }

            return Task.Factory.StartNew<T>(state =>
            {
                try
                {
                    return doWork(state);
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                    return default(T);
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }

        /// <summary>
        /// Runs <paramref name="doWork"/> on <paramref name="scheduler"/> and returns its result;
        /// if the work throws, the result is <c>default(T)</c>.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the work.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>The started task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="doWork"/> is null.</exception>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            if (doWork == null)
            {
                throw new ArgumentNullException("doWork");
            }

            return Task.Factory.StartNew<T>(() =>
            {
                try
                {
                    return doWork();
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                    return default(T);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region RunAsync

        /// <summary>
        /// Awaitable form of the state-taking <c>Run&lt;T&gt;</c> overload. Argument errors surface
        /// when the returned task is awaited.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the work.</param>
        /// <param name="doWork">Work to execute; receives <paramref name="arg"/>.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>A task producing the work's result, or <c>default(T)</c> if the work threw.</returns>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            return await Run<T>(scheduler, doWork, arg, errorAction);
        }

        /// <summary>
        /// Awaitable form of the parameterless <c>Run&lt;T&gt;</c> overload. Argument errors surface
        /// when the returned task is awaited.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the work.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with any exception thrown by <paramref name="doWork"/>.</param>
        /// <returns>A task producing the work's result, or <c>default(T)</c> if the work threw.</returns>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            return await Run<T>(scheduler, doWork, errorAction);
        }
        #endregion

    }
}
复制代码

    TaskHelper扩展类(代码中LimitedTaskScheduler改为TaskSchedulerEx即可)(这个任务分类有点多,每个任务分类的核心线程一般是不释放的,一直占着线程,算不算滥用):

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Task帮助类基类

    /// </summary>
    public class TaskHelper
    {
        #region UI任务
        private static LimitedTaskScheduler _UITask;
        private static readonly object _UITaskLock = new object();
        /// <summary>
        /// Scheduler for UI tasks, created on first access with a constructor argument of 4
        /// (described by the original author as 4 threads).
        /// </summary>
        public static LimitedTaskScheduler UITask
        {
            get
            {
                // Creation is serialized: without the lock two racing callers could each build a
                // scheduler, and the one that loses the assignment would be left running unused.
                lock (_UITaskLock)
                {
                    if (_UITask == null)
                    {
                        _UITask = new LimitedTaskScheduler(4);
                    }
                    return _UITask;
                }
            }
        }
        #endregion

        #region 菜单任务
        private static LimitedTaskScheduler _MenuTask;
        private static readonly object _MenuTaskLock = new object();
        /// <summary>
        /// Scheduler for menu tasks, created on first access with a constructor argument of 2.
        /// </summary>
        public static LimitedTaskScheduler MenuTask
        {
            get
            {
                // Creation is serialized: without the lock two racing callers could each build a
                // scheduler, and the one that loses the assignment would be left running unused.
                lock (_MenuTaskLock)
                {
                    if (_MenuTask == null)
                    {
                        _MenuTask = new LimitedTaskScheduler(2);
                    }
                    return _MenuTask;
                }
            }
        }
        #endregion

        #region 计算任务
        private static LimitedTaskScheduler _CalcTask;
        private static readonly object _CalcTaskLock = new object();
        /// <summary>
        /// Scheduler for computation tasks, created on first access with a constructor argument of 8
        /// (described by the original author as 8 threads).
        /// </summary>
        public static LimitedTaskScheduler CalcTask
        {
            get
            {
                // Creation is serialized: without the lock two racing callers could each build a
                // scheduler, and the one that loses the assignment would be left running unused.
                lock (_CalcTaskLock)
                {
                    if (_CalcTask == null)
                    {
                        _CalcTask = new LimitedTaskScheduler(8);
                    }
                    return _CalcTask;
                }
            }
        }
        #endregion

        #region 网络请求
        private static LimitedTaskScheduler _RequestTask;
        private static readonly object _RequestTaskLock = new object();
        /// <summary>
        /// Scheduler for network requests, created on first access with a constructor argument of 32
        /// (described by the original author as 32 threads).
        /// </summary>
        public static LimitedTaskScheduler RequestTask
        {
            get
            {
                // Creation is serialized: without the lock two racing callers could each build a
                // scheduler, and the one that loses the assignment would be left running unused.
                lock (_RequestTaskLock)
                {
                    if (_RequestTask == null)
                    {
                        _RequestTask = new LimitedTaskScheduler(32);
                    }
                    return _RequestTask;
                }
            }
        }
        #endregion

        #region 数据库任务

        private static LimitedTaskScheduler _DBTask;
        private static readonly object _DBTaskLock = new object();
        /// <summary>
        /// Scheduler for database tasks, created on first access with a constructor argument of 32
        /// (described by the original author as 32 threads).
        /// </summary>
        public static LimitedTaskScheduler DBTask
        {
            get
            {
                // Creation is serialized: without the lock two racing callers could each build a
                // scheduler, and the one that loses the assignment would be left running unused.
                lock (_DBTaskLock)
                {
                    if (_DBTask == null)
                    {
                        _DBTask = new LimitedTaskScheduler(32);
                    }
                    return _DBTask;
                }
            }
        }
        #endregion

        #region IO任务
        private static LimitedTaskScheduler _IOTask;
        private static readonly object _IOTaskLock = new object();
        /// <summary>
        /// Scheduler for IO tasks, created on first access with a constructor argument of 8
        /// (described by the original author as 8 threads).
        /// </summary>
        public static LimitedTaskScheduler IOTask
        {
            get
            {
                // Creation is serialized: without the lock two racing callers could each build a
                // scheduler, and the one that loses the assignment would be left running unused.
                lock (_IOTaskLock)
                {
                    if (_IOTask == null)
                    {
                        _IOTask = new LimitedTaskScheduler(8);
                    }
                    return _IOTask;
                }
            }
        }
        #endregion

        #region 首页任务
        private static LimitedTaskScheduler _MainPageTask;
        /// <summary>
        /// 首页任务(16个线程)
        /// </summary>
        public static LimitedTaskScheduler MainPageTask
        {
            get
      



  
相关教程