HashMap与线程安全,ConcurrentBag的落到实处原理

目录

Java要点2

HashMap与线程安全

HashMap与线程安全

  • 一、前言
  • 二、ConcurrentBag类
  • 三 、ConcurrentBag线程安全完成原理
    • 1.
      ConcurrentBag的私家字段
    • 2.
      用于数据存款和储蓄的TrehadLocalList类
    • 3.
      ConcurrentBag落到实处新增元素
    • 4. ConcurrentBag
      怎么着完毕迭代器方式
  • 四、总结
  • 作者水平有限,假若不当欢迎各位批评指正!

JAVA 集合类

壹 、HashMap 为什么是线程不安全的

壹 、HashMap 为何是线程不安全的


1.JAVA常用集合类效能、分歧和性质

两大类:Collections,Map;Collections分List和Set

List接口与事实上现类

List类似于数组,能够通过索引来访问成分,达成该接口的常用类有ArrayList、LinkedList、Vector、Stack等。

   
HashMap是通过散列表来完毕存款和储蓄结构的,具体内容请看自身的另一篇博客《HashMap深度剖析》,那么HashMap为何线程不安全啊,首要有三个原因。

   
HashMap是经过散列表来完结存款和储蓄结构的,具体内容请看本人的另一篇博客《HashMap深度解析》,那么HashMap为何线程不安全吧,主要有七个原因。


ArrayList

ArrayList是动态数组,能够依照插入的成分的多寡自动扩大体量,而使用者不供给驾驭其里面是怎么时候进行增添的,把它作为丰富体量的数组来行使即可。
ArrayList访问成分的方法get是常数时间,因为是一向遵照下标索引来访问的,而add方法的时日复杂度是O(n),因为须要活动成分,将新成分插入到分外的地方。
ArrayList是非线程安全的,即它从不一块,可是,能够透过Collections.synchronizedList()静态方法再次来到一个共同的实例,如:

List synList = Collections.synchronizedList(list);

数组扩容:ArrayList在插入成分的时候,都会检查当前的数组大小是或不是丰裕,即使不够,将会扩大体量到方今体积
* 1.5 +
1(加1是为着当前体量为1时,也能扩展到2),即把原先的要素全体复制到3个两倍大小的新数组,将旧的数组放任掉(等待垃圾回收),那个操作是相比耗费时间,由此建议在开创ArrayList的时候,依照要插入的要素的数目来开首估计Capacity,并初阶化ArrayList,如:

ArrayList list = new ArrayList(100);

这么,在插入小于玖20个要素的时候都是不须要展开扩大容量的,能够拉动质量的升级,当然,借使对这么些容积测度大了,或许会拉动一些空间的消耗。

率先肯定是多少个线程同时去往集合里添加多少,第二个原因:三个线程同时丰裕相同的key值数据,当四个线程同时遍历完桶内的链表时,发现,没有该key值的数码,那是他俩还要成立了八个Entry结点,都增加到了桶内的链表上,这样在该HashMap集合中就涌出了三个Key相同的多少。第一个原因:当七个线程同时检查和测试到size/capacity>负载因猪时,在扩大体积的时候也许会在链表上暴发死循环(为啥会生出死循环,能够看有的HashMap的死循环相关的博客),也说不定会时有产生存款和储蓄卓殊。

率先肯定是多个线程同时去往集合里添加多少,第三个原因:七个线程同时添加相同的key值数据,当五个线程同时遍历完桶内的链表时,发现,没有该key值的数目,这是他俩同时创立了二个Entry结点,都助长到了桶内的链表上,那样在该HashMap集合中就涌出了两个Key相同的多少。第四个原因:当多个线程同时检查和测试到size/capacity>负载因辰时,在扩大体量的时候可能会在链表上爆发死循环(为啥会时有发生死循环,可以看有的HashMap的死循环相关的博客),也说不定会发生存款和储蓄万分。

一、前言

小编眼前在做3个体系,项目中为了升高吞吐量,使用了音信队列,中间达成了传宗接代消费情势,在生产消费者方式中要求有1个晤面,来存款和储蓄生产者所生产的物料,小编利用了最广大的List<T>集聚类型。

出于生产者线程有不少个,消费者线程也有成都百货上千个,所以不可防止的就时有爆发了线程同步的标题。开首小编是使用lock第二字,进行线程同步,但是质量并不是专程出彩,然后有网上好友说可以行使SynchronizedList<T>来替代使用List<T>高达到规定的分数线程安全的指标。于是小编就替换到了SynchronizedList<T>,不过发现性能依然不好,于是查看了SynchronizedList<T>的源代码,发现它就是简单的在List<T>提供的API的根底上加了lock,所以品质基本与小编达成情势相差无几。

末尾作者找到了消除的方案,使用ConcurrentBag<T>类来促成,质量有极大的改变,于是我查阅了ConcurrentBag<T>的源代码,实现丰富精细,特此在那记录一下。

LinkedList

LinkedList也完成了List接口,个中间贯彻是行使双向链表来保存成分,由此插入与删除元素的质量都表现不错。它还提供了部分任何操作方法,如在头顶、底部插入或然去除成分,由此,能够用它来贯彻栈、队列、双向队列。
出于是使用链表保存成分的,所以随便走访成分的时候速度会相比慢(须要遍历链表找到对象成分),那或多或少比较ArrayList的即兴走访要差,ArrayList是运用数组实现方式,直接接纳下标能够访问到成分而不须求遍历。由此,在急需频仍随意访问成分的状态下,建议利用ArrayList。
与ArrayList一样,LinkedList也是非同步的,假若须求完毕二十多线程访问,则需要团结在外部完结共同方法。当然也足以使用Collections.synchronizedList()静态方法。

二 、怎样线程安全的选取HashMap

二 、怎样线程安全的运用HashMap

二、ConcurrentBag类

ConcurrentBag<T>实现了IProducerConsumerCollection<T>接口,该接口首要用以生产者消费者形式下,可知该类基本就是为生育消费者格局定制的。然后还达成了正规的IReadOnlyCollection<T>类,完毕了此类就需求贯彻IEnumerable<T>、IEnumerable、 ICollection类。

ConcurrentBag<T>对外提供的方法没有List<T>那么多,可是同样有Enumerable达成的恢弘方法。类本人提供的法门如下所示。

