Forwardports from Distributed.jl#master#49
Draft
DilumAluthge wants to merge 14 commits intomasterfrom
Draft
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #49 +/- ##
==========================================
+ Coverage 88.00% 88.07% +0.07%
==========================================
Files 11 11
Lines 2117 2063 -54
==========================================
- Hits 1863 1817 -46
+ Misses 254 246 -8 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
…r` or `release-*` (#144) (cherry picked from commit 9390857d62393e455232bb36d84e70be422e3a7a)
```
┌ send_msg(s::IO, header::Any, msg::Any) @ Distributed /workpath/Distributed.jl/src/messages.jl:105
│ no matching method found `send_msg(::Distributed.LocalProcess, ::Any, ::Any)` (1/2 union split): Distributed.send_msg(Distributed.worker_from_id(id::Int64)::Union{Distributed.LocalProcess, Distributed.Worker}, header::Any, msg::Any)
└────────────────────
```
(cherry picked from commit 7223f4f14dc3749515ae36e76973045e27791606)
(cherry picked from commit 56fa9f214afd5f2c487ac4132aee8e50daf19c0e)
```
┌ @ Distributed /workpath/Distributed.jl/src/process_messages.jl:321
│ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.IdentifySocketAckMsg)` (1/2 union split): Distributed.send_msg_now(w, Distributed.MsgHeader()::Distributed.MsgHeader, Distributed.IdentifySocketAckMsg()::Distributed.IdentifySocketAckMsg)
└────────────────────
```
```
┌ @ Distributed /workpath/Distributed.jl/src/process_messages.jl:359
│ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.JoinCompleteMsg)` (1/2 union split): Distributed.send_msg_now(controller::Union{Distributed.LocalProcess, Distributed.Worker}, Distributed.MsgHeader(Distributed.RRID(0, 0)::Distributed.RRID, (header::Distributed.MsgHeader).notify_oid::Distributed.RRID)::Distributed.MsgHeader, Distributed.JoinCompleteMsg((Distributed.Sys).CPU_THREADS::Int64, Distributed.getpid()::Int32)::Distributed.JoinCompleteMsg)
└────────────────────
```
```
┌ connect_to_peer(manager::Distributed.ClusterManager, rpid::Int64, wconfig::Distributed.WorkerConfig) @ Distributed /workpath/Distributed.jl/src/process_messages.jl:368
│ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.IdentifySocketMsg)` (1/2 union split): Distributed.send_msg_now(w, Distributed.MsgHeader()::Distributed.MsgHeader, Distributed.IdentifySocketMsg(myid()::Int64)::Distributed.IdentifySocketMsg)
└────────────────────
```
```
┌ send_msg_now(s::Base.PipeEndpoint, header::Distributed.MsgHeader, msg::Distributed.ResultMsg) @ Distributed //workl/clones/JuliaLang/Distributed.jl/src/messages.jl:113
│ no matching method found `send_msg_now(::Distributed.LocalProcess, ::Distributed.MsgHeader, ::Distributed.(1/2 union split): Distributed.send_msg_now(Distributed.worker_from_id(id::Int64)::Union{Distributed.LocalProcess, Worker}, header::Distributed.MsgHeader, msg::Distributed.ResultMsg)
└────────────────────
```
(cherry picked from commit 305d6fc4280de14f015c13a080c785bf68c34785)
(cherry picked from commit 871e3d746ab0c7668462f7b88515a362da9c11f1)
…terManager` to a global constant `Ref{ClusterManager}` (#177)
(cherry picked from commit 2fe1aa4e267517565e99cd06664550dcd230cfc6)
…(x, y)` function (#175)
```
┌ create_worker(manager::Any, wconfig::Any) @ Distributed /workpath/Distributed.jl/src/cluster.jl:620
│ no matching method found `kwcall(::NamedTuple{(:config,), <:Tuple{Any}}, ::Type{Distributed.Worker}, ::Any, ::Sockets.TCPSocket, ::Sockets.TCPSocket, ::Base.LibuvStream)` (1/2 union split): Core.kwcall(NamedTuple{(:config,)}(tuple(wconfig::Any)::Tuple{Any})::NamedTuple{(:config,), <:Tuple{Any}}, Distributed.Worker, (getfield(w::Core.Box, :contents)::Any).id::Any, r_s::Sockets.TCPSocket, w_s::Sockets.TCPSocket, manager::Union{Base.LibuvStream, Distributed.ClusterManager})
└────────────────────
```
(cherry picked from commit 79b4ca8d7b498a728912bf9d66d31890ff0f8667)
(cherry picked from commit d65a9960ae358806312c1adf35f45c62f02caf0b)
…` (#174)
```
┌ @ Distributed /workpath/Distributed.jl/src/managers.jl:323
│ no matching method found `shell_escape_wincmd(::Nothing)` (1/2 union split): remotecmd = Distributed.shell_escape_wincmd(Distributed.escape_microsoft_c_args(tuple(exename::Any)::Tuple{Any}, exeflags::Cmd...)::Union{Nothing, String})
└────────────────────
```
(cherry picked from commit 75c559c21fa28cafba9c9c2e6fdc87d2d8b55f57)
(cherry picked from commit 9724553b50599d1ea8a6937c0697d6ca36bde597)
….)` (#167)
```
┌ put_ref(rid::Distributed.RRID, caller::Int64, args::WeakRef) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:709
│ no matching method found `lock(::Nothing)` (1/2 union split): Distributed.lock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock})
└────────────────────
```
```
┌ put_ref(rid::Distributed.RRID, caller::Int64, args::WeakRef) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:710
│ no matching method found `unlock(::Nothing)` (1/2 union split): Distributed.unlock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock})
││││││││││││││└────────────────────
```
```
┌ take_ref(rid::Any, caller::Any, args::Vararg{Any}) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:734
│ no matching method found `lock(::Nothing)` (1/2 union split): Distributed.lock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock})
└────────────────────
```
```
┌ take_ref(rid::Any, caller::Any, args::Vararg{Any}) @ Distributed /workpath/Distributed.jl/src/remotecall.jl:742
│ no matching method found `unlock(::Nothing)` (1/2 union split): Distributed.unlock((rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock})
└────────────────────
```
```
┌ (::Distributed.var"#handle_msg##2#handle_msg##3"{Distributed.CallMsg{…}, Distributed.MsgHeader, Base.PipeEndpoint})() @ Distributed /workpath/Distributed.jl/src/process_messages.jl:292
│ no matching method found `unlock(::Nothing)` (1/2 union split): Distributed.unlock(((v::Distributed.SyncTake).rv::Distributed.RemoteValue).synctake::Union{Nothing, ReentrantLock})
└────────────────────
```
(cherry picked from commit 01027d14d994da11b8962d9a20a2f5a6888dec5f)
(cherry picked from commit 9f6459f83f08e2ef147d102afc1ea5d6d8ab1ec1)
….)` (#171)
```
┌ connect(manager::Distributed.SSHManager, pid::Int64, config::Distributed.WorkerConfig) @ Distributed /workpath/Distributed.jl/src/managers.jl:582
│ no matching method found `read_worker_host_port(::Nothing)` (1/2 union split): Distributed.read_worker_host_port((config::Distributed.WorkerConfig).io::Union{Nothing, IO})
└────────────────────
```
(cherry picked from commit d259b8d3cc24686430a7285abeed574ed46768e7)
(cherry picked from commit 0cf99106072adf5527acd48c668cd95c925e114f)
```
┌ setup_launched_worker(manager::Distributed.SSHManager, wconfig::Distributed.WorkerConfig, launched_q::Vector{Int64}) @ Distributed /workpath/Distributed.jl/src/cluster.jl:563
│ no matching method found `getindex(::Nothing, ::Symbol)` (1/2 union split): cnt = ((wconfig::Distributed.WorkerConfig).environ::Union{Nothing, Dict})[:cpu_threads]
└────────────────────
```
(cherry picked from commit 61ff327e0aa53d25c29780327adab66ab4dcd244)
(cherry picked from commit d06aa735f9511ac2618884e77e7caf29fdaeb1b1)
* Fix a JET error around matching methods for `push!(...)`
```
┌ @ Distributed /workpath/Distributed.jl/src/process_messages.jl:387
│ no matching method found `push!(::Nothing, ::Any)` (1/2 union split): Distributed.push!(default_worker_pool()::Union{Nothing, Distributed.AbstractWorkerPool}, w.id::Any)
└────────────────────
```
(cherry picked from commit f15f306adaafbedc2f431c96087da2ea483c9558)
* Add a `::AbstractWorkerPool` return type annotation to `default_worker_pool()`
* Add a type annotation in an additional place, to fix another JET error
(cherry picked from commit b7c43b235a963d19ec1c5e5aa9b8a565b6e49c34)
…r` at a certain point (#169) * Fix a JET error regarding the existence of the local variable `reducer` at a certain point ``` ┌ var"@distributed"(__source__::LineNumberNode, __module__::Module, args::Vararg{Any}) @ Distributed /workpath/Distributed.jl/src/macros.jl:363 │ local variable `reducer` may be undefined: reducer::Any └──────────────────── ``` By the time we get to this location in the code, we know that the local variable `reducer` exists, by the following reasoning. If the number of arguments is 1, then we have already returned before this point. If the number of arguments is neither 1 nor 2, then we have already thrown an exception by this point. Thus, if we reach this line, the number of arguments must be 2. Since the number of arguments is 2, we know that we executed the `reducer = args[1]` line above, which means that `reducer` is defined. (cherry picked from commit cdb110658e0e3a89de3739b05679863fefb5b91b) * Define `reducer = identity` in the `na==1` branch (cherry picked from commit 1bc91f97a75c1ac321ed28dfca072cbabfad7b4c)
…e()`; this fixes a JET error around matching methods for `run_work_thunk(...)` (#181) This fixes the following JET error: ``` │┌ run_work_thunk(rv::Distributed.RemoteValue, thunk::Bool) @ Distributed /workpath/Distributed.jl/src/process_messages.jl:79 ││ no matching method found `run_work_thunk(::Bool, ::Bool)`: Distributed.run_work_thunk(thunk::Bool, false) │└──────────────────── ``` (cherry picked from commit 2adcd26e81cec03dbf81fa94071b4c4499e539a1)
2aeea50 to
8e3fb14
Compare
This is a forward-port of JuliaLang/Distributed.jl#179 (JuliaLang/Distributed.jl@6649a94). When serializing RemoteChannels, a new zeroed RemoteChannel is constructed. However, the input type parameter might be part of an outer type parameter which can then lead to a TypeError during deserialization. The fix is just to reuse the type parameter of the input when constructing the zeroed RemoteChannel. (cherry picked from commit 6649a94075ff6a52ee9558db3ec4491496bf1bea) (cherry picked from commit 2e52996)
…ess_exited(...)` (#172)
* Fix some JET errors around matching methods for `kill(...)` and `process_exited(...)`
```
│┌ manage(manager::Distributed.LocalManager, id::Int64, config::Distributed.WorkerConfig, op::Symbol) @ Distributed /workpath/Distributed.jl/src/managers.jl:529
││ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((config::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, 2)
│└────────────────────
```
```
│┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:757
│ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, (profile_sig::Tuple{String, Int64})[2]::Int64)
└────────────────────
```
```
┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:761
│ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, (Distributed.Base).SIGQUIT::Int64)
└────────────────────
```
```
┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:766
│ no matching method found `kill(::Nothing, ::Int64)` (1/2 union split): Distributed.kill((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process}, (Distributed.Base).SIGKILL::Int64)
└────────────────────
```
```
┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:752
│ no matching method found `process_exited(::Nothing)` (1/2 union split): Distributed.process_exited((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process})
└────────────────────
```
```
┌ (::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig})() @ Distributed /workpath/Distributed.jl/src/managers.jl:764
│ no matching method found `process_exited(::Nothing)` (1/2 union split): Distributed.process_exited((getfield(#self#::Distributed.var"#200#201"{Int64, Int64, Int64, Int64, Distributed.WorkerConfig}, :config)::Distributed.WorkerConfig).process::Union{Nothing, Base.Process})
└────────────────────
```
(cherry picked from commit e19a4bfe30fd580cc8e8baa5033e2d7a2c1e1dc9)
* Apply suggestions from code review
Co-authored-by: James Wrigley <JamesWrigley@users.noreply.github.com>
* Define `process = config.process::Process` in one place, and then re-use the `process` variable when needed
---------
Co-authored-by: James Wrigley <JamesWrigley@users.noreply.github.com>
(cherry picked from commit 231da28ef52953b68280a295419b990cb5c7e3f7)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR cherry-picks multiple PRs that were merged into the master branch of https://github.com/JuliaLang/Distributed.jl.
Forwardported PRs:
@everywhereJuliaLang/Distributed.jl#110@spawnat :anydoesn't do load-balancing JuliaLang/Distributed.jl#131masterorrelease-*JuliaLang/Distributed.jl#144send_msg(...)JuliaLang/Distributed.jl#166send_msg_now(...)JuliaLang/Distributed.jl#165Distributed.cluster_managerfrom a global non-constantClusterManagerto a global constantRef{ClusterManager}JuliaLang/Distributed.jl#177create_worker(x, y)function JuliaLang/Distributed.jl#175shell_escape_wincmd(...)JuliaLang/Distributed.jl#174lock(...)andunlock(...)JuliaLang/Distributed.jl#167read_worker_host_port(...)JuliaLang/Distributed.jl#171getindex(...)JuliaLang/Distributed.jl#170push!(...)JuliaLang/Distributed.jl#173reducerat a certain point JuliaLang/Distributed.jl#169run_work_thunk()torun_work_thunk_remotevalue(); this fixes a JET error around matching methods forrun_work_thunk(...)JuliaLang/Distributed.jl#181kill(...)andprocess_exited(...)JuliaLang/Distributed.jl#172Need manual forwardport:
SIGQUITinstead ofSIGTERMJuliaLang/Distributed.jl#103wait(::[Abstract]WorkerPool)JuliaLang/Distributed.jl#106remotecall_evalJuliaLang/Distributed.jl#123release-julia-1.11: Run CI on Julia 1.11.x JuliaLang/Distributed.jl#142Contains multiple commits, manual intervention needed:
Core.Boxin the package JuliaLang/Distributed.jl#161send_connection_hdr(...)andsend_msg_now(...)(underlying problem: theWorker(...)constructors might not always return aWorker) JuliaLang/Distributed.jl#180Non-merged PRs: