Commit 37163fff authored by James Hilliard's avatar James Hilliard Committed by Stefano Babic
Browse files

IPC: add status streaming support



This adds a NOTIFY_STREAM command to the ctrl interface, this
is implemented with a history replay functionality so that
clients can see notify messages from before they connect.

This has the advantage of making it possible for multiple clients
to monitor notify messages at the same time.
Signed-off-by: default avatarJames Hilliard <james.hilliard1@gmail.com>
parent 859be4a8
Pipeline #9236 passed with stage
in 6 minutes and 45 seconds
......@@ -69,6 +69,14 @@ static pthread_t subprocess_ipc_handler_thread_id;
static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
struct notify_conn {
SIMPLEQ_ENTRY(notify_conn) next;
int sockfd;
};
SIMPLEQ_HEAD(connections, notify_conn);
static struct connections notify_conns;
static bool is_selection_allowed(const char *software_set, char *running_mode,
struct dict const *acceptedlist)
{
......@@ -116,11 +124,61 @@ static void clean_msg(char *msg, char drop)
}
}
static int write_notify_msg(ipc_message *msg, int sockfd)
{
void *buf;
size_t count;
ssize_t n;
int ret = 0;
buf = msg;
count = sizeof(*msg);
while (count > 0) {
n = send(sockfd, buf, count, MSG_NOSIGNAL);
if (n <= 0) {
/*
* We can't use the notify methods for error logging here as it will cause a deadlock.
*/
if (n == 0) {
fprintf(stderr, "Error: A status client is not responding, removing it.\n");
} else {
fprintf(stderr, "A status client disappeared, removing it: %s\n", strerror(errno));
}
ret = -1;
break;
}
count -= (size_t)n;
buf = (char*)buf + n;
}
return ret;
}
/*
* This must be called after acquiring the mutex
* for the msglock structure
*/
static void send_notify_msg(ipc_message *msg)
{
struct notify_conn *conn, *tmp;
int ret;
SIMPLEQ_FOREACH_SAFE(conn, &notify_conns, next, tmp) {
ret = write_notify_msg(msg, conn->sockfd);
if (ret < 0) {
close(conn->sockfd);
SIMPLEQ_REMOVE(&notify_conns, conn,
notify_conn, next);
free(conn);
}
}
}
static void network_notifier(RECOVERY_STATUS status, int error, int level, const char *msg)
{
int len = msg ? strlen(msg) : 0;
struct msg_elem *newmsg = (struct msg_elem *)calloc(1, sizeof(*newmsg) + len + 1);
struct msg_elem *oldmsg;
ipc_message ipcmsg;
if (!newmsg)
return;
......@@ -148,6 +206,18 @@ static void network_notifier(RECOVERY_STATUS status, int error, int level, const
SIMPLEQ_INSERT_TAIL(&notifymsgs, newmsg, next);
ipcmsg.magic = IPC_MAGIC;
ipcmsg.type = NOTIFY_STREAM;
memset(ipcmsg.data.msg, 0, sizeof(ipcmsg.data.msg));
strncpy(ipcmsg.data.notify.msg, newmsg->msg,
sizeof(ipcmsg.data.notify.msg) - 1);
ipcmsg.data.notify.status = newmsg->status;
ipcmsg.data.notify.error = newmsg->error;
ipcmsg.data.notify.level = newmsg->level;
send_notify_msg(&ipcmsg);
pthread_mutex_unlock(&msglock);
}
......@@ -366,7 +436,8 @@ void *network_thread (void *data)
struct sockaddr_un cliaddr;
ipc_message msg;
int nread;
struct msg_elem *notification;
struct msg_elem *notification, *tmp;
struct notify_conn *conn;
int ret;
update_state_t value;
struct subprocess_msg_elem *subprocess_msg;
......@@ -378,6 +449,7 @@ void *network_thread (void *data)
}
SIMPLEQ_INIT(&notifymsgs);
SIMPLEQ_INIT(&notify_conns);
SIMPLEQ_INIT(&subprocess_messages);
register_notifier(network_notifier);
......@@ -507,6 +579,60 @@ void *network_thread (void *data)
}
pthread_mutex_unlock(&msglock);
break;
case NOTIFY_STREAM:
msg.type = ACK;
memset(msg.data.msg, 0, sizeof(msg.data.msg));
msg.data.status.current = instp->status;
msg.data.status.last_result = instp->last_install;
msg.data.status.error = instp->last_error;
ret = write(ctrlconnfd, &msg, sizeof(msg));
msg.type = NOTIFY_STREAM;
if (ret < 0) {
ERROR("Error write notify ack on socket ctrl");
close(ctrlconnfd);
break;
}
/* Get first notification from the queue */
pthread_mutex_lock(&msglock);
notification = SIMPLEQ_FIRST(&notifymsgs);
/* Send notify history */
SIMPLEQ_FOREACH_SAFE(notification, &notifymsgs, next, tmp) {
memset(msg.data.msg, 0, sizeof(msg.data.msg));
strncpy(msg.data.notify.msg, notification->msg,
sizeof(msg.data.notify.msg) - 1);
msg.data.notify.status = notification->status;
msg.data.notify.error = notification->error;
msg.data.notify.level = notification->level;
ret = write_notify_msg(&msg, ctrlconnfd);
if (ret < 0) {
pthread_mutex_unlock(&msglock);
ERROR("Error write notify history on socket ctrl");
close(ctrlconnfd);
break;
}
}
/*
* Save the new connection to send notifications to
*/
conn = (struct notify_conn *)calloc(1, sizeof(*conn));
if (!conn) {
pthread_mutex_unlock(&msglock);
ERROR("Out of memory, skipping...");
close(ctrlconnfd);
pthread_mutex_unlock(&stream_mutex);
continue;
}
conn->sockfd = ctrlconnfd;
SIMPLEQ_INSERT_TAIL(&notify_conns, conn, next);
pthread_mutex_unlock(&msglock);
break;
case SET_AES_KEY:
#ifndef CONFIG_PKCS11
......
......@@ -37,7 +37,8 @@ typedef enum {
SET_UPDATE_STATE, /* set bootloader ustate */
GET_UPDATE_STATE,
REQ_INSTALL_EXT,
SET_VERSIONS_RANGE
SET_VERSIONS_RANGE,
NOTIFY_STREAM
} msgtype;
/*
......@@ -80,6 +81,12 @@ typedef union {
int error;
char desc[2048];
} status;
struct {
int status;
int error;
int level;
char msg[2048];
} notify;
struct {
struct swupdate_request req;
unsigned int len; /* Len of data valid in buf */
......@@ -122,6 +129,8 @@ int ipc_send_data(int connfd, char *buf, int size);
void ipc_end(int connfd);
int ipc_get_status(ipc_message *msg);
int ipc_get_status_timeout(ipc_message *msg, unsigned int timeout_ms);
int ipc_notify_connect(void);
int ipc_notify_receive(int *connfd, ipc_message *msg);
int ipc_postupdate(ipc_message *msg);
int ipc_send_cmd(ipc_message *msg);
......
......@@ -163,6 +163,85 @@ int ipc_get_status_timeout(ipc_message *msg, unsigned int timeout_ms)
return ret == 0 ? sizeof(*msg) : -1;
}
static int __ipc_start_notify(int connfd, ipc_message *msg, unsigned int timeout_ms)
{
fd_set fds;
struct timeval tv;
memset(msg, 0, sizeof(*msg));
msg->magic = IPC_MAGIC;
msg->type = NOTIFY_STREAM;
if (write(connfd, msg, sizeof(*msg)) != sizeof(*msg))
return -1;
if (timeout_ms) {
FD_ZERO(&fds);
FD_SET(connfd, &fds);
/*
* Invalid the message
* Caller should check it
*/
msg->magic = 0;
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
if ((select(connfd + 1, &fds, NULL, NULL, &tv) <= 0) ||
!FD_ISSET(connfd, &fds))
return -ETIMEDOUT;
}
return -(read(connfd, msg, sizeof(*msg)) != sizeof(*msg));
}
int ipc_notify_connect(void)
{
int ret;
int connfd;
ipc_message msg;
connfd = prepare_ipc();
if (connfd < 0)
return -1;
/*
* Initialize the notify stream
*/
ret = __ipc_start_notify(connfd, &msg, 0);
if (ret || msg.type != ACK) {
fprintf(stdout, "Notify connection handshake failed..\n");
close(connfd);
return ret;
}
return connfd;
}
int ipc_notify_receive(int *connfd, ipc_message *msg)
{
int ret = read(*connfd, msg, sizeof(*msg));
if (ret == -1 && (errno == EAGAIN || errno == EINTR))
return 0;
if (ret != sizeof(*msg)) {
fprintf(stdout, "Connection closing..\n");
close(*connfd);
*connfd = -1;
return -1;
}
if (msg->magic != IPC_MAGIC) {
fprintf(stdout, "Connection closing, invalid magic...\n");
close(*connfd);
*connfd = -1;
return -1;
}
return ret;
}
int ipc_inst_start_ext(void *priv, ssize_t size)
{
int connfd;
......
......@@ -141,31 +141,44 @@ static void broadcast(struct mg_mgr *mgr, char *str)
static void *broadcast_message_thread(void *data)
{
int fd = -1;
for (;;) {
ipc_message msg;
int ret = ipc_get_status(&msg);
int ret;
if (!ret && strlen(msg.data.status.desc) != 0) {
if (fd < 0)
fd = ipc_notify_connect();
/*
* if still fails, try later
*/
if (fd < 0) {
sleep(1);
continue;
}
ret = ipc_notify_receive(&fd, &msg);
if (ret != sizeof(msg))
return NULL;
if (strlen(msg.data.notify.msg) != 0) {
struct mg_mgr *mgr = (struct mg_mgr *) data;
char text[4096];
char str[4160];
snescape(text, sizeof(text), msg.data.status.desc);
snescape(text, sizeof(text), msg.data.notify.msg);
snprintf(str, sizeof(str),
"{\r\n"
"\t\"type\": \"message\",\r\n"
"\t\"level\": \"%d\",\r\n"
"\t\"text\": \"%s\"\r\n"
"}\r\n",
(msg.data.status.error) ? 3 : 6, /* RFC 5424 */
text);
"{\r\n"
"\t\"type\": \"message\",\r\n"
"\t\"level\": \"%d\",\r\n"
"\t\"text\": \"%s\"\r\n"
"}\r\n",
msg.data.notify.level, /* RFC 5424 */
text);
broadcast(mgr, str);
continue;
}
usleep(50 * 1000);
}
return NULL;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment