kernel: msg: implement asynchronous event messages

This commit is contained in:
2026-03-24 18:32:33 +00:00
parent 110f625f04
commit 89d02c72ee
7 changed files with 178 additions and 56 deletions

View File

@@ -13,6 +13,7 @@ enum kmsg_status {
KMSG_WAIT_RECEIVE, KMSG_WAIT_RECEIVE,
KMSG_WAIT_REPLY, KMSG_WAIT_REPLY,
KMSG_REPLY_SENT, KMSG_REPLY_SENT,
KMSG_ASYNC,
}; };
struct msg { struct msg {
@@ -20,10 +21,26 @@ struct msg {
enum kmsg_status msg_status; enum kmsg_status msg_status;
struct btree_node msg_node; struct btree_node msg_node;
msgid_t msg_id; msgid_t msg_id;
kern_status_t msg_result;
struct port *msg_sender_port; struct port *msg_sender_port;
struct thread *msg_sender_thread; struct thread *msg_sender_thread;
kern_status_t msg_result;
kern_msg_type_t msg_type;
union {
/* msg_type = KERN_MSG_TYPE_DATA */
struct {
kern_msg_t msg_req, msg_resp; kern_msg_t msg_req, msg_resp;
}; };
/* msg_type = KERN_MSG_TYPE_EVENT */
struct {
kern_msg_event_type_t msg_event;
};
};
};
extern void msg_init(void);
extern struct msg *msg_alloc(void);
extern void msg_free(struct msg *msg);
#endif #endif

View File

@@ -78,6 +78,7 @@ void kernel_init(uintptr_t arg)
port_type_init(); port_type_init();
channel_type_init(); channel_type_init();
msg_init();
struct boot_module bsp_image = {0}; struct boot_module bsp_image = {0};
bsp_get_location(&bsp_image); bsp_get_location(&bsp_image);

View File

@@ -98,11 +98,18 @@ static struct msg *get_next_msg(
while (cur) { while (cur) {
struct msg *msg = BTREE_CONTAINER(struct msg, msg_node, cur); struct msg *msg = BTREE_CONTAINER(struct msg, msg_node, cur);
spin_lock_irqsave(&msg->msg_lock, lock_flags); spin_lock_irqsave(&msg->msg_lock, lock_flags);
if (msg->msg_status == KMSG_WAIT_RECEIVE) { switch (msg->msg_status) {
case KMSG_WAIT_RECEIVE:
msg->msg_status = KMSG_WAIT_REPLY; msg->msg_status = KMSG_WAIT_REPLY;
msg->msg_sender_port->p_status = PORT_REPLY_BLOCKED; msg->msg_sender_port->p_status = PORT_REPLY_BLOCKED;
channel->c_msg_waiting--; channel->c_msg_waiting--;
return msg; return msg;
case KMSG_ASYNC:
btree_delete(&channel->c_msg, &msg->msg_node);
channel->c_msg_waiting--;
return msg;
default:
break;
} }
spin_unlock_irqrestore(&msg->msg_lock, *lock_flags); spin_unlock_irqrestore(&msg->msg_lock, *lock_flags);
@@ -146,24 +153,22 @@ extern kern_status_t channel_recv_msg(
&channel->c_base, &channel->c_base,
CHANNEL_SIGNAL_MSG_RECEIVED); CHANNEL_SIGNAL_MSG_RECEIVED);
} }
#if 0
wait_item_init(&waiter, self);
for (;;) {
thread_wait_begin(&waiter, &channel->c_wq);
msg = get_next_msg(channel, &msg_lock_flags);
if (msg) {
break;
}
object_unlock_irqrestore(&channel->c_base, *irq_flags);
schedule(SCHED_NORMAL);
object_lock_irqsave(&channel->c_base, irq_flags);
}
thread_wait_end(&waiter, &channel->c_wq);
#endif
/* msg is now set to the next message to process */ /* msg is now set to the next message to process */
if (msg->msg_type != KERN_MSG_TYPE_DATA) {
/* event messages as asynchronous */
out_msg->msg_id = msg->msg_id;
out_msg->msg_type = msg->msg_type;
out_msg->msg_event = msg->msg_event;
out_msg->msg_sender = msg->msg_sender_thread->tr_parent->t_id;
out_msg->msg_endpoint = msg->msg_sender_port->p_base.ob_id;
spin_unlock_irqrestore(&msg->msg_lock, msg_lock_flags);
msg_free(msg);
return KERN_OK;
}
struct task *sender = msg->msg_sender_thread->tr_parent; struct task *sender = msg->msg_sender_thread->tr_parent;
struct task *receiver = self->tr_parent; struct task *receiver = self->tr_parent;
@@ -218,6 +223,7 @@ extern kern_status_t channel_recv_msg(
} }
out_msg->msg_id = msg->msg_id; out_msg->msg_id = msg->msg_id;
out_msg->msg_type = msg->msg_type;
out_msg->msg_sender = msg->msg_sender_thread->tr_parent->t_id; out_msg->msg_sender = msg->msg_sender_thread->tr_parent->t_id;
out_msg->msg_endpoint = msg->msg_sender_port->p_base.ob_id; out_msg->msg_endpoint = msg->msg_sender_port->p_base.ob_id;