名称 说明
Add 将对象添加到 ConcurrentBag 中。
CopyTo 从指定数组索引开始,将 ConcurrentBag 元素复制到现有的一维 Array 中。
Equals(Object) 确定指定的 Object 是否等于当前的 Object。 (继承自 Object。)
Finalize 允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 (继承自 Object。)
GetEnumerator 返回循环访问 ConcurrentBag 的枚举器。
GetHashCode 用作特定类型的哈希函数。 (继承自 Object。)
GetType 获取当前实例的 Type。 (继承自 Object。)
MemberwiseClone 创建当前 Object 的浅表副本。 (继承自 Object。)
ToArray 将 ConcurrentBag 元素复制到新数组。
ToString 返回表示当前对象的字符串。 (继承自 Object。)
TryPeek 尝试从 ConcurrentBag 返回一个对象但不移除该对象。
TryTake 尝试从 ConcurrentBag 中移除并返回对象。

Vector

Vector是ArrayList的线程同步版本,便是说Vector是共同的,援助八线程访问。除却,还有一些不等时,当体积不够时,Vector私下认可扩张一倍容积,而ArrayList是时下容积
* 1.5 + 1

方法一:Hashtable

方法一:Hashtable

三 、 ConcurrentBag线程安全完结原理

Stack

Stack是一种后进先出的数据结构,继承自Vector类,提供了push、pop、peek(获得栈顶成分)等方式。

   
Hashtable是Java低版本中提议来的,由于其里面包车型地铁加锁机制,是的其性质较低,近期早已不常用了。所以当四个线程访问Hashtable的叁头方法时,其余线程假设也要访问同步方法,会被阻塞住。举个例子,当2个线程使用put方法时,另三个线程不但不能运用put方法,连get方法都不得以,效能十分低。

   
Hashtable是Java低版本中提出来的,由于其里面包车型大巴加锁机制,是的其本性较低,近期一度不常用了。所以当2个线程访问Hashtable的一路方法时,其余线程假使也要拜访同步方法,会被阻塞住。举个例子,当1个线程使用put方法时,另三个线程不但不得以应用put方法,连get方法都无法,功用极低。

1. ConcurrentBag的民用字段

ConcurrentBag线程安全完成重庆大学是通过它的数目存款和储蓄的构造和细颗粒度的锁。

   public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        // ThreadLocalList对象包含每个线程的数据
        ThreadLocal<ThreadLocalList> m_locals;

        // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不同线程中
        // 允许在线程局部对象上枚举
        volatile ThreadLocalList m_headList, m_tailList;

        // 这个标志是告知操作线程必须同步操作
        // 在GlobalListsLock 锁中 设置
        bool m_needSync;

}

首要选拔大家来看它表明的民用字段,在那之中必要注意的是会面的数据是存放在在ThreadLocal线程本地存款和储蓄中的。也便是说访问它的各种线程会维护3个体协会调的集聚数据列表,三个会晤中的数据或者会存放在分裂线程的地头存款和储蓄空间中,所以假诺线程访问自个儿本地存款和储蓄的对象,那么是未曾难题的,那就是落实线程安全的率先层,运用线程本地存款和储蓄数据

然后能够看出ThreadLocalList m_headList, m_tailList;其一是存放在着当地列表对象的头指针和尾指针,通过那八个指针,我们就能够透过遍历的格局来拜会具有地方列表。它采取volatile修饰,差别意线程进行本地缓存,每一种线程的读写都以从来操作在共享内部存款和储蓄器上,那就保障了变量始终存有一致性。任何线程在任什么日时期实行读写操作均是新型值。对于volatile修饰符,感谢本身是攻城狮提议描述不当。

最终又定义了2个注解,这几个标志告知操作线程必须开始展览同步操作,那是促成了多少个细颗粒度的锁,因为唯有在多少个尺码满足的气象下才需求展开线程同步。

Set接口

Set是无法包罗重合成分的容器,其达成类有HashSet,继承于它的接口有SortedSet接口等。Set中提供了加、减、和交等集合操作函数。Set不能够根据索引随机访问成分,那是它与List的3个根本分裂。

   
HashTable源码中是采纳synchronized来确认保证线程安全的,比如上边包车型客车get方法和put方法:
public synchronized V get(Object key) {}

   
HashTable源码中是使用synchronized来确认保证线程安全的,比如上面包车型大巴get方法和put方法:
public synchronized V get(Object key) {}

2. 用于数据存款和储蓄的TrehadLocalList类

接下去大家来看一下ThreadLocalList类的协会,该类正是事实上存款和储蓄了数码的职责。实际上它是采取双向链表那种组织进行多少存款和储蓄。

[Serializable]
// 构造了双向链表的节点
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public readonly T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// 集合操作类型
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

/// <summary>
/// 线程锁定的类
/// </summary>
internal class ThreadLocalList
{
    // 双向链表的头结点 如果为null那么表示链表为空
    internal volatile Node m_head;

    // 双向链表的尾节点
    private volatile Node m_tail;

    // 定义当前对List进行操作的种类 
    // 与前面的 ListOperation 相对应
    internal volatile int m_currentOp;

    // 这个列表元素的计数
    private int m_count;

    // The stealing count
    // 这个不是特别理解 好像是在本地列表中 删除某个Node 以后的计数
    internal int m_stealCount;

    // 下一个列表 可能会在其它线程中
    internal volatile ThreadLocalList m_nextList;

    // 设定锁定是否已进行
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // 列表的版本,只有当列表从空变为非空统计是底层
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList 构造器
    /// </summary>
    /// <param name="ownerThread">拥有这个集合的线程</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }
    /// <summary>
    /// 添加一个新的item到链表首部
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">是否更新计数.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // 因为进行初始化了,所以将空状态改为非空状态
        }
        else
        {
            // 使用头插法 将新的元素插入链表
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // 更新计数以避免此添加同步时溢出
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// 从列表的头部删除一个item
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        // 双向链表删除头结点数据的流程
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            m_tail = null;
        }
        m_count--;
        result = head.m_value;

    }

    /// <summary>
    /// 返回列表头部的元素
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// 从列表的尾部获取一个item
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }


    /// <summary>
    /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

