1. Introduction
In this tutorial, we’ll show how to share memory between two or more JVMs running on the same machine. This capability enables very fast inter-process communication since we can move data blocks around without any I/O operation.
2. How Does Shared Memory Work?
A process running in any modern operating system gets what’s called a virtual memory space. We call it virtual because, although it looks like a large, continuous, and private addressable memory space, in fact, it’s made of pages spread all over the physical RAM. Here, page is just OS slang for a block of contiguous memory, whose size depends on the particular CPU architecture in use. For x86-64, a page can be as small as 4 KB or as large as 1 GB.
At a given time, only a fraction of this virtual space is actually mapped to physical pages. As time passes and the process starts to consume more memory for its tasks, the OS starts to allocate more physical pages and map them to the virtual space. When the demand for memory exceeds what’s physically available, the OS will start to swap out pages that are not being used at that moment to secondary storage to make room for the request.
A shared memory block behaves just like regular memory, but, in contrast with regular memory, it is not private to a single process. When a process changes the contents of any byte within this block, any other process with access to this same shared memory “sees” this change instantly.
This is a list of common uses for shared memory:
- Debuggers (ever wondered how a debugger can inspect variables in another process?)
- Inter-process communication
- Read-only content sharing between processes (ex: dynamic library code)
- Hacks of all sorts ;^)
3. Shared Memory and Memory-Mapped Files
A memory-mapped file, as the name suggests, is a regular file whose contents are directly mapped to a contiguous area in the virtual memory of a process. This means that we can read and/or change its contents without explicit use of I/O operations. The OS will detect any writes to the mapped area and will schedule a background I/O operation to persist the modified data.
Since there are no guarantees on when this background operation will happen, the OS also offers a system call to flush any pending changes. This is important for use cases like database redo logs, but not needed for our inter-process communication (IPC, for short) scenario.
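In Java, this explicit flush corresponds to the MappedByteBuffer.force() method. Here’s a minimal sketch (the file name and mapping size are just illustrations):

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushDemo {
    public static void main(String[] args) throws IOException {
        Path path = Path.of("flush-demo.dat"); // illustrative file name
        try (FileChannel fc = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer buf = fc.map(FileChannel.MapMode.READ_WRITE, 0, 16);
            buf.putLong(0, 42L); // a plain memory write, no explicit I/O call
            buf.force();         // ask the OS to flush the dirty pages to disk now
        }
    }
}
```

Without the force() call, the write would still reach the file eventually; force() only matters when we need durability at a specific point in time, which, as noted, our IPC scenario doesn’t.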
Memory-mapped files are commonly used by database servers to achieve high throughput I/O operations, but we can also use them to bootstrap a shared-memory-based IPC mechanism. The basic idea is that all processes that need to share data map the same file and, voilà, they now have a shared memory area.
4. Creating Memory-Mapped Files in Java
In Java, we use the FileChannel’s map() method to map a region of a file into memory, which returns a MappedByteBuffer that allows us to access its contents:
MappedByteBuffer createSharedMemory(String path, long size) {
    try (FileChannel fc = (FileChannel) Files.newByteChannel(new File(path).toPath(),
            EnumSet.of(
                StandardOpenOption.CREATE,
                StandardOpenOption.SPARSE,
                StandardOpenOption.WRITE,
                StandardOpenOption.READ))) {
        return fc.map(FileChannel.MapMode.READ_WRITE, 0, size);
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
}
The use of the SPARSE option here is quite relevant. As long as the underlying OS and file system support it, we can map a sizable memory area without actually consuming disk space.
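As a quick sanity check of the sharing behavior described earlier, the following sketch maps the same file twice and shows that a write through one mapping is immediately visible through the other, with no flush involved (the file name is arbitrary):

```java
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;

public class TwoMappingsDemo {
    static MappedByteBuffer map(String path, long size) throws IOException {
        try (FileChannel fc = (FileChannel) Files.newByteChannel(new File(path).toPath(),
                EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.SPARSE,
                        StandardOpenOption.WRITE, StandardOpenOption.READ))) {
            return fc.map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    public static void main(String[] args) throws IOException {
        // Two independent mappings of the same backing file
        MappedByteBuffer a = map("two-mappings.dat", 4096);
        MappedByteBuffer b = map("two-mappings.dat", 4096);
        a.putInt(0, 1234);
        // The write through 'a' is visible through 'b' right away
        System.out.println(b.getInt(0)); // prints 1234 on common platforms
    }
}
```

The same mechanism is what makes the cross-process case work: both mappings resolve to the same physical pages.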
Now, let’s create a simple demo application. The Producer application will allocate a shared memory area large enough to hold 64 KB of data plus its SHA1 hash (20 bytes). Next, it will start a loop in which it fills the buffer with random data, followed by the data’s SHA1 hash. We’ll repeat this operation continuously for 30 seconds and then exit:
// ... SHA1 digest initialization omitted
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
Random rnd = new Random();
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting producer iterations...");
while (System.currentTimeMillis() - start < 30000) {
    for (int i = 0; i < capacity - hashLen; i++) {
        byte value = (byte) (rnd.nextInt(256) & 0x00ff);
        digest.update(value);
        shm.put(i, value);
    }
    // Write hash at the end
    byte[] hash = digest.digest();
    shm.put(capacity - hashLen, hash);
    iterations++;
}
System.out.printf("%d iterations run\n", iterations);
To test that we can indeed share memory, we’ll also create a Consumer app that runs for the same 30 seconds. At each iteration, it reads the buffer’s content, computes its hash, and compares it with the Producer-generated one stored at the buffer’s end:
// ... digest initialization omitted
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting consumer iterations...");
long matchCount = 0;
long mismatchCount = 0;
byte[] expectedHash = new byte[hashLen];
while (System.currentTimeMillis() - start < 30000) {
    for (int i = 0; i < capacity - hashLen; i++) {
        byte value = shm.get(i);
        digest.update(value);
    }
    byte[] hash = digest.digest();
    shm.get(capacity - hashLen, expectedHash);
    if (Arrays.equals(hash, expectedHash)) {
        matchCount++;
    } else {
        mismatchCount++;
    }
    iterations++;
}
System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
To test our memory-sharing scheme, let’s start both programs at the same time. This is their output when running on a 3 GHz, quad-core Intel i7 machine:
# Producer output
Starting producer iterations...
11722 iterations run
# Consumer output
Starting consumer iterations...
18893 iterations run. matches=11714, mismatches=7179
We can see that, in many cases, the consumer detects that the computed hash differs from the one the producer stored. Welcome to the wonderful world of concurrency issues!
5. Synchronizing Shared Memory Access
The root cause for the issue we’ve seen is that we need to synchronize access to the shared memory buffer. The Consumer must wait for the Producer to finish writing the hash before it starts reading the data. On the other hand, the Producer also must wait for the Consumer to finish consuming the data before writing to it again.
For a regular multithreaded application, solving this issue is no big deal. The standard library offers several synchronization primitives that allow us to control who can write to the shared memory at a given time.
However, ours is a multi-JVM scenario, so none of those standard methods apply. So, what should we do? Well, the short answer is that we’ll have to cheat. We could resort to OS-specific mechanisms like semaphores, but this would hinder our application’s portability. Also, this implies using JNI or JNA, which also complicates things.
Enter Unsafe. Despite its somewhat scary name, this standard library class offers exactly what we need to implement a simple lock mechanism: the compareAndSwapInt() method.
This method implements an atomic test-and-set primitive that takes four arguments. Although not clearly stated in the documentation, it can target not only Java objects but also a raw memory address. For the latter, we pass null in the first argument, which makes it treat the offset argument as a virtual memory address.
When we call this method, it will first check the value at the target address and compare it with the expected value. If they’re equal, then it will modify the location’s content to the new value and return true indicating success. If the value at the location is different from expected, nothing happens, and the method returns false.
More importantly, this atomic operation is guaranteed to work even in multicore architectures, which is a critical feature for synchronizing multiple executing threads.
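The SpinLock class below omits the Unsafe initialization. A common way to obtain the instance, sketched here, is to read its private theUnsafe field reflectively (recent JDKs may print warnings or require extra flags for this):

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeHolder {
    static final Unsafe unsafe;

    static {
        try {
            // Unsafe's constructor is private, and getUnsafe() rejects callers
            // loaded by the application class loader, so we read the singleton field
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            unsafe = (Unsafe) f.get(null);
        } catch (ReflectiveOperationException ex) {
            throw new ExceptionInInitializerError(ex);
        }
    }
}
```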
Let’s create a SpinLock class that takes advantage of this method to implement a (very!) simple lock mechanism:
//... package and imports omitted
public class SpinLock {
    private static final Unsafe unsafe;
    // ... unsafe initialization omitted
    private final long addr;

    public SpinLock(long addr) {
        this.addr = addr;
    }

    public boolean tryLock(long maxWait) {
        long deadline = System.currentTimeMillis() + maxWait;
        while (System.currentTimeMillis() < deadline) {
            if (unsafe.compareAndSwapInt(null, addr, 0, 1)) {
                return true;
            }
        }
        return false;
    }

    public void unlock() {
        unsafe.putInt(addr, 0);
    }
}
This implementation lacks key features, like checking whether it owns the lock before releasing it, but it will suffice for our purpose.
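Even though the lock is meant for cross-process use, we can exercise it inside a single process by pointing it at a scratch native allocation. The sketch below bundles the SpinLock logic and the Unsafe lookup so it’s self-contained; the thread and iteration counts are arbitrary:

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class SpinLockDemo {
    static final Unsafe unsafe;
    static {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            unsafe = (Unsafe) f.get(null);
        } catch (ReflectiveOperationException ex) {
            throw new ExceptionInInitializerError(ex);
        }
    }

    static class SpinLock {
        private final long addr;
        SpinLock(long addr) { this.addr = addr; }
        boolean tryLock(long maxWait) {
            long deadline = System.currentTimeMillis() + maxWait;
            while (System.currentTimeMillis() < deadline) {
                if (unsafe.compareAndSwapInt(null, addr, 0, 1)) {
                    return true;
                }
            }
            return false;
        }
        void unlock() { unsafe.putInt(addr, 0); }
    }

    static int counter = 0;

    public static void main(String[] args) throws Exception {
        // Scratch native memory standing in for the shared buffer's first 4 bytes
        long addr = unsafe.allocateMemory(4);
        unsafe.putInt(addr, 0); // lock starts released
        SpinLock lock = new SpinLock(addr);

        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                if (!lock.tryLock(5000)) {
                    throw new IllegalStateException("Unable to acquire lock");
                }
                try {
                    counter++; // critical section
                } finally {
                    lock.unlock();
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        unsafe.freeMemory(addr);
        System.out.println(counter); // 20000 if mutual exclusion holds
    }
}
```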
Okay, so how do we get the memory address that we’ll use to store the lock status? This must be an address within the shared memory buffer so both processes can use it, but the MappedByteBuffer class does not expose the actual memory address.
Inspecting the object that map() returns, we can see that it is a DirectByteBuffer. This class has a public method called address() that returns exactly what we want. Unfortunately, this class is package-private so we can’t use a simple cast to access this method.
To bypass this limitation, we’ll cheat a little again and use reflection to invoke this method:
private static long getBufferAddress(MappedByteBuffer shm) {
    try {
        Class<?> cls = shm.getClass();
        Method maddr = cls.getMethod("address");
        maddr.setAccessible(true);
        Long addr = (Long) maddr.invoke(shm);
        if (addr == null) {
            throw new RuntimeException("Unable to retrieve buffer's address");
        }
        return addr;
    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
        throw new RuntimeException(ex);
    }
}
Here, we’re using setAccessible() to make the address() method callable through the Method handle. However, be aware that, from Java 17 onwards, this technique won’t work unless we explicitly use the --add-opens runtime flag.
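For instance, assuming our Producer’s main class is simply named Producer (a hypothetical name for this example), the launch command on Java 17+ would look along these lines:

```shell
java --add-opens java.base/java.nio=ALL-UNNAMED Producer
```

The DirectByteBuffer class lives in the java.nio package of the java.base module, so opening that package to the unnamed module is what lets setAccessible() succeed.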
6. Adding Synchronization to Producer and Consumer
Now that we have a lock mechanism, let’s apply it to the Producer first. For the purposes of this demo, we’ll assume that the Producer will always start before the Consumer. We need this so we can initialize the buffer, clearing its contents, including the area we’ll use with the SpinLock:
public static void main(String[] args) throws Exception {
    // ... digest initialization omitted
    MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
    // Cleanup lock area
    shm.putInt(0, 0);
    long addr = getBufferAddress(shm);
    System.out.println("Starting producer iterations...");
    long start = System.currentTimeMillis();
    long iterations = 0;
    Random rnd = new Random();
    int capacity = shm.capacity();
    SpinLock lock = new SpinLock(addr);
    while (System.currentTimeMillis() - start < 30000) {
        if (!lock.tryLock(5000)) {
            throw new RuntimeException("Unable to acquire lock");
        }
        try {
            // Skip the first 4 bytes, as they're used by the lock
            for (int i = 4; i < capacity - hashLen; i++) {
                byte value = (byte) (rnd.nextInt(256) & 0x00ff);
                digest.update(value);
                shm.put(i, value);
            }
            // Write hash at the end
            byte[] hash = digest.digest();
            shm.put(capacity - hashLen, hash);
            iterations++;
        } finally {
            lock.unlock();
        }
    }
    System.out.printf("%d iterations run\n", iterations);
}
Compared to the unsynchronized version, there are just minor changes:
- Retrieve the memory address associated with the MappedByteBuffer
- Create a SpinLock instance using this address. The lock uses an int, so it will take the four initial bytes of the buffer
- Use the SpinLock instance to protect the code that fills the buffer with random data and its hash
Now, let’s apply similar changes to the Consumer side:
public static void main(String[] args) throws Exception {
    // ... digest initialization omitted
    MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
    long addr = getBufferAddress(shm);
    System.out.println("Starting consumer iterations...");
    long start = System.currentTimeMillis();
    long iterations = 0;
    int capacity = shm.capacity();
    long matchCount = 0;
    long mismatchCount = 0;
    byte[] expectedHash = new byte[hashLen];
    SpinLock lock = new SpinLock(addr);
    while (System.currentTimeMillis() - start < 30000) {
        if (!lock.tryLock(5000)) {
            throw new RuntimeException("Unable to acquire lock");
        }
        try {
            for (int i = 4; i < capacity - hashLen; i++) {
                byte value = shm.get(i);
                digest.update(value);
            }
            byte[] hash = digest.digest();
            shm.get(capacity - hashLen, expectedHash);
            if (Arrays.equals(hash, expectedHash)) {
                matchCount++;
            } else {
                mismatchCount++;
            }
            iterations++;
        } finally {
            lock.unlock();
        }
    }
    System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
}
With those changes, we can now run both programs and compare their output with the previous results:
# Producer output
Starting producer iterations...
8543 iterations run
# Consumer output
Starting consumer iterations...
8607 iterations run. matches=8607, mismatches=0
As expected, the reported iteration count will be lower compared to the non-synchronized version. The main reason is that we spend most of the time inside the critical section, holding the lock. Whichever program holds the lock prevents the other side from doing anything.
If we compare the average iteration count from the first run with the sum of the iterations we got this time, they’re approximately the same. This shows that the overhead added by the lock mechanism itself is minimal.
7. Conclusion
In this tutorial, we’ve explored how to share a memory area between two JVMs running on the same machine. We can use the technique presented here as the foundation for high-throughput, low-latency inter-process communication libraries.
As usual, all code is available over on GitHub.