22
kernel/msg.c Normal file
View File

@@ -0,0 +1,22 @@
#include <kernel/msg.h>
#include <kernel/vm.h>
static struct vm_cache msg_cache = {
.c_name = "msg",
.c_obj_size = sizeof(struct msg),
};
void msg_init(void)
{
vm_cache_init(&msg_cache);
}
struct msg *msg_alloc(void)
{
return vm_cache_alloc(&msg_cache, VM_NORMAL);
}
void msg_free(struct msg *msg)
{
vm_cache_free(&msg_cache, msg);
}

View File

@@ -1,16 +1,29 @@
#include <kernel/channel.h> #include <kernel/channel.h>
#include <kernel/port.h> #include <kernel/port.h>
#include <kernel/printk.h>
#include <kernel/thread.h> #include <kernel/thread.h>
#include <kernel/util.h> #include <kernel/util.h>
#define PORT_CAST(p) OBJECT_C_CAST(struct port, p_base, &port_type, p) #define PORT_CAST(p) OBJECT_C_CAST(struct port, p_base, &port_type, p)
static kern_status_t port_cleanup(struct object *obj);
static struct object_type port_type = { static struct object_type port_type = {
.ob_name = "port", .ob_name = "port",
.ob_size = sizeof(struct port), .ob_size = sizeof(struct port),
.ob_header_offset = offsetof(struct port, p_base), .ob_header_offset = offsetof(struct port, p_base),
.ob_ops = {
.destroy = port_cleanup,
},
}; };
static kern_status_t port_cleanup(struct object *obj, struct queue *q)
{
struct port *port = PORT_CAST(obj);
port_disconnect(port);
return KERN_OK;
}
kern_status_t port_type_init(void) kern_status_t port_type_init(void)
{ {
return object_type_register(&port_type); return object_type_register(&port_type);
@@ -58,9 +71,26 @@ struct port *port_create(void)
kern_status_t port_connect(struct port *port, struct channel *remote) kern_status_t port_connect(struct port *port, struct channel *remote)
{ {
if (port->p_status != PORT_OFFLINE) { if (port->p_status != PORT_OFFLINE) {
tracek("port_connect: port in bad state (%d)", port->p_status);
return KERN_BAD_STATE; return KERN_BAD_STATE;
} }
struct msg *msg = msg_alloc();
if (!msg) {
return KERN_NO_MEMORY;
}
msg->msg_status = KMSG_ASYNC;
msg->msg_type = KERN_MSG_TYPE_EVENT;
msg->msg_event = KERN_MSG_EVENT_CONNECTION;
msg->msg_sender_thread = current_thread();
msg->msg_sender_port = port;
unsigned long flags;
channel_lock_irqsave(remote, &flags);
channel_enqueue_msg(remote, msg);
channel_unlock_irqrestore(remote, flags);
port->p_remote = remote; port->p_remote = remote;
port->p_status = PORT_READY; port->p_status = PORT_READY;
return KERN_OK; return KERN_OK;
@@ -69,9 +99,27 @@ kern_status_t port_connect(struct port *port, struct channel *remote)
kern_status_t port_disconnect(struct port *port) kern_status_t port_disconnect(struct port *port)
{ {
if (port->p_status != PORT_READY) { if (port->p_status != PORT_READY) {
tracek("port_disconnect: port in bad state (%d)",
port->p_status);
return KERN_BAD_STATE; return KERN_BAD_STATE;
} }
struct msg *msg = msg_alloc();
if (!msg) {
return KERN_NO_MEMORY;
}
msg->msg_status = KMSG_ASYNC;
msg->msg_type = KERN_MSG_TYPE_EVENT;
msg->msg_event = KERN_MSG_EVENT_DISCONNECTION;
msg->msg_sender_thread = current_thread();
msg->msg_sender_port = port;
unsigned long flags;
channel_lock_irqsave(port->p_remote, &flags);
channel_enqueue_msg(port->p_remote, msg);
channel_unlock_irqrestore(port->p_remote, flags);
port->p_remote = NULL; port->p_remote = NULL;
port->p_status = PORT_OFFLINE; port->p_status = PORT_OFFLINE;
return KERN_OK; return KERN_OK;
@@ -84,12 +132,14 @@ kern_status_t port_send_msg(
unsigned long *lock_flags) unsigned long *lock_flags)
{ {
if (port->p_status != PORT_READY) { if (port->p_status != PORT_READY) {
tracek("port_send_msg: port in bad state (%d)", port->p_status);
return KERN_BAD_STATE; return KERN_BAD_STATE;
} }
struct thread *self = current_thread(); struct thread *self = current_thread();
struct msg msg; struct msg msg;
memset(&msg, 0x0, sizeof msg); memset(&msg, 0x0, sizeof msg);
msg.msg_type = KERN_MSG_TYPE_DATA;
msg.msg_status = KMSG_WAIT_RECEIVE; msg.msg_status = KMSG_WAIT_RECEIVE;
msg.msg_sender_thread = self; msg.msg_sender_thread = self;
msg.msg_sender_port = port; msg.msg_sender_port = port;

View File

@@ -44,6 +44,16 @@
* kern_object_wait */ * kern_object_wait */
#define KERN_WAIT_MAX_ITEMS 64 #define KERN_WAIT_MAX_ITEMS 64
/* message types */
#define KERN_MSG_TYPE_NONE 0
#define KERN_MSG_TYPE_DATA 1
#define KERN_MSG_TYPE_EVENT 2
/* event message types */
#define KERN_MSG_EVENT_NONE 0
#define KERN_MSG_EVENT_CONNECTION 1
#define KERN_MSG_EVENT_DISCONNECTION 2
/* equeue packet types */ /* equeue packet types */
#define EQUEUE_PKT_PAGE_REQUEST 0x01u #define EQUEUE_PKT_PAGE_REQUEST 0x01u
#define EQUEUE_PKT_ASYNC_SIGNAL 0x02u #define EQUEUE_PKT_ASYNC_SIGNAL 0x02u
@@ -92,6 +102,8 @@ typedef uint32_t kern_config_key_t;
typedef uint32_t vm_prot_t; typedef uint32_t vm_prot_t;
typedef int64_t ssize_t; typedef int64_t ssize_t;
typedef uint32_t kern_futex_t; typedef uint32_t kern_futex_t;
typedef uint32_t kern_msg_type_t;
typedef uint32_t kern_msg_event_type_t;
typedef unsigned short equeue_packet_type_t; typedef unsigned short equeue_packet_type_t;
@@ -122,14 +134,27 @@ typedef struct {
tid_t msg_sender; tid_t msg_sender;
/* the id of the port or channel used to send a particular message. */ /* the id of the port or channel used to send a particular message. */
koid_t msg_endpoint; koid_t msg_endpoint;
/* a list of iovecs that point to the buffers that make up the main /* the message type */
* message data. */ kern_msg_type_t msg_type;
union {
/* msg_type = KERN_MSG_TYPE_DATA */
struct {
/* a list of iovecs that point to the buffers that make
* up the main message data. */
kern_iovec_t *msg_data; kern_iovec_t *msg_data;
size_t msg_data_count; size_t msg_data_count;
/* a list of handle entries that contain the kernel handles included /* a list of handle entries that contain the kernel
* in a message. */ * handles included in a message. */
kern_msg_handle_t *msg_handles; kern_msg_handle_t *msg_handles;
size_t msg_handles_count; size_t msg_handles_count;
};
/* msg_type = KERN_MSG_TYPE_EVENT */
struct {
kern_msg_event_type_t msg_event;
};
};
} kern_msg_t; } kern_msg_t;
typedef struct { typedef struct {

View File

@@ -60,9 +60,9 @@ kern_status_t sys_port_create(kern_handle_t *out)
kern_status_t status kern_status_t status
= task_open_handle(self, &port->p_base, 0, &handle); = task_open_handle(self, &port->p_base, 0, &handle);
task_unlock_irqrestore(self, irq_flags); task_unlock_irqrestore(self, irq_flags);
object_unref(&port->p_base);
if (status != KERN_OK) { if (status != KERN_OK) {
object_unref(&port->p_base);
return status; return status;
} }
@@ -114,6 +114,7 @@ kern_status_t sys_port_connect(
status = port_connect(port, remote); status = port_connect(port, remote);
port_unlock_irqrestore(port, flags); port_unlock_irqrestore(port, flags);
object_unref(&remote->c_base); object_unref(&remote->c_base);
object_unref(port_obj);
return KERN_OK; return KERN_OK;
} }