从地方的代码中大家能够进一步证实以前的见地,正是ConcurentBag<T>在二个线程中贮存数据时,使用的是双向链表ThreadLocalList福寿无疆了一组对链表增删改查的办法。

HashSet

HashSet落成了Set接口,其里面是使用HashMap完毕的。放入HashSet的靶子最佳重写hashCode、equals方法,因为默许的那七个措施很大概与您的事务逻辑是差别等的,而且,要同时重写那多个函数,如若只重写在那之中二个,很不难发生不测的题材。
记住下边几条规则:

十分对象,hashCode一定相等。
不等对象,hashCode不肯定不等于。
五个指标的hashCode相同,不自然相等。
七个指标的hashCode不一致,一定相等。

public synchronized V put(K key, V value) {}

public synchronized V put(K key, V value) {}

3. ConcurrentBag兑现新增成分

接下去我们看一看ConcurentBag<T>是如何新增成分的。

/// <summary>
/// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里
/// 这是避免内存泄漏的方法
/// </summary>
/// <returns></returns>
private ThreadLocalList GetUnownedList()
{
    //此时必须持有全局锁
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    // 从头线程列表开始枚举 找到那些已经被关闭的线程
    // 将它所在的列表对象 返回
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}
/// <summary>
/// 本地帮助方法,通过线程对象检索线程线程本地列表
/// </summary>
/// <param name="forceCreate">如果列表不存在,那么创建新列表</param>
/// <returns>The local list object</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;

    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // 获取用于更新操作的 m_tailList 锁
        lock (GlobalListsLock)
        {
            // 如果头列表等于空,那么说明集合中还没有元素
            // 直接创建一个新的
            if (m_headList == null)
            {
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
               // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中
                // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程
                // 然后将已停止线程的数据 移交到当前线程管理
                list = GetUnownedList();
                // 如果没有 那么就新建一个列表 然后更新尾指针的位置
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}
/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add)
    ThreadLocalList list = GetThreadList(true);
    // 实际的数据添加操作 在AddInternal中执行
    AddInternal(list, item);
}

/// <summary>
/// </summary>
/// <param name="list"></param>
/// <param name="item"></param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        #pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
        #pragma warning restore 0420
        // 同步案例:
        // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁
        // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁
        if (list.Count < 2 || m_needSync)
        {
            // 将其重置为None 以避免与窃取线程的死锁
            list.m_currentOp = (int)ListOperation.None;
            // 锁定当前对象
            Monitor.Enter(list, ref lockTaken);
        }
        // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中
        // 如果已经锁定 那么说明线程安全  可以更新Count 计数
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

从地点代码中,我们得以很明白的掌握Add()主意是什么运作的,在那之中的主要就是GetThreadList()方式,通过该措施可以获得当前线程的多少存款和储蓄列表对象,若是不设有多少存款和储蓄列表,它会自行创制可能通过GetUnownedList()格局来搜寻那三个被结束不过还蕴藏有数量列表的线程,然后将数据列表重临给当下线程中,幸免了内部存款和储蓄器泄漏。

在数额增进的历程中,完毕了细颗粒度的lock手拉手锁,所以质量会很高。删除和别的操作与新增类似,本文不再赘述。

TreeSet

TreeSet同样的Set接口的兑现类,同样不能够存放相同的靶子。它与HashSet不相同的是,TreeSet的因素是根据顺序排列的,由此用TreeSet存放的目的须求实现Comparable接口。

方法二:SynchronizedMap

方法二:SynchronizedMap

4. ConcurrentBag 怎样贯彻迭代器格局

看完上面的代码后,小编很惊叹ConcurrentBag<T>是怎样兑现IEnumerator来贯彻迭代访问的,因为ConcurrentBag<T>是透过分流在分化线程中的ThreadLocalList来存款和储蓄数据的,那么在贯彻迭代器情势时,进度会相比较复杂。

后边再查看了源码之后,发现ConcurrentBag<T>为了贯彻迭代器情势,将分在分化线程中的数据全都存到三个List<T>聚拢中,然后回到了该副本的迭代器。所以每一回访问迭代器,它都会新建1个List<T>的副本,那样即使浪费了必然的仓库储存空间,可是逻辑上更是简明了。

/// <summary>
/// 本地帮助器方法释放所有本地列表锁
/// </summary>
private void ReleaseAllLocks()
{
    // 该方法用于在执行线程同步以后 释放掉所有本地锁
    // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {

        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 从冻结状态解冻包的本地帮助器方法
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    // 首先释放掉 每个线程中 本地变量的锁
    // 然后释放全局锁
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(GlobalListsLock);
    }
}

/// <summary>
/// 本地帮助器函数等待所有未同步的操作
/// </summary>
private void WaitAllOperations()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    ThreadLocalList currentList = m_headList;
    // 自旋等待 等待其它操作完成
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            // 有其它线程进行操作时,会将cuurentOp 设置成 正在操作的枚举
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 本地帮助器方法获取所有本地列表锁
/// </summary>
private void AcquireAllLocks()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;

    // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁
    while (currentList != null)
    {
        // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added
/// to the dictionary
/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations
/// 3- Wait for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

    // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync
    Monitor.Enter(GlobalListsLock, ref lockTaken);

    // 这将强制同步任何将来的添加/执行操作
    m_needSync = true;

    // 获取所有列表的锁
    AcquireAllLocks();

    // 等待所有操作完成
    WaitAllOperations();
}

