Here is the flow:
>Store all incoming records in a CHM from multiple threads. Records will arrive at a very high rate.
>A background thread that runs every 1 minute sends these records from the CHM to the ZeroMQ servers.
>After sending each record to the ZeroMQ servers, add it to a retry bucket so that it can be retried after a particular time if no acknowledgment has been received for it yet.
>We also have a poller runnable thread that receives acknowledgments from the ZeroMQ servers telling us that these records were received, so once I get an acknowledgment I delete that record from the retry bucket so that it will not be retried.
>It is fine if some records get sent twice, but it is better to minimize that.
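The ack/retry cycle described in the steps above can be sketched with plain JDK types. This is a minimal illustration, not the actual classes from the post; all names here are made up, and the real code below uses a Guava `Cache` instead of a map:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the retry-bucket idea: a record is added when
// sent, removed when its acknowledgment arrives, and anything older
// than retryAfterMillis is handed back to the caller for resending.
class RetryBucket {
    private final ConcurrentHashMap<Long, Entry> pending = new ConcurrentHashMap<>();
    private final long retryAfterMillis;

    static final class Entry {
        final byte[] payload;
        final long sentAt;
        Entry(byte[] payload, long sentAt) { this.payload = payload; this.sentAt = sentAt; }
    }

    RetryBucket(long retryAfterMillis) { this.retryAfterMillis = retryAfterMillis; }

    // called right after a record is sent
    public void add(long address, byte[] payload) {
        pending.put(address, new Entry(payload, System.currentTimeMillis()));
    }

    // called by the poller thread when an acknowledgment arrives
    public void ack(long address) {
        pending.remove(address);
    }

    // called by the periodic background thread: collect every record
    // that has waited at least retryAfterMillis without an ack
    public List<Long> dueForRetry(long now) {
        List<Long> due = new ArrayList<>();
        for (Map.Entry<Long, Entry> e : pending.entrySet()) {
            if (now - e.getValue().sentAt >= retryAfterMillis) {
                due.add(e.getKey());
            }
        }
        return due;
    }

    public int size() { return pending.size(); }

    public static void main(String[] args) {
        RetryBucket bucket = new RetryBucket(60_000);
        bucket.add(42L, new byte[]{1, 2, 3});
        System.out.println("pending: " + bucket.size()); // prints "pending: 1"
    }
}
```

Because `add` happens after the send and `ack` can race with `dueForRetry`, a record may occasionally be resent even though its ack is in flight, which matches the "sending twice is fine" requirement above.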
I am not sure what the best way to minimize this is in my situation below.
Below is my Processor class, in which an .add() method will be called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe way. Then, in the constructor of the Processor class, I start a background thread that runs every 30 seconds and pushes records from that same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class shown further below.
Processor
public class Processor {

    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
            new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

    private static class Holder {
        private static final Processor INSTANCE = new Processor();
    }

    public static Processor getInstance() {
        return Holder.INSTANCE;
    }

    private Processor() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // swap in a fresh map and send everything accumulated so far
                validateAndSendAllPartitions(dataHolderByPartitionReference
                        .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
            }
        }, 30, 30, TimeUnit.SECONDS);
    }

    private void validateAndSendAllPartitions(
            ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
    }

    private void validateAndSend(final int partition, final ConcurrentLinkedQueue<DataHolder> dataHolders) {
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        while (!dataHolders.isEmpty()) {
            .........
            .........
            SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
        }
        // calling again with remaining values
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }

    // called by multiple threads to populate dataHolderByPartitionReference CHM
    public void add(final int partition, final DataHolder holder) {
        // store records in dataHolderByPartitionReference in a thread safe way
    }
}
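The add() method above is left as a comment. One way it could look is the following sketch, where DataHolder is a stub standing in for the real class; `computeIfAbsent` creates the per-partition queue atomically, so concurrent writers never lose a record to a lost race on queue creation:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of a thread-safe add() for the Processor class above.
// DataHolder is a placeholder; computeIfAbsent makes the queue
// creation atomic under concurrent callers.
class AddSketch {
    public static class DataHolder { }

    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> ref =
            new AtomicReference<>(new ConcurrentHashMap<>());

    public void add(final int partition, final DataHolder holder) {
        // read the current map once; the background thread swaps the
        // whole map out with getAndSet, so do not re-read in between
        ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> map = ref.get();
        map.computeIfAbsent(partition, p -> new ConcurrentLinkedQueue<>()).add(holder);
    }

    public int sizeOf(final int partition) {
        ConcurrentLinkedQueue<DataHolder> q = ref.get().get(partition);
        return q == null ? 0 : q.size();
    }
}
```

One caveat worth noting with the getAndSet pattern: a writer that fetched the old map reference just before the swap may still add a record to a map that is already being drained, so a record can slip in after the drain has passed its queue; guarding against that (for example, re-checking the reference after adding) is worth considering.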
Below is my SendToZeroMQ class, which sends a record to a set of ZeroMQ servers and retries accordingly, depending on acknowledgment delivery.
>First, it sends a record to the ZeroMQ servers.
>Then it adds that same record to the retryBucket, from which it will be retried later, depending on whether an acknowledgment is received.
>In the same class, I start a background thread that runs every 1 minute and resends the records that are still sitting in the retry bucket.
>The same class also starts the ResponsePoller thread, which keeps running forever to see which of the records we sent earlier have been acknowledged; once a record is acknowledged, the ResponsePoller thread deletes it from the retryBucket so that it will not be retried.
SendToZeroMQ
public class SendToZeroMQ {

    // do I need these two ScheduledExecutorService or is one sufficient to start both of my threads?
    private final ScheduledExecutorService executorServicePoller = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder()
            .maximumSize(10000000)
            .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
            .build();

    private static class Holder {
        private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
    }

    public static SendToZeroMQ getInstance() {
        return Holder.INSTANCE;
    }

    private SendToZeroMQ() {
        executorServicePoller.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // resend everything still waiting for an acknowledgment
                for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
                    executeAsync(entry.getKey(), entry.getValue());
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(encodedByteArray);
        boolean sent = msg.send(socket);
        msg.destroy();
        // add to retry bucket
        retryBucket.put(address, encodedByteArray);
        return sent;
    }

    public boolean executeAsync(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        Map<Long, byte[]> addressToEncodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
        long address = addressToEncodedByteArray.entrySet().iterator().next().getKey();
        byte[] encodedByteArray = addressToEncodedByteArray.entrySet().iterator().next().getValue();
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    private Map<Long, byte[]> encode(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        // this address will be unique always
        long address = TestUtils.getAddress();
        Frame frame = new Frame(............);
        byte[] packedByteArray = frame.serialize();
        // this map will always have one entry in it
        return ImmutableMap.of(address, packedByteArray);
    }

    public void removeFromRetryBucket(final long address) {
        retryBucket.invalidate(address);
    }
}
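Regarding the question in the comment at the top of SendToZeroMQ: one `ScheduledExecutorService` can host both tasks, but only if it has at least two threads, because the long-running poller would otherwise occupy the single worker and starve the periodic retry task. A minimal stdlib sketch (timings shortened for demonstration, and both tasks replaced with stand-ins):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: one scheduled pool with two threads hosting both a
// long-running task (standing in for ResponsePoller) and a periodic
// task (standing in for the retry loop).
class TwoTasksOnePool {
    public static int runDemo() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        CountDownLatch retries = new CountDownLatch(3);

        // long-running task: occupies one of the two worker threads
        pool.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try { Thread.sleep(10); } catch (InterruptedException e) { return; }
            }
        });

        // periodic task: runs on the remaining worker thread
        pool.scheduleAtFixedRate(retries::countDown, 5, 5, TimeUnit.MILLISECONDS);

        try {
            boolean ranThreeTimes = retries.await(2, TimeUnit.SECONDS);
            return ranThreeTimes ? 3 : 0;
        } catch (InterruptedException e) {
            return 0;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("periodic runs observed: " + runDemo());
    }
}
```

With `newScheduledThreadPool(1)` instead, the periodic stand-in would never fire, which is why the two separate single-thread executors in the post are also a perfectly valid choice: they make the thread budget explicit.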
Below is my ResponsePoller class, which waits for acknowledgments for all the records that have already been sent by the other background thread. If an acknowledgment is received, the corresponding record is deleted from the retry bucket so that it will not be retried.
public class ResponsePoller implements Runnable {

    private static final Random random = new Random();
    private static final int listenerPort = 8076;

    @Override
    public void run() {
        ZContext ctx = new ZContext();
        Socket client = ctx.createSocket(ZMQ.PULL);

        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

        PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
        while (!Thread.currentThread().isInterrupted()) {
            // Tick once per second, pulling in arriving messages
            for (int centitick = 0; centitick < 100; centitick++) {
                ZMQ.poll(items, 10);
                if (items[0].isReadable()) {
                    ZMsg msg = ZMsg.recvMsg(client);
                    Iterator<ZFrame> it = msg.iterator();
                    while (it.hasNext()) {
                        ZFrame frame = it.next();
                        try {
                            long address = TestUtils.getAddress(frame.getData());
                            // remove from retry bucket since we got the acknowledgment for this record
                            SendToZeroMQ.getInstance().removeFromRetryBucket(address);
                        } catch (Exception ex) {
                            // log error
                        } finally {
                            frame.destroy();
                        }
                    }
                    msg.destroy();
                }
            }
        }
        ctx.destroy();
    }
}
Question:
>From a design perspective, what is the best way to design this problem so that all of my logic works seamlessly?
>I am fairly sure there is a better way to design this than what I have; what could that better way be?
Solution
In this case, since ZeroMQ is built on top of TCP itself, as a further optimization you do not have to worry about successful data transfer as long as there is no exception at the transport layer (which would obviously bounce back to you as a case to handle).
The way I see your problem: you are running Kafka consumer threads that receive messages and bounce them to another message queue (in this case ZMQ, which uses TCP and either guarantees successful message delivery or raises an exception at the lower communication layer).
The simplest solution I can think of is to use a thread pool from inside each consumer and try to send the message using ZMQ. In the case of any network error, you can easily park that message for later consumption or logging, as long as your application daemon is running.
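The suggested design can be sketched as follows, with a pluggable send function standing in for the ZMQ send and a queue that parks failed records for later consumption or logging. All names here are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Sketch of the proposed design: consumers hand records to a shared
// pool; sends that fail (e.g. a network error) are parked in a queue
// for later retry or logging instead of being lost.
class PooledSender {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final BlockingQueue<byte[]> failed = new LinkedBlockingQueue<>();
    private final Predicate<byte[]> send; // pluggable stand-in for the real ZMQ send

    PooledSender(Predicate<byte[]> send) { this.send = send; }

    // called from each Kafka consumer thread
    public void submit(final byte[] record) {
        pool.submit(() -> {
            if (!send.test(record)) {
                failed.add(record); // park for later consumption/logging
            }
        });
    }

    // drain outstanding work and report how many records failed
    public int drainAndShutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failed.size();
    }
}
```

The key design point is that the failure path is explicit: a failed send never disappears, it lands in a queue that some other component (a retry loop, a logger, a dead-letter writer) owns.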
In the proposed solution I assume that message ordering is not part of the problem space, and that you are not dealing with anything more complex than that.