Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ jobs:
matrix:
python-version: [3.8, 3.9, "3.10", 3.11, 3.12, 3.13]
os: [ubuntu-latest]
disable_trio: [""]
include:
- python-version: "3.13"
disable_trio: "-p no:pytest-trio"
- python-version: "3.14-dev"
disable_trio: "-p no:pytest-trio"
runs-on: ${{ matrix.os }}
steps:
- uses: KeisukeYamashita/memcached-actions@v1
Expand All @@ -31,4 +37,4 @@ jobs:
uv run flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics --exclude=.venv
uv run flake8 . --count --exit-zero --max-complexity=10 --statistics --exclude=.venv
- name: Test with pytest
run: uv run pytest -v
run: pytest ${{ matrix.disable_trio }}
86 changes: 86 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,92 @@ Key features:
$ pip install memcache
```

## Usage

### Basic Usage

```python
import memcache

client = memcache.Memcache(("localhost", 11211))

client.set("key", "value", expire=60)
value = client.get("key")
client.delete("key")

# Atomic counters
client.set("counter", 0)
client.incr("counter") # 1
client.incr("counter", 5) # 6
client.decr("counter", 2) # 4

# Compare-and-swap
value, token = client.gets("key")
client.cas("key", "new_value", token)
```

Async usage mirrors the sync API with `AsyncMemcache` and `await`.

### MetaClient (Advanced)

> **Experimental.** `MetaClient` lives under `memcache.experiment` and its API
> may change in any minor release. If you depend on it, pin the **minor version**
> in your dependency spec. Patch releases (`x.y.Z`) will not introduce breaking
> changes, but minor releases (`x.Y.0`) might.
>
> **requirements.txt**
> ```
> memcache~=0.14.0 # allows 0.14.x, blocks 0.15+
> ```
>
> **pyproject.toml**
> ```toml
> [project]
> dependencies = [
> "memcache>=0.14.0,<0.15",
> ]
> ```

`MetaClient` exposes the full power of memcached's
[meta protocol](https://github.com/memcached/memcached/blob/master/doc/protocol.txt),
including flags unavailable through the basic API.

```python
from memcache.experiment import MetaClient

client = MetaClient(("localhost", 11211))

# get returns a GetResult with rich metadata
result = client.get(
"key",
return_cas=True,
return_ttl=True,
return_hit_before=True,
)
if result is not None:
print(result.value)
print(result.cas_token)
print(result.ttl)
print(result.hit_before)

# Atomic get-and-touch (update TTL in the same round-trip)
value = client.gat("key", expire=120)

# Store only if key does not exist
client.add("key", "value", expire=60)

# Store only if key already exists
client.replace("key", "new_value")

# Increment with auto-create if missing
client.incr("counter", delta=1, initial=0, initial_ttl=3600)

# Flush with a delay
client.flush_all(delay=30)
```

`AsyncMetaClient` is the async counterpart with the same interface.

## About the Project

Memcache is &copy; 2020-2025 by [aisk](https://github.com/aisk).
Expand Down
118 changes: 118 additions & 0 deletions memcache/async_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable, Optional, Tuple

import anyio
from anyio.streams.buffered import BufferedByteReceiveStream

from .errors import MemcacheError
from .meta_command import MetaCommand, MetaResult


class AsyncConnection:
def __init__(
self,
addr: Tuple[str, int],
*,
username: Optional[str] = None,
password: Optional[str] = None,
):
self._addr = addr
self._username = username
self._password = password
self._connected = False

async def _connect(self) -> None:
self.writer = await anyio.connect_tcp(self._addr[0], self._addr[1])
self.reader = BufferedByteReceiveStream(self.writer)
await self._auth()
self._connected = True

async def _auth(self) -> None:
if self._username is None or self._password is None:
return
auth_data = b"%s %s" % (
self._username.encode("utf-8"),
self._password.encode("utf-8"),
)
await self.writer.send(b"set auth x 0 %d\r\n" % len(auth_data))
await self.writer.send(auth_data)
await self.writer.send(b"\r\n")
response = await self.reader.receive_until(b"\r\n", max_bytes=1024)
if response != b"STORED":
raise MemcacheError(response)

async def flush_all(self, delay: int = 0) -> None:
if not self._connected:
await self._connect()

if delay > 0:
await self.writer.send(b"flush_all %d\r\n" % delay)
else:
await self.writer.send(b"flush_all\r\n")
response = await self.reader.receive_until(b"\r\n", max_bytes=1024)
if response != b"OK":
raise MemcacheError(response)

async def execute_meta_command(self, command: MetaCommand) -> MetaResult:
try:
return await self._execute_meta_command(command)
except (IndexError, ConnectionResetError, BrokenPipeError):
self._connected = False
return await self._execute_meta_command(command)

async def _execute_meta_command(self, command: MetaCommand) -> MetaResult:
if not self._connected:
await self._connect()

await self.writer.send(command.dump_header())
if command.value:
await self.writer.send(command.value + b"\r\n")
return await self._receive_meta_result()

async def _receive_meta_result(self) -> MetaResult:
header_line = await self.reader.receive_until(b"\r\n", max_bytes=1024)
result = MetaResult.load_header(header_line)

if result.rc == b"VA":
if result.datalen is None:
raise MemcacheError("invalid response: missing datalen")
result.value = await self.reader.receive_exactly(result.datalen)
await self.reader.receive_exactly(2) # read the "\r\n"

return result


class AsyncPool:
def __init__(
self,
create_connection: Callable[..., AsyncConnection],
max_size: Optional[int],
timeout: Optional[int],
) -> None:
self._create_connection = create_connection
self._max_size = max_size
self._timeout = timeout
self._size = 0
self._lock = asyncio.Lock()
self._connections: asyncio.Queue[AsyncConnection] = asyncio.Queue()

@asynccontextmanager
async def get(self) -> AsyncIterator[AsyncConnection]:
try:
connection = self._connections.get_nowait()
yield connection
await self._connections.put(connection)
except asyncio.QueueEmpty:
if self._max_size and self._size >= self._max_size:
connection = await asyncio.wait_for(
self._connections.get(), timeout=self._timeout
)
yield connection
await self._connections.put(connection)
else:
async with self._lock:
self._size += 1
connection = self._create_connection()
yield connection
await self._connections.put(connection)
Loading