diff --git a/src/messages.jl b/src/messages.jl index 6e895f0..fb2b29e 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -110,7 +110,7 @@ end function send_msg_now(s::IO, header, msg::AbstractMsg) id = worker_id_from_socket(s) if id > -1 - return send_msg_now(worker_from_id(id), header, msg) + return send_msg_now(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end diff --git a/src/process_messages.jl b/src/process_messages.jl index a444651..6a4b8c2 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -316,7 +316,7 @@ end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version) + w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) @@ -329,7 +329,7 @@ end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) LPROC.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, cluster_manager; version=version) + controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)::Worker notify(controller.initialized) register_worker(LPROC) topology(msg.topology) @@ -362,7 +362,7 @@ end function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig) try (r_s, w_s) = connect(manager, rpid, wconfig) - w = Worker(rpid, r_s, w_s, manager; config=wconfig) + w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker process_messages(w.r_stream, w.w_stream, false) send_connection_hdr(w, true) send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))