Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions c_src/py_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -2856,6 +2856,38 @@ static PyObject *erlang_channel_is_closed_impl(PyObject *self, PyObject *args) {
}
}

/**
* @brief Close a channel from Python
*
* Usage: erlang._channel_close(channel_ref)
* Returns: True on success
* Raises: TypeError if invalid reference
*/
static PyObject *erlang_channel_close_impl(PyObject *self, PyObject *args) {
(void)self;
PyObject *capsule;

if (!PyArg_ParseTuple(args, "O", &capsule)) {
return NULL;
}

if (!PyCapsule_CheckExact(capsule)) {
PyErr_SetString(PyExc_TypeError, "expected channel reference");
return NULL;
}

py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(capsule, CHANNEL_CAPSULE_NAME);
if (channel == NULL) {
PyErr_SetString(PyExc_ValueError, "invalid channel reference");
return NULL;
}

/* Close the channel - this wakes any waiting receivers */
channel_close(channel);

Py_RETURN_TRUE;
}

/* ============================================================================
* ByteChannel Methods (raw bytes, no term conversion)
* ============================================================================ */
Expand Down Expand Up @@ -3328,6 +3360,10 @@ static PyMethodDef ErlangModuleMethods[] = {
"Check if channel is closed.\n"
"Usage: erlang._channel_is_closed(channel_ref)\n"
"Returns: True if closed, False otherwise."},
{"_channel_close", erlang_channel_close_impl, METH_VARARGS,
"Close a channel.\n"
"Usage: erlang._channel_close(channel_ref)\n"
"Returns: True. Wakes any waiting receivers."},
/* ByteChannel methods (raw bytes, no term conversion) */
{"_byte_channel_try_receive_bytes", erlang_byte_channel_try_receive_bytes_impl, METH_VARARGS,
"ByteChannel receive (non-blocking, raw bytes).\n"
Expand Down
30 changes: 30 additions & 0 deletions docs/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,27 @@ msg = await ch.async_receive()

**Raises:** `ChannelClosed` when the channel is closed.

#### `close()`

Close the channel from Python. Wakes any waiting receivers.

```python
ch.close() # Signal no more data will be sent
```

Safe to call multiple times.

#### Context Manager

Channels support the `with` statement for automatic cleanup:

```python
with Channel(channel_ref) as ch:
for msg in ch:
process(msg)
# channel automatically closed on exit
```

#### Iteration

```python
Expand Down Expand Up @@ -461,6 +482,15 @@ def process_bytes(channel_ref):

# Send bytes back
ch.send_bytes(b"response data")

# Close when done
ch.close()

# Or use context manager for automatic cleanup
with ByteChannel(channel_ref) as ch:
for chunk in ch:
process(chunk)
# channel automatically closed
```

### Async Python API
Expand Down
24 changes: 24 additions & 0 deletions priv/_erlang_impl/_byte_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,5 +307,29 @@ def _is_closed(self) -> bool:
except Exception:
return True

def close(self):
"""Close the byte channel.

Closes the channel and wakes any waiting receivers with ByteChannelClosed.
Safe to call multiple times - subsequent calls have no effect.

Example:
byte_channel.close() # Signal no more data
"""
import erlang
try:
erlang._channel_close(self._ref)
except Exception:
pass # Ignore errors on close (may already be closed)

def __enter__(self):
"""Context manager entry - returns self."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - closes the channel."""
self.close()
return False

def __repr__(self):
return f"<ByteChannel ref={self._ref!r}>"
24 changes: 24 additions & 0 deletions priv/_erlang_impl/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,30 @@ def _is_closed(self):
except Exception:
return True

def close(self):
"""Close the channel.

Closes the channel and wakes any waiting receivers with ChannelClosed.
Safe to call multiple times - subsequent calls have no effect.

Example:
channel.close() # Signal no more data
"""
import erlang
try:
erlang._channel_close(self._ref)
except Exception:
pass # Ignore errors on close (may already be closed)

def __enter__(self):
"""Context manager entry - returns self."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - closes the channel."""
self.close()
return False

def __repr__(self):
return f"<Channel ref={self._ref!r}>"

Expand Down
Loading