/// <summary>
/// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。
/// 这不是线程安全, 应该被称为冻结/解冻袋块
/// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 
/// </summary>
/// <returns>List the contains the bag items</returns>
private List<T> ToList()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    // 创建一个新的List
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中
    while (currentList != null)
    {
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag.  It does not reflect any updates to the collection after 
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new List<T>().GetEnumerator(); // empty list

    bool lockTaken = false;
    try
    {
        // 首先冻结整个 ConcurrentBag集合
        FreezeBag(ref lockTaken);
        // 然后ToList 再拿到 List的 IEnumerator
        return ToList().GetEnumerator();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

由地方的代码可见晓,为了获得迭代器对象,总共举办了三步关键的操作。

  1. 使用FreezeBag()主意,冻结一切ConcurrentBag<T>聚集。因为必要变更集合的List<T>副本,生成副本时期无法有任何线程更改损坏数据。
  2. ConcurrrentBag<T>生成List<T>HashMap与线程安全,ConcurrentBag的落到实处原理。副本。因为ConcurrentBag<T>仓库储存数据的法门比较新鲜,直接促成迭代器情势困难,考虑到线程安全和逻辑,最棒的措施是生成3个副本。
  3. 成就上述操作之后,就能够使用UnfreezeBag()方法解冻整个集合。

那么FreezeBag()主意是怎样来冻结一切集合的呢?也是分为三步走。

  1. 先是获得全局锁,通过Monitor.Enter(GlobalListsLock, ref lockTaken);这么一条语句,这样任何线程就不能冻结集合。
  2. 接下来拿走具有线程中ThreadLocalList的锁,通过`AcquireAllLocks()方法来遍历获取。这样任何线程就无法对它进行操作损坏数据。
  3. 伺机已经进去了操作流程线程结束,通过WaitAllOperations()办法来落到实处,该方法会遍历每三个ThreadLocalList对象的m_currentOp个性,确认保证全体远在None操作。

完了上述流程后,那么就是实在的冰冻了全体ConcurrentBag<T>聚集,要解冻的话也近乎。在此不再赘述。

Map接口

Map集合提供了如约“键值对”存储成分的法子,1个键唯一映射一个值。集合中“键值对”全部作为三个实体成分时,类似List集合,可是只要分别来年,Map是三个两列成分的汇集:键是一列,值是一列。与Set集合一样,Map也远非提供随机访问的能力,只可以通过键来访问对应的值。
Map的每三个要素都是3个Map.Entry,这几个实体的组织是< Key, Value
>样式。

   
调用synchronizedMap()方法后会重临1个SynchronizedMap类的指标,而在SynchronizedMap类中选择了synchronized同步关键字来保管对Map的操作是线程安全的。

   
调用synchronizedMap()方法后会重临贰个SynchronizedMap类的指标,而在SynchronizedMap类中应用了synchronized同步关键字来确定保证对Map的操作是线程安全的。

四、总结

下边给出一张图,描述了ConcurrentBag<T>是哪些存款和储蓄数据的。通过种种线程中的ThreadLocal来落到实处线程本地存款和储蓄,每一个线程中都有那般的布局,互不干扰。然后各种线程中的m_headList连接指向ConcurrentBag<T>的首先个列表,m_tailList本着最终三个列表。列表与列表之间通过m_locals
下的 m_nextList持续,构成2个单链表。

数码存款和储蓄在各样线程的m_locals中,通过Node类构成二个双向链表。
PS:
要注意m_tailListm_headList并不是储存在ThreadLocal中,而是拥有的线程共享一份。

美高梅开户网址 1

上述正是至于ConcurrentBag<T>类的落到实处,笔者的一些记下和分析。

HashMap

HashMap完成了Map接口,但它是非线程安全的。HashMap允许key值为null,value也能够为null。

源码如下

源码如下

小编水平有限,假使不当欢迎各位批评指正!

附上ConcurrentBag<T>源码地址:戳一戳

Hashtable

Hashtable也是Map的落实类,继承自Dictionary类。它与HashMap分歧的是,它是线程安全的。而且它不容许key为null,value也不能为null。
由于它是线程安全的,在效能上稍低于HashMap。

private static class SynchronizedMap<K,V>

    implements Map<K,V>, Serializable {

    // use serialVersionUID from JDK 1.2.2 for interoperability

    private static final long serialVersionUID =1978198479659022715L;

    private final Map<K,V> m;     // Backing Map

        final Object      mutex;    // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {

            if (m==null)

                throw new NullPointerException();

            this.m = m;

            mutex = this;

        }

    SynchronizedMap(Map<K,V> m, Object mutex) {

            this.m = m;

            this.mutex = mutex;

        }

    public int size() {

        synchronized(mutex) {return m.size();}

        }

    public boolean isEmpty(){

        synchronized(mutex) {return m.isEmpty();}

        }

    public boolean containsKey(Object key) {

        synchronized(mutex) {return m.containsKey(key);}

        }

    public boolean containsValue(Object value){

        synchronized(mutex) {return m.containsValue(value);}

        }

    public V get(Object key) {

        synchronized(mutex) {return m.get(key);}

        }

    public V put(K key, V value) {

        synchronized(mutex) {return m.put(key, value);}

        }

    public V remove(Object key) {

        synchronized(mutex) {return m.remove(key);}

        }

    public void putAll(Map<? extends K, ? extends V> map) {

        synchronized(mutex) {m.putAll(map);}

        }

    public void clear() {

        synchronized(mutex) {m.clear();}

    }

    private transient Set<K> keySet = null;

    private transient Set<Map.Entry<K,V>> entrySet = null;

    private transient Collection<V> values = null;

    public Set<K> keySet() {

            synchronized(mutex) {

                if (keySet==null)

                    keySet = new                                     SynchronizedSet<K>(m.keySet(),mutex); 

                return keySet;

            }

    }

    public Set<Map.Entry<K,V>> entrySet() {

            synchronized(mutex) {

                if (entrySet==null)

                    entrySet = new SynchronizedSet<Map.Entry<K,V>>(m.entrySet(), mutex);

                return entrySet;

            }

    }

    public Collection<V> values() {

            synchronized(mutex) {

                if (values==null)

                    values = new SynchronizedCollection<V>(m.values(), mutex);

                return values;

            }

        }

    public boolean equals(Object o) {

            if (this == o)

                return true;

            synchronized(mutex) {return m.equals(o);}

        }

    public int hashCode() {

            synchronized(mutex) {return m.hashCode();}

        }

    public String toString() {

        synchronized(mutex) {return m.toString();}

        }

        private void writeObject(ObjectOutputStream s) throws IOException {

        synchronized(mutex) {s.defaultWriteObject();}

        }

}
private static class SynchronizedMap<K,V>

    implements Map<K,V>, Serializable {

    // use serialVersionUID from JDK 1.2.2 for interoperability

    private static final long serialVersionUID =1978198479659022715L;

    private final Map<K,V> m;     // Backing Map

        final Object      mutex;    // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {

            if (m==null)

                throw new NullPointerException();

            this.m = m;

            mutex = this;

        }

    SynchronizedMap(Map<K,V> m, Object mutex) {

            this.m = m;

            this.mutex = mutex;

        }

    public int size() {

        synchronized(mutex) {return m.size();}

        }

    public boolean isEmpty(){

        synchronized(mutex) {return m.isEmpty();}

        }

    public boolean containsKey(Object key) {

        synchronized(mutex) {return m.containsKey(key);}

        }

    public boolean containsValue(Object value){

        synchronized(mutex) {return m.containsValue(value);}

        }

    public V get(Object key) {

        synchronized(mutex) {return m.get(key);}

        }

    public V put(K key, V value) {

        synchronized(mutex) {return m.put(key, value);}

        }

    public V remove(Object key) {

        synchronized(mutex) {return m.remove(key);}

        }

    public void putAll(Map<? extends K, ? extends V> map) {

        synchronized(mutex) {m.putAll(map);}

        }

    public void clear() {

        synchronized(mutex) {m.clear();}

    }

    private transient Set<K> keySet = null;

    private transient Set<Map.Entry<K,V>> entrySet = null;

    private transient Collection<V> values = null;

    public Set<K> keySet() {

            synchronized(mutex) {

                if (keySet==null)

                    keySet = new                                     SynchronizedSet<K>(m.keySet(),mutex); 

                return keySet;

            }

    }

    public Set<Map.Entry<K,V>> entrySet() {

            synchronized(mutex) {

                if (entrySet==null)

                    entrySet = new SynchronizedSet<Map.Entry<K,V>>(m.entrySet(), mutex);

                return entrySet;

            }

    }

    public Collection<V> values() {

            synchronized(mutex) {

                if (values==null)

                    values = new SynchronizedCollection<V>(m.values(), mutex);

                return values;

            }

        }

    public boolean equals(Object o) {

            if (this == o)

                return true;

            synchronized(mutex) {return m.equals(o);}

        }

    public int hashCode() {

            synchronized(mutex) {return m.hashCode();}

        }

    public String toString() {

        synchronized(mutex) {return m.toString();}

        }

        private void writeObject(ObjectOutputStream s) throws IOException {

        synchronized(mutex) {s.defaultWriteObject();}

        }

}

List总结

ArrayList内部贯彻选拔动态数组,当体积不够时,自动扩容至(当前容积*1.5+1)。成分的逐一根据插入的顺序排列。默许初叶容积为10。
contains复杂度为O(n),add复杂度为分摊的常数,即添加n个要素必要O(n)大运,remove为O(n),get复杂度为O(1)
随意访问效能高,随机插入、删除效用低。ArrayList是非线程安全的。

LinkedList内部使用双向链表完成,随机走访作用低,随机插入、删除功能高。能够作为储藏室、队列、双向队列来采纳。LinkedList也是非线程安全的。

Vector跟ArrayList是类似的,内部贯彻也是动态数组,随机走访效用高。Vector是线程安全的。

Stack是栈,继承于Vector,其各类操作也是依照Vector的各个操作,因而其里面贯彻也是动态数组,先进后出。Stack是线程安全的。

方法三:ConcurrentHashMap

方法三:ConcurrentHashMap

List使用情状

对此需求急迅插入、删除成分,应该选取LinkedList
对于急需急速随机走访成分,应该利用ArrayList
假若List须求被十六线程操作,应该使用Vector,假设只会被单线程操作,应该利用ArrayList
Set总结

HashSet内部是利用HashMap达成的,HashSet的key值是分裂意再度的,假使放入的靶子是自定义对象,那么最CANON够同时重写hashCode与equals函数,那样就能自定义添加的目的在什么样的图景下是均等的,即能担保在工作逻辑下能添加对象到HashSet中,保障工作逻辑的不错。其余,HashSet里的要素不是比照顺序存款和储蓄的。HashSet是非线程安全的。

TreeSet存款和储蓄的因素是按顺序存储的,假如是储存的要素是自定义对象,那么需求实现Comparable接口。TreeSet也是非线程安全的。

LinkedHashSet继承自HashSet,它与HashSet差别的是,LinkedHashSet存款和储蓄成分的次第是遵照成分的插入顺序存款和储蓄的。LinkedHashSet也是非线程安全的。

    ConcurrentHashMap是java.util.concurrent包中的一个类,

    ConcurrentHashMap是java.util.concurrent包中的三个类,

Map总结

HashMap存款和储蓄键值对。当程序试图将两个key-value对放入 HashMap
中时,程序首先依据该key的hashCode()重临值决定该Entry的储存地方:假诺五个Entry的key的hashCode()
再次回到值相同,那它们的积存地点相同。假诺这八个Entry的key通过equals比较再次回到true,新添加Entry的value将覆盖集合中本来Entry的
value,但key不会覆盖。假若那七个Entry的key通过equals
比较再次回到false,新加上的Entry将与聚集中原本Entry形成Entry
链,而且新加上的 Entry 位于 Entry
链的底部。看下边HashMap添加键值对的源代码:

    public V put(K key, V value) {
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key.hashCode());
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
    }
    void addEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    if (size++ >= threshold)
        resize(2 * table.length);
    }

HashMap允许key、value值为null。HashMap是非线程安全的。

Hashtable是HashMap的线程安全版本。而且,key、value都不一样意为null。

哈希值的采取分歧: Hashtable直接动用对象的hashCode,如下代码:

int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;

而HashMap重新计算hash值,如下代码:

  int hash = hash(key.hashCode());
  int i = indexFor(hash, table.length);
  static int hash(int h) {
  // This function ensures that hashCodes that differ only by
  // constant multiples at each bit position have a bounded
  // number of collisions (approximately 8 at default load factor).
  h ^= (h >>> 20) ^ (h >>> 12);
  return h ^ (h >>> 7) ^ (h >>> 4);
  }
  static int indexFor(int h, int length) {
  return h & (length-1);
  }

扩大体量区别: Hashtable中hash数组默许大小是11,扩充的艺术是
old*2+1。HashMap中hash数组的暗中同意大小是16,而且必然是2的指数。

第1,大家先来询问一下以此集合的规律。hashtable是做了一起的,不过质量下落了,因为
hashtable每一回同步实施的时候都要锁住整个结构。于是ConcurrentHashMap
修改了其锁住整个结构的格式,改为了只锁住HashMap的一个桶,锁的粒度大大减小,如下图:

第叁,我们先来询问一下那一个集合的规律。hashtable是做了一道的,然则品质下跌了,因为
hashtable每一次同步施行的时候都要锁住整个结构。于是ConcurrentHashMap
修改了其锁住整个结构的格式,改为了只锁住HashMap的八个桶,锁的粒度大大减小,如下图:

2.并发相关的集合类

出现功用的晋升是内需从最底层JVM指令级别最先重复设计,优化,立异同步锁的编写制定才能落成的,java.util.concurrent
的指标正是要贯彻 Collection
框架对数据结构所举行的面世操作。通过提供一组可信赖的、高品质并发创设块,开发人士能够增加并发类的线程安全、可伸缩性、品质、可读性和可信赖性。JDK
5.0 中的并发革新有以下三组:

• JVM 级别更改。超过二分之一现代总括机对出现对某一硬件级别提供支撑,日常以
compare-and-swap (CAS)指令方式。CAS
是一种低级别的、细粒度的技艺,它同意多少个线程更新贰个内部存款和储蓄器地方,同时能够检查和测试别的线程的争执并开展还原。它是广大高品质并发算法的基本功。在
JDK 5.0 此前,Java
语言中用来协调线程之间的拜会的惟一原语是一路,同步是更重量级和粗粒度的。公开
CAS 能够付出中度可伸缩的面世 Java 类。那几个改变首要由 JDK
库类使用,而不是由开发人士使用。

• 低级实用程序类 — 锁定和原子类。使用 CAS 作为并发原语,ReentrantLock
类提供与 synchronized
原语相同的锁定和内部存款和储蓄器语义,可是这么能够更好地操纵锁定(如计时的锁定等待、锁定轮询和可间歇的锁定等待)和提供更好的可伸缩性(竞争时的高品质)。大部分开发人士将不再直接采纳ReentrantLock 类,而是使用在 ReentrantLock 类上创设的高级类。

 美高梅开户网址 2

 美高梅开户网址 3

高级实用程序类。这么些类达成并发营造块,每一个总括机科学文本中都会讲述那几个类

信号、互斥、闩锁、屏障、调换程序、线程池和线程安全集合类等。大多数开发职员都足以在应用程序中用那个类,来替换许多(就算不是一切)同步、wait()
和 notify() 的接纳,从而升高品质、可读性和正确。

 

 

Hashtable

Hashtable
提供了一种易于使用的、线程安全的、关联的map作用,那当然也是方便人民群众的。但是,线程安全性是凭代价换到的――
Hashtable 的兼具办法都是同台的。
此时,无竞争的联合会导致可观的性情代价。 Hashtable 的后继者 HashMap
是作为JDK1.第22中学的集合框架的一片段出现的,它通过提供贰个差异台的基类和3个体协会助进行的包装器
Collections.synchronizedMap ,消除了线程安全性难点。
通过将挑益州的职能从线程安全性中分离开来, Collections.synchronizedMap
允许须求联合的用户能够具备共同,而不需求一块的用户则无需为同步付出代价。
Hashtable 和 synchronizedMap 所使用的取得同步的简单方法(同步 Hashtable
中依旧联合的 Map
包装器对象中的各个方法)有三个第3的阙如。首先,那种艺术对于可伸缩性是一种障碍,因为1回只可以有1个线程能够访问hash表。
同时,那样仍不足以提供真正的线程安全性,许多公用的滥竽充数操作依然须求卓殊的一路。固然诸如
get() 和 put()
之类的大约操作能够在不供给额外同步的情况下安全地做到,但要么有部分公用的操作类别,例如迭代或许put-if-absent(空则放入),须求外部的联合署名,防止止数据争用。

再者ConcurrentHashMap的读取操作大致是全然的出现操作。所以ConcurrentHashMap
读操作的加锁加锁粒度变小,个体操作差不离从不锁,所以比起在此之前的Hashtable大大变快了(那一点在桶越多时表现得更让人惊叹些)。唯有在求size等操作时才要求锁定任何表。作者认为ConcurrentHashMap是线程安全的聚合中最便捷的。

与此同时ConcurrentHashMap的读取操作大概是完全的面世操作。所以ConcurrentHashMap
读操作的加锁加锁粒度变小,个体操作大致一向不锁,所以比起从前的Hashtable大大变快了(那一点在桶越多时显示得更显著些)。唯有在求size等操作时才要求锁定任何表。小编觉着ConcurrentHashMap是线程安全的汇集中最便捷的。

减小锁粒度

增强 HashMap
的并发性同时还提供线程安全性的一种艺术是撤除对全体表使用一个锁的不二法门,而使用对hash表的各种bucket都施用二个锁的方法(或然,更宽广的是,使用一个锁池,每一个锁负责尊崇多少个bucket)
。那表示八个线程能够同时地拜会贰个 Map
的比不上部分,而不用争用单个的集聚范围的锁。那种艺术能够一贯升高插入、检索以及移除操作的可伸缩性。不幸的是,那种并发性是以一定的代价换到的――那使得对一切
集合进行操作的片段方法(例如 size() 或 isEmpty()
)的兑现尤其不便,因为那一个措施要求一遍获得过多的锁,并且还存在重回不得法的结果的风险。然则,对于有些情状,例如落到实处cache,那样做是一个很好的妥胁――因为检索和插入操作相比频仍,而
size() 和 isEmpty() 操作则少得多。

   
而在迭代时,ConcurrentHashMap使用了分歧于守旧集合的迅猛退步迭代器的另一种迭代方式,大家誉为弱一致迭代器。在那种迭代方式中,当iterator被创建后集合再爆发变动就不再是抛出
ConcurrentModificationException,取而代之的是在转移时new新的多寡从而不影响原本的数据,iterator完毕后再将头指针替换为新的数量,那样iterator线程能够选拔原来老的数目,而写线程也得以现身的做到改变,更首要的,这有限援救了八个线程并发执行的连续性和扩大性,是性质提高的严重性。

   
而在迭代时,ConcurrentHashMap使用了不一致于古板集合的便捷退步迭代器的另一种迭代方式,我们称为弱一致迭代器。在这种迭代格局中,当iterator被创制后集合再发生改变就不再是抛出
ConcurrentModificationException,取而代之的是在变更时new新的多寡从而不影响原本的数据,iterator达成后再将头指针替换为新的数量,这样iterator线程能够选取原来老的数目,而写线程也能够现身的达成改变,更首要的,那保险了四个线程并发执行的接二连三性和扩张性,是性质进步的要紧。

ConcurrentHashMap

美高梅开户网址,util.concurrent 包中的 ConcurrentHashMap 类(也将面世在JDK 1.5中的
java.util.concurrent 包中)是对 Map 的线程安全的达成,比起
synchronizedMap
来,它提供了好得多的并发性。三个读操作差不多总能够并发地执行,同时展开的读和写操作平时也能并发地执行,而与此同时拓展的写操作仍是能够时不时地出现举行(相关的类也提供了仿佛的多个读线程的并发性,可是,只允许有贰个活动的写线程)
。ConcurrentHashMap 被设计用来优化检索操作;实际上,成功的 get()
操作实现以往平常根本不会有锁着的能源。要在不利用锁的状态下得到线程安全性供给自然的技巧性,并且必要对Java内部存款和储蓄器模型(Java
Memory Model)的细节有时刻不忘的精通。

在Java 7中动用的是对Segment(Hash表的三个桶)加锁的办法

在Java 7中利用的是对Segment(Hash表的三个桶)加锁的章程

CopyOnWriteArrayList

在这几个遍历操作大大地多于插入或移除操作的面世应用程序中,一般用
CopyOnWriteArrayList 类替代 ArrayList
。假使是用于存放八个侦听器(listener)列表,例如在AWT或Swing应用程序中,恐怕在普遍的JavaBean中,那么那种情景很宽泛(相关的
CopyOnWriteArraySet 使用3个 CopyOnWriteArrayList 来落到实处 Set 接口) 。
假定你正在使用多个常备的 ArrayList
来存放在一个侦听器列表,那么一旦该列表是可变的,而且或者要被三个线程访问,您
就不可能不要么在对其进行迭代操作时期,要么在迭代前开始展览的克隆操作时期,锁定任何列表,那二种做法的支付都相当的大。当对列表执行会唤起列表产生变化的操作时,
CopyOnWriteArrayList
并不是为列表制造1个全新的副本,它的迭代器肯定能够回到在迭代器被创立时列表的情景,而不会抛出
ConcurrentModificationException
。在对列表实行迭代事先不要克隆列表可能在迭代之间锁
定列表,因为迭代器所阅览的列表的副本是不变的。换句话说,
CopyOnWriteArrayList
含有对四个不行变数组的2个可变的引用,因而,只要保留好越发引用,您就足以拿走不可变的线程安全性的益处,而且并非锁
定列表。

ConcurrentHashMap中首要实体类便是四个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)。

ConcurrentHashMap中关键实体类正是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)。

BlockingQueue

卡住队列(BlockingQueue)是贰个支撑多少个附加操作的行列。那多少个附加的操作是:在队列为空时,获取成分的线程会等待队列变为非空。当队列满时,存款和储蓄元素的线程会等待队列可用。阻塞队列常用于生产者和顾客的现象,生产者是往队列里添比索素的线程,消费者是从队列里拿成分的线程。阻塞队列正是劳动者存放成分的器皿,而顾客也只从容器里拿元素。

堵塞队列提供了八种处理措施:

方法处理格局 抛出尤其 重回特殊值 一直不通 过期退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查措施 element() peek() 不可用 不可用

抛出卓殊:是指当阻塞队列满时候,再往队列里插入成分,会抛出IllegalStateException(“Queue
full”)非凡。当队列为空时,从队列里得到成分时会抛出NoSuchElementException异常。
归来特殊值:插入方法会重返是不是成功,成功则赶回true。移除方法,则是从队列里拿出三个因素,倘若没有则赶回null
直白不通:当阻塞队列满时,借使劳动者线程往队列里put成分,队列会平昔不通生产者线程,直到得到数码,可能响应中断退出。当队列空时,消费者线程试图从队列里take成分,队列也会阻塞消费者线程,直到队列可用。
过期退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,要是超越一定的时刻,生产者线程就会脱离。

  1. Java里的不通队列

JDK7提供了八个闭塞队列。分别是

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :五个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :多少个帮忙先行级排序的无界阻塞队列。
DelayQueue:多个行使优先级队列达成的无界阻塞队列。
SynchronousQueue:三个不存款和储蓄成分的堵塞队列。
LinkedTransferQueue:八个由链表结构构成的无界阻塞队列。
LinkedBlockingDeque:三个由链表结构构成的双向阻塞队列。

Segment的源码

Segment的源码

ArrayBlockingQueue

ArrayBlockingQueue是3个用数组达成的有界阻塞队列。此行列依据先进先出(FIFO)的准绳对成分进行排序。暗中同意意况下不保障访问者公平的访问队列,所谓公平访问队列是指阻塞的有着生产者线程或消费者线程,当队列可用时,可以依据阻塞的先后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先堵塞的顾客线程,能够先从队列里取得成分。

static final class Segment<K,V> extends
ReentrantLock implements Serializable {

static final class Segment<K,V> extends
ReentrantLock implements Serializable {

LinkedBlockingQueue

LinkedBlockingQueue是3个用链表实现的有界阻塞队列。此行列的暗中同意和最大尺寸为Integer.MAX_VALUE。此行列根据先进先出的规则对成分实行排序。
LinkedBlockingDeque是3个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的双面插入和移出成分。双端队列因为多了3个操作队列的进口,在八线程同时入队时,也就收缩了大体上的竞争。相比此外的堵截队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方式,以First单词结尾的措施,表示插入,获取(peek)或移除双端队列的第③个成分。以Last单词结尾的不二法门,表示插入,获取或移除双端队列的最终一个要素。其余插入方法add等同于addLast,移除方法remove等效于removeFirst。不过take方法却一样takeFirst,不驾驭是或不是Jdk的bug,使用时还是用富含First和Last后缀的办法更驾驭。在开端化LinkedBlockingDeque时能够先导化队列的体量,用来防护其再扩大体积时过渡膨胀。其余双向阻塞队列能够利用在“工作窃取”情势中。

           private static final long serialVersionUID
= 2249069246763182397L;

           private static final long serialVersionUID
= 2249069246763182397L;

PriorityBlockingQueue

PriorityBlockingQueue是3个支撑先行级的无界队列。默许意况下成分接纳自然顺序排列,也能够因而比较器comparator来钦命成分的排序规则。成分依照升序排列。

           static final int MAX_SCAN_RETRIES
=Runtime.getRuntime().

           static final int MAX_SCAN_RETRIES
=Runtime.getRuntime().

DelayQueue

DelayQueue是3个支撑延时获取成分的无界阻塞队列。队列使用PriorityQueue来完成。队列中的成分必须贯彻Delayed接口,在创建元素时能够钦命多短期才能从队列中获得当前因素。只有在延迟期满时才能从队列中领到成分。大家能够将DelayQueue运用在以下应用场景:

缓存系统的规划:能够用DelayQueue保存缓存成分的有效期,使用多个线程循环查询DelayQueue,一旦能从DelayQueue中获取成分时,表示缓存有效期到了。
定时任务调度。使用DelayQueue保存当天将会履行的职务和履行时间,一旦从DelayQueue中赢获得职分就起来施行,从比如TimerQueue便是使用DelayQueue实现的。
队列中的Delayed必须完结compareTo来钦定成分的依次。比如让延时时刻最长的放在队列的结尾。达成代码如下:

    public int compareTo(Delayed other) {
       if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask x = (ScheduledFutureTask)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
   else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -
                  other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

怎么着完成Delayed接口

咱俩得以参照ScheduledThreadPoolExecutor里ScheduledFutureTask类。这一个类实现了Delayed接口。首先:在对象创造的时候,使用time记录前对象如曾几何时候能够利用,代码如下:

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

接下来利用getDelay能够查询当前成分还亟需延时多长时间,代码如下:

    public long getDelay(TimeUnit unit) {
                 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

透过构造函数能够看看延迟时间参数ns的单位是微秒,本身设计的时候最佳利用皮秒,因为getDelay时能够钦点任意单位,一旦以阿秒作为单位,而延时的岁月又准确不到皮秒就劳动了。使用时请注意当time小于当前岁月时,getDelay会重返负数。

怎么贯彻延时队列

延时队列的达成一点也不细略,当消费者从队列里取得成分时,如若成分没有达成延时时间,就短路当前线程。

    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                else if (leader != null)
                    available.await();

                availableProcessors() > 1 ? 64 : 1;

                availableProcessors() > 1 ? 64 : 1;

卡住队列的兑现原理

若果队列是空的,消费者会直接等候,当生产者添美金素时候,消费者是如何了然当前队列有元素的啊?若是让你来安插阻塞队列你会怎么样规划,让劳动者和消费者能够高作用的展开广播发表呢?让大家先来探视JDK是哪些促成的。

选择布告格局达成。所谓文告方式,正是当生产者往满的队列里添港币素时会阻塞住生产者,当消费者消费了三个体系中的成分后,会文告劳动者当前队列可用。通过查阅JDK源码发现ArrayBlockingQueue使用了Condition来落到实处,代码如下:

    private final Condition notFull;
    private final Condition notEmpty;

    public ArrayBlockingQueue(int capacity, boolean fair) {
    //省略其他代码
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

    public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        insert(e);
    } finally {
        lock.unlock();
    }
    }

    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return extract();
            } finally {
        lock.unlock();
    }
        }

        private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

当我们往队列里插入3个因素时,如若队列不可用,阻塞生产者主要通过LockSupport.park(this);来促成

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)

                    reportInterruptAfterWait(interruptMode);
    }

持续进入源码,发现调用setBlocker先保存下就要阻塞的线程,然后调用unsafe.park阻塞当前线程。

    public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park是个native方法,代码如下:

park这么些方法会阻塞当前线程,唯有以下多样情景中的一种产生时,该办法才会回来。

与park对应的unpark执行或曾经推行时。注意:已经推行是指unpark先进行,然后再实施的park。
线程被中断时。
假若参数中的time不是零,等待了钦定的阿秒数时。
爆发格外现象时。那几个卓殊事先不能够明确。
大家接二连三看一下JVM是什么贯彻park方法的,park在差异的操作系统使用分裂的不二法门贯彻,在linux下是行使的是系统方法pthread_cond_wait落到实处。完成代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的
os::Platform伊夫nt::park方法,代码如下:

    void os::PlatformEvent::park() {
         int v ;
     for (;;) {
             v = _Event ;
     if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
     }
     guarantee (v >= 0, "invariant") ;
     if (v == 0) {
     // Do this the hard way by blocking ...
     int status = pthread_mutex_lock(_mutex);
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
     while (_Event < 0) {
     status = pthread_cond_wait(_cond, _mutex);
     // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
     // Treat this the same as if the wait was interrupted
     if (status == ETIME) { status = EINTR; }
     assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;

     // In theory we could move the ST of 0 into _Event past the unlock(),
     // but then we'd need a MEMBAR after the ST.
     _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);
     assert_status(status == 0, status, "mutex_unlock");
     }
     guarantee (_Event >= 0, "invariant") ;
     }

 }

pthread_cond_wait是一个二十多线程的尺码变量函数,cond是condition的缩写,字面意思能够知道为线程在等候叁个条件产生,那一个规则是多少个全局变量。这么些措施接收八个参数,叁个共享变量_cond,3个互斥量_mutex。而unpark方法在linux下是利用pthread_cond_signal实现的。park
在windows下则是行使WaitForSingleObject达成的。

           transient volatile HashEntry<K,V>[]
table;

           transient volatile HashEntry<K,V>[]
table;

3.有的常用集合类的内部贯彻方式

           transient int count;

           transient int count;

           transient int modCount;

           transient int modCount;

           transient int threshold;

           transient int threshold;

           final float loadFactor;

           final float loadFactor;

      }

      }

HashEntry的源码

HashEntry的源码

static final class HashEntry<K,V> { 

static final class HashEntry<K,V> { 

    final K key; 

    final K key; 

    final int hash; 

    final int hash; 

    volatile V value; 

    volatile V value; 

    final HashEntry<K,V> next; 

    final HashEntry<K,V> next; 

在Java 第88中学丢掉了Segment的定义,利用CAS算法做了新点子

在Java 第88中学丢掉了Segment的概念,利用CAS算法做了新格局

CAS算法选择“数组+链表+红黑树”的格局贯彻

CAS算法采纳“数组+链表+红黑树”的办法实现

                                      ————亓慧杰

                                      ————亓慧杰

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图